In [1]:
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.types as T
import pyspark.sql.functions as F

import matplotlib.pyplot as plt
import seaborn as sns
import random

In [2]:
# Get (or create) a local Spark session on all available cores; the app name
# carries the timestamp at which this cell ran.
session_name = "Clean-Accident-{}".format(datetime.today())
spark: SparkSession = (
    SparkSession.builder
    .master("local[*]")
    .appName(session_name)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/10 18:06:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Raw input: first line holds the column names; Spark infers column types,
# which costs one extra pass over the file.
URL = "./data/raw/US_Accidents_March23.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(URL)
)

                                                                                

In [5]:
# Preview the first 10 rows (long cell values are truncated for display).
df.show(n=10)

24/12/10 14:42:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+-------+--------+-------------------+-------------------+---------+------------------+--------+---------+------------------+--------------------+---------------+----------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+--------------+
|       ID| Source|Severity|         Start_Time|           End_Time|Start_Lat|         Start_Lng| End_Lat|  End_Lng|      Distance(mi)|         Description|         Street|      City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_

In [5]:
# Size of the raw dataset: row count (a full Spark job) and column count.
n_records = df.count()
n_columns = len(df.columns)
print(f"Records: {n_records}\nColumns: {n_columns}")



Records: 4325632
Columns: 47


                                                                                

In [6]:
# Print each column's name, type, and nullability as loaded above.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

In [7]:
# Basic summary statistics (count, mean, stddev, min, max) per column.
summary_df = df.describe()
summary_df.show()



+-------+---------+-------+------------------+-----------------+-----------------+-----------------+------------------+------------------+--------------------+------------------+----------+---------+-------+------------------+-------+----------+------------+------------------+------------------+------------------+------------------+-----------------+--------------+-----------------+--------------------+------------------+--------------+--------------+-----------------+---------------------+--------------+
|summary|       ID| Source|          Severity|        Start_Lat|        Start_Lng|          End_Lat|           End_Lng|      Distance(mi)|         Description|            Street|      City|   County|  State|           Zipcode|Country|  Timezone|Airport_Code|    Temperature(F)|     Wind_Chill(F)|       Humidity(%)|      Pressure(in)|   Visibility(mi)|Wind_Direction|  Wind_Speed(mph)|   Precipitation(in)| Weather_Condition|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twi

                                                                                

In [8]:
# Count missing (null) values in each column
null_count_exprs = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()



+---+------+--------+----------+--------+---------+---------+-------+-------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+--------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twiligh

                                                                                

In [4]:
# Handle column Street
# Discard accidents whose Street is missing.
df = df.dropna(subset=["Street"])

In [5]:
# Rows missing any of these location / station fields are dropped rather
# than imputed.
location_cols = ["City", "Zipcode", "Timezone", "Airport_Code", "Weather_Timestamp"]
df = df.dropna(how="any", subset=location_cols)

In [6]:
# Fill NA for columns
df = df.fillna({
    "Temperature(F)": df.select(F.median(F.when(F.col("Temperature(F)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Wind_Chill(F)": df.select(F.median(F.when(F.col("Wind_Chill(F)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Humidity(%)": df.select(F.avg(F.when(F.col("Humidity(%)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Pressure(in)": df.select(F.avg(F.when(F.col("Pressure(in)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Visibility(mi)": df.select(F.avg(F.when(F.col("Visibility(mi)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Wind_Direction": df.select(F.mode(F.when(F.col("Wind_Direction").isNull(), 1).otherwise(0))).collect()[0][0],
    "Wind_Speed(mph)": df.select(F.avg(F.when(F.col("Wind_Speed(mph)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Precipitation(in)": df.select(F.avg(F.when(F.col("Precipitation(in)").isNull(), 1).otherwise(0))).collect()[0][0],
    "Weather_Condition": df.select(F.mode(F.when(F.col("Weather_Condition").isNull(), 1).otherwise(0))).collect()[0][0],
})

                                                                                

In [7]:
# Day/night indicators are not imputed; rows missing any of them are dropped.
twilight_cols = [
    "Sunrise_Sunset",
    "Civil_Twilight",
    "Nautical_Twilight",
    "Astronomical_Twilight",
]
df = df.dropna(how="any", subset=twilight_cols)

In [8]:
# Re-count nulls per column after cleaning to confirm what remains missing.
remaining_null_exprs = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
df.select(remaining_null_exprs).show()

24/12/10 18:09:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+---+------+--------+----------+--------+---------+---------+-------+-------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+--------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twiligh

                                                                                

In [9]:
# Size of the cleaned dataset: row count (a full Spark job) and column count.
n_records = df.count()
n_columns = len(df.columns)
print(f"Records: {n_records}\nColumns: {n_columns}")



Records: 4218038
Columns: 47


                                                                                

In [11]:
# Persist the cleaned data. Spark writes a *directory* at OUTPUT_PATH;
# coalesce(1) makes it hold a single part file instead of one per partition.
# The path is relative to the working directory, matching the relative raw
# input path used when loading (NOTE(review): assumes the notebook runs from
# the project root, as the ./data/raw input path implies). mode="overwrite"
# lets this cell be re-run instead of failing because the output exists.
OUTPUT_PATH = "./data/cleaned/US_Accidents_March23_Cleaned.csv"
df.coalesce(1).write.csv(OUTPUT_PATH, header=True, mode="overwrite")

                                                                                