In [1]:
import pandas as pd
# import findspark
# findspark.init()
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import functions as sf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import os
from datetime import datetime

In [2]:
# Create (or reuse) the Spark session for this notebook, asking for 6 GB of driver memory.
# Optional, currently disabled: add
#   .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.30")
# to the chain below to pull in the MySQL JDBC connector.
spark = SparkSession.builder.config("spark.driver.memory", "6g").getOrCreate()

In [3]:
# crashes = spark.read.csv("/home/hien2706/school/nam3_hk1/ds105/do_an/EDA-and-Visualization/data/traffic_crashes_crashes.csv", header=True, inferSchema=True)
# people = spark.read.csv("/home/hien2706/school/nam3_hk1/ds105/do_an/EDA-and-Visualization/data/traffic_crashes_people.csv", header=True, inferSchema=True)
# vehicles = spark.read.csv("/home/hien2706/school/nam3_hk1/ds105/do_an/EDA-and-Visualization/data/traffic_crashes_vehicles.csv", header=True, inferSchema=True)

In [3]:
# Load the three raw CSV exports. Every reader uses the same settings: the first
# row is a header, '"' is both the quote and the escape character, and column
# types are inferred from the data.
crashes = (
    spark.read
    .options(header=True, quote='"', escape='"', inferSchema=True)
    .csv('/home/jovyan/data/traffic_crashes_crashes.csv')
)
people = (
    spark.read
    .options(header=True, quote='"', escape='"', inferSchema=True)
    .csv('/home/jovyan/data/traffic_crashes_people.csv')
)
vehicles = (
    spark.read
    .options(header=True, quote='"', escape='"', inferSchema=True)
    .csv('/home/jovyan/data/traffic_crashes_vehicles.csv')
)

In [4]:
# crashes = spark.read.csv("/home/jovyan/data/traffic_crashes_crashes.csv", header=True, inferSchema=True, quote='"')
# people = spark.read.csv("/home/jovyan/data/traffic_crashes_people.csv", header=True, inferSchema=True, quote='"')
# vehicles = spark.read.csv("/home/jovyan/data/traffic_crashes_vehicles.csv", header=True, inferSchema=True, quote='"')

In [4]:
def filter_recent_data(df, date_column, min_year=2022, date_format="M/d/yyyy h:mm:ss a"):
    """
    Parse a textual date column into a timestamp and keep only recent rows.

    Parameters:
    df (DataFrame): The input DataFrame.
    date_column (str): Name of the column holding the date; it is overwritten
        in the result with the parsed timestamp.
    min_year (int): Earliest calendar year to keep (inclusive), default is 2022.
    date_format (str): Spark datetime pattern used to parse the column,
        default is "M/d/yyyy h:mm:ss a".

    Returns:
    DataFrame: The rows of ``df`` whose parsed date falls in ``min_year`` or later.
    """
    parsed_df = df.withColumn(
        date_column,
        sf.to_timestamp(sf.col(date_column), date_format),
    )
    # NOTE(review): a value that does not match date_format presumably parses to
    # null under default (non-ANSI) settings; the comparison below is then null
    # and the row is filtered out as well.
    return parsed_df.filter(sf.year(sf.col(date_column)) >= min_year)

In [5]:
def drop_columns_with_missing_data(df, threshold=0.5):
    """
    Drop columns from DataFrame where the percentage of missing values exceeds the threshold.

    A value counts as missing when it is null or, for string columns, when it is
    the literal text "NULL" or "UNKNOWN".

    Parameters:
    df (DataFrame): The input DataFrame.
    threshold (float): The threshold for missing values, default is 0.5 (50%).
        A column is dropped only when its missing fraction is strictly greater.

    Returns:
    DataFrame: A DataFrame with columns having missing values below the threshold.
        A DataFrame without columns or without rows is returned unchanged.
    """
    column_names = df.columns
    if not column_names:
        return df

    # The textual placeholders can only occur in string columns. Restricting the
    # comparison to them avoids relying on an implicit string-to-number or
    # string-to-timestamp cast (which may fail under ANSI SQL mode).
    string_columns = {name for name, dtype in df.dtypes if dtype == "string"}

    def _missing_count(col_name):
        # Aggregate expression counting the missing values of one column.
        column = sf.col(col_name)
        is_missing = column.isNull()
        if col_name in string_columns:
            is_missing = is_missing | column.isin("NULL", "UNKNOWN")
        # when() yields null for non-missing rows and count() skips nulls.
        return sf.count(sf.when(is_missing, 1))

    # Compute the row total and every per-column missing count in a single
    # aggregation instead of launching one count job per column.
    stats = df.agg(
        sf.count(sf.lit(1)),
        *[_missing_count(name) for name in column_names],
    ).first()

    total_rows = stats[0]
    if total_rows == 0:
        # No rows means no missing fraction to compute; also avoids dividing by zero.
        return df

    # stats[1:] holds the missing counts in the same order as column_names.
    columns_to_drop = [
        name
        for name, missing_count in zip(column_names, stats[1:])
        if missing_count / total_rows > threshold
    ]

    # Drop the identified columns
    return df.drop(*columns_to_drop)

