In [0]:
from pyspark.sql import SparkSession
import time

#PART 1: Data Ingestion and Preperation

[1.1] Download the dataset for yellow and green taxi cabs from Jan 2019 to July 2021.

[1.2] Load the dataset into your Azure Blob Storage, then convert your dataset into parquet files. Explain the benefits of using parquet instead of csv.

In [0]:
# initialise spark session
spark = SparkSession.builder.appName('assignment-2').getOrCreate()

In [0]:
# mount databricks to azure container
# skip if azure is already mounted

storage_account_name = ""
storage_account_access_key = ""
blob_container_name = "assignment-2"

dbutils.fs.mount(
  source = f'wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net',
  mount_point = f'/mnt/{blob_container_name}/',
  extra_configs = {'fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net': storage_account_access_key}
)

Out[6]: True

In [0]:
def df_length(df):
    return df.count(), df.columns.count

In [0]:
dbutils.fs.rm('dbfs:/mnt/assignment-2/parquet/yellow_clean',True)
dbutils.fs.rm('dbfs:/mnt/assignment-2/parquet/green_clean',True)
dbutils.fs.rm('dbfs:/mnt/assignment-2/parquet/taxi_appended_cleaned',True)

Out[3]: True

In [0]:
# read csvs and group by yellow taxis and green taxis

# specify csv path
green_path = "/mnt/assignment-2/green*"
yellow_path = "/mnt/assignment-2/yellow*"

#specify taxi type
yellow_taxi_type = yellow_path.split('/')[-2]
green_taxi_type = green_path.split('/')[-2]


# CSV options
infer_schema = "true"
first_row_is_header = "true"
skip_corrupted_data = "DROPMALFORMED"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
green_raw = spark.read.format("csv") \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("mode", skip_corrupted_data) \
  .option("sep", delimiter) \
  .load(green_path)

yellow_raw = spark.read.format("csv") \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("mode", skip_corrupted_data) \
  .option("sep", delimiter) \
  .load(yellow_path)

In [0]:
# convert dataframes to parquet files
# SAVED RAW PARQUET FILES. 
green_raw.write.parquet('dbfs:/mnt/assignment-2/parquet/green_raw', mode='append')
yellow_raw.write.parquet('dbfs:/mnt/assignment-2/parquet/yellow_raw', mode='append')

[1.3] Count the total numbers of rows for each taxi colour (yellow and green).

In [0]:
# read parquet files
# READ RAW PARQUET FILES
df_green = spark.read.parquet("dbfs:/mnt/assignment-2/parquet/green_raw")
df_yellow = spark.read.parquet("dbfs:/mnt/assignment-2/parquet/yellow_raw")

In [0]:
df_yellow.count()

Out[6]: 124048218

In [0]:
df_green.count()

Out[78]: 8348567

[1.4] Explore the dataset and perform any required data cleaning to remove unrealistic trips (null values, extreme values, illogical values). You can use pyspark or sparksql.

In [0]:
# LOAD PARQUET FILES TO TABLE FOR CLEANING
dbutils.fs.rm('dbfs:/user/hive/warehouse/green_taxi_raw_table',True)
dbutils.fs.rm('dbfs:/user/hive/warehouse/yellow_taxi_raw_table',True)
df_green.write.saveAsTable("green_taxi_raw_table")
df_yellow.write.saveAsTable("yellow_taxi_raw_table")

Data Cleaning for the Green Taxis

In [0]:
# CLONE RAW DATA AND PERFORM QUERIES ON CLONED DATA 
dbutils.fs.rm('dbfs:/mnt/assignment-2/parquet/green_df',True)
df_green_edit = spark.sql('select * from green_taxi_raw_table')
df_green_edit.write.parquet('dbfs:/mnt/assignment-2/parquet/green_df', mode='append')

In [0]:
# The maximum amount of passengers allowed in a yellow taxicab by law is four (4) in a four (4) passenger taxicab or five (5) passengers in a five (5) passenger taxicab, except that an additional passenger must be accepted if such passenger is under the age of seven (7) and is held on the lap of an adult passenger seated in the rear.

