# Predicting User Churn with Apache Spark and AWS EMR

This notebook contains data cleaning, feature creation and ML optimization for the Sparkify dataset.

## Setup

In [1]:
# imports
# pyspark sql
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.functions import from_unixtime, udf, col, when, isnan, desc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import min, max
from pyspark.sql.types import DoubleType

# pyspark ml
from pyspark.ml.feature import VectorAssembler, Normalizer, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200828100649-0003
KERNEL_ID = 5c26be5a-e536-469c-896d-681990c5b6c2


In [2]:
# python imports
import datetime
from typing import NewType
pysparkdf = NewType('pysparkdf', object)

In [3]:
# ibm config
import ibmos2spark

# config
# @hidden_cell
credentials = {
    'endpoint': 'https://s3.eu-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-147e1161-7da9-41fe-ac00-c144730def00',
    'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
    'api_key': 'kAtvjdC8VIYYUmU3gDaOYIK2fCvP3nkjYYlDiNuu4gw6'
}

configuration_name = 'os_76774389dfa04fb5acbb1640b3e11704_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

In [4]:
# Create Spark session
spark = SparkSession.builder.appName("User Churn").getOrCreate()

# Read in data from IBM Cloud
data_df = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-fnqu5byx41gcai'))

print("Started Spark version {}".format(spark.version))

Started Spark version 2.3.4


### EDA

In [6]:
print("Row 1:")
data_df = df
data_df.head(1)

Row 1:


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Martin Orford,Logged In,Joseph,M,20,Morales,597.55057,free,"Corpus Christi, TX",PUT,NextSong,1532063507000,292,Grand Designs,200,1538352011000,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",293
1,John Brown's Body,Logged In,Sawyer,M,74,Larson,380.21179,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1538069638000,97,Bulls,200,1538352025000,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",98


In [7]:
# count users
print("Number of users:")
data_df.agg(countDistinct("userId")).collect()

Number of users:


[Row(count(DISTINCT userId)=449)]

In [8]:
# count events
print("Number of log events:")
print((data_df.count(), len(data_df.columns)))

Number of log events:
(543705, 18)


In [9]:
print("start and end unix time:")
data_df.select(min("ts"), max("ts")).first()

start and end unix time:


Row(min(ts)=1538352011000, max(ts)=1543622466000)

## Data Wrangling

In [10]:
def clean_df(data_df):
    """Remove non useful columns and drop missing values for user and session.
    """
    # lets remove some of the columns we don't think will be useful from data exploration
    cols_to_drop = ['firstName', 'lastName','artist', 'song', 'method', 'status', 'userAgent']
    user_log_df = data_df.drop(*cols_to_drop)
    
    # drop rows with missing info
    return user_log_df.dropna(how = "any", subset = ["userId", "sessionId"])

def unix_to_datetime(data_df):
    """ onvert unix timestamps to datetime.
    """
    # event unix to datetime
    data_df = data_df.withColumn("timestampDatetime",
                                         from_unixtime(user_log_valid.ts/1000,
                                                       format='yyyy-MM-dd HH:mm:ss'))
    # registration unix to datetime
    data_df = data_df.withColumn("registrationDatetime",
                                         from_unixtime(user_log_valid.registration/1000,
                                                       format='yyyy-MM-dd HH:mm:ss'))
    return data_df

def create_us_states(data_df):
    """Create US states column from location.
    """
    # we don't really want to drop these rows as the col isn't vital 
    # so replace missing values to allow split
    data_df = data_df.fillna({'location':'unknown'})

    # define UDFs
    # create state column
    loc_split = udf(lambda x: x.split(', ')[-1], StringType())

    # Sates seem to be appended, so take latest
    state_split = udf(lambda x: x.split('-')[-1], StringType())

    # apply UDFs
    data_df = data_df.withColumn("usStateAbbr",
                                 when(data_df.location.isNotNull(),
                                      loc_split(data_df.location)).otherwise(''))
    data_df = data_df.withColumn("usStateAbbr",
                                 when(data_df.usStateAbbr.isNotNull(),
                                      state_split(data_df.usStateAbbr)).otherwise(''))
    return data_df

def replace_missing_gender(data_df):
    """Replace missing gender with 'unknown'.
    """
    return data_df.fillna({'gender':'unknown'})

In [11]:
user_log_valid = clean_df(data_df)
user_log_valid = unix_to_datetime(user_log_valid)
user_log_valid = create_us_states(user_log_valid)
user_log_valid = replace_missing_gender(user_log_valid)

