# Build Predictive Model(s)

In this workbook, you will read the merged dataset you created previously and build pipelines that train a binary classification model to predict whether a trip has a tip.

Instructions:

1. Read in your merged dataset
2. Use transformers and encoders to perform feature engineering
3. Split into training and testing
4. Build `LogisticRegression` model(s) and train them using pipelines
5. Evaluate the performance of the model(s) using `BinaryClassificationMetrics`

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

In [35]:
# Locate the local Spark installation, then start (or reuse) a Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lab-ml").getOrCreate()

In [36]:
# Read the merged trip/fare dataset. Parquet files carry their own schema,
# so the CSV-style 'header' and 'inferSchema' options are not needed.
df_data = spark.read\
  .format('parquet')\
  .load('s3://gu-502-course/trip_fare_combined')

In [37]:
df_data.show(1)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+---+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|tip|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+---+
|6F910ABF764B97720...|205994280

In [38]:
# Randomly split the rows into roughly 80% training and 20% testing (seeded for reproducibility)
train_data, test_data = df_data.randomSplit([0.8, 0.2], seed=24)
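
`randomSplit` assigns each row to a split at random, so the resulting sizes are only approximately 80/20. The plain-Python sketch below illustrates that idea on a list of integers. It is a simplified illustration rather than Spark's actual implementation, and the helper name `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Hypothetical helper: assign each row to one split using a seeded uniform draw."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any draw left over by rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(10_000))
train, test = random_split(rows, [0.8, 0.2], seed=24)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000
```

Because every row is assigned independently, the same seed reproduces the same split, but the split sizes vary slightly from the requested proportions.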

In [39]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model

In [106]:
# Index the target column into a numeric 'label' column
stringIndexer_label = StringIndexer(inputCol="tip", outputCol="label", handleInvalid='skip')
# Index the categorical features; rows with unseen or invalid categories are skipped
stringIndexer_payment_type = StringIndexer(inputCol="payment_type",
                                           outputCol="payment_type_IX", handleInvalid='skip')
stringIndexer_saff = StringIndexer(inputCol="store_and_fwd_flag",
                                   outputCol="store_and_fwd_flag_IX", handleInvalid='skip')
# One-hot encode the indexed categorical features
encoder1 = OneHotEncoder(inputCol="store_and_fwd_flag_IX", outputCol="store_and_fwd_flag_vec")
encoder2 = OneHotEncoder(inputCol="payment_type_IX", outputCol="payment_type_vec")
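
To make the two-step encoding concrete, here is a plain-Python sketch of what a `StringIndexer` followed by a `OneHotEncoder` produces, assuming Spark's defaults (indices ordered by descending frequency, and the last category dropped from the one-hot vector). The helper names `fit_index` and `one_hot` and the sample values are hypothetical, and the real encoders return sparse vectors rather than lists.

```python
from collections import Counter

def fit_index(values):
    """Hypothetical helper: map each category to an index, most frequent first.

    Ties are broken alphabetically in this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: index for index, category in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Hypothetical helper: dense one-hot list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

payment_types = ["CRD", "CSH", "CRD", "UNK", "CRD", "CSH"]  # made-up sample values
mapping = fit_index(payment_types)
encoded = [one_hot(mapping[p], len(mapping)) for p in payment_types]
print(mapping)
print(encoded[:4])
```

Dropping the last category keeps the encoded columns from summing to one in every row, which avoids a redundant column in a linear model such as logistic regression.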


In [113]:
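# Assemble the model inputs into a single 'features' vector; the categorical columns
# enter as their indexed (_IX) values, and rows with invalid values are skipped
vectorAssembler_features = VectorAssembler(
    inputCols=['passenger_count', 'rate_code',
               'fare_amount', 'trip_distance', 'store_and_fwd_flag_IX', 'payment_type_IX'],
    outputCol='features', handleInvalid='skip')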

In [103]:
# Logistic regression classifier trained on the assembled feature vector
lr = LogisticRegression(labelCol='label', featuresCol='features')

In [49]:
# Used to convert label indexes back into the actual labels.  
labelConverter = IndexToString(inputCol='prediction', outputCol='predictedLabel',
                             labels=stringIndexer_label.fit(df_data).labels)

In [114]:
# Chain the indexers, encoders, assembler, classifier, and label converter into one pipeline
pipeline_lr = Pipeline(stages=[stringIndexer_label, stringIndexer_payment_type, stringIndexer_saff,
                               encoder1, encoder2, vectorAssembler_features, lr, labelConverter])

In [115]:
# Fit every pipeline stage on the training data
model_lr = pipeline_lr.fit(train_data)

In [110]:
# Score the held-out test data with the fitted pipeline
predictions = model_lr.transform(test_data)

In [111]:
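# Evaluate area under the ROC curve. Note: rawPredictionCol='prediction' scores the hard 0/1
# predictions; pointing it at 'rawPrediction' would rank rows by the model's continuous scores.
evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction',
                                          metricName='areaUnderROC')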
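
Area under the ROC curve can be read as the probability that a randomly chosen positive row is scored higher than a randomly chosen negative one, with ties counting as one half. The plain-Python sketch below uses that pairwise definition to show why the choice of score column matters: hard 0/1 predictions produce many ties, while continuous scores rank every pair. The function name `auc_pairwise` and the toy labels and scores are hypothetical, and Spark's evaluator works from the ROC curve rather than enumerating pairs.

```python
def auc_pairwise(labels, scores):
    """Hypothetical helper: AUC as the share of positive/negative pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a correctly ranked pair
    return wins / (len(positives) * len(negatives))

labels = [1, 1, 1, 0, 0, 0]
probabilities = [0.9, 0.7, 0.4, 0.6, 0.2, 0.1]  # toy model scores
# Threshold the scores at 0.5 to get hard 0/1 predictions
hard_predictions = [1.0 if p >= 0.5 else 0.0 for p in probabilities]

auc_scores = auc_pairwise(labels, probabilities)
auc_hard = auc_pairwise(labels, hard_predictions)
print(auc_scores, auc_hard)
```

On this toy data the continuous scores rank 8 of the 9 positive/negative pairs correctly, while the thresholded predictions earn only 6 of 9 because tied pairs count as half.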

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the following cell, print the Area Under the Curve (AUC) for your binary classifier.

In [112]:
# Compute and display the AUC on the test-set predictions
auc = evaluator.evaluate(predictions)
auc

0.9815434407576723

In the following cell, provide the code that saves your model to your S3 bucket.

In [116]:
model_lr.save("s3://gu-502-course/trip_fare_model")