In [2]:
# Starter code
from pyspark.sql import SparkSession

# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')

### Import Libraries

In [109]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, count, when, isnull, collect_list
from pyspark.sql.types import IntegerType, BooleanType, FloatType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.storagelevel import StorageLevel
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


VBox()

In [4]:
print((df.count(), len(df.columns)))

VBox()

(26259199, 18)

In [132]:
df.columns

VBox()

['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId']

In [133]:
df.select('page').distinct().show(50)

VBox()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
| Submit Registration|
|            Settings|
|               Login|
|            Register|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+

### Get rid of unecessary columns and add label for churned users

In [5]:
df_clean = df.select('artist','auth','firstName','gender','lastName','length','level','location','page','song','ts','userId')

VBox()

In [6]:
df_churn = df_clean.groupby('userId').agg(collect_list('auth').alias("auths"))

VBox()

In [7]:
churned = udf(lambda x: 'Cancelled' in x)

VBox()

In [8]:
df_churn = df_churn.withColumn("Churned", churned(df_churn.auths))
df_churn = df_churn.drop('auths')


VBox()

In [9]:
df_label = df_churn.join(df_clean,'userId')

VBox()

In [135]:
df_label.show()

VBox()

+-------+-------+--------------------+---------+---------+------+--------+---------+-----+-----------+-------------+--------------------+-------------+
| userId|Churned|              artist|     auth|firstName|gender|lastName|   length|level|   location|         page|                song|           ts|
+-------+-------+--------------------+---------+---------+------+--------+---------+-----+-----------+-------------+--------------------+-------------+
|1000280|   true|            Coldplay|Logged In|    Frank|     M|  Warren|307.51302| free|Findlay, OH|     NextSong|              Clocks|1538428238000|
|1000280|   true|  Charttraxx Karaoke|Logged In|    Frank|     M|  Warren|225.17506| free|Findlay, OH|     NextSong|           Fireflies|1538428545000|
|1000280|   true|        Dixie Chicks|Logged In|    Frank|     M|  Warren|351.84281| free|Findlay, OH|     NextSong|             Lullaby|1538428770000|
|1000280|   true|             Erasure|Logged In|    Frank|     M|  Warren|242.28526| fre

In [14]:
df_label.persist(StorageLevel.MEMORY_AND_DISK)

VBox()

DataFrame[userId: string, Churned: string, artist: string, auth: string, firstName: string, gender: string, lastName: string, length: double, level: string, location: string, page: string, song: string, ts: bigint]

In [15]:
df_label.select(["userId","Churned"]).distinct().groupBy("Churned").count().collect()

VBox()

[Row(Churned=u'false', count=17275), Row(Churned=u'true', count=5003)]

#### Check for nulls

In [134]:
df_label.select([count(when(isnull(c), c)).alias(c) for c in df_label.columns]).show()

VBox()

+------+-------+-------+----+---------+------+--------+-------+-----+--------+----+-------+---+
|userId|Churned| artist|auth|firstName|gender|lastName| length|level|location|page|   song| ts|
+------+-------+-------+----+---------+------+--------+-------+-----+--------+----+-------+---+
|     0|      0|5408927|   0|   778479|778479|  778479|5408927|    0|  778479|   0|5408927|  0|
+------+-------+-------+----+---------+------+--------+-------+-----+--------+----+-------+---+

### Add Features

In [16]:
thumbsUp = df_label.where(df_label.page=='Thumbs Up').groupby("userId").agg(count(col('page')).alias('ThumbsUp')).orderBy('userId')


VBox()

In [17]:
thumbsDown = df_label.where(df_label.page=='Thumbs Down').groupby("userId").agg(count(col('page')).alias('ThumbsDown')).orderBy('userId')


VBox()

In [18]:
allThumbs = thumbsUp.join(thumbsDown,'userId')

VBox()

In [19]:
songsPlayed = df_label.where(col('song')!='null').groupby("userId").agg(count(col('song')).alias('SongsPlayed')).orderBy('userId')


VBox()

In [20]:
df_features = df_churn.join(songsPlayed,'userId')
df_features = df_features.join(allThumbs,'userId')

VBox()

In [21]:
days = df_label.groupby('userId').agg(((max(col('ts')) - min(col('ts')))/86400000).alias("Days"))
df_features = df_features.join(days, "userId")

VBox()

In [22]:
df_features.select([count(when(isnull(c), c)).alias(c) for c in ["userId", "SongsPlayed", "ThumbsUp", "ThumbsDown", "Days"]]).show()


