In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from os.path import abspath
from pyspark import StorageLevel

from pyspark.sql.types import  IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler, Normalizer
from pyspark.ml.feature import StringIndexer, VectorIndexer

from pyspark.sql.functions import isnan, when, count, col, udf, round, first, avg, lit
import pyspark.sql.functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pandas as pd
import time, datetime

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier


# Define the user dataset to load and the evaluation period 
 <a id='home'></a>

In [2]:
log_file_size = 'giga'



evaluation_period_config = {
    "tiny" : { # the last one day of the 3 days logs
        "start_date_str" : "21/12/2017",
        "end_date_str" : "22/12/2018",
   },
    "mini" : { # the last two weeks of the 2 months logs
        "start_date_str" : "20/11/2018",
        "end_date_str" : "04/12/2018",
   },
    "giga" : { # the last two weeks of the 2 months logs
        "start_date_str" : "20/11/2018",
        "end_date_str" : "04/12/2018",
   }
}

evaluation_start_date_str = evaluation_period_config[log_file_size]["start_date_str"]
evaluation_start_dt = datetime.datetime.strptime(evaluation_start_date_str,"%d/%m/%Y") 
evaluation_start_tms = int(datetime.datetime.timestamp(evaluation_start_dt)) * 1000


user_log_table = log_file_size+'_user_logs'
user_data_table = log_file_size+'_user_data_'+evaluation_start_date_str.replace('/','_')


def showNice(sdf, n=5):
    return pd.DataFrame(sdf.take(n), columns=sdf.columns)


In [3]:
# Start or get a Spark session with Hive support

spark = SparkSession \
    .builder \
    .appName("sparkify-features-"+log_file_size) \
    .config("spark.sql.warehouse.dir", abspath('spark-warehouse')) \
    .config("spark.driver.memory", "8g")\
    .enableHiveSupport() \
    .getOrCreate()


# Load user data from Hive 

In [4]:
# Load Users table to DF and define churn label

print(f"Loading user dataFrame from table {user_data_table}")
df_user_data = spark.sql("select * from {}".format(user_data_table))

# Initializing the list of features Of Interest to predict churn of users
colOI = ['avg_nb_logs', 'max_days_before_eval', 'avg_nb_session_per_day', 'avg_length_per_day', 'avg_length_per_session']

# Preview of the user dataset
showNice(df_user_data, 2)

Loading user dataFrame from table giga_user_data_20_11_2018


Unnamed: 0,userId,max_days_before_eval,active_days,avg_nb_session_per_day,avg_length_per_day,avg_length_per_session,sum_nb_NextSong,avg_nb_NextSong,avg_pct_NextSong,sum_nb_ThumbsUp,...,avg_nb_Error,avg_pct_Error,sum_nb_SubmitDowngrade,avg_nb_SubmitDowngrade,avg_pct_SubmitDowngrade,sum_nb_logs,avg_nb_logs,avg_pct_logs,churn,avg_active_days
0,1169050,50,41,1.487805,31062.292683,24612.613821,5139,125.341463,0.821971,274,...,0.121951,0.000509,1,0.02439,0.0004,6147,149.926829,1.0,0,0.82
1,1105878,50,40,1.35,26677.75,22427.4,4279,106.975,0.831642,191,...,0.1,0.00051,1,0.025,0.000115,5051,126.275,1.0,1,0.8


***
# How to Prepare dataframe for ML 

