In [0]:
import os
import tempfile

import boto3
import fsspec
import geopandas as gp
import pandas as pd
import s3fs
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

#### Setup AWS S3 connection

In [0]:
# Take the AWS credentials from the standard AWS environment variables instead
# of typing them into the notebook, so secrets are never saved with the file.
# When a variable is unset the value falls back to '' (the previous literal).
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', '')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')

# Give the same key pair to the s3a filesystem settings on the Spark session.
spark.conf.set("fs.s3a.access.key", aws_access_key_id)
spark.conf.set("fs.s3a.secret.key", aws_secret_access_key)

#### Create boto3 Session

In [0]:
# Build a boto3 session from the key pair defined above, then derive the S3
# client that is used for direct object downloads.
session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
s3 = session.client('s3')

## Extractions

#### Load Taxi Zone Lookup Data

In [0]:
# Taxi zone lookup table: a CSV whose first row holds the column names.
src_tz_path = "s3://capstone-techcatalyst-raw/other/taxi_zone_lookup.csv"
zone_df = spark.read.options(header=True).csv(src_tz_path)

In [0]:
# Show the column names and types Spark assigned to the zone lookup table.
zone_df.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



#### Load NYC Accident Data

In [0]:
# Motor vehicle collision export: a CSV whose first row holds the column names.
src_acc_path = (
    "s3://capstone-techcatalyst-raw/group_4_other/"
    "Motor_Vehicle_Collisions_-_Crashes_20240731.csv"
)
acc_df = spark.read.option("header", True).csv(src_acc_path)

#### Load the Geospatial Data for Boroughs

In [0]:

# Location of the borough boundary polygons in the raw bucket.
bucket = 'capstone-techcatalyst-raw'
key = 'group_4_other/Borough Boundaries.geojson'

# boto3 downloads to a local path, so stage the object in a temporary
# directory. The directory and the downloaded copy are removed when the
# `with` block exits; the previous NamedTemporaryFile(delete=False) was never
# cleaned up and left a file behind on every run.
with tempfile.TemporaryDirectory() as tmp_dir:
    temp_file_path = f"{tmp_dir}/borough_boundaries.geojson"
    s3.download_file(bucket, key, temp_file_path)
    # Read while the file still exists; gdf holds the data in memory afterwards.
    gdf = gp.read_file(temp_file_path)

# Keep only the borough name and its geometry.
gdf = gdf[['boro_name','geometry']]



## Transformations

#### Convert the Crash Date column to a date type

In [0]:
# Replace the text in 'CRASH DATE' with a Spark date parsed using MM/dd/yyyy.
acc_df = acc_df.withColumn(
    'CRASH DATE',
    F.to_date(F.col('CRASH DATE'), "MM/dd/yyyy"),
)

#### Filter out irrelevant dates from Accident Data to Match Taxi Data

In [0]:
# Keep only crashes dated 2023-09-01 or later.
acc_df = acc_df.where(F.col("CRASH DATE") >= "2023-09-01")

#### Remove Irrelevant Columns

In [0]:
# List every accident column next to its position; the positions are used to
# pick columns by slice in the following cell.
columns_list = acc_df.columns
for position, column_name in enumerate(columns_list):
    print((position, column_name))

(0, 'CRASH DATE')
(1, 'CRASH TIME')
(2, 'BOROUGH')
(3, 'ZIP CODE')
(4, 'LATITUDE')
(5, 'LONGITUDE')
(6, 'LOCATION')
(7, 'ON STREET NAME')
(8, 'CROSS STREET NAME')
(9, 'OFF STREET NAME')
(10, 'NUMBER OF PERSONS INJURED')
(11, 'NUMBER OF PERSONS KILLED')
(12, 'NUMBER OF PEDESTRIANS INJURED')
(13, 'NUMBER OF PEDESTRIANS KILLED')
(14, 'NUMBER OF CYCLIST INJURED')
(15, 'NUMBER OF CYCLIST KILLED')
(16, 'NUMBER OF MOTORIST INJURED')
(17, 'NUMBER OF MOTORIST KILLED')
(18, 'CONTRIBUTING FACTOR VEHICLE 1')
(19, 'CONTRIBUTING FACTOR VEHICLE 2')
(20, 'CONTRIBUTING FACTOR VEHICLE 3')
(21, 'CONTRIBUTING FACTOR VEHICLE 4')
(22, 'CONTRIBUTING FACTOR VEHICLE 5')
(23, 'COLLISION_ID')
(24, 'VEHICLE TYPE CODE 1')
(25, 'VEHICLE TYPE CODE 2')
(26, 'VEHICLE TYPE CODE 3')
(27, 'VEHICLE TYPE CODE 4')
(28, 'VEHICLE TYPE CODE 5')


