In [7]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [8]:
spark = (
    SparkSession.builder
    .appName("Testing")
    .config("spark.driver.extraJavaOptions", "--add-opens=java.base/javax.security.auth=ALL-UNNAMED --enable-native-access=ALL-UNNAMED")
    .config("spark.executor.extraJavaOptions", "--add-opens=java.base/javax.security.auth=ALL-UNNAMED --enable-native-access=ALL-UNNAMED")
    .getOrCreate()
)

In [9]:
df = pd.read_csv(
    filepath_or_buffer="dataset/NYC Accidents 2020.csv"
)

In [10]:
df.head()

Unnamed: 0,CRASH DATE,CRASH TIME,BOROUGH,ZIP CODE,LATITUDE,LONGITUDE,LOCATION,ON STREET NAME,CROSS STREET NAME,OFF STREET NAME,...,CONTRIBUTING FACTOR VEHICLE 2,CONTRIBUTING FACTOR VEHICLE 3,CONTRIBUTING FACTOR VEHICLE 4,CONTRIBUTING FACTOR VEHICLE 5,COLLISION_ID,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,VEHICLE TYPE CODE 3,VEHICLE TYPE CODE 4,VEHICLE TYPE CODE 5
0,2020-08-29,15:40:00,BRONX,10466.0,40.8921,-73.83376,POINT (-73.83376 40.8921),PRATT AVENUE,STRANG AVENUE,,...,Unspecified,,,,4342908,Sedan,Station Wagon/Sport Utility Vehicle,,,
1,2020-08-29,21:00:00,BROOKLYN,11221.0,40.6905,-73.919914,POINT (-73.919914 40.6905),BUSHWICK AVENUE,PALMETTO STREET,,...,Unspecified,,,,4343555,Sedan,Sedan,,,
2,2020-08-29,18:20:00,,,40.8165,-73.946556,POINT (-73.946556 40.8165),8 AVENUE,,,...,,,,,4343142,Station Wagon/Sport Utility Vehicle,,,,
3,2020-08-29,00:00:00,BRONX,10459.0,40.82472,-73.89296,POINT (-73.89296 40.82472),,,1047 SIMPSON STREET,...,Unspecified,Unspecified,Unspecified,,4343588,Station Wagon/Sport Utility Vehicle,Station Wagon/Sport Utility Vehicle,Sedan,Motorcycle,
4,2020-08-29,17:10:00,BROOKLYN,11203.0,40.64989,-73.93389,POINT (-73.93389 40.64989),,,4609 SNYDER AVENUE,...,Unspecified,,,,4342953,Sedan,Sedan,,,


In [11]:
df = df.drop(["ZIP CODE", "LONGITUDE", "LATITUDE", "LOCATION"], axis=1)

In [12]:
df.head()

Unnamed: 0,CRASH DATE,CRASH TIME,BOROUGH,ON STREET NAME,CROSS STREET NAME,OFF STREET NAME,NUMBER OF PERSONS INJURED,NUMBER OF PERSONS KILLED,NUMBER OF PEDESTRIANS INJURED,NUMBER OF PEDESTRIANS KILLED,...,CONTRIBUTING FACTOR VEHICLE 2,CONTRIBUTING FACTOR VEHICLE 3,CONTRIBUTING FACTOR VEHICLE 4,CONTRIBUTING FACTOR VEHICLE 5,COLLISION_ID,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,VEHICLE TYPE CODE 3,VEHICLE TYPE CODE 4,VEHICLE TYPE CODE 5
0,2020-08-29,15:40:00,BRONX,PRATT AVENUE,STRANG AVENUE,,0,0,0,0,...,Unspecified,,,,4342908,Sedan,Station Wagon/Sport Utility Vehicle,,,
1,2020-08-29,21:00:00,BROOKLYN,BUSHWICK AVENUE,PALMETTO STREET,,2,0,0,0,...,Unspecified,,,,4343555,Sedan,Sedan,,,
2,2020-08-29,18:20:00,,8 AVENUE,,,1,0,1,0,...,,,,,4343142,Station Wagon/Sport Utility Vehicle,,,,
3,2020-08-29,00:00:00,BRONX,,,1047 SIMPSON STREET,0,0,0,0,...,Unspecified,Unspecified,Unspecified,,4343588,Station Wagon/Sport Utility Vehicle,Station Wagon/Sport Utility Vehicle,Sedan,Motorcycle,
4,2020-08-29,17:10:00,BROOKLYN,,,4609 SNYDER AVENUE,0,0,0,0,...,Unspecified,,,,4342953,Sedan,Sedan,,,


In [13]:
df.shape

(74881, 25)

In [14]:
# COLLISION_ID is our unique values containing feature.
df["COLLISION_ID"].unique()

