In [147]:
# import libraries
from pyspark.sql import SparkSession
from pathlib import Path # better file paths# import libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Setup spark session

In [148]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName('Sparkify') \
    .getOrCreate()
spark# create a Spark session

# Read in data

In [194]:
user_data = Path.cwd() / "data" / "TRANSFORMED_mini_sparkify_event_data.csv" 
user_df = spark.read.csv(str(user_data), header='true', inferSchema="true")
user_df.head()

Row(userId=100010, churn=0, gender='F', subscription_level='free', page_upgraded=1, page_downgraded=0, auth_logged_in_cnt=381, auth_logged_out_cnt=0, auth_guest_cnt=0, status_404_cnt=0, status_307_cnt=31, page_next_song_cnt=0, page_thumbs_up_cnt=17, page_thumbs_down_cnt=5, page_playlist_cnt=7, page_friend_cnt=4, page_roll_ad_cnt=52, page_logout_cnt=5, page_help_cnt=2, artist_cnt=252, song_cnt=269, session_cnt=7)

In [150]:
user_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- churn: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- subscription_level: string (nullable = true)
 |-- page_upgraded: integer (nullable = true)
 |-- page_downgraded: integer (nullable = true)
 |-- auth_logged_in_cnt: integer (nullable = true)
 |-- auth_logged_out_cnt: integer (nullable = true)
 |-- auth_guest_cnt: integer (nullable = true)
 |-- status_404_cnt: integer (nullable = true)
 |-- status_307_cnt: integer (nullable = true)
 |-- page_next_song_cnt: integer (nullable = true)
 |-- page_thumbs_up_cnt: integer (nullable = true)
 |-- page_thumbs_down_cnt: integer (nullable = true)
 |-- page_playlist_cnt: integer (nullable = true)
 |-- page_friend_cnt: integer (nullable = true)
 |-- page_roll_ad_cnt: integer (nullable = true)
 |-- page_logout_cnt: integer (nullable = true)
 |-- page_help_cnt: integer (nullable = true)
 |-- artist_cnt: integer (nullable = true)
 |-- song_cnt: integer (nullable = true)
 |-- session

In [151]:
# a pandas version for visualizations...
pdf = user_df.toPandas()
pdf.head()

Unnamed: 0,userId,churn,gender,subscription_level,page_upgraded,page_downgraded,auth_logged_in_cnt,auth_logged_out_cnt,auth_guest_cnt,status_404_cnt,...,page_thumbs_up_cnt,page_thumbs_down_cnt,page_playlist_cnt,page_friend_cnt,page_roll_ad_cnt,page_logout_cnt,page_help_cnt,artist_cnt,song_cnt,session_cnt
0,100010,0,F,free,1,0,381,0,0,0,...,17,5,7,4,52,5,2,252,269,7
1,200002,0,M,paid,1,1,474,0,0,0,...,21,6,8,4,7,5,2,339,378,6
2,125,1,M,free,0,0,10,0,0,0,...,0,0,0,0,1,0,0,8,8,1
3,124,0,F,paid,0,1,4825,0,0,6,...,171,41,118,74,4,59,23,2232,3339,29
4,51,1,M,paid,0,1,2463,0,0,1,...,100,21,52,28,0,24,12,1385,1854,10


In [152]:
pdf.describe()

Unnamed: 0,userId,churn,page_upgraded,page_downgraded,auth_logged_in_cnt,auth_logged_out_cnt,auth_guest_cnt,status_404_cnt,status_307_cnt,page_next_song_cnt,page_thumbs_up_cnt,page_thumbs_down_cnt,page_playlist_cnt,page_friend_cnt,page_roll_ad_cnt,page_logout_cnt,page_help_cnt,artist_cnt,song_cnt,session_cnt
count,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0,225.0
mean,65391.013333,0.231111,0.746667,0.684444,1236.008889,0.0,0.0,1.12,103.04,0.0,55.782222,11.315556,29.004444,19.008889,17.48,14.337778,6.462222,696.377778,897.791111,14.115556
std,105396.477919,0.422483,0.43589,0.465773,1329.596992,0.0,0.0,1.472607,111.386727,0.0,65.477925,13.077481,32.716654,20.581717,21.550207,15.346348,7.242585,603.95187,896.387604,14.646885
min,2.0,0.0,0.0,0.0,6.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0,3.0,1.0
25%,60.0,0.0,0.0,0.0,296.0,0.0,0.0,0.0,25.0,0.0,11.0,2.0,6.0,5.0,3.0,4.0,1.0,207.0,226.0,6.0
50%,116.0,0.0,1.0,1.0,847.0,0.0,0.0,1.0,70.0,0.0,35.0,7.0,17.0,14.0,11.0,11.0,5.0,555.0,634.0,10.0
75%,100017.0,0.0,1.0,1.0,1863.0,0.0,0.0,2.0,146.0,0.0,81.0,16.0,44.0,27.0,22.0,19.0,9.0,1098.0,1420.0,18.0
max,300025.0,1.0,1.0,1.0,9632.0,0.0,0.0,7.0,716.0,0.0,437.0,75.0,240.0,143.0,128.0,124.0,46.0,3544.0,5946.0,107.0


