In [0]:
# Import
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import (
    col,
    date_format,
    greatest,
    isnan,
    when,
    count
)

In [0]:
# Look up the storage account key in the secret scope, then register it under
# the same setting name in both the Spark session configuration and the Hadoop
# configuration of the underlying JVM context.
storage_key_setting = "fs.azure.account.key.torontodatalakestorage.dfs.core.windows.net"
acc_storage_key = dbutils.secrets.get(
    scope="Toronto_key_vault_secret",
    key="torontodatalakestorage-key",
)
spark.conf.set(storage_key_setting, acc_storage_key)
hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_configuration.set(storage_key_setting, acc_storage_key)

## Read data

In [0]:
# Load today's Toronto weather CSV export.

# Location of the raw file; the name carries today's date as YYYYMMDD
toronto_weather_path = f'abfss://toronto-data@torontodatalakestorage.dfs.core.windows.net/raw/toronto_weather_{datetime.today().strftime("%Y%m%d")}.csv'

# Pair every text line with its position in the file and discard positions
# 0, 1 and 2 (the first three lines); the line at position 3 is the first kept.
indexed_lines = spark.sparkContext.textFile(toronto_weather_path).zipWithIndex()
weather_lines = indexed_lines.filter(lambda pair: pair[1] >= 3).map(lambda pair: pair[0])

# Parse the remaining lines as CSV; the first remaining line supplies the column names
toronto_weather_raw = spark.read.option("header", True).csv(weather_lines)

# Preview the parsed rows
toronto_weather_raw.show()

