# Predictive analysis of rides

In [2]:
import json
import pandas as pd
import numpy as np

with open('ultimate_data_challenge.json') as json_file:
    data = json.load(json_file)

data[0]

{'avg_dist': 3.67,
 'avg_rating_by_driver': 5.0,
 'avg_rating_of_driver': 4.7,
 'avg_surge': 1.1,
 'city': "King's Landing",
 'last_trip_date': '2014-06-17',
 'phone': 'iPhone',
 'signup_date': '2014-01-25',
 'surge_pct': 15.4,
 'trips_in_first_30_days': 4,
 'ultimate_black_user': True,
 'weekday_pct': 46.2}

In [3]:
spark

# Explore features

In [4]:
df = pd.DataFrame(data)
df.head()

Unnamed: 0,avg_dist,avg_rating_by_driver,avg_rating_of_driver,avg_surge,city,last_trip_date,phone,signup_date,surge_pct,trips_in_first_30_days,ultimate_black_user,weekday_pct
0,3.67,5.0,4.7,1.1,King's Landing,2014-06-17,iPhone,2014-01-25,15.4,4,True,46.2
1,8.26,5.0,5.0,1.0,Astapor,2014-05-05,Android,2014-01-29,0.0,0,False,50.0
2,0.77,5.0,4.3,1.0,Astapor,2014-01-07,iPhone,2014-01-06,0.0,3,False,100.0
3,2.36,4.9,4.6,1.14,King's Landing,2014-06-29,iPhone,2014-01-10,20.0,9,True,80.0
4,3.13,4.9,4.4,1.19,Winterfell,2014-03-15,Android,2014-01-27,11.8,14,False,82.4


In [5]:
df.describe()

Unnamed: 0,avg_dist,avg_rating_by_driver,avg_rating_of_driver,avg_surge,surge_pct,trips_in_first_30_days,weekday_pct
count,50000.0,49799.0,41878.0,50000.0,50000.0,50000.0,50000.0
mean,5.796827,4.778158,4.601559,1.074764,8.849536,2.2782,60.926084
std,5.707357,0.446652,0.617338,0.222336,19.958811,3.792684,37.081503
min,0.0,1.0,1.0,1.0,0.0,0.0,0.0
25%,2.42,4.7,4.3,1.0,0.0,0.0,33.3
50%,3.88,5.0,4.9,1.0,0.0,1.0,66.7
75%,6.94,5.0,5.0,1.05,8.6,3.0,100.0
max,160.96,5.0,5.0,8.0,100.0,125.0,100.0


In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 12 columns):
avg_dist                  50000 non-null float64
avg_rating_by_driver      49799 non-null float64
avg_rating_of_driver      41878 non-null float64
avg_surge                 50000 non-null float64
city                      50000 non-null object
last_trip_date            50000 non-null object
phone                     49604 non-null object
signup_date               50000 non-null object
surge_pct                 50000 non-null float64
trips_in_first_30_days    50000 non-null int64
ultimate_black_user       50000 non-null bool
weekday_pct               50000 non-null float64
dtypes: bool(1), float64(6), int64(1), object(4)
memory usage: 4.2+ MB


In [7]:
df['churned'] = np.where(df['trips_in_first_30_days']>0,0,1)
df.head(5)

Unnamed: 0,avg_dist,avg_rating_by_driver,avg_rating_of_driver,avg_surge,city,last_trip_date,phone,signup_date,surge_pct,trips_in_first_30_days,ultimate_black_user,weekday_pct,churned
0,3.67,5.0,4.7,1.1,King's Landing,2014-06-17,iPhone,2014-01-25,15.4,4,True,46.2,0
1,8.26,5.0,5.0,1.0,Astapor,2014-05-05,Android,2014-01-29,0.0,0,False,50.0,1
2,0.77,5.0,4.3,1.0,Astapor,2014-01-07,iPhone,2014-01-06,0.0,3,False,100.0,0
3,2.36,4.9,4.6,1.14,King's Landing,2014-06-29,iPhone,2014-01-10,20.0,9,True,80.0,0
4,3.13,4.9,4.4,1.19,Winterfell,2014-03-15,Android,2014-01-27,11.8,14,False,82.4,0


