# **Assignment 1**
Javiera de la Carrera - 13743354

# Libraries

In [2]:
#spark session
from pyspark.sql import SparkSession

#import spark libraries
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_timestamp,date_format,hour,dayofweek,quarter
from pyspark.sql.functions import col,max, min, sum
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.window import Window

#modeling part
from pyspark.mllib.stat import Statistics
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


#import python libraries
import glob
import pandas as pd
# import numpy as np
# import statsmodels

# %matplotlib inline
# import matplotlib.pyplot as plt
# import seaborn as sns

import os
import sys
import time
import requests
import datetime

# Spark session

In [28]:
#docker
spark = SparkSession.builder \
        .appName('taxis') \
        .config('spark.sql.shuffle.partitions', 1000) \
        .config('spark.sql.repl.eagerEval.enabled',True) \
        .config("spark.driver.memory", "12g")\
        .config("spark.sql.autoBroadcastJoinThreshold",-1)\
        .getOrCreate() 
        
#.config("spark.sql.broadcastTimeout", 36000)\    
#df.rdd.getNumPartitions() #to be used in the middle for partitions

Uncomment below to create Spark Session in GCP

In [None]:
#GCP

### Initialise Spark session

## Builder
# spark = SparkSession.builder \
#                     .appName('taxi') \
#                     .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.17.1') \
#                     .getOrCreate()

# ## To always show the results of DataFrames and improve the formatting of the output, Spark DataFrame into Pandas DataFrame
# spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [3]:
spark.sparkContext._conf.getAll()