array([4342908, 4343555, 4343142, ..., 4269230, 4267482, 4268376],
      shape=(74881,))

In [15]:
df["VEHICLE TYPE CODE 1"].unique().shape  # 273 Unique vehicle types. 

(273,)

In [16]:
df["VEHICLE TYPE CODE 1"].unique()

array(['Sedan', 'Station Wagon/Sport Utility Vehicle', 'Bus',
       'Pick-up Truck', 'Box Truck', 'Taxi', 'Bike', 'Convertible', 'PK',
       'Flat Bed', 'E-Bike', nan, 'Motorcycle', 'AMBULANCE', 'Dump',
       'MOPED', 'Ambulance', 'Carry All', '4 dr sedan',
       'Refrigerated Van', 'Work Van', 'Tractor Truck Diesel',
       'E-Scooter', 'Tow Truck / Wrecker', 'Lawnmower',
       'Tractor Truck Gasoline', 'Armored Truck', 'Van', 'Concrete Mixer',
       'UNK', 'Golf Cart', 'Garbage or Refuse', 'Tanker', 'SWT',
       'Bulk Agriculture', 'TRAILER', 'TRAC', 'Moped', 'COURIER',
       'Minibike', 'PSD', 'FDNY fire', 'TRUCK VAN', 'Motorscooter',
       'LIMO', 'Multi-Wheeled Vehicle', 'FDNY TRUCK', 'Chassis Cab',
       'Lift Boom', 'dilevery t', 'DRILL RIG', 'van', 'trailer',
       'DELIVERY', 'ambulance', 'GEN  AMBUL', 'Tractor tr', 'Pumper',
       'OTH', 'TRUCK FLAT', '3-Door', 'Stake or Rack', 'Beverage Truck',
       'Front-Load', 'dump truck', 'FDNY Ambul', 'government',
      

In [17]:
df["VEHICLE TYPE CODE 5"].unique()

array([nan, 'PK', 'Station Wagon/Sport Utility Vehicle', 'Motorcycle',
       'Sedan', 'Pick-up Truck', 'Tractor Truck Diesel', 'Motorscooter',
       'Convertible', 'Van', 'Taxi', 'Box Truck', 'E-Scooter',
       'BOX Truck', 'Box truck', 'Dump', 'Bus'], dtype=object)

### Fixing values of car types.

In [18]:
# Fixing misspellings and standardize to proper case
misspellings = {
    'AMBULENCE': 'Ambulance',   # "ENCE" -> "ANCE"
    'Ambulance': 'Ambulance',   # Already correct case
    'GEN  AMBUL': 'Ambulance',  # Abbreviation
    'abulance': 'Ambulance',    # Missing "m"
    'AMB': 'Ambulance',         # Abbreviation
    'AMBU': 'Ambulance',        # Abbreviation
    'Amb': 'Ambulance',         # Mixed case
    'AMBULANCE': 'Ambulance',   # ALL CAPS to proper case
}

# Standardize capitalization - all to proper case (first letter capital)
capitalization_fixes = {
    # Ambulance variations
    'FDNY Ambul': 'Ambulance',
    'FDNY AMBUL': 'Ambulance',
    'Fdny ambul': 'Ambulance',
    'NYC AMBULA': 'Ambulance',
    'NYS AMBULA': 'Ambulance',
    'White ambu': 'Ambulance',
    
    # Fire Truck variations
    'FDNY fire': 'Fire Truck',
    'FDNY FIRET': 'Fire Truck',
    'FDNY TRUCK': 'Fire Truck',
    'FDNY FIRE': 'Fire Truck',
    'FDNY Engin': 'Fire Truck',
    'FDNY ENGIN': 'Fire Truck',
    'Fire Truck': 'Fire Truck',
    'Fire truck': 'Fire Truck',
    'fire truck': 'Fire Truck',
    'Firetruck': 'Fire Truck',
    'FIRETRUCK': 'Fire Truck',
    'FIRE TRUCK': 'Fire Truck',
    'FIRE ENGIN': 'Fire Truck',
    
    # Box Truck
    'BOX TRUCK': 'Box Truck',
    'box truck': 'Box Truck',
    'Box Truck': 'Box Truck',
    
    # Pick-up Truck
    'Pick up Tr': 'Pick-up Truck',
    'PICK-UP TR': 'Pick-up Truck',
    'PICK UP TR': 'Pick-up Truck',
    'Pick up': 'Pick-up Truck',
    'Pickup with mounted Camper': 'Pick-up Truck',
    'PICKUP TRU': 'Pick-up Truck',
    'PICK UP': 'Pick-up Truck',
    'Pick-up Truck': 'Pick-up Truck',
    
    # Van
    'WORK VAN': 'Van',
    'Work van': 'Van',
    'Work Van': 'Van',
    'TRUCK VAN': 'Van',
    'van': 'Van',
    'DELIVERY V': 'Van',
    'delivery v': 'Van',
    'CARGO VAN': 'Van',
    'Cargo Van': 'Van',
    'Van': 'Van',
    
    # Dump Truck
    'Dump': 'Dump Truck',
    'DUMP': 'Dump Truck',
    'dump truck': 'Dump Truck',
    'Dump truck': 'Dump Truck',
    'Dump Truck': 'Dump Truck',
    
    # Tractor
    'Tractor tr': 'Tractor Truck',
    'Tractor Tr': 'Tractor Truck',
    'tractor tr': 'Tractor Truck',
    'TRACTOR': 'Tractor Truck',
    'Tractor': 'Tractor Truck',
    'Tractor Truck Diesel': 'Tractor Truck',
    'Tractor Truck Gasoline': 'Tractor Truck',
    'Tractor tr': 'Tractor Truck',
    'Tractor Truck': 'Tractor Truck',
    
    # Motorcycle variations
    'Motorscooter': 'Motorcycle',
    'MOTORSCOOT': 'Motorcycle',
    'MOTOR SCOO': 'Motorcycle',
    'MOPED': 'Motorcycle',
    'moped': 'Motorcycle',
    'Motorbike': 'Motorcycle',
    'MOTORSCOOTER': 'Motorcycle',
    'Motorscooter': 'Motorcycle',
    'Motorcycle': 'Motorcycle',
    
    # Scooter
    'SCOOTER': 'Scooter',
    'E REVEL SC': 'E-scooter',
    'PUSH SCOOT': 'Scooter',
    'Scooter': 'Scooter',
    
    # Sedan
    '4 dr sedan': 'Sedan',
    '2 dr sedan': 'Sedan',
    '3-Door': 'Sedan',
    'Sedan': 'Sedan',
    
    # E-Bike
    'E-BIKE': 'E-bike',
    'E-Bik': 'E-bike',
    'E bike': 'E-bike',
    'E-Bike': 'E-bike',
    
    # Trailer
    'trailer': 'Trailer',
    'TRAILER': 'Trailer',
    'TRAIL': 'Trailer',
    'TRL': 'Trailer',
    'TR-Trailer': 'Trailer',
    'Trailer': 'Trailer',
    
    # Tow Truck
    'tow truck': 'Tow Truck',
    'TOW TRUCK': 'Tow Truck',
    'Tow truck': 'Tow Truck',
    'Tow Truck': 'Tow Truck',
    'Tow Truck / Wrecker': 'Tow Truck',
    
    # USPS/Mail
    'USPS VAN': 'USPS',
    'USPS TRUCK': 'USPS',
    'USPS POSTA': 'USPS',
    'USPS #7530': 'USPS',
    'postal tru': 'USPS',
    'postal bus': 'USPS',
    'POSTAL TRU': 'USPS',
    'MAIL TRUCK': 'USPS',
    'US POSTAL': 'USPS',
    'postal ser': 'USPS',
    'USPS': 'USPS',
    
    # Delivery
    'DELIVERY': 'Delivery',
    'DELIVERY T': 'Delivery',
    'DELIVERY V': 'Delivery',
    'delviery': 'Delivery',
    'Delv': 'Delivery',
    'DELV': 'Delivery',
    'Delivery': 'Delivery',
    
    # Commercial
    'COM': 'Commercial',
    'com': 'Commercial',
    'commercial': 'Commercial',
    'COM TRANS': 'Commercial',
    'COMMERCIAL': 'Commercial',
    
    # Utility
    'UTIL': 'Utility',
    'UTILITY VE': 'Utility',
    'UT': 'Utility',
    'UTILITY': 'Utility',
    'UTILITY TR': 'Utility',
    'UTILITY.': 'Utility',
    
    # Truck (generic)
    'TRUCK': 'Truck',
    'truck': 'Truck',
    'TRK': 'Truck',
    'Trc': 'Truck',
    'Tr': 'Truck',
    'Truck': 'Truck',
    
    # Garbage
    'GARBAGE TR': 'Garbage Truck',
    'Garbage or Refuse': 'Garbage Truck',
    'Garbage Truck': 'Garbage Truck',
    
    # Freight
    'FREIGHT FL': 'Freight',
    'FREIGHT TR': 'Freight',
    'FREIG': 'Freight',
    'FREIG DELV': 'Freight',
    'FREIGHTLIN': 'Freight',
    'Freight': 'Freight',
    
    # Flat Bed
    'Flat Bed': 'Flat Bed',
    'Flat Rack': 'Flat Bed',
    'FLATBED': 'Flat Bed',
    'TRUCK FLAT': 'Flat Bed',
    
    # Forklift
    'Fork lift': 'Forklift',
    'FORK LIFT': 'Forklift',
    'FORKLIFT': 'Forklift',
    'forklift': 'Forklift',
    
    # Convertible
    'Convertible': 'Convertible',
    'CONVERTIBLE': 'Convertible',
    
    # Golf Cart
    'GOLF CART': 'Golf Cart',
    'Golf Cart': 'Golf Cart',
    
    # Suburban
    'suburban': 'Suburban',
    'SUBN WHI': 'Suburban',
    'SUBURBAN': 'Suburban',
    
    # Bike
    'Bike': 'Bike',
    'BIKE': 'Bike',
    
    # Taxi
    'Taxi': 'Taxi',
    'TAXI': 'Taxi',
    
    # Bus (keep as is - don't change School Bus to Bus)
    'Bus': 'Bus',
    'BUS': 'Bus',
    
    # Station Wagon
    'Station Wagon': 'Station Wagon',
    
    # Convertible
    'Convertible': 'Convertible',
    
    # Bike
    'Bike': 'Bike',
    
    # E-scooter
    'E-Scooter': 'E-scooter',
    
    # Lawnmower
    'Lawnmower': 'Lawnmower',
    
    # Concrete Mixer
    'Concrete Mixer': 'Concrete Mixer',
    
    # Refrigerated Van
    'Refrigerated Van': 'Refrigerated Van',
    
    # Armored Truck
    'Armored Truck': 'Armored Truck',
    
    # Tanker
    'Tanker': 'Tanker',
    
    # Beverage Truck
    'Beverage Truck': 'Beverage Truck',
    
    # Forklift
    'Forklift': 'Forklift',
    
    # Go kart
    'Go kart': 'Go Kart',
    
    # Camper
    'Van Camper': 'Camper',
    
    # Backhoe
    'backhoe': 'Backhoe',
    'BACK HOE': 'Backhoe',
    'BACKHOE': 'Backhoe',
    
    # Bobcat
    'Bobcat': 'Bobcat',
    'BOBCAT FOR': 'Bobcat',
    
    # Snow Plow
    'Snow Plow': 'Snow Plow',
    
    # Hearse
    'Hearse': 'Hearse',
}

# Expand abbreviations
abbreviations = {
    'PK': 'Pickup',
    'PSD': 'Public Safety Vehicle',
    'FDNY': 'Fire Truck',
    'EMS': 'Ambulance',
    'MTA': 'Bus',
    'USPS': 'Mail Truck',
    'FDNY #226': 'Fire Truck',
    'NYC FD': 'Fire Truck',
    'FDNY EMT': 'Ambulance',
    'FDNY LADDE': 'Fire Truck',
    'ESU RESCUE': 'Rescue Vehicle',
    'UNK': 'Unknown',
    'UNKNOWN': 'Unknown',
    'UNKN': 'Unknown',
    'Unknown': 'Unknown',
    'OTH': 'Other',
    'OTHER': 'Other',
    'TRAC': 'Tractor',
    'SWT': 'Station Wagon',
    'LIMO': 'Limousine',
    'PK': 'Pickup',
    'PC': 'Passenger Car',
    'HRSE': 'Horse',
    'H1': 'Hummer H1',
    'J1': 'Jeep',
    '1C': 'One Car',
    'SE': 'Special Equipment',
    'OMS': 'Office of Management Services',
    'OMR': 'Other Motorized Road',
    'LCOMM': 'Light Commercial',
}

In [19]:
"""
Example.
df["VEHICLE TYPE CODE 1"] = df["VEHICLE TYPE CODE 1"] \
    .replace(misspellings) \
    .replace(capitalization_fixes) \
    .replace(abbreviations)
"""

vehicle_columns = [
    "VEHICLE TYPE CODE 1", 
    "VEHICLE TYPE CODE 2", 
    "VEHICLE TYPE CODE 3", 
    "VEHICLE TYPE CODE 4", 
    "VEHICLE TYPE CODE 5"
]

# Applying replacements
for col in df.columns:
    if col in vehicle_columns:
        df[col] = df[col] \
            .replace(misspellings) \
            .replace(capitalization_fixes) \
            .replace(abbreviations)

In [20]:
for col in df.columns:
    if col in vehicle_columns:
        print(df[col].unique().shape)

(142,)
(224,)
(40,)
(25,)
(16,)


### Renaming Columns

In [21]:
renaming_rules = {
    "CRASH DATE": "date",
    "CRASH TIME": "time",
    "NUMBER OF PERSONS INJURED": "persons_injured",
    "NUMBER OF PERSONS KILLED": "persons_killed",
    "NUMBER OF PEDESTRIANS INJURED": "pedestrians_injured",
    "NUMBER OF PEDESTRIANS KILLED": "pedestrians_killed",
    "VEHICLE TYPE CODE 1": "vehicle_type_1",
    "VEHICLE TYPE CODE 2": "vehicle_type_2", 
    "VEHICLE TYPE CODE 3": "vehicle_type_3",
    "VEHICLE TYPE CODE 4": "vehicle_type_4",
    "VEHICLE TYPE CODE 5": "vehicle_type_5",
    "COLLISION_ID": "collision_id"
}

df = df.rename(columns=renaming_rules)

# Converting all remaining columns to lowercase with underscores (aka Snake Case)
for col in df.columns:
    if col not in renaming_rules.values():  # Skiping already renamed columns
        new_name = col.lower().replace(' ', '_')
        df = df.rename(columns={col: new_name})

In [22]:
df.head()

Unnamed: 0,date,time,borough,on_street_name,cross_street_name,off_street_name,persons_injured,persons_killed,pedestrians_injured,pedestrians_killed,...,contributing_factor_vehicle_2,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_1,vehicle_type_2,vehicle_type_3,vehicle_type_4,vehicle_type_5
0,2020-08-29,15:40:00,BRONX,PRATT AVENUE,STRANG AVENUE,,0,0,0,0,...,Unspecified,,,,4342908,Sedan,Station Wagon/Sport Utility Vehicle,,,
1,2020-08-29,21:00:00,BROOKLYN,BUSHWICK AVENUE,PALMETTO STREET,,2,0,0,0,...,Unspecified,,,,4343555,Sedan,Sedan,,,
2,2020-08-29,18:20:00,,8 AVENUE,,,1,0,1,0,...,,,,,4343142,Station Wagon/Sport Utility Vehicle,,,,
3,2020-08-29,00:00:00,BRONX,,,1047 SIMPSON STREET,0,0,0,0,...,Unspecified,Unspecified,Unspecified,,4343588,Station Wagon/Sport Utility Vehicle,Station Wagon/Sport Utility Vehicle,Sedan,Motorcycle,
4,2020-08-29,17:10:00,BROOKLYN,,,4609 SNYDER AVENUE,0,0,0,0,...,Unspecified,,,,4342953,Sedan,Sedan,,,


In [23]:
df["hour"] = df["time"].astype(str).str.split(":").str[0]

In [24]:
df.head()

Unnamed: 0,date,time,borough,on_street_name,cross_street_name,off_street_name,persons_injured,persons_killed,pedestrians_injured,pedestrians_killed,...,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_1,vehicle_type_2,vehicle_type_3,vehicle_type_4,vehicle_type_5,hour
0,2020-08-29,15:40:00,BRONX,PRATT AVENUE,STRANG AVENUE,,0,0,0,0,...,,,,4342908,Sedan,Station Wagon/Sport Utility Vehicle,,,,15
1,2020-08-29,21:00:00,BROOKLYN,BUSHWICK AVENUE,PALMETTO STREET,,2,0,0,0,...,,,,4343555,Sedan,Sedan,,,,21
2,2020-08-29,18:20:00,,8 AVENUE,,,1,0,1,0,...,,,,4343142,Station Wagon/Sport Utility Vehicle,,,,,18
3,2020-08-29,00:00:00,BRONX,,,1047 SIMPSON STREET,0,0,0,0,...,Unspecified,Unspecified,,4343588,Station Wagon/Sport Utility Vehicle,Station Wagon/Sport Utility Vehicle,Sedan,Motorcycle,,0
4,2020-08-29,17:10:00,BROOKLYN,,,4609 SNYDER AVENUE,0,0,0,0,...,,,,4342953,Sedan,Sedan,,,,17


In [25]:
df.to_csv(path_or_buf="nyc_traffic_processed.csv", index=False)

In [26]:
from pyspark.sql.types import (
        StructType, 
        StructField, 
        StringType, 
        TimestampType, 
        IntegerType,
)

# Schema with proper types
schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("time", TimestampType(), True),
    StructField("borough", StringType(), True),
    StructField("on_street_name", StringType(), True),
    StructField("cross_street_name", StringType(), True),
    StructField("off_street_name", StringType(), True),
    StructField("persons_injured", IntegerType(), True),  # Should be integer
    StructField("persons_killed", IntegerType(), True),   # Should be integer
    StructField("pedestrians_injured", IntegerType(), True),
    StructField("pedestrians_killed", IntegerType(), True),
    StructField("number_of_cyclist_injured", IntegerType(), True),
    StructField("number_of_cyclist_killed", IntegerType(), True),
    StructField("number_of_motorist_injured", IntegerType(), True),
    StructField("number_of_motorist_killed", IntegerType(), True),
    StructField("contributing_factor_vehicle_1", StringType(), True),
    StructField("contributing_factor_vehicle_2", StringType(), True),
    StructField("contributing_factor_vehicle_3", StringType(), True),
    StructField("contributing_factor_vehicle_4", StringType(), True),
    StructField("contributing_factor_vehicle_5", StringType(), True),
    StructField("collision_id", StringType(), True),
    StructField("vehicle_type_1", StringType(), True),
    StructField("vehicle_type_2", StringType(), True),
    StructField("vehicle_type_3", StringType(), True),
    StructField("vehicle_type_4", StringType(), True),
    StructField("vehicle_type_5", StringType(), True),
    StructField("hour", IntegerType(), True),  # Should be integer 0-23
])