In [6]:
def drop_unnecessary_columns(df, relevant_columns):
    """
    Restrict a DataFrame to the columns listed in ``relevant_columns``.

    Every column of ``df`` whose name is not in ``relevant_columns`` is dropped.
    Names in ``relevant_columns`` that ``df`` does not have are simply ignored.

    Parameters:
    df (DataFrame): The input DataFrame.
    relevant_columns (list): Names of the columns to keep.

    Returns:
    DataFrame: ``df`` without the columns that are not listed as relevant.
    """
    unwanted = []
    for name in df.columns:
        if name in relevant_columns:
            continue
        unwanted.append(name)
    return df.drop(*unwanted)

In [7]:
# Parse CRASH_DATE and keep only the recent rows of each of the three tables.
filtered_crashes = filter_recent_data(df=crashes, date_column="CRASH_DATE")
filtered_people = filter_recent_data(df=people, date_column="CRASH_DATE")
filtered_vehicles = filter_recent_data(df=vehicles, date_column="CRASH_DATE")

In [8]:
# Remove columns in which more than 60% of the values are missing.
# Each table is reassigned in place, so this cell builds on the previous cell's result.
filtered_crashes = drop_columns_with_missing_data(filtered_crashes, threshold=0.6)
filtered_people = drop_columns_with_missing_data(filtered_people, threshold=0.6)
filtered_vehicles = drop_columns_with_missing_data(filtered_vehicles, threshold=0.6)


In [9]:
# Crash-level columns kept for the analysis.
relevant_crashes_columns = [
    # identifiers and timing
    'CRASH_RECORD_ID',
    'CRASH_DATE',
    # road and environment
    'POSTED_SPEED_LIMIT',
    'TRAFFIC_CONTROL_DEVICE',
    'DEVICE_CONDITION',
    'WEATHER_CONDITION',
    'LIGHTING_CONDITION',
    'FIRST_CRASH_TYPE',
    'TRAFFICWAY_TYPE',
    'ALIGNMENT',
    'ROADWAY_SURFACE_COND',
    'ROAD_DEFECT',
    # outcome and causes
    'CRASH_TYPE',
    'DAMAGE',
    'PRIM_CONTRIBUTORY_CAUSE',
    'SEC_CONTRIBUTORY_CAUSE',
    'NUM_UNITS',
    'MOST_SEVERE_INJURY',
    'INJURIES_TOTAL',
    'INJURIES_FATAL',
    # derived time parts and location
    'CRASH_HOUR',
    'CRASH_DAY_OF_WEEK',
    'CRASH_MONTH',
    'LATITUDE',
    'LONGITUDE',
]

In [10]:
# Vehicle/unit-level columns kept for the analysis.
relevant_vehicles_columns = [
    # identifiers and timing
    'CRASH_UNIT_ID', 'CRASH_RECORD_ID', 'CRASH_DATE', 'UNIT_NO', 'UNIT_TYPE', 'VEHICLE_ID',
    # vehicle description
    'MAKE', 'MODEL', 'LIC_PLATE_STATE', 'VEHICLE_YEAR', 'VEHICLE_DEFECT', 'VEHICLE_TYPE', 'VEHICLE_USE',
    # movement and impact
    'TRAVEL_DIRECTION', 'MANEUVER', 'OCCUPANT_CNT', 'FIRST_CONTACT_POINT',
]

