In [1]:
from pyspark.sql import SparkSession, Window

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Initialize Spark session
spark = (SparkSession.builder.appName("DataProcessingApp")
         .config("spark.executor.memory", "8g")
         .config("spark.driver.memory", "8g")
         .config("spark.driver.maxResultSize", "4g")
         .getOrCreate())
spark.sparkContext.setLogLevel("DEBUG")

# Read the CSV file into a DataFrame
df = spark.read.csv('../data/US_Accidents_March23.csv', header=True, inferSchema=True)
df.show(5)

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+------------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Ameni

In [2]:
from pyspark.sql.functions import row_number

accident_window = Window.orderBy('ID')
df = df.withColumn('ID', row_number().over(accident_window))
df = df.drop('Source', 'Zipcode', 'Timezone','Airport_Code', 'Country')
df.drop_duplicates()
df.show(5)

+---+--------+-------------------+-------------------+------------------+-------------------+-------+-------+------------+--------------------+--------------+---------------+----------+-----+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Severity|         Start_Time|           End_Time|         Start_Lat|          Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|        Street|           City|    County|State|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station| Stop|Traffic_Calming|Traffic_Sig

In [3]:
df = df.dropna(subset=['city','street'])
df = df.filter((df['Pressure(in)'] != 0) | (df['Visibility(mi)'] != 0))
df.show(5)

+---+--------+-------------------+-------------------+------------------+-------------------+-------+-------+------------+--------------------+--------------+---------------+----------+-----+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Severity|         Start_Time|           End_Time|         Start_Lat|          Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|        Street|           City|    County|State|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station| Stop|Traffic_Calming|Traffic_Sig

In [4]:
from pyspark.sql.functions import when, col

df = df.withColumn("Weather_Condition", 
    when(col("Weather_Condition").rlike("Thunder|T-Storm"), "Thunderstorm")
    .when(col("Weather_Condition").rlike("Snow|Sleet|Wintry"), "Snow")
    .when(col("Weather_Condition").rlike("Rain|Drizzle|Shower"), "Rain")
    .when(col("Weather_Condition").rlike("Wind|Squalls"), "Windy")
    .when(col("Weather_Condition").rlike("Hail|Pellets"), "Hail")
    .when(col("Weather_Condition").rlike("Fair"), "Clear")
    .when(col("Weather_Condition").rlike("Cloud|Overcast"), "Cloudy")
    .when(col("Weather_Condition").rlike("Mist|Haze|Fog"), "Fog")
    .when(col("Weather_Condition").rlike("Sand|Dust"), "Sand")
    .when(col("Weather_Condition").rlike("Smoke|Volcanic Ash"), "Smoke")
    .when(col("Weather_Condition").rlike("N/A Precipitation"), None)
    .otherwise(col("Weather_Condition"))
)

df.select("Weather_Condition").distinct().show()

+-----------------+
|Weather_Condition|
+-----------------+
|     Thunderstorm|
|           Cloudy|
|              Fog|
|            Clear|
|            Smoke|
|             Sand|
|             Hail|
|            Windy|
|             Rain|
|             Snow|
|          Tornado|
|             NULL|
+-----------------+



In [5]:
df.select("Wind_Direction").distinct().show()

+--------------+
|Wind_Direction|
+--------------+
|           SSE|
|            SW|
|            NW|
|          Calm|
|             E|
|           WSW|
|           ENE|
|            NE|
|         South|
|           NNW|
|             N|
|           SSW|
|             W|
|             S|
|            SE|
|          East|
|           WNW|
|           NNE|
|          West|
|           VAR|
+--------------+
only showing top 20 rows



In [5]:
from pyspark.sql.functions import when, col

# Mapping wind directions
df = df.withColumn("Wind_Direction", 
    when(col("Wind_Direction") == "CALM", "Calm")
    .when(col("Wind_Direction") == "VAR", "Variable")
    .when(col("Wind_Direction") == "East", "E")
    .when(col("Wind_Direction") == "North", "N")
    .when(col("Wind_Direction") == "South", "S")
    .when(col("Wind_Direction") == "West", "W")
    .when(col("Wind_Direction").like("S_%_%"), "S")
    .when(col("Wind_Direction").like("W_%_%"), "W")
    .when(col("Wind_Direction").like("N_%_%"), "N")
    .when(col("Wind_Direction").like("E_%_%"), "E")
    .otherwise(col("Wind_Direction"))
)