[('spark.sql.repl.eagerEval.enabled', 'True'),
 ('spark.sql.shuffle.partitions', '1000'),
 ('spark.driver.memory', '12g'),
 ('spark.driver.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'taxis'),
 ('spark.executor.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.sql.autoBroadcastJoinThreshold', '-1'),
 ('spark.driver.host', 'd96dce464e23'),
 ('spark.driver.port', '40611'),
 ('spark.app.id', 'local-1618213207967'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.debug.maxToStringFields', '1000')]

# 1. Loading the data: 2019-2020

In [29]:
#find all the taxis files: PLEASE CHANGE THE PATH TO YOUR FOLDER

txtfiles_gt = [file for file in glob.glob("*green_tripdata*.csv")]
txtfiles_yt = [file for file in glob.glob("*yellow_tripdata*.csv")]

Read one csv of each type of taxi to know about the data inside them

In [20]:
#open one csv for green and look the columns
spark.read.csv("green_tripdata_2019-01.csv",header=True).printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [21]:
#open one csv for yellow and look the column
spark.read.csv("yellow_tripdata_2019-01.csv",header=True).printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



After read the Schemas we could realise that the variables of yellow taxis data are almost the same as for green taxis. However, green taxis have 2 more variables. Therefore, they will be loaded separatedly and for the yellow ones, trip_type and ehail_fee will be created.

Also, a schema for each type of data will be created because it is useful to have a faster download. With that Spark does not have to understand the format of the data (it does not have to create the Schema by itself). 

In summary, because of this two conditions, it is handy to create the Schema by ourself.

In [30]:
#the Schemas were created using the data dictionary trip records
#green
green_schema = StructType(
    [StructField("VendorID", StringType(), False),
     StructField("pickup_datetime", TimestampType(), False),
     StructField("dropoff_datetime", TimestampType(), False),
     StructField("store_and_fwd_flag", StringType(), False),
     StructField("RatecodeID", StringType(), False),
     StructField("PULocationID", StringType(), False), #pick up location id
     StructField("DOLocationID", StringType(), False), #drop off location id
     StructField("passenger_count", IntegerType(), False),
     StructField("trip_distance", FloatType(), False), #milles
     StructField("fare_amount", FloatType(), False),
     StructField("extra", FloatType(), False),
     StructField("mta_tax", FloatType(), False),
     StructField("tip_amount", FloatType(), False),
     StructField("tolls_amount", FloatType(), False),
     StructField("ehail_fee", FloatType(), False),
     StructField("improvement_surcharge", FloatType(), False),
     StructField("total_amount", FloatType(), False),
     StructField("payment_type", StringType(), False),
     StructField("trip_type", StringType(), False),
     StructField("congestion_surcharge", FloatType(), False)])


#yellow

yellow_schema = StructType(
    [StructField("VendorID", StringType(), False),
     StructField("pickup_datetime", TimestampType(), False),
     StructField("dropoff_datetime", TimestampType(), False),
     StructField("passenger_count", IntegerType(), False),
     StructField("trip_distance", FloatType(), False),
     StructField("RatecodeID", StringType(), False),
     StructField("store_and_fwd_flag", StringType(), False),
     StructField("PULocationID", StringType(), False),
     StructField("DOLocationID", StringType(), False),
     StructField("payment_type", StringType(), False),
     StructField("fare_amount", FloatType(), False),
     StructField("extra", FloatType(), False),
     StructField("mta_tax", FloatType(), False),
     StructField("tip_amount", FloatType(), False),
     StructField("tolls_amount", FloatType(), False),
     StructField("improvement_surcharge", FloatType(), False),
     StructField("total_amount", FloatType(), False),
     StructField("congestion_surcharge", FloatType(), False),
     ])

In [31]:
#reading yellow csv

yellow = spark.read.option("header",True) \
    .schema(yellow_schema) \
    .csv(txtfiles_yt) \
    .withColumn("taxi_type", lit("yellow").cast(StringType())) \
    .withColumn("ehail_fee", lit(0.0).cast(FloatType())) \
    .withColumn("trip_type", lit("no").cast(StringType()))

#lit to put in all rows the value defined 
#trip type: I put another value, not nan because the green taxis have nan
#taxi_type: new variable to know if is a yellow or green taxi
#ehail_fee: new variable from green without a description in the dictionary

In [32]:
#reading green csv

green = spark.read.option("header",True)\
    .schema(green_schema) \
    .csv(txtfiles_gt)\
    .withColumn("taxi_type", lit("green").cast(StringType()))

# only new variable to know if is a yellow or green taxi

In [33]:
#reorder green variables to merge with yellow ones. They need the same Schema to be merged.

green = green.select(['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'taxi_type',
 'ehail_fee',
 'trip_type'])

In [34]:
#merge both data
df = yellow.union(green)

In [27]:
#final Schema

df.printSchema()
#all variables that are codes, categorical or ID were transformed to string

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- congestion_surcharge: float (nullable = true)
 |-- taxi_type: string (nullable = false)
 |-- ehail_fee: float (nullable = true)
 |-- trip_type: string (nullable = true)



# 2. Exploring and cleaning the Data

In [11]:
#number the rows and columns
count = df.count()
print("rows","columns")
print((count, len(df.columns)))

rows columns
(116825619, 21)


In [12]:
#columns
df.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'taxi_type',
 'ehail_fee',
 'trip_type']

In [14]:
#one row
#In each trip record dataset, one row represents a single trip.
df.head(1)

[Row(VendorID='1', pickup_datetime=datetime.datetime(2019, 1, 1, 0, 46, 40), dropoff_datetime=datetime.datetime(2019, 1, 1, 0, 53, 20), passenger_count=1, trip_distance=1.5, RatecodeID='1', store_and_fwd_flag='N', PULocationID='151', DOLocationID='239', payment_type='1', fare_amount=7.0, extra=0.5, mta_tax=0.5, tip_amount=1.649999976158142, tolls_amount=0.0, improvement_surcharge=0.30000001192092896, total_amount=9.949999809265137, congestion_surcharge=None, taxi_type='yellow', ehail_fee=0.0, trip_type='no')]

#### Missing values 

In [11]:
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
#df.select([F.round((F.count(F.when(F.isnull(F.col(c).cast("int")), F.col(c).cast("int")))/count),6).alias(c) for c in df.columns])

#missing values for string passed to integer

VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,taxi_type,ehail_fee,trip_type
1998368,0,0,1998368,0,1998368,1998368,0,0,1998368,0,0,0,0,0,2,0,6344562,0,7777747,942563


### Numerical variables

In [15]:
#first looking of the variables 
df.describe("passenger_count").show() 
df.describe("fare_amount").show() 
df.describe("extra").show() 
df.describe("trip_distance").show() 
df.describe("mta_tax").show()
df.describe("tip_amount").show()
df.describe("total_amount").show()
df.describe("ehail_fee").show()
df.describe("improvement_surcharge").show()
df.describe("tolls_amount").show()
df.describe("congestion_surcharge").show()

+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|         114827251|
|   mean|1.5277970209353875|
| stddev|1.1775814890423952|
|    min|                 0|
|    max|                 9|
+-------+------------------+

+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|         116825619|
|   mean|13.311462728042809|
| stddev| 194.4980192640717|
|    min|           -1856.0|
|    max|          998310.0|
+-------+------------------+

+-------+------------------+
|summary|             extra|
+-------+------------------+
|  count|         116825619|
|   mean|1.0469919483353511|
| stddev|  46.2759172156658|
|    min|             -60.0|
|    max|          500000.8|
+-------+------------------+

+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|         116825619|
|   mean|3.3396968832882794|
| stddev|209.06998305117406|
|    min|         -37264.53|
|    max|  

#### First cleaning using the definitions of [NYC taxi fare webpage ](https://www1.nyc.gov/site/tlc/passengers/taxi-fare.page#:~:text=%242.50%20initial%20charge.,Dutchess%2C%20Orange%20or%20Putnam%20Counties.)

In [35]:
#filter all variables with negative values--> assumption: we have much data, so we can dropped it 
#with less data it could be better to replace
#ehail fee do not look bad but it is only for green in the dataset

#payments
df = df.where((F.col("fare_amount")>= 2.50)) # $2.50 initial charge and max of 2.50 + 0.50*1/5*tripdistance
df = df.where((F.col("mta_tax")>= 0)) # 0.5 for some destinations
df = df.where((F.col("tip_amount")>=0) & (F.col("total_amount")>= 2.50)) # it was assumed that an uncharged payment has a total amount
df = df.where((F.col("tolls_amount")>=0) & (F.col("extra")>= 0) & (F.col("extra")<= 1.00)) #overnight 0.5 and rush max 1.00 as extras
df = df.where((F.col("congestion_surcharge")>=0) & (F.col("congestion_surcharge")<= 2.75)) #congestion max 2.75
              
#passengers:maxium of 6 passenger (4 for small car and 6 for big)         
df = df.where((F.col("passenger_count")>0) & (F.col("passenger_count")<7)) 


#no negative distance
df = df.where((F.col("trip_distance")>= 0)) 


In [17]:
#describe again to know the new measures

df.describe("passenger_count").show() 
df.describe("fare_amount").show() 
df.describe("extra").show() 
df.describe("trip_distance").show() 
df.describe("mta_tax").show()
df.describe("tip_amount").show()
df.describe("total_amount").show()
df.describe("ehail_fee").show()
df.describe("improvement_surcharge").show()
df.describe("tolls_amount").show()
df.describe("congestion_surcharge").show()

+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          75927602|
|   mean|1.6606915888111415|
| stddev|1.3071586084115319|
|    min|                 1|
|    max|                 6|
+-------+------------------+

+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|          75927602|
|   mean|13.355921415944218|
| stddev|103.78408539441818|
|    min|               2.5|
|    max|          671123.1|
+-------+------------------+

+-------+-------------------+
|summary|              extra|
+-------+-------------------+
|  count|           75927602|
|   mean|0.31109860298337483|
| stddev|  0.380063858124019|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+

+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|          75927602|
|   mean|3.0256178133698484|
| stddev| 32.21392288199482|
|    min|               0.0|
| 

#### Second Cleaning

We do not know what is the max of each variable. Therefore, considering the Central Limit Theorem I can assume a normal distribution for the numeric variables because we are using a large dataset. Therefore, the definition mentions by  [machine lr mastery](https://machinelearningmastery.com/how-to-use-statistics-to-identify-outliers-in-data/) was used: Outliers are defined as any point located further than 3 standard deviations from the mean. Sometimes I used 4 st deviation because it was more logical. Also, some of them where looked with more detailed. For example: for distance is reasonable looking the map of the boroughs. mta is assumed to be 0 or 0,5 (categorical) as [NYC mta](https://www.tax.ny.gov/bus/mctmt/taxi.htm#:~:text=The%20taxicab%20and%20hail%20vehicle%20trip%20tax%20in%20the%20Metropolitan,and%20Bronx%20counties) said. For payment type = no charge it was assumed 0 payment but with a total amount value, so I did not take out.



In [37]:
#some outliers 
#if the fare in more than the minimun and distance = 0
df.where((F.col("total_amount")> 6.50) & (F.col("trip_distance")==0)).count() 

#max total amount: fare + extras + congestion + improv =  6.55
#https://www1.nyc.gov/site/tlc/passengers/taxi-fare.page#:~:text=%242.50%20initial%20charge.,Dutchess%2C%20Orange%20or%20Putnam%20Counties.

691347

In [38]:
#if the fare is more than the minimun and distance = 0
df.where((F.col("fare_amount")== 2.50) & (F.col("trip_distance")> 0.2)).count()

17569

Cleaning

In [36]:
#take out outliers for fare amount, total amount and distance
df = df.where((F.col("fare_amount")< 350)) # aprox 3 st deviation 
df = df.where((F.col("total_amount")< 400)) # aprox 3 st deviation but a little bigger than fare amount
df = df.where((F.col("tolls_amount")< 10)) # aprox 3 std deviation 
df = df.where((F.col("fare_amount")) <= (F.col("total_amount")))

## Also, the maximun distance in a straigh line is 50 milles in google maps, however, here is like 100 becaue we do not know if the drivers makes circles.
df = df.where((F.col("trip_distance")< 100 )) #3 st deviation

#because the st dev is very small because cash payments has 0 tips, I will put a maximun of 100
df = df.where((F.col("tip_amount")) < 100)

#combination fares and distance in a logical form
df = df.where((F.col("fare_amount")> 2.50) & (F.col("trip_distance")!=0) | (F.col("total_amount")<= 6.50) & (F.col("trip_distance")==0))


#mta tax is 0 or 0,5: changes to categorical
#The taxicab and hail vehicle trip tax in the Metropolitan Commuter Transportation District is a tax of $0.50 per taxicab and hail vehicle trip in some ocassions.

df = df.withColumn("mta_tax", \
              when(df["mta_tax"] > 0 , 0.5).otherwise(df["mta_tax"]))


In [37]:
#st devation of total amount, necesary to the model performance
df.describe("total_amount").show()

+-------+-----------------+
|summary|     total_amount|
+-------+-----------------+
|  count|         74769315|
|   mean|18.35019022706662|
| stddev|13.23944152682991|
|    min|              2.5|
|    max|           399.68|
+-------+-----------------+



### Categorical variables
- looking the balance and outliers

In [40]:
# VendorID: to types of machine
df.groupBy("VendorID").count()
#4 is weird

VendorID,count
1,4208739
4,196257
2,70364319


In [41]:
# store_and_fwd_flag: if the trip was saved in the machine of the vehicle
df.groupBy("store_and_fwd_flag").count()

store_and_fwd_flag,count
Y,122061
N,74647254


In [42]:
# # RatecodeID
df.groupBy("RatecodeID").count()
#99 is weird

RatecodeID,count
99,2526
3,17687
6,390
1,72819739
5,435196
4,70956
2,1422821


In [43]:
# payment_type
df.groupBy("payment_type").count()

payment_type,count
3,98003
1,52919826
5,166
4,31780
2,21719540


In [44]:
# taxi_type
df.groupBy("taxi_type").count()

taxi_type,count
yellow,68844062
green,5925253


In [45]:
# trip_type
df.groupBy("trip_type").count()
#null is weird

trip_type,count
,354
no,68844062
1,5732115
2,192784


#### Cleaning categorical variables

In [46]:
# I will change the weird values to the most common values 

#vendorid 4 to class 2 (is like the driver put 2 times 2 and it is the mayority class)
df = df.withColumn("VendorID", \
              when(df["VendorID"] == 4, 2).otherwise(df["VendorID"]))

# 99 to class 1 because is the mayority class, also they are only 53.
df = df.withColumn("RatecodeID", \
              when(df["RatecodeID"] == 99, 1).otherwise(df["RatecodeID"]))

# null to class 1 because green are the only one that have trip_type so null is different from no
df = df.withColumn("trip_type", \
              when(df["trip_type"].isNull(), 1).otherwise(df["trip_type"]))

#all another payment type does not have tip amount
df = df.withColumn("tip_amount", \
              when(df["payment_type"] != 1 , 0).otherwise(df["tip_amount"]))

#not used:
#drop all the rows that have null in the first variables above (rows that have many missing values)
# df.na.drop(thresh=5, subset=("VendorID","store_and_fwd_flag", "RatecodeID", "payment_type"))

In [47]:
#check
# how many tip amount are different from 0 and payment with other type than credit card (drop these) 

df.where((F.col("tip_amount")!= 0) & (F.col("payment_type")== 2)).count() 

0

### Date data: looking outliers
- date time coherent (min-max)

In [48]:
#knowing if date time is coherent
df.select(min("pickup_datetime"),max("pickup_datetime"),min("dropoff_datetime"),max("dropoff_datetime"))

#below I will take out the weird years (data engineering part)

min(pickup_datetime),max(pickup_datetime),min(dropoff_datetime),max(dropoff_datetime)
2001-01-01 00:02:08,2090-12-31 06:41:26,2001-01-01 01:00:02,2090-12-31 07:18:49


### Location data

- From the page we know that locationID is populated by numbers ranging from 1-265.
- Cleaning in data engineering part

In [None]:
# Pick Up location id (same as DO)
#df.groupBy("PULocationID").count().show(265)
#df.select("PULocationID").distinct().show()


# # Dropp Off location id
# df.groupBy("DOLocationID").count().show()
# #populated by numbers ranging from 1-265.

## Data Engineering

### Taxi zones

In [49]:
#open the location of each PU and DO to get more variables to the model
taxi_zone = spark.read.option("header",True)\
    .csv("taxi_zone_lookup.csv")

In [50]:
taxi_zone
#taxi_zone.count() 
#after opened the table in excel I could realise that there are 265 ID, but, the last 2 are unknown boroughs

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham G...,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wad...,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [51]:
taxi_zone = taxi_zone.drop("service_zone")

In [52]:
#joint with the actual data using the variables above
df = df.join(taxi_zone, df.PULocationID == taxi_zone.LocationID,how='left')


In [53]:
#rename to avoid confusing
df = df.drop(df.LocationID)\
       .withColumnRenamed("Borough", "Borough_PU")\
       .withColumnRenamed('Zone', 'Zone_PU')

In [54]:
df = df.join(taxi_zone, df.DOLocationID == taxi_zone.LocationID,how='left')

In [55]:
df = df.drop(df.LocationID)\
       .withColumnRenamed("Borough", "Borough_DO")\
       .withColumnRenamed('Zone', 'Zone_DO')

In [56]:
#unique values in borough and knowing the average total amount: Exploration about more expensive trips
df.groupBy("Borough_DO").agg(
      F.avg("total_amount"))

Borough_DO,avg(total_amount)
EWR,91.90575069663404
Brooklyn,27.615433490806645
Manhattan,16.510041452563392
Bronx,27.60353242664853
Queens,27.968154936483785
Unknown,30.48763576879663
Staten Island,36.1084090558494


In [57]:
df.groupBy("Borough_PU").agg(
      F.avg("total_amount"))

Borough_PU,avg(total_amount)
EWR,89.73858680725098
Brooklyn,18.150877444848412
Manhattan,16.435924363871955
Bronx,20.880524697076947
Queens,35.5660721397699
Unknown,18.989862115304422
Staten Island,51.43676536436079


From above different Borough have different total amount, been EWR the most expensive (airport).

In [59]:
#filter areas with unkown borough
df = df.where((F.col("Borough_PU")!= "Unknown") & (F.col("Borough_DO")!= "Unknown")) 

### New variables for datetime
- for datetime variables: year, quarter, months, dayweek, hour

In [60]:
#creating year,month,dayweek,etc
df = df.withColumn("year", date_format(col("dropoff_datetime"), "Y"))\
       .withColumn("month", date_format(col("dropoff_datetime"), "M"))\
       .withColumn('dow',dayofweek(df.dropoff_datetime))\
       .withColumn('quarter',quarter(df.dropoff_datetime))\
       .withColumn("dow_long", date_format(col("dropoff_datetime"), "EEEE")) \
       .withColumn('hour',hour(df.dropoff_datetime))

In [61]:
# value counts for these new variables
df.groupBy("year").count().show()
df.groupBy("month").count().show(13)
df.groupBy("dow").count().show(8)
df.groupBy("hour").count().show(26)
df.groupBy("quarter").count().show(26)

+----+--------+
|year|   count|
+----+--------+
|2091|       1|
|2002|      11|
|2033|       3|
|2020|17288473|
|2026|       1|
|2018|       1|
|2010|      39|
|2029|       3|
|2003|       3|
|2009|     678|
|2066|       1|
|2021|  134487|
|2058|       3|
|2008|       2|
|2001|       2|
|2019|56507535|
|2038|       4|
+----+--------+

+-----+-------+
|month|  count|
+-----+-------+
|    7|4978612|
|    9|5502743|
|    3|7681426|
|    6|5237043|
|    1|7291052|
|   11|5854317|
|    8|4946055|
|    5|5500005|
|   10|6170191|
|    4|5332838|
|   12|5870572|
|    2|9566393|
+-----+-------+

+---+--------+
|dow|   count|
+---+--------+
|  1| 9131605|
|  6|11357614|
|  3|10692660|
|  4|11082395|
|  5|11398215|
|  2| 9627573|
|  7|10641185|
+---+--------+

+----+-------+
|hour|  count|
+----+-------+
|  12|3918773|
|   1|1599764|
|   6|1254475|
|   3| 732385|
|   4| 566424|
|   8|3256782|
|  11|3667879|
|  19|4765365|
|  23|3178994|
|  21|4056554|
|  14|4072163|
|  16|4022503|
|  20|4159124|


In [62]:
#as mention above, years out of 2019-2020 were take out
df = df.where((F.col("year")>2018) & (F.col("year")<2021))
#df.show(1)

In [63]:
#new variable bin for each time of a day
def categorizerdate(x):
    if (x >= 0) & (x<5):
        return "early morning"
    elif (x >= 5) & (x <12):
        return "morning"
    elif (x >= 12) & (x <16):
        return "afternoon"
    elif (x >= 16) & (x <20):
        return "evening"
    elif (x >= 20) & (x <=23):
        return "late night"

In [64]:
bucket_udf = udf(categorizerdate, StringType() )
df = df.withColumn("day_time", bucket_udf("hour")) #time of a day

### New variable Duration
- difference between pickup and dropoff time in minutes

In [65]:
#here I put the duration in minutes (I divide the difference by 60) 

df = df.withColumn("duration", 
    (F.col("dropoff_datetime").cast("long") - F.col("pickup_datetime").cast("long"))/60.)

#first outliers
df = df.where(F.col("duration")>=0) #drop datetime dropoff before pickup


In [66]:
df.describe("duration").show()

+-------+-----------------+
|summary|         duration|
+-------+-----------------+
|  count|         73795576|
|   mean|19.02444278344467|
| stddev|83.81551403619875|
|    min|              0.0|
|    max|43648.01666666667|
+-------+-----------------+



Cleaning: 
- 3 st deviation from the mean as before

In [67]:
#trip duration: more than 0 but less than a a 3 st of the mean: 270 == 4.5 hours .. it is a lot but we do not have more information
df = df.where( (F.col("duration")< 270)) 

In [68]:
#logical cases as above: fare, distance, duration

df = df.where((F.col("fare_amount")> 2.50) & (F.col("trip_distance")!=0) & (F.col("duration")!=0) | (F.col("total_amount")<= 6.55) & (F.col("trip_distance")==0) & (F.col("duration")==0))

Bins for duration

In [69]:
def categorizer(duration):
    if duration < 5:
        return "short1"
    elif (duration >= 5) & (duration <10):
        return "short2"
    elif (duration >= 10) & (duration <20):
        return "middle"
    elif (duration >= 20) & (duration <30):
        return "long1"
    elif (duration >= 30):
        return "long2"

In [70]:
bucket_udf = udf(categorizer, StringType() )
df = df.withColumn("bins_duration", bucket_udf("duration")) #duration in minutes

### New variable: Average paid per passenger

In [71]:
#creating a new variable (not used in the model)
df= df.withColumn("av_paid", df.total_amount/df.passenger_count)

In [72]:
df.describe("av_paid")

summary,av_paid
count,73319398.0
mean,14.835076567775143
stddev,12.221611337018498
min,0.5
max,393.6400146484375


### New variable Speed(km/hr)

In [73]:
df = df.withColumn("speed", (df.trip_distance*1.6)/((df.duration)/60))

In [74]:
df.describe("speed")

summary,speed
count,73319250
mean,19.434548894936395
stddev,117.86501467078517
min,0.003690415089219...
max,150566.396484375


In [75]:
#I assume the min speed is 0 and the max speed is 220
df = df.where((F.col("speed")<=220) & (F.col("speed")>=0))

# the max speed is 220 asumming a taxi could not go too fast, but probably the maximun should be less.

### New variable bins of distance

In [76]:
def categorizerdist(distance):
    if distance < 5:
        return "short1"
    elif (distance >= 5) & (distance <20):
        return "short2"
    elif (distance >= 20) & (distance <40):
        return "middle"
    elif (distance >= 40) & (distance <60):
        return "long1"
    elif (distance >= 60):
        return "long2"

In [77]:
bucket_udf = udf(categorizerdist, StringType() )
df = df.withColumn("bins_distance", bucket_udf("trip_distance")) #duration in minutes

### Cheking missing values again to export the data

In [78]:
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
#only ehaill fee has missing values

VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,taxi_type,ehail_fee,trip_type,Borough_PU,Zone_PU,Borough_DO,Zone_DO,year,month,dow,quarter,dow_long,hour,day_time,duration,bins_duration,av_paid,speed,bins_distance
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5821305,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [80]:
#number the rows and columns
count = df.count()
print("rows","columns")
print((count, len(df.columns)))

#its aprox 63% of the original data, but because it is big data, its still a lot of information.

rows columns
(73297413, 37)


# 3. Export data

In [79]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
#df.write.parquet("df_clean.parquet")

df.write.mode('overwrite').parquet("df_clean.parquet")

#I used it because it has a fast reading. That was useful to make the queries below in different days.

# 4. Answer the list of provided business questions

In [83]:
parqDF=spark.read.parquet("df_clean.parquet")

In [84]:
#create temporary view for to run faster queries

#parquet df
parqDF.createOrReplaceTempView("df_par")

#spark df
#df.createOrReplaceTempView("df_par")

### a. For each year and month:

What was the total number of trips?

In [None]:
# sparksql
#df.groupBy("year").count().show()

In [97]:
spark.sql('''
SELECT 
 year, month
 , COUNT(1) n_trips
FROM df_par
GROUP BY year, month
ORDER BY n_trips
''').show(24)

#almost all the months in 2020 have few trips than 2019.

+----+-----+-------+
|year|month|n_trips|
+----+-----+-------+
|2020|    4| 152445|
|2020|    5| 198245|
|2020|    6| 338796|
|2020|    7| 501566|
|2020|    8| 662063|
|2020|    9| 886474|
|2020|   11|1001497|
|2020|   10|1117900|
|2020|   12|1206911|
|2020|    3|2141049|
|2019|    1|2733703|
|2019|    8|4247183|
|2019|    7|4440139|
|2020|    2|4455335|
|2019|   12|4491394|
|2020|    1|4514960|
|2019|    9|4577649|
|2019|   11|4812995|
|2019|    6|4861047|
|2019|   10|5010768|
|2019|    2|5048379|
|2019|    4|5143664|
|2019|    5|5263851|
|2019|    3|5489400|
+----+-----+-------+



Which hour of the day had the most trips?

In [None]:
#spark sql
#df.groupBy("year","hour").count().sort("count", ascending=False).show()

In [100]:
spark.sql('''
WITH x AS
(
    SELECT year, month, hour, COUNT(*) AS trips
    FROM df_par
    GROUP BY year,month, hour
)
SELECT x.year, x.month, x.hour, x.trips
FROM x
INNER JOIN
(
    SELECT year,month, MAX( trips ) AS maxCountX
    FROM x
    GROUP BY year,month
) x2
ON x2.maxCountX = x.trips
Order by trips
''').show(24)
# most trips at 18 (15 of 24) and others at 15 pm

+----+-----+----+------+
|year|month|hour| trips|
+----+-----+----+------+
|2020|    4|  15| 11334|
|2020|    5|  15| 15165|
|2020|    6|  15| 25205|
|2020|    7|  15| 36458|
|2020|    8|  15| 47840|
|2020|    9|  18| 66575|
|2020|   11|  15| 80113|
|2020|   10|  18| 83117|
|2020|   12|  15| 94203|
|2020|    3|  18|148983|
|2019|    1|  18|186963|
|2019|    8|  18|279397|
|2019|   12|  19|285928|
|2019|    7|  18|289743|
|2019|    9|  18|302445|
|2020|    2|  18|309373|
|2019|    6|  18|310187|
|2020|    1|  18|310445|
|2019|   11|  18|317974|
|2019|   10|  19|333422|
|2019|    2|  18|335561|
|2019|    4|  18|344770|
|2019|    5|  18|352855|
|2019|    3|  18|363012|
+----+-----+----+------+



Which weekday had the most trips?

In [None]:
#spark SQL
# w = Window().partitionBy("month").orderBy(F.desc("count"))

# df_with_rank = (df_agg
#     .withColumn("rank", F.dense_rank().over(w)))

# df_with_rank.where(F.col("rank") == 1).show()

In [103]:
spark.sql('''
WITH x AS
(
    SELECT year, month, dow_long, COUNT(*) AS trips
    FROM df_par
    GROUP BY year,month, dow_long
)
SELECT x.year,x.month, x.dow_long, x.trips
FROM x
INNER JOIN
(
    SELECT year,month, MAX( trips ) AS maxCountX
    FROM x
    GROUP BY year,month
) x2
ON x2.maxCountX = x.trips
Order by year, dow_long
''').show(24)
#the most repeated weekday in the two years was thursday, and people travel less in taxis on sunday, monday and wednesday.

+----+-----+---------+------+
|year|month| dow_long| trips|
+----+-----+---------+------+
|2019|    3|   Friday|955978|
|2019|    2|   Friday|863399|
|2019|   12|   Friday|696089|
|2019|    6| Saturday|815457|
|2019|   11| Saturday|844033|
|2019|    9|   Sunday|684498|
|2019|   10| Thursday|862885|
|2019|    5| Thursday|909829|
|2019|    8| Thursday|754486|
|2019|    1| Thursday|549746|
|2019|    4|  Tuesday|863916|
|2019|    7|Wednesday|776055|
|2020|    5|   Friday| 38132|
|2020|    1|   Friday|784626|
|2020|    8|   Monday|109579|
|2020|   11|   Monday|169296|
|2020|    2| Saturday|777383|
|2020|    4| Thursday| 28077|
|2020|    7| Thursday| 93379|
|2020|   10| Thursday|204167|
|2020|    3|  Tuesday|332539|
|2020|   12|  Tuesday|275965|
|2020|    6|  Tuesday| 62357|
|2020|    9|Wednesday|164195|
+----+-----+---------+------+



What was the average number of passengers? 

- assumption: maximun of 6

In [None]:
#SparkSQL
#df.groupBy("year").avg( "passenger_count")
#df.describe("passenger_count").show()

In [99]:
#year
spark.sql('''
SELECT 
 year, month
 , AVG(passenger_count) avg_pass
FROM df_par
GROUP BY year, month
Order by year,month
''').show(24)
#almost all the months in the two years have on average of 2 passengers per trip
#Only april 2020 have 1 passenger after rounded.

+----+-----+------------------+
|year|month|          avg_pass|
+----+-----+------------------+
|2019|    1|1.5753796224388676|
|2019|   10|1.6673266453366031|
|2019|   11|1.6634735751855134|
|2019|   12|1.6526546101277242|
|2019|    2|1.7157715377549903|
|2019|    3|1.7255292017342514|
|2019|    4|1.7193510307049604|
|2019|    5| 1.713222885678185|
|2019|    6|1.7002939901630245|
|2019|    7| 1.699389816399892|
|2019|    8|1.6989548131078882|
|2019|    9|1.6834604400643212|
|2020|    1| 1.632525648067757|
|2020|   10| 1.548797745773325|
|2020|   11|1.5367884277236976|
|2020|   12|1.5803145385202388|
|2020|    2|1.6228101815015032|
|2020|    3|1.5905446348962589|
|2020|    4| 1.427072058775296|
|2020|    5|1.4593558475623598|
|2020|    6|1.5027420630704023|
|2020|    7|1.5231475020236618|
|2020|    8|1.5356151906993745|
|2020|    9|1.5423159618894633|
+----+-----+------------------+



What was the average amount paid per trip (total_amount)?

In [None]:
#sparkSQL
# df.groupBy("year").avg( "total_amount")

In [89]:
spark.sql('''
SELECT 
 year, month
 , AVG(total_amount) avg_tot_amount
FROM df_par
GROUP BY year, month
ORDER BY year,month
''').show(24)
#It could be seeen that on average trips in 2020 were cheaper than 2019.

+----+-----+------------------+
|year|month|    avg_tot_amount|
+----+-----+------------------+
|2019|    1|15.096294748356863|
|2019|   10|18.869977676019314|
|2019|   11|18.571725949140497|
|2019|   12|18.721124848093712|
|2019|    2|18.036768584663175|
|2019|    3|18.567307344125382|
|2019|    4|18.530467365477495|
|2019|    5| 18.85895335140635|
|2019|    6|18.972481721704433|
|2019|    7| 18.71558866978898|
|2019|    8|18.709218650467744|
|2019|    9|19.007221638322868|
|2020|    1|17.850002345749367|
|2020|   10|16.432112214216787|
|2020|   11|16.334123382101073|
|2020|   12|16.782228968766717|
|2020|    2| 17.87319769034908|
|2020|    3| 17.63022656460476|
|2020|    4|15.327375863020611|
|2020|    5|15.754282578319412|
|2020|    6|16.730792409476525|
|2020|    7| 16.64126167410382|
|2020|    8|16.667475996871243|
|2020|    9| 16.50839015831163|
+----+-----+------------------+



What was the average amount paid per passenger (total_amount)?

In [None]:
#year
# df.groupBy("year").avg( "av_paid")

In [90]:
spark.sql('''
SELECT 
 year,month
 , AVG(total_amount/passenger_count) avg_ppp
FROM df_par
GROUP BY year, month
ORDER BY year, month
''').show(24)
#as the same as before, the paid per passenger was less in 2020 than in 2019.

+----+-----+------------------+
|year|month|           avg_ppp|
+----+-----+------------------+
|2019|    1|12.581860001105474|
|2019|   10| 15.31556331966309|
|2019|   11|15.083163967273954|
|2019|   12|15.207953932412295|
|2019|    2|14.558124805878265|
|2019|    3|14.947923602348268|
|2019|    4|14.884629502462765|
|2019|    5|  15.1721931423285|
|2019|    6|15.292627259584567|
|2019|    7| 15.05255132379084|
|2019|    8|15.038600577625951|
|2019|    9|15.364268120960881|
|2020|    1|14.639396892337118|
|2020|   10|13.800163751637143|
|2020|   11| 13.76754429925853|
|2020|   12|13.922603903424715|
|2020|    2|14.705244765235154|
|2020|    3| 14.65821195844199|
|2020|    4|13.503782270454625|
|2020|    5|13.695092579936892|
|2020|    6| 14.32963459427532|
|2020|    7|14.131913231334824|
|2020|    8|14.062551362994341|
|2020|    9|13.897527495012008|
+----+-----+------------------+



### b.  For each taxi colour (yellow and green)

What was the average, median, minimum and maximum trip duration in seconds?

In [None]:
# df.groupBy("taxi_type").agg(
#       F.min("duration")*60,
#       F.avg("duration")*60,
#       F.max("duration")*60).show()
#       #F.approxQuantile("duration", [0.5], 0.25))

In [105]:
spark.sql('''
SELECT 
 taxi_type
 , AVG(duration*60)  avg_dur 
 , MIN(duration*60)  min_dur
 , MAX(duration*60)  max_dur
 , approx_percentile(duration*60, 0.5, 100) median_dur
FROM df_par
GROUP BY taxi_type
''').show()
#Since the minimun assumed in the cleaning part was 0, it could be see that the min in sec is 1.
#Also, the maximun is less than 270 (assumption before)
#The average of yellow if less than green but booth near 850 (14 minutes). 


+---------+-----------------+-------+------------------+----------+
|taxi_type|          avg_dur|min_dur|           max_dur|median_dur|
+---------+-----------------+-------+------------------+----------+
|   yellow|843.0866080577625|    1.0|           16197.0|     679.0|
|    green| 854.211110441199|    1.0|16191.000000000002|     679.0|
+---------+-----------------+-------+------------------+----------+



Also, it interesting to see that the average is higher than the median. Therefore, it is probably that the trips with duration near the maximum are few.

What was the average, median, minimum and maximum trip distance in km?

In [None]:
#spark sql
# df.groupBy("taxi_type").agg(
#       F.min("trip_dist_km"),
#       F.avg("trip_dist_km"),
#       F.max("trip_dist_km")).show()
#       #F.approxQuantile("duration", [0.5], 0.25))

In [92]:
spark.sql('''
SELECT 
 taxi_type
 , AVG(trip_distance*1.6)  avg_td 
 , MIN(trip_distance*1.6)  min_td
 , MAX(trip_distance*1.6)  max_td
 ,approx_percentile(trip_distance*1.6, 0.5, 100) median_td
FROM df_par
GROUP BY taxi_type''').show()
#the minimum is 0 because the assumptions in the cleaning part. The same for the maximun.
#As above, the average is like two times of the median probably because some trips are near the maximun but they are few.
#It is very similar between taxy type.

+---------+-----------------+--------------------+------------------+------------------+
|taxi_type|           avg_td|              min_td|            max_td|         median_td|
+---------+-----------------+--------------------+------------------+------------------+
|   yellow|4.735446221545285|0.015999999642372132|158.99200439453125|2.7200000762939456|
|    green| 4.76187468326689|0.015999999642372132| 135.5199951171875| 2.992000007629395|
+---------+-----------------+--------------------+------------------+------------------+



What was the average, median, minimum and maximum speed in km per hour? 


In [93]:
spark.sql('''
SELECT 
 taxi_type
 , AVG((trip_distance*1.6)/(duration/60))  avg_speed_km 
 , MIN((trip_distance*1.6)/(duration/60))  min_speed_km
 , MAX((trip_distance*1.6)/(duration/60))  max_speed_km
 , approx_percentile((trip_distance*1.6)/(duration/60), 0.5, 100) median_speed_km
FROM df_par
GROUP BY taxi_type''').show()
#green and yellow have almost the same average speed.
#The maximun and minumum are inside the range assumed above
#the average and median its not very different, probably there are few outliers here.

+---------+-----------------+--------------------+------------------+------------------+
|taxi_type|     avg_speed_km|        min_speed_km|      max_speed_km|   median_speed_km|
+---------+-----------------+--------------------+------------------+------------------+
|   yellow| 18.6772211546339|0.003690415089219...|219.92726586081767|  16.3265306122449|
|    green|19.11572623111619|0.006349906152854...|219.92726586081767|17.209756432510005|
+---------+-----------------+--------------------+------------------+------------------+



### c. What was the percentage of trips where the driver received tips?

- assumption, only for payment_type = credit card because for cash NYC dataset does not have the information
- also, for another payment type it was assumed that there is not a tip because they are related with no payment

In [None]:
# df.where(df.tip_amount>0).count()/df.count()

In [94]:
spark.sql('''
select 
(select count(*) from df_par where payment_type =1 AND tip_amount>0)/ count(*) as perc_with_tips
from df_par 
where payment_type =1
''').show()
#only considering credit card, 95% of the time the taxis received tips

+------------------+
|    perc_with_tips|
+------------------+
|0.9462256074176378|
+------------------+



In [None]:
#corraborate its ok

# spark.sql('''
# select count(*)
# from df_par
# where payment_type =1 AND tip_amount>0
# ''').show()

# spark.sql('''
# select count(*)
# from df_par
# where payment_type =1 
# ''').show()

### d. For trips where the driver received tips, What was the percentage where the driver received tips of at least $10.

- the same assumption as below, only for payment_type = credit card

In [None]:
# df.where(df.tip_amount>10).count()/df.where(df.tip_amount>0).count()

In [95]:
spark.sql('''
select 
(select count(*) from df_par where payment_type =1 AND tip_amount >= 10)/ count(*) as perc_with_tips
from df_par 
where payment_type =1 and tip_amount > 0
''').show()
#only 3% of the time the driver recieves a tip of at least 10 using credit card

+--------------------+
|      perc_with_tips|
+--------------------+
|0.030831322526626518|
+--------------------+



### e. Classify each trip into bins of durations:
- Under 5 Mins,From 5 mins to 10 mins,From 10 mins to 20 mins,From 20 mins to 30 mins,At least 30 mins
- Then for each bins, calculate: Average speed (km per hour) and Average distance per dollar (km per $)

In [96]:
spark.sql('''
SELECT 
  CASE  WHEN duration < 5 THEN "<5"
      WHEN duration >=5 AND duration <10 THEN "5-10"
      WHEN duration >=10 AND duration <20 THEN "10-20"
      WHEN duration >=20 AND duration <30 THEN "20-30"
      WHEN duration >=30 THEN ">30" 
      END as duration_bins,
  AVG((trip_distance*1.6)/(duration/60)) as ave_speed,
  AVG((trip_distance*1.6)/(total_amount)) as ave_dist_dollar
FROM df_par
GROUP BY 
  CASE  WHEN duration < 5 THEN "<5"
      WHEN duration >=5 AND duration <10 THEN "5-10"
      WHEN duration >=10 AND duration <20 THEN "10-20"
      WHEN duration >=20 AND duration <30 THEN "20-30"
      WHEN duration >=30 THEN ">30" 
      END
''').show()
#the less average distance per dollar is in bin <5 minutes trip. 
#But it is interesting that the average speed is higher than 5-10 and 10-20 min. 
#Its like that more duration more probability of traffic congestion, so less speed.

+-------------+------------------+-------------------+
|duration_bins|         ave_speed|    ave_dist_dollar|
+-------------+------------------+-------------------+
|          >30|25.703337447297155|0.35497007534492064|
|           <5| 19.62316593317579|0.12833775722739388|
|        10-20|17.436047735429053| 0.2285887630802899|
|        20-30|21.347825394595272| 0.2856627274076294|
|         5-10|16.803196441768144|0.17656291442405736|
+-------------+------------------+-------------------+



### f. Which duration bin will you advise a taxi driver to target to maximise his income?

In [None]:
#the bin that gives less distance per dollar shloud be the target because for less distance the driver receives 1 dollar.
#the driver should target trips of less than 5 minutes because its receives 1 dollar in 0,13 km.

# 5. Build a ML model to predict the Total fare amount of a trip - GCP PART
- based for the last 3 months of data (train your data on the remaining dataset). 
- You are not allowed to use “fare_amount” as a feature for your model. 
- The model will be assessed using the RMSE score.


### Load the data from google storage

In [3]:
# Google Cloud Storage Bucket
gcs_bucket = 'parquet_df'
folder = 'df_clean.parquet'

parqDF=spark.read.parquet('gs://{}/{}'.format(gcs_bucket,folder))

## Prepare the data

Selection of columns to use:
- 1. Some variables were dropped
- 2. Correlation of the numerical variables with the total amount 
- 3. Relation of the categorical variables and total amount

In [87]:
df = parqDF

In [10]:
#missing values again (from parquet files in GCP)
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,taxi_type,ehail_fee,trip_type,Borough_PU,Zone_PU,Borough_DO,Zone_DO,year,month,dow,quarter,dow_long,hour,day_time,duration,bins_duration,av_paid,speed,bins_distance
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5829694,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [5]:
df.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'taxi_type',
 'ehail_fee',
 'trip_type',
 'Borough_PU',
 'Zone_PU',
 'Borough_DO',
 'Zone_DO',
 'year',
 'month',
 'dow',
 'quarter',
 'dow_long',
 'hour',
 'day_time',
 'duration',
 'bins_duration',
 'av_paid',
 'speed',
 'bins_distance']

In [88]:
#new dataframe to avoid change df
df_model = df
#df_model = df.sample(withReplacement=False, fraction=0.001) #to try with subsample

#### Dropp some columns
- datetime was dropped because it is one value per row
- Location ID was dropped because it has 263 values
- ehail_fee because it has many missing values and yellow taxis do not have it so I put 0
- average paid because it was created with total amount
- fare amount because it was mandatory


In [89]:
df_model = df_model.drop("fare_amount", "ehail_fee", "pickup_datetime", "dropoff_datetime", "PULocationID", "DOLocationID", "Zone_PU", "Zone_PU", "av_paid")

In [90]:
df_model.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- congestion_surcharge: float (nullable = true)
 |-- taxi_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- Borough_PU: string (nullable = true)
 |-- Borough_DO: string (nullable = true)
 |-- Zone_DO: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- dow: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- dow_long: string (nullable = true)
 |-- hour: integer (nullable

### Correlation numeric variables

In [8]:
corr_var = [
 'trip_distance',
 'extra',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'speed',
 'duration']

In [9]:
corr_data = df_model.select(corr_var)

col_names = corr_data.columns
features = corr_data.rdd.map(lambda row: row[0:])
corr_mat=Statistics.corr(features, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names

# print(corr_df.to_string())

In [10]:
corr_df
#with total amount is very correlated(positive) trip distance, duration, tip amount, tolls amount, and speed.
#between them: speed with trip_distance have a positive correlation(it is expected because speed  was created with trip distance)
#but there are not multicolinearity

Unnamed: 0,trip_distance,extra,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,speed,duration
trip_distance,1.0,-0.055338,0.505426,0.598967,-0.054412,0.925235,-0.151918,0.658451,0.782197
extra,-0.055338,1.0,-0.0058,-0.070279,0.035803,-0.031688,-0.004035,-0.015334,-0.04353
tip_amount,0.505426,-0.0058,1.0,0.418334,0.031372,0.679567,0.108017,0.286834,0.451582
tolls_amount,0.598967,-0.070279,0.418334,1.0,-0.017525,0.653784,-0.024693,0.395615,0.437976
improvement_surcharge,-0.054412,0.035803,0.031372,-0.017525,1.0,-0.034669,0.090917,-0.032874,-0.06112
total_amount,0.925235,-0.031688,0.679567,0.653784,-0.034669,1.0,-0.011055,0.515663,0.839491
congestion_surcharge,-0.151918,-0.004035,0.108017,-0.024693,0.090917,-0.011055,1.0,-0.182554,-0.059582
speed,0.658451,-0.015334,0.286834,0.395615,-0.032874,0.515663,-0.182554,1.0,0.188027
duration,0.782197,-0.04353,0.451582,0.437976,-0.06112,0.839491,-0.059582,0.188027,1.0


### Some information of categorical
- additional to the bussiness questions above

In [14]:
df.groupBy("payment_type").agg(
      F.avg("total_amount"))

payment_type,avg(total_amount)
3,18.839148980207245
5,16.153259203169082
1,19.440214191482696
4,18.459082389825337
2,15.277134558823873


In [15]:
df.groupBy("day_time").agg(
      F.avg("total_amount"))

day_time,avg(total_amount)
afternoon,17.92181650176273
morning,17.509077209948025
late night,18.907289130422285
evening,18.32396750060964
early morning,19.274203122245275


In [16]:
df.groupBy("RatecodeID").agg(
      F.avg("total_amount"))

RatecodeID,avg(total_amount)
3,45.28260809904577
5,37.66927490512147
6,10.712174161620762
1,17.164054550549952
4,58.66229495395925
2,68.06670469709013


In [17]:
df.groupBy("store_and_fwd_flag").agg(
      F.avg("total_amount"))

store_and_fwd_flag,avg(total_amount)
Y,19.41564811858532
N,18.235599580778963


In [18]:
df.groupBy("bins_duration").agg(
      F.avg("total_amount"))

bins_duration,avg(total_amount)
long1,28.253768081241702
long2,47.681962778953576
short1,9.1437703048754
short2,11.693697397774985
middle,17.378305928837285


In [19]:
df.groupBy("bins_distance").agg(
      F.avg("total_amount"))

bins_distance,avg(total_amount)
long1,126.13791769742966
long2,172.93500002692727
short1,14.224050339614044
short2,40.639347451623145
middle,71.58374561213907


In [20]:
df.groupBy("trip_type").agg(
      F.avg("total_amount"))

trip_type,avg(total_amount)
1,15.147609094336952
no,18.47274219193124
2,26.55248930284848


In [21]:
df.groupBy("taxi_type").agg(
      F.avg("total_amount"))

taxi_type,avg(total_amount)
green,15.508819398431635
yellow,18.47274219193124


In [22]:
df.groupBy("VendorID").agg(
      F.avg("total_amount"))

VendorID,avg(total_amount)
1,18.514978795375693
2,18.221892943940382


In [24]:
df.groupBy("year").agg(
      F.avg("total_amount"))

year,avg(total_amount)
2020,17.354182974664212
2019,18.50764610209183


In [25]:
df.groupBy("month").agg(
      F.avg("total_amount"))

month,avg(total_amount)
7,18.50505163977245
11,18.186317858789305
3,18.304369230534444
8,18.43386836929054
5,18.74627033422565
6,18.826424355698272
9,18.601822796200995
1,16.811491006470014
10,18.425298701770902
4,18.438268491909746


In [28]:
df.groupBy("dow_long").agg(
      F.avg("total_amount"))

dow_long,avg(total_amount)
Wednesday,18.252978752192085
Tuesday,18.141381296582548
Friday,18.292258007219125
Thursday,18.50573191801035
Saturday,17.58065442072873
Monday,18.41145233165885
Sunday,18.51102844861624


In [27]:
df.groupBy("quarter").agg(
      F.avg("total_amount"))

quarter,avg(total_amount)
1,17.72636673479108
3,18.516759928984605
4,18.30943987964008
2,18.67016935986904


In [35]:
df.groupBy("mta_tax").agg(
      F.avg("total_amount"))

mta_tax,avg(total_amount)
0.0,36.55852516328567
0.5,18.19390441739796


In [36]:
df.groupBy("passenger_count").agg(
      F.avg("total_amount"))

passenger_count,avg(total_amount)
1,18.16574229904865
6,18.16736194469376
3,18.35580709251433
5,18.199312842904423
4,18.408944978409576
2,18.55832800586188


After looking the total fare amount for the categories of the categorical variables, it could be realised that payment type, day time, store_fw_flag  and vendorID have similar total amount per class. But, ratecodeID have differences, been number 2 (Airport) the most expensive. Also, bins duration and bins distance have different total amount per category.  Besides, dispatch trips are more expensive and yellow taxis. Mta_tax also have different total amount and for passenger count there are not a difference.

For dates variables, there are not a high diference in total amount (years, quarter, months and day of the week).

Also, it is important to see that having small categories for each string type makes creating one-hot encoding a suitable pre-processing step.

# Modelling
- Linear Regression
- Random Forest

### Selecting colums
- first models, all the raw columns were selected and daytime instead of hours
- second, considering the above analysis and the feature importance of the first models-first set of variables

## First Selection of variables

In [42]:
#selection of columns
cols_list = ['VendorID','passenger_count','trip_distance','RatecodeID','store_and_fwd_flag','payment_type','extra','mta_tax','tip_amount',
 'tolls_amount','improvement_surcharge','total_amount','congestion_surcharge','taxi_type','trip_type','Borough_PU','Borough_DO',
 'year','quarter','dow_long','month','day_time','duration','speed']

df_model = df_model.select(cols_list)

In [43]:
cat_cols = ['payment_type','day_time','RatecodeID', 'year','month', 'VendorID', 'store_and_fwd_flag','taxi_type','trip_type',
            'Borough_PU','Borough_DO', 'dow_long', 'passenger_count','mta_tax', 'quarter']

In [44]:
num_cols = ['congestion_surcharge','improvement_surcharge','extra', 'tolls_amount', 'tip_amount', 'trip_distance','duration', 'speed']

### Transform categorical data 

In [45]:
stages = []
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoderEstimator(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]
    
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]


#### Vector assemble for the model

In [46]:
#features
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")

In [47]:
#target change the name a label 
df_model = df_model.withColumnRenamed("total_amount", "label")


In [48]:
stages += [assembler]

## Pipeline

In [49]:
pipeline = Pipeline(stages=stages)

In [50]:
pipeline_model = pipeline.fit(df_model)

In [51]:
df_model = pipeline_model.transform(df_model)

In [52]:
#selectng the two columns that we need to pyspark
df_model.select(['label', 'features']).show

<bound method DataFrame.show of +-----+--------------------+
|label|            features|
+-----+--------------------+
| 13.8|(63,[1,4,8,13,16,...|
| 14.3|(63,[0,6,8,13,14,...|
| 18.3|(63,[1,7,8,13,16,...|
| 12.3|(63,[0,6,8,13,14,...|
| 16.3|(63,[1,4,8,13,16,...|
| 10.8|(63,[1,6,8,13,14,...|
|18.95|(63,[0,4,8,13,16,...|
| 11.8|(63,[0,6,8,13,14,...|
| 22.8|(63,[1,4,8,13,16,...|
| 14.8|(63,[1,6,8,13,14,...|
|15.39|(63,[0,8,13,16,25...|
|15.96|(63,[0,5,8,13,14,...|
| 16.8|(63,[1,4,8,13,16,...|
|19.32|(63,[0,6,8,13,14,...|
| 16.8|(63,[1,4,8,13,16,...|
| 14.3|(63,[1,6,8,13,14,...|
| 30.0|(63,[0,4,8,13,16,...|
|15.96|(63,[0,6,8,13,14,...|
|29.96|(63,[0,4,10,13,16...|
|  9.8|(63,[1,5,8,13,14,...|
+-----+--------------------+
only showing top 20 rows
>

## Split data

In [53]:
#test for the last 3 months of 2020
train_data = df_model.where((df_model.year ==2019) | (df_model.year == 2020) & (df_model.month <10))
test_data = df_model.where((df_model.year == 2020) & (df_model.month >9))

In [54]:
train_data.count()

69971105

In [55]:
test_data.count()

3326308

### MODELS

In [None]:
rfr = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", seed = 12 )
                           
model = rfr.fit(train_data)
predictions_rfr_train = model.transform(train_data)
predictions_rfr_test = model.transform(test_data)

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression

# Define LinearRegression algorithm 
glr = GeneralizedLinearRegression(family="gaussian", link="identity",\
                                  maxIter=10, regParam=0.3, featuresCol="features",\
                                  labelCol="label")
                           
model2 = glr.fit(train_data)
predictions_glr_train = model2.transform(train_data)
predictions_glr_test = model2.transform(test_data)

In [None]:
#not used finally, very long
#print("Coefficients: " + str(model2.coefficients))

In [None]:
#model2.summary #not used, very long

### Performance: RMSE

In [None]:
# RMSE is used as evaluation metric
evaluator = RegressionEvaluator(predictionCol="prediction",\
                                labelCol="label",\
                                metricName ="rmse")
RMSE_rfr_test= evaluator.evaluate(predictions_rfr_test)
RMSE_rfr_train = evaluator.evaluate(predictions_rfr_train)

RMSE_glr_test= evaluator.evaluate(predictions_glr_test)
RMSE_glr_train = evaluator.evaluate(predictions_glr_train)

In [None]:
print("Random Forest")
print("Root Mean Squared Error (RMSE) on train data = %g" % RMSE_rfr_train)
print("Root Mean Squared Error (RMSE) on test data = %g" % RMSE_rfr_test)

Random Forest
Root Mean Squared Error (RMSE) on train data = 2.85931
Root Mean Squared Error (RMSE) on test data = 2.60685


In [None]:
print("Linear Regression")
print("Root Mean Squared Error (RMSE) on train data = %g" % RMSE_glr_train)
print("Root Mean Squared Error (RMSE) on test data = %g" % RMSE_glr_test)

Linear Regression
Root Mean Squared Error (RMSE) on train data = 2.483
Root Mean Squared Error (RMSE) on test data = 2.01799


RMSE is helpful to compare between models. Looking above, Linear Regression performs better (lower error) than Random Forest, but, they are similar.

Because RMSE is in units of the outcome, alone is meaningless. Therefore, one form to know if we are getting a good performance is to compare it with the description of the actual target value. After comparison with the standard deviation in the cleaning part above, the RMSE is lower, therefore looks good.

### Feature Importance

In [None]:
model.featureImportances

SparseVector(63, {0: 0.003, 1: 0.0013, 6: 0.0, 7: 0.0, 8: 0.0151, 9: 0.0391, 10: 0.0011, 11: 0.0, 12: 0.0, 16: 0.0, 25: 0.0, 27: 0.0005, 28: 0.0006, 30: 0.0218, 31: 0.011, 32: 0.0, 35: 0.0101, 36: 0.0004, 37: 0.0002, 38: 0.0, 39: 0.0, 45: 0.0, 51: 0.0002, 55: 0.0008, 56: 0.0002, 57: 0.0003, 58: 0.0527, 59: 0.1264, 60: 0.3939, 61: 0.2799, 62: 0.0413})

In [None]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [None]:
ExtractFeatureImp(model.featureImportances, df_model, "features").head(10)

Unnamed: 0,idx,name,score
5,60,trip_distance,0.393903
6,61,duration,0.27985
4,59,tip_amount,0.126414
3,58,tolls_amount,0.052688
7,62,speed,0.041256
17,9,RatecodeID_ohe_2,0.039111
38,30,Borough_PU_ohe_Manhattan,0.021844
16,8,RatecodeID_ohe_1,0.01511
39,31,Borough_PU_ohe_Queens,0.011029
43,35,Borough_DO_ohe_Manhattan,0.010059


### Changing manually parameters of Random Forest 

In [68]:
#default parameters for random forest model
print(model.explainParam("maxDepth")) #shorter is more generalizable
print(model.explainParam("maxBins"))  #more bins more time processing but more exactly
print(model.explainParam("numTrees")) #change trees to make the final "votation"
#maxDepth=5, maxBins=32,numTrees=20

maxDepth: Maximum depth of the tree. (Nonnegative) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5)
maxBins: Max number of bins for discretizing continuous features.  Must be at least 2 and at least number of categories for any categorical feature. (default: 32)
numTrees: Number of trees to train (at least 1) (default: 20)


In [None]:
rfr2 = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", numTrees = 10, seed =12, \
                           maxDepth = 10, maxBins = 10)
rfr3 = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", numTrees = 10, seed =12, \
                           maxDepth = 5, maxBins = 10)
rfr4 = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", numTrees = 10, seed =12, \
                           maxDepth = 10, maxBins = 20)
                           
model_rfr2 = rfr2.fit(train_data)
predictions_rfr_train_2 = model_rfr2.transform(train_data)
predictions_rfr_test_2 = model_rfr2.transform(test_data)

model_rfr3 = rfr3.fit(train_data)
predictions_rfr_train_3 = model_rfr3.transform(train_data)
predictions_rfr_test_3 = model_rfr3.transform(test_data)

model_rfr4 = rfr4.fit(train_data)
predictions_rfr_train_4 = model_rfr4.transform(train_data)
predictions_rfr_test_4 = model_rfr4.transform(test_data)

In [None]:
#RMSE
print("N1")
print( evaluator.evaluate(predictions_rfr_train_2))
print(evaluator.evaluate(predictions_rfr_test_2))

print("N2")
print( evaluator.evaluate(predictions_rfr_train_3))
print(evaluator.evaluate(predictions_rfr_test_3))

print("N3")
print( evaluator.evaluate(predictions_rfr_train_4))
print(evaluator.evaluate(predictions_rfr_test_4))

N1
2.7479517493487244
2.5303145412758834
N2
3.5440389751293515
3.138203003293098
N3
2.1358847611212157
2.0237583116045674


In [73]:
ExtractFeatureImp(model_rfr4.featureImportances, df_model, "features").head(10)

Unnamed: 0,idx,name,score
5,60,trip_distance,0.417131
6,61,duration,0.259664
4,59,tip_amount,0.100575
3,58,tolls_amount,0.058646
7,62,speed,0.038316
17,9,RatecodeID_ohe_2,0.033874
43,35,Borough_DO_ohe_Manhattan,0.02324
39,31,Borough_PU_ohe_Queens,0.019982
16,8,RatecodeID_ohe_1,0.017043
8,0,payment_type_ohe_1,0.007588


The last tuning for RF had a performance similar to the linear regresion.

#### Grid Search (it was not done because it takes a long time)

In [None]:
# #using others parameters

# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# rfr = RandomForestRegressor(featuresCol="features",\
#                             labelCol="label" , numTrees = 10, seed =12)

# evaluator = RegressionEvaluator(predictionCol="prediction",\
#                                 labelCol="label",\
#                                 metricName ="rmse")

# paramGrid = (ParamGridBuilder()
#              .addGrid(rfr.maxDepth, [5,10])
#              .addGrid(rfr.maxBins, [10, 20])
#   #           .addGrid(rfr.numTrees, [10])
#              .build())

# # paramGrid = ParamGridBuilder() \
# #     .addGrid(rfr.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]) \
# #     .addGrid(rfr.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
# #     .addGrid(rfr.maxBins, [10,20,50]) \
# #     .build()

# cv = CrossValidator(estimator=rfr,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=evaluator,
#                           numFolds=3)  

# cvModel = cv.fit(train_data)

# predictions = cvModel.transform(test_data)
# cm = predictions.select("prediction","label")

In [None]:
# rmse = evaluator.evaluate(predictions)
# print("RMSE on our test set: %g" % rmse)

In [None]:
# bestPipeline = cvModel.bestModel
# bestModel = bestPipeline.stages[1]

In [None]:
# #BEST HYPERPARAMETERS

# print('numTrees - ', bestModel.getNumTrees)
# print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
# print('maxBins - ', bestModel.getOrDefault('maxBins'))

In [None]:
# #feature importance
# importances = bestModel.featureImportances

# x_values = list(range(len(importances)))
# #x_values.orderBy(["value", "rank"], ascending=[1, 1])

# plt.bar(x_values, importances, orientation = 'vertical')
# plt.xticks(x_values, feature_list, rotation=90)
# plt.ylabel('Importance')
# plt.xlabel('Feature')
# plt.title('Feature Importances')

## Second selection of variables
- given that from the correlation part and the feature importance part above, duration and distance are esential, they were kept as variables (the running time was not too long so the bins were not useful). Only the 10 important features of the RF were selected. 

In [None]:
df_model = df
df_model = df_model.drop("fare_amount", "ehail_fee", "pickup_datetime", "dropoff_datetime", "PULocationID", "DOLocationID", "Zone_PU", "Zone_PU", "av_paid")

In [91]:
#selection of columns
cols_list = ['RatecodeID','tip_amount','trip_distance', 'duration',
 'tolls_amount','total_amount','Borough_PU','Borough_DO',
 'year','month','speed', 'payment_type']

df_model = df_model.select(cols_list)

In [92]:
#categorical data
cat_cols = ['RatecodeID', 'year','month','payment_type',
            'Borough_PU','Borough_DO']

In [93]:
#numerical data
num_cols = ['tolls_amount', 'tip_amount', 'speed', 'trip_distance','duration']

### Transform categorical data 

In [94]:
stages = []
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoderEstimator(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]
    
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]


#### Vector assemble for the model

In [95]:
#features
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")

In [96]:
#target change the name a label 
df_model = df_model.withColumnRenamed("total_amount", "label")


In [97]:
stages += [assembler]

## Pipeline

In [98]:
pipeline = Pipeline(stages=stages)

In [99]:
pipeline_model = pipeline.fit(df_model)

In [100]:
df_model = pipeline_model.transform(df_model)

In [101]:
#selectng the two columns that we need to pyspark
df_model.select(['label', 'features']).show

<bound method DataFrame.show of +-----+--------------------+
|label|            features|
+-----+--------------------+
| 13.8|(36,[0,5,8,18,22,...|
| 14.3|(36,[0,5,6,17,21,...|
| 18.3|(36,[0,5,8,18,22,...|
| 12.3|(36,[0,5,6,17,21,...|
| 16.3|(36,[0,5,8,18,22,...|
| 10.8|(36,[0,5,6,18,21,...|
|18.95|(36,[0,5,8,17,22,...|
| 11.8|(36,[0,5,6,17,21,...|
| 22.8|(36,[0,5,8,18,22,...|
| 14.8|(36,[0,5,6,18,21,...|
|15.39|(36,[0,5,8,17,22,...|
|15.96|(36,[0,5,6,17,21,...|
| 16.8|(36,[0,5,8,18,22,...|
|19.32|(36,[0,5,6,17,21,...|
| 16.8|(36,[0,5,8,18,22,...|
| 14.3|(36,[0,5,6,18,21,...|
| 30.0|(36,[0,5,8,17,22,...|
|15.96|(36,[0,5,6,17,21,...|
|29.96|(36,[2,5,8,17,22,...|
|  9.8|(36,[0,5,6,18,21,...|
+-----+--------------------+
only showing top 20 rows
>

## Split data

In [102]:
#where for the last 3 months of 2020
train_data = df_model.where((df_model.year ==2019) | (df_model.year == 2020) & (df_model.month <10))
test_data = df_model.where((df_model.year == 2020) & (df_model.month >9))

In [103]:
train_data.count()

69971105

In [104]:
test_data.count()

3326308

## Models

In [None]:
rfr = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", seed = 12 )
#numTrees=10,     

model3 = rfr.fit(train_data)
predictions_rfr_train2 = model3.transform(train_data)
predictions_rfr_test2 = model3.transform(test_data)

In [None]:
# Import LinearRegression class
from pyspark.ml.regression import GeneralizedLinearRegression

# Define LinearRegression algorithm 
glr = GeneralizedLinearRegression(family="gaussian", link="identity",\
                                  maxIter=10, regParam=0.3, featuresCol="features",\
                                  labelCol="label")
                           
model4 = glr.fit(train_data)
predictions_glr_train2 = model4.transform(train_data)
predictions_glr_test2 = model4.transform(test_data)

In [None]:
#print("Coefficients: " + str(model2.coefficients))

In [None]:
#model4.summary

### Performance

In [None]:
# RMSE is used as evaluation metric
evaluator = RegressionEvaluator(predictionCol="prediction",\
                                labelCol="label",\
                                metricName ="rmse")
RMSE_rfr_test2= evaluator.evaluate(predictions_rfr_test2)
RMSE_rfr_train2 = evaluator.evaluate(predictions_rfr_train2)

RMSE_glr_test2= evaluator.evaluate(predictions_glr_test2)
RMSE_glr_train2 = evaluator.evaluate(predictions_glr_train2)

In [None]:
print("Random Forest")
print("Root Mean Squared Error (RMSE) on train data = %g" % RMSE_rfr_train2)
print("Root Mean Squared Error (RMSE) on test data = %g" % RMSE_rfr_test2)

Random Forest
Root Mean Squared Error (RMSE) on train data = 2.90293
Root Mean Squared Error (RMSE) on test data = 2.68506


In [None]:
print("Linear Regression")
print("Root Mean Squared Error (RMSE) on train data = %g" % RMSE_glr_train2)
print("Root Mean Squared Error (RMSE) on test data = %g" % RMSE_glr_test2)

Linear Regression
Root Mean Squared Error (RMSE) on train data = 2.642
Root Mean Squared Error (RMSE) on test data = 2.24827


The RMSE is higher for this second selection of variables for both models. Linear regression is again better than RF. The training part was faster than with all the variables

### Feature Importance

In [110]:
model3.featureImportances

SparseVector(36, {0: 0.0322, 1: 0.067, 2: 0.0015, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 0.0, 15: 0.0, 17: 0.0034, 18: 0.0026, 19: 0.0, 21: 0.0109, 22: 0.0001, 23: 0.0, 24: 0.0, 26: 0.0003, 27: 0.0011, 28: 0.0003, 29: 0.0, 30: 0.0, 31: 0.0618, 32: 0.176, 33: 0.0152, 34: 0.3189, 35: 0.3087})

In [35]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [111]:
ExtractFeatureImp(model3.featureImportances, df_model, "features").head(10)

Unnamed: 0,idx,name,score
3,34,trip_distance,0.318861
4,35,duration,0.308683
1,32,tip_amount,0.176001
6,1,RatecodeID_ohe_2,0.067019
0,31,tolls_amount,0.061831
5,0,RatecodeID_ohe_1,0.032221
2,33,speed,0.015163
26,21,Borough_PU_ohe_Manhattan,0.010901
22,17,payment_type_ohe_1,0.003389
23,18,payment_type_ohe_2,0.002637


The feature importances change in comparison with the Random Forest above because we only select some variables. However, trip distance and duration still be the most important variables (it is expected because they have a high correlation with total amount).

### Changing manually parameters of Random Forest 

In [68]:
#default parameters for random forest model
print(model.explainParam("maxDepth")) #shorter is more generalizable
print(model.explainParam("maxBins"))  #more bins more time processing but more exactly
print(model.explainParam("numTrees")) #more trees to make the final "votation"
#maxDepth=5, maxBins=32,numTrees=20

maxDepth: Maximum depth of the tree. (Nonnegative) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5)
maxBins: Max number of bins for discretizing continuous features.  Must be at least 2 and at least number of categories for any categorical feature. (default: 32)
numTrees: Number of trees to train (at least 1) (default: 20)


In [None]:
rfr2 = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", numTrees = 10, seed =12, \
                           maxDepth = 10, maxBins = 10)
rfr3 = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", numTrees = 10, seed =12, \
                           maxDepth = 5, maxBins = 10)
rfr4 = RandomForestRegressor(featuresCol="features",\
                            labelCol="label", numTrees = 10, seed =12, \
                           maxDepth = 10, maxBins = 20)
                           
model_rfr2 = rfr2.fit(train_data)
predictions_rfr_train_2 = model_rfr2.transform(train_data)
predictions_rfr_test_2 = model_rfr2.transform(test_data)

model_rfr3 = rfr3.fit(train_data)
predictions_rfr_train_3 = model_rfr3.transform(train_data)
predictions_rfr_test_3 = model_rfr3.transform(test_data)

model_rfr4 = rfr4.fit(train_data)
predictions_rfr_train_4 = model_rfr4.transform(train_data)
predictions_rfr_test_4 = model_rfr4.transform(test_data)

In [None]:
#RMSE
print("N1")
print( evaluator.evaluate(predictions_rfr_train_2))
print(evaluator.evaluate(predictions_rfr_test_2))

print("N2")
print( evaluator.evaluate(predictions_rfr_train_3))
print(evaluator.evaluate(predictions_rfr_test_3))

print("N3")
print( evaluator.evaluate(predictions_rfr_train_4))
print(evaluator.evaluate(predictions_rfr_test_4))

N1
2.885344558669312
2.6563713201039247
N2
3.543046329107338
3.232685066905132
N3
2.3122678003024904
2.178478943270714


The last Random Forest(RF) is better than the linear regression and the other RF models of the second selection of variables. It could be realised that with only 12 variables the model performance is very similar to one with 21 (first selection of variables), therefore, if we want a faster training and evaluation, it is better to use a simple model.

In [114]:
#feature importance of the best model above
ExtractFeatureImp(model_rfr4.featureImportances, df_model, "features").head(10)

Unnamed: 0,idx,name,score
4,35,duration,0.304888
3,34,trip_distance,0.268635
1,32,tip_amount,0.189494
0,31,tolls_amount,0.090571
6,1,RatecodeID_ohe_2,0.073165
5,0,RatecodeID_ohe_1,0.019647
2,33,speed,0.017491
26,21,Borough_PU_ohe_Manhattan,0.015998
27,22,Borough_PU_ohe_Queens,0.003867
7,2,RatecodeID_ohe_5,0.003376


As before, Grid Search was not used because it takes a long time

### Conclusion from the models and selection of variables

The Linear regression model had a good performance considering almost all the variables of the data and with few variables (both sets). Therefore, it could be realised that the predictors have a linear relationship with the outcome. Because this type of model is simple than de RF and fast, it is a good option to use with Big Data in this type of escenario (linear relationship). 

The default Random Forest model perform worse than the linear regresion with all the variables and with few variables. However, after parameter tuning it has a better performance than the linear regression with few variables. Therefore, this model with few variables, is also handy to use when we are working with Big Data. However, we need to tune the parameters and that could take time.

Finally, both models have a less RMSE in test data and probably it is because its size. It is only 5% of the total data and therefore it might have less noise (weird cases of trips - outliers) than the train set. Probably the distribution of the data it is not the same. Therefore, the given split affected the performance and for a real case, we need to consider that and may be change the split.

In [135]:
spark.stop()