df = spark.read.csv("nyc_traffic_processed.csv", header=True, schema=schema)

In [27]:
df.createOrReplaceTempView("accidents_table")

### Basic Selection of Locations

In [28]:
def select_crashes(borough: str):
    result = spark.sql(
        f"""
        select *
        from accidents_table
        where borough = "{borough}"
        """
    )
    result.show()

In [29]:
select_crashes("BROOKLYN")  # User can pass whatever borough 
                            # he needs to analyze. 


+-------------------+-------------------+--------+--------------------+-----------------+--------------------+---------------+--------------+-------------------+------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+--------------+--------------+--------------+----+
|               date|               time| borough|      on_street_name|cross_street_name|     off_street_name|persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|contributing_factor_vehicle_1|contributing_factor_vehicle_2|contributing_factor_vehicle_3|contributing_factor_vehicle_4|contributing_factor_vehicle_5|collision_id|      vehicle

In [30]:
def one_person_injured():
    result = spark.sql(
        """
        select *
        from accidents_table
        where persons_injured > 1
        """
    )
    result.show()

In [31]:
one_person_injured()

+-------------------+-------------------+-------------+--------------------+--------------------+------------------+---------------+--------------+-------------------+------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------+----+
|               date|               time|      borough|      on_street_name|   cross_street_name|   off_street_name|persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|contributing_factor_vehicle_1|contributing_factor_vehicle_2|contributing_factor_vehicle_3|contributing_factor_vehicle_4|contributing_factor_vehicle_5|co

