Setup:

In [1]:
# Import Libraries and Packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

In [None]:
# Create the Spark session used throughout the notebook.
# The options below are applied to the builder in the order listed.
_session_options = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
}
_builder = SparkSession.builder.appName("MAST30034 Project 1")
for _option_key, _option_value in _session_options.items():
    _builder = _builder.config(_option_key, _option_value)
spark = _builder.getOrCreate()
# Keep the notebook namespace free of the setup temporaries
del _session_options, _builder, _option_key, _option_value

In [None]:
# Load in Taxi data (raw yellow and green trip records)
sdf_yellow = spark.read.parquet('../data/raw/Yellow_raw')
sdf_green = spark.read.parquet('../data/raw/Green_raw')

# Record the number of raw instances of each colour
nraw_y = sdf_yellow.count()
nraw_g = sdf_green.count()

# NOTE(review): hard-coded row count, presumably the size of the cleaned data from an
# earlier run. It goes stale whenever the raw data or the cleaning rules change;
# consider recomputing it from the cleaned DataFrame instead.
N_CLEAN_REFERENCE = 25705901

# Percentage of all raw records (yellow + green) not present in the reference count
(1 - N_CLEAN_REFERENCE / (nraw_y + nraw_g)) * 100

In [None]:
# Load the external dataset: accidents in NYC in 2020.
# The CSV is read without a header, so columns are addressed positionally (_c0, _c1, ...).
accidents_landing = spark.read.csv('../data/raw/NYC Accidents 2020.csv')

# Keep only accidents where any of the five vehicle-type columns equals 'Taxi'
_vehicle_type_cols = ['_c24', '_c25', '_c26', '_c27', '_c28']
_involves_taxi = F.col(_vehicle_type_cols[0]) == 'Taxi'
for _type_col in _vehicle_type_cols[1:]:
    _involves_taxi = _involves_taxi | (F.col(_type_col) == 'Taxi')
accidents_raw = accidents_landing.filter(_involves_taxi)
del _vehicle_type_cols, _involves_taxi, _type_col

n_acc = accidents_raw.count()
print("Number of rows in Taxi Accident dataframe:", n_acc)


# Defining Schemas
accidents_raw_schema = accidents_raw.schema

Checking Consistency:

In [None]:
# Ensure everything has consistent casing
def consistent_casing(df):
    """Return ``df`` with every column name case-folded to lower case.

    Column order and values are unchanged; only the names are lowered.
    """
    lowered_cols = []
    for original_name in df.columns:
        lowered_cols.append(F.col(original_name).alias(original_name.lower()))
    return df.select(*lowered_cols)

# Lower-case the column names of both taxi DataFrames
sdf_yellow, sdf_green = consistent_casing(sdf_yellow), consistent_casing(sdf_green)

# Record the (lower-cased) schemas
sdf_schema_yellow, sdf_schema_green = sdf_yellow.schema, sdf_green.schema

In [None]:
# Datatype Conversion
def convert_datatype(df, column_name, datatype):
    """Return ``df`` with ``column_name`` cast to ``datatype``.

    ``datatype`` is passed directly to ``Column.cast`` (e.g. 'int', 'double',
    'string'), and the result replaces the column of the same name.
    """
    casted = F.col(column_name).cast(datatype)
    return df.withColumn(column_name, casted)

# Cast the location and vendor IDs to integers in both taxi DataFrames
# (each frame receives the casts in the order listed)
for _id_col in ('pulocationid', 'dolocationid', 'vendorid'):
    sdf_yellow = convert_datatype(sdf_yellow, _id_col, 'int')
    sdf_green = convert_datatype(sdf_green, _id_col, 'int')
del _id_col

# Check Schema
sdf_yellow.printSchema()

In [None]:
# Datatype conversion for the accidents data.
# Uses convert_datatype from the "Datatype Conversion" cell above (no re-definition here).
# Casts are applied in the order listed; comments give the names these
# positional columns receive when they are renamed later.
_accident_casts = {
    '_c3': 'int',       # zip_code
    '_c4': 'double',    # latitude
    '_c5': 'double',    # longitude
    '_c10': 'int',      # persons_injured
    '_c11': 'int',      # persons_killed
    '_c12': 'int',      # pedestrians_injured
    '_c13': 'int',      # pedestrians_killed
    '_c14': 'int',      # cyclists_injured
    '_c15': 'int',      # cyclists_killed
    '_c16': 'int',      # motorists_injured
    '_c17': 'int',      # motorists_killed
    '_c23': 'int',      # collision_id
    '_c0': 'string',    # crash_date
    '_c1': 'string',    # crash_time
}
for _col_name, _dtype in _accident_casts.items():
    accidents_raw = convert_datatype(accidents_raw, _col_name, _dtype)
