# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import sum as Fsum
# import only the functions used below; a star import would shadow Python built-ins such as sum, min and max
from pyspark.sql.functions import col, monotonically_increasing_id, when
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import * 
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import *

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
# run the session and prepare the data
spark = SparkSession \
    .builder \
    .appName("CapstoneProject") \
    .getOrCreate()
path = "mini_sparkify_event_data.json"
sparkify = spark.read.json(path)
df_valid = sparkify.dropna(how = "any", subset = ["userId", "sessionId"])
df_valid = df_valid.filter(df_valid["userId"] != "")
df_valid = df_valid.sort(["userId","ts","sessionId"]).withColumn("pk",monotonically_increasing_id())
sparkify.createOrReplaceTempView("sparkifytable")
# collect the IDs of users who reached the "Cancellation Confirmation" page
churnUserId = df_valid.filter(df_valid.page == "Cancellation Confirmation").select('userId').distinct().toPandas()
churnUserId = churnUserId.userId.tolist()
df_valid = df_valid.withColumn('ChurnedUser', when(df_valid.userId.isin(churnUserId),1).otherwise(0))
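
The labelling logic in the cell above can be illustrated without Spark. The following is a minimal sketch in plain Python, assuming a handful of made-up events (the user IDs and page sequence are hypothetical, only the `userId` and `page` field names come from the data): rows without a user ID are dropped, and every remaining row of a user who has at least one `Cancellation Confirmation` event is flagged with 1.

```python
# Hypothetical events; only the field names mirror the real schema.
events = [
    {"userId": "10", "page": "NextSong"},
    {"userId": "10", "page": "Cancellation Confirmation"},
    {"userId": "20", "page": "NextSong"},
    {"userId": "20", "page": "Add Friend"},
    {"userId": "", "page": "Home"},  # empty userId, dropped as in df_valid
]


def label_churn(rows):
    """Drop rows without a userId, then flag every row of a churned user with 1."""
    valid = [r for r in rows if r["userId"]]
    churned = {r["userId"] for r in valid if r["page"] == "Cancellation Confirmation"}
    return [dict(r, ChurnedUser=int(r["userId"] in churned)) for r in valid]


labelled = label_churn(events)
for row in labelled:
    print(row["userId"], row["page"], row["ChurnedUser"])
```

Note that the flag is attached to all of a churned user's rows, including events that happened before the cancellation, which is what the `isin` filter in the Spark code does as well.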

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

### Data Preprocessing and Model Implementation 
From our EDA, we would like to consider the following features: the total length of songs played, the average length of songs played, the number of days active for all subscribers across the free and paid subscription levels, and the total number of sessions.

In [2]:
# feature engineering and data prep for modeling
# total length of all songs played, per user (rows without a song are skipped)
songlength = df_valid.where(df_valid.song.isNotNull()).groupby('userId') \
.agg({'length':'sum'}) \
.orderBy('userId') \
.withColumnRenamed("sum(length)", "songlength")
# average length of the songs played, per user
songlengthavg = df_valid.where(df_valid.song.isNotNull()).groupby('userId') \
.agg({'length':'avg'}) \
.orderBy('userId') \
.withColumnRenamed("avg(length)", "songlengthavg")
# number of songs played, per user
songcount = df_valid.where(df_valid.song.isNotNull()).groupby('userId') \
.agg({'song':'count'}) \
.orderBy('userId') \
.withColumnRenamed("count(song)", "songcount")
# number of sessions
numsessions = df_valid.select(['userId','sessionId']) \
.distinct() \
.groupby('userId') \
.count() \
.orderBy('userId') \
.withColumnRenamed('count','numsessions')
# Add friend event
friends = df_valid.where(df_valid.page == "Add Friend") \
.groupby('userId').agg({'page':'count'})\
.orderBy('userId')\
.withColumnRenamed("count(page)","friendcount")
friends = friends.replace(float('nan'), None)
# Add Playlist event
playlist = df_valid.where(df_valid.page == "Add to Playlist") \
.groupby('userId').agg({'page':'count'})\
.orderBy('userId')\
.withColumnRenamed("count(page)","addtoplaylist")
# churn data
label = df_valid.groupby('userId').agg({'ChurnedUser':'first'}).orderBy('userId')\
.withColumnRenamed('first(ChurnedUser)','label')
# number of days between each user's first and last event (ts is divided by the milliseconds in a day)
ms_per_day = 60*60*24*1000
min_ts = df_valid.select(['userId','ts']).groupby('userId').min('ts')
max_ts = df_valid.select(['userId','ts']).groupby('userId').max('ts')
days_active = min_ts.join(max_ts,on='userId')
days_active = days_active.withColumn('days_active',(col('max(ts)')-col('min(ts)'))/ms_per_day)
days_active = days_active.select(['userId','days_active']).orderBy('userId')
modelfile = (songcount
             .join(days_active, 'userId','full')
             .join(friends, 'userId', 'full')
             .join(numsessions, 'userId','full')
             .join(playlist, 'userId','full')
             .join(label,'userId','full'))
# users without any "Add Friend" or "Add to Playlist" events get a count of 0
modelfile = modelfile.fillna({'friendcount':0, 'addtoplaylist':0})
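
As a worked example of the `days_active` arithmetic: the cell above treats `ts` as a timestamp in milliseconds, so the gap between a user's first and last event divided by the number of milliseconds in a day gives the days active. A minimal sketch in plain Python, with hypothetical timestamps:

```python
MS_PER_DAY = 60 * 60 * 24 * 1000  # the same constant the cell above divides by


def days_active(timestamps_ms):
    """Days between a user's first and last event, given timestamps in milliseconds."""
    return (max(timestamps_ms) - min(timestamps_ms)) / MS_PER_DAY


# Hypothetical user with three events: at t0, 12 hours later, and 3 days later.
t0 = 1_538_352_000_000
user_ts = [t0, t0 + 12 * 60 * 60 * 1000, t0 + 3 * MS_PER_DAY]
print(days_active(user_ts))  # 3.0
```

A user with a single event gets `0.0` under this definition, so the feature measures the span of activity rather than the number of distinct days on which the user was active.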

In [3]:
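# feature columns: everything except the label and the user identifier
features = modelfile.drop('label','userId').columns
# assemble the feature columns into a single vector column
vector_assembler = VectorAssembler(inputCols=features,outputCol = 'Features')
modelinput = vector_assembler.transform(modelfile)
# standardize the features so that no large-scale variable dominates the model
# note: the scaler is fit on the full data set, before the train/test split below
Scaler1 = StandardScaler(withMean=True,withStd=True,inputCol='Features',outputCol='ScaledFeatures')
FeatureScaler1Fit = Scaler1.fit(modelinput)
ScaledInput1 = FeatureScaler1Fit.transform(modelinput)
# keep only the label and the scaled feature vector, under the default column names expected by the classifiers
modeldata = ScaledInput1.select(ScaledInput1.label.alias('label'),ScaledInput1.ScaledFeatures.alias('features'))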
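
`StandardScaler(withMean=True, withStd=True)` centres each feature and divides it by its standard deviation. The following is a minimal sketch of that transformation on one hypothetical feature column in plain Python. It uses the sample standard deviation (dividing by n-1), which to my understanding is what Spark uses, and it fits the statistics on the training values only and reuses them for the test values. That ordering is a suggestion for avoiding leakage from the test set, not what the cell above does.

```python
from statistics import mean, stdev


def fit_scaler(values):
    """Return (mean, sample standard deviation) of one feature column."""
    return mean(values), stdev(values)


def transform(values, mu, sigma):
    """Centre and scale values with previously fitted statistics."""
    return [(v - mu) / sigma for v in values]


# Hypothetical song counts for training and test users.
train_songcount = [10.0, 20.0, 30.0]
test_songcount = [40.0]

mu, sigma = fit_scaler(train_songcount)  # fitted on the training data only
scaled_train = transform(train_songcount, mu, sigma)
scaled_test = transform(test_songcount, mu, sigma)
print(scaled_train)  # [-1.0, 0.0, 1.0]
print(scaled_test)   # [2.0]
```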

In [4]:
train, test = modeldata.randomSplit([0.75,0.25])

In [5]:
# Base Logit
lr = LogisticRegression(maxIter=10)
f1_evaluator=MulticlassClassificationEvaluator(metricName='f1')
paramGrid = ParamGridBuilder().build()
crossvalidation_baselr = CrossValidator(estimator=lr,
                                        evaluator=f1_evaluator,
                                        estimatorParamMaps=paramGrid,
                                        numFolds=3)

In [6]:
baselr = crossvalidation_baselr.fit(train)
baselr.avgMetrics

[0.7862241999886661]

In [11]:
baselrtest = baselr.transform(test)
evaluator=MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy:{}'.format(evaluator.evaluate(baselrtest,{evaluator.metricName: "accuracy"})))
print('F1 Score:{}'.format(evaluator.evaluate(baselrtest,{evaluator.metricName: "f1"})))

Accuracy:0.7846153846153846
F1 Score:0.7659763313609467


In [13]:
# decision tree classifier
dtc = DecisionTreeClassifier()
f1_evaluator=MulticlassClassificationEvaluator(metricName='f1')
dtc_parameterGrid = ParamGridBuilder().build()
dtc_crossval = CrossValidator(estimator = dtc, estimatorParamMaps=dtc_parameterGrid, 
                             evaluator=f1_evaluator,
                             numFolds=3)

In [14]:
basedtc = dtc_crossval.fit(train)
basedtc.avgMetrics

[0.765654048348515]

In [15]:
# evaluate the model fitted in the previous cell on the held-out test set
basedtctest = basedtc.transform(test)
evaluator=MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy:{}'.format(evaluator.evaluate(basedtctest,{evaluator.metricName: "accuracy"})))
print('F1 Score:{}'.format(evaluator.evaluate(basedtctest,{evaluator.metricName: "f1"})))

Accuracy:0.8307692307692308
F1 Score:0.8324610097805973


In [17]:
## Modeling Methodology
# splitting the data into training and testing sets

# vectorize the features by dropping the variables not needed for prediction - outcome variable and userID as well

# we are scaling the features to overcome any variable that has a larger scale overpowering the model
#scale = MinMaxScaler(inputCol='Features',outputCol='Scaler')

In [10]:
# Logistic Regression
# Pipelines took a really long time to run
lr = LogisticRegression(maxIter=10)
pipeline_lr = Pipeline(stages=[lr])
f1_evaluator=MulticlassClassificationEvaluator(metricName='f1')


# create parameter grids
lr_parameterGrid = (ParamGridBuilder().addGrid(lr.regParam,[0.0,0.1]).build())

# 3-fold cross validation
lr_crossval = CrossValidator(estimator=pipeline_lr, 
                            estimatorParamMaps=lr_parameterGrid,
                            evaluator=f1_evaluator,
                            numFolds=3)

In [11]:
# decision tree classifier

dtc = DecisionTreeClassifier()
pipeline_dtc = Pipeline(stages=[dtc])

# creating parameter grids
dtc_parameterGrid = ParamGridBuilder().addGrid(dtc.maxDepth,[1,2]).build()

# 3-fold cross validation
dtc_crossval = CrossValidator(estimator = pipeline_dtc, 
                              estimatorParamMaps=dtc_parameterGrid, 
                             evaluator=f1_evaluator,
                             numFolds=3)

## Model Evaluation

Keeping our machine learning analysis modest, we run logistic regression and a decision tree classifier on our Sparkify subscribers data set.

In [35]:
# Py4JError troubleshooting - do not use if the model trains successfully
spark.sparkContext.setCheckpointDir('checkpoint')
# checkpoint() returns a new DataFrame with truncated lineage, so keep the result
train = train.checkpoint()
#train.explain(extended=True)
train = spark.createDataFrame(train.rdd, schema=train.schema)

In [12]:
# train the model
trained_lr = lr_crossval.fit(train)
dtc_train = dtc_crossval.fit(train)

In [17]:
# get the best model found by cross validation
bestModellr = trained_lr.bestModel
# bestModel is a PipelineModel, so the fitted logistic regression is its last stage;
# coefficients are available on that stage, e.g. bestModellr.stages[-1].coefficients
bestModellr.stages[-1]

LogisticRegressionModel: uid = LogisticRegression_460c912c35ef, numClasses = 2, numFeatures = 5

In [None]:
#bestModeldtc = dtc_train.bestModel
dtc_train1 = dtc_crossval.fit(train)

In [16]:
dtc_train.bestModel.stages[-1]

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_eed9dd4a1784) of depth 2 with 5 nodes

## Model Validation and Metrics Evaluation

We look at accuracy and F1 score for both algorithms to evaluate their effectiveness.
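
For reference, the `f1` metric of `MulticlassClassificationEvaluator` is, to my understanding, the F1 score averaged over classes and weighted by each class's share of the true labels, rather than the F1 of the churn class alone. The following is a minimal sketch of both metrics in plain Python; the labels and predictions are hypothetical.

```python
def weighted_f1(y_true, y_pred):
    """F1 per class, averaged with weights equal to each class's share of y_true."""
    total = 0.0
    for cls in set(y_true):
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p != cls)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * y_true.count(cls) / len(y_true)
    return total


def accuracy(y_true, y_pred):
    """Share of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


# Hypothetical test set: 8 loyal users (0) and 2 churners (1); one churner is missed.
y_true = [0] * 8 + [1] * 2
y_pred = [0] * 8 + [1, 0]
print(round(accuracy(y_true, y_pred), 4))
print(round(weighted_f1(y_true, y_pred), 4))
```

In this toy case the churn class on its own has an F1 of only about 0.67, while the weighted score stays close to the accuracy because the majority class dominates the weights. With an imbalanced label such as churn, that difference is worth keeping in mind when reading the scores below.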

In [18]:
# validation logistic regression
evaluator=MulticlassClassificationEvaluator(predictionCol='prediction')
logisticregression = trained_lr.transform(test)
lr_score = evaluator.evaluate(logisticregression)
print(lr_score)
print('Accuracy: {}'.format(evaluator.evaluate(logisticregression, {evaluator.metricName: 'accuracy'})))
print('f1 score: {}'.format(evaluator.evaluate(logisticregression, {evaluator.metricName: 'f1'})))

0.8642026825633384
Accuracy: 0.8688524590163934
f1 score: 0.8642026825633384


In [19]:
# validation decision tree classifier

decisiontree = dtc_train.transform(test)
print('Accuracy: {}'.format(evaluator.evaluate(decisiontree, {evaluator.metricName: 'accuracy'})))
print('f1 score: {}'.format(evaluator.evaluate(decisiontree, {evaluator.metricName: 'f1'})))

Accuracy: 0.8032786885245902
f1 score: 0.7761664564943254


In [None]:
lr1 = [0.7846153846153846, 0.7659763313609467]
dt1 = [0.8307692307692308, 0.8324610097805973]
lr2 = [0.8688524590163934, 0.8642026825633384]
dt2 = [0.8032786885245902, 0.7761664564943254]
hz = ['Logit', 'DecisionTree']
vt = ['BeforeTuning', 'AfterTuning']
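
The lists above can be laid out as a small comparison table. This is a minimal sketch in plain Python; the numbers are copied from the cell outputs in this notebook, and the `results` dictionary and column layout are my own choices. Because `randomSplit` is called without a seed, the "before" and "after" runs may have been evaluated on different test splits, so the comparison is indicative only.

```python
# (accuracy, f1) pairs copied from the cell outputs above
results = {
    ("Logit", "BeforeTuning"): (0.7846153846153846, 0.7659763313609467),
    ("Logit", "AfterTuning"): (0.8688524590163934, 0.8642026825633384),
    ("DecisionTree", "BeforeTuning"): (0.8307692307692308, 0.8324610097805973),
    ("DecisionTree", "AfterTuning"): (0.8032786885245902, 0.7761664564943254),
}

# Build one header line followed by one formatted line per model and stage.
lines = [f"{'Model':<14}{'Stage':<14}{'Accuracy':>10}{'F1':>10}"]
for (model, stage), (acc, f1) in results.items():
    lines.append(f"{model:<14}{stage:<14}{acc:>10.4f}{f1:>10.4f}")
print("\n".join(lines))
```

In these runs the tuned logistic regression scores higher than the baseline, while the tuned decision tree scores lower. One possible reason is that its grid only allows depths 1 and 2, which is shallower than the default `maxDepth` of 5 used by the baseline.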

## Modeling and Justification 

The logistic regression yields an accuracy score of ... and f1 score of ..., for our churning model.

The decision tree classifier yields an accuracy score of 0.7391304347826086 and an f1 score of 0.7171954563258911. I find these metrics quite impressive: having developed predictive models in previous assignments on real-world data with uncontrolled variation, I consider any accuracy score above 60% a good result.

The small sample size is a concern; the models' effectiveness would likely be clearer if the number of users were in the thousands.
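
To put a rough number on the sample-size concern: the tuned logistic regression accuracy of 0.8688524590163934 reported above agrees with 53/61, which suggests a test set of 61 users. That test-set size is an inference from the fraction, not something the notebook states. Under a simple binomial approximation (also an assumption), the uncertainty of such an estimate can be sketched as follows.

```python
from math import sqrt


def accuracy_standard_error(accuracy, n):
    """Binomial standard error of an accuracy estimated from n test examples."""
    return sqrt(accuracy * (1 - accuracy) / n)


acc = 53 / 61   # agrees with the reported 0.8688524590163934
n_test = 61     # inferred from that fraction; an assumption
se = accuracy_standard_error(acc, n_test)
low, high = acc - 1.96 * se, acc + 1.96 * se
print(f"accuracy = {acc:.3f}, standard error = {se:.3f}")
print(f"approximate 95% interval: {low:.3f} to {high:.3f}")

# The same accuracy measured on a hypothetical 1,000 test users.
print(f"standard error with n=1000: {accuracy_standard_error(acc, 1000):.3f}")
```

With roughly 60 test users the standard error is about four percentage points, so differences of a few points between models or runs are within the noise. With a thousand test users it would shrink to about one point.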

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.

### Reflection and Improvement

As someone who played with Apache Spark back in its Scala days, it was a pleasure to discover that PySpark has matured into a proper package with full support for data frames, and that the PySpark ML library is gaining traction.

At the same time, I understand the pain points of running a relatively small batch of data through a distributed system. It is ridiculous to use RDDs for a data set that has a little over 250 rows and a few columns. Nevertheless, it has been great practice for me, and I remain thankful for that.

The data set is fairly clean. The behavior of churners is not trivial to characterize, though. One would imagine that someone who spends less time on the streaming service and does not add friends or add songs to playlists would be likely to drop out. However, the data contains a good mix of subscribers who appear to be loyal to the service and yet decided to downgrade or triggered the "Cancellation Confirmation" event, which was interesting.

System-level tuning and troubleshooting of resource allocation (job scheduling, memory allocation, and so on) would also play a crucial role in how long each job takes to run and how data is persisted.