In [5]:
def prepare_dfs_4_model(df, features_list):
    
    cols_to_drop = ("features","ScaledFeatures")
    df = df.drop(* cols_to_drop)

    # Create a vector NumFeatures with numerical features
    featureAssembler = VectorAssembler(inputCols=features_list, outputCol="features")
    df = featureAssembler.transform(df)

    # Scale the features vector 
    featureScaler = StandardScaler(inputCol="features", outputCol="ScaledFeatures", withStd=True)
    featureScalerModel = featureScaler.fit(df)
    df = featureScalerModel.transform(df)

    df = df.select(col("userId"), col("churn").alias("label"), col("features"), col('ScaledFeatures'))

    # Index the label
    cols_to_drop = ("indexedLabel", "indexedFeatures")
    df = df.drop(* cols_to_drop)
    labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)
    df = labelIndexer.transform(df)
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2).fit(df)
    df = featureIndexer.transform(df)

    (trainingData, testData) = df.randomSplit([0.8, 0.2])

    return (trainingData, testData)

#(trainingData, testData) = prepare_dfs_4_model(df_user_data, colOI)

***

# How to evaluate models

In [20]:
def get_model_recall_on_label(df_ml_pred, label):
    evaluator_recall = MulticlassClassificationEvaluator(metricName='recallByLabel', metricLabel=label)
    score = evaluator_recall.evaluate(df_ml_pred)
    return score

def get_model_f1(df_ml_pred):
    evaluator = MulticlassClassificationEvaluator(metricName='f1')
    score = evaluator.evaluate(df_ml_pred)
    return score


***
# How to execute models

In [50]:
def evaluate_model(df, model_name = '', details=False):

        
    score_recall_0 = get_model_recall_on_label(df, 0)
    score_recall_1 = get_model_recall_on_label(df, 1)
    score_f1 = get_model_f1(df)
    ret = {"score_f1":score_f1,"score_recall_0":score_recall_0, "score_recall_1":score_recall_1} 

    if(details):
        print("-"*50, "\n", model_name)
        print(f"Recall ->  on Renovations = {score_recall_0:.2f}, on Cancelations = {score_recall_1:.2f}")
        model_evaluation_df = df.groupBy().agg(
            count(col('userId')).alias('nb_users'),
            count_if(col('label') == 1).alias('nb_cancellations'),
            count_if((col('label') == 1) & (col('prediction') == col('label'))).alias('nb_cancellations_predicted'),
            count_if(col('label') == 0).alias('nb_renovations'),
            count_if((col('label') == 0) & (col('prediction') == col('label'))).alias('nb_renovations_predicted')
        ).withColumn(
            'score_f1',lit(score_f1)
        ).withColumn(
            'score_on_renovations_predicted', 
            F.round(col('nb_renovations_predicted')/col('nb_renovations'), 2) 
        ).withColumn(
            'score_on_cancellations_predicted', 
            F.round(col('nb_cancellations_predicted')/col('nb_cancellations'), 2) 
        )
        model_evaluation_df.show()
        
    return ret

## Logistic Regression

In [60]:
def execute_lr(trainingData, testData, details=False):
    lr = LogisticRegression(featuresCol="ScaledFeatures", standardization=False, labelCol="label", \
                            maxIter=100, regParam=0.3, elasticNetParam=0.2, fitIntercept=False, \
                            threshold=0.12)
    lr_model = lr.fit(trainingData)
    lr_pred = lr_model.transform(testData)

    f1_score = get_model_f1(lr_pred)
    print('Logistic Regression - f1 score = {:0.5f}'.format(f1_score))
    return f1_score
    
#lr_scores = execute_lr(trainingData, testData, True)

Logistic Regression - f1 score = 0.86188


## Random forest

In [61]:
def execute_rf(trainingData, testData, details=False):

    randomForestClassifier = RandomForestClassifier(numTrees=10, \
                                                  featuresCol='ScaledFeatures' )
    rfc_model = randomForestClassifier.fit(trainingData)
    rfc_pred = rfc_model.transform(testData)

    f1_score = get_model_f1(rfc_pred)
    print('Random Forest Classifier - f1 score = {:0.5f}'.format(f1_score))
    return f1_score

#rf_scores = execute_rf(trainingData, testData, True)

Random Forest Classifier - f1 score = 0.89802


## Gradient-boosted tree

