In [11]:
# Import libraries

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd
from pyspark.sql.functions import date_format

In [12]:
# Create a Spark session (which will run the Spark jobs).
# - repl.eagerEval renders a DataFrame as a table when it is the last
#   expression of a cell (used by the NULL-count cells).
# - spark.sql.session.timeZone pins timestamp handling to UTC; it controls
#   how date_format() renders Year / Month / Day / pickup_hour later on.
#   (The key was previously misspelled "timeZon", so it was silently ignored.)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

### Yellow Taxi Preprocessing

In [13]:
# Define the yellow taxi raw training data directory and the months to load
main_dir = '../data/raw/raw_train/yellow/'
mth = range(1, 13)
yr = '2021'

# Merge the monthly files for `yr` into a single Spark DataFrame.
# The airport_fee column has different data types in different monthly
# files, so it is cast to DOUBLE in every file before the union.
# unionByName aligns columns by name; the first file read fixes the column
# order. Every path is derived from main_dir / yr (no separately hard-coded
# schema file), and each file is read exactly once.
sdf_yellow_train = None
for month in mth:
    # Zero-pad the month (1 -> '01') to match the '<year>-<MM>.parquet' names
    sdf = spark.read.parquet(f'{main_dir}{yr}-{month:02d}.parquet')
    sdf_updated = sdf.withColumn(
        'airport_fee',
        F.col('airport_fee').cast('DOUBLE')
    )
    sdf_yellow_train = (
        sdf_updated if sdf_yellow_train is None
        else sdf_yellow_train.unionByName(sdf_updated)
    )


In [14]:
# Checking for the NULL and NaN values.
# The first three columns (those before passenger_count in the output) are
# left out; every other column gets a count of rows that are NULL or NaN.
# The one-row result is this cell's displayed output.
sdf_yellow_train.select(
    *[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in sdf_yellow_train.columns[3:]
    ]
)



+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|        1478695|            0|   1478695|           1478695|           0|           0|           0|          0|    0|      0|         0|           0|                    0|           0|             1478695|    5641418|
+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+------

                                                                                

In [15]:
# Replace NULLs with 0.0 in the airport_fee and congestion_surcharge columns
# only; every other column is left untouched here.
sdf_yellow_train = sdf_yellow_train.na.fill(
    0.0,
    subset=['airport_fee', 'congestion_surcharge'],
)

In [16]:
# Drop every row that still has a missing (NULL / NaN) value in any column
sdf_yellow_train = sdf_yellow_train.na.drop()

# Re-run the missing-value count over the same columns to confirm nothing is
# missing any more; the one-row result is this cell's displayed output
sdf_yellow_train.select(
    *[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in sdf_yellow_train.columns[3:]
    ]
)



+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|              0|            0|         0|                 0|           0|           0|           0|          0|    0|      0|         0|           0|                    0|           0|                   0|          0|
+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+------

                                                                                

In [17]:
# Cast the pickup and dropoff location IDs to INT
for location_col in ['PULocationID', 'DOLocationID']:
    sdf_yellow_train = sdf_yellow_train.withColumn(
        location_col, F.col(location_col).cast('INT')
    )

In [None]:
# Flag each trip as valid (True) or invalid (False). A trip is valid when:
#   - total_amount is positive,
#   - PULocationID and DOLocationID both lie in 1..263 (inclusive),
#   - passenger_count lies in 1..4 (inclusive),
#   - the dropoff timestamp is strictly after the pickup timestamp
#     (compared after casting both timestamps to long).
# Rows where the combined condition is not true (including NULL) get False.
yellow_trip_seconds = (
    F.col('tpep_dropoff_datetime').cast("long")
    - F.col('tpep_pickup_datetime').cast("long")
)
yellow_valid_trip = (
    (F.col('total_amount') > 0)
    & F.col('PULocationID').between(1, 263)
    & F.col('DOLocationID').between(1, 263)
    & F.col('passenger_count').between(1, 4)
    & (yellow_trip_seconds > 0)
)
sdf_yellow_train = sdf_yellow_train.withColumn(
    'is_valid_record',
    F.when(yellow_valid_trip, True).otherwise(False),
)

