In [0]:
#creating SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('TDSQL').getOrCreate()

In [0]:
#creating dataframe
df = spark.read.csv('/FileStore/tables/taxi_data.csv',inferSchema=True,header=True)

In [0]:
data=df.dropna()
data.count()

Out[3]: 995134

In [0]:
#Converted Date columns from StringType to timestamp 
from pyspark.sql.functions import *

spark.conf.set("spark.sql.legacy.timeParserPolicy", "Legacy")
data=data.withColumn('tpep_pickup_datetime',to_timestamp(data.tpep_pickup_datetime, 'MM/dd/yyyy HH:mm')).\
        withColumn('tpep_dropoff_datetime',to_timestamp(data.tpep_dropoff_datetime, 'MM/dd/yyyy HH:mm'))

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.functions import *

# Engineered features
# Get rid of 27 categories into a dummy for condition. bad conditions include any of the categories below
# Temperature should only make a difference when causes an inconvenience/discomfort
# Get date, year, month, hour, day from tpep_pickup to perform analysis 
# Get trip duration and Covid

data = data.withColumn("good_condition", when(f.col("condition") == "Snow", 0) \
                        .when(f.col("condition") == "Rain / Windy", 0) \
                        .when(f.col("condition") == "Heavy Rain", 0) \
                        .when(f.col("condition") == "Rain", 0) \
                        .when(f.col("condition") == "Heavy T-Storm", 0) \
                        .when(f.col("condition") == "Thunder in the Vicinity", 0) \
                        .when(f.col("condition") == "Thunder", 0) \
                        .when(f.col("condition") == "Light Rain with Thunder", 0) \
                        .when(f.col("condition") == "Thunder / Windy", 0) \
                        .when(f.col("condition") == "T-Storm", 0) \
                        .otherwise(1)) \
            .withColumn("extreme_temp", when((f.col("temperature") > 86) | (f.col("temperature") < 21), 1).otherwise(0)) \
            .withColumn('date',to_date(data.tpep_pickup_datetime)) \
            .withColumn('year',year(data.tpep_pickup_datetime)) \
            .withColumn('month',month(data.tpep_pickup_datetime)) \
            .withColumn('hour', hour(data.tpep_pickup_datetime)) \
            .withColumn('day', dayofweek(data.tpep_pickup_datetime)) \
            .withColumn('trip_time', unix_timestamp(data.tpep_dropoff_datetime) - unix_timestamp(data.tpep_pickup_datetime)) \
            .withColumn('covid', when(f.col("tpep_pickup_datetime") > "2020-03-08 00:00:00", 1).otherwise(0)) 

data.show(1)   

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+--------------+----------+---------+-------+------------+--------------+----------+--------------+----------+---------+-------+------------+--------------+-----------+-----------+--------+----------+--------+------+----------+--------------+------------+----------+----+-----+----+---+---------+-----+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|borough_pu|median_rlst_pu|tourist_pu|entert_pu|park_pu|workplace_pu|residential_pu|borough_do|median_rlst_do|tourist_do|entert_do|park_do|workplace_do|residential_do|  rat

In [0]:
df_tip = data.select("tip_amount")
df_tip.summary().show()

+-------+------------------+
|summary|        tip_amount|
+-------+------------------+
|  count|            995134|
|   mean| 3.048746259297771|
| stddev|2.8264469154619736|
|    min|               0.0|
|    25%|              1.75|
|    50%|              2.36|
|    75%|              3.45|
|    max|            411.38|
+-------+------------------+



In [0]:
# creating view from dataframe
data.createOrReplaceTempView("TD")

In [0]:
data_notip = spark.sql("select * from TD where tip_amount == 0")
data_tip = spark.sql("select * from TD where tip_amount > 0 and tip_amount <= 6 limit 85000")
data_hightip = spark.sql("select * from TD where tip_amount > 6 limit 83000")

In [0]:
data= data_notip.union(data_tip).union(data_hightip) 
data.count()

Out[9]: 206327

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.functions import *

data = data.withColumn('tip',when((f.col("tip_amount") > 6) , 2).when((f.col("tip_amount") == 0) , 0).otherwise(1)) 

data.groupBy('tip').count().show()           

+---+-----+
|tip|count|
+---+-----+
|  0|38327|
|  1|85000|
|  2|83000|
+---+-----+



In [0]:
# Create a 70-30 train test split

train_data,test_data=data.randomSplit([0.7,0.3])

In [0]:
# Import the required libraries

from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline

In [0]:
# Use StringIndexer to convert the categorical columns to hold numerical data

borough_pu_indexer = StringIndexer(inputCol='borough_pu',outputCol='borough_pu_index',handleInvalid='keep')
borough_do_indexer = StringIndexer(inputCol='borough_do',outputCol='borough_do_index',handleInvalid='keep')

In [0]:
assembler = VectorAssembler(inputCols=['fare_amount','passenger_count','borough_pu_index','borough_do_index','tourist_pu','entert_pu','workplace_pu','residential_pu','tourist_do','entert_do','workplace_do','residential_do','covid','good_condition','extreme_temp','hour','month'],outputCol="features")   