VBox()

+------+-----------+--------+----------+----+
|userId|SongsPlayed|ThumbsUp|ThumbsDown|Days|
+------+-----------+--------+----------+----+
|     0|          0|       0|         0|   0|
+------+-----------+--------+----------+----+

In [51]:
upPerSong = udf(lambda numUp, songs: float(numUp)/float(songs), FloatType())
downPerSong = udf(lambda numDown, songs: float(numDown)/float(songs), FloatType())
songsPerHour = udf(lambda numSongs, numDays: float(numSongs)/float((numDays*24)), FloatType())

VBox()

In [47]:
columns = ['id', 'num1', 'num2']
vals = [
     (1, 8, 4),
     (2, 7, 5)
]
smalldf = spark.createDataFrame(vals, columns)

VBox()

In [50]:
smalldf.drop("UpPerSong")
smalldf = smalldf.withColumn("UpPerSong", upPerSong(smalldf.num1, smalldf.num2))
smalldf.show()

VBox()

+---+----+----+---------+
| id|num1|num2|UpPerSong|
+---+----+----+---------+
|  1|   8|   4|      2.0|
|  2|   7|   5|      1.4|
+---+----+----+---------+

In [52]:
df_features = df_features.withColumn("UpPerSong", upPerSong(df_features.ThumbsUp, df_features.SongsPlayed))

VBox()

In [53]:
df_features.show()

VBox()

+-------+-------+-----------+--------+----------+------------------+-----------+
| userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong|
+-------+-------+-----------+--------+----------+------------------+-----------+
|1000280|   true|       1022|      53|        33|43.075775462962966|  0.0518591|
|1002185|  false|       1778|      92|        14| 57.97252314814815|0.051743533|
|1017805|  false|        250|       7|         4|24.672766203703702|      0.028|
|1030587|  false|       1472|      66|        16| 49.20280092592593|0.044836957|
|1033297|  false|        236|      10|         3| 33.12201388888889|0.042372882|
|1057724|  false|       3847|     200|        29|         58.329375| 0.05198856|
|1059049|  false|        559|      29|         6| 37.39221064814815|0.051878355|
|1069552|  false|        455|      26|         6|55.920520833333335|0.057142857|
|1071308|   true|       1409|      74|        12| 38.46319444444445| 0.05251952|
|1076191|   true|         47

In [54]:
df_features = df_features.withColumn("DownPerSong", downPerSong(df_features.ThumbsDown, df_features.SongsPlayed))


VBox()

In [55]:
df_features = df_features.withColumn("SongsPerHour", songsPerHour(df_features.SongsPlayed, df_features.Days))

VBox()

In [136]:
df_features.select("SongsPerHour", "Churned").groupby("Churned").agg(avg(col('SongsPerHour'))).show()

VBox()

+-------+-----------------+
|Churned|avg(SongsPerHour)|
+-------+-----------------+
|  false|1.076505910115338|
|   true|2.284715742469593|
+-------+-----------------+

In [56]:
df_features.select([count(when(isnull(c), c)).alias(c) for c in ["SongsPlayed", "UpPerSong", "DownPerSong", "Days", "SongsPerHour"]]).show()



VBox()

+-----------+---------+-----------+----+------------+
|SongsPlayed|UpPerSong|DownPerSong|Days|SongsPerHour|
+-----------+---------+-----------+----+------------+
|          0|        0|          0|   0|           0|
+-----------+---------+-----------+----+------------+

### Vectorize and scale features

In [57]:
assembler = VectorAssembler(inputCols=["SongsPlayed", "UpPerSong", "DownPerSong", "Days", "SongsPerHour"], outputCol="FeatureVector")
df_features = assembler.transform(df_features)

VBox()

In [58]:
scaler = StandardScaler(inputCol="FeatureVector", outputCol="ScaledFeatures", withStd=True)
scalerModel = scaler.fit(df_features)
df_features = scalerModel.transform(df_features)

VBox()

In [59]:
df_features.show()

VBox()