In [63]:
def execute_gbt(trainingData, testData, details=False):

    gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="ScaledFeatures")
    gbt_model = gbt.fit(trainingData)
    gbt_pred = gbt_model.transform(testData)

    f1_score = get_model_f1(gbt_pred)
    print('Gradient-boosted tree - f1 score = {:0.5f}'.format(f1_score))
    return f1_score
    
gbt_scores = execute_gbt(trainingData, trainingData, True)

Gradient-boosted tree - f1 score = 0.89939


***

# List of features from the step 1 - preprocessing

In [64]:
colOI = ['active_days', 'avg_nb_session_per_day', 'avg_length_per_day', 'avg_length_per_session', 'max_days_before_eval', 'sum_nb_NextSong', 'avg_nb_NextSong',  'sum_nb_ThumbsUp', 'avg_nb_ThumbsUp', 'sum_nb_AddtoPlaylist', 
'avg_nb_AddtoPlaylist', 'sum_nb_AddFriend', 'avg_nb_AddFriend', 'sum_nb_Downgrade', 'avg_nb_Downgrade', 'sum_nb_ThumbsDown', 
'avg_nb_ThumbsDown', 'sum_nb_Help', 'avg_nb_Help', 'sum_nb_SaveSettings', 'avg_nb_SaveSettings', 'sum_nb_Error', 'avg_nb_Error',
'sum_nb_SubmitDowngrade', 'avg_nb_SubmitDowngrade', 'avg_nb_logs', 'sum_nb_logs', 
'avg_pct_NextSong', 'avg_pct_ThumbsUp', 'avg_pct_AddtoPlaylist', 'avg_pct_AddFriend', 'avg_pct_ThumbsDown', 'avg_pct_Help', 'avg_pct_SaveSettings', 'avg_pct_Error'  ]


***
# Evaluate features by correlation score 
We will calculate the correlation score between the features and the churn label.  
We will mark as relevant all the features having a correlation with churn superior to 0.05 or inferior to 0.05 

In [11]:
def get_correlation_with_churn(df):
    corr_features = {}
    relevant_features = [] 

    df_users_sample_ids = ['userId', 'churn']

    list_features = [f for f in df.columns if f not in df_users_sample_ids]
    for f in list_features:
        corr_value = df.stat.corr("churn", f)
        corr_features[f] = corr_value 
        if(abs(corr_value) > 0.05):
            relevant_features.append(f)
    
    return corr_features, relevant_features

corr_feature_exp0, relevant_features = get_correlation_with_churn(df_user_data)

In [12]:
print("Correlation score for each features")
df_features_corr = pd.DataFrame.from_dict(corr_feature_exp0, orient='index', columns=['corr']).sort_values(by='corr', ascending=False)
df_features_corr.head(50)

Correlation score for each features


Unnamed: 0,corr
active_days,0.257317
avg_active_days,0.251368
sum_nb_NextSong,0.245814
sum_nb_Downgrade,0.239975
sum_nb_ThumbsDown,0.239476
sum_nb_AddtoPlaylist,0.237141
sum_nb_AddFriend,0.23634
sum_nb_ThumbsUp,0.208192
sum_nb_SaveSettings,0.188833
avg_nb_NextSong,0.12553


In [13]:
print("\n List of features with more than 0.05 or less than -0.05 ")
print("We defined {} relevant features for churn prediction".format(len(relevant_features)))
relevant_features


 List of features with more than 0.05 or less than -0.05 
We defined 22 relevant features for churn prediction


['max_days_before_eval',
 'active_days',
 'avg_length_per_day',
 'avg_length_per_session',
 'sum_nb_NextSong',
 'avg_nb_NextSong',
 'avg_pct_NextSong',
 'sum_nb_ThumbsUp',
 'avg_nb_ThumbsUp',
 'sum_nb_AddtoPlaylist',
 'avg_nb_AddtoPlaylist',
 'sum_nb_AddFriend',
 'avg_nb_AddFriend',
 'sum_nb_Downgrade',
 'avg_nb_Downgrade',
 'avg_pct_Downgrade',
 'sum_nb_ThumbsDown',
 'avg_nb_ThumbsDown',
 'sum_nb_SaveSettings',
 'sum_nb_SubmitDowngrade',
 'sum_nb_logs',
 'avg_active_days']