In [8]:

df.select("Wind_Direction").distinct().show()

+--------------+
|Wind_Direction|
+--------------+
|            SW|
|            NW|
|          Calm|
|             E|
|            NE|
|             N|
|             W|
|             S|
|            SE|
|      Variable|
|          NULL|
+--------------+



In [7]:
from pyspark.sql.functions import count

df.select([count(when(col(c).isNull(), 1)).alias(f"{c}_null_count") for c in df.columns]).show()

+-------------+-------------------+---------------------+-------------------+--------------------+--------------------+------------------+------------------+-----------------------+----------------------+-----------------+---------------+-----------------+----------------+----------------------------+-------------------------+------------------------+----------------------+-----------------------+-------------------------+-------------------------+--------------------------+----------------------------+----------------------------+------------------+---------------+-------------------+-------------------+-------------------+------------------+------------------+---------------------+------------------+---------------+--------------------------+-------------------------+-----------------------+-------------------------+-------------------------+----------------------------+--------------------------------+
|ID_null_count|Severity_null_count|Start_Time_null_count|End_Time_null_count|Star

In [6]:
df.toPandas().to_csv('../data/US_Accidents_March23_cleaned.csv', index=False)

    ### To Data Warehouse

In [9]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Dimension Table: Location_Dim
location_dim = df.select(
    col("Street"),
    col("City"),
    col("County"),
    col("State"),
).distinct()

window_location = Window.orderBy("Street", "City", "County", "State")
location_dim = location_dim.withColumn(
    'Location_ID',
    row_number().over(window_location)
)

# Weather_Condition_Dim
weather_condition_dim = df.select(
    col("Weather_Condition")
).distinct()

weather_condition_window = Window.orderBy("Weather_Condition")
weather_condition_dim = weather_condition_dim.withColumn(
    'Weather_Condition_ID',
    row_number().over(weather_condition_window)
)

# Wind_Direction_Dim
wind_direction_dim = df.select(
    col("Wind_Direction")
).distinct()

wind_direction_window = Window.orderBy("Wind_Direction")
wind_direction_dim = wind_direction_dim.withColumn(
    'Wind_Direction_ID',
    row_number().over(wind_direction_window)
)

# Environment_Dim
environment_dim = df.select(
    col("Amenity"),
    col("Bump"),
    col("Crossing"),
    col("Give_Way"),
    col("Junction"),
    col("No_Exit"),
    col("Railway"),
    col("Roundabout"),
    col("Station"),
    col("Stop"),
    col("Traffic_Calming"),
    col("Traffic_Signal"),
    col("Turning_Loop")
).distinct()

environment_window = Window.orderBy(
    "Amenity", "Bump", "Crossing", "Give_Way", "Junction",
    "No_Exit", "Railway", "Roundabout", "Station", "Stop",
    "Traffic_Calming", "Traffic_Signal", "Turning_Loop"
)
environment_dim = environment_dim.withColumn(
    'Environment_ID',
    row_number().over(environment_window)
)

# Twilight_Dim
twilight_dim = df.select(
    col("Sunrise_Sunset"),
    col("Civil_Twilight"),
    col("Nautical_Twilight"),
    col("Astronomical_Twilight")
).distinct()

twilight_window = Window.orderBy(
    "Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"
)
twilight_dim = twilight_dim.withColumn(
    'Twilight_ID',
    row_number().over(twilight_window)
)

In [10]:
location_dim.describe().show()

+-------+------------------+----------+---------+------+------------------+
|summary|            Street|      City|   County| State|       Location_ID|
+-------+------------------+----------+---------+------+------------------+
|  count|            624774|    624774|   624774|624774|            624774|
|   mean|55.888888888888886|      NULL|     NULL|  NULL|          312387.5|
| stddev|107.16611014401285|      NULL|     NULL|  NULL|180356.86287884915|
|    min|         1 1/2 Ave|Aaronsburg|Abbeville|    AL|                 1|
|    max|  william Carey Dr|   Zwingle|  Ziebach|    WY|            624774|
+-------+------------------+----------+---------+------+------------------+