### Little Filtering

In [32]:
def select_contributing_factors_1(factor: str):
    result = spark.sql(
        f"""
        select *
        from accidents_table
        where contributing_factor_vehicle_1 = "{factor}"
        """
    )
    result.show()

In [33]:
select_contributing_factors_1(factor="Driver Inexperience")  # Again, user can
                                                             # pass whatever parameter he needs

+-------------------+-------------------+---------+--------------------+-----------------+--------------------+---------------+--------------+-------------------+------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+--------------+--------------+--------------+----+
|               date|               time|  borough|      on_street_name|cross_street_name|     off_street_name|persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|contributing_factor_vehicle_1|contributing_factor_vehicle_2|contributing_factor_vehicle_3|contributing_factor_vehicle_4|contributing_factor_vehicle_5|collision_id|      vehic

In [34]:
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- borough: string (nullable = true)
 |-- on_street_name: string (nullable = true)
 |-- cross_street_name: string (nullable = true)
 |-- off_street_name: string (nullable = true)
 |-- persons_injured: integer (nullable = true)
 |-- persons_killed: integer (nullable = true)
 |-- pedestrians_injured: integer (nullable = true)
 |-- pedestrians_killed: integer (nullable = true)
 |-- number_of_cyclist_injured: integer (nullable = true)
 |-- number_of_cyclist_killed: integer (nullable = true)
 |-- number_of_motorist_injured: integer (nullable = true)
 |-- number_of_motorist_killed: integer (nullable = true)
 |-- contributing_factor_vehicle_1: string (nullable = true)
 |-- contributing_factor_vehicle_2: string (nullable = true)
 |-- contributing_factor_vehicle_3: string (nullable = true)
 |-- contributing_factor_vehicle_4: string (nullable = true)
 |-- contributing_factor_vehicle_5: string (nullable = true)
 