***
# Split the dataset and fit some ML models

In [65]:
%%time
#pack the data and train ML

# Features list is colOI or relevant_features

(trainingData, testData) = prepare_dfs_4_model(df_user_data, colOI)
trainingData.persist(StorageLevel.MEMORY_ONLY)
testData.persist(StorageLevel.MEMORY_ONLY)

lr_scores = execute_lr(trainingData, testData, True)
rf_scores = execute_rf(trainingData, testData, True)
gbt_scores = execute_gbt(trainingData, testData, True)


Logistic Regression - f1 score = 0.86271
Random Forest Classifier - f1 score = 0.89360
Gradient-boosted tree - f1 score = 0.89520
CPU times: user 91.7 ms, sys: 25.7 ms, total: 117 ms
Wall time: 1min 8s


In [15]:
def execute_gbt(trainingData, testData, details=False):

    gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
    gbt_model = gbt.fit(trainingData)
    gbt_pred = gbt_model.transform(testData)

    gbt_score = evaluate_model(gbt_pred, 'Gradient-boosted tree', details)

# Features list is relevant_features | colOI
# The gbt model give better results with the full list of available features 
(trainingData, testData) = prepare_dfs_4_model(df_user_data, colOI)
trainingData.persist(StorageLevel.MEMORY_ONLY)
testData.persist(StorageLevel.MEMORY_ONLY)

gbt_scores = execute_gbt(trainingData, testData, True)
gbt_scores    

-------------------------------------------------- 
 Gradient-boosted tree
Recall ->  on Renovations = 1.00, on Cancelations = 0.02
+--------+----------------+--------------------------+--------------+------------------------+-----------------+------------------------------+--------------------------------+
|nb_users|nb_cancellations|nb_cancellations_predicted|nb_renovations|nb_renovations_predicted|         score_f1|score_on_renovations_predicted|score_on_cancellations_predicted|
+--------+----------------+--------------------------+--------------+------------------------+-----------------+------------------------------+--------------------------------+
|    4482|             307|                         5|          4175|                    4167|0.900237754323376|                           1.0|                            0.02|
+--------+----------------+--------------------------+--------------+------------------------+-----------------+------------------------------+-----------------

In [16]:
%%time
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="ScaledFeatures", maxIter=10, maxDepth=5)

#gbt_model = gbt.fit(trainingData)
#gbt_pred = gbt_model.transform(testData)
#gbt_score = evaluate_model(gbt_pred, 'Gradient-boosted tree')

pipeline = Pipeline(stages=[gbt])

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth,[3, 5, 10]) \
    .addGrid(gbt.maxIter,[10, 20, 50]) \
    .addGrid(gbt.maxBins,[10, 30]) \
    .addGrid(gbt.minInstancesPerNode,[1, 2]) \
    .build()


# Default model hyper params
"""
cacheNodeIds = False
checkpointInterval = 10
featureSubsetStrategy = all
featuresCol = ScaledFeatures
impurity = variance
labelCol = indexedLabel
leafCol = 
lossType = logistic
maxBins = 32
maxDepth = 5 -
maxIter = 10 -
maxMemoryInMB = 256
minInfoGain = 0.3 -
minInstancesPerNode = 1 -
minWeightFractionPerNode = 0.0
predictionCol = prediction
probabilityCol = probability
rawPredictionCol = rawPrediction
seed = 6596411224379334233
stepSize = 0.1
subsamplingRate = 1.0
validationTol = 0.01
"""