In [0]:
# Keep three positional ranges of the accident columns:
#   0-6   crash date/time, borough, zip code, latitude, longitude, location
#   10-19 injured/killed counts and contributing factors for vehicles 1-2
#   24-25 vehicle type codes 1-2
kept_columns = columns_list[0:7] + columns_list[10:20] + columns_list[24:26]
redux_acc_df = acc_df.select(*kept_columns)

#### Drop rows with NULL Longitude or Latitude

In [0]:
# Discard rows where either coordinate is missing.
redux_acc_df = redux_acc_df.na.drop(subset=['LONGITUDE', 'LATITUDE'])

#### Fill NULL contributing factor columns with 'Unspecified'

In [0]:
# Missing contributing factors are recorded as the literal 'Unspecified'.
factor_columns = ['CONTRIBUTING FACTOR VEHICLE 1', 'CONTRIBUTING FACTOR VEHICLE 2']
redux_acc_df = redux_acc_df.na.fill('Unspecified', subset=factor_columns)

In [0]:
# Positions 7-14 of the reduced frame hold the injured/killed counts, which the
# CSV reader loaded as strings. Cast them to int in a single projection and
# pass every other column through unchanged, keeping the column order.
count_columns = set(redux_acc_df.columns[7:15])
redux_acc_df = redux_acc_df.select([
    F.col(name).cast('int').alias(name) if name in count_columns else F.col(name)
    for name in redux_acc_df.columns
])

In [0]:
# Add calendar features derived from the crash date, plus a combined
# date/time string built from the date and time columns.
redux_acc_df = (
    redux_acc_df
    .withColumn("YEAR", F.date_format('CRASH DATE', 'yyyy'))
    .withColumn("MONTH", F.date_format('CRASH DATE', 'MM'))
    .withColumn("DAY OF MONTH", F.dayofmonth('CRASH DATE'))
    .withColumn("DAY OF WEEK", F.date_format('CRASH DATE', 'EEEE'))
    # dayofweek() numbers Sunday as 1 and Saturday as 7.
    .withColumn("IS WEEKEND", F.dayofweek("CRASH DATE").isin([1,7]).cast("boolean"))
    .withColumn(
        "CRASH DATETIME",
        F.date_format(
            F.concat_ws(' ', F.col('CRASH DATE'), F.col('CRASH TIME')),
            'yyyy-MM-dd HH:mm:ss',
        ),
    )
)

# CRASH DATETIME was appended last; move it to the front of the frame.
*leading_columns, datetime_column = redux_acc_df.columns
redux_acc_df = redux_acc_df.select(datetime_column, *leading_columns)


In [0]:
# Show the column names and types of the reduced accident frame after the
# casts and derived columns above.
redux_acc_df.printSchema()