### Logistic Regression

In [0]:
# Create an object for the Logistic Regression model
from pyspark.ml.classification import LogisticRegression
lr_model = LogisticRegression(labelCol='tip')

In [0]:
# Pipeline is used to pass the data through indexer and assembler simultaneously.

pipe = Pipeline(stages=[borough_pu_indexer,borough_do_indexer,assembler]) 
fitted_pipe=pipe.fit(train_data)
train_data=fitted_pipe.transform(train_data)

# Fit the model on the train data

fit_model = lr_model.fit(train_data.select(['features','tip']))

# Transform the test data using the model to predict the duration

test_data=fitted_pipe.transform(test_data)

# Store the results in a dataframe

results = fit_model.transform(test_data)

#### Evaluation

##### Accuracy

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
ACC_evaluator = MulticlassClassificationEvaluator(labelCol="tip", predictionCol="prediction", metricName="accuracy")
accuracy = ACC_evaluator.evaluate(results)
print("The accuracy of the model is {}".format(accuracy))

The accuracy of the model is 0.7725469880688349


###### The Accuracy is 77% indicating that the models classification is really on the better side

##### F1 Score

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
ACC_evaluator = MulticlassClassificationEvaluator(labelCol="tip", predictionCol="prediction", metricName="f1")
f1 = ACC_evaluator.evaluate(results)
print("The f1 of the model is {}".format(f1))

The f1 of the model is 0.7310230423927568


##### Confusion matrix

In [0]:
from sklearn.metrics import confusion_matrix

y_true = results.select("tip")
y_true = y_true.toPandas()

y_pred = results.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print("Below is the confusion matrix \n {}".format(cnf_matrix))

Below is the confusion matrix 
 [[ 1571  7784  2158]
 [  820 23404  1194]
 [  599  1495 22746]]


###### The confusion matrix shows that the model classifies well

### NaiveBayes

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multiclass_models = [('Naive Bayes', NaiveBayes(labelCol='tip'))]

for name, model in multiclass_models:

    # Fit the model on the train data
    fit_model = model.fit(train_data.select(['features','tip']))
    
    # Transform the test data using the model to predict the duration
    results = fit_model.transform(test_data)

    #evaluation matrix of all multi class models
    ACC_evaluator = MulticlassClassificationEvaluator(labelCol="tip", predictionCol="prediction", metricName="accuracy")
    accuracy = ACC_evaluator.evaluate(results)
      
    ACC_evaluator2 = MulticlassClassificationEvaluator(labelCol="tip", predictionCol="prediction", metricName="f1")
    f1 = ACC_evaluator2.evaluate(results)
         
    print(name, accuracy, f1)

Naive Bayes 0.717019313269981 0.697220909827838


### Decision Tree and Random Forest

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multiclass_models = [ ('Decision Tree',DecisionTreeClassifier(labelCol='tip')),('Random Forest Classifier', RandomForestClassifier(labelCol='tip'))]

#pipe = Pipeline(stages=[borough_pu_indexer,borough_do_indexer,assembler])  
#fitted_pipe=pipe.fit(train_data)
#train_data=fitted_pipe.transform(train_data)
#test_data=fitted_pipe.transform(test_data)

for name, model in multiclass_models:

    # Fit the model on the train data
    fit_model = model.fit(train_data.select(['features','tip']))
    
    # Transform the test data using the model to predict the duration
    results = fit_model.transform(test_data)

    #evaluation matrix of all multi class models
    ACC_evaluator = MulticlassClassificationEvaluator(labelCol="tip", predictionCol="prediction", metricName="accuracy")
    accuracy = ACC_evaluator.evaluate(results)
      
    ACC_evaluator2 = MulticlassClassificationEvaluator(labelCol="tip", predictionCol="prediction", metricName="f1")
    f1 = ACC_evaluator2.evaluate(results)
         
    print(name, accuracy, f1)
    print(fit_model.featureImportances)

Decision Tree 0.780333813601852 0.731456399365243
(17,[0,2,3,4,7,11,15],[0.9470337153946539,0.008105608491462992,0.005260937750786267,0.0012352366301691263,0.031244356120595076,0.006917546066785768,0.00020259954554681446])
Random Forest Classifier 0.7809813666607308 0.7300980990886989
(17,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],[0.5868186657285427,2.4997964103162824e-05,0.1666798950525914,0.09116858238010794,0.06629962046474405,0.009627029847371522,6.264963030325675e-06,0.018749383193863742,0.017837908157075814,0.02741953609994508,0.00010566999344310231,0.012649474892019173,0.00017847320414142751,8.653818763357875e-06,5.308225240151958e-06,0.002168076766384458,0.0002524592486324438])


######  Feature Importances : fare_amount, borough_pu, tourist_pu, residential_pu, borough_do, residential_do, tourist_do, entert_do, hour