# Build Predictive Model(s)

In this workbook, you will read the merged dataset you created previously and create transformers, estimators, and pipelines to build a binary classification model that predicts whether a trip has a tip.

## Instructions:

1. Read in your merged dataset
2. Use transformers and encoders to perform feature engineering
3. Split into training and testing
4. Build `LogisticRegression` model(s) and train them using pipelines
5. Evaluate the performance of the model(s) using `BinaryClassificationMetrics`

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("a4-model").getOrCreate()

In [2]:
spark

In [86]:
# 1. Read in your merged dataset
df_data = spark.read.parquet('s3://bigdata6278exercise/merged.parquet')

In [87]:
df_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: float (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



In [88]:
# 2. Use transformers and encoders to perform feature engineering
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model

In [89]:
## 2.1 First, create the label indicating whether the trip has a tip
## Binarizer maps values less than or equal to the threshold to 0.0 and values above it to 1.0.

from pyspark.ml.feature import Binarizer

df_data = df_data.withColumn('tip_amount',df_data.tip_amount.cast("double"))

tip_binarizer = Binarizer(threshold=0, inputCol="tip_amount", outputCol="label") 
df_data = tip_binarizer.transform(df_data)
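
The thresholding rule used for the label can be illustrated in plain Python. This is only a sketch of the rule, not Spark code; the `binarize` helper and the sample tip amounts are hypothetical and exist only for this illustration.

```python
# Plain-Python sketch of the Binarizer rule (hypothetical helper, not Spark code).
def binarize(value: float, threshold: float = 0.0) -> float:
    """Return 1.0 if value is strictly greater than threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0


# Made-up tip amounts, for illustration only.
sample_tips = [0.0, 1.5, 0.01, 0.0, 3.0]
labels = [binarize(t) for t in sample_tips]
print(labels)
```

With `threshold=0`, a trip with no tip gets label 0.0 and any positive tip gets label 1.0.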


In [90]:
df_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: float (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- label: double (nullable = true)



In [91]:
## 2.2 Encode the string columns with StringIndexer; rows with labels unseen during fitting are skipped
si_store = StringIndexer(inputCol="store_and_fwd_flag", outputCol="sff_ix").setHandleInvalid("skip")
si_pmt = StringIndexer(inputCol="payment_type", outputCol="pmt_type_ix").setHandleInvalid("skip")
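
As a rough illustration of what these indexers do once fitted, the sketch below mimics frequency-based indexing and the `"skip"` handling in plain Python. The helper names and the sample values are hypothetical, and the alphabetical tie-break is an assumption of this sketch rather than a statement about Spark's behavior.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_skip_invalid(values, mapping):
    """Index known labels and drop unseen ones, mimicking handleInvalid='skip'."""
    return [mapping[v] for v in values if v in mapping]


# Made-up payment types, for illustration only.
train_values = ["CRD", "CSH", "CRD", "CRD", "CSH", "NOC"]
mapping = fit_string_indexer(train_values)
indexed = transform_skip_invalid(["CSH", "UNK", "CRD"], mapping)
print(mapping)
print(indexed)
```

Note that skipping silently drops rows, so the number of scored rows can be smaller than the number of input rows.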

In [92]:
## 2.3 Extract information from the timestamps
from pyspark.sql.functions import year, month, dayofmonth, hour
df_data = df_data.select("*").withColumn('pick_year',year(df_data.pickup_datetime))\
                        .withColumn('pick_month',month(df_data.pickup_datetime))\
                        .withColumn('pick_day',dayofmonth(df_data.pickup_datetime))\
                        .withColumn('pick_hour',hour(df_data.pickup_datetime))\
                        .withColumn('drop_hour',hour(df_data.dropoff_datetime))
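
For reference, the same fields can be read off a single timestamp with the standard library. This is a plain-Python sketch, not Spark code, and the timestamps are made up for illustration.

```python
from datetime import datetime

# Hypothetical pickup and dropoff timestamps, for illustration only.
pickup = datetime(2013, 1, 15, 18, 42, 7)
dropoff = datetime(2013, 1, 15, 19, 5, 30)

time_features = {
    "pick_year": pickup.year,    # calendar year
    "pick_month": pickup.month,  # 1 to 12
    "pick_day": pickup.day,      # day of month, 1 to 31
    "pick_hour": pickup.hour,    # 0 to 23
    "drop_hour": dropoff.hour,   # 0 to 23
}
print(time_features)
```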


In [93]:
df_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: float (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- label: double (nullable = true)
 |-- pick_year: integ

In [94]:
## 2.4 Create a feature vector by combining all feature columns with VectorAssembler
## passenger_count is listed twice in inputCols below, so it appears twice in the vector
vectorAssembler_features = VectorAssembler(
    inputCols=["sff_ix","pmt_type_ix", "passenger_count", "rate_code",
               'pick_year','pick_month','pick_day','pick_hour','drop_hour',
              'fare_amount','surcharge','mta_tax','tolls_amount','total_amount','passenger_count','trip_time_in_secs', 
              'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude'], 
    outputCol="features")

In [95]:
# 3. Split into training and testing
splitted_data = df_data.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of prediction records : " + str(predict_data.count()))

Number of training records: 138548331
Number of testing records : 31174255
Number of prediction records : 3462505
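
`randomSplit` assigns rows at random, so the resulting sizes only approximate the requested weights. The short check below uses the counts printed above to compare observed fractions with the weights `[0.8, 0.18, 0.02]`; it is plain arithmetic and does not need Spark.

```python
# Weights passed to randomSplit and the record counts printed above.
weights = {"train": 0.8, "test": 0.18, "predict": 0.02}
counts = {"train": 138548331, "test": 31174255, "predict": 3462505}

total = sum(counts.values())
fractions = {name: n / total for name, n in counts.items()}

for name in counts:
    print("%-8s requested %.2f, observed %.5f" % (name, weights[name], fractions[name]))
```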


In [96]:
# 4. Build LogisticRegression model(s) and train them using pipelines
log = LogisticRegression(labelCol="label", featuresCol="features",maxIter=10, regParam=0.01)

In [97]:
pipeline_log = Pipeline(stages=[si_store, si_pmt, 
                               vectorAssembler_features, 
                               log])

In [98]:
train_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: float (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- label: double (nullable = true)
 |-- pick_year: integ

In [99]:
model_log = pipeline_log.fit(train_data)

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the following cell, print the Area Under the Curve (AUC) for your binary classifier.

In [100]:
# Score the held-out test set with the fitted pipeline
predictions = model_log.transform(test_data)
# areaUnderROC is the evaluator's default metric; it is set explicitly here for clarity
evaluatorLog = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluatorLog.evaluate(predictions)

print("AUC = %g" % auc)

AUC = 0.988373
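
The number reported by `BinaryClassificationEvaluator` with its default metric is the area under the ROC curve, not classification accuracy. One way to read it is as the probability that a randomly chosen positive example scores higher than a randomly chosen negative one. The sketch below computes that pairwise quantity in plain Python on made-up labels and scores; `auc_from_scores` is a hypothetical helper, and this is not how Spark computes the metric internally.

```python
def auc_from_scores(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores, for illustration only.
toy_labels = [1, 0, 1, 1, 0]
toy_scores = [0.9, 0.3, 0.6, 0.2, 0.4]
toy_auc = auc_from_scores(toy_labels, toy_scores)
print("toy AUC = %g" % toy_auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking of positives above negatives.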


In the following cell, provide the code that saves your model to your S3 bucket.

In [103]:
model_log.save("s3://bigdata6278exercise/model")