In [11]:
# Person-level columns kept for the analysis.
# NOTE(review): DRIVER_VISION is requested here but does not appear in the captured
# filtered_people schema further down (19 columns) — presumably it was already
# removed by the missing-data threshold; confirm.
relevant_people_columns = [
    # identifiers and timing
    'PERSON_ID', 'PERSON_TYPE', 'CRASH_RECORD_ID', 'VEHICLE_ID', 'CRASH_DATE',
    # demographics and residence
    'CITY', 'STATE', 'ZIPCODE', 'SEX', 'AGE',
    # licensing
    'DRIVERS_LICENSE_STATE', 'DRIVERS_LICENSE_CLASS',
    # protection and injury
    'SAFETY_EQUIPMENT', 'AIRBAG_DEPLOYED', 'EJECTION', 'INJURY_CLASSIFICATION',
    # driver behaviour and condition
    'DRIVER_ACTION', 'DRIVER_VISION', 'PHYSICAL_CONDITION', 'BAC_RESULT',
]

In [12]:
# Restrict each table to its list of analysis-relevant columns.
filtered_crashes = drop_unnecessary_columns(
    df=filtered_crashes, relevant_columns=relevant_crashes_columns
)
filtered_vehicles = drop_unnecessary_columns(
    df=filtered_vehicles, relevant_columns=relevant_vehicles_columns
)
filtered_people = drop_unnecessary_columns(
    df=filtered_people, relevant_columns=relevant_people_columns
)

In [13]:
# Number of crash rows left after the date filter and column pruning.
filtered_crashes.count()

304907

In [14]:
# Number of people rows left after the date filter and column pruning.
filtered_people.count()

667725

In [15]:
# Number of vehicle rows left after the date filter and column pruning.
filtered_vehicles.count()

622751

In [16]:
# Number of columns retained in the crashes table.
len(filtered_crashes.columns)

25

In [17]:
# Number of columns retained in the vehicles table.
len(filtered_vehicles.columns)

17

In [18]:
# Number of columns retained in the people table.
# NOTE(review): relevant_people_columns lists 20 names, so a smaller result here
# means some requested column was no longer present when the list was applied.
len(filtered_people.columns)

19

In [19]:
# Total number of retained columns across the three tables.
len(filtered_crashes.columns) + len(filtered_vehicles.columns) + len(filtered_people.columns)

61