In [12]:
wrangled_data = user_log_valid.withColumnRenamed("auth", "authLevel").\
                               withColumnRenamed("length", "sessionLength_s").\
                               withColumnRenamed("level", "subLevel").\
                               withColumnRenamed("ts", "unixEventTS").\
                               withColumnRenamed("registration", "unixRegistrationTS")

## Feature Engineering

In [13]:
def create_phase_feature(data_df: pysparkdf) -> pysparkdf:
    """Use the cancellation to identify churned users.
    """
    # flag any cancellation confirmation events in pages
    flag_cancellation_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
    data_df = data_df.withColumn("churn", flag_cancellation_event("page"))

    # search for flags and fill user rows as churned
    windowval = Window.partitionBy("userId").orderBy(desc("unixEventTS")).rangeBetween(Window.unboundedPreceding, 0)
    return data_df.withColumn("label", Fsum("churn").over(windowval))

def avg_items_in_session(data_df: pysparkdf)-> pysparkdf:
    """Calculate avg items in session for each user.
    """
    # calculate metric and join dfs back together
    return data_df.join(data_df.groupBy('userId').avg('itemInSession'),
                               on='userId')

def avg_user_listening_time(data_df: pysparkdf) -> pysparkdf:
    """Calculate average listening time.
    """
    return data_df.join(data_df.groupBy('userId').avg('sessionLength_s'),
                               on='userId')

def recommendation_performance_good(data_df: pysparkdf) -> pysparkdf:
    """Number of positive recommendation events.
    """
    # flag events
    data_df = data_df.withColumn("recc_performance_good_events",
                                 when((data_df["page"] == 'Add to Playlist') |\
                                      (data_df["page"] == 'Add Friend') |\
                                      (data_df["page"] == 'Thumbs Up'),
                                       1).otherwise(0))
    # calculate number
    return data_df.join(data_df.groupBy('userId').sum('recc_performance_good_events'),
                               on='userId')

def recommendation_performance_bad(data_df: pysparkdf) -> pysparkdf:
    """Number of bad recommendation events.
    """
    # flag events
    data_df = data_df.withColumn("recc_performance_bad_events",
                                 when((data_df["page"] == 'Thumbs Down'),
                                       1).otherwise(0))
    # calculate number
    return data_df.join(data_df.groupBy('userId').sum('recc_performance_bad_events'),
                               on='userId')

def system_performance_bad(data_df: pysparkdf) -> pysparkdf:
    """Number of bad system events.
    """
    # flag events
    data_df = data_df.withColumn("sys_performance_bad",
                                 when((data_df["page"] == 'Help') |\
                                      (data_df["page"] == 'Upgrade') |\
                                      (data_df["page"] == 'Error'),
                                       1).otherwise(0))
    # calculate number                                
    return data_df.join(data_df.groupBy('userId').sum('sys_performance_bad'),
                               on='userId')

In [14]:
# create features
wrangled_data = create_phase_feature(wrangled_data)
wrangled_data = avg_items_in_session(wrangled_data)
wrangled_data = avg_user_listening_time(wrangled_data)
wrangled_data = recommendation_performance_good(wrangled_data)
wrangled_data = recommendation_performance_bad(wrangled_data)
wrangled_data = system_performance_bad(wrangled_data)

In [15]:
# create ml features df
ml_features_data = wrangled_data.select('label',
                                        'userId',
                                        'gender',
                                        'usStateAbbr',
                                        col('avg(itemInSession)').alias("avg_item_in_session"),
                                        col('avg(sessionLength_s)').alias('avg_session_length'),
                                        col('sum(recc_performance_good_events)').alias('num_good_recc'),
                                        col('sum(recc_performance_bad_events)').alias('num_bad_recc'),
                                        col('sum(sys_performance_bad)').alias('num_bad_sys'))

In [16]:
ml_features_data = ml_features_data.drop_duplicates()  # remove duplicates as features will be created on each event row
data_df.unpersist();  # clean up to free up space
wrangled_data.unpersist();  # clean up to free up space

In [17]:
ml_features_data.limit(2).toPandas()

Unnamed: 0,label,userId,gender,usStateAbbr,avg_item_in_session,avg_session_length,num_good_recc,num_bad_recc,num_bad_sys
0,1,100010,F,CT,33.839416,269.48907,8,3,2
1,1,200002,M,WI,54.832911,253.857506,23,5,3