In [8]:
df.churned.value_counts()

0    34610
1    15390
Name: churned, dtype: int64

# Load data into SparkSQL

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window 
from pyspark.sql.functions import row_number,lit

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# spark is an existing SparkSession
df = spark.read.json("ultimate_data_challenge.json")

df = df.na.drop()

# create a user_id column that enumerates through the users
w = Window().orderBy(lit('A')) #dummy value since no sorting of users is necessary
df = df.withColumn("user_id", row_number().over(w))

# show new df with user_id field
df.select('user_id','signup_date').show(5)

sc = spark.sparkContext

# register the DataFrame as a table.
df.createOrReplaceTempView("riders")

# create a churned users field
df.registerTempTable('riders')
new_df = sqlContext.sql("""
        SELECT *, CAST(IF(trips_in_first_30_days > 0, 0,1) AS STRING) as churned
        FROM riders 
""")

new_df.show(5)

+-------+-----------+
|user_id|signup_date|
+-------+-----------+
|      1| 2014-01-25|
|      2| 2014-01-29|
|      3| 2014-01-06|
|      4| 2014-01-10|
|      5| 2014-01-27|
+-------+-----------+
only showing top 5 rows

+--------+--------------------+--------------------+---------+--------------+--------------+-------+-----------+---------+----------------------+-------------------+-----------+-------+-------+
|avg_dist|avg_rating_by_driver|avg_rating_of_driver|avg_surge|          city|last_trip_date|  phone|signup_date|surge_pct|trips_in_first_30_days|ultimate_black_user|weekday_pct|user_id|churned|
+--------+--------------------+--------------------+---------+--------------+--------------+-------+-----------+---------+----------------------+-------------------+-----------+-------+-------+
|    3.67|                 5.0|                 4.7|      1.1|King's Landing|    2014-06-17| iPhone| 2014-01-25|     15.4|                     4|               true|       46.2|      1|      0|
|

# Vectorize categorical and numeric columns

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

cols = new_df.columns 

categoricalColumns = ["phone","city"]
stages = [] # stages in our Pipeline

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [12]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="churned", outputCol="label")
stages += [label_stringIdx]

In [13]:
# Transform all features into a vector using VectorAssembler
numericCols = ["avg_dist",
               "avg_rating_by_driver",
               "avg_rating_of_driver",
               "avg_surge","surge_pct",
               "ultimate_black_user",
               "weekday_pct"]

assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [15]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(new_df)
dataset = pipelineModel.transform(new_df)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
dataset.limit(10).toPandas()

Unnamed: 0,label,features,avg_dist,avg_rating_by_driver,avg_rating_of_driver,avg_surge,city,last_trip_date,phone,signup_date,surge_pct,trips_in_first_30_days,ultimate_black_user,weekday_pct,user_id,churned
0,0.0,"[1.0, 0.0, 0.0, 3.67, 5.0, 4.7, 1.1, 15.4, 1.0...",3.67,5.0,4.7,1.1,King's Landing,2014-06-17,iPhone,2014-01-25,15.4,4,True,46.2,1,0
1,1.0,"[0.0, 0.0, 1.0, 8.26, 5.0, 5.0, 1.0, 0.0, 0.0,...",8.26,5.0,5.0,1.0,Astapor,2014-05-05,Android,2014-01-29,0.0,0,False,50.0,2,1
2,0.0,"[1.0, 0.0, 1.0, 0.77, 5.0, 4.3, 1.0, 0.0, 0.0,...",0.77,5.0,4.3,1.0,Astapor,2014-01-07,iPhone,2014-01-06,0.0,3,False,100.0,3,0
3,0.0,"[1.0, 0.0, 0.0, 2.36, 4.9, 4.6, 1.14, 20.0, 1....",2.36,4.9,4.6,1.14,King's Landing,2014-06-29,iPhone,2014-01-10,20.0,9,True,80.0,4,0
4,0.0,"[0.0, 1.0, 0.0, 3.13, 4.9, 4.4, 1.19, 11.8, 0....",3.13,4.9,4.4,1.19,Winterfell,2014-03-15,Android,2014-01-27,11.8,14,False,82.4,5,0
5,0.0,"[1.0, 1.0, 0.0, 10.56, 5.0, 3.5, 1.0, 0.0, 1.0...",10.56,5.0,3.5,1.0,Winterfell,2014-06-06,iPhone,2014-01-09,0.0,2,True,100.0,6,0
6,0.0,"[1.0, 1.0, 0.0, 2.04, 5.0, 5.0, 1.0, 0.0, 0.0,...",2.04,5.0,5.0,1.0,Winterfell,2014-01-29,iPhone,2014-01-28,0.0,2,False,100.0,7,0
7,0.0,"[0.0, 1.0, 0.0, 4.36, 5.0, 4.5, 1.0, 0.0, 0.0,...",4.36,5.0,4.5,1.0,Winterfell,2014-02-01,Android,2014-01-21,0.0,2,False,100.0,8,0
8,0.0,"[1.0, 1.0, 0.0, 4.28, 4.9, 5.0, 1.0, 0.0, 1.0,...",4.28,4.9,5.0,1.0,Winterfell,2014-05-30,iPhone,2014-01-13,0.0,1,True,100.0,9,0
9,0.0,"[1.0, 1.0, 0.0, 3.81, 5.0, 4.0, 1.0, 0.0, 0.0,...",3.81,5.0,4.0,1.0,Winterfell,2014-01-10,iPhone,2014-01-06,0.0,3,False,100.0,10,0