In [None]:
# Keep only trips paid by credit card (payment_type 1) that were flagged as
# valid. The 'yellow_train' temp view is still registered as before.
sdf_yellow_train.createOrReplaceTempView('yellow_train')

paid_by_card = F.col('Payment_type') == 1
sdf_yellow_train = spark.table('yellow_train').where(
    paid_by_card & F.col('is_valid_record')
)


In [None]:
# Map the numeric RateCodeID onto a readable rate_code label.
# Any code outside 1-6 (or NULL) falls back to 'Standard'.
# NOTE: the 'Nasau or Westchester' spelling is a runtime value that the
# vehicle_and_ride_type cell matches on, so it is deliberately kept as-is.
rate_code_labels = [
    (1, 'Standard'),
    (2, 'JFK'),
    (3, 'Newark'),
    (4, 'Nasau or Westchester'),
    (5, 'Negotiated fare'),
    (6, 'Shared ride'),
]
first_code, first_label = rate_code_labels[0]
rate_code_expr = F.when(F.col('RateCodeID') == first_code, first_label)
for code, label in rate_code_labels[1:]:
    rate_code_expr = rate_code_expr.when(F.col('RateCodeID') == code, label)

sdf_yellow_train = sdf_yellow_train.withColumn(
    'rate_code', rate_code_expr.otherwise('Standard')
)

In [None]:
# Adding the taxi type column: every row in this dataset is a yellow taxi
# trip, so a constant label is appended as the last column. The
# 'temp_yellow' temp view is still registered as before.
sdf_yellow_train.createOrReplaceTempView('temp_yellow')

sdf_yellow_train = spark.table('temp_yellow').withColumn(
    'vehicle_type', F.lit('Yellow taxi')
)

In [None]:
# Combine vehicle_type and rate_code into one vehicle_and_ride_type label,
# e.g. 'Yellow-JFK'. Rows that are not 'Yellow taxi', or whose rate_code is
# not listed below, get NULL (there is no otherwise()).
is_yellow = F.col('vehicle_type') == 'Yellow taxi'
yellow_ride_labels = [
    ('Standard', 'Yellow-Standard'),
    ('Shared ride', 'Yellow-Shared ride'),
    ('JFK', 'Yellow-JFK'),
    ('Negotiated fare', 'Yellow-Negotiated fare'),
    ('Newark', 'Yellow-Newark'),
    ('Nasau or Westchester', 'Yellow-Nasau or Westchester'),
]

yellow_ride_expr = None
for rate, label in yellow_ride_labels:
    matches = is_yellow & (F.col('rate_code') == rate)
    if yellow_ride_expr is None:
        yellow_ride_expr = F.when(matches, label)
    else:
        yellow_ride_expr = yellow_ride_expr.when(matches, label)

sdf_yellow_train = sdf_yellow_train.withColumn(
    'vehicle_and_ride_type', yellow_ride_expr
)

### HVFHV Preprocessing 

In [3]:
# Define the FHVHV raw training data directory and the months to load
main_dir = '../data/raw/raw_train/FHVHV/'
mth = range(1, 13)
yr = '2021'

# Merge the monthly files for `yr` into a single Spark DataFrame.
# The airport_fee column has different data types in different monthly
# files, so it is cast to DOUBLE in every file before the union.
# unionByName aligns columns by name; the first file read fixes the column
# order. Every path is derived from main_dir / yr (no separately hard-coded
# schema file), and each file is read exactly once.
sdf_FHVHV_train = None
for month in mth:
    # Zero-pad the month (1 -> '01') to match the '<year>-<MM>.parquet' names
    sdf = spark.read.parquet(f'{main_dir}{yr}-{month:02d}.parquet')
    sdf_updated_FHVHV = sdf.withColumn(
        'airport_fee',
        F.col('airport_fee').cast('DOUBLE')
    )
    sdf_FHVHV_train = (
        sdf_updated_FHVHV if sdf_FHVHV_train is None
        else sdf_FHVHV_train.unionByName(sdf_updated_FHVHV)
    )

                                                                                