df_green_edit = df_green_edit.where("passenger_count < 7 AND passenger_count > 0");
df_green_edit = df_green_edit.where("trip_type = 1 OR trip_type = 2");
df_green_edit = df_green_edit.where("trip_distance > 0");
df_green_edit = df_green_edit.where("fare_amount > 2.5"); # -- minimum fare amount in nyc
df_green_edit = df_green_edit.where("tip_amount >= 0");
df_green_edit = df_green_edit.where("tolls_amount >= 0");
df_green_edit = df_green_edit.where("total_amount > 0");
df_green_edit = df_green_edit.where("improvement_surcharge = 0 OR improvement_surcharge = 0.3");
df_green_edit = df_green_edit.where("mta_tax = 0 OR mta_tax = 0.5");
df_green_edit = df_green_edit.where("extra = 1 OR extra = 0.5 OR extra = 0");
df_green_edit = df_green_edit.where("YEAR(lpep_pickup_datetime) = 2019 OR YEAR(lpep_pickup_datetime) = 2020 OR (YEAR(lpep_pickup_datetime) = 2021 AND MONTH(lpep_pickup_datetime) < 8)");
df_green_edit = df_green_edit.where("total_amount <= 350 OR fare_amount <= 350");
df_green_edit = df_green_edit.where("DATEDIFF(second, lpep_pickup_datetime, lpep_dropoff_datetime) > 0 AND DATEDIFF(second, lpep_pickup_datetime, lpep_dropoff_datetime) <= 10800");
df_green_edit = df_green_edit.where("trip_distance <= 75");
df_green_edit = df_green_edit.where("DATEDIFF(DAY, lpep_pickup_datetime,lpep_dropoff_datetime) = 0 OR DATEDIFF(DAY, lpep_pickup_datetime,lpep_dropoff_datetime) = 1");
# df_green_edit = df_green_edit.where("trip_distance/DATEDIFF(HOUR, lpep_pickup_datetime, lpep_dropoff_datetime) < 80")

In [0]:
df_green_edit.write.parquet('dbfs:/mnt/assignment-2/parquet/green_clean', mode='append')

Data Cleaning for the Yellow Taxis

In [0]:
dbutils.fs.rm('dbfs:/mnt/assignment-2/parquet/yellow_df',True)
df_yellow_edit = spark.sql('select * from yellow_taxi_raw_table')
df_yellow_edit.write.parquet('dbfs:/mnt/assignment-2/parquet/yellow_df', mode='append')

In [0]:
df_yellow_edit = df_yellow_edit.where("passenger_count < 7 AND passenger_count > 0");
df_yellow_edit = df_yellow_edit.where("trip_distance > 0");
df_yellow_edit = df_yellow_edit.where("fare_amount > 2.5"); 
df_yellow_edit = df_yellow_edit.where("tip_amount >= 0");
df_yellow_edit = df_yellow_edit.where("tolls_amount >= 0");
df_yellow_edit = df_yellow_edit.where("total_amount > 0");
df_yellow_edit = df_yellow_edit.where("improvement_surcharge = 0 OR improvement_surcharge = 0.3");
df_yellow_edit = df_yellow_edit.where("mta_tax = 0 OR mta_tax = 0.5");
df_yellow_edit = df_yellow_edit.where("extra = 1 OR extra = 0.5 OR extra = 0");
df_yellow_edit = df_yellow_edit.where("YEAR(tpep_pickup_datetime) = 2019 OR YEAR(tpep_pickup_datetime) = 2020 OR (YEAR(tpep_pickup_datetime) = 2021 AND MONTH(tpep_pickup_datetime) < 8)");
df_yellow_edit = df_yellow_edit.where("total_amount <= 250 OR fare_amount <= 250");
df_yellow_edit = df_yellow_edit.where("DATEDIFF(second, tpep_pickup_datetime, tpep_dropoff_datetime) > 0 AND DATEDIFF(second, tpep_pickup_datetime, tpep_dropoff_datetime) <= 10800");
df_yellow_edit = df_yellow_edit.where("trip_distance <= 75");
df_yellow_edit = df_yellow_edit.where("DATEDIFF(DAY, tpep_pickup_datetime, tpep_dropoff_datetime) = 0 OR DATEDIFF(DAY, tpep_pickup_datetime, tpep_dropoff_datetime) = 1");

In [0]:
df_yellow_edit.write.parquet('dbfs:/mnt/assignment-2/parquet/yellow_clean', mode='append')

[1.5] Combine the yellow and green taxi dataset together (their schema are not exactly the same).

In [0]:
dbutils.fs.rm('dbfs:/mnt/assignment-2/taxis',True)

Out[27]: True

In [0]:
spark.sql("DROP TABLE IF EXISTS taxi")

sql_statement = '''
  CREATE TABLE taxi (
    taxi_type string NOT NULL,
    year int NOT NULL,
    filename string NOT NULL,
    diff_sec long NOT NULL,
    vendor_id string,
    pickup_datetime timestamp,
    pickup_location_id string,
    dropoff_datetime timestamp,
    dropoff_location_id string,
    speed decimal(9, 5),
    passenger_count integer,
    trip_distance decimal(9, 5),
    payment_type string,
    rate_code string,
    store_and_forward string,
    fare_amount decimal(8, 2),
    extra decimal(8, 2),
    mta_tax decimal(8, 2),
    tip_amount decimal(8, 2),
    tolls_amount decimal(8, 2),
    surcharge decimal(8, 2),
    total_amount decimal(8, 2)
  )
  USING DELTA
  PARTITIONED BY (taxi_type, year)
  LOCATION '{}'
'''.format("dbfs:/mnt/assignment-2/taxis")

spark.sql(sql_statement)

Out[28]: DataFrame[]

In [0]:
import pyspark.sql.functions as pyf
from pyspark.sql import Row


