# SPARKIFY PROJECT - MODELING
**This notebook explores a tiny subset (128MB)** of the full dataset available (12GB).  
Both can be retrieved here:
* 128MB subset: [s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json](s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json)
* full 12GB dataset: [s3n://udacity-dsnd/sparkify/sparkify_event_data.json](s3n://udacity-dsnd/sparkify/sparkify_event_data.json)

After the ***Data Understanding*** phase done in this [notebook](1_Sparkify_Data_Understanding.ipynb) and the ***Data Exploration*** phase done in this [notebook](2_Sparkify_Data_Exploration.ipynb) and the , we are now entering the ***Modeling phase***.

## Import libraries, init Spark and load dataset

In [45]:
import pyspark
from pyspark.sql import SparkSession, Window

from pyspark.sql.functions import udf, desc, isnan, when, count, col, lit
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import mean as Fmean
from pyspark.sql.types import IntegerType, FloatType

import numpy as np
import pandas as pd

from datetime import datetime
from datetime import timedelta
import re

# MODELING imports
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
# It is useful to know the version we are using when reading the pyspark documentations
pyspark.__version__

'2.4.3'

In [3]:
# Create or retrieve a Spark session
spark = SparkSession.builder.appName("dsnd-p7-sparkify").getOrCreate()

In [4]:
df = spark.read.json("mini_sparkify_event_data.json")
df.show(3)

+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+-----------------+------+-------------+--------------------+------+
|          artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|             song|status|           ts|           userAgent|userId|
+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+-----------------+------+-------------+--------------------+------+
|  Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|NextSong|1538173362000|       29|        Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Boston-Cambridge-...|   PUT|NextSong|1538331630000|        8|   

In [5]:
print("Loaded pyspark dataframe has shape ({}, {})".format(df.count(), len(df.columns)))

Loaded pyspark dataframe has shape (286500, 18)


# CLEAN: remove empty users and build the `churn` target feature
We copy the code from the previous notebook:

In [6]:
df_clean_users = df.filter((~(isnan(df['userId']))) & (df['userId'].isNotNull()) & (df['userId'] != ""))
print("Cleaned pyspark dataframe has now shape ({}, {})".format(df_clean_users.count(), len(df_clean_users.columns)))

# Define the UDF (do not forget to precise type of column otherwise String is taken by default)
user_has_churned = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

# Apply this function on a specific column of the whole dataset
# (made with the help of: https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
# and https://docs.databricks.com/spark/latest/spark-sql/udf-python.html)
df_users_with_churn = df_clean_users.withColumn("churn", user_has_churned("page"))

print("Pyspark dataframe has now shape ({}, {})".format(df_users_with_churn.count(), len(df_users_with_churn.columns)))

df_users_with_churn_full = df_users_with_churn.withColumn("churn", Fmax('churn').over(Window.partitionBy("userId")))

# Check that we are still talking about our 52 users
df_users_with_churn_full.filter(df_users_with_churn_full['churn'] == 1).select('userId').dropDuplicates().count()

Cleaned pyspark dataframe has now shape (278154, 18)
Pyspark dataframe has now shape (278154, 19)


52

---
# 1. FEATURE ENGINEERING
In the previous notebook, the conclusion after all observations and plots was to build some new features based on those observations as they could help the model to detect churn. As a reminder it was:
* Transform as binary 0/1 the `level` of subscription (paid or not)
* we can dummy the `gender` (binary 0/1 as well)
* `registration` time for the user
* `engagement` of the user with the number of artists, songs or even the total length of music listened, add to playlist number
* `social interactions` with likes/dislikes, friends, etc
* `upgrade/downgrade` the subscription level
* `user operating system` which could help us to identify users of a version that does not give entire satisfaction
* `errors_encountered` which could help us to identify users who had several issues and then maybe quit

## 1.1. Keep only useful columns

In [7]:
df_filtered = df_users_with_churn_full.select(['artist', 'gender', 'length', 'level', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId', 'churn'])
print("Filtered dataframe has shape ({}, {})".format(df_filtered.count(), len(df_filtered.columns)))

Filtered dataframe has shape (278154, 13)


## 1.2. Transform `level` and `gender` into binary 0/1

In [8]:
# Define the UDF (do not forget to precise type of column otherwise String is taken by default)
to_dummy_level = udf(lambda x: 1 if x == "paid" else 0, IntegerType())
to_dummy_gender = udf(lambda x: 1 if x == "M" else 0, IntegerType())

df_filtered = df_filtered.withColumn("level", to_dummy_level("level"))
df_filtered = df_filtered.withColumn("gender", to_dummy_level("gender"))
df_filtered.show(5)

+--------------------+------+---------+-----+---------+-------------+---------+--------------------+------+-------------+--------------------+------+-----+
|              artist|gender|   length|level|     page| registration|sessionId|                song|status|           ts|           userAgent|userId|churn|
+--------------------+------+---------+-----+---------+-------------+---------+--------------------+------+-------------+--------------------+------+-----+
|Sleeping With Sirens|     0|202.97098|    0| NextSong|1538016340000|       31|Captain Tyin Knot...|   200|1539003534000|"Mozilla/5.0 (iPh...|100010|    0|
|Francesca Battist...|     0|196.54485|    0| NextSong|1538016340000|       31|Beautiful_ Beauti...|   200|1539003736000|"Mozilla/5.0 (iPh...|100010|    0|
|              Brutha|     0|263.13098|    0| NextSong|1538016340000|       31|          She's Gone|   200|1539003932000|"Mozilla/5.0 (iPh...|100010|    0|
|                null|     0|     null|    0|Thumbs Up|153801634

In [9]:
df_lvl = df_filtered.select('userId', 'level').groupby('userId').agg({'level': 'max'}).withColumnRenamed('max(level)', 'level')

## 1.3. Count number of days the service has been used by users

In [10]:
# Define the UDF (do not forget to precise type of column otherwise String is taken by default)
to_delta_in_days = udf(lambda x: timedelta(seconds=x).days, IntegerType())

# Build a time diff column
df_time_delta = df_filtered.select('userId', 'registration', 'ts').withColumn('timedelta', (df_filtered.ts - df_filtered.registration)/1000)
df_time_delta = df_time_delta.withColumn("timedelta", to_delta_in_days("timedelta"))

# Keep only the max per user
df_time_delta = df_time_delta.select('userId', 'timedelta').groupBy('userId').agg({'timedelta': 'max'}).withColumnRenamed('max(timedelta)', 'timedelta')
df_time_delta.show(5)

+------+---------+
|userId|timedelta|
+------+---------+
|100010|       55|
|200002|       70|
|   125|       71|
|    51|       19|
|   124|      131|
+------+---------+
only showing top 5 rows



## 1.4. Measure engagement of the user with the number of artists/songs, total length, etc.

In [11]:
def count_nb_artist_songs(df, col_name, count_unique):
    """
    Count the number of artists or songs listened per user (the count is distinct if count_unique is True)
    """
    innerdf = df.filter(df_filtered[col_name] != 'null').select('userId', col_name)
    if count_unique:
        innerdf = innerdf.dropDuplicates()
    return innerdf.groupBy('userId').count().withColumnRenamed('count', 'nb_{}_{}s'.format('unique' if count_unique else 'total', col_name))

In [12]:
def count_nb_page(df, page_value, total_col_name):
    """
    Build a new dataframe filtered on a certain page value and count the number of times each user has seen this page
    """
    return df.filter(df['page'] == page_value).select(['page', 'userId']).groupBy('userId').agg({'page': 'count'}).withColumnRenamed('count(page)', total_col_name)

In [13]:
df_users_unique_songs = count_nb_artist_songs(df_filtered, 'song', True)
df_users_total_songs = count_nb_artist_songs(df_filtered, 'song', False)
df_users_unique_artists = count_nb_artist_songs(df_filtered, 'artist', True)
df_users_total_artists = count_nb_artist_songs(df_filtered, 'artist', False)

df_total_length = df_filtered.select('userId', 'length').groupBy('userId').agg({'length': 'sum'}).withColumnRenamed('sum(length)', 'total_length')

In [14]:
df_add_playlist = count_nb_page(df_filtered, 'Add to Playlist', 'total_add_playlist')

## 1.5. Measure social interactions with likes/dislikes, add friend, etc.

In [15]:
df_add_friend = count_nb_page(df_filtered, 'Add Friend', 'total_add_friend')
df_thumbs_up = count_nb_page(df_filtered, 'Thumbs Up', 'total_thumbs_up')
df_thumbs_down = count_nb_page(df_filtered, 'Thumbs Down', 'total_thumbs_down')
df_rolling_ads = count_nb_page(df_filtered, 'Roll Advert', 'total_ads')

## 1.6. `Upgrade/Downgrade`

In [16]:
df_view_upgrade = count_nb_page(df_filtered, 'Upgrade', 'think_upgrade')
df_count_upgrade = count_nb_page(df_filtered, 'Submit Upgrade', 'has_upgraded')
df_view_downgrade = count_nb_page(df_filtered, 'Downgrade', 'think_downgrade')
df_count_downgrade = count_nb_page(df_filtered, 'Submit Downgrade', 'has_downgraded')

## 1.7. Extract Operating System information

In [17]:
# Define the regex (regex101 is your friend to validate it works!)
def extract_systeminfo(txt):
    matches = re.match(".*Mozilla/[0-9.]+\s\(([a-zA-Z0-9\s.]+)(;|\))", txt)
    if matches:
        return matches.group(1)
    else:
        return ""

to_os = udf(lambda x: extract_systeminfo(x))

In [18]:
df_os = df_filtered.select('userId', 'userAgent').withColumn("os", to_os("userAgent"))
df_os.show(10)

+------+--------------------+--------------+
|userId|           userAgent|            os|
+------+--------------------+--------------+
|    30|Mozilla/5.0 (Wind...|Windows NT 6.1|
|     9|"Mozilla/5.0 (Win...|Windows NT 6.1|
|    30|Mozilla/5.0 (Wind...|Windows NT 6.1|
|     9|"Mozilla/5.0 (Win...|Windows NT 6.1|
|    30|Mozilla/5.0 (Wind...|Windows NT 6.1|
|     9|"Mozilla/5.0 (Win...|Windows NT 6.1|
|     9|"Mozilla/5.0 (Win...|Windows NT 6.1|
|    30|Mozilla/5.0 (Wind...|Windows NT 6.1|
|    30|Mozilla/5.0 (Wind...|Windows NT 6.1|
|    30|Mozilla/5.0 (Wind...|Windows NT 6.1|
+------+--------------------+--------------+
only showing top 10 rows



In [19]:
def rename_os_col(val_to_replace, replaced_value):
    """
    Rename a value within the 'os' column by another one
    """
    return df_os.withColumn("os", when(df_os.os == val_to_replace, lit(replaced_value)).otherwise(df_os.os))

In [20]:
df_os = rename_os_col('compatible', 'Windows NT 6.1')
df_os = rename_os_col('X11', 'Linux')
df_os = rename_os_col('Windows NT 5.1', 'Windows XP')
df_os = rename_os_col('Windows NT 6.0', 'Windows Vista')
df_os = rename_os_col('Windows NT 6.1', 'Windows Seven')
df_os = rename_os_col('Windows NT 6.2', 'Windows 8')
df_os = rename_os_col('Windows NT 6.3', 'Windows 81')
df_os.show(10)

+------+--------------------+-------------+
|userId|           userAgent|           os|
+------+--------------------+-------------+
|    30|Mozilla/5.0 (Wind...|Windows Seven|
|     9|"Mozilla/5.0 (Win...|Windows Seven|
|    30|Mozilla/5.0 (Wind...|Windows Seven|
|     9|"Mozilla/5.0 (Win...|Windows Seven|
|    30|Mozilla/5.0 (Wind...|Windows Seven|
|     9|"Mozilla/5.0 (Win...|Windows Seven|
|     9|"Mozilla/5.0 (Win...|Windows Seven|
|    30|Mozilla/5.0 (Wind...|Windows Seven|
|    30|Mozilla/5.0 (Wind...|Windows Seven|
|    30|Mozilla/5.0 (Wind...|Windows Seven|
+------+--------------------+-------------+
only showing top 10 rows



In [21]:
# How to build dummies? Found something on SO: https://stackoverflow.com/questions/46528207/dummy-encoding-using-pyspark
df_os_tmp = df_os.groupby('userId').agg({'os': 'max'}).withColumnRenamed('max(os)', 'os')
os_list = df_os_tmp.select('os').distinct().rdd.flatMap(lambda x:x).collect()
exprs = [when(col('os') == os, 1).otherwise(0).alias(str(os)) for os in os_list]
df_os_tmp = df_os_tmp.select(exprs + df_os_tmp.columns)

## 1.8. Count number of errors per user

In [22]:
df_errors = count_nb_page(df_filtered, 'Error', 'nb_404')

## 1.9. Merge everything into a single final dataframe

After a series of different issues I have decided to transform each subset into pandas before doing the join that I hope will be faster.  
The main issue was about some timeouts due to computation time. I have read few things on Stackoverflow and changed some configuration parameters (for example [this](https://stackoverflow.com/questions/43984068/does-spark-sql-autobroadcastjointhreshold-work-for-joins-using-datasets-join-op)) and set:  
```spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)```

But anyway it still did not work, maybe due to the power of the machine it runs on or because I do something wrong as I am new with Spark. Maybe the too many select/filters and columns manipulations ends with something too big to be played on this kind of machine (the explain plan was very impressive so I guess this is the root cause).

Anyway, as we know that we have only few users, it will fit within classic pandas DataFrame. So let's go!

In [23]:
%%time
# Save as pandas for the join later (will be much much mush faster...)
df_pd_users = df_filtered.select('userId', 'churn', 'gender').dropDuplicates().toPandas()
df_pd_lvl = df_lvl.toPandas()
df_pd_time_delta = df_time_delta.toPandas()
df_pd_users_unique_songs = df_users_unique_songs.toPandas()
df_pd_users_total_songs = df_users_total_songs.toPandas()
df_pd_users_unique_artists = df_users_unique_artists.toPandas()
df_pd_total_length = df_total_length.toPandas()
df_pd_add_playlist = df_add_playlist.toPandas()
df_pd_add_friend = df_add_friend.toPandas()
df_pd_thumbs_up = df_thumbs_up.toPandas()
df_pd_thumbs_down = df_thumbs_down.toPandas()
df_pd_rolling_ads = df_rolling_ads.toPandas()
df_pd_view_upgrade = df_view_upgrade.toPandas()
df_pd_count_upgrade = df_count_upgrade.toPandas()
df_pd_view_downgrade = df_view_downgrade.toPandas()
df_pd_count_downgrade = df_count_downgrade.toPandas()
df_pd_errors = df_errors.toPandas()
df_pd_os = df_os_tmp.toPandas()

CPU times: user 286 ms, sys: 61.1 ms, total: 347 ms
Wall time: 3min 45s


In [34]:
df_list = [df_pd_users, df_pd_lvl, df_pd_time_delta, df_pd_users_unique_songs, df_pd_users_total_songs, 
           df_pd_users_unique_artists, df_pd_total_length, df_pd_add_playlist, 
           df_pd_add_friend, df_pd_thumbs_up, df_pd_thumbs_down, df_pd_rolling_ads, df_pd_view_upgrade, 
           df_pd_count_upgrade, df_pd_view_downgrade, df_pd_count_downgrade, df_pd_errors, df_pd_os]
for a_df in df_list:
    print(a_df.shape)

(225, 3)
(225, 2)
(225, 2)
(225, 2)
(225, 2)
(225, 2)
(225, 2)
(215, 2)
(206, 2)
(220, 2)
(203, 2)
(207, 2)
(168, 2)
(131, 2)
(154, 2)
(49, 2)
(117, 2)
(225, 11)


In [41]:
df_pd_final = df_pd_users
for a_df in df_list[1:]:
    df_pd_final = df_pd_final.merge(a_df, on='userId', how='left')
# In the end remove the userId that is now useless and fill potential NaN with 0's
df_pd_final = df_pd_final.drop(['userId', 'os'], axis=1)
df_pd_final = df_pd_final.fillna(0)

In [42]:
df_pd_final.shape

(225, 27)

In [43]:
df_pd_final.head(10)

Unnamed: 0,churn,gender,level,timedelta,nb_unique_songs,nb_total_songs,nb_unique_artists,total_length,total_add_playlist,total_add_friend,...,nb_404,Windows 8,iPad,iPhone,Macintosh,Linux,Windows Vista,Windows 81,Windows XP,Windows Seven
0,0,0,1,69,626,667,545,165790.33923,12.0,14.0,...,0.0,0,0,0,0,0,0,0,0,1
1,0,0,1,100,542,583,461,144836.59439,26.0,11.0,...,1.0,0,0,0,0,0,0,0,0,1
2,0,0,0,100,29,29,29,7079.69297,0.0,7.0,...,0.0,0,0,0,1,0,0,0,0,0
3,0,0,1,95,1539,1728,1163,429447.28834,53.0,28.0,...,2.0,0,0,0,1,0,0,0,0,0
4,1,0,0,48,349,367,317,93505.00914,11.0,15.0,...,0.0,0,0,1,0,0,0,0,0,0
5,0,0,1,138,1012,1098,824,273173.73246,37.0,14.0,...,0.0,0,0,0,1,0,0,0,0,0
6,1,0,1,60,2562,3028,1804,754517.56257,89.0,47.0,...,0.0,0,0,0,1,0,0,0,0,0
7,0,0,1,125,980,1045,806,260526.52036,38.0,24.0,...,0.0,0,0,0,0,0,0,0,0,1
8,1,0,1,75,1076,1169,861,289959.35898,32.0,10.0,...,0.0,0,0,0,1,0,0,0,0,0
9,0,0,1,60,2300,2676,1672,664572.01781,77.0,40.0,...,3.0,0,0,0,0,0,0,0,0,1


In [44]:
# Save locally so that it can be reused later
df_pd_final.to_csv('df_final_subset128.csv', header=1, index=False)

In [46]:
# Sanity check
pd.read_csv('df_final_subset128.csv', header=0).head(10)

Unnamed: 0,churn,gender,level,timedelta,nb_unique_songs,nb_total_songs,nb_unique_artists,total_length,total_add_playlist,total_add_friend,...,nb_404,Windows 8,iPad,iPhone,Macintosh,Linux,Windows Vista,Windows 81,Windows XP,Windows Seven
0,0,0,1,69,626,667,545,165790.33923,12.0,14.0,...,0.0,0,0,0,0,0,0,0,0,1
1,0,0,1,100,542,583,461,144836.59439,26.0,11.0,...,1.0,0,0,0,0,0,0,0,0,1
2,0,0,0,100,29,29,29,7079.69297,0.0,7.0,...,0.0,0,0,0,1,0,0,0,0,0
3,0,0,1,95,1539,1728,1163,429447.28834,53.0,28.0,...,2.0,0,0,0,1,0,0,0,0,0
4,1,0,0,48,349,367,317,93505.00914,11.0,15.0,...,0.0,0,0,1,0,0,0,0,0,0
5,0,0,1,138,1012,1098,824,273173.73246,37.0,14.0,...,0.0,0,0,0,1,0,0,0,0,0
6,1,0,1,60,2562,3028,1804,754517.56257,89.0,47.0,...,0.0,0,0,0,1,0,0,0,0,0
7,0,0,1,125,980,1045,806,260526.52036,38.0,24.0,...,0.0,0,0,0,0,0,0,0,0,1
8,1,0,1,75,1076,1169,861,289959.35898,32.0,10.0,...,0.0,0,0,0,1,0,0,0,0,0
9,0,0,1,60,2300,2676,1672,664572.01781,77.0,40.0,...,3.0,0,0,0,0,0,0,0,0,1


---
# 2. MODELING
To be able to start from here, the result of the `Feature Engineering` phase has been saved into a CSV file. So let's start by loading it:

In [4]:
df = spark.read.csv("df_final_subset128.csv", header=True)
df.show(3)

+-----+------+-----+---------+---------------+--------------+-----------------+------------------+------------------+----------------+---------------+-----------------+---------+-------------+------------+---------------+--------------+------+---------+----+------+---------+-----+-------------+----------+----------+-------------+
|churn|gender|level|timedelta|nb_unique_songs|nb_total_songs|nb_unique_artists|      total_length|total_add_playlist|total_add_friend|total_thumbs_up|total_thumbs_down|total_ads|think_upgrade|has_upgraded|think_downgrade|has_downgraded|nb_404|Windows 8|iPad|iPhone|Macintosh|Linux|Windows Vista|Windows 81|Windows XP|Windows Seven|
+-----+------+-----+---------+---------------+--------------+-----------------+------------------+------------------+----------------+---------------+-----------------+---------+-------------+------------+---------------+--------------+------+---------+----+------+---------+-----+-------------+----------+----------+-------------+
|   

In [16]:
df.printSchema()

root
 |-- churn: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
 |-- timedelta: string (nullable = true)
 |-- nb_unique_songs: string (nullable = true)
 |-- nb_total_songs: string (nullable = true)
 |-- nb_unique_artists: string (nullable = true)
 |-- total_length: string (nullable = true)
 |-- total_add_playlist: string (nullable = true)
 |-- total_add_friend: string (nullable = true)
 |-- total_thumbs_up: string (nullable = true)
 |-- total_thumbs_down: string (nullable = true)
 |-- total_ads: string (nullable = true)
 |-- think_upgrade: string (nullable = true)
 |-- has_upgraded: string (nullable = true)
 |-- think_downgrade: string (nullable = true)
 |-- has_downgraded: string (nullable = true)
 |-- nb_404: string (nullable = true)
 |-- Windows 8: string (nullable = true)
 |-- iPad: string (nullable = true)
 |-- iPhone: string (nullable = true)
 |-- Macintosh: string (nullable = true)
 |-- Linux: string (nullable = true)
 |-- Win

All columns are read as string, this will be an issue for the modeling part. Columns are casted to their appropriate types:

In [19]:
df = df.withColumn("churn", df["churn"].cast("int"))
df = df.withColumn("gender", df["gender"].cast("int"))
df = df.withColumn("level", df["level"].cast("int"))
df = df.withColumn("timedelta", df["timedelta"].cast("int"))
df = df.withColumn("nb_unique_songs", df["nb_unique_songs"].cast("int"))
df = df.withColumn("nb_total_songs", df["nb_total_songs"].cast("int"))
df = df.withColumn("nb_unique_artists", df["nb_unique_artists"].cast("int"))
df = df.withColumn("total_length", df["total_length"].cast("double"))
df = df.withColumn("total_add_playlist", df["total_add_playlist"].cast("int"))
df = df.withColumn("total_add_friend", df["total_add_friend"].cast("int"))
df = df.withColumn("total_thumbs_up", df["total_thumbs_up"].cast("int"))
df = df.withColumn("total_thumbs_down", df["total_thumbs_down"].cast("int"))
df = df.withColumn("total_ads", df["total_ads"].cast("int"))
df = df.withColumn("think_upgrade", df["think_upgrade"].cast("int"))
df = df.withColumn("has_upgraded", df["has_upgraded"].cast("int"))
df = df.withColumn("think_downgrade", df["think_downgrade"].cast("int"))
df = df.withColumn("has_downgraded", df["has_downgraded"].cast("int"))
df = df.withColumn("nb_404", df["nb_404"].cast("int"))
df = df.withColumn("Windows 8", df["Windows 8"].cast("int"))
df = df.withColumn("Windows 81", df["Windows 81"].cast("int"))
df = df.withColumn("iPad", df["iPad"].cast("int"))
df = df.withColumn("iPhone", df["iPhone"].cast("int"))
df = df.withColumn("Macintosh", df["Macintosh"].cast("int"))
df = df.withColumn("Linux", df["Linux"].cast("int"))
df = df.withColumn("Windows Vista", df["Windows Vista"].cast("int"))
df = df.withColumn("Windows XP", df["Windows XP"].cast("int"))
df = df.withColumn("Windows Seven", df["Windows Seven"].cast("int"))

In [20]:
df.printSchema()

root
 |-- churn: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- level: integer (nullable = true)
 |-- timedelta: integer (nullable = true)
 |-- nb_unique_songs: integer (nullable = true)
 |-- nb_total_songs: integer (nullable = true)
 |-- nb_unique_artists: integer (nullable = true)
 |-- total_length: double (nullable = true)
 |-- total_add_playlist: integer (nullable = true)
 |-- total_add_friend: integer (nullable = true)
 |-- total_thumbs_up: integer (nullable = true)
 |-- total_thumbs_down: integer (nullable = true)
 |-- total_ads: integer (nullable = true)
 |-- think_upgrade: integer (nullable = true)
 |-- has_upgraded: integer (nullable = true)
 |-- think_downgrade: integer (nullable = true)
 |-- has_downgraded: integer (nullable = true)
 |-- nb_404: integer (nullable = true)
 |-- Windows 8: integer (nullable = true)
 |-- iPad: integer (nullable = true)
 |-- iPhone: integer (nullable = true)
 |-- Macintosh: integer (nullable = true)
 |-- Linux: integer (nul

And now starts the most exciting part: **modeling**!. In the next section we will:
* split our dataset into train and test subsets
* build different models to compare on a specific performance metric.
* best model will then be chosen for further tuning to see if we can improve this performance metric.  

Before doing that we have two things to do:
* decide on which metric we will measure our performance
* choose the models that will be built

## 2.1 About the performance metric and potential models
Our data is imbalanced: we have more no churn users that churn ones so the `accuracy` cannot be taken as the performance metric. If we do not want to choose between `Precision` or `Recall` because both are kind of equally important we can choose to
use the ***F1-Score*** metric which is the harmonic mean of both:  
> ```F1 score = 2x ((Precision x Recall)/(Precision + Recall))```  

This will be our choice!

About the models, as we are in a supervised learning classification problem:
* I will take the **Logistic Regression** as the baseline model
* Then, more sophisticated algorithms based on trees (**Random Forest** or even **GradientBoostingTree** can be tried). Moreover, with such models we can also plot the **feature importance** so it is easier to understand (and so to explain).
* To show that `accuracy` is not good we can also build 2 dummy classifier: an pessimistic one that will predict each user as a churn one and an optimistic one that will predict that all users will stay (so no churn at all).

Transformations to do:
* **Logistic Regression**: **features have to be scaled** (I will use MinMaxScaler so that it will be between 0 and 1). **No missing value allowed**: imputation as decided in previous notebook.
* For **tree based solutions, there is no need to scale** but no missing value allowed neither.

## 2.2. Train-test split
Made with the help of [Spark official documentation](https://spark.apache.org/docs/latest/ml-tuning.html#train-validation-split).

In [26]:
xy_train, xy_test = df.randomSplit([0.8, 0.2], seed=42)

In [27]:
print("{} rows in training dataset, {} rows in testing dataset".format(xy_train.count(), xy_test.count()))

193 rows in training dataset, 32 rows in testing dataset


How "good" was the split? How many churn users are in both datasets?

In [32]:
xy_train.select(Fmean('churn')).show()

+-------------------+
|         avg(churn)|
+-------------------+
|0.22279792746113988|
+-------------------+



In [33]:
xy_test.select(Fmean('churn')).show()

+----------+
|avg(churn)|
+----------+
|   0.28125|
+----------+



In [22]:
x_cols = df.columns[1:]

In [23]:
def build_pipeline(x_cols, clf, use_scaling=False):
    """
    Build the pipeline that will fit the needs
    """
    assembler = VectorAssembler(inputCols=x_cols, outputCol="x_cols")
    minmax_scaler = MinMaxScaler(inputCol="x_cols", outputCol="scaled_feat")
    stages = [assembler]
    if use_scaling:
        stages.append(minmax_scaler)
    stages.append(clf)
    return Pipeline(stages=stages)

In [71]:
# https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator
evaluator_train = MulticlassClassificationEvaluator(labelCol='churn', metricName='f1')
evaluator_test = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="churn")

## 2.3. Dummy classifiers

In [38]:
xy_test.groupby('churn').agg({'churn': 'count'}).show()

+-----+------------+
|churn|count(churn)|
+-----+------------+
|    1|           9|
|    0|          23|
+-----+------------+



### 2.3.1. Pessimistic dummy classifier (everyone will churn)

In [78]:
df_pessimistic = xy_test.select('churn').withColumn('prediction', lit(1.0))

In [79]:
acc_score = evaluator_test.evaluate(df_pessimistic, {evaluator_test.metricName: "accuracy"})
f1_score = evaluator_test.evaluate(df_pessimistic, {evaluator_test.metricName: "f1"})
print("F1-Score for pessimistic classifier: {:.2f} (accuracy: {:.2f})".format(f1_score, acc_score))

F1-Score for pessimistic classifier: 0.12 (accuracy: 0.28)


### 2.3.2. Optimistic dummy classifier (no one will churn)

In [75]:
df_optimistic = xy_test.select('churn').withColumn('prediction', lit(0.0))

In [80]:
acc_score = evaluator_test.evaluate(df_optimistic, {evaluator_test.metricName: "accuracy"})
f1_score = evaluator_test.evaluate(df_optimistic, {evaluator_test.metricName: "f1"})
print("F1-Score for optimistic classifier: {:.2f} (accuracy: {:.2f})".format(f1_score, acc_score))

F1-Score for optimistic classifier: 0.60 (accuracy: 0.72)


As we can see, it is quite easy to have 72% of accuracy with a dumb classifier.

## 2.4. Logistic Regression

In [59]:
log_reg = LogisticRegression(featuresCol="scaled_feat", labelCol="churn")

# No specific param for the moment, just use the default parameters
param = ParamGridBuilder().build()
pipeline = build_pipeline(x_cols, log_reg, True)

# https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator
log_reg_cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param, evaluator=evaluator_train, numFolds=3)

cv_model = log_reg_cv.fit(xy_train)
pred = cv_model.transform(xy_test)

f1_score = evaluator_test.evaluate(pred, {evaluator_test.metricName: "f1"})
print("F1-Score for Logistic Regression model: {:.2f}".format(f1_score))

F1-Score for Logistic Regression model: 0.78


In [61]:
pred.groupby("churn", "prediction").agg({'prediction': 'count'}).show()

+-----+----------+-----------------+
|churn|prediction|count(prediction)|
+-----+----------+-----------------+
|    1|       0.0|                4|
|    0|       0.0|               20|
|    1|       1.0|                5|
|    0|       1.0|                3|
+-----+----------+-----------------+



## 2.5. Random Forest

In [81]:
rf = RandomForestClassifier(featuresCol="x_cols", labelCol="churn")

# No specific param for the moment, just use the default parameters
param = ParamGridBuilder().build()
pipeline = build_pipeline(x_cols, rf)

# https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator
rf_cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param, evaluator=evaluator_train, numFolds=3)

cv_model = rf_cv.fit(xy_train)
pred = cv_model.transform(xy_test)

f1_score = evaluator_test.evaluate(pred, {evaluator_test.metricName: "f1"})
print("F1-Score for Random Forest model: {:.2f}".format(f1_score))

F1-Score for Random Forest model: 0.66


In [82]:
pred.groupby("churn", "prediction").agg({'prediction': 'count'}).show()

+-----+----------+-----------------+
|churn|prediction|count(prediction)|
+-----+----------+-----------------+
|    1|       0.0|                7|
|    0|       0.0|               20|
|    1|       1.0|                2|
|    0|       1.0|                3|
+-----+----------+-----------------+



## 2.6. GradientBoostingTree

In [85]:
gbt = GBTClassifier(featuresCol="x_cols", labelCol="churn")

# No specific param for the moment, just use the default parameters
param = ParamGridBuilder().build()
pipeline = build_pipeline(x_cols, gbt)

# https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator
gbt_cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param, evaluator=evaluator_train, numFolds=3)

cv_model = gbt_cv.fit(xy_train)
pred = cv_model.transform(xy_test)

f1_score = evaluator_test.evaluate(pred, {evaluator_test.metricName: "f1"})
print("F1-Score for GradientBoostingTree model: {:.2f}".format(f1_score))

F1-Score for GradientBoostingTree model: 0.70


In [84]:
pred.groupby("churn", "prediction").agg({'prediction': 'count'}).show()

+-----+----------+-----------------+
|churn|prediction|count(prediction)|
+-----+----------+-----------------+
|    1|       0.0|                3|
|    0|       0.0|               16|
|    1|       1.0|                6|
|    0|       1.0|                7|
+-----+----------+-----------------+



---
# CONCLUSION
* With **this** dataset (which is small), the **best model so far is the Logistic Regression with a F1-Score of 0.78**, followed by GBT with 0.70 and then RandomForest with 0.66.  
* It is good to note that **none of RandomForest and GBT have been tuned** (they offer more parameters to tune than Logistic Regression).
* It is also good to note that all **those 3 models are better than our dummy classifier**. It means that in any case, no matter of the choice, any model is a better option than choosing arbitrarily.

***That would be too fast and simple to make any further conclusions***. To choose a model and tune it we need more data (remember that here we have only 32 users to classify and the model has been trained with less than 200 users...we cannot really say that it is "big data" ;-)).  
In this project we have the option to setup a cluster on the cloud in order to use Spark on a bigger dataset. This is what I did (follow this [notebook](APPENDIX_Sparkify_EMR.ipynb) for more details). Results have been stored and exported so that they can be loaded from here. This is what will be detailed in the next [notebook](4_Sparkify_Modeling_Full_Dataset.ipynb)!