In [35]:
def crashed_after_time(hour: str):
    result = spark.sql(
        f"""
        select *
        from accidents_table
        where hour >= "{hour}"
        """
    )
    result.show()

In [36]:
crashed_after_time("15")

+-------------------+-------------------+---------+--------------------+-----------------+-------------------+---------------+--------------+-------------------+------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+--------------------+--------------+--------------+----+
|               date|               time|  borough|      on_street_name|cross_street_name|    off_street_name|persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|contributing_factor_vehicle_1|contributing_factor_vehicle_2|contributing_factor_vehicle_3|contributing_factor_vehicle_4|contributing_factor_vehicle_5|collision_id|      v

### Aggregation Tasks

In [37]:
# Counting how many crashes happened in each borough
spark.sql(
    """
    select borough, count(collision_id)
    from accidents_table
    group by borough
    """
).show()

+-------------+-------------------+
|      borough|count(collision_id)|
+-------------+-------------------+
|         NULL|              25741|
|       QUEENS|              14017|
|     BROOKLYN|              16907|
|        BRONX|               9417|
|    MANHATTAN|               7353|
|STATEN ISLAND|               1446|
+-------------+-------------------+



In [38]:
# Finding the number of persons killed per borough.
spark.sql(
    """
    select borough, 
        sum(persons_killed) as killed_persons 
    from accidents_table
    group by borough
    """
).show()