root
 |-- CRASH DATETIME: string (nullable = true)
 |-- CRASH DATE: date (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: integer (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: integer (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: integer (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = false)
 |-- CONTRIBUTING FACTOR VEHICLE 2: string (nullable = false)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)
 |-- VEHICLE TYPE CODE

#### Check the missing data

In [0]:
# One aggregate per column: add 1 for every NULL value and 0 otherwise, giving
# a single-row frame of NULL counts keyed by column name.
null_counts = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in redux_acc_df.columns
]
missing_data = redux_acc_df.select(null_counts)

In [0]:
# Render the per-column NULL counts computed in the previous cell.
display(missing_data)

CRASH DATETIME,CRASH DATE,CRASH TIME,BOROUGH,ZIP CODE,LATITUDE,LONGITUDE,LOCATION,NUMBER OF PERSONS INJURED,NUMBER OF PERSONS KILLED,NUMBER OF PEDESTRIANS INJURED,NUMBER OF PEDESTRIANS KILLED,NUMBER OF CYCLIST INJURED,NUMBER OF CYCLIST KILLED,NUMBER OF MOTORIST INJURED,NUMBER OF MOTORIST KILLED,CONTRIBUTING FACTOR VEHICLE 1,CONTRIBUTING FACTOR VEHICLE 2,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,YEAR,MONTH,DAY OF MONTH,DAY OF WEEK,IS WEEKEND
0,0,0,21173,21190,0,0,0,0,0,0,0,0,0,0,0,0,0,1366,26584,0,0,0,0,0


#### Convert the accident DataFrame to a pandas DataFrame for compatibility with geopandas

In [0]:
# Point is used further down to turn a crash's longitude/latitude into a geometry.
from shapely.geometry import Point
# toPandas() collects the whole Spark DataFrame onto the driver as a pandas DataFrame.
pd_acc_df = redux_acc_df.toPandas()

In [0]:
# Map each borough name to its boundary geometry for the point-in-shape tests.
geom_dict = dict(zip(gdf['boro_name'], gdf['geometry']))
geom_dict

Out[141]: {'Brooklyn': <MULTIPOLYGON (((-73.863 40.584, -73.864 40.584, -73.864 40.584, -73.865 40....>,
 'Staten Island': <MULTIPOLYGON (((-74.051 40.566, -74.05 40.566, -74.05 40.566, -74.05 40.566...>,
 'Manhattan': <MULTIPOLYGON (((-74.011 40.684, -74.012 40.684, -74.012 40.684, -74.01 40.6...>,
 'Bronx': <MULTIPOLYGON (((-73.897 40.796, -73.897 40.796, -73.897 40.796, -73.898 40....>,
 'Queens': <MULTIPOLYGON (((-73.826 40.591, -73.826 40.59, -73.826 40.59, -73.826 40.59...>}

In [0]:
# Grow every borough shape by a small buffer (0.00001 in the geometry's own
# coordinate units). buffer() returns a new geometry instead of changing the
# existing one, so the results must be stored back into the dictionary. The
# previous loop only rebound its loop variable and left geom_dict untouched.
geom_dict = {name: shape.buffer(0.00001) for name, shape in geom_dict.items()}
    

In [0]:
def get_boro(point, boroughs=None):
    """Return the upper-cased name of the first borough whose shape contains ``point``.

    Args:
        point: Object exposing ``within(shape)``; the notebook passes a shapely
            ``Point`` built as ``Point(longitude, latitude)``.
        boroughs: Optional mapping of borough name -> shape to search. When
            omitted, the notebook-level ``geom_dict`` is used.

    Returns:
        The matching borough name in upper case, or ``'No Borough'`` when the
        point lies within none of the shapes.
    """
    if boroughs is None:
        boroughs = geom_dict
    for name, shape in boroughs.items():
        if point.within(shape):
            return name.upper()
    # Only reached after every borough has been tested. The fallback used to
    # sit inside the loop, so anything outside the first shape was rejected.
    return 'No Borough'
    
def _fill_borough(row):
    """Return the row's BOROUGH, looking it up from the coordinates when it is missing."""
    if pd.isna(row['BOROUGH']):
        return get_boro(Point(row['LONGITUDE'], row['LATITUDE']))
    return row['BOROUGH']

# Row-wise pass: only rows without a borough trigger the geometry lookup.
pd_acc_df['BOROUGH'] = pd_acc_df.apply(_fill_borough, axis=1)


In [0]:
# Turn the pandas DataFrame back into a Spark DataFrame
new_acc_df=spark.createDataFrame(pd_acc_df)

In [0]:
# Recount NULLs per column now that the missing boroughs have been filled.
null_counts = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in new_acc_df.columns
]
missing_data = new_acc_df.select(null_counts)
display(missing_data)

CRASH DATETIME,CRASH DATE,CRASH TIME,BOROUGH,ZIP CODE,LATITUDE,LONGITUDE,LOCATION,NUMBER OF PERSONS INJURED,NUMBER OF PERSONS KILLED,NUMBER OF PEDESTRIANS INJURED,NUMBER OF PEDESTRIANS KILLED,NUMBER OF CYCLIST INJURED,NUMBER OF CYCLIST KILLED,NUMBER OF MOTORIST INJURED,NUMBER OF MOTORIST KILLED,CONTRIBUTING FACTOR VEHICLE 1,CONTRIBUTING FACTOR VEHICLE 2,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,YEAR,MONTH,DAY OF MONTH,DAY OF WEEK,IS WEEKEND
0,0,0,0,21190,0,0,0,0,0,0,0,0,0,0,0,0,0,1366,26584,0,0,0,0,0


In [0]:
# Drop rows that matched no borough, then keep every column except positions
# 3 and 4 (BOROUGH and ZIP CODE).
kept_columns = new_acc_df.columns[0:3] + new_acc_df.columns[5:]
new_acc_df = (
    new_acc_df
    .where(F.col('BOROUGH') != 'No Borough')
    .select(*kept_columns)
)

In [0]:
# A crash with no first vehicle type is removed; a missing second vehicle
# type is labelled 'Non Vehicular'.
new_acc_df = (
    new_acc_df
    .na.drop(subset=['VEHICLE TYPE CODE 1'])
    .na.fill('Non Vehicular', subset=['VEHICLE TYPE CODE 2'])
)
display(new_acc_df)

CRASH DATETIME,CRASH DATE,CRASH TIME,LATITUDE,LONGITUDE,LOCATION,NUMBER OF PERSONS INJURED,NUMBER OF PERSONS KILLED,NUMBER OF PEDESTRIANS INJURED,NUMBER OF PEDESTRIANS KILLED,NUMBER OF CYCLIST INJURED,NUMBER OF CYCLIST KILLED,NUMBER OF MOTORIST INJURED,NUMBER OF MOTORIST KILLED,CONTRIBUTING FACTOR VEHICLE 1,CONTRIBUTING FACTOR VEHICLE 2,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,YEAR,MONTH,DAY OF MONTH,DAY OF WEEK,IS WEEKEND
2024-04-03 16:50:00,2024-04-03,16:50,40.760567,-73.9155,"(40.760567, -73.9155)",0,0,0,0,0,0,0,0,Passing or Lane Usage Improper,Unspecified,Dot,Sedan,2024,4,3,Wednesday,False
2023-11-17 21:28:00,2023-11-17,21:28,40.7345,-74.00118,"(40.7345, -74.00118)",1,0,0,0,0,0,0,0,Failure to Yield Right-of-Way,Unspecified,E-Bike,Sedan,2023,11,17,Friday,False
2023-11-16 15:45:00,2023-11-16,15:45,40.726604,-73.81425,"(40.726604, -73.81425)",1,0,0,0,0,0,1,0,Driver Inexperience,Unspecified,Motorcycle,Non Vehicular,2023,11,16,Thursday,False
2023-11-16 07:30:00,2023-11-16,7:30,40.703976,-73.92309,"(40.703976, -73.92309)",0,0,0,0,0,0,0,0,Passing Too Closely,Unspecified,Station Wagon/Sport Utility Vehicle,Non Vehicular,2023,11,16,Thursday,False
2023-11-17 06:57:00,2023-11-17,6:57,40.703823,-73.81173,"(40.703823, -73.81173)",0,0,0,0,0,0,0,0,Other Vehicular,Driver Inexperience,Sedan,Non Vehicular,2023,11,17,Friday,False
2023-10-31 07:45:00,2023-10-31,7:45,40.634483,-73.91886,"(40.634483, -73.91886)",1,0,0,0,0,0,1,0,Unspecified,Unspecified,Sedan,Bus,2023,10,31,Tuesday,False
2023-11-16 11:30:00,2023-11-16,11:30,40.53343,-74.19537,"(40.53343, -74.19537)",0,0,0,0,0,0,0,0,Unspecified,Unspecified,Station Wagon/Sport Utility Vehicle,Non Vehicular,2023,11,16,Thursday,False
2023-11-15 07:00:00,2023-11-15,7:00,40.766445,-73.98333,"(40.766445, -73.98333)",1,0,1,0,0,0,0,0,Driver Inattention/Distraction,Unspecified,Sedan,Non Vehicular,2023,11,15,Wednesday,False
2023-11-16 06:40:00,2023-11-16,6:40,40.615837,-74.08473,"(40.615837, -74.08473)",1,0,1,0,0,0,0,0,Unspecified,Unspecified,Sedan,Non Vehicular,2023,11,16,Thursday,False
2023-11-16 15:15:00,2023-11-16,15:15,40.760857,-73.97972,"(40.760857, -73.97972)",0,0,0,0,0,0,0,0,Backing Unsafely,Unspecified,Motorcycle,Non Vehicular,2023,11,16,Thursday,False


In [0]:
# Final check: count the NULLs remaining in each column of the cleaned frame.
null_counts = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in new_acc_df.columns
]
missing_data = new_acc_df.select(null_counts)
display(missing_data)

CRASH DATETIME,CRASH DATE,CRASH TIME,LATITUDE,LONGITUDE,LOCATION,NUMBER OF PERSONS INJURED,NUMBER OF PERSONS KILLED,NUMBER OF PEDESTRIANS INJURED,NUMBER OF PEDESTRIANS KILLED,NUMBER OF CYCLIST INJURED,NUMBER OF CYCLIST KILLED,NUMBER OF MOTORIST INJURED,NUMBER OF MOTORIST KILLED,CONTRIBUTING FACTOR VEHICLE 1,CONTRIBUTING FACTOR VEHICLE 2,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,YEAR,MONTH,DAY OF MONTH,DAY OF WEEK,IS WEEKEND
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
# Write the cleaned accident data as parquet, partitioned by YEAR and MONTH,
# replacing whatever is already at the destination.
acc_dst_path = 's3a://capstone-techcatalyst-conformed/group4/accident_data/'
(
    new_acc_df.write
    .mode("overwrite")
    .partitionBy('YEAR', 'MONTH')
    .parquet(acc_dst_path)
)

In [0]:
# Write the taxi zone lookup table as parquet, replacing any existing output.
# The scheme separator was missing its colon ('s3a//'), so the destination was
# not an S3 URI; it now matches the 's3a://' form used for the accident data.
tz_dst_path = 's3a://capstone-techcatalyst-conformed/group4/taxi_zone_lookup/'
zone_df.write.mode('overwrite').format('parquet').save(tz_dst_path)