del _accident_casts, _col_name, _dtype

accidents_raw.printSchema()

In [None]:
## Feature Removal and Renaming

# Remove features that are not shared between the yellow and green schemas.
# First check that green's ehail_fee column really is empty. Null tests must use
# isNotNull()/isNull(): `col != None` evaluates to NULL for every row in Spark SQL,
# so it can never match, and an unused filter result is never evaluated at all.
n_ehail_fee_values = sdf_green.filter(F.col('ehail_fee').isNotNull()).count()
print("Non-null ehail_fee values in green data:", n_ehail_fee_values)
sdf_green = sdf_green.drop('ehail_fee')  # empty column not listed in the data dictionary
sdf_green = sdf_green.drop('trip_type')
sdf_yellow = sdf_yellow.drop('airport_fee')

# According to the data dictionary, (lpep_dropoff_datetime, lpep_pickup_datetime) and
# (tpep_dropoff_datetime, tpep_pickup_datetime) are equivalent and use compatible formatting,
# so rename lpep (green) to tpep
sdf_green = sdf_green.withColumnRenamed('lpep_dropoff_datetime', 'tpep_dropoff_datetime')
sdf_green = sdf_green.withColumnRenamed('lpep_pickup_datetime', 'tpep_pickup_datetime')

In [None]:
# Rename the positional CSV columns of the accidents DataFrame to descriptive names
# (applied in the order listed)
_accident_renames = {
    '_c0': 'crash_date',
    '_c1': 'crash_time',
    '_c2': 'borough',
    '_c3': 'zip_code',
    '_c4': 'latitude',
    '_c5': 'longitude',
    '_c6': 'location',
    '_c7': 'street_name',
    '_c8': 'cross_street_name',
    '_c9': 'off_street_name',
    '_c10': 'persons_injured',
    '_c11': 'persons_killed',
    '_c12': 'pedestrians_injured',
    '_c13': 'pedestrians_killed',
    '_c14': 'cyclists_injured',
    '_c15': 'cyclists_killed',
    '_c16': 'motorists_injured',
    '_c17': 'motorists_killed',
    '_c18': 'contributing_factor_vehicle1',
    '_c19': 'contributing_factor_vehicle2',
    '_c20': 'contributing_factor_vehicle3',
    '_c21': 'contributing_factor_vehicle4',
    '_c22': 'contributing_factor_vehicle5',
    '_c23': 'collision_id',
    '_c24': 'vehicle1_type',
    '_c25': 'vehicle2_type',
    '_c26': 'vehicle3_type',
    '_c27': 'vehicle4_type',
    '_c28': 'vehicle5_type',
}
for _old_name, _new_name in _accident_renames.items():
    accidents_raw = accidents_raw.withColumnRenamed(_old_name, _new_name)
del _accident_renames, _old_name, _new_name
accidents_raw

In [None]:
# Drop accident features that are not needed for the analysis
accidents_raw = accidents_raw.drop(
    'collision_id', 'off_street_name', 'street_name', 'cross_street_name'
)

accidents_raw.count()

Merging Taxi Dataframes

In [None]:
# Tag every record with its taxi colour ('Y' = yellow, 'G' = green) before merging
sdf_yellow, sdf_green = (
    sdf_yellow.withColumn("taxi_colour", lit('Y')),
    sdf_green.withColumn("taxi_colour", lit('G')),
)

In [None]:
# Put both DataFrames' columns in alphabetical order before merging
sdf_yellow = sdf_yellow.select(*sorted(sdf_yellow.columns))
sdf_green = sdf_green.select(*sorted(sdf_green.columns))

# Stack yellow and green trips, matching columns by name, and record the total
sdf_all = sdf_yellow.unionByName(sdf_green)
n_landing = sdf_all.count()
n_landing


Cleaning Data