#standardise column names for both datasets
pathlist = ["dbfs:/mnt/assignment-2/parquet/green_clean", "dbfs:/mnt/assignment-2/parquet/yellow_clean"]
filelist = []

for file in pathlist:
    print("Processing file {}".format(file))
    df = spark.read.parquet(file)
    taxi_type = file.split('/')[4].split('_')[0]
    print(taxi_type)
    print(df.columns)

#     filelist.append(df)
    for n in df.schema.names:
        clean_name = n.strip().lower()
        if n != clean_name:
            df = df.withColumnRenamed(n, clean_name)
    
    # Convert columns to standard names
    if "dolocationid"          in df.schema.names: df = df.withColumnRenamed("dolocationid", "dropoff_location_id")
    if "fare_amount"           in df.schema.names: df = df.withColumnRenamed("fare_amount", "fare_amount")
    if "improvement_surcharge" in df.schema.names: df = df.withColumnRenamed("improvement_surcharge", "surcharge")
    if "lpep_dropoff_datetime" in df.schema.names: df = df.withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    if "lpep_pickup_datetime"  in df.schema.names: df = df.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    if "pickup_date"           in df.schema.names: df = df.withColumnRenamed("pickup_date", "pickup_datetime")
    if "pulocationid"          in df.schema.names: df = df.withColumnRenamed("pulocationid", "pickup_location_id")
    if "ratecodeid"            in df.schema.names: df = df.withColumnRenamed("ratecodeid", "rate_code")
    if "store_and_fwd_flag"    in df.schema.names: df = df.withColumnRenamed("store_and_fwd_flag", "store_and_forward")
    if "tip_amount"            in df.schema.names: df = df.withColumnRenamed("tip_amount", "tip_amount")
    if "tolls_amount"          in df.schema.names: df = df.withColumnRenamed("tolls_amount", "tolls_amount")
    if "total_amount"          in df.schema.names: df = df.withColumnRenamed("total_amount", "total_amount")
    if "tpep_dropoff_datetime" in df.schema.names: df = df.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    if "tpep_pickup_datetime"  in df.schema.names: df = df.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    if "vendorid"              in df.schema.names: df = df.withColumnRenamed("vendorid", "vendor_id")
    
    target_cols = sqlContext.table("taxi").dtypes
    
    timeFmt = "yyyy-MM-dd'T'HH:mm:ss"

    df = df.withColumn("year", pyf.year("pickup_datetime"))
    df = df.withColumn("filename", pyf.input_file_name())
    df = df.withColumn("taxi_type", pyf.lit(taxi_type))
    df = df.withColumn("diff_sec", (pyf.unix_timestamp('dropoff_datetime', format=timeFmt)
            - pyf.unix_timestamp('pickup_datetime', format=timeFmt)))
    df = df.withColumn("speed", (pyf.lit(pyf.col("trip_distance")))/((pyf.lit(pyf.col("diff_sec")))/3600))
  
    # Add missing columns
    for tc in target_cols:
        if tc[0] not in df.schema.names:
            df = df.withColumn(tc[0], pyf.lit(None))

    # Make columns datatype match target
    for tc in target_cols:
        ac = next(x for x in df.dtypes if x[0] == tc[0])
        if ac[1] != tc[1]:
            df = df.withColumn(ac[0], df[ac[0]].cast(tc[1]))

    # Reorder columns to match target
    df = df.select([x[0] for x in target_cols])

    df.write.insertInto("taxi", overwrite = False)

Processing file dbfs:/mnt/assignment-2/parquet/green_clean
green
['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']
Processing file dbfs:/mnt/assignment-2/parquet/yellow_clean
yellow
['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']


In [0]:
dbutils.fs.rm('dbfs:/mnt/assignment-2/parquet/taxi_appended_cleaned',True)
df = spark.sql('select * from taxi')
df = df.dropna()
print('number of yellow taxi records: ', df.where("taxi_type = 'yellow'").count())
print('number of green taxi records: ', df.where("taxi_type = 'green'").count())
df.write.parquet('dbfs:/mnt/assignment-2/parquet/taxi_appended_cleaned', mode='append')

number of yellow taxi records:  83111350
number of green taxi records:  6674292


# PART 2: Business Questions (Only use SparkSQL)

In [0]:
dbutils.fs.rm('dbfs:/user/hive/warehouse/taxi_table',True)
df = spark.read.parquet("dbfs:/mnt/assignment-2/parquet/taxi_appended_cleaned/")
df = df.where("speed < 100")
df.write.saveAsTable("taxi_table")

In [0]:
df.columns

Out[3]: ['taxi_type',
 'year',
 'filename',
 'diff_sec',
 'vendor_id',
 'pickup_datetime',
 'pickup_location_id',
 'dropoff_datetime',
 'dropoff_location_id',
 'speed',
 'passenger_count',
 'trip_distance',
 'payment_type',
 'rate_code',
 'store_and_forward',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'surcharge',
 'total_amount']

### PART 2 question 1

[2.1.1] For each year and month, what was the total number of trips?