# Logistic regression model 

In [31]:
from pyspark.ml.classification import LogisticRegression

# split into train and test data
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print('Rows in training data:',trainingData.count())
print('Rows in test data    :',testData.count())

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# set threshold for the probability above which to predict a 1
lr.setThreshold(0.5) # could use this if knew you had balanced data

# Train model with Training Data
lrModel = lr.fit(trainingData)

# get training summary used for eval metrics and other params
lrTrainingSummary = lrModel.summary

# Find the best model threshold if you would like to use that instead of the empirical positve rate
fMeasure = lrTrainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
lrBestThreshold = (fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)'])
                           .select('threshold').head()['threshold'])
  
print("Best threshold based on model performance on training data is {}".format(lrBestThreshold))

Rows in training data: 29013
Rows in test data    : 12432
Best threshold based on model performance on training data is 0.23844212949221064


In [32]:
# check current model prediction threshold
lrModel._java_obj.getThreshold()

0.5

In [41]:
# rebuild model with optimal threshold from training
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# set threshold for the probability above which to predict a 1
#lr.setThreshold(lrBestThreshold)
lr.setThreshold(0.28) 

# Train model with Training Data
lrModel = lr.fit(trainingData)

lrModel._java_obj.getThreshold()

0.28

In [42]:
# make predictions on test data
lrPredictions = lrModel.transform(testData)

# display predictions
lrPredictions.select("label", "prediction", "probability").limit(10).toPandas()

Unnamed: 0,label,prediction,probability
0,0.0,0.0,"[0.751255702285, 0.248744297715]"
1,0.0,0.0,"[0.736102251833, 0.263897748167]"
2,0.0,0.0,"[0.723467291181, 0.276532708819]"
3,0.0,0.0,"[0.723467291181, 0.276532708819]"
4,0.0,1.0,"[0.696607933726, 0.303392066274]"
5,0.0,1.0,"[0.696699541712, 0.303300458288]"
6,0.0,1.0,"[0.696397080554, 0.303602919446]"
7,0.0,1.0,"[0.696033902709, 0.303966097291]"
8,0.0,1.0,"[0.695064233473, 0.304935766527]"
9,0.0,1.0,"[0.694122321394, 0.305877678606]"


# Check model performance

In [43]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