In [None]:
# Remove taxi features that are not used in the analysis
_unused_taxi_cols = [
    'extra',
    'fare_amount',
    'improvement_surcharge',
    'mta_tax',
    'payment_type',
    'tolls_amount',
    'tip_amount',
    'total_amount',
    'passenger_count',
    'ratecodeid',
    'store_and_fwd_flag',
    'vendorid',
]
sdf_all = sdf_all.drop(*_unused_taxi_cols)
del _unused_taxi_cols

# Print Remaining number of features
print("Number of Columns in Dataframe: ", len(sdf_all.columns))

In [None]:
list(sdf_all.columns)  # 6 columns retained from original dataset plus the added taxi_colour

In [None]:
accidents_raw.columns  # renamed CSV columns minus the 4 dropped above; aggregate columns are added in a later cell

Validity Checking:

In [None]:
# Delete records whose trip distance is not positive (the comparison also drops NULLs)
sdf_all = sdf_all.filter(F.col('trip_distance') > 0)

# Filter out unknown taxi zones for both pick-up and drop-off.
# Like `!=`, `~isin(...)` evaluates to NULL for a NULL location ID, so rows with a
# missing ID are dropped too; combining with & keeps only rows passing both checks.
UNKNOWN_ZONE_IDS = [264, 265]
sdf_all = sdf_all.filter(
    ~F.col('dolocationid').isin(UNKNOWN_ZONE_IDS)
    & ~F.col('pulocationid').isin(UNKNOWN_ZONE_IDS)
)

n_raw = sdf_all.count()

# Records removed by the validity filters: absolute count, then as a fraction of
# all raw records (yellow + green), the same denominator as the earlier percentage cell
print(n_landing - n_raw)
print((n_landing - n_raw) / (nraw_y + nraw_g))

# Print Schema
sdf_all.printSchema()

Creating Relevant Columns from the Accident Data:

In [None]:
# Columns summed into the per-accident totals
fatalities = ['persons_killed', 'pedestrians_killed', 'cyclists_killed', 'motorists_killed']
injuries = ['persons_injured', 'pedestrians_injured', 'cyclists_injured', 'motorists_injured']
# NOTE(review): if persons_* is already the total of the pedestrian/cyclist/motorist
# counts in the source data, these sums double count; confirm against the data dictionary.

# Totals of fatalities and injuries, and their combined sum ("harmed")
accidents_raw = (
    accidents_raw
    .withColumn('fatalities', F.expr(' + '.join(fatalities)))
    .withColumn('injuries', F.expr(' + '.join(injuries)))
    .withColumn('total_harmed', F.col('injuries') + F.col('fatalities'))
)

# Number of vehicles involved = number of non-null vehicle-type entries (computed in pandas)
accidents_raw = accidents_raw.toPandas()
vehicles = ['vehicle1_type', 'vehicle2_type', 'vehicle3_type', 'vehicle4_type', 'vehicle5_type']
num_vehicles = accidents_raw[vehicles].count(axis=1)
accidents_raw['num_vehicles'] = num_vehicles

In [None]:
# Drop the per-category counts (summed into fatalities/injuries) and the individual
# vehicle types (counted in num_vehicles)
accidents_raw = accidents_raw.drop(columns=[
    'persons_injured',
    'persons_killed',
    'pedestrians_injured',
    'pedestrians_killed',
    'cyclists_injured',
    'cyclists_killed',
    'motorists_injured',
    'motorists_killed',
    'vehicle1_type',
    'vehicle2_type',
    'vehicle3_type',
    'vehicle4_type',
    'vehicle5_type',
])

# Convert back to spark df
# NOTE(review): the pandas round trip may change dtypes (e.g. integer columns that
# contain nulls come back as floating point); check the resulting schema.
accidents_raw = spark.createDataFrame(accidents_raw)

In [None]:
final_taxi_count = sdf_all.count(); final_taxi_count  # final row count of the cleaned taxi data

In [None]:
# Saving Updated files.
# Both outputs use overwrite mode so that re-running the notebook replaces the landing
# data instead of appending a duplicate copy of every taxi record to it.
accidents_raw.write.parquet('../data/landing/accidents_landing', mode='overwrite')
# coalesce(1) collapses the taxi data to one partition before writing
sdf_all.coalesce(1).write.format('parquet').mode('overwrite').save('../data/landing/taxis_landing.parquet')