# Build Predictive Model(s)

In this workbook, you will read the merged dataset you created previously and create transformers, estimators, and pipelines to build a binary classification model that predicts whether a trip has a tip.

## Instructions:

1. Read in your merged dataset
2. Use transformers and encoders to perform feature engineering
3. Split the data into training and testing sets
4. Build `LogisticRegression` model(s) and train them using pipelines
5. Evaluate the performance of the model(s) using `BinaryClassificationMetrics`

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lab-ml").getOrCreate()

In [2]:
spark

In [3]:
join_df = spark.read.parquet("s3://jiaruxu-bigdatasummer/a4/merged_data/")

In [4]:
join_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



In [5]:
# convert tip amount to double type to be able to apply binarizer
join_df = join_df.withColumn('tip_amount',join_df['tip_amount'].cast('double'))

In [6]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler, Binarizer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml import Pipeline, Model
from pyspark.sql.functions import to_timestamp, year, month, dayofweek

In [7]:
# convert our target to a binary feature

In [8]:
bi_tip = Binarizer(threshold=0.0, inputCol="tip_amount", outputCol="labels")

In [9]:
join_df = bi_tip.transform(join_df)
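
To make the labelling rule explicit, here is a minimal plain-Python sketch of what `Binarizer` computes with `threshold=0.0`: values strictly greater than the threshold become `1.0`, everything else becomes `0.0`. The `binarize` helper and the sample tip amounts are hypothetical and only illustrate the rule; they are not part of the Spark API or the dataset.

```python
def binarize(value, threshold=0.0):
    """Plain-Python stand-in for the Binarizer rule (hypothetical helper).

    Values strictly greater than the threshold map to 1.0,
    values equal to or below it map to 0.0.
    """
    return 1.0 if value > threshold else 0.0


# Hypothetical tip amounts, not taken from the dataset
tips = [0.0, 0.5, 2.0, 0.0]
labels = [binarize(t) for t in tips]
print(labels)  # [0.0, 1.0, 1.0, 0.0]
```

A trip with a tip of exactly 0 therefore gets the label `0.0` (no tip).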

In [10]:
# transform timestamp features into year, month and dayofweek

In [11]:
# for pickup datetime
join_df = join_df.withColumn('pickup_year', year('pickup_datetime'))
join_df = join_df.withColumn('pickup_month', month('pickup_datetime'))
join_df = join_df.withColumn('pickup_dayofweek', dayofweek('pickup_datetime'))

# for dropoff datetime
join_df = join_df.withColumn('dropoff_year', year('dropoff_datetime'))
join_df = join_df.withColumn('dropoff_month', month('dropoff_datetime'))
join_df = join_df.withColumn('dropoff_dayofweek', dayofweek('dropoff_datetime'))
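
One detail worth keeping in mind when reading these features: Spark's `dayofweek` numbers the days 1 = Sunday through 7 = Saturday. The sketch below reproduces that convention in plain Python for a single timestamp. The helper name `spark_style_dayofweek` and the sample timestamp are assumptions made for illustration.

```python
from datetime import datetime


def spark_style_dayofweek(ts):
    """Map a datetime to the 1 = Sunday ... 7 = Saturday convention (hypothetical helper)."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift by one with wrap-around
    return ts.isoweekday() % 7 + 1


# Hypothetical pickup timestamp; 6 January 2013 was a Sunday
pickup = datetime(2013, 1, 6, 14, 30)
features = (pickup.year, pickup.month, spark_style_dayofweek(pickup))
print(features)  # (2013, 1, 1)
```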

In [12]:
# join_df.show(10)

In [13]:
# drop rows where any string column is empty,
# then replace missing numeric values with 0
join_df = join_df.filter("medallion != ''" ).filter("hack_license != ''" ) \
                 .filter("vendor_id != ''" ).filter("store_and_fwd_flag != ''" ) \
                 .filter("pickup_longitude != ''" ).filter("pickup_latitude != ''" ) \
                 .filter("dropoff_longitude != ''" ).filter("dropoff_latitude != ''" ) \
                 .filter("payment_type != ''" ).fillna(0)

In [14]:
join_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = false)
 |-- trip_distance: float (nullable = false)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = false)
 |-- surcharge: float (nullable = false)
 |-- mta_tax: float (nullable = false)
 |-- tip_amount: double (nullable = false)
 |-- tolls_amount: float (nullable = false)
 |-- total_amount: float (nullable = false)
 |-- labels: double (nullable = false)
 |-- 

In [15]:
# split into training (80%), testing (18%) and prediction (2%) sets with a fixed seed
train_data, test_data, predict_data = join_df.randomSplit([0.8, 0.18, 0.02], seed=666)

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of prediction records : " + str(predict_data.count()))

Number of training records: 69327594
Number of testing records : 15601460
Number of prediction records : 1731860
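
The counts above are close to, but not exactly, 80% / 18% / 2% of the rows. That is expected: `randomSplit` assigns each row to a split at random according to the weights (normalizing them if they do not sum to 1), so split sizes are only approximate. The following is a simplified plain-Python sketch of that idea, not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row (simplified sketch)."""
    total = sum(weights)
    # Cumulative upper bounds of each split on the [0, 1) interval
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total  # normalize so the weights need not sum to 1
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
    return splits


train, test, holdout = random_split(range(10000), [0.8, 0.18, 0.02], seed=666)
print(len(train), len(test), len(holdout))
```

Every row lands in exactly one split, but the sizes fluctuate around the requested proportions from one seed to the next.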


In [16]:
# timestampcol = ['pickup_datetime','dropoff_datetime']
#                 ---- >
#               ['pickup_year','pickup_month','pickup_dayofweek',
#                'dropoff_year', 'dropoff_month', 'dropoff_dayofweek']

# intcol = ['passenger_count','rate_code']

# floatcol = ['trip_time_in_secs','trip_distance','fare_amount','surcharge',
#            'mta_tax','tip_amount','tolls_amount','total_amount']

# strcol = ['medallion','hack_license','vendor_id','store_and_fwd_flag','pickup_longitude',
#           'pickup_latitude','dropoff_longitude','dropoff_latitude','payment_type']

In [17]:
# StringIndexer for each string column; labels unseen during fitting are kept in an extra bucket
stringIndexer_medallion = StringIndexer(inputCol="medallion", outputCol="medallion_IX").setHandleInvalid("keep")
stringIndexer_hack_license = StringIndexer(inputCol="hack_license", outputCol="hack_license_IX").setHandleInvalid("keep")
stringIndexer_vendor_id = StringIndexer(inputCol="vendor_id", outputCol="vendor_id_IX").setHandleInvalid("keep")
stringIndexer_store_and_fwd_flag = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_flag_IX").setHandleInvalid("keep")
stringIndexer_pickup_longitude = StringIndexer(inputCol="pickup_longitude", outputCol="pickup_longitude_IX").setHandleInvalid("keep")
stringIndexer_pickup_latitude = StringIndexer(inputCol="pickup_latitude", outputCol="pickup_latitude_IX").setHandleInvalid("keep")
stringIndexer_dropoff_longitude = StringIndexer(inputCol="dropoff_longitude", outputCol="dropoff_longitude_IX").setHandleInvalid("keep")
stringIndexer_dropoff_latitude = StringIndexer(inputCol="dropoff_latitude", outputCol="dropoff_latitude_IX").setHandleInvalid("keep")
stringIndexer_payment_type = StringIndexer(inputCol="payment_type", outputCol="payment_type_IX").setHandleInvalid("keep")
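
As a rough illustration of what these indexers do, the sketch below mimics the default behaviour in plain Python: labels are ordered by descending frequency (ties broken alphabetically here, which is an assumption about tie handling), and with `handleInvalid="keep"` any label not seen during fitting is mapped to one extra index equal to the number of known labels. The helper names and the payment codes are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Learn a label -> index mapping, most frequent label first (hypothetical helper)."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically to break ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """Apply the mapping; unseen labels go to one extra bucket, like handleInvalid='keep'."""
    unseen_index = float(len(mapping))
    return [mapping.get(v, unseen_index) for v in values]


# Hypothetical payment codes, not taken from the dataset
train_payment = ["CRD", "CSH", "CRD", "UNK", "CRD", "CSH"]
mapping = fit_string_indexer(train_payment)
print(mapping)  # {'CRD': 0.0, 'CSH': 1.0, 'UNK': 2.0}
print(transform_keep(mapping, ["CSH", "DIS", "CRD"]))  # [1.0, 3.0, 0.0]
```

Because the index reflects a frequency rank rather than any natural order, a linear model such as logistic regression will still read it as an ordinal number.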

In [18]:
vectorAssembler_features = VectorAssembler(
    inputCols=["medallion_IX", "hack_license_IX", "vendor_id_IX", "store_and_fwd_flag_IX",
              "pickup_longitude_IX","pickup_latitude_IX","dropoff_longitude_IX","dropoff_latitude_IX",
              "payment_type_IX", # 9 StringIndexer columns
              "trip_time_in_secs","trip_distance", "fare_amount", "surcharge" , "mta_tax",
              "tolls_amount", "total_amount", # 7 float columns
               "passenger_count","rate_code", # 2 integer columns
               "pickup_year", "pickup_month","pickup_dayofweek",
               "dropoff_year","dropoff_month","dropoff_dayofweek"], #  6 timestamp transformed columns
    outputCol="features")

In [19]:
vectorAssembler_features

VectorAssembler_82892944eb1d

In [20]:
# join_df.select('tip_amount').filter((join_df.tip_amount == 0)).count()

In [21]:
# Logistic Regression
logit = LogisticRegression(labelCol="labels", featuresCol="features")

In [22]:
pipeline_logit = Pipeline(stages=[stringIndexer_medallion, stringIndexer_hack_license, 
                               stringIndexer_vendor_id, stringIndexer_store_and_fwd_flag,
                               stringIndexer_pickup_longitude, stringIndexer_pickup_latitude,
                               stringIndexer_payment_type, stringIndexer_dropoff_longitude,
                               stringIndexer_dropoff_latitude,
                               vectorAssembler_features, 
                               logit])

In [23]:
model_logit = pipeline_logit.fit(train_data)
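
Conceptually, fitting a pipeline walks through the stages in order: each stage is fitted on the current data, and its output becomes the input of the next stage. The toy classes below sketch that flow in plain Python on a list of dictionaries. They are hypothetical stand-ins, not the Spark classes, and the two sample rows are made up.

```python
class ToyIndexer:
    """Estimator-like stage: learns a label -> index mapping, then adds an indexed column."""

    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        labels = sorted({row[self.input_col] for row in rows})
        self.mapping = {label: float(i) for i, label in enumerate(labels)}
        return self

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyAssembler:
    """Transformer-like stage: nothing to learn, it only packs columns into one list."""

    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def fit(self, rows):
        return self

    def transform(self, rows):
        return [{**row, self.output_col: [float(row[c]) for c in self.input_cols]} for row in rows]


def fit_pipeline(stages, rows):
    """Fit each stage in order, feeding its transformed output to the next stage."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows)
        rows = model.transform(rows)
        fitted.append(model)
    return fitted, rows


# Hypothetical rows, not taken from the dataset
rows = [{"vendor_id": "VTS", "trip_distance": 1.2},
        {"vendor_id": "CMT", "trip_distance": 3.4}]
stages = [ToyIndexer("vendor_id", "vendor_id_IX"),
          ToyAssembler(["vendor_id_IX", "trip_distance"], "features")]
fitted_stages, transformed = fit_pipeline(stages, rows)
print([row["features"] for row in transformed])  # [[1.0, 1.2], [0.0, 3.4]]
```

This is why the assembler can refer to the `*_IX` columns even though they do not exist in `train_data`: the indexer stages create them earlier in the same pipeline.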

In [24]:
# test_data

In [25]:
predictions = model_logit.transform(test_data)

In [32]:
# predictions.printSchema()

In [None]:
# save predictions to the S3 bucket in case the memory limit is exceeded

In [33]:
predictions.write.parquet('s3a://jiaruxu-bigdatasummer/a4/predictions')

In [34]:
spark.stop()

In [None]:
#########################################################################################################################

In [None]:
####################################### execute when restart the kernel #################################################

In [None]:
#########################################################################################################################

In [3]:
# read the saved predictions back from the S3 bucket
predictions = spark.read.parquet("s3://jiaruxu-bigdatasummer/a4/predictions/")

In [6]:
# build an RDD of (prediction, label) rows for BinaryClassificationMetrics
predictionAndLabels = predictions.select('prediction','labels').rdd

In [8]:
# predictionAndLabels = predictions.rdd.map(lambda lp: (lp.prediction, lp.labels))

In [9]:
# predictionAndLabels.take(10)

In [7]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
evaluatorLogit = BinaryClassificationMetrics(predictionAndLabels)
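
A note on the input to the evaluator: the cell above passes the hard `prediction` column (0.0 or 1.0), so the ROC curve has only one interior operating point. AUC is usually computed from a continuous score, such as the positive-class entry of the `probability` column that `LogisticRegression` adds by default. The plain-Python sketch below, on made-up scores, shows how the two choices can give different values. The `auc` helper is hypothetical and uses the pairwise-ranking definition of AUC, which should agree with the area under the ROC curve when no binning is applied.

```python
def auc(scores_and_labels):
    """AUC as the probability that a random positive outranks a random negative.

    Ties count as half a win. Hypothetical helper for illustration.
    """
    pos = [s for s, y in scores_and_labels if y == 1.0]
    neg = [s for s, y in scores_and_labels if y == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical positive-class probabilities and true labels
probabilities = [0.9, 0.6, 0.55, 0.4, 0.3, 0.1]
labels = [1.0, 1.0, 0.0, 1.0, 0.0, 0.0]

# Hard predictions at the usual 0.5 cut-off
hard_predictions = [1.0 if p > 0.5 else 0.0 for p in probabilities]

auc_from_scores = auc(list(zip(probabilities, labels)))
auc_from_hard = auc(list(zip(hard_predictions, labels)))
print(round(auc_from_scores, 3), round(auc_from_hard, 3))  # 0.889 0.667
```

In this toy case the hard predictions discard the ranking information inside each class, which is why the two numbers differ.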

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the following cell, print the Area Under the Curve (AUC) for your binary classifier.

In [10]:
evaluatorLogit.areaUnderROC

0.9869663209409401

In the following cell, provide the code that saves your model to your S3 bucket.

In [26]:
model_logit.save('s3a://jiaruxu-bigdatasummer/a4/model')

In [27]:
spark.stop()