In [1]:
# Gather the boundary layers used below: taxi zones, NTAs and boroughs.

import geopandas as gpd

# Convert the NTA and taxi-zone shapefiles to GeoJSON.
# GeoDataFrame.to_file selects the output format via `driver=` (singular); the
# previous `drivers=` keyword was not that parameter and was passed through to
# the I/O engine as an extra option instead.
directory = "../raw_data_lite/"
taxi_zones_shp = gpd.read_file(f"{directory}taxi_zones.shp")
taxi_zones_shp.to_file(f"{directory}taxi_zones.geojson", driver='GeoJSON')

nta_shp = gpd.read_file(f"{directory}nynta_21b/nynta.shp")
nta_shp.to_file(f"{directory}nta.geojson", driver='GeoJSON')

# Read the GeoJSON layers back in as GeoDataFrames.
taxi_zones = gpd.read_file(f"{directory}taxi_zones.geojson")
nta = gpd.read_file(f"{directory}nta.geojson")
boroughs = gpd.read_file(f"{directory}Borough Boundaries.geojson")



In [2]:
# Start (or attach to) a Spark session configured for interactive notebook use.
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.getOrCreate()

# Reduce console noise: only WARN and more severe log messages are printed.
sc = SparkContext.getOrCreate()
sc.setLogLevel('WARN')

# Session settings for notebook work, applied in this order:
#  - Arrow-accelerated conversion between Spark and pandas DataFrames
#  - eager evaluation, so a DataFrame left as a cell's last expression renders as a table
_NOTEBOOK_SPARK_CONF = {
    'spark.sql.execution.arrow.pyspark.enabled': True,
    'spark.sql.repl.eagerEval.enabled': True,
}
for _conf_key, _conf_value in _NOTEBOOK_SPARK_CONF.items():
    spark.conf.set(_conf_key, _conf_value)

21/08/09 02:58:34 WARN Utils: Your hostname, LAPTOP-D5HGLKLK resolves to a loopback address: 127.0.1.1; using 172.23.50.214 instead (on interface eth0)
21/08/09 02:58:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/09 02:58:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Build an explicit Spark schema for the yellow-taxi trip CSVs, then load every
# monthly file for 2018 and 2019 with it.
# Schema approach adapted from Akira Wang's GitHub.
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col

# Column names grouped by the Spark type they should be read as. Both spellings
# of the rate-code column ('RateCodeID' / 'RatecodeID') are listed so the lookup
# below succeeds whichever spelling a file header uses.
ints = ('VendorID', 'passenger_count', 'RateCodeID', 'RatecodeID', 'payment_type',
        'PULocationID', 'DOLocationID')
doubles = ('trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
           'improvement_surcharge', 'total_amount', 'congestion_surcharge')
strings = ('store_and_fwd_flag',)
dtimes = ('tpep_pickup_datetime', 'tpep_dropoff_datetime')

# column name -> Spark type instance (one fresh instance per column)
dtypes = {}
for _names, _spark_type in ((ints, IntegerType), (doubles, DoubleType),
                            (strings, StringType), (dtimes, TimestampType)):
    dtypes.update({name: _spark_type() for name in _names})

taxi_dir = "../raw_data/yellow_tripdata_20"

# Only the header of the January 2018 file matters here: its column order defines
# the schema, and every field is nullable. A header column missing from `dtypes`
# raises KeyError.
sdf = spark.read.csv(f"{taxi_dir}18-01.csv", header = True)
schema = StructType([StructField(name, dtypes[name], True) for name in sdf.columns])

# NOTE(review): the 2019 files below are read with this 2018-derived schema; if
# their header has extra columns (e.g. congestion_surcharge) confirm how the CSV
# reader treats them.


def _read_year_of_trips(year_suffix):
    """Read the twelve monthly trip CSVs for one year ("18" -> 2018), keyed '01'..'12'."""
    return {
        month: spark.read.csv(f"{taxi_dir}{year_suffix}-{month}.csv",
                              header = True, schema = schema)
        for month in (f"{m:02d}" for m in range(1, 13))
    }


