In [1]:
'''
Taxi Data Model Implementation
Written by Patrick Butcher
Last edited 16/08/2021
'''

'\nTaxi Data Model Implementation\nWritten by Patrick Butcher\nLast edited 16/08/2021\n'

In [2]:
from pyspark import SparkContext

# Start the spark context
sc = SparkContext.getOrCreate(conf=swan_spark_conf)

21/08/16 09:42:57 WARN Utils: Your hostname, DESKTOP-M8PQ28M resolves to a loopback address: 127.0.1.1; using 192.168.86.205 instead (on interface wifi0)
21/08/16 09:42:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/16 09:42:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 62126)


In [3]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import folium

# a nice way of filtering out deprecated warnings
import warnings
warnings.filterwarnings("ignore")

# start a spark session (from spark tutrial)
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

# create a spark session (which will run spark jobs)
spark = SparkSession.builder.getOrCreate()

In [4]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

In [5]:
# schema
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col

'''
From Spark Tutorial, 
Author: Akira Takihara Wang,
Edited by: Patrick Butcher
'''

feb_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-02.csv', header=True)

ints = ('VendorID', 'passenger_count', 'RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type',)
doubles = ('trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
           'improvement_surcharge', 'total_amount', 'congestion_surcharge')
strings = ('store_and_fwd_flag', )
dtimes = ('tpep_pickup_datetime', 'tpep_dropoff_datetime', )

dtypes = {column: IntegerType() for column in ints}
dtypes.update({column: DoubleType() for column in doubles})
dtypes.update({column: StringType() for column in strings})
dtypes.update({column: TimestampType() for column in dtimes})
schema = StructType()

for column in feb_yellow_sdf.columns:
    schema.add(column, # column name
               dtypes[column], # data type
               True # is nullable?
              )

                                                                                

In [6]:
# read data staight into spark dataframe
jan_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-01.csv', header=True, schema=schema)
feb_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-02.csv', header=True, schema=schema)
mar_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-03.csv', header=True, schema=schema)
apr_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-04.csv', header=True, schema=schema)
may_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-05.csv', header=True, schema=schema)
jun_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-06.csv', header=True, schema=schema)
jul_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-07.csv', header=True, schema=schema)
aug_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-08.csv', header=True, schema=schema)
sep_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-09.csv', header=True, schema=schema)
oct_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-10.csv', header=True, schema=schema)
nov_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-11.csv', header=True, schema=schema)
dec_yellow_sdf = spark.read.csv('../Proj1data/yellow_tripdata_2019-12.csv', header=True, schema=schema)

jan_yellow_sdf_2020 = spark.read.csv('../Proj1data/yellow_tripdata_2020-01.csv', header=True, schema=schema)

In [7]:
# combine dataframes
yellow_data_2019 = jan_yellow_sdf.union(feb_yellow_sdf).union(mar_yellow_sdf).union(apr_yellow_sdf).union(may_yellow_sdf).union(jun_yellow_sdf).union(jul_yellow_sdf).union(aug_yellow_sdf).union(sep_yellow_sdf).union(oct_yellow_sdf).union(nov_yellow_sdf).union(dec_yellow_sdf)
yellow_data_2019.printSchema()

del jan_yellow_sdf
del feb_yellow_sdf
del mar_yellow_sdf
del apr_yellow_sdf
del may_yellow_sdf
del jun_yellow_sdf
del jul_yellow_sdf
del aug_yellow_sdf
del sep_yellow_sdf
del oct_yellow_sdf
del nov_yellow_sdf
del dec_yellow_sdf


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
'''
Author: Andy Upton
Accessed: 16/08/2021
Edited by Patrick Butcher
https://www.andyupton.net/blog/2019/6/12/feature-engineering-with-pyspark
'''
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_date, dayofweek
from pyspark.sql.functions import unix_timestamp

yellow_data_2019_wtemp_and_time = yellow_data_2019.withColumn('trip_time', unix_timestamp("tpep_dropoff_datetime") - unix_timestamp('tpep_pickup_datetime'))

# get max temperature for the day
max_temperatures_sdf = spark.read.csv('../Proj1data/maxTemperaturesNYC.csv', header=True)
# max_temperatures_sdf.select(to_date(max_temperatures_sdf.date).alias('date'))

max_temperatures_sdf = max_temperatures_sdf.withColumn('date', to_date(max_temperatures_sdf['date']))

print(max_temperatures_sdf.schema)

# add column to yellow data
# yellow_data_2019_w_temp = yellow_data_2019.withColumn('max_temp', when(yellow_data_2019.date, dic[date]))
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.withColumn('date', to_date(yellow_data_2019_wtemp_and_time.tpep_pickup_datetime))

yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.join(max_temperatures_sdf, on=['date'], how='left_outer')

## Same for 2020 
max_temperatures_sdf_2020 = spark.read.csv('../Proj1data/maxTemperaturesNYC2020.csv', header=True)
max_temperatures_sdf_2020 = max_temperatures_sdf_2020.withColumn('date', to_date(max_temperatures_sdf_2020['date']))

jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020.withColumn('trip_time', unix_timestamp("tpep_dropoff_datetime") - unix_timestamp('tpep_pickup_datetime'))
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.withColumn('date', to_date(jan_yellow_sdf_2020_wtemp_and_time.tpep_pickup_datetime))
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.join(max_temperatures_sdf_2020, on=['date'], how='left_outer')


StructType(List(StructField(date,DateType,true),StructField(maxTemp,StringType,true)))


