# Churn prediction for Sparkify Music Service

# <center>3. Features engineering and modeling </center>

## Table of Contents

3. [Feature Engineering](#feature_eng)
4. [Model training and evaluation](#modeling)
5. [Conclusion](#conclusion)


In the data exploration step we extracted potential indicators that can be used to distinguish between churning and engaged customers.

We observed that the number of visits to some pages can indicate whether a customer is likely to churn. For example, engaged users interacted more with the service platform by visiting the `ThumbsUp` or `ThumbsDown` pages more often. We therefore decided to use the following features to capture the page visits that differ between the two types of users:

- Binary feature with value equal to one if the number of `ThumbsUp` page visits is greater than 20
- Number of `ThumbsDown` page visits
- Number of Roll Advert Page visits
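
The three page-visit features above can be sketched in plain Python before moving to Spark. This is only an illustration under stated assumptions: the event log is a hypothetical list of `(userId, page)` pairs, the helper name `page_visit_features` is made up for this example, and the threshold of 20 is the one quoted in the list.

```python
from collections import Counter

# Threshold taken from the feature list above; the event data below is made up.
THUMBS_UP_THRESHOLD = 20

def page_visit_features(events, threshold=THUMBS_UP_THRESHOLD):
    """Build per-user page-visit features from (userId, page) pairs."""
    counts = Counter(events)  # number of occurrences of each (userId, page) pair
    users = {user for user, _ in events}
    return {
        user: {
            # Binary flag: 1 when the user has more than `threshold` Thumbs Up events
            "did_thumbs_up": int(counts[(user, "Thumbs Up")] > threshold),
            "num_thumbs_down": counts[(user, "Thumbs Down")],
            "num_rolladverts": counts[(user, "Roll Advert")],
        }
        for user in users
    }

# Hypothetical log: u1 is an active user, u2 only sees adverts
events = (
    [("u1", "Thumbs Up")] * 25
    + [("u1", "Thumbs Down")] * 2
    + [("u2", "Roll Advert")] * 3
)
features = page_visit_features(events)
print(features["u1"])
print(features["u2"])
```

Users with no event for a page simply get a count of zero, which is the same effect the notebook later obtains by imputing missing values.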

We observed that the `service usage` and the level of engagement of the customer can also be clear indicators, which led us to define the following features:

- Average daily sessions duration
- Average monthly sessions duration
- Average daily Number of songs per session
- Average daily Number of items per session
- Daily number of songs over the last 20 days (vector of 20 values)
- Daily number of sessions over the last 20 days (vector of 20 values)

We decided to keep the usage information for only the last `20 days` after checking what percentage of the dataset users would be retained for a given number of days of Sparkify usage. See the related analysis and plot in the data exploration part [here](#usage_days)

One more feature that could give an idea of customer satisfaction is whether customers can find the artists and songs they want to listen to.

- Number of unique artists the user listened to 

We also decided to have some features to characterize the user subscription:

- Last level of the user (Paid or Free)
- User Account age in days: usage duration since first log event day
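
As a rough illustration of the account-age feature, the sketch below computes the number of days between two log timestamps. It assumes the timestamps are epoch milliseconds in UTC (an assumption, not confirmed by this notebook), and `age_in_days` is a hypothetical helper, not the function in `utils`.

```python
from datetime import datetime, timezone

def age_in_days(first_ts_ms, last_ts_ms):
    """Whole calendar days between two epoch-millisecond timestamps (UTC assumed)."""
    first_day = datetime.fromtimestamp(first_ts_ms / 1000, tz=timezone.utc).date()
    last_day = datetime.fromtimestamp(last_ts_ms / 1000, tz=timezone.utc).date()
    return (last_day - first_day).days

# Hypothetical first and last log events of one user
first_event = datetime(2018, 10, 1, 8, 30, tzinfo=timezone.utc).timestamp() * 1000
last_event = datetime(2018, 11, 25, 21, 0, tzinfo=timezone.utc).timestamp() * 1000
account_age = age_in_days(first_event, last_event)
print(account_age)
```

Comparing calendar dates rather than raw millisecond differences keeps the result independent of the time of day at which the events happened.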


In [1]:
import datetime
from time import time
import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, udf
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, LongType, IntegerType, DateType, TimestampType
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

import utils

spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [2]:
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline
%load_ext autoreload
%autoreload 2
import utils

In [3]:
pp = pprint.PrettyPrinter(indent=4)

In [4]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [5]:
spark

In [6]:
# %store -r events_data_path
events_data_path = "../data/mini_sparkify_event_data.json"
events_df = utils.load_and_clean_data(spark, events_data_path)
events_df = utils.add_churn_column(events_df)

# <center>3. Features engineering <a id='feature_eng'></a></center>

## Service usage over the last 20 days
### Number of songs per day

In [7]:
usage_days_df = utils.get_data_last_ndays(events_df, page_filter="NextSong")
songs_last20days_df = usage_days_df \
    .join(events_df, events_df.userId == usage_days_df.userId) \
    .where(events_df["date"].between(col('20_days_before') + 1, col('last_day'))) \
    .groupby([events_df.userId, events_df.date, col('last_day')]).count() \
    .withColumn("date_index", utils.account_age_in_days(col("last_day"), col("date"))) \
    .withColumn("date_index", col("date_index").cast('int'))

In [8]:
songs_last20days_features = songs_last20days_df.toPandas()\
    .groupby('userId').apply(lambda x: utils.vector_builder(x, ['count', 'date_index'])).reset_index()
songs_last20days_features.rename(
    columns={index: "d_songs_{}".format(index) for index in range(0, 20)},
    inplace=True)
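
The source of `utils.vector_builder` is not shown here. A minimal sketch of what such a helper might do, assuming it places each daily value at its `date_index` and fills the remaining days with zeros, is given below; `build_usage_vector` is a hypothetical name used only for this illustration.

```python
def build_usage_vector(daily_values, n_days=20):
    """Turn (date_index, value) pairs into a fixed-length vector, zero for inactive days."""
    vector = [0.0] * n_days
    for date_index, value in daily_values:
        # Ignore anything outside the observation window
        if 0 <= date_index < n_days:
            vector[date_index] = float(value)
    return vector

# Sparse daily activity for one user: only 4 active days out of 20
daily_values = [(1, 95), (4, 21), (5, 57), (17, 78)]
usage_vector = build_usage_vector(daily_values)
print(usage_vector)
```

A fixed-length vector like this is what allows each of the 20 days to become its own column (`d_songs_0` to `d_songs_19`) in the feature table.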

### Number of sessions

In [9]:
usage_days_df = utils.get_data_last_ndays(events_df)
sessions_last20days_df = usage_days_df \
    .join(events_df, events_df.userId == usage_days_df.userId) \
    .where(events_df["date"].between(col('20_days_before') + 1, col('last_day'))) \
    .groupby([events_df.userId,
              events_df.date, "sessionId", col("last_day")
    ]).count().groupby(events_df.date, "userId", col("last_day"))\
    .mean("count") \
    .withColumnRenamed("avg(count)", "avg_sessions")\
    .withColumn("date_index", utils.account_age_in_days(col("last_day"), col("date")).cast('int')) 

In [10]:
sessions_last20days_features = sessions_last20days_df.toPandas()\
    .groupby('userId').apply(
        lambda x: utils.vector_builder(x, ['avg_sessions', 'date_index']))\
    .reset_index()
sessions_last20days_features.rename(
    columns={index: "d_sessions_{}".format(index) for index in range(0, 20)},
    inplace=True)
sessions_last20days_features.head(3)

Unnamed: 0,userId,d_sessions_0,d_sessions_1,d_sessions_2,d_sessions_3,d_sessions_4,d_sessions_5,d_sessions_6,d_sessions_7,d_sessions_8,...,d_sessions_10,d_sessions_11,d_sessions_12,d_sessions_13,d_sessions_14,d_sessions_15,d_sessions_16,d_sessions_17,d_sessions_18,d_sessions_19
0,10,0.0,95.0,0.0,0.0,21.0,57.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,78.0,0.0,0.0
1,100,0.0,91.0,196.0,0.0,0.0,23.0,0.0,0.0,30.0,...,66.0,169.0,109.0,0.0,0.0,36.0,125.0,86.0,7.0,0.0
2,100002,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,75.0


In [11]:
users_ids = sessions_last20days_df.select("userId").distinct().rdd.map(lambda r: r[0]).collect()
print("Number of unique users", len(sessions_last20days_features.userId.unique()))

Number of unique users 190


### Keep only users with account age older than 20 days

In [12]:
events_df = events_df[events_df.userId.isin(users_ids)]
nb_users = events_df.select('userId').distinct().count()
nb_users

190

### Number of days since user registration: `registration_days`

In [13]:
registration_df = utils.registration_days(events_df)
assert registration_df.select('userId').distinct().count() == nb_users

In [14]:
pd.DataFrame(registration_df.take(5), columns=registration_df.columns)

Unnamed: 0,userId,registration_days
0,100010,55
1,100010,55
2,100010,55
3,100010,55
4,100010,55


### Average daily and monthly session duration per user: `avg_daily_session_duration` and `avg_monthly_session_duration`

In [15]:
sess_duration = utils.session_durations(events_df)

assert sess_duration.count() == nb_users

utils.get_dataframe(sess_duration)

Unnamed: 0,userId,avg_daily_session_duration,avg_monthly_session_duration
0,100010,9269.0,9693.5
1,200002,11924.25,21529.2
2,124,25218.162037,35410.519231
3,7,5258.428571,5003.583333
4,15,24441.5,31866.222222


### Average daily and monthly number of items per session: `avg_daily_items` and `avg_monthly_items`

In [16]:
items_averages_df = utils.items_averages(events_df)
assert items_averages_df.count() == nb_users
utils.get_dataframe(items_averages_df)

Unnamed: 0,userId,avg_daily_items,avg_monthly_items
0,100010,54.428571,85.0
1,200002,82.375,150.0
2,124,196.444444,530.0
3,7,28.428571,60.0
4,15,154.611111,367.0


### Number of errors: `nb_errors`

In [17]:
errors_df = events_df.filter(events_df.page=="Error") \
            .groupBy(["userId"]).count() \
            .withColumnRenamed('count', 'nb_errors')\
            .groupBy(["userId", "nb_errors"]).mean().select(["userId", "avg(nb_errors)"])
print(errors_df.count())
utils.get_dataframe(errors_df)

111


Unnamed: 0,userId,avg(nb_errors)
0,124,6.0
1,7,1.0
2,15,2.0
3,54,1.0
4,155,3.0


### Fill missing values with zeros

In [18]:
from pyspark.sql.window import Window as W
errors_df = utils.impute_missing_values(spark, errors_df, 'nb_errors', events_df)
assert errors_df.count() == nb_users
utils.get_dataframe(errors_df)

Unnamed: 0,userId,avg(nb_errors)
0,124,6.0
1,7,1.0
2,15,2.0
3,54,1.0
4,155,3.0
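
The implementation of `utils.impute_missing_values` is not shown. Conceptually, zero-imputation amounts to a left join from the full user list onto the aggregated counts, with zero as the default. A plain-Python sketch, with a hypothetical helper name and made-up data, could look like this:

```python
def impute_missing_with_zero(feature_by_user, all_user_ids):
    """Return a value for every user, defaulting to 0 when the user has no record."""
    return {user: feature_by_user.get(user, 0) for user in all_user_ids}

# Only users who hit the Error page appear in the aggregated counts
nb_errors = {"124": 6, "7": 1}
all_users = ["124", "7", "100010"]
imputed = impute_missing_with_zero(nb_errors, all_users)
print(imputed)
```

Without this step, an inner join with the other feature tables would silently drop every user who never triggered the event.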


### Gender: `gender`

In [19]:
gender_df = events_df \
    .select("userId", "gender") \
    .dropDuplicates() \
    .replace(['Male', 'Female'], ['0', '1'], 'gender') \
    .select('userId', col('gender').cast('int'))
assert gender_df.count() == nb_users
utils.get_dataframe(gender_df)

Unnamed: 0,userId,gender
0,44,
1,46,
2,41,
3,300023,
4,39,


### Account level: `last_level`

As the user level can change over time, we keep only the most recent level as the feature value.

In [20]:
level_df = events_df.orderBy('ts', ascending=False).groupBy('userId') \
                .agg(F.first('level').alias('last_level')) \
                .replace(['free', 'paid'], ['0', '1'], 'last_level') \
                .select('userId', col('last_level').cast('int'))

assert level_df.count() == nb_users
utils.get_dataframe(level_df)

Unnamed: 0,userId,last_level
0,100010,0
1,200002,1
2,124,1
3,7,0
4,15,1
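
One caveat worth hedging: Spark documents the `first` aggregate as order-dependent, so sorting before `groupBy` may not always guarantee that the latest event is picked after a shuffle. The intended logic, picking the level attached to the largest timestamp per user, can be sketched in plain Python as follows. The helper name and the sample events are hypothetical.

```python
def last_level_per_user(events):
    """events: iterable of (userId, ts, level). Returns {userId: 0 for free, 1 for paid}."""
    latest = {}
    for user, ts, level in events:
        # Keep the event with the largest timestamp for each user
        if user not in latest or ts > latest[user][0]:
            latest[user] = (ts, level)
    return {user: int(level == "paid") for user, (_, level) in latest.items()}

# Events deliberately out of order: u1 upgraded, u2 downgraded
events = [
    ("u1", 300, "paid"),
    ("u1", 100, "free"),
    ("u2", 50, "paid"),
    ("u2", 400, "free"),
]
levels = last_level_per_user(events)
print(levels)
```

In Spark, the same idea is usually expressed with a window function or by aggregating on a `(ts, level)` struct, so the result does not depend on row order.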


### Churn label: `churn`

In [21]:
label_df = events_df \
    .select("userId", "churn") \
    .dropDuplicates() \
    .replace(['Churner', 'Engaged'], ['1', '0'], 'churn') \
    .select('userId', col('churn').cast('int'))

assert label_df.count() == nb_users

utils.get_dataframe(label_df)

Unnamed: 0,userId,churn
0,73,1
1,100004,1
2,36,0
3,94,0
4,114,0


### Average number of songs per session: `avg_songs_played`

In [22]:
avg_songs_df = events_df.where('page == "NextSong"') \
    .groupby(['userId', 'sessionId']) \
    .count() \
    .groupby(['userId']) \
    .avg('count') \
    .withColumnRenamed('avg(count)', 'avg_songs_played')
assert avg_songs_df.count() == nb_users
utils.get_dataframe(avg_songs_df)

Unnamed: 0,userId,avg_songs_played
0,100010,39.285714
1,200002,64.5
2,124,145.678571
3,7,21.428571
4,54,81.171429


### Number of artists each user listens to: `nb_artists`

In [23]:
nb_artists_df = events_df \
    .filter(events_df.page=="NextSong") \
    .select("userId", "artist") \
    .dropDuplicates() \
    .groupby("userId") \
    .count() \
    .withColumnRenamed("count", "nb_artists")
assert nb_artists_df.count() == nb_users
utils.get_dataframe(nb_artists_df)

Unnamed: 0,userId,nb_artists
0,100010,252
1,200002,339
2,124,2232
3,7,142
4,15,1302


### Number of page visit events: `num_thumbs_down` and `num_rolladverts`

In [24]:
thumbs_up_df = events_df.select('userId','page')\
    .filter(events_df.page=="Thumbs Up") \
    .groupBy(["userId"]).count() \
    .withColumn("did_thumbs_up", when(col("count")>200, 1).otherwise(0))\
    .select("userId", "did_thumbs_up")
thumbs_down_df = events_df.filter(events_df.page=="Thumbs Down") \
    .groupBy(["userId"]).count() \
    .withColumnRenamed('count', 'num_thumbs_down')
roll_adverts_df = events_df.filter(events_df.page=="Roll Advert") \
    .groupBy(["userId"]).count() \
    .withColumnRenamed('count', 'num_rolladverts')
print("ThumbsUp", thumbs_up_df.count())
print("ThumbsDown", thumbs_down_df.count())
utils.get_dataframe(thumbs_up_df)

ThumbsUp 189
ThumbsDown 179


Unnamed: 0,userId,did_thumbs_up
0,100010,0
1,200002,0
2,124,0
3,7,0
4,54,0


In [25]:
thumbs_down_df = utils.impute_missing_values(spark, thumbs_down_df, 'num_thumbs_down', events_df)
thumbs_up_df = utils.impute_missing_values(spark, thumbs_up_df, 'num_thumbs_up', events_df)
roll_adverts_df = utils.impute_missing_values(spark, roll_adverts_df, 'num_rolladverts', events_df)

assert thumbs_up_df.count() == nb_users
assert thumbs_down_df.count() == nb_users

## Create feature vectors by joining all extracted features

In [26]:
# Convert the pandas feature frames back to Spark DataFrames,
# reusing the existing SparkSession instead of the deprecated SQLContext
songs_last20days_features = spark.createDataFrame(songs_last20days_features)
sessions_last20days_features = spark.createDataFrame(sessions_last20days_features)

In [27]:
all_aggs_df = thumbs_up_df \
    .join(thumbs_down_df, on='userId')\
    .join(avg_songs_df, on='userId') \
    .join(level_df, on='userId') \
    .join(errors_df, on='userId') \
    .join(sess_duration, on='userId') \
    .join(items_averages_df, on='userId') \
    .join(registration_df, on='userId') \
    .join(nb_artists_df, on='userId') \
    .join(label_df, on='userId') \
    .join(songs_last20days_features, on='userId') \
    .join(roll_adverts_df, on='userId') \
    .join(sessions_last20days_features, on='userId')

features_df = all_aggs_df.drop_duplicates()
pd.DataFrame(features_df.take(5), columns=features_df.columns).head()

Unnamed: 0,userId,did_thumbs_up,num_thumbs_down,avg_songs_played,last_level,avg(nb_errors),avg_daily_session_duration,avg_monthly_session_duration,avg_daily_items,avg_monthly_items,...,d_sessions_10,d_sessions_11,d_sessions_12,d_sessions_13,d_sessions_14,d_sessions_15,d_sessions_16,d_sessions_17,d_sessions_18,d_sessions_19
0,100010,0,5,39.285714,0,0.0,9269.0,9693.5,54.428571,85.0,...,0.0,0.0,0.0,112.0,0.0,0.0,0.0,72.0,0.0,0.0
1,200002,0,6,64.5,1,0.0,11924.25,21529.2,82.375,150.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,124,0,41,145.678571,1,6.0,25218.162037,35410.519231,196.444444,530.0,...,0.0,56.0,152.0,0.0,0.0,13.0,162.0,0.0,58.5,89.0
3,7,0,1,21.428571,0,1.0,5258.428571,5003.583333,28.428571,60.0,...,0.0,0.0,0.0,0.0,0.0,29.0,0.0,0.0,0.0,0.0
4,15,0,14,136.714286,1,2.0,24441.5,31866.222222,154.611111,367.0,...,0.0,85.5,38.0,18.0,0.0,0.0,0.0,0.0,0.0,0.0


In [28]:
cols = ['count', 'date_index', 'last_day', 'date']
# DataFrames are immutable: drop() returns a new DataFrame, so reassign the result
features_df = features_df.drop(*cols)

In [29]:
len(features_df.columns)

54

In [30]:
features_df.count()

189

In [31]:
features_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- did_thumbs_up: long (nullable = true)
 |-- num_thumbs_down: long (nullable = true)
 |-- avg_songs_played: double (nullable = true)
 |-- last_level: integer (nullable = true)
 |-- avg(nb_errors): double (nullable = true)
 |-- avg_daily_session_duration: double (nullable = true)
 |-- avg_monthly_session_duration: double (nullable = true)
 |-- avg_daily_items: double (nullable = true)
 |-- avg_monthly_items: double (nullable = true)
 |-- registration_days: integer (nullable = true)
 |-- nb_artists: long (nullable = false)
 |-- churn: integer (nullable = true)
 |-- d_songs_0: double (nullable = true)
 |-- d_songs_1: double (nullable = true)
 |-- d_songs_2: double (nullable = true)
 |-- d_songs_3: double (nullable = true)
 |-- d_songs_4: double (nullable = true)
 |-- d_songs_5: double (nullable = true)
 |-- d_songs_6: double (nullable = true)
 |-- d_songs_7: double (nullable = true)
 |-- d_songs_8: double (nullable = true)
 |-- d_songs_9: doub

## Vectorizing the features

In [32]:
from pyspark.ml.feature import VectorAssembler

# Use `c` as the loop variable to avoid confusion with pyspark's `col` function
features = [c for c in features_df.columns if c not in ('userId', 'churn')]

# Vectorizing the features
assembler = VectorAssembler(inputCols=features, outputCol='features')
modelvec_df = assembler.transform(features_df)

modelvec_df = modelvec_df.select(col('features'), col('churn').alias('label'))

In [33]:
type(modelvec_df)

pyspark.sql.dataframe.DataFrame

## <center> 4. Model training and evaluation<a id='modeling'></a></center>
![EDA](./images/model_workflow.png)

Tree-based algorithms are not sensitive to the scale of the features, whereas Logistic Regression performs poorly when features differ widely in scale. Random Forest and Gradient-Boosted Trees would not need scaled data, but since we also train a Logistic Regression model we standardize the features for all models; this does not affect the tree-based ones.
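
To make the scaling argument concrete, here is a small standard-library sketch of standardization (zero mean, unit standard deviation) on a few session durations. It uses the sample standard deviation, which is what Spark's `StandardScaler` is documented to use; the helper `standardize` is hypothetical.

```python
from statistics import mean, stdev

def standardize(values):
    """Scale values to zero mean and unit (sample) standard deviation."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (n - 1 in the denominator)
    return [(v - mu) / sigma for v in values]

# A few average daily session durations, in seconds
durations = [9269.0, 11924.25, 25218.162037, 5258.428571, 24441.5]
scaled = standardize(durations)

# The ordering of the users is unchanged by scaling
order_before = sorted(range(len(durations)), key=durations.__getitem__)
order_after = sorted(range(len(scaled)), key=scaled.__getitem__)
print(order_before == order_after)
```

Because standardization preserves the ordering of the values, a tree can place an equivalent split threshold before or after scaling, while the coefficients and regularization of a linear model do depend on the scale.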

In [34]:
from pyspark.ml.feature import StandardScaler

# Scaling to mean 0 and unit std dev
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)
model_scaler = scaler.fit(modelvec_df)

modelvec_scaled = model_scaler.transform(modelvec_df)

model_data = modelvec_scaled.select(col('scaled_features').alias('features'), col('label'))
model_data.count()

189

### Split the dataset into train and test sets

In [35]:
# Usually the data should be split into train, validation and test datasets,
#   but due to the small amount of data here
#   we use only train and validation data:
# 80% train and 20% validation set
train_df, validation_df = model_data.randomSplit([0.8, 0.2], seed=42)
print("Training set size: ", train_df.count())
print("Validation set size: ", validation_df.count())

Training set size:  157
Validation set size:  32


## Model training and evaluation
We try out several models to see how they compare, then select the winning model based on the F1 score.

We also perform hyperparameters tuning using `ParamGridBuilder`
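
To give a feel for the cost of grid search, the sketch below expands a parameter grid into its candidate combinations in plain Python. It only mimics the idea behind `ParamGridBuilder`; the helper `expand_grid` is hypothetical, and the grid values are the ones used for Logistic Regression in this notebook.

```python
from itertools import product

def expand_grid(grid):
    """Expand {param: [values]} into a list of {param: value} candidates."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

lr_grid = {"elasticNetParam": [0.1, 0.5], "maxIter": [20, 70]}
candidates = expand_grid(lr_grid)

num_folds = 3
# Every candidate is trained once per fold during cross validation
total_fits = len(candidates) * num_folds

for candidate in candidates:
    print(candidate)
print("fits during cross validation:", total_fits)
```

With 4 candidates and 3 folds, cross validation trains 12 models, plus a final refit of the best candidate on the full training set. This multiplication is why adding values to the grid quickly increases the training time.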

Given that churned users are a fairly small subset, we decided to use the F1 score and the AUC metric to evaluate model performance.

> `F1 score`: balances the tradeoff between precision and recall.
\begin{equation*}
F_1 = 2 \cdot \frac{\text{precision} \cdot \text{recall}}{\text{precision} + \text{recall}}
\end{equation*}
<br>
`Area under the ROC curve (AUC)`: assesses overall classification performance without placing more emphasis on one class than the other, so it does not reflect the minority class well. It is also scale-invariant: it measures how well predictions are ranked rather than their absolute values.
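
Both metrics can be worked through by hand on a small example. The sketch below is a plain-Python illustration with made-up counts and scores, not the Spark evaluators themselves. Note that `MulticlassClassificationEvaluator` with `metricName="f1"` reports a class-weighted F1, so its values can differ from the single-class formula shown here.

```python
def f1_score(tp, fp, fn):
    """F1 for the positive class from true positives, false positives, false negatives."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

def auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for label, s in zip(labels, scores) if label == 1]
    neg = [s for label, s in zip(labels, scores) if label == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Made-up confusion counts: precision = 6/8, recall = 6/10
f1 = f1_score(tp=6, fp=2, fn=4)

# Made-up predictions: one churner is ranked below one engaged user
labels = [1, 1, 0, 0, 0]
scores = [0.9, 0.4, 0.5, 0.3, 0.1]
auc_value = auc(labels, scores)
# Rescaling the scores leaves the ranking, and therefore the AUC, unchanged
auc_rescaled = auc(labels, [10 * s for s in scores])

print(round(f1, 4), round(auc_value, 4), round(auc_rescaled, 4))
```

The pairwise definition makes the ranking interpretation explicit: 5 of the 6 churner/engaged pairs are ordered correctly, whatever the absolute score values are.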
<br>


# Logistic Regression

In [65]:
%%time
results = {}
lr_model = LogisticRegression(
    featuresCol = "features",
    labelCol = "label")
# Create a parameter grid for tuning the model
paramGrid = ParamGridBuilder() \
            .addGrid(lr_model.elasticNetParam, [0.1, 0.5]) \
            .addGrid(lr_model.maxIter, [20, 70]) \
            .build()
f1_evaluator = MulticlassClassificationEvaluator(metricName="f1")
# 3-fold cross validation
crossval_lr = CrossValidator(estimator=lr_model,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=3)

start = time()
cv_model_lr = crossval_lr.fit(train_df)
end = time()
print("The training process took {} seconds".format(end - start))

The training process took 1572.9458692073822 seconds
CPU times: user 43.9 s, sys: 22.4 s, total: 1min 6s
Wall time: 26min 13s


In [67]:
train_dftModel = cv_model_lr.bestModel
# Save the best model
cv_model_lr.bestModel.save("models/logistic_regression_model")

In [68]:
utils.get_gridsearch_resuts(cv_model_lr)

Unnamed: 0,elasticNetParam,maxIter,AUC score
0,0.1,20,0.612811
1,0.1,70,0.578558
2,0.5,20,0.612811
3,0.5,70,0.578558


In [72]:
%time
best_lr = LogisticRegression(maxIter=20, elasticNetParam=0.1)
# Time the fit itself rather than the estimator construction
start = time()
lr_model = best_lr.fit(train_df)
end = time()

CPU times: user 8 µs, sys: 4 µs, total: 12 µs
Wall time: 22.9 µs


In [83]:
results = {}
results = utils.evaluate_model(lr_model, start, end,
                               validation_df, "Logistic Regression", results)

Logistic Regression Evaluation:
AUC: 0.6073
F1-Score: 0.625


# Random Forest

In [74]:
%%time
rf_model = RandomForestClassifier(
        featuresCol = "features",
        labelCol = "label",
        maxMemoryInMB = 1000,
        seed = 42)
# Create a parameter grid for tuning the model
paramGrid = ParamGridBuilder() \
        .addGrid(rf_model.maxDepth, [4, 5, 7]) \
        .addGrid(rf_model.numTrees, [20, 50]) \
        .build()

f1_evaluator = MulticlassClassificationEvaluator(metricName="f1")
crossval_rf = CrossValidator(estimator=rf_model,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=3)

start = time()
cv_model_rf = crossval_rf.fit(train_df)
end = time()
print("The training process took {} seconds".format(end - start))
# Save the best model
cv_model_rf.bestModel.save("models/random_forest_model");

The training process took 374.3420789241791 seconds
CPU times: user 8.88 s, sys: 4.76 s, total: 13.6 s
Wall time: 6min 22s


In [75]:
results = utils.evaluate_model(cv_model_rf, start, end,
                               validation_df, "Random Forest", results)

Logistic Regression Evaluation:
AUC: 0.587
F1-Score: 0.569


In [76]:
utils.get_gridsearch_resuts(cv_model_rf)

Unnamed: 0,maxDepth,numTrees,AUC score
0,4,20,0.663902
1,4,50,0.649645
2,5,20,0.6633
3,5,50,0.640477
4,7,20,0.659962
5,7,50,0.669953


In [58]:
# Retrain the classifier with the best performing parameters
best_rf =  RandomForestClassifier(maxDepth=7, numTrees=50)
start = time()
model_rf = best_rf.fit(train_df)
end = time()

In [59]:
results = utils.evaluate_model(model_rf, start, end,
                               validation_df, "Random Forest", results)

Random Forest Evaluation:
AUC: 0.6235
F1-Score: 0.5505


# Gradient-Boosted Trees

In [36]:
%%time
gbt = GBTClassifier(seed=42,
                    featuresCol = "features",
                    labelCol = 'label')
paramGrid = ParamGridBuilder() \
        .addGrid(gbt.maxDepth, [3, 5]) \
        .addGrid(gbt.maxIter, [20, 70]) \
        .build()
f1_evaluator = MulticlassClassificationEvaluator(metricName='f1')
crossval_gbt = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=3)

start = time()
cv_model_gbt = crossval_gbt.fit(train_df)
end = time()
print('The training process took {} seconds'.format(end - start))

The training process took 2950.441709280014 seconds
CPU times: user 1min 6s, sys: 39.2 s, total: 1min 45s
Wall time: 49min 10s


In [42]:
utils.get_gridsearch_resuts(cv_model_gbt)

Unnamed: 0,maxDepth,maxIter,AUC score
0,3,20,0.684958
1,3,70,0.706143
2,5,20,0.658231
3,5,70,0.668953


In [45]:
# Retrain the classifier with the best performing parameters
best_gbt =  GBTClassifier(maxIter=70, maxDepth=3)
start = time()
model_gbt = best_gbt.fit(train_df)
end = time()

In [54]:
results = utils.evaluate_model(model_gbt, start, end,
                               validation_df, "Gradient-Boosted Trees", results)

Gradient-Boosted Trees Evaluation:
AUC: 0.6397
F1-Score: 0.5908


## Models evaluation results

In [60]:
results_df = pd.DataFrame(results)
results_df

Unnamed: 0,Random Forest,Logistic Regression,Gradient-Boosted Trees
F1-Score,0.5505,0.569,0.5908
AUC,0.6235,0.587,0.6397
Training Time,61.489262,1573.2,452.706035


# Conclusion

Let’s take a step back and look at the whole journey.

We wanted to predict customer churn for a hypothetical music streaming service, using Apache Spark for every step of the machine learning workflow. This required a binary classifier separating `Churner` from `Engaged` customers.

First, I performed `data cleaning` to remove log events without a user ID and checked for missing values in the dataset. We then did multiple `data explorations` to see how various indicators can help distinguish between `Churner` and `Engaged` customers. Then I defined the customer churn indicator based on whether the user visited either of the pages `Cancellation Confirmation` and `Downgrade Submission`. Next, in the feature engineering step, I extracted categorical and numerical features based on the indicators observed during the data exploration. I also used the last 20 days of service usage to represent the behaviour of the user before the churn event, based on the number of sessions and the number of songs each day.
We split the data into training and validation sets. As a final step I trained several models, ranging from simple to complex: Logistic Regression, Random Forest and Gradient-Boosted Trees. I used cross validation and grid search to tune each model. Their `performance` was compared using the F1 score and the `AUC` metric.

Gradient-Boosted Trees turned out to be the winning model, with about `0.64` AUC and `0.59` F1 score. With the whole dataset, the data exploration observations and the engineered features would likely be more informative and stable, and the model might improve as well.

### Potential Improvements

We could try other model algorithms. Before that, however, we would like to do more substantial data exploration and feature engineering to obtain a model that detects more accurately whether a user is likely to churn. To that end we would:

- Add more temporal features reflecting the service usage over the last N days.
- Optimize the data analysis and feature engineering steps by applying more Spark best practices, making data exploration as well as model training and testing more efficient.
- Perform data exploration on larger subsets of the data before using the full dataset, because of the substantial statistical differences between the small subset and the full dataset.
- With more computational power, perform better hyperparameter tuning for other model algorithms on a Spark cluster.