In [20]:
# Preview the first 10 crash rows without truncating long cell values.
filtered_crashes.show(10, truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------+-------------------+------------------+----------------------+--------------------+-----------------+----------------------+------------------------+-------------------------------+------------------+--------------------+-----------+--------------------------------+-------------+--------------------------------------+--------------------------------------------------------------------------------+---------+------------------------+--------------+--------------+----------+-----------------+-----------+------------+-------------+
|CRASH_RECORD_ID                                                                                                                 |CRASH_DATE         |POSTED_SPEED_LIMIT|TRAFFIC_CONTROL_DEVICE|DEVICE_CONDITION    |WEATHER_CONDITION|LIGHTING_CONDITION    |FIRST_CRASH_TYPE        |TRAFFICWAY_TYPE                |ALIGNMENT         |ROADWA

In [21]:
# Show the column names and data types of the crashes table.
filtered_crashes.printSchema()

root
 |-- CRASH_RECORD_ID: string (nullable = true)
 |-- CRASH_DATE: timestamp (nullable = true)
 |-- POSTED_SPEED_LIMIT: integer (nullable = true)
 |-- TRAFFIC_CONTROL_DEVICE: string (nullable = true)
 |-- DEVICE_CONDITION: string (nullable = true)
 |-- WEATHER_CONDITION: string (nullable = true)
 |-- LIGHTING_CONDITION: string (nullable = true)
 |-- FIRST_CRASH_TYPE: string (nullable = true)
 |-- TRAFFICWAY_TYPE: string (nullable = true)
 |-- ALIGNMENT: string (nullable = true)
 |-- ROADWAY_SURFACE_COND: string (nullable = true)
 |-- ROAD_DEFECT: string (nullable = true)
 |-- CRASH_TYPE: string (nullable = true)
 |-- DAMAGE: string (nullable = true)
 |-- PRIM_CONTRIBUTORY_CAUSE: string (nullable = true)
 |-- SEC_CONTRIBUTORY_CAUSE: string (nullable = true)
 |-- NUM_UNITS: integer (nullable = true)
 |-- MOST_SEVERE_INJURY: string (nullable = true)
 |-- INJURIES_TOTAL: integer (nullable = true)
 |-- INJURIES_FATAL: integer (nullable = true)
 |-- CRASH_HOUR: integer (nullable = true)
 |

In [22]:
# Show the column names and data types of the vehicles table.
filtered_vehicles.printSchema()

root
 |-- CRASH_UNIT_ID: integer (nullable = true)
 |-- CRASH_RECORD_ID: string (nullable = true)
 |-- CRASH_DATE: timestamp (nullable = true)
 |-- UNIT_NO: integer (nullable = true)
 |-- UNIT_TYPE: string (nullable = true)
 |-- VEHICLE_ID: integer (nullable = true)
 |-- MAKE: string (nullable = true)
 |-- MODEL: string (nullable = true)
 |-- LIC_PLATE_STATE: string (nullable = true)
 |-- VEHICLE_YEAR: integer (nullable = true)
 |-- VEHICLE_DEFECT: string (nullable = true)
 |-- VEHICLE_TYPE: string (nullable = true)
 |-- VEHICLE_USE: string (nullable = true)
 |-- TRAVEL_DIRECTION: string (nullable = true)
 |-- MANEUVER: string (nullable = true)
 |-- OCCUPANT_CNT: integer (nullable = true)
 |-- FIRST_CONTACT_POINT: string (nullable = true)



In [23]:
# Show the column names and data types of the people table.
filtered_people.printSchema()

root
 |-- PERSON_ID: string (nullable = true)
 |-- PERSON_TYPE: string (nullable = true)
 |-- CRASH_RECORD_ID: string (nullable = true)
 |-- VEHICLE_ID: integer (nullable = true)
 |-- CRASH_DATE: timestamp (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIPCODE: string (nullable = true)
 |-- SEX: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- DRIVERS_LICENSE_STATE: string (nullable = true)
 |-- DRIVERS_LICENSE_CLASS: string (nullable = true)
 |-- SAFETY_EQUIPMENT: string (nullable = true)
 |-- AIRBAG_DEPLOYED: string (nullable = true)
 |-- EJECTION: string (nullable = true)
 |-- INJURY_CLASSIFICATION: string (nullable = true)
 |-- DRIVER_ACTION: string (nullable = true)
 |-- PHYSICAL_CONDITION: string (nullable = true)
 |-- BAC_RESULT: string (nullable = true)



In [26]:
# Sanity check: list the earliest CRASH_DATE values to confirm the lower bound of the date filter.
filtered_crashes.select("CRASH_DATE").orderBy("CRASH_DATE", ascending=True).show()

+-------------------+
|         CRASH_DATE|
+-------------------+
|2022-01-01 00:00:00|
|2022-01-01 00:00:00|
|2022-01-01 00:01:00|
|2022-01-01 00:13:00|
|2022-01-01 00:16:00|
|2022-01-01 00:25:00|
|2022-01-01 00:30:00|
|2022-01-01 00:30:00|
|2022-01-01 00:45:00|
|2022-01-01 00:51:00|
|2022-01-01 00:52:00|
|2022-01-01 00:53:00|
|2022-01-01 01:00:00|
|2022-01-01 01:00:00|
|2022-01-01 01:00:00|
|2022-01-01 01:05:00|
|2022-01-01 01:10:00|
|2022-01-01 01:12:00|
|2022-01-01 01:25:00|
|2022-01-01 01:25:00|
+-------------------+
only showing top 20 rows



In [33]:
# Destination directories for the cleaned tables (one output directory per table).
output_path_crashes, output_path_people, output_path_vehicles = (
    "/home/jovyan/data/filtered_crashes",
    "/home/jovyan/data/filtered_people",
    "/home/jovyan/data/filtered_vehicles",
)

In [34]:
# Write each cleaned table as CSV, replacing any previous output at the target path.
# coalesce(1) reduces each table to one partition before writing. The quote and
# escape characters are the same ones used when the raw files were read.
(
    filtered_crashes.coalesce(1).write
    .options(header=True, quote='"', escape='"', sep=",")
    .mode("overwrite")
    .csv(output_path_crashes)
)
(
    filtered_people.coalesce(1).write
    .options(header=True, quote='"', escape='"', sep=",")
    .mode("overwrite")
    .csv(output_path_people)
)
(
    filtered_vehicles.coalesce(1).write
    .options(header=True, quote='"', escape='"', sep=",")
    .mode("overwrite")
    .csv(output_path_vehicles)
)