# Additional EDA on a few more features

In [153]:
# Toy example - Several histograms for the different values of one column
import plotly.express as px
df = px.data.tips()
fig = px.histogram(df, x="total_bill", color="sex")
fig.show()

In [154]:
# toy example - box plot
import plotly.graph_objects as go
import numpy as np

x0 = np.random.randn(50)
x1 = np.random.randn(50) + 2 # shift mean

fig = go.Figure()
# Use x instead of y argument for horizontal plot
fig.add_trace(go.Box(x=x0))
fig.add_trace(go.Box(x=x1))

fig.show()

In [155]:
fig = px.histogram(pdf, x="artist_cnt", color="churn")
fig.show()

In [158]:
pdf.loc[pdf['churn'] == 0]

Unnamed: 0,userId,churn,gender,subscription_level,page_upgraded,page_downgraded,auth_logged_in_cnt,auth_logged_out_cnt,auth_guest_cnt,status_404_cnt,...,page_thumbs_up_cnt,page_thumbs_down_cnt,page_playlist_cnt,page_friend_cnt,page_roll_ad_cnt,page_logout_cnt,page_help_cnt,artist_cnt,song_cnt,session_cnt
0,100010,0,F,free,1,0,381,0,0,0,...,17,5,7,4,52,5,2,252,269,7
1,200002,0,M,paid,1,1,474,0,0,0,...,21,6,8,4,7,5,2,339,378,6
3,124,0,F,paid,0,1,4825,0,0,6,...,171,41,118,74,4,59,23,2232,3339,29
5,7,0,M,free,1,0,201,0,0,1,...,7,1,5,1,16,3,1,142,148,7
6,15,0,M,paid,0,1,2278,0,0,2,...,81,14,59,31,1,27,8,1302,1707,15
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
218,145,0,F,paid,0,1,1347,0,0,0,...,61,8,27,34,0,15,1,868,1040,15
219,50,0,F,paid,1,1,651,0,0,2,...,27,3,12,9,37,6,7,423,476,8
220,45,0,F,paid,1,1,1776,0,0,0,...,67,13,43,22,13,19,5,1057,1350,16
221,57,0,M,free,0,0,112,0,0,0,...,3,2,0,0,6,2,1,89,90,2


In [159]:
fig = go.Figure()
# Use x instead of y argument for horizontal plot
x0 = pdf.loc[pdf['churn'] == 0]["artist_cnt"]
x1 = pdf.loc[pdf['churn'] == 1]["artist_cnt"]
fig.add_trace(go.Box(x=x0))
fig.add_trace(go.Box(x=x1))
# RECALL - 0 is not churn, and 1 is churn
fig.show()

In [160]:
fig = px.histogram(pdf, x="song_cnt", color="churn")
fig.show()

In [164]:
fig = go.Figure()
# Use x instead of y argument for horizontal plot
x0 = pdf.loc[pdf['churn'] == 0]["song_cnt"]
x1 = pdf.loc[pdf['churn'] == 1]["song_cnt"]
fig.add_trace(go.Box(x=x0))
fig.add_trace(go.Box(x=x1))
# RECALL - 0 is not churn, and 1 is churn
fig.show()

In [162]:
fig = px.histogram(pdf, x="session_cnt", color="churn")
fig.show()

In [165]:
fig = go.Figure()
# Use x instead of y argument for horizontal plot
x0 = pdf.loc[pdf['churn'] == 0]["session_cnt"]
x1 = pdf.loc[pdf['churn'] == 1]["session_cnt"]
fig.add_trace(go.Box(x=x0))
fig.add_trace(go.Box(x=x1))
# RECALL - 0 is not churn, and 1 is churn
fig.show()

In [166]:
fig = px.histogram(pdf, x="page_friend_cnt", color="churn")
fig.show()

In [167]:
fig = go.Figure()
# Use x instead of y argument for horizontal plot
x0 = pdf.loc[pdf['churn'] == 0]["page_friend_cnt"]
x1 = pdf.loc[pdf['churn'] == 1]["page_friend_cnt"]
fig.add_trace(go.Box(x=x0))
fig.add_trace(go.Box(x=x1))
# RECALL - 0 is not churn, and 1 is churn
fig.show()