In [20]:
def onehot_encoder(ml_features_data):
    ## https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml

    cols = ['gender', 'usStateAbbr']

    indexers = [
        StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
        for c in cols
    ]

    encoders = [
        OneHotEncoder(
            inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
        for indexer in indexers
    ]

    assembler = VectorAssembler(
        inputCols=[encoder.getOutputCol() for encoder in encoders],
        outputCol="features2"
    )

    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    return pipeline.fit(ml_features_data).transform(ml_features_data)

ml_features_data = onehot_encoder(ml_features_data)

In [21]:
print('Shape of ML table:')
print((ml_features_data.count(), len(ml_features_data.columns)))

Shape of ML table:
(449, 14)


In [22]:
ml_features_data = ml_features_data.na.drop()  # catch any remaining nans

In [23]:
print('Shape of ML table:')
print((ml_features_data.count(), len(ml_features_data.columns)))

Shape of ML table:
(448, 14)


In [24]:
def vector_assembler(ml_features_data):

    # this vector is created in prep for ml
    assembler = VectorAssembler(inputCols=["avg_item_in_session",
                                           "avg_session_length",
                                           "num_good_recc",
                                           "num_bad_recc",
                                           "num_bad_sys",
                                           'gender_indexed',
                                           'usStateAbbr_indexed'],
                                outputCol="raw_features",)
                                #handleInvalid="skip")

    return assembler.transform(ml_features_data)

ml_features_data = vector_assembler(ml_features_data)

In [25]:
def feature_scaler(ml_features_data):
    """apply scaler
    """
    scaler = Normalizer(inputCol="raw_features", outputCol="features")
    return scaler.transform(ml_features_data)

ml_df = feature_scaler(ml_features_data)

## Machine Learning

### Custom Evaluators

In [26]:
def train_set_evaluator(train):
    """Check for data skew
    """
    churned_count = train.filter(train['label']==1).count()
    non_churned_count = train.filter(train['label']==0).count()
    
    print("{} churned users".format(churned_count))
    print("{} non-churned users".format(non_churned_count))
    print("{} ratio of churned/non-churned users".format(churned_count/non_churned_count))

def custom_evaluation(results):
    """Customer function to evaluate result.
    """
    # Generic evaluation
    total_results = results.count()
    correct_pred = results.filter(results.label == results.prediction).count()
    incorrect_pred = results.filter(results.label != results.prediction).count()
    
    print('Total users predicted correctly: {}'.format(correct_pred))
    print('Total users predicted wrongly: {}'.format(incorrect_pred))
    print("Percentage predicted correct (%): {} \n".format((correct_pred/total_results)*100))
    
    # Correct churn predictions
    churn_correct = results.filter((results.label == 1) & (results.prediction == 1)).count()
    actual_churned_users = results.filter(results.label == 1).count()
    print('User churned and predicted to churn: {}'.format(churn_correct))
    print('Total users churned : {}'.format(actual_churned_users))
    print('Percent churned user events predicted correctly(%): {}\n'.format((churn_correct/actual_churned_users)*100))
    
    # Incorrect churn predictions
    churn_incorrect = results.filter((results.label == 0) & (results.prediction == 1)).count()
    print('User did not churn and predicted to: {}'.format(churn_incorrect))
    print('Percent churned users predicted incorrectly(%): {}\n'.format((churn_incorrect/total_results)*100))

def model_metrics(results):
    # ref: https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-apache-spark-2-4-5-and-pyspark-python

    # Create both evaluators
    evaluatorMulti = MulticlassClassificationEvaluator(labelCol="target",
                                                       predictionCol="prediction")
    evaluator = BinaryClassificationEvaluator(labelCol="target",
                                              rawPredictionCol="prediction",
                                              metricName='areaUnderROC')
    
    # Make predicitons
    results_cast = results.withColumn("target", col("label").cast(DoubleType()))
    predictionAndTarget = results_cast.select("target","prediction")

    # Get metrics
    acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
    print("Accuracy: {}".format(acc))
    
    f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
    print("F1Score: {}".format(f1))
     
    weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
    print("WeightedPrecision: {}".format(weightedPrecision))

    weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
    print("WeightedRecall: {}".format(weightedRecall))
    
    auc = evaluator.evaluate(predictionAndTarget)
    print("AUC: {}".format(auc))

### Data Split

In [28]:
ml_df = ml_df.select(['label', 'features'])

In [29]:
ml_df.limit(2).toPandas()

Unnamed: 0,label,features
0,1,"[0.1243353042150227, 0.9901768242441354, 0.029..."
1,1,"[0.2102401746006758, 0.9733396424936257, 0.088..."


In [30]:
# train test split for ML validation
train, test =  ml_df.randomSplit([0.7, 0.3], seed=42)  # more equal fit to combat overfitting

In [31]:
train_mini = train.limit(50)
test_mini = test.limit(50)
train = train_mini
test = test_mini

In [32]:
train_set_evaluator(train)

11 churned users
39 non-churned users
0.28205128205128205 ratio of churned/non-churned users


### Logistic Regression

In [33]:
print('Baseline Logistic Regression Model')
lr_model = LogisticRegression().fit(train)
lr_results = lr_model.transform(test)

Baseline Logistic Regression Model


In [34]:
custom_evaluation(lr_results)

Total users predicted correctly: 35
Total users predicted wrongly: 15
Percentage predicted correct (%): 70.0 

User churned and predicted to churn: 5
Total users churned : 14
Percent churned user events predicted correctly(%): 35.714285714285715

User did not churn and predicted to: 6
Percent churned users predicted incorrectly(%): 12.0



In [35]:
model_metrics(lr_results)

Accuracy: 0.7
F1Score: 0.6880000000000001
WeightedPrecision: 0.6811188811188811
WeightedRecall: 0.7
AUC: 0.5952380952380953


In [36]:
lr_results.unpersist();  # clean up to free up space!

DataFrame[label: bigint, features: vector, rawPrediction: vector, probability: vector, prediction: double]

### RFC

In [37]:
print('Baseline Random Forest Classifier Model')
rfc_model = RandomForestClassifier().fit(train)
rfc_results = rfc_model.transform(test)

Baseline Random Forest Classifier Model


In [38]:
custom_evaluation(rfc_results)

Total users predicted correctly: 33
Total users predicted wrongly: 17
Percentage predicted correct (%): 66.0 

User churned and predicted to churn: 1
Total users churned : 14
Percent churned user events predicted correctly(%): 7.142857142857142

User did not churn and predicted to: 4
Percent churned users predicted incorrectly(%): 8.0



In [39]:
model_metrics(rfc_results)

Accuracy: 0.66
F1Score: 0.5983625730994152
WeightedPrecision: 0.5680000000000001
WeightedRecall: 0.66
AUC: 0.4801587301587301


In [40]:
rfc_results.unpersist()   # clean up to free up space!

DataFrame[label: bigint, features: vector, rawPrediction: vector, probability: vector, prediction: double]

### GBT

In [41]:
print('Baseline GBT Classifier Model')
gbt_model = GBTClassifier().fit(train)
gbt_results = gbt_model.transform(test)

Baseline GBT Classifier Model


In [42]:
gbt_model.featureImportances

SparseVector(7, {0: 0.1441, 1: 0.2835, 2: 0.0555, 3: 0.1037, 4: 0.0528, 5: 0.1618, 6: 0.1986})

In [43]:
custom_evaluation(gbt_results)

Total users predicted correctly: 28
Total users predicted wrongly: 22
Percentage predicted correct (%): 56.00000000000001 

User churned and predicted to churn: 1
Total users churned : 14
Percent churned user events predicted correctly(%): 7.142857142857142

User did not churn and predicted to: 9
Percent churned users predicted incorrectly(%): 18.0



In [44]:
model_metrics(gbt_results)

Accuracy: 0.56
F1Score: 0.5349122807017545
WeightedPrecision: 0.514
WeightedRecall: 0.56
AUC: 0.41071428571428575


In [45]:
gbt_results.unpersist()  # clean up to free up space!

DataFrame[label: bigint, features: vector, rawPrediction: vector, probability: vector, prediction: double]

### Cross Validation

In [49]:
# pipeline, just running it on classifier no transformations
gbt_cv_model = GBTClassifier()

pipeline = Pipeline(stages=[gbt_cv_model])

# set up param grid to iterate over
paramGrid = ParamGridBuilder()\
.addGrid(gbt_model.maxDepth, [4, 6])\
.addGrid(gbt_model.maxBins, [15, 40])\
.addGrid(gbt_model.stepSize, [0.05, 0.2])\
.build()

# default: 5
# default: 32
# default: 0.1

# set up crossvalidator to tune parameters and optimize, returns best model
crossval = CrossValidator(estimator=pipeline,
                         estimatorParamMaps=paramGrid,
                         evaluator=MulticlassClassificationEvaluator(predictionCol='prediction',
                                                                     labelCol='label',
                                                                     metricName='f1'),
                         numFolds=3) #3

#https://towardsdatascience.com/20-popular-machine-learning-metrics-part-1-classification-regression-evaluation-metrics-1ca3e282a2ce
#Recall= True_Positive/ (True_Positive+ False_Negative)

In [None]:
cvModel = crossval.fit(train)  # train model
cv_results = cvModel.transform(test)  # apply model on test data

In [None]:
custom_evaluation(cv_results) 

In [None]:
model_metrics(cv_results)