In [None]:
# Checking for the NULL and NaN values in every column.
# isnan() only accepts float/double input, and the FHVHV schema includes
# timestamp columns (e.g. pickup_datetime / dropoff_datetime) that
# columns[3:] would have passed to it. So the NaN test is applied only to
# float/double columns, and every other column is checked for NULL alone.
sdf_FHVHV_train.select([
    count(when(
        (isnan(name) | col(name).isNull())
        if dtype in ('double', 'float')
        else col(name).isNull(),
        name,
    )).alias(name)
    for name, dtype in sdf_FHVHV_train.dtypes
])

In [5]:
# Fill NULLs with 0.0 in the fare / fee / tip / pay columns listed below
# (this is not every numeric column, e.g. trip_miles is left as-is).
# The subset previously listed base_passenger_fare twice; each column now
# appears once.
sdf_FHVHV_train = sdf_FHVHV_train.fillna(
    value=0.0,
    subset=['base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
            'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay'],
)

In [None]:
# Keep only Uber trips (licence number 'HV0003'). The 'FHVHV_view' temp
# view is still registered as before.
sdf_FHVHV_train.createOrReplaceTempView('FHVHV_view')

is_uber_licence = F.col('hvfhs_license_num') == 'HV0003'
sdf_FHVHV_train = spark.table('FHVHV_view').where(is_uber_licence)


In [None]:
# Check the licence and location ID columns for NULL / NaN values and print
# the per-column counts as a table
cols = ['hvfhs_license_num', 'PULocationID', 'DOLocationID']
(
    sdf_FHVHV_train
    .select(*[
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in cols
    ])
    .show()
)

In [None]:
# Cast the pickup and dropoff location IDs to INT
for location_col in ['PULocationID', 'DOLocationID']:
    sdf_FHVHV_train = sdf_FHVHV_train.withColumn(
        location_col, F.col(location_col).cast('INT')
    )

In [23]:
# Cast the fare / fee / tip / pay columns to DOUBLE, one column at a time
cols = [
    'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
    'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
]

for money_col in cols:
    sdf_FHVHV_train = sdf_FHVHV_train.withColumn(
        money_col, F.col(money_col).cast('DOUBLE')
    )

In [24]:
# Calculating the total fare amount as the sum of the components below,
# added left to right in the listed order (driver_pay is not included).
total_parts = [
    'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
    'congestion_surcharge', 'airport_fee', 'tips',
]
total_expr = F.col(total_parts[0])
for part in total_parts[1:]:
    total_expr = total_expr + F.col(part)

sdf_FHVHV_train = sdf_FHVHV_train.withColumn('total_amount', total_expr)

In [25]:
# Round total_amount to 2 decimal places (Spark's round() uses HALF_UP)
rounded_total = F.round(F.col('total_amount'), 2)
sdf_FHVHV_train = sdf_FHVHV_train.withColumn('total_amount', rounded_total)

In [None]:
# Label each FHVHV trip as valid (True) or invalid (False), then keep only
# the valid ones. A trip is valid when:
#   - total_amount is positive,
#   - PULocationID and DOLocationID both lie in 1..263 (inclusive),
#   - dropoff_datetime is strictly after pickup_datetime (compared after
#     casting both timestamps to long).
# Rows where the combined condition is not true (including NULL) get False.
# The filter mirrors the Yellow taxi path (which keeps only
# is_valid_record rows). Previously the label was computed but never used,
# and the final SELECT drops it, so invalid Uber trips reached the merged
# training data.
fhvhv_trip_seconds = (
    F.col('dropoff_datetime').cast("long")
    - F.col('pickup_datetime').cast("long")
)
fhvhv_valid_trip = (
    (F.col('total_amount') > 0)
    & F.col('PULocationID').between(1, 263)
    & F.col('DOLocationID').between(1, 263)
    & (fhvhv_trip_seconds > 0)
)
sdf_FHVHV_train = (
    sdf_FHVHV_train
    .withColumn(
        'is_valid_record',
        F.when(fhvhv_valid_trip, True).otherwise(False),
    )
    .where(F.col('is_valid_record'))
)

