In [0]:
# Create (or reuse) the SparkSession named 'TDSQL'; later cells read data through `spark`.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('TDSQL')
    .getOrCreate()
)

In [0]:
# Load the taxi CSV into a DataFrame: the first row is used as the header
# and column types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/taxi_data.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Render the raw tip_amount column on its own.
display(df.select('tip_amount'))

tip_amount
2.5
1.0
4.96
3.06
1.0
3.25
3.0
2.75
2.06
2.46


In [0]:
from pyspark.sql.functions import log

# Base-10 logarithm of the raw tip, exposed under the column name 'log-tip'.
df_tip_log = df.select(
    log(10.0, df['tip_amount']).alias('log-tip')
)
display(df_tip_log)

log-tip
0.3979400086720376
0.0
0.6954816764901973
0.4857214264815799
0.0
0.5118833609788743
0.4771212547196623
0.4393326938302626
0.3138672203691534
0.3909351071033791


In [0]:
# Drop every row that has a null in any column, then report how many rows remain.
data = df.na.drop()
data.count()

Out[7]: 995134

In [0]:
# Convert the pickup/dropoff columns from StringType to timestamp.
#
# NOTE: the wildcard import is kept on purpose: later cells call names such as
# `when`, `year` and `ceil` unqualified. Be aware that it also brings in Spark
# functions whose names collide with Python builtins (e.g. sum, min, max).
from pyspark.sql.functions import *

# Parse with Spark's legacy datetime parser for the pattern used below.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "Legacy")

# Columns are resolved by name on `data` itself rather than through the parent
# `df` object, so this cell does not depend on `df`'s column handles still
# being valid for `data` (which matters when the cell is executed again).
data = (
    data
    .withColumn('tpep_pickup_datetime', to_timestamp(col('tpep_pickup_datetime'), 'MM/dd/yyyy HH:mm'))
    .withColumn('tpep_dropoff_datetime', to_timestamp(col('tpep_dropoff_datetime'), 'MM/dd/yyyy HH:mm'))
)

In [0]:
import pyspark.sql.functions as f

# Engineered features
# - good_condition: collapses the weather `condition` categories into a dummy;
#   0 when the condition is one of the bad-weather labels below, 1 otherwise.
# - extreme_temp: 1 when temperature is above 86 or below 21 (presumably
#   degrees Fahrenheit - TODO confirm), i.e. when it causes discomfort.
# - date / year / month / hour / day: parts of the pickup timestamp
#   (`day` is the day of the week).
# - trip_time: dropoff minus pickup as a difference of unix timestamps.
# - covid: 1 for pickups after 2020-03-08 00:00:00, else 0.
# - tip_amount_ceil: tip rounded up to the next integer.

# Weather labels treated as "bad" conditions.
BAD_WEATHER_CONDITIONS = [
    "Snow",
    "Rain / Windy",
    "Heavy Rain",
    "Rain",
    "Heavy T-Storm",
    "Thunder in the Vicinity",
    "Thunder",
    "Light Rain with Thunder",
    "Thunder / Windy",
    "T-Storm",
]

# Every function is qualified with `f.` so this cell only needs its own import.
pickup_ts = f.col("tpep_pickup_datetime")
dropoff_ts = f.col("tpep_dropoff_datetime")

data = (
    data
    .withColumn("good_condition", f.when(f.col("condition").isin(BAD_WEATHER_CONDITIONS), 0).otherwise(1))
    .withColumn("extreme_temp", f.when((f.col("temperature") > 86) | (f.col("temperature") < 21), 1).otherwise(0))
    .withColumn('date', f.to_date(pickup_ts))
    .withColumn('year', f.year(pickup_ts))
    .withColumn('month', f.month(pickup_ts))
    .withColumn('hour', f.hour(pickup_ts))
    .withColumn('day', f.dayofweek(pickup_ts))
    .withColumn('trip_time', f.unix_timestamp(dropoff_ts) - f.unix_timestamp(pickup_ts))
    .withColumn('covid', f.when(pickup_ts > "2020-03-08 00:00:00", 1).otherwise(0))
    .withColumn('tip_amount_ceil', f.ceil('tip_amount').cast("int"))
)