In [9]:
# change trip time to int for efficiency
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.withColumn('trip_time', yellow_data_2019_wtemp_and_time.trip_time.cast('int'))
yellow_data_2019_wtemp_and_time.schema

jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.withColumn('trip_time', jan_yellow_sdf_2020_wtemp_and_time.trip_time.cast('int'))

# change temperature to double
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.withColumn('maxTemp', yellow_data_2019_wtemp_and_time.trip_time.cast('double'))
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.withColumn('maxTemp', jan_yellow_sdf_2020_wtemp_and_time.trip_time.cast('double'))

In [10]:
# filter data using filter

# filter fare amount to be between 0 and 600
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.fare_amount > 0) 
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.fare_amount < 200)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.fare_amount > 0) 
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.fare_amount < 200)

# filter trip distance to be between 0 and 150
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.trip_distance > 0)
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.trip_distance < 150)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.trip_distance > 0)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.trip_distance < 150)

# filter payment type to be cash or card only
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.payment_type != 3)
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.payment_type != 4)
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.payment_type != 5)
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.payment_type != 6)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.payment_type != 3)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.payment_type != 4)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.payment_type != 5)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.payment_type != 6)

# filter so rate code must be standard rate
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.RatecodeID == 1)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.RatecodeID == 1)

# If temperature is missing, remove instance
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.maxTemp.isNotNull())
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.maxTemp.isNotNull())

# # no trips an be > 2 hours
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.trip_time > 0)
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.filter(yellow_data_2019_wtemp_and_time.trip_time < 7200)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.trip_time > 0)
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.filter(jan_yellow_sdf_2020_wtemp_and_time.trip_time < 7200)


In [13]:
# change temp to double type
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.withColumn('maxTemp', yellow_data_2019_wtemp_and_time.trip_time.cast('double'))
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.withColumn('maxTemp', jan_yellow_sdf_2020_wtemp_and_time.trip_time.cast('double'))

In [11]:
# add a column of fare amount/trip time, so average dollar per second
yellow_data_2019_wtemp_and_time = yellow_data_2019_wtemp_and_time.withColumn('fare_amount_per_sec', (col("fare_amount") / col("trip_time")))
jan_yellow_sdf_2020_wtemp_and_time = jan_yellow_sdf_2020_wtemp_and_time.withColumn('fare_amount_per_sec', (col("fare_amount") / col("trip_time")))

In [12]:
from pyspark.ml.feature import Imputer
columns = ['fare_amount', 'total_amount', 'trip_time', 'trip_distance']
imputer = Imputer(inputCols=columns, outputCols=["{}_imputed".format(c) for c in yellow_data_2019_wtemp_and_time.columns]).setStrategy("median")

In [14]:
# perform one hot encoding for PULocation ID to make continuous and use for regression model.
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="PULocationID",
                        outputCol="PULocationIDVEC")
model = encoder.fit(yellow_data_2019_wtemp_and_time)
encoded = model.transform(yellow_data_2019_wtemp_and_time)



                                                                                

In [15]:
encodedTest = model.transform(jan_yellow_sdf_2020_wtemp_and_time)

In [20]:
training_data = encoded.select('trip_distance', 'maxTemp', 'trip_time', 'fare_amount')
test_data = encodedTest.select('trip_distance', 'maxTemp', 'trip_time', 'fare_amount')

In [21]:
columns = training_data.columns[0:3]
columns

['trip_distance', 'maxTemp']

In [19]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(outputCol="features")
vecAssembler.setInputCols(columns)

output = vecAssembler.transform(training_data)
final_df = output.select('features', 'fare_amount')

output_test = vecAssembler.transform(test_data)
final_test = output_test.select('features', 'fare_amount')

In [20]:
# implement model
from pyspark.ml.regression import LinearRegression
lm = LinearRegression(labelCol='fare_amount')
model = lm.fit(final_df)
pd.DataFrame({"Coefficients": model.coefficients}, index=columns)

21/08/15 22:07:40 WARN Instrumentation: [bdb904f7] regParam is zero, which might cause numerical instability and overfitting.
21/08/15 22:07:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/08/15 22:07:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/08/15 22:37:32 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
21/08/15 22:37:32 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Unnamed: 0,Coefficients
trip_distance,1.86785
trip_time,0.006048
maxTemp,2.4e-05


In [21]:
# evaluate residuals on test data
res = model.evaluate(final_test)

                                                                                

In [22]:
res.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|-0.05760607432359244|
|-0.00714043155607...|
|  0.5611334058348678|
| 0.17139044038055573|
|0.054512131203940495|
| 0.46304136726043765|
| -0.6778511430152356|
|  0.6956665329775138|
| -0.4110537134638399|
|-0.10649381223197096|
| -1.6138240713914485|
| -0.2413829802489449|
|-0.10894508973362704|
| 0.10545255535870801|
|  0.4274580390690055|
|-0.12816788284072267|
|  1.0962795023441565|
| -0.5179477987387706|
|   -0.53307882088329|
|-0.24057673524162304|
+--------------------+
only showing top 20 rows



In [25]:
# evaluation
print("Mean absolute error: " , res.meanAbsoluteError)
print("Mean squared error: " , res.meanSquaredError)
print("r-squared: ", res.r2)

Mean absolute error:  1.3757183648426903
Mean squared error:  507.47506900082357
r-squared:  -7.310418451391756


In [37]:
print("Mean absolute error: " , res.meanAbsoluteError)
print("Mean squared error: " , res.meanSquaredError)
print("r-squared: ", res.r2)

Mean absolute error:  1.3757204488335586
Mean squared error:  507.47990476363395
r-squared:  -7.310497641907588
