# Sparkify Churn Prediction

This notebook walks through a practical machine learning implementation for predicting customer churn.

The dataset is from a fictional digital music streaming service called Sparkify. It contains several potentially interesting fields derived from website interaction logs.

We use Spark because the full dataset is 12 GB, so we rely on distributed computing to do the heavy lifting.

In our case, churn is defined as an event where page == "Cancellation Confirmation".

In [1]:
sc.list_packages()

Starting Spark application

SparkSession available as 'spark'.

Package                    Version  
-------------------------- ---------
beautifulsoup4             4.9.1    
boto                       2.49.0   
click                      7.1.2    
jmespath                   0.10.0   
joblib                     0.16.0   
lxml                       4.5.2    
mysqlclient                1.4.2    
nltk                       3.5      
nose                       1.3.4    
numpy                      1.16.5   
pip                        9.0.1    
py-dateutil                2.2      
python37-sagemaker-pyspark 1.4.0    
pytz                       2020.1   
PyYAML                     5.3.1    
regex                      2020.7.14
setuptools                 28.8.0   
six                        1.13.0   
soupsieve                  1.9.5    
tqdm                       4.48.2   
wheel                      0.29.0   
windmill                   1.6

In [2]:
sc.install_pypi_package("pandas")


Collecting pandas
  Using cached https://files.pythonhosted.org/packages/bf/4c/cb7da76f3a5e077e545f9cf8575b8f488a4e8ad60490838f89c5cdd5bb57/pandas-1.1.4-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.7.3 (from pandas)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.1.4 python-dateutil-2.8.1

In [3]:
# import pyspark libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col, date_trunc, desc, asc, to_date
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import Window

from pyspark.ml.feature import OneHotEncoder, IndexToString, StringIndexer, Normalizer, StandardScaler, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# import python libraries
import datetime as dt
import numpy as np
import pandas as pd


In [4]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()


# Load and Clean Dataset

We load two datasets: the full 12 GB dataset, and a mini dataset for early data exploration.

In [5]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()


Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [None]:
# Read in small sparkify dataset
mini_event_data = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"
df_mini = spark.read.json(mini_event_data)
df_mini.head()


Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

# Exploratory Data Analysis

We perform EDA on the mini dataset.

### Define Churn

Churn is defined by `Cancellation Confirmation` events in the `page` column. We implement this definition later, in the Feature Engineering section.
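The plain-Python function below is a minimal sketch of this definition. It flags a user as churned if any of their events has `page == "Cancellation Confirmation"`. The records are hypothetical dicts that hold only `userId` and `page`. This is an illustration, not the Spark code used later: that code labels churn per user per day (`sum(Churn)`) rather than once per user.

```python
# Hypothetical event records; only the fields used here are included.
CHURN_PAGE = "Cancellation Confirmation"

def churned_users(events):
    """Return the set of userIds with at least one churn event."""
    return {e["userId"] for e in events if e["page"] == CHURN_PAGE}

def label_users(events):
    """Map every userId seen in `events` to 1 (churned) or 0 (retained)."""
    churned = churned_users(events)
    return {e["userId"]: int(e["userId"] in churned) for e in events}

events = [
    {"userId": "18", "page": "NextSong"},
    {"userId": "18", "page": "Cancel"},
    {"userId": "18", "page": "Cancellation Confirmation"},
    {"userId": "30", "page": "NextSong"},
]
print(label_users(events))  # {'18': 1, '30': 0}
```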


### Explore Data
See the exploration below.

In [None]:
# find all the members who have churned
df_mini.filter("page = 'Cancellation Confirmation'").toPandas()


   artist       auth  ...                                          userAgent  userId
0    None  Cancelled  ...  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...      18
1    None  Cancelled  ...  "Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...      32
2    None  Cancelled  ...  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...     125
3    None  Cancelled  ...  Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...     105
4    None  Cancelled  ...  Mozilla/5.0 (Windows NT 6.1; WOW64; rv:24.0) G...      17
5    None  Cancelled  ...  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...     143
6    None  Cancelled  ...  Mozilla/5.0 (Windows NT 6.2; WOW64; rv:31.0) G...     101
7    None  Cancelled  ...  "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...     129
8    None  Cancelled  ...  Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:31....     121
9    None  Cancelled  ...  "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...      51
10   None  Cancelled  ...  "Mozilla/5.0 (Windows NT 6.1; WOW64) A

In [None]:
# look at the user journey of a member who has cancelled
df_mini.select(["userId", "firstname", "page", "level", "song"]).where(df_mini.userId == "18").toPandas()


    userId firstname  ... level                                               song
0       18    Adriel  ...  paid                                               None
1       18    Adriel  ...  paid                        A Beggar On A Beach Of Gold
2       18    Adriel  ...  paid         ...slowdance On The Inside (Album Version)
3       18    Adriel  ...  paid                                      St. Apollonia
4       18    Adriel  ...  paid                                     Drunk Stripper
..     ...       ...  ...   ...                                                ...
508     18    Adriel  ...  paid  Nasty Girl (Featuring Diddy_ Nelly_ Jagged Edg...
509     18    Adriel  ...  paid                                            Someday
510     18    Adriel  ...  paid                                               None
511     18    Adriel  ...  paid                                               None
512     18    Adriel  ...  paid                                               None

[51

In [None]:
# check out full schema 
df_mini.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [None]:
# describe each column
df_mini.describe().toPandas()


  summary  ...              userId
0   count  ...              286500
1    mean  ...   59682.02278593872
2  stddev  ...  109091.94999910519
3     min  ...                    
4     max  ...                  99

[5 rows x 19 columns]

In [None]:
# describe 'ts'
df_mini.describe("ts").show()


+-------+--------------------+
|summary|                  ts|
+-------+--------------------+
|  count|              286500|
|   mean|1.540956889810471...|
| stddev|  1.50754396081869E9|
|    min|       1538352117000|
|    max|       1543799476000|
+-------+--------------------+

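`ts` holds milliseconds since the Unix epoch, as does `registration`. The sketch below converts the min and max `ts` values above to calendar dates. It also computes a day number like the notebook's `dayssinceJan11900` column. The sketch assumes UTC. The notebook's `format_timestamp` calls `datetime.fromtimestamp` without a time zone, which uses the local time zone of the machine running the UDF, so its day boundaries can shift with the cluster's settings.

```python
from datetime import date, datetime, timezone

def ts_to_utc_date(ts_ms):
    """Convert a millisecond Unix timestamp to a calendar date in UTC."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).date()

def days_since_1900(d):
    """Whole days from 1900-01-01 to d, like datediff(date, 1900-01-01) in Spark."""
    return (d - date(1900, 1, 1)).days

# min and max of ts from describe("ts") above
ts_min, ts_max = 1538352117000, 1543799476000
print(ts_to_utc_date(ts_min), ts_to_utc_date(ts_max))  # 2018-10-01 2018-12-03
print(days_since_1900(ts_to_utc_date(ts_min)))         # 43372
```

With these values, the mini dataset covers roughly 2018-10-01 through 2018-12-03 (UTC).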
In [None]:
# count events per page, ordered from most to least frequent
df_mini.select('page').groupBy('page').count().orderBy(desc('count')).toPandas()


                         page   count
0                    NextSong  228108
1                        Home   14457
2                   Thumbs Up   12551
3             Add to Playlist    6526
4                  Add Friend    4277
5                 Roll Advert    3933
6                       Login    3241
7                      Logout    3226
8                 Thumbs Down    2546
9                   Downgrade    2055
10                       Help    1726
11                   Settings    1514
12                      About     924
13                    Upgrade     499
14              Save Settings     310
15                      Error     258
16             Submit Upgrade     159
17           Submit Downgrade      63
18                     Cancel      52
19  Cancellation Confirmation      52
20                   Register      18
21        Submit Registration       5

In [None]:
# what were the top 10 artists played
df_mini.filter(df_mini.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Artistcount') \
    .sort(desc('Artistcount')) \
    .show(10)


+--------------------+-----------+
|              Artist|Artistcount|
+--------------------+-----------+
|       Kings Of Leon|       1841|
|            Coldplay|       1813|
|Florence + The Ma...|       1236|
|       Dwight Yoakam|       1135|
|               Björk|       1133|
|      The Black Keys|       1125|
|                Muse|       1090|
|       Justin Bieber|       1044|
|        Jack Johnson|       1007|
|              Eminem|        953|
+--------------------+-----------+
only showing top 10 rows

# Feature Engineering
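Now that we have familiarized ourselves with the data, let's build the features we find most promising for training our model. `feature_engineering` one-hot encodes the `page` column and sums each page event per user per day. It then adds three kinds of derived features:
- 7-day and 30-day rolling averages of the daily sums
- relative deviations ("variance" metrics) between the daily values and those averages, and between the two averages
- a tenure metric (days since the user's first active day)

`vector_assembling` packs these columns into a single `features` vector and renames the daily `sum(Churn)` column to `label`.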
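By default, `StringIndexer` orders labels by descending frequency. Which page event ends up at which position of the one-hot vector therefore depends on the data. The hard-coded page list in `feature_engineering` follows the mini dataset's frequency order shown earlier.

The plain-Python sketch below imitates the indexing and one-hot step. It also shows how to line vector positions up with a fixed list of column names by label rather than by position. Breaking frequency ties alphabetically is an assumption of this sketch, not a guarantee about every Spark version.

```python
from collections import Counter

def fit_frequency_index(values):
    """Labels ordered by descending frequency; ties broken alphabetically (an assumption)."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))

def one_hot(value, labels):
    """Dense one-hot list with 1.0 at the value's index (all labels kept, like dropLast=False)."""
    vec = [0.0] * len(labels)
    vec[labels.index(value)] = 1.0
    return vec

def named_one_hot(value, labels, column_names):
    """One-hot entries keyed by a fixed list of column names, looked up by label."""
    vec = one_hot(value, labels)
    return {name: vec[labels.index(name)] for name in column_names}

# hypothetical page events for a handful of log rows
pages = ["NextSong", "NextSong", "Home", "Cancel",
         "Cancellation Confirmation", "NextSong", "Home"]
labels = fit_frequency_index(pages)
print(labels)  # ['NextSong', 'Home', 'Cancel', 'Cancellation Confirmation']
print(named_one_hot("Home", labels, ["Home", "NextSong", "Cancel", "Cancellation Confirmation"]))
# {'Home': 1.0, 'NextSong': 0.0, 'Cancel': 0.0, 'Cancellation Confirmation': 0.0}
```

Whatever order the indexer produces on another dataset, `named_one_hot` keeps each value under the same column name. A fixed list of column names relies on exactly this property.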


In [6]:
def feature_engineering(df):
    '''
    Function to go through all the Feature Engineering steps
    
    INPUT: df - as DataFrame
    OUTPUT: df - prepped with all the features
    
    '''
    # Create OneHotEncoder df
    # use StringIndexer to convert the categorical page column to an index;
    # by default the most frequent page gets index 0
    indexer_model = StringIndexer(inputCol="page", outputCol="pageIndex").fit(df)
    indexed = indexer_model.transform(df)

    # use OneHotEncoder to convert the index to a SparseVector
    # (Spark 2.x Transformer API; on Spark 3.x OneHotEncoder is an Estimator,
    # so use encoder.fit(indexed).transform(indexed) instead)
    encoder = OneHotEncoder(dropLast=False,
                            inputCol="pageIndex",
                            outputCol="page_categoryVec")

    encoded = encoder.transform(indexed)
    
    # make a list of all the columns to name the columns of the Matrix DataFrame
    columns_for_vector = pd.Series(['NextSong','Home','Thumbs Up','Add to Playlist',
                          'Add Friend','Roll Advert','Login','Logout','Thumbs Down',
                          'Downgrade','Help','Settings','About','Upgrade','Save Settings',
                          'Error', 'Submit Upgrade', 'Submit Downgrade','Cancel',
                          'Cancellation Confirmation','Register','Submit Registration'])

    # vector positions follow the fitted indexer's frequency order, which can differ
    # between datasets, so look up each named column's position by its label
    vector_positions = [indexer_model.labels.index(name) for name in columns_for_vector]

    existing_columns = pd.Series(encoded.columns)

    # concat full columns list
    columns = pd.concat([existing_columns,columns_for_vector]).tolist()
    
    # create matrix DataFrame from SparseVector, one column per page event
    def extract(row):
        page_flags = row.page_categoryVec.toArray().tolist()
        return (row.artist,row.auth,row.firstName,row.gender,
                row.itemInSession,row.lastName,row.length,row.level,
                row.location,row.method,row.page,row.registration,
                row.sessionId,row.song,row.status,row.ts,row.userAgent,
                row.userId,row.pageIndex,
                row.page_categoryVec,) + tuple(page_flags[i] for i in vector_positions)

    vector_matrix_df = encoded.rdd.map(extract).toDF(columns)
    
    # Date manipulations
    # Create a function that returns string from a timestamp 
    def format_timestamp(ts):
        return dt.datetime.fromtimestamp(ts).strftime('%Y-%m-%d')

    # Create a date formating UDF
    format_timestamp_udf = udf(lambda x: format_timestamp(x/1000.0))

    # new date column outputs
    vector_matrix_df = vector_matrix_df.withColumn("str_date", format_timestamp_udf(vector_matrix_df.ts))
    vector_matrix_df = vector_matrix_df.withColumn("date", to_date(vector_matrix_df.str_date))
    vector_matrix_df=(vector_matrix_df.withColumn('dayssinceJan11900',F.datediff(vector_matrix_df.date,F.lit(dt.datetime(1900, 1, 1)))))

    # keep necessary columns
    dimensions_to_keep = pd.Series(['userId','date','dayssinceJan11900'])
    numeric_columns_to_keep = columns_for_vector

    # create daily_df
    columns_for_daily_df = pd.concat([dimensions_to_keep,numeric_columns_to_keep]).tolist()
    daily_df_staging = vector_matrix_df.select(columns_for_daily_df)
    daily_df = daily_df_staging.groupBy('userId','date','dayssinceJan11900').sum()

    # drop and rename 
    daily_df = daily_df.drop("sum(dayssinceJan11900)") # not needed
    daily_df = daily_df.drop("sum(Cancel)") # drop this, because it's essentially the same as Churn
    daily_df = daily_df.withColumnRenamed("sum(Cancellation Confirmation)","sum(Churn)") # redefine churn for daily_df
    
    # create numeric columns list
    columns_for_counter = daily_df.columns
    del columns_for_counter[0:3]
    del columns_for_counter[columns_for_counter.index('sum(Churn)')]
    
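    # 7-day rolling averages for all numeric columns
    # rangeBetween(-7, 0) on the day number spans the current day and the 7 preceding
    # days; days without activity have no row, so only active days are averaged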
    # initiate Window value
    windowval = Window \
        .partitionBy("userId") \
        .orderBy(asc("dayssinceJan11900")) \
        .rangeBetween(-7, 0)

    # loop through all columns
    for x in columns_for_counter:
        daily_df = daily_df \
            .withColumn("{}_7d_avg".format(x), F.avg(x).over(windowval))
        
    # 30-day rolling averages for all numeric columns
    # rangeBetween(-30, 0) spans the current day and the 30 preceding days
    # initiate Window value
    windowval = Window \
        .partitionBy("userId") \
        .orderBy(asc("dayssinceJan11900")) \
        .rangeBetween(-30, 0)

    # loop through all columns
    for x in columns_for_counter:
        daily_df = daily_df \
            .withColumn("{}_30d_avg".format(x), F.avg(x).over(windowval))
    
    # create numeric columns list subset for daily
    columns_for_daily = daily_df.columns
    start_index = columns_for_daily.index('dayssinceJan11900')
    del columns_for_daily[0:start_index+1]
    end_index = columns_for_daily.index('sum(Submit Registration)')
    del columns_for_daily[end_index+1:]
    del columns_for_daily[columns_for_daily.index('sum(Churn)')]
    
    # create numeric columns list subset for 7d avg
    columns_for_7d_avg = daily_df.columns
    start_index = columns_for_7d_avg.index('sum(Submit Registration)')
    del columns_for_7d_avg[0:start_index+1]
    end_index = columns_for_7d_avg.index('sum(Submit Registration)_7d_avg')
    del columns_for_7d_avg[end_index+1:]
    
    # create numeric columns list subset for 30d avg
    columns_for_30d_avg = daily_df.columns
    start_index = columns_for_30d_avg.index('sum(Submit Registration)_7d_avg')
    del columns_for_30d_avg[0:start_index+1]
    
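    # Loop through column pairs to create 'variance' metrics: the relative deviation
    # (x - avg) / avg of a value from its rolling average. Division by zero yields
    # null in Spark, which is filled with 0 at the end of this function.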
    for i,n in zip(columns_for_daily,columns_for_7d_avg):
        daily_df = daily_df \
            .withColumn('variance_daily_to_7d_avg_{}'.format(i), \
                        (daily_df[i] - daily_df[n]) \
                        / daily_df[n])
        
    # Create variance metric
    for i,n in zip(columns_for_daily,columns_for_30d_avg):
        daily_df = daily_df \
            .withColumn('variance_daily_to_30d_avg_{}'.format(i), \
                        (daily_df[i] - daily_df[n]) \
                        / daily_df[n])
        
    # Create variance metric
    for i,n in zip(columns_for_7d_avg,columns_for_30d_avg):
        daily_df = daily_df \
            .withColumn('variance_7d_avg_to_30d_avg_{}'.format(i), \
                        (daily_df[i] - daily_df[n]) \
                        / daily_df[n])
        
    # Getting days on platform as a tenure metric
    # Adjust window
    windowval = Window \
        .partitionBy("userId") \
        .orderBy(asc("dayssinceJan11900")) \
        .rangeBetween(Window.unboundedPreceding, 0)

    # Create min date
    daily_df = daily_df \
        .withColumn("min_date", F.min("dayssinceJan11900").over(windowval))

    # Create Tenure metric
    daily_df = daily_df \
        .withColumn('tenure', (daily_df['dayssinceJan11900'] - daily_df['min_date']) )
    
    
    # fill NULL values with 0
    daily_df = daily_df.na.fill(0)
    
    return daily_df
    
def vector_assembling(df):    
    '''
    Vector Assembling function
    INPUT: df
    OUTPUT:
        columns_for_vector
        df
    '''

    # create list of column names for features vector
    columns_for_vector = df.columns
    start_index = columns_for_vector.index('dayssinceJan11900')
    del columns_for_vector[0:start_index+1]
    to_make_label_index = columns_for_vector.index('sum(Churn)')
    del columns_for_vector[to_make_label_index]

    # create features vector
    assembler = VectorAssembler(
        inputCols=columns_for_vector,
        outputCol="features")

    # transform daily_df
    df = assembler.transform(df)
    df = df.withColumnRenamed("sum(Churn)","label")
    
    return columns_for_vector, df

    


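The window features in `feature_engineering` use `rangeBetween(-7, 0)` and `rangeBetween(-30, 0)` over the day number. Each "7d avg" therefore covers the current day plus the seven preceding days. Only days on which the user has a row (active days) enter the average.

The "variance" columns are really a relative deviation, (value - avg) / avg.

The plain-Python sketch below reproduces these calculations, including the tenure metric, for one user and one page column. The event records and helper names are hypothetical.

```python
from collections import defaultdict

def daily_counts(events, page):
    """Count one page event per (userId, day), like the daily sum(page) column."""
    totals = defaultdict(int)
    for e in events:
        totals[(e["userId"], e["day"])] += int(e["page"] == page)
    return dict(totals)

def rolling_avg(day_counts, days_back):
    """Mean over a user's rows with day in [d - days_back, d] for each day d,
    mirroring rangeBetween(-days_back, 0); inactive days have no row."""
    result = {}
    for d in day_counts:
        in_window = [c for dd, c in day_counts.items() if d - days_back <= dd <= d]
        result[d] = sum(in_window) / len(in_window)
    return result

def relative_deviation(value, avg):
    """(value - avg) / avg; a zero average gives null in Spark, later filled with 0."""
    return (value - avg) / avg if avg else 0.0

def tenure(day_counts):
    """Days since the user's first active day."""
    first = min(day_counts)
    return {d: d - first for d in day_counts}

# hypothetical events for user "18"; "day" plays the role of dayssinceJan11900
events = (
    [{"userId": "18", "day": 100, "page": "Thumbs Up"}] * 4
    + [{"userId": "18", "day": 103, "page": "Thumbs Up"}] * 2
    + [{"userId": "18", "day": 108, "page": "Thumbs Up"}] * 6
    + [{"userId": "18", "day": 108, "page": "NextSong"}]
)
counts = {day: c for (user, day), c in daily_counts(events, "Thumbs Up").items() if user == "18"}
avg_7d = rolling_avg(counts, 7)
print(counts)                                        # {100: 4, 103: 2, 108: 6}
print(avg_7d)                                        # {100: 4.0, 103: 3.0, 108: 4.0}
print(relative_deviation(counts[108], avg_7d[108]))  # 0.5
print(tenure(counts))                                # {100: 0, 103: 3, 108: 8}
```

Day 100 is eight days before day 108, so it falls outside the 7-day window for day 108. That average is taken over days 103 and 108 only.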
# Modeling
We split the full dataset into train and test sets and try out several machine learning methods, tuning parameters where needed. For each model, we report test accuracy plus recall and precision for the churn class. We then pick the winning model based on these test results.

In [7]:
def select_for_model(df):
    # only take needed columns to model
    ml_df = df.select('label','features')
    
    return ml_df


In [8]:
def oversampling(df):
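    '''
    Balance the classes by duplicating each minority (label == 1) row
    int(#majority / #minority) times and appending the copies to the majority rows
    '''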

    # calculate ratio
    major_df = df.filter(df.label == 0)
    minor_df = df.filter(df.label == 1)
    ratio = int(major_df.count()/minor_df.count())
    print("ratio: {}".format(ratio))
    a = range(ratio)

    # duplicate the minority rows
    oversampled_df = minor_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in a]))).drop('dummy')

    # combine both oversampled minority rows and previous majority rows 
    combined_df = major_df.unionAll(oversampled_df)
    
    return combined_df


In [None]:
# prep mini df

# apply feature engineering
prepped_df_mini = feature_engineering(df_mini)

# apply vector assembling
columns_for_vector, prepped_df_mini = vector_assembling(prepped_df_mini)

# select columns for model
prepped_df_mini = select_for_model(prepped_df_mini)

# apply oversampling
prepped_df_mini = oversampling(prepped_df_mini)

# split dataframe into train and test datasets
train_mini, test_mini = prepped_df_mini.randomSplit([0.7, 0.3], seed=42)


ratio: 60

In [9]:
# prep full df

# apply feature engineering
prepped_df = feature_engineering(df)

# apply vector assembling
columns_for_vector, prepped_df = vector_assembling(prepped_df)

# select columns for model
prepped_df = select_for_model(prepped_df)

# apply oversampling
prepped_df = oversampling(prepped_df)

# split dataframe into train and test datasets
train, test = prepped_df.randomSplit([0.7, 0.3], seed=42)


ratio: 57

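In the two cells above, `oversampling` runs on the whole prepared dataset before `randomSplit`. Copies of the same churn row can therefore land in both `train` and `test`, and test metrics may then partly reward memorized duplicates. A common alternative is to split first and oversample only the training portion.

The plain-Python sketch below shows that ordering on hypothetical rows. Its per-row random assignment only approximates Spark's `randomSplit`. In PySpark, the equivalent would be calling `randomSplit` before `oversampling`.

```python
import random

def oversample(rows):
    """Duplicate minority rows (label 1) int(majority / minority) times,
    mirroring the notebook's `oversampling` function."""
    major = [r for r in rows if r["label"] == 0]
    minor = [r for r in rows if r["label"] == 1]
    if not minor:  # nothing to oversample (the notebook version would divide by zero)
        return list(major)
    ratio = int(len(major) / len(minor))
    return major + minor * ratio

def split_then_oversample(rows, train_frac=0.7, seed=42):
    """Assign each row to train or test first, then oversample only train."""
    rng = random.Random(seed)
    train, test = [], []
    for r in rows:
        (train if rng.random() < train_frac else test).append(r)
    return oversample(train), test

# hypothetical rows: 10 churn rows (label 1) and 90 retained rows (label 0)
rows = [{"id": i, "label": int(i < 10)} for i in range(100)]
train, test = split_then_oversample(rows)
train_ids = {r["id"] for r in train}
test_ids = {r["id"] for r in test}
print(train_ids & test_ids)  # set(): no row appears in both splits
```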
In [None]:
# LogisticRegression

# create transformers
scaler = StandardScaler(inputCol='features',outputCol='scaled_features')
normalizer = Normalizer(inputCol='scaled_features',outputCol='norm_scaled_features')

# set regression model
lr = LogisticRegression(labelCol="label", featuresCol="norm_scaled_features", maxIter=10, regParam=0.0, elasticNetParam=0)

# create pipeline
pipeline = Pipeline(stages=[scaler, normalizer, lr])

# fit the pipeline on train data (no cross-validation here)
lr_pipe = pipeline.fit(train)

# create prediction column on test data
results = lr_pipe.transform(test)


All correct predictions count:  87022
Total count:  172261
Accuracy %:  50.517528633875344
Recall %:  0.0

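The logistic regression's recall of 0.0 means it identified none of the churn rows. In that situation, the number of rows predicted as churn can also be zero. The plain `correct_1_count / total_1_predict` division then raises `ZeroDivisionError`, which may explain why no precision line appears above.

The sketch below computes the same three metrics from (label, prediction) pairs in plain Python. It returns `None` instead of dividing by zero. The pairs are hypothetical.

```python
def binary_metrics(pairs):
    """Accuracy, recall and precision (in %) for label 1 from (label, prediction) pairs.
    Recall or precision is None when its denominator is zero."""
    total = len(pairs)
    correct = sum(1 for y, p in pairs if y == p)
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    actual_pos = sum(1 for y, _ in pairs if y == 1)
    predicted_pos = sum(1 for _, p in pairs if p == 1)
    return {
        "accuracy": 100.0 * correct / total,
        "recall": 100.0 * tp / actual_pos if actual_pos else None,
        "precision": 100.0 * tp / predicted_pos if predicted_pos else None,
    }

# a hypothetical model that never predicts churn
always_zero = [(1, 0), (0, 0), (1, 0), (0, 0)]
print(binary_metrics(always_zero))  # {'accuracy': 50.0, 'recall': 0.0, 'precision': None}
```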
In [None]:
# evaluate results
correct_count = results.filter(results.label == results.prediction).count()
total_count = results.count()

correct_1_count = results.filter((results.label == 1) & (results.prediction == 1)).count()
total_1_test = results.filter((results.label == 1)).count()
total_1_predict = results.filter((results.prediction == 1)).count()

# guard against division by zero, e.g. when a model predicts no churn at all
recall = (correct_1_count / total_1_test) * 100 if total_1_test else float('nan')
precision = (correct_1_count / total_1_predict) * 100 if total_1_predict else float('nan')

print("All correct predictions count: ", correct_count)
print("Total count: ", total_count)
print("Accuracy %: ", (correct_count / total_count) * 100)
print("Recall %: ", recall)
print("Precision %: ", precision)

In [None]:
# RandomForestClassifier

# create transformers
scaler = StandardScaler(inputCol='features',outputCol='scaled_features')
normalizer = Normalizer(inputCol='scaled_features',outputCol='norm_scaled_features')

# set rf model
rf = RandomForestClassifier(labelCol="label", featuresCol="norm_scaled_features")

# instantiate pipeline
pipeline = Pipeline(stages=[scaler, normalizer, rf])

# train model
model_rf = pipeline.fit(train)

# create prediction column on test data
results = model_rf.transform(test)


All correct predictions count:  119908
Total count:  172261
Accuracy %:  69.60832689929816
Recall %:  66.4070015720688

In [None]:
# evaluate results
correct_count = results.filter(results.label == results.prediction).count()
total_count = results.count()

correct_1_count = results.filter((results.label == 1) & (results.prediction == 1)).count()
total_1_test = results.filter((results.label == 1)).count()
total_1_predict = results.filter((results.prediction == 1)).count()

# guard against division by zero, e.g. when a model predicts no churn at all
recall = (correct_1_count / total_1_test) * 100 if total_1_test else float('nan')
precision = (correct_1_count / total_1_predict) * 100 if total_1_predict else float('nan')

print("All correct predictions count: ", correct_count)
print("Total count: ", total_count)
print("Accuracy %: ", (correct_count / total_count) * 100)
print("Recall %: ", recall)
print("Precision %: ", precision)

In [None]:
# extract feature information
tree = model_rf.stages[-1]

t = tree.featureImportances

df_f = pd.DataFrame(t.toArray(),
                   index=columns_for_vector,
                   columns=['importance'])

df_f.sort_values('importance',ascending=False)


                                            importance
sum(Downgrade)                                0.162432
variance_daily_to_7d_avg_sum(Downgrade)       0.117103
variance_daily_to_30d_avg_sum(Downgrade)      0.096459
sum(Add Friend)_30d_avg                       0.082330
sum(Downgrade)_7d_avg                         0.055010
...                                                ...
sum(Error)                                    0.000000
sum(Upgrade)                                  0.000000
variance_daily_to_30d_avg_sum(Roll Advert)    0.000000
variance_daily_to_30d_avg_sum(Login)          0.000000
variance_daily_to_7d_avg_sum(Register)        0.000000

[122 rows x 1 columns]

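Each page event contributes six columns to the feature vector: the daily sum, the 7-day and 30-day averages, and three deviation metrics. Together with `min_date` and `tenure`, this gives the 122 rows above. Spark normalizes tree-ensemble feature importances to sum to 1, so one behaviour's importance is spread over several rows.

The plain-Python sketch below sums importances per underlying page event by parsing the column names. The importance values in the example are hypothetical, not the notebook's results.

```python
import re
from collections import defaultdict

def page_of(feature_name):
    """Return the page event inside 'sum(...)', e.g. 'Downgrade' for
    'variance_daily_to_7d_avg_sum(Downgrade)'; other names are returned unchanged."""
    match = re.search(r"sum\(([^)]*)\)", feature_name)
    return match.group(1) if match else feature_name

def importance_by_page(importances):
    """Sum importances per page event and sort them from largest to smallest."""
    totals = defaultdict(float)
    for name, value in importances.items():
        totals[page_of(name)] += value
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

# hypothetical importances (not the notebook's results)
example = {
    "sum(Downgrade)": 0.25,
    "variance_daily_to_7d_avg_sum(Downgrade)": 0.125,
    "sum(Add Friend)_30d_avg": 0.5,
    "tenure": 0.125,
}
print(importance_by_page(example))
# [('Add Friend', 0.5), ('Downgrade', 0.375), ('tenure', 0.125)]
```

With the trained model, `dict(zip(columns_for_vector, tree.featureImportances.toArray()))` would supply the input dictionary.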
In [None]:
# Use the mini dataset for cross-validation; the full dataset was too large to cross-validate
# Gradient Boosting Tree

# create transformers
scaler = StandardScaler(inputCol='features',outputCol='scaled_features')
normalizer = Normalizer(inputCol='scaled_features',outputCol='norm_scaled_features')

# set a GBT model.
gbt = GBTClassifier(labelCol="label", featuresCol="norm_scaled_features", maxIter=10)

# create ParamGrid
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth,[10,20]) \
    .build()

# set model Evaluator to "f1"
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")

# use CrossValidator to loop through the Paramgrid and use the best model
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# instantiate pipeline
pipelineCV = Pipeline(stages=[scaler, normalizer, crossval])

# run CV on train data
cv_gbt = pipelineCV.fit(train_mini)

# create prediction column on test dataframe
results = cv_gbt.transform(test_mini)






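Cross-validation multiplies training cost. `CrossValidator` fits every parameter combination on each fold, then refits the best combination on the full input. The two-value `maxDepth` grid with `numFolds=3` above therefore trains seven GBT models, each with up to `maxIter=10` trees.

The small helper below does this arithmetic. Its name is hypothetical. The count helps explain why cross-validation was run on the mini dataset.

```python
from functools import reduce
from operator import mul

def crossval_fits(grid_sizes, num_folds):
    """Number of model fits for a CrossValidator: every parameter combination
    on every fold, plus one final refit of the best combination."""
    combinations = reduce(mul, grid_sizes, 1)
    return combinations * num_folds + 1

# grid above: maxDepth in [10, 20] (2 values), numFolds=3
print(crossval_fits([2], 3))  # 7
```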
In [None]:
# evaluate results
correct_count = results.filter(results.label == results.prediction).count()
total_count = results.count()

correct_1_count = results.filter((results.label == 1) & (results.prediction == 1)).count()
total_1_test = results.filter((results.label == 1)).count()
total_1_predict = results.filter((results.prediction == 1)).count()

# guard against division by zero, e.g. when a model predicts no churn at all
recall = (correct_1_count / total_1_test) * 100 if total_1_test else float('nan')
precision = (correct_1_count / total_1_predict) * 100 if total_1_predict else float('nan')

print("All correct predictions count: ", correct_count)
print("Total count: ", total_count)
print("Accuracy %: ", (correct_count / total_count) * 100)
print("Recall %: ", recall)
print("Precision %: ", precision)


All correct predictions count:  1878
Total count:  1891
Accuracy %:  99.31253305129562
Recall %:  100.0
Precision %:  98.6815415821501

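The grid search above is scored with `MulticlassClassificationEvaluator(metricName="f1")`. In Spark, this is a weighted F1: the F1 score of each class, weighted by that class's share of the true labels. That is not the same as the F1 of the churn class alone. On imbalanced data, it can stay high even when churn is never predicted.

The plain-Python sketch below computes both quantities on hypothetical pairs. It follows our understanding of Spark's weighted F-measure.

```python
def f1(tp, fp, fn):
    """F1 of one class, 2*tp / (2*tp + fp + fn); 0.0 when undefined."""
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0

def class_f1(pairs, c):
    """F1 for class c from (label, prediction) pairs."""
    tp = sum(1 for y, p in pairs if y == c and p == c)
    fp = sum(1 for y, p in pairs if y != c and p == c)
    fn = sum(1 for y, p in pairs if y == c and p != c)
    return f1(tp, fp, fn)

def weighted_f1(pairs, classes=(0, 1)):
    """Per-class F1 weighted by each class's share of the true labels."""
    total = len(pairs)
    return sum(
        class_f1(pairs, c) * sum(1 for y, _ in pairs if y == c) / total
        for c in classes
    )

# hypothetical imbalanced test set where churn (label 1) is never predicted
never_churn = [(0, 0)] * 8 + [(1, 0)] * 2
print(round(weighted_f1(never_churn), 3))  # 0.711
print(class_f1(never_churn, 1))            # 0.0
```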
In [None]:
# extract best model and feature information
treeCV = cv_gbt.stages[-1].bestModel
print(treeCV.extractParamMap())

# extract feature information from the best model
t = treeCV.featureImportances

df_cv = pd.DataFrame(t.toArray(),
                   index=columns_for_vector,
                   columns=['importance'])

df_cv.sort_values('importance',ascending=False).head(10)



{Param(parent='GBTClassifier_d2dd7fa4733f', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False, Param(parent='GBTClassifier_d2dd7fa4733f', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10, Param(parent='GBTClassifier_d2dd7fa4733f', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'all', Param(parent='GBTClassifier_d2dd7fa4733f', name='featuresCol', doc='features column name'): 'norm_scaled_features', Param(parent='GBTClassifier_d2dd7fa4733f', name='labelCol', doc='label column nam

### Note: Running the Gradient Boosting Tree on the full dataset with parameters from the CrossValidator

In [10]:
# Gradient Boosting Tree

# create transformers
scaler = StandardScaler(inputCol='features',outputCol='scaled_features')
normalizer = Normalizer(inputCol='scaled_features',outputCol='norm_scaled_features')

# set a GBT model: maxDepth=10 for performance reasons, although the
# cross-validation above favored maxDepth=20
gbt = GBTClassifier(labelCol="label", featuresCol="norm_scaled_features",
                    maxIter=10, maxDepth=10)

# instantiate pipeline
pipeline = Pipeline(stages=[scaler, normalizer, gbt])

# fit the pipeline on train data (no cross-validation here)
pipe_gbt = pipeline.fit(train)

# create prediction column on test dataframe
results = pipe_gbt.transform(test)


In [13]:
# evaluate results
correct_count = results.filter(results.label == results.prediction).count()
total_count = results.count()

correct_1_count = results.filter((results.label == 1) & (results.prediction == 1)).count()
total_1_predict = results.filter((results.prediction == 1)).count()
total_1_test = results.filter((results.label == 1)).count()

# guard against division by zero, e.g. when a model predicts no churn at all
recall = (correct_1_count / total_1_test) * 100 if total_1_test else float('nan')
precision = (correct_1_count / total_1_predict) * 100 if total_1_predict else float('nan')

print("All correct predictions count: ", correct_count)
print("Total count: ", total_count)
print("Accuracy %: ", (correct_count / total_count) * 100)
print("Recall %: ", recall)
print("Precision %: ", precision)


All correct predictions count:  151035
Total count:  172261
Accuracy %:  87.67800024381607
Recall %:  94.64323423825056
Precision %:  82.88332716886532

In [12]:
# extract feature information
tree = pipe_gbt.stages[-1]

t = tree.featureImportances

df_gbt = pd.DataFrame(t.toArray(),
                   index=columns_for_vector,
                   columns=['importance'])

df_gbt.sort_values('importance',ascending=False).head(10)


                          importance
sum(Downgrade)              0.037025
sum(NextSong)               0.035929
sum(Add Friend)             0.034679
sum(Add Friend)_30d_avg     0.032163
sum(Thumbs Up)              0.029579
sum(Settings)               0.026360
sum(Thumbs Down)_30d_avg    0.025391
sum(Logout)                 0.022366
sum(Home)                   0.019005
sum(Thumbs Down)            0.018565

# Conclusion


In this notebook, we walked through some practical tools for predicting churn:
- We created a large set of calculated engagement metrics in the Feature Engineering section
- We handled the imbalanced data with oversampling, duplicating churn rows until the classes were roughly balanced
- The Gradient Boosting Tree classifier performed best of the models trained on the full dataset, reaching about 87.7% accuracy (94.6% recall, 82.9% precision) on the test split

If you are running this business, I would suggest looking at the number of ads served per user (or per session).

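The ads suggestion can be made measurable with a simple per-user metric: the number of `Roll Advert` page events divided by the number of distinct `sessionId` values. The plain-Python sketch below computes it over hypothetical event records. In PySpark, the same idea would map to a `groupBy("userId")` aggregation using `F.countDistinct("sessionId")`.

```python
from collections import defaultdict

def ads_per_session(events):
    """Average number of 'Roll Advert' events per distinct session, per user."""
    ads = defaultdict(int)
    sessions = defaultdict(set)
    for e in events:
        sessions[e["userId"]].add(e["sessionId"])
        ads[e["userId"]] += int(e["page"] == "Roll Advert")
    return {user: ads[user] / len(s) for user, s in sessions.items()}

# hypothetical event records
events = [
    {"userId": "30", "sessionId": 29, "page": "Roll Advert"},
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "30", "sessionId": 31, "page": "Roll Advert"},
    {"userId": "30", "sessionId": 31, "page": "Roll Advert"},
    {"userId": "18", "sessionId": 7, "page": "NextSong"},
]
print(ads_per_session(events))  # {'30': 1.5, '18': 0.0}
```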

## Reflection
What I found particularly interesting was how effectively oversampling worked and how accurate the resulting predictions were.

## Improvement
The way I handle Vectors could probably be more elegant; I struggled to pull information in and out of these structures. Additionally, I could have used PCA to reduce the feature dimensionality, which would likely have sped up model training.