+----------+-----------------------+-----------------------+-----------------------+------------------------+-----------------+----------------+---------------------+---------------------+----------------------+-------------+-----------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------------------+-------------------------------+
|      time|weather_code (wmo code)|temperature_2m_max (°C)|temperature_2m_min (°C)|temperature_2m_mean (°C)|sunrise (iso8601)|sunset (iso8601)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|snowfall_sum (cm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation_sum (MJ/m²)|et0_fao_evapotranspiration (mm)|
+----------+-----------------------+-----------------------+-----------------------+------------------------+-----------------+----------------+---------------------+--

In [0]:
# Print the column names and types of the raw weather DataFrame.
toronto_weather_raw.printSchema()

root
 |-- time: string (nullable = true)
 |-- weather_code (wmo code): string (nullable = true)
 |-- temperature_2m_max (°C): string (nullable = true)
 |-- temperature_2m_min (°C): string (nullable = true)
 |-- temperature_2m_mean (°C): string (nullable = true)
 |-- sunrise (iso8601): string (nullable = true)
 |-- sunset (iso8601): string (nullable = true)
 |-- daylight_duration (s): string (nullable = true)
 |-- sunshine_duration (s): string (nullable = true)
 |-- precipitation_sum (mm): string (nullable = true)
 |-- rain_sum (mm): string (nullable = true)
 |-- snowfall_sum (cm): string (nullable = true)
 |-- precipitation_hours (h): string (nullable = true)
 |-- wind_speed_10m_max (km/h): string (nullable = true)
 |-- wind_gusts_10m_max (km/h): string (nullable = true)
 |-- wind_direction_10m_dominant (°): string (nullable = true)
 |-- shortwave_radiation_sum (MJ/m²): string (nullable = true)
 |-- et0_fao_evapotranspiration (mm): string (nullable = true)



In [0]:
# Print summary statistics for every column of the raw weather DataFrame.
toronto_weather_raw.summary().show()

+-------+----------+-----------------------+-----------------------+-----------------------+------------------------+-----------------+----------------+---------------------+---------------------+----------------------+-------------+-----------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------------------+-------------------------------+
|summary|      time|weather_code (wmo code)|temperature_2m_max (°C)|temperature_2m_min (°C)|temperature_2m_mean (°C)|sunrise (iso8601)|sunset (iso8601)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|snowfall_sum (cm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation_sum (MJ/m²)|et0_fao_evapotranspiration (mm)|
+-------+----------+-----------------------+-----------------------+-----------------------+------------------------+-----------------+----------------+

In [0]:
# Load today's Toronto traffic collision JSON exports.

# Glob matching every JSON file in today's raw collisions folder (date as YYYYMMDD)
toronto_collisions_path = f'abfss://toronto-data@torontodatalakestorage.dfs.core.windows.net/raw/toronto_traffic_collisions_{datetime.today().strftime("%Y%m%d")}/*.json'

# Parse the files in multi-line mode (a JSON document may span several lines)
collisions_documents = spark.read.option("multiLine", True).json(toronto_collisions_path)
# Get the actual data from the JSON: expand the `attributes` structs of the
# `features` array into one row per element, one column per field
toronto_collisions_raw = collisions_documents.selectExpr("inline(features.attributes)")
toronto_collisions_raw.show()

+----------+-------+--------+---------------+----------+--------------+--------+-----------------+------------------+------------------+----------+--------------------+--------+-------------+-------+--------+---------+--------+---------+-------------+----------+
|AUTOMOBILE|BICYCLE|DIVISION|EVENT_UNIQUE_ID|FATALITIES|FTR_COLLISIONS|HOOD_158|INJURY_COLLISIONS|         LAT_WGS84|        LONG_WGS84|MOTORCYCLE|   NEIGHBOURHOOD_158|OBJECTID|     OCC_DATE|OCC_DOW|OCC_HOUR|OCC_MONTH|OCC_YEAR|PASSENGER|PD_COLLISIONS|PEDESTRIAN|
+----------+-------+--------+---------------+----------+--------------+--------+-----------------+------------------+------------------+----------+--------------------+--------+-------------+-------+--------+---------+--------+---------+-------------+----------+
|       YES|    YES|     D55| GO-20142815344|         0|            NO|     068|              YES| 43.67619633749256|-79.35886348045678|        NO|North Riverdale (68)|   42001|1409461200000| Sunday|      18|   

In [0]:
# Print the column names and inferred types of the raw collisions DataFrame.
toronto_collisions_raw.printSchema()

root
 |-- AUTOMOBILE: string (nullable = true)
 |-- BICYCLE: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- EVENT_UNIQUE_ID: string (nullable = true)
 |-- FATALITIES: long (nullable = true)
 |-- FTR_COLLISIONS: string (nullable = true)
 |-- HOOD_158: string (nullable = true)
 |-- INJURY_COLLISIONS: string (nullable = true)
 |-- LAT_WGS84: double (nullable = true)
 |-- LONG_WGS84: double (nullable = true)
 |-- MOTORCYCLE: string (nullable = true)
 |-- NEIGHBOURHOOD_158: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- OCC_DATE: long (nullable = true)
 |-- OCC_DOW: string (nullable = true)
 |-- OCC_HOUR: string (nullable = true)
 |-- OCC_MONTH: string (nullable = true)
 |-- OCC_YEAR: string (nullable = true)
 |-- PASSENGER: string (nullable = true)
 |-- PD_COLLISIONS: string (nullable = true)
 |-- PEDESTRIAN: string (nullable = true)



In [0]:
# Print summary statistics for every column of the raw collisions DataFrame.
toronto_collisions_raw.summary().show()

+-------+----------+-------+--------+---------------+--------------------+--------------+----------------+-----------------+------------------+------------------+----------+--------------------+------------------+--------------------+---------+------------------+---------+--------+---------+-------------+----------+
|summary|AUTOMOBILE|BICYCLE|DIVISION|EVENT_UNIQUE_ID|          FATALITIES|FTR_COLLISIONS|        HOOD_158|INJURY_COLLISIONS|         LAT_WGS84|        LONG_WGS84|MOTORCYCLE|   NEIGHBOURHOOD_158|          OBJECTID|            OCC_DATE|  OCC_DOW|          OCC_HOUR|OCC_MONTH|OCC_YEAR|PASSENGER|PD_COLLISIONS|PEDESTRIAN|
+-------+----------+-------+--------+---------------+--------------------+--------------+----------------+-----------------+------------------+------------------+----------+--------------------+------------------+--------------------+---------+------------------+---------+--------+---------+-------------+----------+
|  count|     44000|  44000|   44000|         

## Cleaning Toronto weather data

In [0]:
# Start the transformed weather DataFrame from the raw one, minus the weather code column
unused_weather_column = "weather_code (wmo code)"
toronto_weather_transformed = toronto_weather_raw.drop(unused_weather_column)

In [0]:
# For every weather column, count the values that look missing: text containing
# "None" or "NULL", empty strings, nulls and NaN.
def _looks_missing(column_name):
    """Return the boolean expression that flags a missing-looking value in one column."""
    value = col(column_name)
    return (
        value.contains("None")
        | value.contains("NULL")
        | (value == "")
        | value.isNull()
        | isnan(column_name)
    )


# count() ignores nulls, so only the rows where the flag matched are counted
missing_value_counts = [
    count(when(_looks_missing(name), name)).alias(name)
    for name in toronto_weather_transformed.columns
]
toronto_weather_transformed.select(missing_value_counts).show()

+----+-----------------------+-----------------------+------------------------+-----------------+----------------+---------------------+---------------------+----------------------+-------------+-----------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------------------+-------------------------------+
|time|temperature_2m_max (°C)|temperature_2m_min (°C)|temperature_2m_mean (°C)|sunrise (iso8601)|sunset (iso8601)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|snowfall_sum (cm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation_sum (MJ/m²)|et0_fao_evapotranspiration (mm)|
+----+-----------------------+-----------------------+------------------------+-----------------+----------------+---------------------+---------------------+----------------------+-------------+-----------------+---------------

In [0]:
# Keep only the rows in which no column's text contains "NaN"
contains_nan_flags = [
    col(name).contains("NaN") for name in toronto_weather_transformed.columns
]
# greatest() over boolean flags is true as soon as any single flag is true
row_has_nan = greatest(*contains_nan_flags)
toronto_weather_transformed = toronto_weather_transformed.filter(~row_has_nan)

In [0]:
# Keep a single weather record per value of the "time" column
weather_day_key = ["time"]
toronto_weather_transformed = toronto_weather_transformed.dropDuplicates(weather_day_key)

In [0]:
# Cast every weather column to its final type and rename it, in ONE projection.
# Each withColumn / withColumnRenamed call adds its own projection to the query
# plan, so the whole conversion is described declaratively and applied with a
# single select instead of a long call chain.

# source column -> (target column name, target Spark type)
weather_column_spec = {
    "time": ("date", "date"),
    "temperature_2m_max (°C)": ("temperature_2m_max(C)", "float"),
    "temperature_2m_min (°C)": ("temperature_2m_min(C)", "float"),
    "temperature_2m_mean (°C)": ("temperature_2m_mean(C)", "float"),
    "sunrise (iso8601)": ("sunrise_time", "timestamp"),
    "sunset (iso8601)": ("sunset_time", "timestamp"),
    "daylight_duration (s)": ("daylight_duration(s)", "float"),
    "sunshine_duration (s)": ("sunshine_duration(s)", "float"),
    "precipitation_sum (mm)": ("precipitation_sum(mm)", "float"),
    "rain_sum (mm)": ("rain_sum(mm)", "float"),
    "snowfall_sum (cm)": ("snowfall_sum(cm)", "float"),
    "precipitation_hours (h)": ("precipitation_hours(h)", "integer"),
    "wind_speed_10m_max (km/h)": ("wind_speed_10m_max(km/h)", "float"),
    "wind_gusts_10m_max (km/h)": ("wind_gusts_10m_max(km/h)", "float"),
    "wind_direction_10m_dominant (°)": (
        "wind_direction_10m_dominant(degrees)",
        "float",
    ),
    "shortwave_radiation_sum (MJ/m²)": ("shortwave_radiation_sum(MJ/m2)", "float"),
    "et0_fao_evapotranspiration (mm)": ("et0_fao_evapotranspiration(mm)", "float"),
}


def _typed_weather_column(source_name):
    """Return the cast-and-renamed expression for one weather column.

    Columns that are not listed in ``weather_column_spec`` are passed through
    unchanged, so any additional column keeps its name, type and position.
    """
    if source_name not in weather_column_spec:
        return col(source_name)
    target_name, target_type = weather_column_spec[source_name]
    converted = col(source_name)
    if target_type == "timestamp":
        # Timestamp columns go through date_format with the minute-resolution
        # pattern before the cast, the same conversion the cell always applied.
        converted = date_format(converted, "yyyy-MM-dd HH:mm")
    return converted.cast(target_type).alias(target_name)


# Fail with a clear message when an expected source column is absent (schema
# change, or this cell being re-run on already converted data).
missing_weather_columns = [
    name
    for name in weather_column_spec
    if name not in toronto_weather_transformed.columns
]
if missing_weather_columns:
    raise ValueError(
        f"Weather columns missing before conversion: {missing_weather_columns}"
    )

# Iterating over the current columns keeps the original column order.
toronto_weather_transformed = toronto_weather_transformed.select(
    [_typed_weather_column(name) for name in toronto_weather_transformed.columns]
)

In [0]:
# Print the schema to check the column names and types after casting and renaming.
toronto_weather_transformed.printSchema()

root
 |-- date: date (nullable = true)
 |-- temperature_2m_max(C): float (nullable = true)
 |-- temperature_2m_min(C): float (nullable = true)
 |-- temperature_2m_mean(C): float (nullable = true)
 |-- sunrise_time: timestamp (nullable = true)
 |-- sunset_time: timestamp (nullable = true)
 |-- daylight_duration(s): float (nullable = true)
 |-- sunshine_duration(s): float (nullable = true)
 |-- precipitation_sum(mm): float (nullable = true)
 |-- rain_sum(mm): float (nullable = true)
 |-- snowfall_sum(cm): float (nullable = true)
 |-- precipitation_hours(h): integer (nullable = true)
 |-- wind_speed_10m_max(km/h): float (nullable = true)
 |-- wind_gusts_10m_max(km/h): float (nullable = true)
 |-- wind_direction_10m_dominant(degrees): float (nullable = true)
 |-- shortwave_radiation_sum(MJ/m2): float (nullable = true)
 |-- et0_fao_evapotranspiration(mm): float (nullable = true)



In [0]:
# Rename every weather column to its upper-case form, keeping the column order
uppercase_weather_names = list(map(str.upper, toronto_weather_transformed.columns))
toronto_weather_transformed = toronto_weather_transformed.toDF(*uppercase_weather_names)

In [0]:
# Order the weather records by date, most recent first
toronto_weather_transformed = toronto_weather_transformed.orderBy(col("date").desc())

In [0]:
# Preview the first rows of the transformed weather data.
toronto_weather_transformed.show()

+----------+---------------------+---------------------+----------------------+-------------------+-------------------+--------------------+--------------------+---------------------+------------+----------------+----------------------+------------------------+------------------------+------------------------------------+------------------------------+------------------------------+
|      DATE|TEMPERATURE_2M_MAX(C)|TEMPERATURE_2M_MIN(C)|TEMPERATURE_2M_MEAN(C)|       SUNRISE_TIME|        SUNSET_TIME|DAYLIGHT_DURATION(S)|SUNSHINE_DURATION(S)|PRECIPITATION_SUM(MM)|RAIN_SUM(MM)|SNOWFALL_SUM(CM)|PRECIPITATION_HOURS(H)|WIND_SPEED_10M_MAX(KM/H)|WIND_GUSTS_10M_MAX(KM/H)|WIND_DIRECTION_10M_DOMINANT(DEGREES)|SHORTWAVE_RADIATION_SUM(MJ/M2)|ET0_FAO_EVAPOTRANSPIRATION(MM)|
+----------+---------------------+---------------------+----------------------+-------------------+-------------------+--------------------+--------------------+---------------------+------------+----------------+---------------

## Cleaning Toronto traffic collisions data

In [0]:
# Print the raw collisions schema again as the starting point for cleaning.
toronto_collisions_raw.printSchema()

root
 |-- AUTOMOBILE: string (nullable = true)
 |-- BICYCLE: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- EVENT_UNIQUE_ID: string (nullable = true)
 |-- FATALITIES: long (nullable = true)
 |-- FTR_COLLISIONS: string (nullable = true)
 |-- HOOD_158: string (nullable = true)
 |-- INJURY_COLLISIONS: string (nullable = true)
 |-- LAT_WGS84: double (nullable = true)
 |-- LONG_WGS84: double (nullable = true)
 |-- MOTORCYCLE: string (nullable = true)
 |-- NEIGHBOURHOOD_158: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- OCC_DATE: long (nullable = true)
 |-- OCC_DOW: string (nullable = true)
 |-- OCC_HOUR: string (nullable = true)
 |-- OCC_MONTH: string (nullable = true)
 |-- OCC_YEAR: string (nullable = true)
 |-- PASSENGER: string (nullable = true)
 |-- PD_COLLISIONS: string (nullable = true)
 |-- PEDESTRIAN: string (nullable = true)



In [0]:
# Preview the first rows of the raw collisions data.
toronto_collisions_raw.show()

+----------+-------+--------+---------------+----------+--------------+--------+-----------------+------------------+------------------+----------+--------------------+--------+-------------+-------+--------+---------+--------+---------+-------------+----------+
|AUTOMOBILE|BICYCLE|DIVISION|EVENT_UNIQUE_ID|FATALITIES|FTR_COLLISIONS|HOOD_158|INJURY_COLLISIONS|         LAT_WGS84|        LONG_WGS84|MOTORCYCLE|   NEIGHBOURHOOD_158|OBJECTID|     OCC_DATE|OCC_DOW|OCC_HOUR|OCC_MONTH|OCC_YEAR|PASSENGER|PD_COLLISIONS|PEDESTRIAN|
+----------+-------+--------+---------------+----------+--------------+--------+-----------------+------------------+------------------+----------+--------------------+--------+-------------+-------+--------+---------+--------+---------+-------------+----------+
|       YES|    YES|     D55| GO-20142815344|         0|            NO|     068|              YES| 43.67619633749256|-79.35886348045678|        NO|North Riverdale (68)|   42001|1409461200000| Sunday|      18|   

In [0]:
# Start the transformed collisions DataFrame from the raw one without the
# object id, neighbourhood code, and the month / year / day-of-week columns
unused_collision_columns = ["OBJECTID", "HOOD_158", "OCC_MONTH", "OCC_YEAR", "OCC_DOW"]
toronto_collisions_transformed = toronto_collisions_raw.drop(*unused_collision_columns)

In [0]:
# Keep a single record per EVENT_UNIQUE_ID value
collision_event_key = ["EVENT_UNIQUE_ID"]
toronto_collisions_transformed = toronto_collisions_transformed.dropDuplicates(
    collision_event_key
)

In [0]:
# Discard every record that has a null / NaN value in at least one column
toronto_collisions_transformed = toronto_collisions_transformed.na.drop(how="any")

In [0]:
# Percentage of records holding "NSA" (Not Specified Area) or "N/R" (Not Required)
# in at least one column
placeholder_values = ["NSA", "N/R"]
# greatest() over the per-column boolean flags is true when any flag is true
has_placeholder = greatest(
    *[
        col(name).isin(placeholder_values)
        for name in toronto_collisions_transformed.columns
    ]
)
placeholder_row_count = toronto_collisions_transformed.filter(has_placeholder).count()
placeholder_row_count / toronto_collisions_transformed.count() * 100

Out[29]: 17.945454545454545

In [0]:
# Percentage of records that hold BOTH an "N/R" (Not Required) value in some
# column AND an "NSA" (Not Specified Area) value in some column
checked_columns = toronto_collisions_transformed.columns
# greatest() over the per-column boolean flags is true when any flag is true
row_has_not_required = greatest(*[col(name).isin(["N/R"]) for name in checked_columns])
row_has_unspecified_area = greatest(
    *[col(name).isin(["NSA"]) for name in checked_columns]
)
both_placeholders_count = toronto_collisions_transformed.filter(
    row_has_not_required & row_has_unspecified_area
).count()
both_placeholders_count / toronto_collisions_transformed.count() * 100

Out[30]: 0.14545454545454545

In [0]:
# Remove the records that hold BOTH an "N/R" (Not Required) value in some column
# AND an "NSA" (Not Specified Area) value in some column.
#
# This is done with a single filter rather than DataFrame.subtract: subtract is
# an EXCEPT DISTINCT, which builds the unwanted rows as a second DataFrame,
# compares whole rows across a shuffle and also collapses identical rows as a
# side effect. A row-level predicate removes the same records in one pass.
collision_columns = toronto_collisions_transformed.columns
# greatest() over the per-column boolean flags is true when any flag is true
row_has_not_required = greatest(
    *[col(name).isin(["N/R"]) for name in collision_columns]
)
row_has_unspecified_area = greatest(
    *[col(name).isin(["NSA"]) for name in collision_columns]
)
# eqNullSafe(True) turns an undetermined (null) match into False, so such a row
# is kept, exactly as subtract kept every row that was not in the unwanted set.
toronto_collisions_transformed = toronto_collisions_transformed.filter(
    ~(row_has_not_required & row_has_unspecified_area).eqNullSafe(True)
)

In [0]:
# Add DATE_EVENT: OCC_DATE divided by 1000 is cast to a timestamp, rendered as
# yyyy-MM-dd text and cast to a date.
# NOTE(review): presumably OCC_DATE holds epoch milliseconds - confirm with the data source.
occurrence_timestamp = (col("OCC_DATE") / 1000).cast("timestamp")
occurrence_day = date_format(occurrence_timestamp, "yyyy-MM-dd").cast("date")
toronto_collisions_transformed = toronto_collisions_transformed.withColumn(
    "DATE_EVENT", occurrence_day
)

In [0]:
# Drop the OCC_DATE column from the transformed collisions data.
toronto_collisions_transformed = toronto_collisions_transformed.drop("OCC_DATE")

In [0]:
# Cast the fatality count and the occurrence hour to integers
collisions_typed = toronto_collisions_transformed.withColumn(
    "FATALITIES", col("FATALITIES").cast("integer")
).withColumn("OCC_HOUR", col("OCC_HOUR").cast("integer"))

# Then give the coordinate, neighbourhood and hour columns their final names
collision_renames = (
    ("LAT_WGS84", "LATITUDE"),
    ("LONG_WGS84", "LONGITUDE"),
    ("NEIGHBOURHOOD_158", "NEIGHBOURHOOD_NAME"),
    ("OCC_HOUR", "HOUR_EVENT"),
)
for current_name, final_name in collision_renames:
    collisions_typed = collisions_typed.withColumnRenamed(current_name, final_name)

toronto_collisions_transformed = collisions_typed

In [0]:
# Order the collisions by event date, most recent first, then preview the result
toronto_collisions_transformed = toronto_collisions_transformed.orderBy(
    col("DATE_EVENT").desc()
)
toronto_collisions_transformed.show()

+----------+-------+--------+---------------+----------+--------------+-----------------+------------------+------------------+----------+--------------------+----------+---------+-------------+----------+--------------+
|AUTOMOBILE|BICYCLE|DIVISION|EVENT_UNIQUE_ID|FATALITIES|FTR_COLLISIONS|INJURY_COLLISIONS|          LATITUDE|         LONGITUDE|MOTORCYCLE|  NEIGHBOURHOOD_NAME|HOUR_EVENT|PASSENGER|PD_COLLISIONS|PEDESTRIAN|DATETIME_EVENT|
+----------+-------+--------+---------------+----------+--------------+-----------------+------------------+------------------+----------+--------------------+----------+---------+-------------+----------+--------------+
|       YES|     NO|     D33| GO-20148033870|         0|            NO|               NO|43.720780848086626| -79.3325817281569|        NO|Banbury-Don Mills...|        17|       NO|          YES|        NO|    2014-09-11|
|       YES|     NO|     D22| GO-20148033867|         0|            NO|               NO| 43.67012369771391|-79.5213

## Save Toronto data to Azure data lake storage

In [0]:
# Write the transformed weather data to the data lake as Parquet.
# The target folder name carries today's date as YYYYMMDD.
toronto_weather_save_path = f'abfss://toronto-data@torontodatalakestorage.dfs.core.windows.net/transformed/toronto_weather_transformed_{datetime.today().strftime("%Y%m%d")}'

# "overwrite" replaces whatever already exists at the target path
weather_writer = toronto_weather_transformed.write.mode("overwrite")
weather_writer.parquet(toronto_weather_save_path)

In [0]:
# Write the transformed traffic collisions data to the data lake as Parquet.
# The target folder name carries today's date as YYYYMMDD.
toronto_traffic_collisions_save_path = f'abfss://toronto-data@torontodatalakestorage.dfs.core.windows.net/transformed/toronto_traffic_collisions_transformed_{datetime.today().strftime("%Y%m%d")}'

# "overwrite" replaces whatever already exists at the target path
collisions_writer = toronto_collisions_transformed.write.mode("overwrite")
collisions_writer.parquet(toronto_traffic_collisions_save_path)