+-------------+--------------+
|      borough|killed_persons|
+-------------+--------------+
|         NULL|            72|
|       QUEENS|            20|
|     BROOKLYN|            27|
|        BRONX|            10|
|    MANHATTAN|             9|
|STATEN ISLAND|             6|
+-------------+--------------+



In [39]:
# Finding the top 5 streets with the highest number of crashes.
spark.sql(
    """
    SELECT on_street_name, 
        COUNT(collision_id) AS number_of_crashes 
    FROM accidents_table
    WHERE on_street_name IS NOT NULL
    GROUP BY on_street_name
    ORDER BY number_of_crashes DESC
    LIMIT 5
    """
).show(truncate=False)

+--------------------------+-----------------+
|on_street_name            |number_of_crashes|
+--------------------------+-----------------+
|BELT PARKWAY              |1241             |
|LONG ISLAND EXPRESSWAY    |745              |
|BROOKLYN QUEENS EXPRESSWAY|738              |
|FDR DRIVE                 |728              |
|MAJOR DEEGAN EXPRESSWAY   |591              |
+--------------------------+-----------------+



In [40]:
# Finding all streets that had more than 100 crashes.
spark.sql(
    """
    SELECT on_street_name, 
        COUNT(collision_id) AS number_of_crashes 
    FROM accidents_table
    WHERE 
        on_street_name IS NOT NULL
    GROUP BY on_street_name
        HAVING number_of_crashes > 100
    """
).show(truncate=False)