In [None]:
# Labelling the rides as Uber rides: rows with licence number 'HV0003' get
# 'Uber'; any other row would get NULL (there is no otherwise()).
is_uber = F.col('hvfhs_license_num') == 'HV0003'
sdf_FHVHV_train = sdf_FHVHV_train.withColumn(
    'vehicle_type', F.when(is_uber, 'Uber')
)

In [None]:
# Labelling the ride type based on the airport fee and the shared-ride flags:
#   airport_fee == 0, shared_request_flag 'Y', match flag 'Y'/'N' -> 'Shared ride'
#   airport_fee == 0, shared_request_flag 'N', match flag 'Y'/'N' -> 'Standard'
#   airport_fee > 0                                               -> 'LaGuardia/Newark/JFK'
# Anything else (NULL or other flag values, negative fee) gets NULL.
no_airport_fee = F.col('airport_fee') == 0.0
match_flag_known = F.col('shared_match_flag').isin('Y', 'N')
requested_shared = F.col('shared_request_flag') == 'Y'
requested_solo = F.col('shared_request_flag') == 'N'

sdf_FHVHV_train = sdf_FHVHV_train.withColumn(
    'rate_code',
    F.when(no_airport_fee & requested_shared & match_flag_known, 'Shared ride')
    .when(no_airport_fee & requested_solo & match_flag_known, 'Standard')
    .when(F.col('airport_fee') > 0.0, 'LaGuardia/Newark/JFK'),
)

In [31]:
# Dropping the features that are not needed downstream.
# Names are written in lower case, matching how the schema is referenced
# elsewhere (e.g. 'hvfhs_license_num' in the Uber filter). That way the drop
# does not depend on case-insensitive name resolution: DataFrame.drop is a
# silent no-op for names that do not match, so 'Hvfhs_license_num' /
# 'Dispatching_base_num' would be kept if spark.sql.caseSensitive were on.
sdf_FHVHV_train = sdf_FHVHV_train.drop(
    'hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
    'request_datetime', 'on_scene_datetime',
    'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag',
    'wav_request_flag', 'wav_match_flag',
    'driver_pay', 'tolls', 'bcf', 'sales_tax', 'congestion_surcharge',
)

In [None]:
# Combine vehicle_type and rate_code into one vehicle_and_ride_type label,
# e.g. 'Uber-Standard'. Rows that are not 'Uber', or whose rate_code is not
# listed below, get NULL (there is no otherwise()).
is_uber_ride = F.col('vehicle_type') == 'Uber'
uber_ride_labels = [
    ('Standard', 'Uber-Standard'),
    ('Shared ride', 'Uber-Shared ride'),
    ('LaGuardia/Newark/JFK', 'Uber-LaGuardia/Newark/JFK'),
]

uber_ride_expr = None
for rate, label in uber_ride_labels:
    matches = is_uber_ride & (F.col('rate_code') == rate)
    if uber_ride_expr is None:
        uber_ride_expr = F.when(matches, label)
    else:
        uber_ride_expr = uber_ride_expr.when(matches, label)

sdf_FHVHV_train = sdf_FHVHV_train.withColumn(
    'vehicle_and_ride_type', uber_ride_expr
)

### Merging the datasets

In [None]:
# Select and rename the Yellow taxi columns into the shared layout used for
# the merge with the FHVHV data (same output names, same order).
# Fixes the reference to the nonexistent column `vehicle_ride_type`; the
# column created earlier is `vehicle_and_ride_type`. Source columns use
# their actual lower-case names (as shown in the NULL-count output) instead
# of relying on case-insensitive resolution.
sdf_yellow_train.createOrReplaceTempView("final_yellow")