In [0]:
%sql 
SELECT year, 
MONTH(pickup_datetime) as month,
COUNT(*) as tot_trips
FROM taxi_table
GROUP BY year, month
ORDER BY year, month;

year,month,tot_trips
2019,1,8032786
2019,2,5125469
2019,3,5554256
2019,4,5228068
2019,5,5347092
2019,6,4935304
2019,7,4505296
2019,8,4314779
2019,9,4646424
2019,10,5081886


[2.1.2] For each year and month, which day of week (e.g. monday, tuesday, etc..) had the most trips?

In [0]:
%sql 
SELECT
  year,
  MONTH(pickup_datetime) as month,
  dayofweek(pickup_datetime) as day,
  count(*) as count,
  DENSE_RANK() OVER(PARTITION BY year, first(month(pickup_datetime)) order by count(*) desc) as rk
FROM taxi_table
group by year, month, day
order by rk
limit 31;



year,month,day,count,rk
2019,11,7,855639,1
2020,9,4,166217,1
2019,2,6,880900,1
2019,3,6,969371,1
2019,4,3,878006,1
2019,5,5,926548,1
2019,6,7,827437,1
2019,7,4,787674,1
2019,8,5,768368,1
2019,9,1,690143,1


[2.1.3] For each year and month, which hour of the day had the most trips?

In [0]:
%sql 
SELECT
  year,
  MONTH(pickup_datetime) as month,
  HOUR(pickup_datetime) as hour,
  COUNT(*) as count,
  DENSE_RANK() OVER(PARTITION BY year, first(month(pickup_datetime)) order by count(*) desc) as rk
FROM taxi_table
group by year, month, hour
order by rk
limit 31;

year,month,hour,count,rk
2020,3,18,149905,1
2021,6,18,137374,1
2019,2,18,340643,1
2019,3,18,363937,1
2019,1,18,536176,1
2019,5,18,350655,1
2019,6,18,308012,1
2019,8,18,279691,1
2019,9,18,304948,1
2019,10,18,336433,1


[2.1.4]  For each year and month, what was the average number of passengers?

In [0]:
%sql
SELECT year,
MONTH(pickup_datetime) as month,
AVG(passenger_count) as average
from taxi_table
GROUP BY year, month
ORDER BY year, month

year,month,average
2019,1,1.5715857487053682
2019,2,1.7128290113548634
2019,3,1.7236074462538278
2019,4,1.7184646412403206
2019,5,1.712572553455224
2019,6,1.6999066318913687
2019,7,1.6995369449643265
2019,8,1.6983018133721333
2019,9,1.6820686187915697
2019,10,1.666261698904698


[2.1.5]  For each year and month, what was the average amount paid per trip (using total_amount)?

In [0]:
%sql
SELECT year,
MONTH(pickup_datetime) as month,
AVG(total_amount)as average
from taxi_table
GROUP BY year, month
ORDER BY year, month

year,month,average
2019,1,15.303587
2019,2,18.39618
2019,3,18.995435
2019,4,18.988574
2019,5,19.343212
2019,6,19.477962
2019,7,19.219752
2019,8,19.257019
2019,9,19.535411
2019,10,19.37916


[2.1.6]  For each year and month, what was the average amount paid per passenger (using total_amount)?

In [0]:
%sql
SELECT year,
MONTH(pickup_datetime) as month,
AVG(total_amount)/AVG(passenger_count) as average
from taxi_table
GROUP BY year, month
ORDER BY year, month

year,month,average
2019,1,9.737672292209764
2019,2,10.740231440526836
2019,3,11.020743175185046
2019,4,11.049732152937862
2019,5,11.294827749617872
2019,6,11.45825402088597
2019,7,11.308816826222875
2019,8,11.3389851252431
2019,9,11.613920372662689
2019,10,11.630321943269005


### PART 2 question 2

[2.2.1] For each taxi colour (yellow and green), what was the average, median, minimum and maximum trip duration in seconds?

In [0]:
%sql
SELECT
taxi_type, 
AVG(diff_sec) as AVG, 
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY diff_sec) as MED,
PERCENTILE_CONT(0) WITHIN GROUP (ORDER BY diff_sec) as MIN,
PERCENTILE_CONT(1) WITHIN GROUP (ORDER BY diff_sec) as MAX
FROM taxi_table
GROUP BY taxi_type;

taxi_type,AVG,MED,MIN,MAX
yellow,835.3005122166585,655.0,1.0,10800.0
green,868.355557621668,656.0,1.0,10792.0


[2.2.2] For each taxi colour (yellow and green), what was the average, median, minimum and maximum trip distance in km?

In [0]:
%sql
SELECT
taxi_type, 
AVG(trip_distance) as AVG, 
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY trip_distance) as MED,
ROUND(MIN(trip_distance*1.609344),0) as MIN, 
MAX(trip_distance*1.609344) as MAX
FROM taxi_table
GROUP BY taxi_type;

taxi_type,AVG,MED,MIN,MAX
yellow,3.018663735,1.68,0,120.7008
green,3.094732645,1.86,0,120.0570624