In [11]:
accident_fact = (df.join(location_dim, on=[
    df.Street == location_dim.Street,
    df.City == location_dim.City,
    df.County == location_dim.County,
    df.State == location_dim.State,
]).join(weather_condition_dim, on=[
    df.Weather_Condition == weather_condition_dim.Weather_Condition,
]).join(wind_direction_dim, on=[
        df.Wind_Direction == wind_direction_dim.Wind_Direction,
]).join(environment_dim, on=[
    df.Amenity == environment_dim.Amenity,
    df.Bump == environment_dim.Bump,
    df.Crossing == environment_dim.Crossing,
    df.Give_Way == environment_dim.Give_Way,
    df.Junction == environment_dim.Junction,
    df.No_Exit == environment_dim.No_Exit,
    df.Railway == environment_dim.Railway,
    df.Roundabout == environment_dim.Roundabout,
    df.Station == environment_dim.Station,
    df.Stop == environment_dim.Stop,
    df.Traffic_Calming == environment_dim.Traffic_Calming,
    df.Traffic_Signal == environment_dim.Traffic_Signal,
    df.Turning_Loop == environment_dim.Turning_Loop
]).join(twilight_dim, on=[
    df.Sunrise_Sunset == twilight_dim.Sunrise_Sunset,
    df.Civil_Twilight == twilight_dim.Civil_Twilight,
    df.Nautical_Twilight == twilight_dim.Nautical_Twilight,
    df.Astronomical_Twilight == twilight_dim.Astronomical_Twilight
]).select(
    col("ID").alias("Accident_ID"),
    col("Severity"),
    col("Start_Time"),
    col("End_Time"),
    col('Start_Lat'),
    col('Start_Lng'),
    col('End_Lat'),
    col('End_Lng'),
    col("Distance(mi)").alias("Distance_mi"),
    col("Weather_Timestamp"),
    col("Temperature(F)").alias("Temperature_F"),
    col("Humidity(%)").alias("Humidity_percent"),
    col("Wind_Speed(mph)").alias("Wind_Speed_mph"),
    col("Precipitation(in)").alias("Precipitation_in"),
    col("Visibility(mi)").alias("Visibility_mi"),
    location_dim["Location_ID"],
    weather_condition_dim["Weather_Condition_ID"],
    wind_direction_dim["Wind_Direction_ID"],
    environment_dim["Environment_ID"],
    twilight_dim["Twilight_ID"]
))

In [12]:
accident_fact.show(5)

+-----------+--------+-------------------+-------------------+---------+-----------+---------+-----------+-----------+-------------------+-------------+----------------+--------------+----------------+-------------+-----------+--------------------+-----------------+--------------+-----------+
|Accident_ID|Severity|         Start_Time|           End_Time|Start_Lat|  Start_Lng|  End_Lat|    End_Lng|Distance_mi|  Weather_Timestamp|Temperature_F|Humidity_percent|Wind_Speed_mph|Precipitation_in|Visibility_mi|Location_ID|Weather_Condition_ID|Wind_Direction_ID|Environment_ID|Twilight_ID|
+-----------+--------+-------------------+-------------------+---------+-----------+---------+-----------+-----------+-------------------+-------------+----------------+--------------+----------------+-------------+-----------+--------------------+-----------------+--------------+-----------+
|    5337731|       2|2021-11-27 05:42:00|2021-11-27 07:00:22|40.301935|-109.810405|40.301952|-109.806051|      0.229|

In [15]:
# Change to Pandas DataFrame and store as CSV
location_dim.toPandas().to_csv('../data/location_dim.csv', index=False)
weather_condition_dim.toPandas().to_csv('../data/weather_condition_dim.csv', index=False)
wind_direction_dim.toPandas().to_csv('../data/wind_direction_dim.csv', index=False)
environment_dim.toPandas().to_csv('../data/environment_dim.csv', index=False)
twilight_dim.toPandas().to_csv('../data/twilight_dim.csv', index=False)

In [16]:
# Change to Pandas DataFrame and store as CSV
accident_fact.toPandas().to_csv('../data/accident_fact.csv', index=False)