data.show(1)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+--------------+----------+---------+-------+------------+--------------+----------+--------------+----------+---------+-------+------------+--------------+-----------+-----------+--------+----------+--------+------+----------+--------------+------------+----------+----+-----+----+---+---------+-----+---------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|borough_pu|median_rlst_pu|tourist_pu|entert_pu|park_pu|workplace_pu|residential_pu|borough_do|median_rlst_do|tourist_do|entert_do|park_do|workplace_do|resi

In [0]:
# Show the schema of `data` (column names and types) as the cell output.
data.schema

Out[11]: StructType(List(StructField(vendorid,IntegerType,true),StructField(tpep_pickup_datetime,TimestampType,true),StructField(tpep_dropoff_datetime,TimestampType,true),StructField(passenger_count,IntegerType,true),StructField(trip_distance,DoubleType,true),StructField(ratecodeid,IntegerType,true),StructField(store_and_fwd_flag,StringType,true),StructField(pulocationid,IntegerType,true),StructField(dolocationid,IntegerType,true),StructField(payment_type,IntegerType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(borough_pu,StringType,true),StructField(median_rlst_pu,IntegerType,true),StructField(tourist_pu,IntegerType,true),StructField(entert_pu,IntegerType,true),StructField(park_p

In [0]:
# Peek at the first five values of the integer-ceiled tip.
data.select('tip_amount_ceil').show(5)

+---------------+
|tip_amount_ceil|
+---------------+
|              3|
|              1|
|              5|
|              4|
|              1|
+---------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import log

# Base-10 logarithm of the integer-ceiled tip, shown for the first five rows.
df_tip_ceil_log = data.select(
    log(10.0, data['tip_amount_ceil']).alias('log-tip_ceil')
)
df_tip_ceil_log.show(5)

+------------------+
|      log-tip_ceil|
+------------------+
|0.4771212547196623|
|               0.0|
|0.6989700043360187|
|0.6020599913279623|
|               0.0|
+------------------+
only showing top 5 rows



In [0]:
# Create a 70-30 train test split.
# A fixed seed makes the sampling repeatable, so the fitted model and the
# metrics reported further down do not change from one run to the next
# (for the same input data).
SPLIT_SEED = 42

train_data, test_data = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [0]:
# Import the required libraries

#from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline

In [0]:
# StringIndexer stages that turn the two categorical borough columns into
# numeric index columns. handleInvalid='keep' tells the indexer to keep rows
# with labels it cannot map instead of raising or dropping them.

borough_pu_indexer = StringIndexer(
    inputCol='borough_pu',
    outputCol='borough_pu_index',
    handleInvalid='keep',
)
borough_do_indexer = StringIndexer(
    inputCol='borough_do',
    outputCol='borough_do_index',
    handleInvalid='keep',
)



In [0]:
# Pack the model inputs into a single vector column named "features".
assembler = VectorAssembler(
    inputCols=[
        # trip-level inputs
        'fare_amount',
        'passenger_count',
        # pickup location inputs
        'borough_pu_index',
        'tourist_pu',
        'entert_pu',
        'workplace_pu',
        'residential_pu',
        # dropoff location inputs
        'borough_do_index',
        'tourist_do',
        'entert_do',
        'workplace_do',
        'residential_do',
        # engineered flags and time parts
        'covid',
        'good_condition',
        'extreme_temp',
        'hour',
        'month',
    ],
    outputCol="features",
)

In [0]:
# Chain the two indexers and the assembler into one Pipeline; the stages are
# applied in the order listed, so the index columns exist before assembly.

pipe = Pipeline(
    stages=[
        borough_pu_indexer,
        borough_do_indexer,
        assembler,
    ]
)

In [0]:
# Fit the pipeline stages on the training split only.
fitted_pipe = pipe.fit(train_data)

In [0]:
# Apply the fitted pipeline to the training split. `train_data` is rebound to
# the transformed frame, which now carries the index and `features` columns.
train_data = fitted_pipe.transform(train_data)

In [0]:
# Configure the Generalized Linear Regression estimator: Poisson family with a
# log link, using the integer-ceiled tip as the label.

glr_model = GeneralizedLinearRegression(
    labelCol='tip_amount_ceil',
    family="poisson",
    link="log",
)

In [0]:
# Train the GLM using only the feature vector and the label column of the
# transformed training split.

fit_model = glr_model.fit(
    train_data.select('features', 'tip_amount_ceil')
)

In [0]:
# Run the test split through the fitted indexer/assembler pipeline so it gets
# the same `features` column as the training data. No prediction happens here;
# the model is applied in a later cell, and its label is the tip, not a duration.

test_data=fitted_pipe.transform(test_data)

In [0]:
# Score the transformed test split with the fitted GLM and keep the output
# DataFrame (input columns plus the model's prediction) in `results`.

results = fit_model.transform(test_data)

In [0]:
# Print a single scored row to check which columns `results` contains.
results.show(1)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+--------------+----------+---------+-------+------------+--------------+----------+--------------+----------+---------+-------+------------+--------------+-----------+-----------+--------+----------+--------+------+---------+--------------+------------+----------+----+-----+----+---+---------+-----+---------------+----------------+----------------+--------------------+-----------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|borough_pu|median_rlst_pu|tourist_pu|entert_pu|park_pu|workplace_pu|residential_pu|

In [0]:
# Side-by-side view of the feature vector, the actual label and the prediction.
results.select('features', 'tip_amount_ceil', 'prediction').show()

+--------------------+---------------+------------------+
|            features|tip_amount_ceil|        prediction|
+--------------------+---------------+------------------+
|(17,[0,1,4,9,13,1...|              2| 2.644260575630717|
|(17,[0,1,4,9,13,1...|              4| 2.895028532801097|
|(17,[0,1,4,8,13,1...|              3|3.3986018649164196|
|(17,[0,1,4,7,11,1...|              0|3.9120400518720615|
|(17,[0,1,4,9,13,1...|              4|2.8776272428721303|
|(17,[0,1,7,8,13,1...|             14| 6.119700110997666|
|(17,[0,1,3,9,13,1...|              3| 3.463515421752912|
|(17,[0,1,4,9,13,1...|              1|2.7068602316157206|
|(17,[0,1,4,9,13,1...|              3|2.8084383324085764|
|(17,[0,1,4,13,15,...|              6|3.0893292678204505|
|(17,[0,1,4,9,13,1...|              2| 2.843137818680575|
|(17,[0,1,4,13,15,...|              3|2.7156105497548775|
|(17,[0,1,4,8,13,1...|              0|3.4525125004885133|
|(17,[0,1,4,8,13,1...|              1|3.4845610143905947|
|(17,[0,1,13,1

In [0]:
# Evaluate the fitted model on the transformed test split.
# NOTE(review): `test_results` is not read by the cells visible below, which
# report `fit_model.summary` instead - confirm which summary was intended.
test_results = fit_model.evaluate(test_data)

#### Evaluation

In [0]:
# Summary object taken from the fitted model itself.
summary = fit_model.summary

In [0]:
# Print the model summary statistics one per line, then show the deviance residuals.
print(f"Coefficient Standard Errors: {summary.coefficientStandardErrors}")
print(f"T Values: {summary.tValues}")
print(f"P Values: {summary.pValues}")
print(f"Dispersion: {summary.dispersion}")
print(f"Null Deviance: {summary.nullDeviance}")
print(f"Residual Degree Of Freedom Null: {summary.residualDegreeOfFreedomNull}")
print(f"Deviance: {summary.deviance}")
print(f"Residual Degree Of Freedom: {summary.residualDegreeOfFreedom}")
print(f"AIC: {summary.aic}")
print("Deviance Residuals: ")
summary.residuals().show()

Coefficient Standard Errors: [1.6278357205470026e-05, 0.0005538570542960908, 0.0015084287404648492, 0.0023923775001568615, 0.0021451391337023356, 0.004460151925083748, 0.004095268109395218, 0.000956113262323575, 0.0024755168560088203, 0.0020442183591754923, 0.0042062401488725115, 0.0026502040894016785, 0.0035935171896009028, 0.004481056293597297, 0.003147240438785707, 0.00010816783263138232, 0.0001737402060879272, 0.005582114275043301]
T Values: [754.3594583682736, 8.34136253996289, 117.1783781652853, 125.18165725004255, 31.830682218617568, 39.25545234181084, -33.40358494813812, 105.10768240276364, 110.60283955872674, 22.44941176478589, 36.33928970089427, 30.730364652331684, -39.62286483264016, -2.8325807549634083, 1.0989907610059368, 28.894202096691846, 14.669930798517976, 137.9678211320088]
P Values: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.004617390144040723, 0.2717720958182772, 0.0, 0.0, 0.0]
Dispersion: 1.0
Null Deviance: 1176962.5285210568
Residual Degr