+-------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+
| userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong| DownPerSong|SongsPerHour|       FeatureVector|      ScaledFeatures|
+-------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+
|1000280|   true|       1022|      53|        33|43.075775462962966|  0.0518591| 0.032289628|    0.988568|[1022.0,0.0518590...|[0.91242495298452...|
|1002185|  false|       1778|      92|        14| 57.97252314814815|0.051743533| 0.007874016|   1.2779043|[1778.0,0.0517435...|[1.58736943875390...|
|1017805|  false|        250|       7|         4|24.672766203703702|      0.028|       0.016|   0.4221929|[250.0,0.02800000...|[0.22319592783378...|
|1030587|  false|       1472|      66|        16| 49.20280092592593|0.044836957| 0.010869565|   1.2465415|

In [62]:
convertToInt = udf(lambda x: 1 if x=="true" else 0, IntegerType())

VBox()

In [63]:
df_features = df_features.withColumn('label', convertToInt(df_features.Churned))

VBox()

### Split data into training, test and validation

In [64]:
df_features.show()

VBox()

+-------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+-----+
| userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong| DownPerSong|SongsPerHour|       FeatureVector|      ScaledFeatures|label|
+-------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+-----+
|1000280|   true|       1022|      53|        33|43.075775462962966|  0.0518591| 0.032289628|    0.988568|[1022.0,0.0518590...|[0.91242495298452...|    1|
|1002185|  false|       1778|      92|        14| 57.97252314814815|0.051743533| 0.007874016|   1.2779043|[1778.0,0.0517435...|[1.58736943875390...|    0|
|1017805|  false|        250|       7|         4|24.672766203703702|      0.028|       0.016|   0.4221929|[250.0,0.02800000...|[0.22319592783378...|    0|
|1030587|  false|       1472|      66|        16| 49.20280092592593|0.

In [65]:
train, test = df_features.randomSplit([0.8, 0.2], seed=42)
train, validation = train.randomSplit([0.8, 0.2], seed=42)

VBox()

### Train the model using LogisitcRegression and default params

In [67]:
lr = LogisticRegression(featuresCol = 'FeatureVector', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

VBox()

In [79]:
trainingSummary = lrModel.summary

VBox()

#### Code developed from Apache Spark documention examples [Spark](https://spark.apache.org/docs/2.1.1/ml-classification-regression.html)

In [83]:
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))


VBox()

objectiveHistory:
0.539919604573
0.509239460563
0.475498528905
0.442098634351
0.437150329445
0.424084673606
0.402606328971
0.385793271091
0.373180789382
0.363566860268
0.362864432603
+--------------------+-------------------+
|                 FPR|                TPR|
+--------------------+-------------------+
|                 0.0|                0.0|
|0.001330195436406426|0.01707650273224044|
|0.002353422695180...| 0.0351775956284153|
|0.003581295405709...|0.05293715846994536|
|0.004706845390361199|0.07137978142076502|
| 0.00624168627852246|0.08709016393442623|
|0.007060268085541799|0.10621584699453553|
|0.009106722603090147|0.12158469945355191|
| 0.01012994986186432| 0.1403688524590164|
|0.011767113475902998| 0.1567622950819672|
|0.012994986186432007|0.17349726775956284|
|0.013608922541696511|0.19330601092896174|
|0.015860022510999692|0.20799180327868852|
| 0.01719021794740612|0.22472677595628415|
|0.019032027013199632| 0.2421448087431694|
| 0.02056686790136089|  0.258879781420765|


In [84]:
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']

VBox()

In [85]:
fMeasure.show()

VBox()

+------------------+-------------------+
|         threshold|          F-Measure|
+------------------+-------------------+
|0.9710608906938891|0.03343363423604146|
|0.9115624899646096|0.06745252128356254|
|0.8940708087293442|0.09942270686337396|
|0.8822248997747707|0.13132265158655357|
|0.8706226669247632|0.15721331689272502|
|0.8606218801695937|0.18802902055622733|
|0.8503564372998402| 0.2110880521790691|
|0.8405966828998392|0.23909249563699825|
|0.8319074916080038| 0.2621359223300971|
|0.8222063007289495| 0.2851529609879315|
|0.8133804957436582| 0.3121036669423766|
|0.8049741760959546|0.32990249187432286|
|0.7963324846658016| 0.3505594033031433|
|0.7879623520054181| 0.3709128956317029|
|0.7791499519799358|0.39001800874710574|
|0.7685809722581675| 0.4083080040526849|
|0.7584430249145967| 0.4272908366533864|
|0.7477319327542814| 0.4427555773473891|
| 0.737471651564837|0.45942028985507244|
|0.7266642335110222| 0.4709800190294957|
+------------------+-------------------+
only showing top

In [105]:
print(bestThreshold)
print(maxFMeasure)

VBox()

0.302889278373
Row(max(F-Measure)=0.6617299511580275)

In [90]:
pr = trainingSummary.pr


VBox()

In [91]:
pr.show()

VBox()

+-------------------+------------------+
|             recall|         precision|
+-------------------+------------------+
|                0.0|0.7936507936507936|
|0.01707650273224044|0.7936507936507936|
| 0.0351775956284153|0.8174603174603174|
|0.05293715846994536|0.8157894736842105|
|0.07137978142076502|0.8196078431372549|
|0.08709016393442623|0.8069620253164557|
|0.10621584699453553|0.8184210526315789|
|0.12158469945355191|               0.8|
| 0.1403688524590164|0.8058823529411765|
| 0.1567622950819672|0.7996515679442509|
|0.17349726775956284|               0.8|
|0.19330601092896174|0.8097281831187411|
|0.20799180327868852|0.7971204188481675|
|0.22472677595628415|0.7966101694915254|
| 0.2421448087431694|0.7921787709497207|
|  0.258879781420765|0.7904066736183525|
|0.27527322404371585|0.7901960784313725|
| 0.2930327868852459|0.7886029411764706|
| 0.3084016393442623|0.7845351867940921|
|0.32479508196721313|0.7846534653465347|
+-------------------+------------------+
only showing top

### Make predictions on validation set

In [97]:
predictions = lrModel.transform(validation)


VBox()

In [93]:
predictions.show()

VBox()

+-------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
| userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong| DownPerSong|SongsPerHour|       FeatureVector|      ScaledFeatures|label|       rawPrediction|         probability|prediction|
+-------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|1002185|  false|       1778|      92|        14| 57.97252314814815|0.051743533| 0.007874016|   1.2779043|[1778.0,0.0517435...|[1.58736943875390...|    0|[2.76861114742403...|[0.94095587201545...|       0.0|
|1017805|  false|        250|       7|         4|24.672766203703702|      0.028|       0.016|   0.4221929|[250.0,0.02800000...|[0.22319592783378...|    0|[-0.3961962869

In [113]:
print(predictions.filter(predictions.label == predictions.prediction).count())
print(predictions.count())

VBox()

2757
3186

In [115]:
print(2757.0/3186.0)*100

VBox()

86.5348399247

In [98]:
evaluator = BinaryClassificationEvaluator()
print('Area Under ROC', evaluator.evaluate(predictions))

VBox()

('Test Area Under ROC', 0.8617849953633544)

### Tune hyper-parameters

In [99]:
lr.setThreshold(bestThreshold)

VBox()

LogisticRegression_903c0d8e676c

In [110]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()

VBox()

In [111]:
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

VBox()

In [112]:
cvModel = crossval.fit(train)
prediction = cvModel.transform(validation)

VBox()

In [116]:
print(prediction.filter(prediction.label == prediction.prediction).count())
print(prediction.count())

VBox()

2618
3186

In [118]:
print(2618.0/3186.0 * 100)

VBox()

82.172002511

In [121]:
print('Area Under ROC', evaluator.evaluate(prediction))

VBox()

('Area Under ROC', 0.861853414154208)

### Apply the model to the test data

In [119]:
predictions_test = cvModel.transform(test)


VBox()

In [120]:
print(predictions_test.filter(predictions_test.label == predictions_test.prediction).count())
print(predictions_test.count())

VBox()

3366
4005

In [124]:
print(3366.0/4005.0)*100

VBox()

84.0449438202

In [122]:
print('Area Under ROC', evaluator.evaluate(predictions_test))

VBox()

('Area Under ROC', 0.8762050562457436)

#### The default model seemed to work best. Try it on the test data:

In [127]:
predictions = lrModel.transform(test)


VBox()

In [137]:
print(predictions.filter(predictions.label == predictions.prediction).count())
print(predictions.count())

VBox()

Exception in thread cell_monitor-137:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 4298



3377
4005

In [130]:
print(3377.0/4005.0)*100

VBox()

84.3196004994

In [129]:
print('Area Under ROC', evaluator.evaluate(predictions))

VBox()

('Area Under ROC', 0.8765631972006295)

#### Conclusion, the default model is slightly more accurate than the tuned one.

In [131]:
lrModel.coefficients

VBox()

DenseVector([0.0008, -19.7478, 11.9394, -0.1098, -0.1607])

Based on the coefficients, the features that contribute the most are:

1. Average number of thumbsdown per song played
2. Number of songs played