def print_performance_metrics(predictions):
    # Evaluate model
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
    print("auc = {}".format(auc))
    print("aupr = {}".format(aupr))

    # get rdd of predictions and labels for mllib eval metrics
    predictionAndLabels = predictions.select("prediction","label").rdd

    # Instantiate metrics objects
    binary_metrics = BinaryClassificationMetrics(predictionAndLabels)
    multi_metrics = MulticlassMetrics(predictionAndLabels)

    # Area under precision-recall curve
    print("Area under PR = {}".format(binary_metrics.areaUnderPR))
    # Area under ROC curve
    print("Area under ROC = {}".format(binary_metrics.areaUnderROC))
    # Accuracy
    print("Accuracy = {}".format(multi_metrics.accuracy))
    # Confusion Matrix
    print(multi_metrics.confusionMatrix())

    ### Question 5.1 Answer ###

    # F1
    print("F1 = {}".format(multi_metrics.fMeasure()))
    # Precision
    print("Precision = {}".format(multi_metrics.precision()))
    # Recall
    print("Recall = {}".format(multi_metrics.recall()))
    # FPR
    print("FPR = {}".format(multi_metrics.falsePositiveRate(1.0)))
    # TPR
    print("TPR = {}".format(multi_metrics.truePositiveRate(1.0)))
  

In [44]:
print_performance_metrics(lrPredictions)

auc = 0.5748447147192751
aupr = 0.3319140758766116
Area under PR = 0.324764890696272
Area under ROC = 0.5601355747041783
Accuracy = 0.5791505791505791
DenseMatrix([[ 5390.,  3538.],
             [ 1694.,  1810.]])
F1 = 0.5791505791505791
Precision = 0.5791505791505791
Recall = 0.5791505791505791
FPR = 0.39628136200716846
TPR = 0.5165525114155252


# Random Forest model

In [25]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from itertools import chain

# Create initial Random Forest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

In [313]:
# Create ParamGrid for Cross Validation
rfParamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [3,5,7])
               .addGrid(rf.maxBins, [20, 40, 60])
               .addGrid(rf.numTrees, [10,20,30])
               .build())

print('Total Models (with all hyperparameter combinations):', len(rfParamGrid))

Total Models (with all hyperparameter combinations): 27


In [314]:
# set up an evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create CrossValidator
rfCv = CrossValidator(estimator=rf, estimatorParamMaps=rfParamGrid, evaluator=evaluator, numFolds=3)

# Run cross validations
rfCvModel = rfCv.fit(trainingData)

In [315]:
# look at best params from the CV
print(rfCvModel.bestModel._java_obj.getMaxDepth())
print(rfCvModel.bestModel._java_obj.getMaxBins())
print(rfCvModel.bestModel._java_obj.getNumTrees())

rfCvModel.bestModel._java_obj.extractParamMap()

7
60
30


JavaObject id=o87968

In [316]:
rfCvPredictions = rfCvModel.transform(testData)

rfCvPredictions.limit(10).toPandas()

