# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, min, max, countDistinct, count  # note: min/max shadow Python's built-ins
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Normalizer, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
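# create a Spark session
# (getOrCreate() returns the session the kernel already started, so .master("local") has no effect here)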
spark = SparkSession.builder \
    .master("local") \
    .appName("Capstone Project") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [3]:
# Full dataset on S3 (the workspace's mini file is mini_sparkify_event_data.json)
sparkify_data = 's3n://udacity-dsnd/sparkify/sparkify_event_data.json'
df = spark.read.json(sparkify_data)
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [4]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [5]:
# Print summary statistics (count, mean, stddev, min, max) for each column
for column in df.columns:
    df.describe([column]).show()

+-------+-----------------+
|summary|           artist|
+-------+-----------------+
|  count|         20850272|
|   mean|511.6179537476772|
| stddev|968.6388398781422|
|    min|              !!!|
|    max|           ÃÂ¼NN|
+-------+-----------------+

+-------+----------+
|summary|      auth|
+-------+----------+
|  count|  26259199|
|   mean|      null|
| stddev|      null|
|    min| Cancelled|
|    max|Logged Out|
+-------+----------+

+-------+---------+
|summary|firstName|
+-------+---------+
|  count| 25480720|
|   mean| Infinity|
| stddev|      NaN|
|    min|    Aaden|
|    max|Zytavious|
+-------+---------+

+-------+--------+
|summary|  gender|
+-------+--------+
|  count|25480720|
|   mean|    null|
| stddev|    null|
|    min|       F|
|    max|       M|
+-------+--------+

+-------+------------------+
|summary|     itemInSession|
+-------+------------------+
|  count|          26259199|
|   mean|106.56267561702853|
| stddev|  117.658126175238|
|    min|                 0|
|

In [6]:
# Count null values in each column
for column in df.columns:
    print(column, "with null values: ", df.filter(df[column].isNull()).count())

artist with null values:  5408927
auth with null values:  0
firstName with null values:  778479
gender with null values:  778479
itemInSession with null values:  0
lastName with null values:  778479
length with null values:  5408927
level with null values:  0
location with null values:  778479
method with null values:  0
page with null values:  0
registration with null values:  778479
sessionId with null values:  0
song with null values:  5408927
status with null values:  0
ts with null values:  0
userAgent with null values:  778479
userId with null values:  0
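The counts above show `userId` as never null, yet 778,479 rows lack `firstName`, `gender`, `location`, `registration` and `userAgent`. One plausible explanation is that these rows come from users who are not logged in and carry an empty-string `userId`. `isNull()` does not catch empty strings. This is an assumption to verify, not a finding; in PySpark, `df.filter(df["userId"] == "").count()` would check it directly. The plain-Python sketch below contrasts the two kinds of "missing". It uses invented rows and a hypothetical helper `count_missing`.

```python
# Toy event rows (invented, not Sparkify data); field names mirror the schema above.
events = [
    {"userId": "10", "firstName": "Ann", "page": "NextSong"},
    {"userId": "", "firstName": None, "page": "Home"},
    {"userId": "", "firstName": None, "page": "Login"},
    {"userId": "11", "firstName": "Bob", "page": "Logout"},
]


def count_missing(rows, column):
    """Return (nulls, blank strings) for a column; isNull() only sees the first kind."""
    nulls = sum(1 for row in rows if row[column] is None)
    blanks = sum(
        1 for row in rows
        if isinstance(row[column], str) and row[column].strip() == ""
    )
    return nulls, blanks


for column in ("userId", "firstName"):
    nulls, blanks = count_missing(events, column)
    print(column, "nulls:", nulls, "blank strings:", blanks)
```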

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how often they performed a specific action per unit of time or per number of songs played.
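Churn is a property of a user, while the event log has one row per action. Comparing the two groups therefore means aggregating per user first and averaging within each group second. The plain-Python sketch below does this for thumbs-down per song played. It uses made-up events, not Sparkify data. In Spark the same shape would be a `groupBy("userId")` aggregation followed by a second `groupBy` on the churn label.

```python
from collections import defaultdict

# Invented event log: (userId, page) pairs.
events = [
    ("a", "NextSong"), ("a", "NextSong"), ("a", "Thumbs Down"), ("a", "Cancellation Confirmation"),
    ("b", "NextSong"), ("b", "NextSong"), ("b", "NextSong"), ("b", "NextSong"), ("b", "Thumbs Up"),
    ("c", "NextSong"), ("c", "NextSong"), ("c", "Thumbs Down"),
]

# Step 1: per-user counters, plus the set of users who churned.
songs = defaultdict(int)
thumbs_down = defaultdict(int)
churned = set()
for user, page in events:
    if page == "NextSong":
        songs[user] += 1
    elif page == "Thumbs Down":
        thumbs_down[user] += 1
    elif page == "Cancellation Confirmation":
        churned.add(user)

# Step 2: thumbs down per song for each user, collected by churn label.
groups = {True: [], False: []}
for user in songs:
    groups[user in churned].append(thumbs_down[user] / songs[user])

# Step 3: average the per-user rates within each group.
avg_rate = {label: sum(rates) / len(rates) for label, rates in groups.items() if rates}
print(avg_rate)
```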

In [7]:
# Add an event-level "Churn" flag: true only on "Cancellation Confirmation" events
df = df.withColumn("Churn", df["page"] == "Cancellation Confirmation")

In [8]:
# Compare the average "itemInSession" of cancellation events (Churn == true) with all other events
df.groupBy("Churn").avg("itemInSession").show()

+-----+------------------+
|Churn|avg(itemInSession)|
+-----+------------------+
| true|101.91645012992204|
|false|106.56356100182995|
+-----+------------------+

In [9]:
# Count distinct users with at least one event in each group (a user can appear in both)
df.groupBy("Churn").agg(countDistinct("UserId")).show()

+-----+----------------------+
|Churn|count(DISTINCT UserId)|
+-----+----------------------+
| true|                  5003|
|false|                 22278|
+-----+----------------------+

In [10]:
# Count rows (events) in each group
df.groupBy("Churn").count().show()

+-----+--------+
|Churn|   count|
+-----+--------+
| true|    5003|
|false|26254196|
+-----+--------+

In [11]:
# Count events by churn flag and gender
df.groupBy("Churn", "Gender").count().show()

+-----+------+--------+
|Churn|Gender|   count|
+-----+------+--------+
| true|     F|    2347|
|false|     M|13296906|
| true|     M|    2656|
|false|     F|12178811|
|false|  null|  778479|
+-----+------+--------+
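These are event counts. The `false` rows include every non-cancellation event, including events from users who later churned, so they cannot be set directly against the `true` rows. A per-user churn rate for each gender is easier to read. In PySpark this would mean building one row per user (gender plus a 0/1 churn label) and then averaging the label per gender. The plain-Python sketch below shows the calculation on invented users, using a hypothetical helper `churn_rate_by`.

```python
# Invented per-user records: (userId, gender, churned).
users = [
    ("u1", "F", True), ("u2", "F", False), ("u3", "F", False),
    ("u4", "M", True), ("u5", "M", True), ("u6", "M", False),
]


def churn_rate_by(records):
    """Share of users who churned, per gender."""
    totals, churners = {}, {}
    for _, gender, churned in records:
        totals[gender] = totals.get(gender, 0) + 1
        churners[gender] = churners.get(gender, 0) + int(churned)
    return {gender: churners[gender] / totals[gender] for gender in totals}


rates = churn_rate_by(users)
print(rates)
```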

In [12]:
# List distinct values of the "page" column (show() prints at most 20 rows)
df.select("page").distinct().show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
| Submit Registration|
|            Settings|
|               Login|
|            Register|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
+--------------------+
only showing top 20 rows

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can take the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

## Features to be engineered

I want to use the following features in my model:  
1) thumbs down and up per UserId  
2) songs played  
3) duration used (max_ts - min_ts per UserId)  
4) errors per hour or day  
5) add to playlist per day

In [14]:
# Calculate the number of downvotes per user id
thumbs_down = df.filter(df["page"] == "Thumbs Down").groupby("UserId").agg(count("Page").alias("Number of Thumbs Down"))

In [15]:
# Calculate the number of upvotes per user id
thumbs_up = df.filter(df["page"] == "Thumbs Up").groupby("UserId").agg(count("Page").alias("Number of Thumbs Up"))

In [16]:
# Calculate the number of songs per user id
songs = df.filter(df["Song"].isNotNull()).groupby("UserId").agg(count("Page").alias("Number of Songs"))

In [17]:
# Calculate the usage time of a user id by using the time span between min ts and max ts
timespan = df.groupby("UserId").agg(((max("ts") - min("ts")) / (3600 * 1000)).cast(IntegerType()).alias("Duration in Hours"))
timespan = timespan.withColumn("Duration in Days", col("Duration in Hours")/24)
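
This cell treats `ts` as milliseconds, so dividing the span by 3,600,000 gives hours. The cast to an integer truncates the fractional hour, and the day value is then derived from the truncated hours. Below is a plain-Python mirror of that arithmetic. The timestamp and the helper name `duration_features` are made up for illustration.

```python
MS_PER_HOUR = 3600 * 1000


def duration_features(timestamps_ms):
    """Whole hours between first and last event (truncated, like the integer cast), then days."""
    hours = int((max(timestamps_ms) - min(timestamps_ms)) / MS_PER_HOUR)
    days = hours / 24
    return hours, days


# Hypothetical user whose events span 2 days, 5 hours and 30 minutes.
first = 1_538_352_000_000
last = first + (2 * 24 + 5) * MS_PER_HOUR + 30 * 60 * 1000
print(duration_features([first, last]))
```

A user whose events span less than one hour therefore gets 0 hours and 0 days, which matters for the per-day rates computed later.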

In [18]:
# Calculate the number of songs added to a playlist per user id
playlist = df.filter(df["page"] == "Add to Playlist").groupby("UserId").agg(count("Page").alias("Added to Playlist"))

In [19]:
# Calculate the number of error pages seen per user id
error = df.filter(df["status"] == 404).groupby("UserId").agg(count("Page").alias("Number of Errors"))

In [20]:
# Add the column "Churn" based on the "page" column value
churn = df.withColumn("churn", df["page"] == "Cancellation Confirmation")
churn = churn.select(col("churn").cast("integer"), "UserId").distinct()

In [21]:
# Count distinct (churn, userId) pairs; a churned user with other events appears once per label
churn.count()

27281
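Because `churn` is computed per event, `select(...).distinct()` keeps both a 0 row and a 1 row for every churned user who has any other event. This is why the count (27,281) equals the 22,278 plus 5,003 distinct users from the earlier per-group count. Joining this table onto the features would give such users two rows with opposite labels. One way to build a single label per user is to take the maximum flag per user. In PySpark that could be written as `df.groupBy("userId").agg(max(col("Churn").cast("integer")).alias("churn"))`, using the `max` already imported from `pyspark.sql.functions`. The plain-Python sketch below applies the same logic to toy rows.

```python
# Toy (userId, churn_flag) pairs, one per event row.
rows = [("a", 0), ("a", 0), ("a", 1), ("b", 0), ("b", 0), ("c", 0), ("c", 1)]

# What select(...).distinct() yields: churned users "a" and "c" appear with both labels.
distinct_pairs = set(rows)

# One label per user: churned if any of their events is a cancellation (max of the flags).
label_per_user = {}
for user, flag in rows:
    label_per_user[user] = max(label_per_user.get(user, 0), flag)

print(len(distinct_pairs), label_per_user)
```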

In [22]:
# Join all per-user feature tables on userId (these are inner joins, the default)
feature_df = thumbs_down\
    .join(thumbs_up, on='userId')\
    .join(timespan, on='userId')\
    .join(playlist, on='userId')\
    .join(error, on='userId')\
    .join(songs, on='userId')\
    .join(churn, on='userId')
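
Each inner join keeps a user only if they appear in every feature table. A user who never gave a thumbs down, never added a song to a playlist or never hit a 404 page is dropped silently, and short-lived users of this kind may be among the most relevant for churn. If zero is the right value for a missing count (an assumption), an alternative is to start from the full user list, join each table with `how='left'` and then call `fillna(0)`. The plain-Python sketch below contrasts the two behaviours on toy tables.

```python
# Toy per-user feature tables (userId -> count).
thumbs_down = {"a": 3, "b": 1}
errors = {"a": 2}  # user "b" never hit a 404 page
songs = {"a": 120, "b": 80, "c": 15}

# Inner join: only users present in every table survive.
inner_users = set(thumbs_down) & set(errors) & set(songs)

# Left join from the full user list, filling missing counts with 0.
all_users = sorted(songs)
left_joined = {
    user: {
        "thumbs_down": thumbs_down.get(user, 0),
        "errors": errors.get(user, 0),
        "songs": songs[user],
    }
    for user in all_users
}
print(sorted(inner_users), left_joined)
```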

In [23]:
# Turn absolute counts into per-day rates and drop the helper columns
feature_df = feature_df.withColumn("Errors per Day", col("Number of Errors")/col("Duration in Days"))
feature_df = feature_df.drop("Number of Errors")
feature_df = feature_df.withColumn("Playlist Add per Day", col("Added to Playlist")/col("Duration in Days"))
feature_df = feature_df.drop("Added to Playlist")
feature_df = feature_df.drop("UserId")
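
These ratios divide by `Duration in Days`, which is 0 for any user whose events span less than one hour. With ANSI mode off (the default in Spark 3.x), Spark SQL returns null for division by zero instead of raising an error. The null check below finds no nulls, presumably because the inner joins have already removed such users. If the joins are relaxed, an explicit guard such as `when(col("Duration in Days") > 0, ...)` from `pyspark.sql.functions` keeps this behaviour deliberate. Below is a plain-Python version of the guarded rate, with a hypothetical helper `per_day` and invented users.

```python
def per_day(count, duration_days):
    """Count per day of activity; None (like Spark's null) when the span is 0 days."""
    if duration_days == 0:
        return None
    return count / duration_days


# Invented users: (errors, playlist adds, duration in whole hours); days = hours / 24.
users = {"long_lived": (6, 30, 72), "one_visit": (1, 2, 0)}
for name, (errors, adds, hours) in users.items():
    days = hours / 24
    print(name, per_day(errors, days), per_day(adds, days))
```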

In [24]:
feature_df.printSchema()

root
 |-- Number of Thumbs Down: long (nullable = false)
 |-- Number of Thumbs Up: long (nullable = false)
 |-- Duration in Hours: integer (nullable = true)
 |-- Duration in Days: double (nullable = true)
 |-- Number of Songs: long (nullable = false)
 |-- churn: integer (nullable = true)
 |-- Errors per Day: double (nullable = true)
 |-- Playlist Add per Day: double (nullable = true)

In [25]:
# Count null values in each column
for column in feature_df.columns:
    print(column, "with null values: ", feature_df.filter(feature_df[column].isNull()).count())

Number of Thumbs Down with null values:  0
Number of Thumbs Up with null values:  0
Duration in Hours with null values:  0
Duration in Days with null values:  0
Number of Songs with null values:  0
churn with null values:  0
Errors per Day with null values:  0
Playlist Add per Day with null values:  0

Since no column contains null values, we can proceed without further data manipulation.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
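Consider a classifier that never predicts churn to see why accuracy alone is misleading here. Note also that `MulticlassClassificationEvaluator(metricName="f1")`, used below, reports the F1 score of both classes averaged with class-frequency weights, not the F1 of the churn class alone. The plain-Python sketch below computes both on toy labels, using hypothetical helper names.

```python
def f1_per_class(y_true, y_pred, cls):
    """F1 score treating `cls` as the positive class."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p != cls)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's share of the true labels."""
    n = len(y_true)
    return sum(y_true.count(c) / n * f1_per_class(y_true, y_pred, c) for c in sorted(set(y_true)))


# Toy labels: 8 users stayed (0), 2 churned (1); the "model" always predicts "stayed".
y_true = [0] * 8 + [1] * 2
y_pred = [0] * 10
accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
print(accuracy, f1_per_class(y_true, y_pred, 1), weighted_f1(y_true, y_pred))
```

The trivial model reaches 0.8 accuracy and about 0.71 weighted F1 without identifying a single churner. Its churn-class F1 of 0 exposes this.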

For this project, I will use three different classification algorithms:  
1) Logistic Regression  
2) Random Forest Classifier  
3) Gradient-Boosted Tree Classifier

### Prepare the data and the models

In [26]:
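inputCols = [
 'Number of Thumbs Down',
 'Number of Thumbs Up',
 'Duration in Hours',
 'Duration in Days',  # a linear rescaling of 'Duration in Hours' (divided by 24)
 'Playlist Add per Day',
 'Errors per Day'
 # 'Number of Songs' is computed above but not used as a model input
]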

assembler = VectorAssembler(inputCols=inputCols, outputCol="output_features")
indexer = StringIndexer(inputCol="churn", outputCol="label")
norm = Normalizer(inputCol="output_features", outputCol="features")
#df = assembler.transform(feature_df)
#df = norm.transform(df)
#df = df.select("features", col("churn").alias("label"))
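
`Normalizer` rescales each row (one user's feature vector) to unit length, using p=2 by default. A user's features are therefore expressed relative to each other, and a large value such as `Duration in Hours` tends to dominate. If the goal is to put features on comparable scales across users, `StandardScaler` (already imported above) scales each column instead. The plain-Python sketch below contrasts the two on made-up numbers. Which one suits these models is a judgment call that this notebook does not test.

```python
import math

# Made-up feature rows: [thumbs down, thumbs up, duration in hours].
rows = [[10.0, 50.0, 1200.0], [2.0, 5.0, 30.0]]


def l2_normalize_row(row):
    """Row-wise scaling, as Normalizer(p=2) does: each row gets unit Euclidean length."""
    norm = math.sqrt(sum(v * v for v in row))
    return [v / norm for v in row]


def standardize_columns(data):
    """Column-wise z-scores, as StandardScaler(withMean=True, withStd=True) does (sample std)."""
    cols = list(zip(*data))
    means = [sum(c) / len(c) for c in cols]
    stds = [math.sqrt(sum((v - m) ** 2 for v in c) / (len(c) - 1)) for c, m in zip(cols, means)]
    return [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in data]


normalized = [l2_normalize_row(r) for r in rows]
standardized = standardize_columns(rows)
print(normalized)
print(standardized)
```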

In [27]:
# Split data in train and test data
train, test = feature_df.randomSplit([0.7, 0.3], seed=42)

In [28]:
# Pipeline for the logistic regression model
lr = LogisticRegression()
pipeline_lr = Pipeline(stages=[indexer, assembler, norm, lr])

# Pipeline for the random forest model
rf = RandomForestClassifier()
pipeline_rf = Pipeline(stages=[indexer, assembler, norm, rf])

# Pipeline for the gradient boosted tree model
gbt = GBTClassifier()
pipeline_gbt = Pipeline(stages=[indexer, assembler, norm, gbt])

### Logistic Regression

In [29]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.3])\
    .addGrid(lr.maxIter, [10])\
    .build()

evaluator = MulticlassClassificationEvaluator(metricName="f1")

crossval = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cv_lr = crossval.fit(train) 
evaluator.evaluate(cv_lr.transform(test))

0.7394148547333945
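The grid above holds one value per parameter, so `CrossValidator` only estimates the F1 of a single configuration and no tuning takes place. The empty grids used below for the random forest and gradient-boosted trees are the same: `ParamGridBuilder().build()` returns a single empty parameter map, so each model runs with its defaults. The plain-Python sketch below shows how a grid expands into combinations and how many fits 3-fold cross-validation then needs. It includes the final refit of the best combination on all training data. The helper `grid` and the wider grid's values are illustrative only, not tuned choices.

```python
from itertools import product


def grid(**params):
    """Every combination of the listed values, like ParamGridBuilder.build()."""
    names = list(params)
    return [dict(zip(names, values)) for values in product(*params.values())]


single = grid(regParam=[0.3], maxIter=[10])
wider = grid(regParam=[0.0, 0.1, 0.3], elasticNetParam=[0.0, 0.5])
num_folds = 3
for combos in (grid(), single, wider):
    # One fit per combination per fold, plus one refit of the best combination on all training data.
    print(len(combos), "combinations ->", len(combos) * num_folds + 1, "fits")
```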

### Random Forest

In [30]:
paramGrid = ParamGridBuilder() \
    .build()

evaluator = MulticlassClassificationEvaluator(metricName="f1")

crossval = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cv_rf = crossval.fit(train) 
evaluator.evaluate(cv_rf.transform(test))

0.7448101494437359

### Gradient Boosted Tree

In [31]:
paramGrid = ParamGridBuilder() \
    .build()

evaluator = MulticlassClassificationEvaluator(metricName="f1")

crossval = CrossValidator(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cv_gbt = crossval.fit(train) 
evaluator.evaluate(cv_gbt.transform(test))

0.7593075788670144

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.