taxi18 = _read_year_of_trips("18")
taxi19 = _read_year_of_trips("19")


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
# Spatially join taxi zones onto NTAs (sjoin's default predicate: intersects).
# how='right' keeps every NTA row and the NTA geometry; a taxi zone that
# intersects several NTAs appears once per intersecting NTA.
# Both layers must share a CRS for the join to be meaningful, so reproject the
# NTA layer onto the taxi-zone CRS when they differ (no-op when they already match).
if nta.crs is not None and taxi_zones.crs is not None and nta.crs != taxi_zones.crs:
    nta_for_join = nta.to_crs(taxi_zones.crs)
else:
    nta_for_join = nta
merged_zones = gpd.sjoin(taxi_zones, nta_for_join, how = 'right')


In [5]:
# Lookup table: taxi-zone OBJECTID -> NTACode, built from the spatial join above.
# One taxi zone can match several NTAs, so a repeated OBJECTID overwrites the
# earlier entry: the last matching row (in merged_zones order) wins.
# NOTE(review): the trip data refers to zones by LocationID while the key here is
# OBJECTID — confirm the two agree for every zone used downstream.
taxi_zone_to_nta = dict(zip(merged_zones['OBJECTID'], merged_zones['NTACode']))


In [7]:
from pyspark.sql.functions import *  # NOTE: this wildcard shadows builtins (sum, min, max, round, abs, ...); qualified F.<name> is used below


def clean_taxi_year(monthly, target_year):
    """Union one year of monthly yellow-taxi DataFrames and drop unusable trips.

    Parameters
    ----------
    monthly : dict
        Zero-padded month string ('01'..'12') -> Spark DataFrame, as built for
        ``taxi18`` / ``taxi19``.
    target_year : int
        Calendar year that every kept trip's pickup timestamp must fall in.

    Returns
    -------
    pyspark.sql.DataFrame
        The filtered trips with an added ``trip_duration`` column, in minutes.
    """
    trips = monthly['01']
    for month in range(2, 13):
        trips = trips.union(monthly[f"{month:02d}"])

    pickup = F.col('tpep_pickup_datetime')
    dropoff = F.col('tpep_dropoff_datetime')

    return (
        trips
        # keep credit-card payments only (payment_type 1)
        .filter(F.col('payment_type') == 1)
        # drop trips that start or end at Newark Airport (LocationID 1)
        .filter((F.col('PULocationID') != 1) & (F.col('DOLocationID') != 1))
        # drop zero and negative trip distances
        .filter(F.col('trip_distance') > 0)
        # drop rate codes not contained within New York City (3, 4, 99)
        .filter(~F.col('RatecodeID').isin(3, 4, 99))
        # drop unexplained LocationIDs 264/265 (presumed out of state)
        .filter(~F.col('PULocationID').isin(264, 265) & ~F.col('DOLocationID').isin(264, 265))
        # keep only trips picked up in the target year (this filter must be
        # assigned back; a bare .filter(...) expression does nothing)
        .filter(F.year(pickup) == target_year)
        # duration in minutes = dropoff - pickup; subtracting the other way round
        # makes every normal trip negative, so the > 0 filter below would keep
        # only corrupt records
        .withColumn('trip_duration', (dropoff.cast('long') - pickup.cast('long')) / 60)
        # drop zero and negative durations
        .filter(F.col('trip_duration') > 0)
    )


full_2018 = clean_taxi_year(taxi18, 2018)

In [8]:
# Persist the cleaned 2018 trips. The default save mode raises an error when the
# path already exists; overwrite so the notebook can be re-run end to end.
full_2018.write.format('parquet').mode('overwrite').save('../preprocessed_data/preprocessed_2018_taxi_data.parquet')

                                                                                