[2.2.3] For each taxi colour (yellow and green), what was the average, median, minimum and maximum speed in km per hour?

In [0]:
%sql
SELECT
taxi_type, 
ROUND(AVG(speed*1.609344),2) as AVG, 
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY speed),2) as MED,
ROUND(MIN(speed*1.609344),2) as MIN, 
ROUND(MAX(speed*1.609344),2) as MAX
FROM taxi_table
GROUP BY taxi_type;

taxi_type,AVG,MED,MIN,MAX
green,19.44,10.78,0.01,160.71
yellow,19.14,10.34,0.01,160.84


### PART 2 question 3

[2.3] What was the percentage of trips where the driver received tips?

In [0]:
%sql
SELECT 
ROUND(SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END) / COUNT(total_amount) * 100, 2) as percentage_tipped
FROM taxi_table

percentage_tipped
67.63


### PART 2 question 4

[2.4] For trips where the driver received tips, what was the percentage where the driver received tips of at least $10.

In [0]:
%sql
SELECT
 ROUND(SUM(CASE WHEN tip_amount >= 10 THEN 1 ELSE 0 END) / SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END) * 100, 2) as percentage_tipped_over_10
FROM taxi_table

percentage_tipped_over_10
3.37


### PART 2 question 5

Classify each trip into bins of durations:
- Under 5 Mins
- From 5 mins to 10 mins
- From 10 mins to 20 mins
- From 20 mins to 30 mins
- At least 30 mins

In [0]:
%sql
SELECT
    CASE
        WHEN (diff_sec) < 300 THEN 'Under 5 Mins'
        WHEN (diff_sec) >= 300 AND (diff_sec) < 600 THEN 'From 5 mins to 10 mins'
        WHEN (diff_sec) >= 600 AND (diff_sec) < 1200 THEN 'From 10 mins to 20 mins'
        WHEN (diff_sec) >= 1200 AND (diff_sec) < 1800 THEN 'From 20 mins to 30 mins'
        ELSE 'At least 30 mins'
    END AS bucket,
    COUNT(*) AS count
FROM taxi_table
GROUP BY bucket
ORDER BY bucket

bucket,count
At least 30 mins,7068857
From 10 mins to 20 mins,31218767
From 20 mins to 30 mins,10893217
From 5 mins to 10 mins,27216669
Under 5 Mins,13342433


Then for each bins, calculate: 
- Average speed (km per hour)
- Average distance per dollar (km per $)

In [0]:
%sql
SELECT
    CASE
        WHEN (diff_sec) < 300 THEN 'Under 5 Mins'
        WHEN (diff_sec) >= 300 AND (diff_sec) < 600 THEN 'From 5 mins to 10 mins'
        WHEN (diff_sec) >= 600 AND (diff_sec) < 1200 THEN 'From 10 mins to 20 mins'
        WHEN (diff_sec) >= 1200 AND (diff_sec) < 1800 THEN 'From 20 mins to 30 mins'
        ELSE 'At least 30 mins'
    END AS bucket,
    ROUND(AVG(speed*1.609344),2) AS avg_speed,
    ROUND(AVG(trip_distance*1.609344) / AVG(total_amount),2) as avg_distance_per_dollar
FROM taxi_table
GROUP BY bucket
ORDER BY bucket

bucket,avg_speed,avg_distance_per_dollar
At least 30 mins,27.13,0.37
From 10 mins to 20 mins,17.83,0.24
From 20 mins to 30 mins,22.17,0.31
From 5 mins to 10 mins,17.07,0.18
Under 5 Mins,19.86,0.12


### PART 2 question 6

Which duration bin will you advise a taxi driver to target to maximise his income?
* Based on the query results produced, it is advised for a taxi driver to target people in a 0-5 minute bin to maximise his income. 
* Maximising income is calculated by minimising distance per dollar. 
* In this case, the 0-5 minute bin will maximise the taxi drivers' income as this bin has the lowest average distance per dollar ratio, mea

# PART 3: Machine Learning
Build at least two different ML models using Spark ML pipelines to predict the Total fare amount of a trip:
- Use the 2019 and 2020 data to train and validate your models
- Use the RMSE score to assess your models.
- Choose your best model and explain why you choose it.
- Using your best model, predict the 2021 data.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.sql.functions import col,isnan, when, count 
import pyspark.sql.functions as pyf

In [0]:
# dbutils.fs.rm('dbfs:/user/hive/warehouse/taxi_table',True)
df = spark.read.parquet("dbfs:/mnt/assignment-2/parquet/taxi_appended_cleaned/")
df = df.where("speed < 100")
df.count()


Out[49]: 89739943

### [3.1.1] Data Cleaning and Preperation

In [0]:
df.printSchema()