final_sdf_yellow_train = spark.sql(""" 

SELECT 
    tpep_pickup_datetime AS pickup_time,
    tpep_dropoff_datetime AS dropoff_time,
    trip_distance AS trip_distance,
    PULocationID,
    DOLocationID,
    fare_amount AS base_fare,
    tip_amount AS tips,
    total_amount AS total_amount,
    vehicle_type,
    rate_code,
    vehicle_and_ride_type
FROM
    final_yellow

""")

In [None]:
# Select and rename the FHVHV columns into the same layout as the Yellow
# taxi selection (same output names, same order).
# Source timestamp columns use their actual lower-case names
# (pickup_datetime / dropoff_datetime, as in the validity check) rather
# than relying on case-insensitive resolution.
sdf_FHVHV_train.createOrReplaceTempView("final_FHVHV")

final_sdf_FHVHV_train = spark.sql(""" 

SELECT 
    pickup_datetime AS pickup_time,
    dropoff_datetime AS dropoff_time,
    trip_miles AS trip_distance,
    PULocationID,
    DOLocationID,
    base_passenger_fare AS base_fare,
    tips,
    total_amount,
    vehicle_type,
    rate_code,
    vehicle_and_ride_type
FROM
    final_FHVHV

""")

In [None]:
# Merging the Yellow taxi and FHVHV trip datasets.
# unionByName matches columns by name, so the merge stays correct even if
# either SELECT above is reordered (plain union() matches by position).
merged_data = final_sdf_yellow_train.unionByName(final_sdf_FHVHV_train)

In [39]:
# Derive string calendar fields from pickup_time with date_format (evaluated
# in the session time zone), then drop the raw pickup / dropoff timestamps.
#   Year         'yyyy'  four-digit year
#   Month        'MMMM'  full month name
#   Date         'dd'    two-digit day of month
#   Day          'EEEE'  full weekday name
#   pickup_hour  'HH'    two-digit hour of day (00-23)
calendar_fields = [
    ("Year", 'yyyy'),
    ("Month", 'MMMM'),
    ("Date", 'dd'),
    ("Day", 'EEEE'),
    ("pickup_hour", 'HH'),
]
for field_name, pattern in calendar_fields:
    merged_data = merged_data.withColumn(
        field_name, date_format('pickup_time', pattern)
    )

merged_data = merged_data.drop('pickup_time', 'dropoff_time')

In [None]:
# Rearranging the columns of the merged dataset: calendar fields first, then
# trip attributes, then the vehicle / ride labels. The 'temp' temp view is
# still registered as before.
merged_data.createOrReplaceTempView("temp")

merged_data = spark.table("temp").select(
    'Year', 'Month', 'Date', 'Day', 'pickup_hour',
    'trip_distance', 'PULocationID', 'DOLocationID',
    'base_fare', 'tips', 'total_amount',
    'vehicle_type', 'rate_code', 'vehicle_and_ride_type',
)

In [43]:
# Selecting only the standard rides for both taxi types
standard_ride_types = ['Yellow-Standard', 'Uber-Standard']
final_merged = merged_data.where(
    F.col('vehicle_and_ride_type').isin(standard_ride_types)
)

In [None]:
# Drop the intermediate label columns; vehicle_and_ride_type already
# carries both pieces of information
unwanted_cols = ('vehicle_type', 'rate_code')
final_merged = final_merged.drop(*unwanted_cols)

In [None]:
# Saving the merged training data as Parquet.
# mode("overwrite") lets the notebook be re-run end to end; the default
# save mode raises an error when the target path already exists.
# NOTE(review): ".paraquet" looks like a typo for ".parquet". It is kept
# because other notebooks may read this exact path — TODO confirm and rename.
final_merged.write.mode("overwrite").parquet(
    "../data/curated/merged_training.paraquet"
)