+-----------------------+-----------------+
|on_street_name         |number_of_crashes|
+-----------------------+-----------------+
|WOODHAVEN BOULEVARD    |177              |
|MYRTLE AVENUE          |165              |
|FLUSHING AVENUE        |111              |
|EAST 149 STREET        |115              |
|BEDFORD AVENUE         |191              |
|FULTON STREET          |177              |
|FLATLANDS AVENUE       |115              |
|PITKIN AVENUE          |102              |
|BROADWAY               |575              |
|ROCKAWAY BOULEVARD     |225              |
|MAJOR DEEGAN EXPRESSWAY|591              |
|CHURCH AVENUE          |127              |
|BRUCKNER EXPRESSWAY    |337              |
|CANAL STREET           |114              |
|BUSHWICK AVENUE        |216              |
|EAST FORDHAM ROAD      |137              |
|GRAND CENTRAL PARKWAY  |120              |
|WEST FORDHAM ROAD      |118              |
|EAST TREMONT AVENUE    |192              |
|HILLSIDE AVENUE        |166    

### Working with NULL Values

In [41]:
# Counting how many crashes have no borough information (BOROUGH IS NULL).
spark.sql(
    """
    SELECT borough, 
        COUNT(collision_id) AS number_of_crashes
    FROM accidents_table
    WHERE borough is NULL
    GROUP BY borough
    """
).show(truncate=False)

+-------+-----------------+
|borough|number_of_crashes|
+-------+-----------------+
|NULL   |25741            |
+-------+-----------------+



In [48]:
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- borough: string (nullable = true)
 |-- on_street_name: string (nullable = true)
 |-- cross_street_name: string (nullable = true)
 |-- off_street_name: string (nullable = true)
 |-- persons_injured: integer (nullable = true)
 |-- persons_killed: integer (nullable = true)
 |-- pedestrians_injured: integer (nullable = true)
 |-- pedestrians_killed: integer (nullable = true)
 |-- number_of_cyclist_injured: integer (nullable = true)
 |-- number_of_cyclist_killed: integer (nullable = true)
 |-- number_of_motorist_injured: integer (nullable = true)
 |-- number_of_motorist_killed: integer (nullable = true)
 |-- contributing_factor_vehicle_1: string (nullable = true)
 |-- contributing_factor_vehicle_2: string (nullable = true)
 |-- contributing_factor_vehicle_3: string (nullable = true)
 |-- contributing_factor_vehicle_4: string (nullable = true)
 |-- contributing_factor_vehicle_5: string (nullable = true)
 