root
 |-- taxi_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- filename: string (nullable = true)
 |-- diff_sec: long (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- speed: decimal(9,5) (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: decimal(9,5) (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_forward: string (nullable = true)
 |-- fare_amount: decimal(8,2) (nullable = true)
 |-- extra: decimal(8,2) (nullable = true)
 |-- mta_tax: decimal(8,2) (nullable = true)
 |-- tip_amount: decimal(8,2) (nullable = true)
 |-- tolls_amount: decimal(8,2) (nullable = true)
 |-- surcharge: decimal(8,2) (nullable = true)
 |-- total_amount: decimal(8,2) (nulla

In [0]:
# feature engineering 
df = df.withColumn('unix_pickup_datetime', pyf.unix_timestamp(pyf.col('pickup_datetime'), "yyyy-MM-dd'T'HH:mm:ssX"))
df = df.withColumn('unix_dropoff_datetime', pyf.unix_timestamp(pyf.col('dropoff_datetime'), "yyyy-MM-dd'T'HH:mm:ssX"))

In [0]:
cols_list = ['year','unix_pickup_datetime','unix_dropoff_datetime', 'speed', 'pickup_location_id', 'dropoff_location_id','trip_distance','tip_amount','total_amount']
df = df.select(cols_list)

In [0]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- unix_pickup_datetime: long (nullable = true)
 |-- unix_dropoff_datetime: long (nullable = true)
 |-- speed: decimal(9,5) (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: decimal(9,5) (nullable = true)
 |-- tip_amount: decimal(8,2) (nullable = true)
 |-- total_amount: decimal(8,2) (nullable = true)



Categorical Column Transformation.

In [0]:
cat_cols = ['pickup_location_id', 'dropoff_location_id']
stages = []

In [0]:
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]

Numeric Column Transformation.

In [0]:
num_cols = ['year','unix_pickup_datetime','unix_dropoff_datetime','speed','trip_distance','tip_amount']
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]
print(cat_cols_ohe)

['pickup_location_id_ohe', 'dropoff_location_id_ohe']


In [0]:
label = 'total_amount'

### Create Pipeline

In [0]:
# dbutils.fs.rm('dbfs:/mnt/assignment-2/pipeline/',True)
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")
stages += [assembler]
pipeline = Pipeline(stages=stages)
# pipeline.write().overwrite().save("dbfs:/mnt/assignment-2/pipeline/")

In [0]:
filtered_data = df.where('year = 2019 OR year=2020')
prediction_data = df.where('year = 2021')
pipeline_model = pipeline.fit(filtered_data)
prediction_pipeline_model = pipeline.fit(prediction_data)

In [0]:
filtered_data = pipeline_model.transform(filtered_data)
filtered_data.show()


+----+--------------------+---------------------+--------+------------------+-------------------+-------------+----------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|year|unix_pickup_datetime|unix_dropoff_datetime|   speed|pickup_location_id|dropoff_location_id|trip_distance|tip_amount|total_amount|pickup_location_id_ind|pickup_location_id_ohe|dropoff_location_id_ind|dropoff_location_id_ohe|            features|
+----+--------------------+---------------------+--------+------------------+-------------------+-------------+----------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|2019|          1546303600|           1546304000|13.50000|               151|                239|      1.50000|      1.65|        9.95|                  38.0|      (262,[38],[1.0])|                    8.0|        (263,[8],[1.0])|(531,[38,270,525,.

In [0]:
prediction_data = prediction_pipeline_model.transform(prediction_data)
prediction_data.show()

+----+--------------------+---------------------+--------+------------------+-------------------+-------------+----------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|year|unix_pickup_datetime|unix_dropoff_datetime|   speed|pickup_location_id|dropoff_location_id|trip_distance|tip_amount|total_amount|pickup_location_id_ind|pickup_location_id_ohe|dropoff_location_id_ind|dropoff_location_id_ohe|            features|
+----+--------------------+---------------------+--------+------------------+-------------------+-------------+----------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|2021|          1625099431|           1625099839| 9.00000|               230|                164|      1.02000|      0.00|       10.30|                  24.0|      (262,[24],[1.0])|                   20.0|       (260,[20],[1.0])|(528,[24,282,522,.

In [0]:
it1_filtered_data = filtered_data.select([label, "features"])
it1_filtered_data.show()

+------------+--------------------+
|total_amount|            features|
+------------+--------------------+
|        9.95|(531,[38,270,525,...|
|       37.56|(531,[11,273,525,...|
|       16.30|(531,[12,283,525,...|
|        8.80|(531,[21,275,525,...|
|        9.05|(531,[13,282,525,...|
|       11.62|(531,[13,289,525,...|
|       18.50|(531,[22,311,525,...|
|        6.96|(531,[15,281,525,...|
|       13.00|(531,[15,272,525,...|
|       14.80|(531,[0,298,525,5...|
|       19.55|(531,[29,266,525,...|
|       13.56|(531,[26,268,525,...|
|        8.50|(531,[19,300,525,...|
|       16.80|(531,[13,272,525,...|
|       42.95|(531,[13,336,525,...|
|        5.80|(531,[32,264,525,...|
|       28.50|(531,[70,336,525,...|
|       14.80|(531,[17,302,525,...|
|       15.30|(531,[15,272,525,...|
|        7.30|(531,[14,278,525,...|
+------------+--------------------+
only showing top 20 rows



In [0]:
it1_prediction_data = prediction_data.select(["features"])
it1_prediction_data.show()

+--------------------+
|            features|
+--------------------+
|(528,[24,282,522,...|
|(528,[55,277,522,...|
|(528,[21,280,522,...|
|(528,[19,310,522,...|
|(528,[12,292,522,...|
|(528,[53,266,522,...|
|(528,[23,273,522,...|
|(528,[17,271,522,...|
|(528,[6,268,522,5...|
|(528,[26,268,522,...|
|(528,[9,291,522,5...|
|(528,[11,263,522,...|
|(528,[47,286,522,...|
|(528,[2,267,522,5...|
|(528,[19,308,522,...|
|(528,[7,274,522,5...|
|(528,[26,298,522,...|
|(528,[10,342,522,...|
|(528,[89,291,522,...|
|(528,[21,281,522,...|
+--------------------+
only showing top 20 rows



In [0]:
train_data, test_data = it1_filtered_data.randomSplit([0.8, 0.2], seed=8)

### Train Linear Regression Model

In [0]:
lr_model=LinearRegression(featuresCol='features', labelCol=label) 

In [0]:
lr_model = lr_model.fit(train_data)

In [0]:
trainingSummary = lr_model.summary
print("Root Mean Squared Error (RMSE) on train data: %f" % trainingSummary.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on train data: 3.450273


In [0]:
train_data.describe().show()

In [0]:
lr_predictions = lr_model.transform(test_data)
lr_predictions.select("prediction","total_amount","features").show(5)

test_result = lr_model.evaluate(test_data)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

+------------------+------------+--------------------+
|        prediction|total_amount|            features|
+------------------+------------+--------------------+
| 7.190240120515227|        3.80|(531,[0,262,525,5...|
| 5.154400973580778|        3.80|(531,[0,262,525,5...|
| 5.653303546831012|        3.80|(531,[0,262,525,5...|
|  6.46763267647475|        3.80|(531,[0,262,525,5...|
|6.3553484147414565|        3.80|(531,[0,262,525,5...|
+------------------+------------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 1574.31


In [0]:
lr_predictions.describe().show()

+-------+------------------+------------------+
|summary|      total_amount|        prediction|
+-------+------------------+------------------+
|  count|          15950447|          15950447|
|   mean|         18.455386|18.060296223163558|
| stddev|14.125647881896606|1574.3580609774983|
|    min|              2.60|-6287417.822968636|
|    max|            949.80| 866.0271283024922|
+-------+------------------+------------------+



### Machine Learning Model 2

In [0]:
df = spark.read.parquet("dbfs:/mnt/assignment-2/parquet/taxi_appended_cleaned/")
df = df.where("speed < 100")
df.count()

Out[74]: 89739943

In [0]:
df2 = df
df2 = df2.withColumn('unix_pickup_datetime', pyf.unix_timestamp(pyf.col('pickup_datetime'), "yyyy-MM-dd'T'HH:mm:ssX"))
df2 = df2.withColumn('unix_dropoff_datetime', pyf.unix_timestamp(pyf.col('dropoff_datetime'), "yyyy-MM-dd'T'HH:mm:ssX"))

In [0]:
df2.printSchema()

root
 |-- taxi_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- filename: string (nullable = true)
 |-- diff_sec: long (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- speed: decimal(9,5) (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: decimal(9,5) (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_forward: string (nullable = true)
 |-- fare_amount: decimal(8,2) (nullable = true)
 |-- extra: decimal(8,2) (nullable = true)
 |-- mta_tax: decimal(8,2) (nullable = true)
 |-- tip_amount: decimal(8,2) (nullable = true)
 |-- tolls_amount: decimal(8,2) (nullable = true)
 |-- surcharge: decimal(8,2) (nullable = true)
 |-- total_amount: decimal(8,2) (nulla

In [0]:
cols_list = ['year','unix_pickup_datetime','unix_dropoff_datetime', 'diff_sec', 'speed', 'passenger_count','pickup_location_id', 'dropoff_location_id','trip_distance','tip_amount','tolls_amount','surcharge','total_amount']
df2 = df2.select(cols_list)

In [0]:
df2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- unix_pickup_datetime: long (nullable = true)
 |-- unix_dropoff_datetime: long (nullable = true)
 |-- diff_sec: long (nullable = true)
 |-- speed: decimal(9,5) (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: decimal(9,5) (nullable = true)
 |-- tip_amount: decimal(8,2) (nullable = true)
 |-- tolls_amount: decimal(8,2) (nullable = true)
 |-- surcharge: decimal(8,2) (nullable = true)
 |-- total_amount: decimal(8,2) (nullable = true)



In [0]:
cat_cols = ['pickup_location_id', 'dropoff_location_id']
stages = []

In [0]:
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]

In [0]:
num_cols = ['year','unix_pickup_datetime','unix_dropoff_datetime','speed','diff_sec','passenger_count','tolls_amount','surcharge','trip_distance','tip_amount']
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]
print(cat_cols_ohe)

['pickup_location_id_ohe', 'dropoff_location_id_ohe']


In [0]:
label = "total_amount"

In [0]:
assembler2 = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")
stages += [assembler]
pipeline2 = Pipeline(stages=stages)

In [0]:
filtered_data2 = df2.where('year = 2019 OR year=2020')
prediction_data2 = df2.where('year = 2021')
pipeline_model2 = pipeline2.fit(filtered_data2)
prediction_pipeline_model2 = pipeline2.fit(prediction_data2)

In [0]:
filtered_data2 = pipeline_model2.transform(filtered_data2)
filtered_data2.show()

+----+--------------------+---------------------+--------+--------+---------------+------------------+-------------------+-------------+----------+------------+---------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|year|unix_pickup_datetime|unix_dropoff_datetime|diff_sec|   speed|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|tip_amount|tolls_amount|surcharge|total_amount|pickup_location_id_ind|pickup_location_id_ohe|dropoff_location_id_ind|dropoff_location_id_ohe|            features|
+----+--------------------+---------------------+--------+--------+---------------+------------------+-------------------+-------------+----------+------------+---------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|2019|          1546303600|           1546304000|     400|13.50000|              1|               151| 

In [0]:
prediction_data2 = prediction_pipeline_model2.transform(prediction_data2)
prediction_data2.show()

+----+--------------------+---------------------+--------+--------+---------------+------------------+-------------------+-------------+----------+------------+---------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|year|unix_pickup_datetime|unix_dropoff_datetime|diff_sec|   speed|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|tip_amount|tolls_amount|surcharge|total_amount|pickup_location_id_ind|pickup_location_id_ohe|dropoff_location_id_ind|dropoff_location_id_ohe|            features|
+----+--------------------+---------------------+--------+--------+---------------+------------------+-------------------+-------------+----------+------------+---------+------------+----------------------+----------------------+-----------------------+-----------------------+--------------------+
|2021|          1625099431|           1625099839|     408| 9.00000|              1|               230| 

In [0]:
it2_filtered_data = filtered_data2.select([label, "features"])
it2_filtered_data.show()

+------------+--------------------+
|total_amount|            features|
+------------+--------------------+
|        9.95|(531,[38,270,525,...|
|       37.56|(531,[11,273,525,...|
|       16.30|(531,[12,283,525,...|
|        8.80|(531,[21,275,525,...|
|        9.05|(531,[13,282,525,...|
|       11.62|(531,[13,289,525,...|
|       18.50|(531,[22,311,525,...|
|        6.96|(531,[15,281,525,...|
|       13.00|(531,[15,272,525,...|
|       14.80|(531,[0,298,525,5...|
|       19.55|(531,[29,266,525,...|
|       13.56|(531,[26,268,525,...|
|        8.50|(531,[19,300,525,...|
|       16.80|(531,[13,272,525,...|
|       42.95|(531,[13,336,525,...|
|        5.80|(531,[32,264,525,...|
|       28.50|(531,[70,336,525,...|
|       14.80|(531,[17,302,525,...|
|       15.30|(531,[15,272,525,...|
|        7.30|(531,[14,278,525,...|
+------------+--------------------+
only showing top 20 rows



In [0]:
it2_prediction_data = prediction_data2.select(["features"])
it2_prediction_data.show()

+--------------------+
|            features|
+--------------------+
|(528,[24,282,522,...|
|(528,[55,277,522,...|
|(528,[21,280,522,...|
|(528,[19,310,522,...|
|(528,[12,292,522,...|
|(528,[53,266,522,...|
|(528,[23,273,522,...|
|(528,[17,271,522,...|
|(528,[6,268,522,5...|
|(528,[26,268,522,...|
|(528,[9,291,522,5...|
|(528,[11,263,522,...|
|(528,[47,286,522,...|
|(528,[2,267,522,5...|
|(528,[19,308,522,...|
|(528,[7,274,522,5...|
|(528,[26,298,522,...|
|(528,[10,342,522,...|
|(528,[89,291,522,...|
|(528,[21,281,522,...|
+--------------------+
only showing top 20 rows



In [0]:
train_data2, test_data2 = it2_filtered_data.randomSplit([0.8, 0.2], seed=8)

In [0]:
lr_model2=LinearRegression(featuresCol='features', labelCol=label) 

In [0]:
lr_model2 = lr_model2.fit(train_data2)

In [0]:
trainingSummary2 = lr_model2.summary
print("Root Mean Squared Error (RMSE) on train data: %f" % trainingSummary2.rootMeanSquaredError)

In [0]:
train_data2.describe().show()

In [0]:
lr_predictions2 = lr_model2.transform(test_data2)
lr_predictions2.select("prediction","total_amount","features").show(5)

test_result2 = lr_model2.evaluate(test_data2)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result2.rootMeanSquaredError)

In [0]:
lr_predictions.describe().show()