# Hyper params found on CV Best model 
""" 
cacheNodeIds = False
checkpointInterval = 10
featureSubsetStrategy = all
featuresCol = ScaledFeatures
impurity = variance
labelCol = indexedLabel
leafCol = 
lossType = logistic
maxBins = 30
maxDepth = 5 -
maxIter = 10 -
maxMemoryInMB = 256
minInfoGain = 0.0 -
minInstancesPerNode = 1 -
minWeightFractionPerNode = 0.0
predictionCol = prediction
probabilityCol = probability
rawPredictionCol = rawPrediction
seed = -46955970000844093
stepSize = 0.1
subsamplingRate = 1.0
validationTol = 0.01"""

mcEvaluator = MulticlassClassificationEvaluator(metricName='f1')
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=mcEvaluator,
                          numFolds=5)

gbt_CV = crossval.fit(trainingData)
gbt_CV.avgMetrics

CPU times: user 6.78 s, sys: 1.88 s, total: 8.65 s
Wall time: 1h 45min 28s


[0.8918544273595881,
 0.8918544273595881,
 0.8914420162446484,
 0.8914706288259926,
 0.8922723774738587,
 0.8922723774738587,
 0.8918029239834274,
 0.8918032272560891,
 0.8921767039385688,
 0.8924505316123161,
 0.8923447415399302,
 0.8928969002330915,
 0.8930532838161447,
 0.893056637196646,
 0.893946677696132,
 0.8936308315636147,
 0.8923964656198871,
 0.8926428925822978,
 0.8936084995864642,
 0.8936697001937832,
 0.8931527306956744,
 0.8928365304563272,
 0.8933381394731447,
 0.8938742379298663,
 0.8881898635747638,
 0.8898917212110552,
 0.8893753976510401,
 0.8913121901285856,
 0.8888297880962944,
 0.8905626443726189,
 0.8895220369517398,
 0.8923345886940921,
 0.889053266251377,
 0.8911584745833113,
 0.8912674551853655,
 0.8924206703977313]

***
# Trying the best model of gbt

The model tunning did not give a better result. 
# default params -> f1 score = 0.89939
# tunned params  -> f1 score = 0.89639


In [67]:
gbt_pred = gbt_CV.transform(testData)

gbt_best_f1_score = get_model_f1(gbt_pred)
print('Gradient-boosted tree - f1 score = {:0.5f}'.format(gbt_best_f1_score))


Gradient-boosted tree - f1 score = 0.89639


In [69]:
# Looking at the optimal hyper parameters found
bestModel_CV = gbt_CV.bestModel
evaluator_CV = gbt_CV.getEvaluator() 

a_GBTClassificationModel = bestModel_CV.stages[0]
featureImportances = a_GBTClassificationModel.featureImportances
print(f"Feature Importances in the best model = {featureImportances}")

paramMap = a_GBTClassificationModel.extractParamMap() 

print("Hyper parameters of the best model", "\n", "*"*50)
for k, v in paramMap.items():
    print(f"{k.name} = {v}")
    

Feature Importances in the best model = (35,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,27,28,29,30,31,32,33,34],[0.21558188665878564,0.020464568465465655,0.025469584623258514,0.019639445945552798,0.017229910980739272,0.020282067494215766,0.01935076023843168,0.0059147919408299625,0.057962037335869486,0.005199492282964488,0.014565248703292727,0.12355143550902146,0.026570095240251636,0.051389356324734334,0.024497676508029285,0.04204847876499084,0.0126373514202659,0.006552639714615697,0.005523651017292248,0.012240217761812212,0.006609498277893386,0.007880995971337726,0.004580372909836257,0.0223304338502896,0.016358348921942226,0.004732483539005413,0.03315981479554837,0.04186724330559259,0.016331308682029172,0.02351920856717982,0.05973962956477063,0.011169024644134952,0.010228233530330907,0.014822706509689329])
Hyper parameters of the best model 
 **************************************************
cacheNodeIds = False
checkpointInterval = 10
featureSubsetStrategy =