In [168]:
fig = px.histogram(pdf, x="page_help_cnt", color="churn")
fig.show()

In [169]:
fig = go.Figure()
# Use x instead of y argument for horizontal plot
x0 = pdf.loc[pdf['churn'] == 0]["page_help_cnt"]
x1 = pdf.loc[pdf['churn'] == 1]["page_help_cnt"]
fig.add_trace(go.Box(x=x0))
fig.add_trace(go.Box(x=x1))
# RECALL - 0 is not churn, and 1 is churn
fig.show()

* No obvious difference between the churn and non-churned users from these charts

# Random Forest

* The first model used Logistic regression, this time we will try Random Forest and see if we see an improvement in AUC.

## Categorical variables

In [170]:
user_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- churn: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- subscription_level: string (nullable = true)
 |-- page_upgraded: integer (nullable = true)
 |-- page_downgraded: integer (nullable = true)
 |-- auth_logged_in_cnt: integer (nullable = true)
 |-- auth_logged_out_cnt: integer (nullable = true)
 |-- auth_guest_cnt: integer (nullable = true)
 |-- status_404_cnt: integer (nullable = true)
 |-- status_307_cnt: integer (nullable = true)
 |-- page_next_song_cnt: integer (nullable = true)
 |-- page_thumbs_up_cnt: integer (nullable = true)
 |-- page_thumbs_down_cnt: integer (nullable = true)
 |-- page_playlist_cnt: integer (nullable = true)
 |-- page_friend_cnt: integer (nullable = true)
 |-- page_roll_ad_cnt: integer (nullable = true)
 |-- page_logout_cnt: integer (nullable = true)
 |-- page_help_cnt: integer (nullable = true)
 |-- artist_cnt: integer (nullable = true)
 |-- song_cnt: integer (nullable = true)
 |-- session

In [171]:
categorical_columns = ['gender','subscription_level']

In [234]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in categorical_columns]
indexers

[StringIndexer_c7d897eefaf8, StringIndexer_fe62abaec9c8]

In [232]:
[index.getOutputCol() for index in indexers]

['gender_index', 'subscription_level_index']

In [233]:
encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=[
        "{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers]
)
encoder

OneHotEncoderEstimator_65f0de974f75

In [175]:
encoder.getOutputCols()

['gender_index_encoded', 'subscription_level_index_encoded']

## Continuous variables

In [231]:
numeric_columns = [column for column in list(set(user_df.columns)-set(categorical_columns)-set(['userId','churn']))]
numeric_columns

['page_help_cnt',
 'page_friend_cnt',
 'auth_logged_in_cnt',
 'page_roll_ad_cnt',
 'status_404_cnt',
 'status_307_cnt',
 'auth_logged_out_cnt',
 'page_thumbs_down_cnt',
 'page_logout_cnt',
 'page_upgraded',
 'auth_guest_cnt',
 'song_cnt',
 'page_downgraded',
 'page_playlist_cnt',
 'page_next_song_cnt',
 'session_cnt',
 'artist_cnt',
 'page_thumbs_up_cnt']

* scaling not required for decision tree models such as Random Forest

In [230]:
assembler_numeric = VectorAssembler(inputCols=numeric_columns, outputCol="NumericFeatures")
assembler_numeric

VectorAssembler_a722abc9a4b9

In [227]:
all_feature_columns = ["NumericFeatures"] + encoder.getOutputCols()
all_feature_columns

['NumericFeatures', 'gender_index_encoded', 'subscription_level_index_encoded']

In [229]:
assembler_all = VectorAssembler(inputCols=all_feature_columns, outputCol="features")
assembler_all

VectorAssembler_563cc81d2a2f

In [180]:
rf = RandomForestClassifier(labelCol="churn", featuresCol="features")

In [181]:
pipeline = Pipeline(stages=[indexers, encoder, assembler_numeric, assembler_all, rf])
pipeline

Pipeline_013287c22109

## Cross-validation and grid builder

In [182]:
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 5]).build()
paramGrid