### CASE Statements

In [55]:
spark.sql(
    """
    SELECT 
        *,
        CASE 
            WHEN persons_killed > 0 
                OR pedestrians_killed > 0 
                OR number_of_cyclist_killed > 0 
                OR number_of_motorist_killed > 0
                THEN 'fatal'
            WHEN persons_injured > 0 
                OR pedestrians_injured > 0 
                OR number_of_cyclist_injured > 0 
                OR number_of_motorist_injured > 0
                THEN 'injury_only'
            ELSE 'property_damage'
        END AS crash_severity
    FROM accidents_table
    """
).show(truncate=False)

+-------------------+-------------------+--------+------------------------+-----------------------+-------------------+---------------+--------------+-------------------+------------------+-------------------------+------------------------+--------------------------+-------------------------+------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-----------------------------------+-----------------------------------+-----------------------------------+--------------+--------------+----+---------------+
|date               |time               |borough |on_street_name          |cross_street_name      |off_street_name    |persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|contributing_factor_vehicle_1 |contributing_factor_vehicle_2|contributing_factor_vehicle_3

### Window Functions

In [74]:
# Ranking streets by number of crashes for each borough
spark.sql(
    """
    SELECT 
        *,
        DENSE_RANK() OVER(PARTITION BY borough ORDER BY number_of_crashes DESC) as rank
    FROM 
        (SELECT 
            borough,
            on_street_name,
            COUNT(collision_id) AS number_of_crashes
        FROM accidents_table
        WHERE 
            borough IS NOT NULL
            AND
            on_street_name IS NOT NULL
        GROUP BY borough, on_street_name
        ) AS t
    """
).show()

# NOTE: I have checked the result - it is correct, do not worry. After BRONX, there will be BROOKLYN

+-------+--------------------+-----------------+----+
|borough|      on_street_name|number_of_crashes|rank|
+-------+--------------------+-----------------+----+
|  BRONX|  BRUCKNER BOULEVARD|              189|   1|
|  BRONX| EAST TREMONT AVENUE|              144|   2|
|  BRONX|       JEROME AVENUE|              141|   3|
|  BRONX|     GRAND CONCOURSE|              120|   4|
|  BRONX|         BOSTON ROAD|              112|   5|
|  BRONX|  WESTCHESTER AVENUE|              103|   6|
|  BRONX|      WEBSTER AVENUE|              102|   7|
|  BRONX|     EAST 149 STREET|               96|   8|
|  BRONX|   WHITE PLAINS ROAD|               94|   9|
|  BRONX|   EAST FORDHAM ROAD|               91|  10|
|  BRONX|   WEST FORDHAM ROAD|               80|  11|
|  BRONX|  SOUTHERN BOULEVARD|               73|  12|
|  BRONX|  EAST GUN HILL ROAD|               73|  12|
|  BRONX|            3 AVENUE|               71|  13|
|  BRONX|     EAST 233 STREET|               64|  14|
|  BRONX|CROSS BRONX EXPRE..

In [86]:
# Calculating the running total of injuries ordered by date for every borough.
spark.sql(
    """
    SELECT 
        borough,
        date,
        time,
        SUM(persons_injured + pedestrians_injured + 
            number_of_cyclist_injured + number_of_motorist_injured)
            OVER(
                PARTITION BY borough 
                ORDER BY date, time
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) as running_total_of_injuries
    FROM accidents_table
    WHERE borough IS NOT NULL
    ORDER BY borough, date, time
    """
).show()

+-------+-------------------+-------------------+-------------------------+
|borough|               date|               time|running_total_of_injuries|
+-------+-------------------+-------------------+-------------------------+
|  BRONX|2020-01-01 00:00:00|2025-12-07 00:00:00|                        0|
|  BRONX|2020-01-01 00:00:00|2025-12-07 00:27:00|                        0|
|  BRONX|2020-01-01 00:00:00|2025-12-07 00:37:00|                        0|
|  BRONX|2020-01-01 00:00:00|2025-12-07 02:05:00|                        0|
|  BRONX|2020-01-01 00:00:00|2025-12-07 02:20:00|                        2|
|  BRONX|2020-01-01 00:00:00|2025-12-07 02:24:00|                        8|
|  BRONX|2020-01-01 00:00:00|2025-12-07 03:30:00|                        8|
|  BRONX|2020-01-01 00:00:00|2025-12-07 03:45:00|                        8|
|  BRONX|2020-01-01 00:00:00|2025-12-07 04:46:00|                        8|
|  BRONX|2020-01-01 00:00:00|2025-12-07 04:50:00|                       10|
|  BRONX|202