> ## Download and Preprocess Weather Data

We will download hourly weather data for New York City, covering the training and testing periods, from Visual Crossing Corporation.

Link to download: [Weather dataset for training](https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/retrievebulkdataset?&key=JA3CCWYXBJX45H9RVREF562LW&taskId=b97034cf0db66fc29b340c0f8ccfa6a7&zip=false)

[Weather dataset for testing](https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/retrievebulkdataset?&key=JA3CCWYXBJX45H9RVREF562LW&taskId=86f0f4cffd8ebd6e913d6e20c46bf1cd&zip=false)

Then we will select the weather features that might affect road conditions and perform basic preprocessing on the weather datasets.

> Import libraries and functions

In [18]:
%run ../scripts/'download fix schema.py'
%run ../scripts/'preprocess.py'
from pyspark.sql import SparkSession, functions as F

In [2]:
# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("Weather")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)



> Download weather datasets

In [14]:
# Download weather dataset from Dec 2022 to May 2023 for training data
download_weather('train')

In [15]:
# Download weather dataset from Jan 2024 to March 2024 for testing data
download_weather('test')
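
The download step above can be pictured as a plain HTTP fetch that writes the response into the landing folder. The sketch below is an illustration only, not the `download_weather` implementation from the scripts folder: the helper name `download_file` is hypothetical, and it assumes that the bulk-download link returns the CSV body directly.

```python
import shutil
import urllib.request
from pathlib import Path


def download_file(url: str, dest: str) -> Path:
    """Stream the resource at `url` into `dest` (hypothetical helper)."""
    dest_path = Path(dest)
    # Create the target folder (e.g. the landing layer) if it does not exist yet
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # Copy the response in chunks so the whole file is never held in memory
    with urllib.request.urlopen(url) as response, open(dest_path, "wb") as out:
        shutil.copyfileobj(response, out)
    return dest_path


# Example call (not executed here; TRAIN_URL would be the training link above):
# download_file(TRAIN_URL, "../data/landing/train_weather.csv")
```

Streaming with `shutil.copyfileobj` keeps memory use flat regardless of file size, which matters little for a few thousand hourly rows but costs nothing.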

> Read weather datasets

In [13]:
# Read weather for training set
train_weather = spark.read.csv("../data/landing/train_weather.csv", header=True, inferSchema=True)
train_weather.show()

+-------------------+----+----+---------+--------+---------+----------+----------------+-----------------+
|           datetime|temp|snow|snowdepth|windgust|windspeed|visibility|      conditions|             icon|
+-------------------+----+----+---------+--------+---------+----------+----------------+-----------------+
|2022-12-01 00:00:00| 5.2| 0.0|      0.0|    42.9|     24.8|      16.0|           Clear|      clear-night|
|2022-12-01 01:00:00| 4.5| 0.0|      0.0|    40.8|     26.4|      16.0|        Overcast|           cloudy|
|2022-12-01 02:00:00| 3.6| 0.0|      0.0|    36.3|     21.3|      16.0|           Clear|      clear-night|
|2022-12-01 03:00:00| 3.1| 0.0|      0.0|    34.5|     18.2|      16.0|           Clear|      clear-night|
|2022-12-01 04:00:00| 3.0| 0.0|      0.0|    37.8|     18.7|      16.0|           Clear|      clear-night|
|2022-12-01 05:00:00| 3.0| 0.0|      0.0|    38.8|     17.6|      16.0|           Clear|      clear-night|
|2022-12-01 06:00:00| 3.0| 0.0|      

In [14]:
# Read weather for testing set
test_weather = spark.read.csv("../data/landing/test_weather.csv", header=True, inferSchema=True)
test_weather.show()

+-------------------+----+----+---------+--------+---------+----------+----------------+-------------------+
|           datetime|temp|snow|snowdepth|windgust|windspeed|visibility|      conditions|               icon|
+-------------------+----+----+---------+--------+---------+----------+----------------+-------------------+
|2024-01-01 00:00:00| 5.7| 0.0|      0.0|    14.8|     14.5|      16.0|        Overcast|             cloudy|
|2024-01-01 01:00:00| 5.7| 0.0|      0.0|    18.4|     14.5|      16.0|        Overcast|             cloudy|
|2024-01-01 02:00:00| 5.7| 0.0|      0.0|    16.6|     11.1|      16.0|        Overcast|             cloudy|
|2024-01-01 03:00:00| 5.0| 0.0|      0.0|    14.8|     10.7|      16.0|        Overcast|             cloudy|
|2024-01-01 04:00:00| 5.0| 0.0|      0.0|    13.0|      7.5|      16.0|        Overcast|             cloudy|
|2024-01-01 05:00:00| 5.0| 0.0|      0.0|     7.6|      7.2|      16.0|        Overcast|             cloudy|
|2024-01-01 06:00:0

In [5]:
# Count the number of rows in each weather dataset
print("Number of instances in training weather data:", train_weather.count())
print("Number of instances in testing weather data:", test_weather.count())

Number of instances in training weather data: 4367
Number of instances in testing weather data: 2183


> Preprocess weather datasets

We will merge the information from 'snow' and 'snowdepth' into a single column, flag rainy hours based on the 'conditions' column, and keep only the features believed to affect road conditions, such as snow, wind and rain.
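
To make these rules concrete, here is a plain-Python sketch of what they could look like for a single hourly record. It is not the `preprocess_weather` code from the scripts folder. Two assumptions are made for illustration: the merged snow value is the larger of 'snow' and 'snowdepth', and an hour counts as rainy when the 'conditions' text contains "rain". The function names are hypothetical; the kept columns match the output shown below.

```python
# Columns kept after feature selection (taken from the processed output)
KEPT_COLUMNS = ["datetime", "temp", "snowdepth", "windgust", "visibility", "rain"]


def merge_snow(snow, snowdepth):
    """Assumed merge rule: the larger of the two values, None if both are missing."""
    present = [value for value in (snow, snowdepth) if value is not None]
    return max(present) if present else None


def flag_rain(conditions):
    """Assumed rain rule: 1 if the conditions text mentions rain, else 0."""
    return int("rain" in (conditions or "").lower())


def preprocess_row(row):
    """Apply both rules to one record and keep only the selected columns."""
    merged = dict(row)
    merged["snowdepth"] = merge_snow(row.get("snow"), row.get("snowdepth"))
    merged["rain"] = flag_rain(row.get("conditions"))
    return {name: merged.get(name) for name in KEPT_COLUMNS}


# First row of the raw training data shown earlier in this notebook
raw = {
    "datetime": "2022-12-01 00:00:00", "temp": 5.2, "snow": 0.0,
    "snowdepth": 0.0, "windgust": 42.9, "windspeed": 24.8,
    "visibility": 16.0, "conditions": "Clear", "icon": "clear-night",
}
print(preprocess_row(raw))
```

In the notebook itself the same rules would be expressed as Spark column expressions so that they run on the whole DataFrame at once.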

In [15]:
# Apply these preprocessing steps for training weather dataset
train_weather = preprocess_weather(train_weather)
train_weather.show()

+-------------------+----+---------+--------+----------+----+
|           datetime|temp|snowdepth|windgust|visibility|rain|
+-------------------+----+---------+--------+----------+----+
|2022-12-01 00:00:00| 5.2|      0.0|    42.9|      16.0|   0|
|2022-12-01 01:00:00| 4.5|      0.0|    40.8|      16.0|   0|
|2022-12-01 02:00:00| 3.6|      0.0|    36.3|      16.0|   0|
|2022-12-01 03:00:00| 3.1|      0.0|    34.5|      16.0|   0|
|2022-12-01 04:00:00| 3.0|      0.0|    37.8|      16.0|   0|
|2022-12-01 05:00:00| 3.0|      0.0|    38.8|      16.0|   0|
|2022-12-01 06:00:00| 3.0|      0.0|    44.3|      16.0|   0|
|2022-12-01 07:00:00| 2.5|      0.0|    39.2|      16.0|   0|
|2022-12-01 08:00:00| 3.0|      0.0|    35.3|      16.0|   0|
|2022-12-01 09:00:00| 3.4|      0.0|    38.9|      16.0|   0|
|2022-12-01 10:00:00| 4.2|      0.0|    46.8|      16.0|   0|
|2022-12-01 11:00:00| 5.0|      0.0|    35.3|      16.0|   0|
|2022-12-01 12:00:00| 4.5|      0.0|    45.6|      16.0|   0|
|2022-12

In [16]:
# Preprocess testing weather dataset
test_weather = preprocess_weather(test_weather)
test_weather.show()

+-------------------+----+---------+--------+----------+----+
|           datetime|temp|snowdepth|windgust|visibility|rain|
+-------------------+----+---------+--------+----------+----+
|2024-01-01 00:00:00| 5.7|      0.0|    14.8|      16.0|   0|
|2024-01-01 01:00:00| 5.7|      0.0|    18.4|      16.0|   0|
|2024-01-01 02:00:00| 5.7|      0.0|    16.6|      16.0|   0|
|2024-01-01 03:00:00| 5.0|      0.0|    14.8|      16.0|   0|
|2024-01-01 04:00:00| 5.0|      0.0|    13.0|      16.0|   0|
|2024-01-01 05:00:00| 5.0|      0.0|     7.6|      16.0|   0|
|2024-01-01 06:00:00| 5.0|      0.0|     7.6|      16.0|   0|
|2024-01-01 07:00:00| 4.3|      0.0|     7.6|      15.8|   1|
|2024-01-01 08:00:00| 4.9|      0.0|     7.6|      15.9|   1|
|2024-01-01 09:00:00| 5.0|      0.0|     7.6|      16.0|   1|
|2024-01-01 10:00:00| 5.6|      0.0|     7.6|      16.0|   0|
|2024-01-01 11:00:00| 6.7|      0.0|    60.5|      16.0|   0|
|2024-01-01 12:00:00| 7.8|      0.0|     9.4|      16.0|   0|
|2024-01

In [19]:
# Check weather datasets after feature selection
check_weather_data(train_weather)

Missing values:
+--------+----+---------+--------+----------+----+
|datetime|temp|snowdepth|windgust|visibility|rain|
+--------+----+---------+--------+----------+----+
|       0|   0|        0|       0|         0|   0|
+--------+----+---------+--------+----------+----+

datetime:
	Latest: 2023-06-01 09:00:00 
	Earliest: 2022-12-01 11:00:00
Descriptive statistics
+-------+-----------------+-------------------+------------------+------------------+
|summary|             temp|          snowdepth|          windgust|        visibility|
+-------+-----------------+-------------------+------------------+------------------+
|  count|             4367|               4367|              4367|              4367|
|   mean|8.823379894664528|0.04785894206549119| 26.21458667277305|14.950080146553736|
| stddev|6.810479094407608| 0.5053668635435273|13.807370388910412|3.0179072161306997|
|    min|            -14.2|                0.0|               2.5|               0.7|
|    max|             31.9|     

In [20]:
check_weather_data(test_weather)

Missing values:
+--------+----+---------+--------+----------+----+
|datetime|temp|snowdepth|windgust|visibility|rain|
+--------+----+---------+--------+----------+----+
|       1|   1|        1|       1|         1|   0|
+--------+----+---------+--------+----------+----+

datetime:
	Latest: 2024-04-01 10:00:00 
	Earliest: 2024-01-01 11:00:00
Descriptive statistics
+-------+-----------------+------------------+------------------+------------------+
|summary|             temp|         snowdepth|          windgust|        visibility|
+-------+-----------------+------------------+------------------+------------------+
|  count|             2183|              2183|              2183|              2183|
|   mean|5.300229042601909|0.5775080164910672|28.600137425561154|14.871598717361437|
| stddev|4.555539223777588|1.9437914564630978| 15.61965127456251| 3.028414104646071|
|    min|             -7.2|               0.0|               3.6|               0.6|
|    max|             22.2|            

The check finds one row in the testing weather dataset with missing values. Because this is only one of 2,183 rows, we remove it. Both the training and testing weather datasets are also filtered to fall within their intended timeframes.
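
The two cleaning rules described here can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the `filter_weather` code from the scripts folder: the name `filter_rows` is hypothetical, and the half-open date windows are inferred from the download comments above (Dec 2022 to May 2023 for training, Jan 2024 to March 2024 for testing).

```python
from datetime import datetime

# Assumed half-open [start, end) windows for each split
PERIODS = {
    "train": (datetime(2022, 12, 1), datetime(2023, 6, 1)),
    "test": (datetime(2024, 1, 1), datetime(2024, 4, 1)),
}


def filter_rows(split, rows):
    """Drop rows with any missing value, then keep rows inside the split's window."""
    start, end = PERIODS[split]
    kept = []
    for row in rows:
        # Skip incomplete records first so the datetime comparison is always valid
        if any(value is None for value in row.values()):
            continue
        if start <= row["datetime"] < end:
            kept.append(row)
    return kept
```

A half-open window keeps every hour of the last month while excluding the first hours of the following month, which is where the boundary rows in the check output above fall.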

In [21]:
# Filter and remove missing values
train_weather = filter_weather('train', train_weather)
test_weather = filter_weather('test', test_weather)

> Save processed weather datasets

In [23]:
# Save processed training weather dataset as parquet in the curated layer
train_weather \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet('../data/curated/train_weather.parquet')

In [24]:
# Save processed testing weather dataset as parquet in the curated layer
test_weather \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet('../data/curated/test_weather.parquet')