# Sparkify notebook "Machine Learning" on AWS

This notebook connects to the full "Sparkify" dataset stored in an S3 bucket on AWS. It builds on the local evaluation of the small dataset provided by Udacity. The PySpark kernel available in EMR Notebooks on AWS lacks some of the Python libraries I needed to process my data, and the plotting code differs from my local notebook. Amazon provides a very good introduction on how to adjust the code: https://aws.amazon.com/de/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/

## install missing libraries

In [1]:
sc.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1620388372275_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Collecting pandas==0.25.1
  Downloading https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl (10.4MB)
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Downloading https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl (227kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.1

## import all libraries needed

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc, count, countDistinct
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import mean as Fmean
from pyspark.sql.functions import round as Fround
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import datetime

import pandas as pd
import numpy as np

## define metrics for model evaluation

In [3]:
def model_evaluator(results):
    """
    This function calculates the true/false positive/negative prediction from the predicted test dataset.
    Those are then used to calculate the model evaluation metrics and a confusion matrix.
    
    INPUT:
    results: test-dataset including the prediction column
    
    OUTPUT:
    standard metrics for model evaluation and a confusion matrix
    """
    
    #extract right and wrong predicted values and count their numbers 
    true_negative = results.filter((results.prediction == 0)&(results.churn==0)).count() * 1.0 
    false_positive = results.filter((results.prediction == 1)&(results.churn==0)).count() * 1.0 
    false_negative = results.filter((results.prediction == 0)&(results.churn==1)).count() * 1.0 
    true_positive = results.filter((results.prediction == 1)&(results.churn==1)).count() * 1.0 
    
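    #calculate standard measures for evaluating the model (guarded against division by zero)
    total = true_negative+false_positive+false_negative+true_positive
    accuracy = (true_positive+true_negative)/total if total else 0.0
    precision = true_positive/(true_positive+false_positive) if (true_positive+false_positive) else 0.0
    recall = true_positive/(true_positive+false_negative) if (true_positive+false_negative) else 0.0
    f1 = 2.0 * (precision * recall)/(precision + recall) if (precision + recall) else 0.0
    
    #print the metrics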
    print("Accuracy: {}".format(accuracy))
    print("Precision: {}".format(precision))
    print("F1-Score: {}".format(f1))
    print("Recall: {}".format(recall))
    
    #create confusion matrix to illustrate model quality
    print("\n Confusion Matrix \n")
    print("TRUE_NEGATIVE:{} 	 FALSE_POSITIVE:{}".format(true_negative,false_positive))
    print("FALSE_NEGATIVE:{} 	 TRUE_POSITIVE: {}".format(false_negative, true_positive))

## setup spark session

In [4]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

## import data from S3-repository

In [5]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
user_log = spark.read.json(event_data)
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

## clean dataset

In [6]:
#drop invalid (guest) userId's
user_log = user_log.where(user_log.userId != "")
print("dataframe w/o empty userId's has",user_log.count(),"rows and",len(user_log.columns),"columns.")

dataframe w/o empty userId's has 26259199 rows and 18 columns.

In [7]:
#drop duplicate rows
user_log = user_log.dropDuplicates()
print("dataframe w/o duplicates has",user_log.count(),"rows and",len(user_log.columns),"columns.")

dataframe w/o duplicates has 26259199 rows and 18 columns.

## feature creation and selection

### create new feature "membership_days" as a common time reference for all users

In [8]:
user_log=user_log.withColumn("membership_days", Fround((col('ts')/1000-col('registration')/1000)/86400).cast(IntegerType()))
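
The conversion above turns the millisecond timestamps `ts` and `registration` into whole days. A minimal plain-Python sketch of the same arithmetic follows; the timestamps are hypothetical, and `math.floor(x + 0.5)` is used here to imitate the half-up rounding of Spark's `round` for non-negative values (Python's built-in `round` rounds half to even instead).

```python
import math

MS_PER_SECOND = 1000
SECONDS_PER_DAY = 86400


def membership_days(ts_ms, registration_ms):
    """Days between registration and an event, rounded half up."""
    days = (ts_ms / MS_PER_SECOND - registration_ms / MS_PER_SECOND) / SECONDS_PER_DAY
    # floor(x + 0.5) rounds half up for non-negative x
    return math.floor(days + 0.5)


# Hypothetical timestamps in milliseconds, not taken from the dataset
registration = 1_530_000_000_000
event_after_10_days = registration + 10 * SECONDS_PER_DAY * MS_PER_SECOND
event_after_36_hours = registration + 36 * 3600 * MS_PER_SECOND

print(membership_days(event_after_10_days, registration))
print(membership_days(event_after_36_hours, registration))
```

An event 36 hours after registration corresponds to 1.5 days and is therefore counted as day 2.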

### convert columns with categorical variables into 0 and 1 columns

In [9]:
column_list=['auth','gender','level','status','page']

In [10]:
cols_add=[]
for column in column_list:
    categories = user_log.select(column).distinct().rdd.flatMap(lambda x: x).collect()
    cols_add = cols_add + [F.when(F.col(column) == cat, 1).otherwise(0).alias(column + "_" + str(cat).lower().replace(" ","_")) for cat in categories]
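
The loop above creates one 0/1 column per category and derives its name from the column and the category value. The following plain-Python sketch mirrors only the naming rule and the indicator logic; the helper names are hypothetical, and the exact spelling of the raw category values (for example `"Cancellation Confirmation"` or `"F"`) is an assumption, since only the resulting column names appear in this notebook.

```python
def indicator_name(column, category):
    """Mirror the alias used above: '<column>_<category>', lower case, spaces as underscores."""
    return column + "_" + str(category).lower().replace(" ", "_")


def one_hot(row, column, categories):
    """Return 0/1 indicators for one row (a plain dict standing in for a Spark Row)."""
    return {indicator_name(column, cat): int(row[column] == cat) for cat in categories}


# Assumed raw category values; only the derived names are confirmed by the notebook
print(indicator_name("page", "Cancellation Confirmation"))
print(indicator_name("status", 200))
print(one_hot({"gender": "M"}, "gender", ["F", "M"]))
```

This is why the aggregation step can refer to columns such as `page_cancellation_confirmation` and `status_200`.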

In [11]:
user_log_features =user_log.select("userId","membership_days",*cols_add)
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- membership_days: integer (nullable = true)

In [12]:
print("dataframe has",user_log_features.count(),"rows and",len(user_log_features.columns),"columns.")

dataframe has 26259199 rows and 36 columns.

### aggregate data by "userId" to prepare for ML

In [13]:
df=user_log_features\
        .groupBy("userId") \
        .agg( \
             Fmax("membership_days").alias("max_membership_days"), \
             countDistinct("membership_days").alias("active_days"), \
             Fmax("gender_m").alias("gender_m"), \
             Fmax("level_paid").alias("level_paid"), \
             Fsum("status_200").alias("sum_status_200"), \
             Fsum("status_307").alias("sum_status_307"), \
             Fsum("status_404").alias("sum_status_404"), \
             Fsum("page_nextsong").alias("sum_page_nextsong"), \
             Fsum("page_add_to_playlist").alias("sum_page_add_to_playlist"), \
             Fsum("page_roll_advert").alias("sum_page_roll_advert"), \
             Fsum("page_thumbs_up").alias("sum_page_thumbs_up"), \
             Fsum("page_home").alias("sum_page_home"), \
             Fsum("page_logout").alias("sum_page_logout"), \
             Fsum("page_help").alias("sum_page_help"), \
             Fsum("page_upgrade").alias("sum_page_upgrade"), \
             Fsum("page_add_friend").alias("sum_page_add_friend"), \
             Fsum("page_settings").alias("sum_page_settings"), \
             Fsum("page_submit_upgrade").alias("sum_page_submit_upgrade"), \
             Fsum("page_about").alias("sum_page_about"), \
             Fsum("page_submit_downgrade").alias("sum_page_submit_downgrade"), \
             Fsum("page_error").alias("sum_page_error"), \
             Fsum("page_save_settings").alias("sum_page_save_settings"), \
             Fsum("page_cancel").alias("sum_page_cancel"), \
             Fsum("page_cancellation_confirmation").alias("churn") \
             ).dropna()
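
To make the aggregation logic easier to follow, here is a small plain-Python sketch of the same idea on a handful of hypothetical event rows: the maximum of `membership_days`, the number of distinct days, and sums over the 0/1 indicator columns. It covers only a few of the columns and is not a replacement for the Spark code above.

```python
from collections import defaultdict

# Hypothetical event rows after one-hot encoding (not real data)
events = [
    {"userId": "u1", "membership_days": 3, "page_nextsong": 1, "page_cancellation_confirmation": 0},
    {"userId": "u1", "membership_days": 3, "page_nextsong": 1, "page_cancellation_confirmation": 0},
    {"userId": "u1", "membership_days": 7, "page_nextsong": 0, "page_cancellation_confirmation": 1},
    {"userId": "u2", "membership_days": 1, "page_nextsong": 1, "page_cancellation_confirmation": 0},
]


def aggregate_by_user(rows):
    """Group event rows by userId and compute per-user features."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["userId"]].append(row)

    result = {}
    for user, user_rows in grouped.items():
        days = [r["membership_days"] for r in user_rows]
        result[user] = {
            "max_membership_days": max(days),
            "active_days": len(set(days)),  # distinct days with at least one event
            "sum_page_nextsong": sum(r["page_nextsong"] for r in user_rows),
            "churn": sum(r["page_cancellation_confirmation"] for r in user_rows),
        }
    return result


per_user = aggregate_by_user(events)
print(per_user["u1"])
print(per_user["u2"])
```

Because `churn` is a sum, it is a 0/1 label only as long as each user confirms a cancellation at most once.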

In [14]:
df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- max_membership_days: integer (nullable = true)
 |-- active_days: long (nullable = false)
 |-- gender_m: integer (nullable = true)
 |-- level_paid: integer (nullable = true)
 |-- sum_status_200: long (nullable = true)
 |-- sum_status_307: long (nullable = true)
 |-- sum_status_404: long (nullable = true)
 |-- sum_page_nextsong: long (nullable = true)
 |-- sum_page_add_to_playlist: long (nullable = true)
 |-- sum_page_roll_advert: long (nullable = true)
 |-- sum_page_thumbs_up: long (nullable = true)
 |-- sum_page_home: long (nullable = true)
 |-- sum_page_logout: long (nullable = true)
 |-- sum_page_help: long (nullable = true)
 |-- sum_page_upgrade: long (nullable = true)
 |-- sum_page_add_friend: long (nullable = true)
 |-- sum_page_settings: long (nullable = true)
 |-- sum_page_submit_upgrade: long (nullable = true)
 |-- sum_page_about: long (nullable = true)
 |-- sum_page_submit_downgrade: long (nullable = true)
 |-- sum_page_error: lo

In [15]:
print("dataframe 'df' has",df.count(),"rows and",len(df.columns),"columns.")

dataframe 'df' has 22277 rows and 25 columns.

In [16]:
df.sort("userId").show(5)

+-------+-------------------+-----------+--------+----------+--------------+--------------+--------------+-----------------+------------------------+--------------------+------------------+-------------+---------------+-------------+----------------+-------------------+-----------------+-----------------------+--------------+-------------------------+--------------+----------------------+---------------+-----+
| userId|max_membership_days|active_days|gender_m|level_paid|sum_status_200|sum_status_307|sum_status_404|sum_page_nextsong|sum_page_add_to_playlist|sum_page_roll_advert|sum_page_thumbs_up|sum_page_home|sum_page_logout|sum_page_help|sum_page_upgrade|sum_page_add_friend|sum_page_settings|sum_page_submit_upgrade|sum_page_about|sum_page_submit_downgrade|sum_page_error|sum_page_save_settings|sum_page_cancel|churn|
+-------+-------------------+-----------+--------+----------+--------------+--------------+--------------+-----------------+------------------------+--------------------+--

## machine learning section
### Split Into Train And Test Dataset

In [17]:
#Split the data
(training_data, test_data) = df.randomSplit([0.8,0.2], seed = 42)
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

Training Dataset Count: 17775
Test Dataset Count: 4502

### Setting up ML Pipeline

In [18]:
#definition of features: skip userId (first column) and the last two columns (sum_page_cancel and the label churn)
input_cols=df.columns[1:-2]
print('Feature overview:',input_cols)

#Configure an ML pipeline, which consists of three stages: assemble, standardize, estimate
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')

scaler = StandardScaler(inputCol="features", outputCol="ScaledFeatures", withMean=True, withStd=True) 

rf = RandomForestClassifier(labelCol="churn", 
                            featuresCol="ScaledFeatures")
dt = DecisionTreeClassifier(featuresCol = "ScaledFeatures", 
                            labelCol = "churn")

pipeline_rf=Pipeline(stages=[assembler, scaler, rf])
pipeline_dt=Pipeline(stages=[assembler, scaler, dt])

Feature overview: ['max_membership_days', 'active_days', 'gender_m', 'level_paid', 'sum_status_200', 'sum_status_307', 'sum_status_404', 'sum_page_nextsong', 'sum_page_add_to_playlist', 'sum_page_roll_advert', 'sum_page_thumbs_up', 'sum_page_home', 'sum_page_logout', 'sum_page_help', 'sum_page_upgrade', 'sum_page_add_friend', 'sum_page_settings', 'sum_page_submit_upgrade', 'sum_page_about', 'sum_page_submit_downgrade', 'sum_page_error', 'sum_page_save_settings']
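
The `StandardScaler` stage with `withMean=True` and `withStd=True` centers each feature and scales it to unit standard deviation. A minimal plain-Python sketch of that transformation for a single column is shown below; the input values are hypothetical, and the use of the sample (n-1) standard deviation is based on my reading of the Spark documentation.

```python
import statistics


def standardize(values):
    """Center to zero mean and scale to unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample (n-1) standard deviation
    if std == 0:
        # Constant column: this sketch maps it to zeros
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


# Hypothetical values of one feature column
active_days = [2, 4, 4, 6, 9]
scaled = standardize(active_days)
print([round(v, 3) for v in scaled])
```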

### decision tree classifier

In [20]:
#Fit Decision Tree Classifier to training data and transform test data
model_dt = pipeline_dt.fit(training_data)
dt_predictions = model_dt.transform(test_data)
#Determine accuracy, precision, F1, recall and the confusion matrix of the prediction
model_evaluator(dt_predictions)

Accuracy: 0.8218569524655709
Precision: 0.7478108581436077
F1-Score: 0.5157004830917875
Recall: 0.3935483870967742

 Confusion Matrix 

TRUE_NEGATIVE:3273.0 	 FALSE_POSITIVE:144.0
FALSE_NEGATIVE:658.0 	 TRUE_POSITIVE: 427.0
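
The metrics above follow directly from the four confusion-matrix counts. The sketch below recomputes them in plain Python and adds the accuracy of always predicting the larger class as a reference point; the function name and the baseline are additions for illustration, not part of the notebook's `model_evaluator`.

```python
def metrics_from_counts(tn, fp, fn, tp):
    """Recompute the evaluation metrics from confusion-matrix counts."""
    total = tn + fp + fn + tp
    accuracy = (tp + tn) / total
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    # Accuracy of always predicting the larger class, as a reference point
    majority_baseline = max(tn + fp, fn + tp) / total
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "majority_baseline": majority_baseline,
    }


# Counts taken from the decision tree confusion matrix above
dt_metrics = metrics_from_counts(tn=3273, fp=144, fn=658, tp=427)
for name, value in dt_metrics.items():
    print("{}: {:.4f}".format(name, value))
```

With 3417 of the 4502 test users not churning, the majority baseline is about 0.76, so the accuracy of 0.82 is a smaller gain than it first appears, and the recall of 0.39 shows that most churners are still missed.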

In [21]:
importances_dt = model_dt.stages[-1].featureImportances.toArray()
feature_ranking_dt=pd.DataFrame(data={'Features': np.array(input_cols),'Importance':importances_dt})\
                    .sort_values('Importance', ascending=False).reset_index(drop=True)
feature_ranking_dt

                     Features  Importance
0         max_membership_days    0.806115
1           sum_page_settings    0.072764
2                 active_days    0.049326
3        sum_page_roll_advert    0.045484
4              sum_status_200    0.013333
5               sum_page_home    0.005393
6               sum_page_help    0.003053
7             sum_page_logout    0.002348
8    sum_page_add_to_playlist    0.002183
9              sum_status_404    0.000000
10          sum_page_nextsong    0.000000
11             sum_status_307    0.000000
12         sum_page_thumbs_up    0.000000
13                 level_paid    0.000000
14           sum_page_upgrade    0.000000
15        sum_page_add_friend    0.000000
16                   gender_m    0.000000
17    sum_page_submit_upgrade    0.000000
18             sum_page_about    0.000000
19  sum_page_submit_downgrade    0.000000
20             sum_page_error    0.000000
21     sum_page_save_settings    0.000000
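
One way to read the ranking above is to ask how many features are needed to cover a given share of the total importance. The sketch below does this in plain Python with the non-zero values copied from the table; the helper `features_for_share` and the 95% threshold are illustrative choices.

```python
# Non-zero importances copied from the decision tree ranking above
importances = [
    ("max_membership_days", 0.806115),
    ("sum_page_settings", 0.072764),
    ("active_days", 0.049326),
    ("sum_page_roll_advert", 0.045484),
    ("sum_status_200", 0.013333),
    ("sum_page_home", 0.005393),
    ("sum_page_help", 0.003053),
    ("sum_page_logout", 0.002348),
    ("sum_page_add_to_playlist", 0.002183),
]


def features_for_share(ranked, share):
    """Shortest prefix of a descending ranking whose importances sum to at least `share`."""
    total = 0.0
    selected = []
    for name, importance in ranked:
        selected.append(name)
        total += importance
        if total >= share:
            break
    return selected


top_features = features_for_share(importances, 0.95)
print(top_features)
```

For this tree, four features already account for more than 95% of the importance, with `max_membership_days` dominating.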

### random forest classifier

In [24]:
#Fit Random Forest Classifier to training data and transform test data
model_rf = pipeline_rf.fit(training_data)
rf_predictions = model_rf.transform(test_data)

#Determine accuracy, precision, F1, recall and the confusion matrix of the prediction
model_evaluator(rf_predictions)

Accuracy: 0.8163038649489116
Precision: 0.8086124401913876
F1-Score: 0.44976713240186295
Recall: 0.31152073732718893

 Confusion Matrix 

TRUE_NEGATIVE:3337.0 	 FALSE_POSITIVE:80.0
FALSE_NEGATIVE:747.0 	 TRUE_POSITIVE: 338.0

In [25]:
#rank features
importances_rf = model_rf.stages[-1].featureImportances.toArray()
feature_ranking_rf=pd.DataFrame(data={'Features': np.array(input_cols),'Importance':importances_rf})\
                    .sort_values('Importance', ascending=False).reset_index(drop=True)
feature_ranking_rf

                     Features  Importance
0         max_membership_days    0.742850
1        sum_page_roll_advert    0.052942
2                 active_days    0.043147
3              sum_status_200    0.025373
4                  level_paid    0.021159
5           sum_page_nextsong    0.019594
6             sum_page_logout    0.018473
7           sum_page_settings    0.013851
8          sum_page_thumbs_up    0.013340
9               sum_page_home    0.011949
10             sum_status_307    0.009380
11   sum_page_add_to_playlist    0.005932
12    sum_page_submit_upgrade    0.005595
13        sum_page_add_friend    0.004801
14             sum_page_about    0.002794
15           sum_page_upgrade    0.002171
16             sum_page_error    0.001988
17              sum_page_help    0.001798
18  sum_page_submit_downgrade    0.001643
19             sum_status_404    0.000568
20     sum_page_save_settings    0.000381
21                   gender_m    0.000267

## Hyperparameter Tuning Via CrossValidator

### Random Forest Classifier - setting up parameter space

In [19]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxBins, [20, 30]) \
    .addGrid(rf.maxDepth, [5, 6]) \
    .addGrid(rf.numTrees, [30, 40]) \
    .build()

crossval_rf = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol = "churn", metricName = 'f1'),
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel_rf = crossval_rf.fit(training_data)
prediction_rf = cvModel_rf.transform(test_data)
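
To get a feeling for the cost of this search, the grid can be enumerated in plain Python. The sketch below only counts combinations and fits; it assumes that the cross-validator trains each parameter combination once per fold and then refits the best combination on the full training data.

```python
from itertools import product

# Same values as in the ParamGridBuilder above
grid = {"maxBins": [20, 30], "maxDepth": [5, 6], "numTrees": [30, 40]}
num_folds = 3

# Cartesian product of all parameter values
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
fits_during_cv = len(combinations) * num_folds

print(len(combinations))
print(fits_during_cv)
print(combinations[0], combinations[-1])
```

Eight combinations with three folds mean 24 pipeline fits on the cluster, plus the final refit, so every additional grid value noticeably increases the runtime.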

### Create Report Of Performance

In [23]:
#Show results for all model parameters
params_rf_cv = [{p.name: v for p, v in m.items()} for m in cvModel_rf.getEstimatorParamMaps()]
results_rf_cv=pd.DataFrame.from_dict([
    {cvModel_rf.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params_rf_cv, cvModel_rf.avgMetrics)
])

results_rf_cv

         f1  maxBins  maxDepth  numTrees
0  0.797160       20         5        30
1  0.799783       20         5        40
2  0.802508       20         6        30
3  0.803834       20         6        40
4  0.800993       30         5        30
5  0.802211       30         5        40
6  0.807227       30         6        30
7  0.807959       30         6        40
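
The best parameter combination and the spread of the scores can be read off the report with a few lines of plain Python. The rows below are copied from the table above; the variable names are illustrative.

```python
# Rows copied from the cross-validation report above
cv_results = [
    {"f1": 0.797160, "maxBins": 20, "maxDepth": 5, "numTrees": 30},
    {"f1": 0.799783, "maxBins": 20, "maxDepth": 5, "numTrees": 40},
    {"f1": 0.802508, "maxBins": 20, "maxDepth": 6, "numTrees": 30},
    {"f1": 0.803834, "maxBins": 20, "maxDepth": 6, "numTrees": 40},
    {"f1": 0.800993, "maxBins": 30, "maxDepth": 5, "numTrees": 30},
    {"f1": 0.802211, "maxBins": 30, "maxDepth": 5, "numTrees": 40},
    {"f1": 0.807227, "maxBins": 30, "maxDepth": 6, "numTrees": 30},
    {"f1": 0.807959, "maxBins": 30, "maxDepth": 6, "numTrees": 40},
]

best = max(cv_results, key=lambda row: row["f1"])
spread = best["f1"] - min(row["f1"] for row in cv_results)

print(best)
print(round(spread, 6))
```

The best row uses the largest value of all three parameters, and the scores differ by only about 0.01 across the grid, which may indicate that a wider grid would be worth trying.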

In [21]:
# check best model performance
model_evaluator(prediction_rf)

Accuracy: 0.8254109284762328
Precision: 0.8271334792122538
F1-Score: 0.49027237354085607
Recall: 0.34838709677419355

 Confusion Matrix 

TRUE_NEGATIVE:3338.0 	 FALSE_POSITIVE:79.0
FALSE_NEGATIVE:707.0 	 TRUE_POSITIVE: 378.0
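
The F1-score of about 0.49 printed here looks very different from the cross-validation scores of about 0.80. To my understanding, `MulticlassClassificationEvaluator` with `metricName='f1'` reports an F1 averaged over both classes and weighted by class size, whereas `model_evaluator` reports the F1 of the churn class only. The plain-Python sketch below computes both from the confusion matrix above; the function names are illustrative.

```python
def f1_for_class(tp, fp, fn):
    """F1 of a single class from its true positives, false positives and false negatives."""
    denominator = 2 * tp + fp + fn
    return 2 * tp / denominator if denominator else 0.0


def weighted_f1(tn, fp, fn, tp):
    """Per-class F1 scores averaged with weights equal to the true class sizes."""
    f1_churn = f1_for_class(tp=tp, fp=fp, fn=fn)
    # For class 0 the roles swap: true negatives are its correct predictions
    f1_stay = f1_for_class(tp=tn, fp=fn, fn=fp)
    n_churn = tp + fn
    n_stay = tn + fp
    weighted = (n_churn * f1_churn + n_stay * f1_stay) / (n_churn + n_stay)
    return weighted, f1_churn, f1_stay


# Counts taken from the confusion matrix of the tuned random forest above
weighted, f1_churn, f1_stay = weighted_f1(tn=3338, fp=79, fn=707, tp=378)
print("weighted F1: {:.4f}".format(weighted))
print("F1 churn class: {:.4f}".format(f1_churn))
print("F1 non-churn class: {:.4f}".format(f1_stay))
```

On the test set the weighted F1 comes out close to 0.80, in line with the cross-validation report, while the churn-class F1 stays near 0.49. The tuning therefore optimized a metric that is dominated by the larger non-churn class.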

In [22]:
#rank features of best model
importances_rf_cv = cvModel_rf.bestModel.stages[-1].featureImportances.toArray()
feature_ranking_rf_cv=pd.DataFrame(data={'Features': np.array(input_cols),'Importance':importances_rf_cv})\
                    .sort_values('Importance', ascending=False).reset_index(drop=True)
feature_ranking_rf_cv

                     Features  Importance
0         max_membership_days    0.704152
1                 active_days    0.056557
2        sum_page_roll_advert    0.053258
3               sum_page_home    0.022783
4          sum_page_thumbs_up    0.019508
5             sum_page_logout    0.016715
6              sum_status_307    0.016213
7           sum_page_nextsong    0.016152
8              sum_status_200    0.016103
9           sum_page_settings    0.015634
10                 level_paid    0.015213
11        sum_page_add_friend    0.011751
12   sum_page_add_to_playlist    0.008344
13    sum_page_submit_upgrade    0.007886
14              sum_page_help    0.006186
15           sum_page_upgrade    0.003344
16             sum_page_error    0.002107
17     sum_page_save_settings    0.001933
18             sum_status_404    0.001882
19             sum_page_about    0.001821
20  sum_page_submit_downgrade    0.001691
21                   gender_m    0.000770