Unnamed: 0,label,features,avg_dist,avg_rating_by_driver,avg_rating_of_driver,avg_surge,city,last_trip_date,phone,signup_date,surge_pct,trips_in_first_30_days,ultimate_black_user,weekday_pct,user_id,retained,rawPrediction,probability,prediction
0,0.0,"(1.0, 1.0, 0.0, 0.0, 5.0, 5.0, 1.0, 0.0, 0.0, ...",0.0,5.0,5.0,1.0,Winterfell,2014-01-25,iPhone,2014-01-24,0.0,1,False,0.0,393,1,"[19.1157995811, 10.8842004189]","[0.63719331937, 0.36280668063]",0.0
1,0.0,"(1.0, 0.0, 1.0, 0.0, 5.0, 3.0, 1.0, 0.0, 0.0, ...",0.0,5.0,3.0,1.0,Astapor,2014-01-25,iPhone,2014-01-25,0.0,1,False,0.0,1465,1,"[17.7389132779, 12.2610867221]","[0.591297109263, 0.408702890737]",0.0
2,0.0,"(1.0, 0.0, 1.0, 0.0, 5.0, 4.0, 1.0, 0.0, 0.0, ...",0.0,5.0,4.0,1.0,Astapor,2014-01-26,iPhone,2014-01-25,0.0,1,False,0.0,7765,1,"[18.0609295261, 11.9390704739]","[0.602030984203, 0.397969015797]",0.0
3,0.0,"(1.0, 0.0, 1.0, 0.0, 5.0, 4.0, 1.0, 0.0, 0.0, ...",0.0,5.0,4.0,1.0,Astapor,2014-02-16,iPhone,2014-01-18,0.0,1,False,0.0,19348,1,"[18.0609295261, 11.9390704739]","[0.602030984203, 0.397969015797]",0.0
4,0.0,"(1.0, 0.0, 0.0, 0.84, 4.0, 5.0, 1.0, 0.0, 0.0,...",0.84,4.0,5.0,1.0,King's Landing,2014-01-26,iPhone,2014-01-26,0.0,1,False,0.0,33093,1,"[13.9044512522, 16.0955487478]","[0.463481708408, 0.536518291592]",1.0
5,0.0,"(1.0, 0.0, 0.0, 0.95, 5.0, 5.0, 1.0, 0.0, 0.0,...",0.95,5.0,5.0,1.0,King's Landing,2014-01-12,iPhone,2014-01-11,0.0,1,False,0.0,6849,1,"[17.6187587468, 12.3812412532]","[0.587291958227, 0.412708041773]",0.0
6,0.0,"(1.0, 0.0, 0.0, 1.0, 5.0, 5.0, 1.0, 0.0, 0.0, ...",1.0,5.0,5.0,1.0,King's Landing,2014-01-19,iPhone,2014-01-18,0.0,1,False,0.0,10537,1,"[17.6187587468, 12.3812412532]","[0.587291958227, 0.412708041773]",0.0
7,0.0,"(1.0, 0.0, 0.0, 1.06, 5.0, 5.0, 1.0, 0.0, 0.0,...",1.06,5.0,5.0,1.0,King's Landing,2014-01-18,iPhone,2014-01-18,0.0,1,False,0.0,22446,1,"[17.6187587468, 12.3812412532]","[0.587291958227, 0.412708041773]",0.0
8,0.0,"(1.0, 0.0, 0.0, 1.22, 5.0, 5.0, 1.0, 0.0, 0.0,...",1.22,5.0,5.0,1.0,King's Landing,2014-01-25,iPhone,2014-01-25,0.0,1,False,0.0,35182,1,"[17.6187587468, 12.3812412532]","[0.587291958227, 0.412708041773]",0.0
9,0.0,"(1.0, 0.0, 0.0, 1.25, 4.0, 5.0, 1.0, 0.0, 0.0,...",1.25,4.0,5.0,1.0,King's Landing,2014-03-29,iPhone,2014-01-23,0.0,1,False,0.0,24725,1,"[17.1360681383, 12.8639318617]","[0.571202271277, 0.428797728723]",0.0


In [317]:
print_performance_metrics(rfCvPredictions)

auc = 0.6608010104427469
aupr = 0.3955089651435811
Area under PR = 0.3333809686549413
Area under ROC = 0.5005308832915992
Accuracy = 0.7176640926640927
DenseMatrix([[ 8912.,    16.],
             [ 3494.,    10.]])
F1 = 0.7176640926640927
Precision = 0.7176640926640927
Recall = 0.7176640926640927
FPR = 0.0017921146953405018
TPR = 0.0028538812785388126


In [318]:
# extract metadata
attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*trainingData
        .schema["features"]
        .metadata["ml_attr"]["attrs"].values())))


# combine with feature importance
rf_feature_coef = [(name, rfCvModel.bestModel.featureImportances[idx])
 for idx, name in attrs
 if rfCvModel.bestModel.featureImportances[idx]]

In [319]:
# convert to dataframe
rf_feature_tbl = pd.DataFrame(list(rf_feature_coef),columns=['feature','coefficient'])

# print out top features with largest coefficients
rf_feature_tbl.sort_values('coefficient', ascending=False).head(20)

Unnamed: 0,feature,coefficient
4,avg_rating_by_driver,0.250192
9,weekday_pct,0.227317
7,surge_pct,0.17987
6,avg_surge,0.12196
5,avg_rating_of_driver,0.085918
3,avg_dist,0.066314
1,cityclassVec_Winterfell,0.030271
2,cityclassVec_Astapor,0.013818
0,phoneclassVec_iPhone,0.012629
8,ultimate_black_user,0.011712


### The Logistic Regression model predicted churn with an accuracy of 57.9%, the Random Forest model showed an improved accuracy of 71.7%, and identified the top features (avg_rating_by_driver, weekday_pct, and surg_pct) that contributed to predicting whether a rider would churn or be retained.