[{Param(parent='RandomForestClassifier_ab3c3aaa4085', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
 {Param(parent='RandomForestClassifier_ab3c3aaa4085', name='numTrees', doc='Number of trees to train (>= 1).'): 5}]

In [183]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3) 
crossval

CrossValidator_8d2eba143c77

* We will train on the entire dataset using cv and use the mini dataset as the holdout/test set.

In [184]:
training = user_df
training.show(5)

+------+-----+------+------------------+-------------+---------------+------------------+-------------------+--------------+--------------+--------------+------------------+------------------+--------------------+-----------------+---------------+----------------+---------------+-------------+----------+--------+-----------+
|userId|churn|gender|subscription_level|page_upgraded|page_downgraded|auth_logged_in_cnt|auth_logged_out_cnt|auth_guest_cnt|status_404_cnt|status_307_cnt|page_next_song_cnt|page_thumbs_up_cnt|page_thumbs_down_cnt|page_playlist_cnt|page_friend_cnt|page_roll_ad_cnt|page_logout_cnt|page_help_cnt|artist_cnt|song_cnt|session_cnt|
+------+-----+------+------------------+-------------+---------------+------------------+-------------------+--------------+--------------+--------------+------------------+------------------+--------------------+-----------------+---------------+----------------+---------------+-------------+----------+--------+-----------+
|100010|    0|     

In [185]:
cvModel = crossval.fit(user_df)
cvModel

TypeError: copy() takes no arguments (1 given)

In [None]:
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="churn", outputCol="indexedLabel").fit(data)

In [None]:
# Output model

In [None]:
# Export predictions to a database

In [186]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [195]:
data = user_df

In [196]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
data = StringIndexer(inputCol="churn", outputCol="indexedLabel").fit(data).transform(data)
data.select(["churn","indexedLabel"]).show(5)

+-----+------------+
|churn|indexedLabel|
+-----+------------+
|    0|         0.0|
|    0|         0.0|
|    1|         1.0|
|    0|         0.0|
|    1|         1.0|
+-----+------------+
only showing top 5 rows



In [197]:
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="NumericFeatures")
data = assembler.transform(data)
data.select("NumericFeatures", "indexedLabel").show(truncate=False)

+----------------------------------------------------------------------------------------------+------------+
|NumericFeatures                                                                               |indexedLabel|
+----------------------------------------------------------------------------------------------+------------+
|[2.0,4.0,381.0,52.0,0.0,31.0,0.0,5.0,5.0,1.0,0.0,269.0,0.0,7.0,0.0,7.0,252.0,17.0]            |0.0         |
|[2.0,4.0,474.0,7.0,0.0,37.0,0.0,6.0,5.0,1.0,0.0,378.0,1.0,8.0,0.0,6.0,339.0,21.0]             |0.0         |
|(18,[2,3,5,11,15,16],[10.0,1.0,1.0,8.0,1.0,8.0])                                              |1.0         |
|[23.0,74.0,4825.0,4.0,6.0,351.0,0.0,41.0,59.0,0.0,0.0,3339.0,1.0,118.0,0.0,29.0,2232.0,171.0] |0.0         |
|[12.0,28.0,2463.0,0.0,1.0,175.0,0.0,21.0,24.0,0.0,0.0,1854.0,1.0,52.0,0.0,10.0,1385.0,100.0]  |1.0         |
|[1.0,1.0,201.0,16.0,1.0,13.0,0.0,1.0,3.0,1.0,0.0,148.0,0.0,5.0,0.0,7.0,142.0,7.0]             |0.0         |
|[8.0,31.0

In [200]:
featureIndexer =\
    VectorIndexer(inputCol="NumericFeatures", outputCol="indexedFeatures", maxCategories=4)
data = featureIndexer.fit(data)
data.select("indexedFeatures", "indexedLabel").show(truncate=False)

AttributeError: 'VectorIndexerModel' object has no attribute 'select'

In [209]:
data = user_dfdata = user_df
categorical_columns = ['gender','subscription_level']
numeric_columns = [column for column in list(set(user_df.columns)-set(categorical_columns)-set(['userId','churn']))]

In [238]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="churn", outputCol="indexedLabel").fit(data)

assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=indexers + [encoder, assembler_numeric, assembler_all, labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "churn", "features").show(5)

+--------------+-----+--------------------+
|predictedLabel|churn|            features|
+--------------+-----+--------------------+
|             0|    0|[1.0,1.0,201.0,16...|
|             0|    0|[0.0,5.0,334.0,18...|
|             0|    0|[3.0,6.0,848.0,39...|
|             0|    1|[1.0,10.0,512.0,0...|
|             0|    0|[19.0,25.0,2195.0...|
+--------------+-----+--------------------+
only showing top 5 rows



In [239]:

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

Test Error = 0.275362
OneHotEncoderEstimator_65f0de974f75


In [240]:
rf_auc = BinaryClassificationEvaluator(labelCol="indexedLabel", metricName="areaUnderROC").evaluate(predictions)
print('AUC', rf_auc)

AUC 0.5321350762527233
