# Sparkify Project Workspace


### Contents
* [Loading data, preliminary study of content](#load)
* [Summary of Data Content](#summary1)
* [Cleaning Data](#clean)
* [Exploratory Data Analysis](#explore1)
* [Aggregating the Data](#aggregate)
* [Exploratoring Aggregated Data](#explore2)
* [Feature Engineering](#feature)
* [Modeling](#model)
    - [1. Copy dataframe; Apply Train Test Split](#copy)
    - [2. Build Pipelines](#pipes)
    - [3. Train Base Models](#train)
    - [4. Evaluate First Predictions](#evaluate)
    - [5. Tune Models and Re-Compute Accuracy](#tune)    
* [Recommendation](#recomend)

In [None]:
# import libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DateType


from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

import seaborn as sb

In [None]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify Data") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. 
The dataset was loaded and cleaned, checking for invalid or missing data - for example, records without userids or sessionids. 

### Loading data, preliminary study of content<a name="load"></a>

In [None]:
# load data
path = "mini_sparkify_event_data.json"
event_log = spark.read.json(path)

In [None]:
# study schema
event_log.printSchema()

In [None]:
# sample check columns 1 thru 10 for content
event_log.select('artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method').show(5)

In [None]:
# sample check columns 11 thru 18 for content
event_log.select('page','registration','sessionId','song','status','ts','userAgent','userId').show(5)

In [None]:
# count rows
event_log.count()

In [None]:
# describe columns 1 thru 6 for stats
event_log.describe('artist','auth','firstName','gender','itemInSession','lastName').show()

In [None]:
# describe columns 7 thru 12 for stats
event_log.describe('length','level','location','method','page','registration').show()

In [None]:
# describe columns 13 thru 18 for stats
event_log.describe('sessionId','song','status','ts','userAgent','userId').show()

In [None]:
# establish unique values in page column
event_log.select("page").dropDuplicates().sort("page").show()

In [None]:
# establish unique values in level column
event_log.select("level").dropDuplicates().sort("level").show()

In [None]:
# establish unique values in auth column
event_log.select("auth").dropDuplicates().sort("auth").show()

In [None]:
# relating method to activity
event_log.select("method","page").dropDuplicates().show()

In [None]:
# clarify relationship between page, registration, sessionId, itemInSession, ts and userId
event_log \
    .filter(f.col('userId')==30)\
    .select('page','registration','sessionId','itemInSession','ts','userId') \
    .sort(f.asc('sessionId'),f.asc('ts')) \
    .show(10)

In [None]:
# clarify relationship between registration, ts and userId
event_log \
    .groupBy('registration','userId').agg(f.min('ts').alias('ts')) \
    .sort(f.asc('userId')) \
    .show(10)

In [None]:
# translate unixtimestamps ts and and registration into a date format
event_log = event_log \
    .withColumn('ts_date', f.to_utc_timestamp(f.from_unixtime(f.col('ts')/1000,'yyyy-MM-dd HH:mm:ss'),'EST')) \
    .withColumn('reg_date', f.to_utc_timestamp(f.from_unixtime(f.col('registration')/1000,'yyyy-MM-dd HH:mm:ss'),'EST'))

event_log.select('ts_date','reg_date').show(5)

In [None]:
# first and last ts_dates, first Registration date
event_log.agg(
    f.min('ts_date').alias('start'),
    f.max('ts_date').alias('end'),
    f.min('reg_date').alias('first_reg')
    ).show()

### Summary of Data Content:<a name="summary1"></a>
#### Scope:
* Checking the eraliest and latest timespamps shoes that data was being collected between 01/10/2018 and 03/12/2018, just over 2 months.
* The first registration was showing as 18/03/2018.

#### Dataframe contains the following columns:
**1. Data relating to users**:
* `userId` - a unique numeric identifier assigned to each user who is registered with the platform.
* `firstName` - users first name.
* `lastName` - users surname.
* `gender` - their gender.
* `location` - the location of their registered address.
* `level` - the tier *paid* or *free* to which they are subscribed.

**2. Data relating to sessions (periods of time for which a user is logged in to the platform)**:
* `sessionId` - the id number for the session. Each session for a particlular user has its own id. THe same sessionId may appear in another users account. 
* `auth` - an identifier for the authorisation level for the session, one of four possibilities:- *Cancelled, Guest, Logged In, Logged Out.* No userIds are associated with *Logged Out* or *Guest*.
* `userAgent` - The browser / operating system used to access the platform.
* `registration` - the timestamp associated with the time when the user registered. Events triggered by individuals who are not logged in return no registration timestamp.

**3. Event logs** are recorded for individual interactions between a user and the platform:
* `page` - the type of event. There are 22 different page types such as *NextSong* for playing a song, *Add to Playlist* for adding a song to a playlist, *Home* for visiting the platform's home-page etc.
* `method` - User interaction with platform: *Put* for input comming from user, *Get* for output recieved by user.
* `itemInSession` - sequential number with in a session for an event that was logged.
* `ts` - an integer timestamp for recording the time of an event in milliseconds where ts=0 is defined 01.01.1970 00:00:00.000).

**4. Data relating to songs played**, the event type (page) NextSong records that the user is playing a song. For this event song details are recorded:
* `song` - the song's title.
* `length` - the length of time for which the song was listened to.
* `artist` - the song's performer.

### Cleaning Data<a name="clean"></a>

From the preliminary study of the data and its statistics, the following data quality issues were identified:

#### Issues:
* Min Value for `userId` is empty.<br>
* Missing values in `artist`, `firstName`, `gender`, `lastName`, `length`, `location`, `registration`, `song` & `userAgent`

#### Fixes:
* Create new dataframe `event_log_valid`<br>
* Drop Nulls from `userId`

In [None]:
# drop rows with null as "userId"
event_log_valid = event_log.dropna(how = "any", subset = ["userId"])

In [None]:
# recount rows
event_log_valid.count()

* It would appear that dropping null userId had no effect; so content of "userId" was rechecked

In [None]:
event_log_valid.select("userId").dropDuplicates().sort("userId").show(10)

#### Redefine Issue
* No nulls but an empty String<br>
#### Redefine Fix
* Drop empty string<br>
* Revalidate

In [None]:
# drop rows with empty string as "userId"
event_log_valid = event_log_valid.filter(event_log_valid["userId"] != "")

In [None]:
# recount rows
event_log_valid.count()

In [None]:
# describe columns 1 thru 6 for stats
event_log_valid.describe('artist','auth','firstName','gender','itemInSession','lastName').show()

In [None]:
# describe columns 7 thru 12 for stats
event_log_valid.describe('length','level','location','method','page','registration','sessionId').show()

In [None]:
# describe columns 13 thru 12 for stats
event_log_valid.describe('song','status','ts','userAgent','userId').show()

#### State of Data after Cleaning
* all columns refering to song information, `song`, `artist`, `length` contain 228108 records.
* all other columns contain 278154 records.
* this is plausible, because songs are only played when the event (page) `NextSong` is logged. For all other log events (pages) there would be no song infomation.

# Exploratory Data Analysis<a name="explore1"></a>
Performing EDA with the cleaned subset of the data and doing basic manipulations within Spark. 

#### Pareto analysis of `Page` types
* the column `page` is one of the most useful for our prediction in that it contains information about the type of log event that occurred. This information can be transformed to provide data about the users activity, which in turn can be used to predict the if a user is going to churn.

In [None]:
# get count of unique page values to control output of .show()
pages = event_log_valid.select("page").dropDuplicates().count()
# obtain an aggregated count of each page type, in descending order
pages_agg = event_log_valid \
    .groupby('page') \
    .agg(f.count('page')).withColumnRenamed('count(page)','count') \
    .sort(f.desc('count'))
# send to pandas for visualisation
pages_agg_pd = pages_agg.toPandas()
# create, show and save as *png pareto diagram for count of page events
pages_agg_pd = pages_agg_pd.sort_values('count',ascending=False).reset_index()
plt.figure(figsize = [10, 5])
sb.barplot(y='page', x='count', data=pages_agg_pd, order=pages_agg_pd['page'], color = sb.color_palette()[0])\
.set_title('Pareto of Page Types', fontsize=12)
plt.ylabel('')   
plt.savefig('pages_pareto.png')
plt.show();     

* it can be seen that the page with by far the most event logs is `NextSong`, which is to be expected, after all the purpose of Sparkify is to play songs.
* however `NextSong` is so dominant, it is hard to see the results at the tail end of the Pareto, so it will be replotted without NextSong to achieve this.

In [None]:
# Redraw, show and save as *.png Pareto without NextSong to better visuzalise tail of pareto
pages_agg_pd = pages_agg_pd.loc[pages_agg_pd.page != 'NextSong'].sort_values('count',ascending=False).reset_index()
plt.figure(figsize = [10, 5])
sb.barplot(y='page', x='count', data=pages_agg_pd, order=pages_agg_pd['page'], color = sb.color_palette()[0])\
.set_title('Pareto of Page Types without NextSong', fontsize=12)
plt.ylabel('')   
plt.savefig('pages_pareto_2.png')
plt.show();

* it is interesting to see that `Cancel` and `Cancellation Confirmation`, the two events that we wish to predict are virtually negligible compared to the others, and that with this sample data set the number of `downgrade` and `Submit Downgrade` events do not match, like as much the numbers of `upgrade` and `Submit Upgrade` events.

#### Analysis of Gender difference for Number of Users and Webpage Usage
* another insight that was studied in the exploratory analysis is the difference between genders, in terms of number of users and amount of usage per user.

In [None]:
# comparison of number of users by gender and rate of visits by gender
# obtain aggregated count of number of users, number of sessions by gender 
# to give number of sessions per user by gender
gender_agg = event_log_valid \
    .groupby('gender') \
    .agg(
        f.count('sessionId').alias('visits'),
        f.countDistinct('userId').alias('users')
    ) \
    .withColumn('visits_per', f.round(f.col('visits')/f.col('users'))) 
# send to pandas for visualisation
gender_agg_pd = gender_agg.toPandas()
# create, show and save as *png barcharts comparing users by gender and usage by gender
plt.figure(figsize = [12,4])
# User Count
plt.subplot(1,2,1)
sb.barplot(x="gender", y="users", data=gender_agg_pd).set_title('Number of Users by Gender', fontsize=12)
tick_names = ['Female','Male']
plt.xticks([0,1],tick_names)
plt.xlabel('')
plt.ylabel('count')
# Usage
plt.subplot(1,2,2)
sb.barplot(x="gender", y="visits_per", data=gender_agg_pd).set_title('Rate of Usage by Gender', fontsize=12)
tick_names = ['Female','Male']
plt.xticks([0,1],tick_names)
plt.xlabel('')
plt.ylabel('Sessions / User')
plt.savefig('user_gender.png')
plt.show();

* whilst there are clearly more male users than female, it is interesting to see that female users spend more time using Sparkify.

#### Analysis of Gender difference in terms of Paid Subscription Level
* finally the difference between the genders regarding the proportion of users who at somepoint had Paid Subscription was measured (including even those who subsequently downgraded)

In [None]:
# comparison of % of users who have had or who have Paid level
# obtain aggregated count of number of users who have logs at Paid level,
# join in the total number users from gender_agg to obtain % users who have logs at paid level
gender_level_agg = event_log_valid \
    .filter(f.col('level')=='paid') \
    .groupby('gender','level','userId') \
    .agg(
        f.countDistinct('userId').alias('count')
    ) \
    .groupby('gender','level') \
    .agg(
        f.sum('count').alias('count')
    ) \
    .join(gender_agg.select('gender','users'), on = 'gender', how = 'left') \
    .withColumn('paid_per',100*f.col('count')/f.col('users')) \
# send to pandas for visualisation
gender_level_agg_pd = gender_level_agg.toPandas()
# create, show and save as *png bar chart for % users who have logs at paid level
plt.figure(figsize = [6,4])
sb.barplot(x="gender", y="paid_per", data=gender_level_agg_pd)\
.set_title('Users who use or have used the Paid Level', fontsize=12)
tick_names = ['Female','Male']
plt.xticks([0,1],tick_names)
plt.xlabel('')
plt.ylabel('% Users')
plt.savefig('level_gender.png')
plt.show();

* a marginally higher proportion of female users have at some point had a paid subscription, compared to male users.
* we can conclude from this that gender is an important factor to consider when looking to predict user behaviour.

## Aggregating the data to create a new dataframe containing one row per UserId<a name="aggregate"></a>
* the data as supplied is essentially a table of event logswhere each row represents a single user event.
* the purpose of this exercise is to predict user behaviour, so these single user events were be aggregated such that in the resulting dataframe each row refers to an individual user.
* the columns in the resulting dataframe represent various features, such as page event type, membership level, gender, number of sessions etc, and contain values for these features for each userId.
* these values were subsequently be used to create vectors for each user for use in the prediction models.

### The aggregated dataframe was built up in several steps as follows:
1. Obtain the count of each page type for each userId/gender grouping, and pivot this to obtain a table with userId/gender as the row index and ount of page events as columns. To this table an additional column `gender_v` is added that contains the gender data as a binary variable.

In [None]:
# get a count of each page type for each userId/SessionId/gender
agg_users = event_log_valid \
        .groupby('userId','gender','page') \
        .agg(f.count('page')).withColumnRenamed('count(page)','counts') \
        .sort(f.asc('userId'))
# pivoting to a table of userId/SessionId/gender as rows
# and count of page events as columns 
agg_users = agg_users \
    .groupby('userId','gender') \
    .pivot('page') \
    .max('counts') \
    .fillna(0) \
    .withColumn('gender_v',
                f.when(f.col('gender')=="F", f.lit(0)).otherwise(f.lit(1))
               )

2. Find the earliest timestamp, latest timestamp and number of logged events for each userId/SessionId. Use the difference bewteen these timestamp values to get the duration `online_time` of each session. Aggregate these values by userId to get the earliest timestamp, the last timestamp, the total number of logs and the total `online_time` for each UserId. The earliest and last timestamps were then used to get a value for `half_time` which would be used for trend calulations

In [None]:
# collect timestamp start and information for each sessionId/userId group
# calculate online_time duration in hours of each sessionId (1 hour = 60*60*1000 ms)  
agg_sessions = event_log_valid \
    .groupby('userId','sessionId').agg(
        f.min(f.col('ts')).alias('start_ts'),
        f.max(f.col('ts')).alias('end_ts'),
        f.countDistinct('ts').alias('total_logs')
        ) \
    .withColumn('online_time', ((f.col('end_ts')-f.col('start_ts'))/(1000*60*60)))

# aggregate online_time for each user to get total online time
# find first and last time stamp for each user
# find half time (mid point between first and last time stamp) for each user (to calculate trends)
# sum number of sessions for each user
agg_sessions =  agg_sessions \
    .groupby('userId').agg(
        f.sum(f.col('online_time')).alias('online_time'),
        f.min(f.col('start_ts')).alias('start_ts'),
        f.max(f.col('end_ts')).alias('end_ts'),
        f.countDistinct('sessionId').alias('sessions'),
        f.sum(f.col('total_logs')).alias('total_logs')
    ) \
    .withColumn('half_time', f.col('start_ts')+((f.col('end_ts')-f.col('start_ts'))/2))

3. In this step the data is manipulated to provide value for a trend calulation. The earliest timestamp and the number of event logs for each userId/sessionId grouping is aggregated into a dataframe. The `half_time` value for each userId is obtained by joining the relevant column from the `agg_sessions` dataframe created in the previous step.

In [None]:
# get earliest timestamp & count logs for each sessionId/UserId,
# for use in trend calclation.
agg_trends = event_log_valid \
    .groupby('userId','sessionId').agg(
        f.min(f.col('ts')).alias('start_ts_trend'),
        f.countDistinct('ts').alias('total_logs_trend')
    )
    
# join in half_time value from agg_sessions, compare sessionId start time of with half_time
# to split logs into two groups: logs_at_start, logs_at_end 
agg_trends =  agg_trends \
    .join(agg_sessions.select('userId','half_time'), on = ("userId"), how = "left") \
    .withColumn('logs_at_start',
                f.when(f.col('start_ts_trend') <=f.col('half_time'), f.col('total_logs_trend')).otherwise(f.lit(0))
                ) \
    .withColumn('logs_at_end',
                f.when(f.col('start_ts_trend') > f.col('half_time'), f.col('total_logs_trend')).otherwise(f.lit(0))
                ) \
    .groupby('userId').agg(
        f.sum(f.col('logs_at_start')).alias('logs_at_start'),
        f.sum(f.col('logs_at_end')).alias('logs_at_end')
    )

4. The dataframes created in steps 1 to 3 are then joined to create the completed aggregated dataframe. As they will no longer be used they can be deleted to free up memory.

In [None]:
# join agg_sessions agg_gender and agg_trend to aggregation        
df_aggregated = agg_users \
    .join(agg_sessions, on=("userId"), how='left') \
    .join(agg_trends, on=("userId"), how='left')

In [None]:
df_aggregated.printSchema()

### Define Churn & Downgrade
A column `Churn` was added to the table df_agrregated to use as the label for the prediction models. This was achieved using the `Cancellation Confirmation` events to define churn, which applies to both paid and free users. Additionally, a column `downgraded` was added using the `Downgrade` events as a definition.

In [None]:
# define flags
flag_cancellation = f.udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
flag_downgrade = f.udf(lambda x: 1 if x == "Downgrade" else 0, IntegerType())
    
# map flags to userIds
event_flags = event_log_valid \
    .select('userId','page') \
    .withColumn('churned', flag_cancellation('page')) \
    .withColumn('downgraded', flag_downgrade('page')) \
    .groupby('userId') \
    .agg(
        f.max(f.col('churned')).alias('churned'),
        f.max(f.col('downgraded')).alias('downgraded')
    )

# add the columns to the main dataframe
df_aggregated = df_aggregated \
        .join(event_flags, on=("userId"), how='left') 

### Explore Data<a name="explore2"></a>
* Having defined `churn`, some exploratory data analysis was performed to observe the behavior for users who stayed vs users who churned. 
* Aggregated values for these two groups of users were explored, comparing the incidence rate that they experienced for specific events in a given time, in this case per hour.
* A dictionary of actions `rates_hour` was created, with the events as keys and new *event rate* titles (ending with `_h`) as values.
* The dictionary was then looped through to create new feature columns each containing values for the relevant event rate per hour.

In [None]:
# defining rates per hour:
rates_hour = {
    'Thumbs Down':'thumbs_down_h',
    'Roll Advert':'adverts_h',
    'Error':'errors_h',
    'Help':'help_h',
    'Save Settings':'settings_h',
    'Downgrade':'downgrades_h',
    'Thumbs Up':'thumbs_up_h',
    'Add to Playlist':'playlists_h',
    'Add Friend':'freinds_h',
    'Upgrade':'upgrades_h',
    'NextSong':'songs_h',
    'total_logs':'total_logs_h',
    'Home':'home_page_h'
}

In [None]:
for key, value in rates_hour.items():
    df_aggregated = df_aggregated \
        .withColumn(value, f.col(key) / f.col('online_time')) 

* Addition aggregations were calculated for
     - `average session time` per user
     - `positivity` - the ratio of positive enjoyment indicators / total enjoyment indicators*
     - `negativity` - the ratio of negative enjoyment indicators / total enjoyment indicators*
     - `trend` - comparing log_events/hour in the first half of the period the User was active with log_events/hour in the second half

(* An enjoyment indicator is a log event that either reflects or would provoke the users level of enjoyment:- `Thumbs Up`, `Add to Playlist`, `Add Friend`, `Upgrade`, `Thumbs Down`, `Roll Advert`, `Error`, `Help`, `Save Settings`, `Downgrade`)

In [None]:
# additional rates
df_aggregated = df_aggregated \
    .withColumn('mean_session_time', f.col('online_time') / f.col('sessions')) \
    .withColumn('sum_positive', 
                f.col('Thumbs Up')+f.col('Add to Playlist')
                +f.col('Add Friend')+f.col('Upgrade')) \
    .withColumn('sum_negative', 
                f.col('Thumbs Down')+f.col('Roll Advert')+f.col('Error')
                +f.col('Help')+f.col('Save Settings')+f.col('Downgrade')) \
    .withColumn('positivity', f.col('sum_positive') / (f.col('sum_negative')+f.col('sum_positive')))\
    .withColumn('negativity', f.col('sum_negative') / (f.col('sum_negative')+f.col('sum_positive')))\
    .withColumn('trend', (f.col('logs_at_start')-f.col('logs_at_end'))/ (f.col('online_time')/2))\
    .fillna(0)            

* The completed aggregated dataframe was sent to Pandas, to facilitate the visualisations

In [None]:
aggregated_pd = df_aggregated.toPandas().set_index("userId")

In [None]:
aggregated_pd.describe()

### Visualise Data
Box plots were chosen to compare the incidence rate of these events between the churned users and the users who stayed.

In [None]:
# defining boxplot titles and variables:
boxplots = {
    'thumbs down / hour':'thumbs_down_h',
    'adverts / hour':'adverts_h',
    'errors / hour':'errors_h',
    'help / hour':'help_h',
    'settings / hour':'settings_h',
    'downgrades / hour':'downgrades_h',
    'thumbs up / hour':'thumbs_up_h',
    'added playlists / hour':'playlists_h',
    'added freinds / hour':'freinds_h',
    'upgrades / hour':'upgrades_h',
    'songs / hour':'songs_h',
    'logs / hour':'total_logs_h',
    'homepage visits / hour':'home_page_h',
    'mean session time':'mean_session_time',
    'positivity':'positivity',
    'negativity':'negativity',
    'trend, logs / hour':'trend'
}

In [None]:
# plotting boxplot variables, stayed vs churned
dictlength = len(boxplots)
if dictlength%4==0:
    val = 0
else:
    val = 1
rows = round(dictlength/4) + val

plt.figure(figsize = [24,24])
plt.tight_layout()
for i in range (0,dictlength):
    plt.subplot(rows,4,i+1)
    sb.boxplot(data = aggregated_pd, x = 'churned', y = list(boxplots.values())[i], showfliers = False)\
    .set_title(list(boxplots.keys())[i], fontsize=15)    
    tick_names = ['Stayed','Churned']
    plt.xticks([0,1],tick_names)
    plt.xlabel('')
    plt.ylabel('')    
plt.savefig('boxplot_rate.png')
plt.show();

A heatmap was drawn to study the correlation between the incidence rates

In [None]:
heatmap_pd = aggregated_pd[list(boxplots.values())]
fig, ax = plt.subplots(figsize=(12, 10))

sb.heatmap(heatmap_pd.corr(), annot=True, fmt=".2f", ax=ax, cmap="vlag", center=0)\
.set_title('Heatmap showing Correlation Between Numeric Features', fontsize=15)
plt.savefig('heatmap_pd.png')
plt.show();

Barchart comparing number of users who churned with those who stayed

In [None]:
tick_names = ['Staying Members','Churned Members']
plt.xticks([0,1],tick_names)
sb.countplot(data = aggregated_pd, x = 'churned').set_title('Dataset Inbalance', fontsize=15)
plt.xticks([0,1],tick_names)
plt.xlabel('')
plt.ylabel('count')
plt.savefig('inbalance.png')
plt.show();

### Insights from Exploratory data Analysis
* reference to the box plot diagrams shows that the median and quartile values for several of these incidence rates differ for users who have stayed and those who have churned, particluarly for `thumbs down / hour`, `thumbs up / hour``adverts / hour`, `mean session time / hour`, `positivity` and `negativity`.
* however it should be noted that there is also a lot overlap between the distributions, which could lead to uncertainties when making a prediction.
* regarding the heatmap, some of the features have a very strong correlation bewtween them, which means that they could be considered duplicate and can therefore be discarded: 
    - `positivity` and `negativity` unsurprisingly have a very strong negative correlation, so `negativity` can be discarded.
    - `total_logs_h` and `songs_h` are also positively related, as are `total_logs_h` and `adverts_h`, together with `total_logs_h` and `home_page_h`. Seeing as Sparkify is about playing songs, `total_logs_h` and `home_page_h` could also be dropped.
* Regarding the ratio of churned to stayed users, the data is not balanced, so `Balanced Accuracy` should be used when evaluating the prediction models

### Memory Management:
Before proceeding the various supporting pandas and spark dataframes were deleted, to free up RAM for the Modelling phase.

In [None]:
pandas_list = [
    pages_agg_pd, 
    gender_agg_pd, 
    gender_level_agg_pd,
    aggregated_pd,
    heatmap_pd
]
del pandas_list 

In [None]:
event_log.unpersist()
event_log_valid.unpersist()
pages_agg.unpersist()
gender_agg.unpersist()
gender_level_agg.unpersist()
agg_users.unpersist()
agg_sessions.unpersist()
agg_trends.unpersist()
event_flags.unpersist();

# Feature Engineering<a name="feature"></a>
The features which will used with those which were studied in the Boxplot and Heatmap anaylses minus the "correlated duplicates".
* `thumbs_down_h`: thumbs down / hour
* `adverts_h`: adverts / hour
* `errors_h`: errors / hour
* `help_h`: visits to help pages / hour
* `settings_h`: number of changes to user settings / hour
* `downgrades_h`: 'downgrades / hour'
* `thumbs_up_h`: 'thumbs up / hour'
* `playlists_h`: songs added to playlists / hour
* `freinds_h`: recommendations to freinds / hour
* `upgrades_h`: upgrades / hour
* `songs_h`: songs playyed / hour
* `mean_session_time`: average length of session time duration
* `positivity`
* `trend`: trend for rate of logs / hour

In [None]:
# identify numeric features
cols_features = list(boxplots.values())
cols_features.remove('total_logs_h')
cols_features.remove('home_page_h')
cols_features.remove('negativity')

# identify binary features
bin_features = ['gender_v','downgraded']

# define VectorAssembler combining numeric features into vector "NumericFeatures"
numeric_assembler = VectorAssembler(inputCols=cols_features, outputCol="NumericFeatures")

# define Normalization of "NumericFeatures" to "ScaledFeatures" vector
numeric_scaler = Normalizer(inputCol="NumericFeatures", outputCol="ScaledFeatures")

# define VectorAssembler to add binary features to "ScaledFeatures" to obtain Vector "features"
final_assembler = VectorAssembler(inputCols=bin_features + ["ScaledFeatures"], outputCol="features")


In [None]:
# create copy of dataframe containing the features that are to be processed
# and test the functions for plausibility and errors
df_test = df_aggregated.select(*bin_features,*cols_features)

# Assemble Numeric Features into vector "NumericFeatures"
df_test = numeric_assembler \
    .transform(df_test) \
    .drop(*cols_features)

# Normalize "NumericFeatures" as "ScaledFeatures"
df_test = numeric_scaler.transform(df_test)

# Add binary features to "ScaledFeatures" to obtain Vector "features"
df_test = final_assembler.transform(df_test)

# check output
df_test.head(1)

In [None]:
df_test.unpersist();

# Modeling<a name="model"></a>
* The full dataset was copied and split into train, and test sets.
* Three machine learning methods `LogisticRegression`, `RandomForestClassifier`and `GBTClassifier` were tried.
* These were evaluated for accuracy of the various models, and the parameters tuned as necessary.
* Based on test accuracy the winning model was determined and results on the validation set were reported.
* Since the churned users are a fairly small subset, F1 score was the metric to optimize.

## Step 1. Copy dataframe; Apply Train Test Split<a name="copy"></a>
* `df_model`, a copy of the dataframe `df_aggregated` was made and then randomly split into 90% for training and 10% for testing,<br> with the random seed set to `42`.

In [None]:
# copy aggregated to new dataframe df_model, set column 'churned' as label
df_model = df_aggregated\
    .select(*bin_features,*cols_features,'churned')\
    .withColumnRenamed('churned','label')

In [None]:
# Split the data into training and test sets (20% held out for testing)
(trainingData, testData) = df_model.randomSplit([0.9, 0.1], seed = 42)

In [None]:
trainingData.persist()

## Step 2. Build Pipelines<a name="pipes"></a>
* Three pipelines have been built using the algorythmns `LogisticRegression`, `RandomForestClassifier`and `GBTClassifier` (Gradient Boosted Tree), initially with default parameter settings

In [None]:
def pipeline_build(model):
    """
    Function to build pipeline using engineered features
    In:
    - name of model
    Out:
    - pipeline
    """
    pipeline = Pipeline(stages=[numeric_assembler,numeric_scaler,final_assembler, model])
    
    return pipeline

In [None]:
# logistic Regression
lr = LogisticRegression(labelCol="label", featuresCol="features",maxIter=100,regParam=0.0,elasticNetParam=0.0)
pipeline_lr =  pipeline_build(lr)

In [None]:
# Random Forest Classifier
rfc = RandomForestClassifier(labelCol="label", featuresCol="features",maxDepth=5, numTrees=20)
pipeline_rfc =  pipeline_build(rfc)

In [None]:
# GBT Classifier
gbt = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=5, maxIter=20)
pipeline_gbt = pipeline_build(gbt)

## Step 3. Train Base Models (default Parameter settings) and Make First Predictions<a name="train"></a>
### 3.1 Function for Generating Prediction using Model with Default Parameters:
* This function is written to train a given Pipeline model with its default Parameter settings, using the trainingData from the data split.
* The trained model is then used to make a prediction for the remaining testData.

In [None]:
def prediction(pipeline):
    """
    Function to train basic pipeline models with default parameter settings and make predictions
    In:
    - pipeline
    Out:
    - prediction
    """
    # Train model
    model = pipeline.fit(trainingData)
    
    # Make predictions
    predictions = model.transform(testData)
    
    return predictions    

### 3.2 Run function for the three Pipelines

In [None]:
# logistic Regression
base_prediction_lr = prediction(pipeline_lr)

In [None]:
# Random Forest Classifier
base_prediction_rfc = prediction(pipeline_rfc)

In [None]:
# GBT Classifier
base_prediction_gbt = prediction(pipeline_gbt)

## Step 4. Evaluate First Predictions<a name="evaluate"></a>
### 4.1 Function for Calculating Accuracy Values for given Predictions:
* with reference to the [Precision and Recall](https://en.wikipedia.org/wiki/Precision_and_recall) definition in wikipedia, together with Spark's `BinaryClassificationEvaluator()`, a function was written to obtain values first for a confusion matrix, which could be used to calulate the Accuracy and F1 Score, as well as values for Area under the ROC and PR curves.
* Because the data is not balanced, due to the number of churned members being far less than the number of staying members, `Balanced Accuracy` should be used for evaluating the prediction model

In [None]:
def test_prediction(prediction):
    """
    Function to test accuracy of prediction
    In:
    - prediction dataframe
    Out:
    - dictionary containing results showing accuracy of prediction
    """
    
    # confusion matrix for accuracy and f1 calculation
    positives = prediction.filter(prediction.label == 1)
    true_pos = (positives.filter(positives.label == positives.prediction).count())
    fals_neg = (positives.filter(positives.label != positives.prediction).count())
    negatives = prediction.filter(prediction.label == 0)
    true_neg = (negatives.filter(negatives.label == negatives.prediction).count())
    fals_pos = (negatives.filter(negatives.label != negatives.prediction).count())
    
    # balanced accuracy
    tpr = true_pos/(true_pos+fals_neg)
    tnr = true_neg/(true_neg+fals_pos)
    accuracy = round((tpr+tnr)/2,3)
    
    # f1 score
    f1_score = round(2*true_pos/(2*true_pos+fals_pos+fals_neg),3)
    
    # area under curve evaluation using API BinaryClassificationEvaluator
    roc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
    pr_evaluator = BinaryClassificationEvaluator(metricName="areaUnderPR")
    
    au_roc = round(roc_evaluator.evaluate(prediction),3)
    au_pr = round(pr_evaluator.evaluate(prediction),3)
    
    results = {
        'True Positives':true_pos,
        'False Negatives':fals_neg,
        'True Negatives':true_neg,
        'False Positives':fals_pos,
        'Balanced Accuracy':accuracy,
        'F1 Score':f1_score,
        'Area under PR':au_pr,
        'Area under ROC':au_roc
    }
    
    return results

### 4.2 Run the Function to Compute Accuracy
#### 4.2a LogisticRegression

In [None]:
base_result_lr = test_prediction(base_prediction_lr)
base_result_lr

#### 4.2b RandomForestClassifier

In [None]:
base_result_rfc = test_prediction(base_prediction_rfc)
base_result_rfc

#### 4.2c GBTClassifier

In [None]:
base_result_gbt = test_prediction(base_prediction_gbt)
base_result_gbt

### 4.3 Summary of Results and Insights for Default Models
#### Results

* to facilitate this a function for creating tables in matplotlib is used

In [None]:
def render_mpl_table(data, col_width=3.0, row_height=0.625, font_size=14,
                     header_color='#40466e', row_colors=['#f1f1f2', 'w'], edge_color='w',
                     bbox=[0, 0, 1, 1], header_columns=0,
                     ax=None, **kwargs):
    if ax is None:
        size = (np.array(data.shape[::-1]) + np.array([0, 1])) * np.array([col_width, row_height])
        fig, ax = plt.subplots(figsize=size)
        ax.axis('off')
    mpl_table = ax.table(cellText=data.values, bbox=bbox, colLabels=data.columns, **kwargs)
    mpl_table.auto_set_font_size(False)
    mpl_table.set_fontsize(font_size)

    for k, cell in mpl_table._cells.items():
        cell.set_edgecolor(edge_color)
        if k[0] == 0 or k[1] < header_columns:
            cell.set_text_props(weight='bold', color='w')
            cell.set_facecolor(header_color)
        else:
            cell.set_facecolor(row_colors[k[0]%len(row_colors) ])
    return ax.get_figure(), ax

In [None]:
# Tabulate the results
base_result_pd_lr = pd.DataFrame.from_dict(base_result_lr, orient='index', columns=['LogisticRegression_Base'])
base_result_pd_rfc = pd.DataFrame.from_dict(base_result_rfc, orient='index', columns=['RandomForestClassifier_Base'])
base_result_pd_gbt = pd.DataFrame.from_dict(base_result_gbt, orient='index', columns=['GBTClassifier_Base'])

base_result_pd = base_result_pd_lr\
    .join(base_result_pd_rfc, how='left')\
    .join(base_result_pd_gbt, how='left')

base_result_pd.index.names = [' ']
base_result_pd.reset_index(level=0, inplace=True)

In [None]:
fig,ax = render_mpl_table(base_result_pd, header_columns=0, col_width=5.0)
fig.savefig("base_result_pd.png")

#### Insights
* Area under the curves for all models is relatively high thanks to their 100% success rate when predicting true negatives, however this may well be due to the significant inbalance in the data between positives and negatives.
* Balanced accuracy and F1 score are not so optimistic, owing to the fact that true Positives are not so well predicted. In fact the Random Forest Classifier model failed to predict any true Positives at all.
* Based on the initial assessment, GBT Classifier has an edge with a higher F1 Score and Balanced Accuracy, since it was able to identify 2 from 4 Postitives.

## Step 5. Tune Models and Re-Compute Accuracy<a name="tune"></a>
* A Function for executing 3-fold cross-validation on a given Pipeline Model was written, which takes the base pipeline model and  Parameter grid, crossvalidates it using the Spark Tuning Library `CrossValidator` and `trainData` to find the most accurate models:

### 5.1 Defining Function for Tuning the Models

In [None]:
def crossval(pipeline, paramGrid):
    """
    Function to cross validate parameters of pipeline models
    to obtain best model
    In:
    - pipeline to optimize
    - paramGrid relevant to pipeline
    Out:
    - tuned model with optimized parameters
    """
    
    crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3,
                          parallelism=4)
    
    cvModel = crossval.fit(trainingData)
    
    return cvModel

### 5.2 Defining Parameter Values for Tuning the Models
* In order to use this function, Parameter Grids were defined. 
* First the default allowed values parameter values for the pipeline models were obtained from the Spark documentation for [LogisticRegression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html), [RandomForestClassifier](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.RandomForestClassifier.html) and [GBTClassifier](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.GBTClassifier.html)
* Based on this information parameterGrids were built for the cross validation as follows:

| **Model** | **Main Parameters** | **Default values** | **CrossValidation Values** | 
| :---------- | :---------- | :---------- | :---------- |
| LogisticRegression | maxIter<br>regParam<br>elasticNetParam | 100<br>0.0<br>0.0 | 100, 50, 20<br>0.0, 10.0, 20.0<br>0.0, 1.0 | 
| RandomForestClassifier | maxDepth<br>numTrees | 5<br>20 | 5, 3, 7<br>20, 10, 40 |
| GBTClassifier | maxDepth<br>maxIter | 5<br>20 |  5, 3, 7<br>20, 10, 40  |

* The `.avgmetrics` for the cross validations was obtained to identify the best parameter combination.
* The pipelines wer then reconstructed with these parameters, and the predictions re-run

### 5.3 Cross Validate and Tune the Models, Run the Predictions and Compute Accuracy
#### 5.3a LogisticRegression

In [None]:
# Define ParamGrid
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.maxIter,[100,50,200]) \
    .addGrid(lr.regParam,[0.0, 10.0, 20.0]) \
    .addGrid(lr.elasticNetParam,[0.0, 1.0]) \
    .build()

In [None]:
# Cross Validate Model
cv_model_lr = crossval(pipeline_lr, paramGrid_lr)
cv_model_lr.avgMetrics

**Results for LogisticRegression**

| **Main Parameters** | **Default** | **Grid Search** | <nbsp> | <nbsp> |**best.avgMetrics** | **Values for best** |
| :---------- | :---------- | :---------- | :---------- | :---------- | :---------- | :---------- |
| maxIter<br>regParam<br>elasticNetParam | 100<br>0.0<br>0.0 | 100<br>0.0<br>0.0 | 50<br>10.0<br>1.0 | 200<br>20.0<br>- | 0.5496865704561543 | 50, 100, or 200 <br>20.0<br>0.0 or 1.0 |

* select `maxIter`=100, `regParam`=20.0; `elascticNetParam`=0.0

In [None]:
# Tune Model
lr_tuned = LogisticRegression(maxIter=100,regParam=20.0,elasticNetParam=0.0)
tuned_model_lr =  pipeline_build(lr_tuned)

In [None]:
# Run Prediction
tuned_prediction_lr = prediction(tuned_model_lr)

In [None]:
# Measure Results
tuned_result_lr = test_prediction(tuned_prediction_lr)
tuned_result_lr

#### 5.3b RandomForestClassifier

In [None]:
# Define ParamGrid
paramGrid_rfc = ParamGridBuilder() \
    .addGrid(rfc.maxDepth,[5, 3, 7]) \
    .addGrid(rfc.numTrees,[20,10,40]) \
    .build()

In [None]:
# Cross Validate Model
cv_model_rfc = crossval(pipeline_rfc, paramGrid_rfc)
cv_model_rfc.avgMetrics

**Results for RandomForestClassifier**

| **Main Parameters** |**Default** | **Grid Search** | <nbsp> | <nbsp> | **best.avgMetrics** | **Params for best.avgMetrics** |
| :---------- | :---------- | :---------- | :---------- | :---------- | :---------- | :---------- |
| maxDepth<br>numTrees | 5<br>20 | 5<br>20 | 3<br>10 | 7<br>40 | 0.5917140937445151 |  5<br>10 |

* select `maxDepth`=5, `numTrees`=10

In [None]:
# Tune Model
rfc_tuned = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=5, maxDepth=10)
tuned_model_rfc =  pipeline_build(rfc_tuned)

In [None]:
# Run Prediction
tuned_prediction_rfc = prediction(tuned_model_rfc)

In [None]:
# Measure Results
tuned_result_rfc = test_prediction(tuned_prediction_rfc)
tuned_result_rfc

#### 5.3c GBTClassifier

In [None]:
# Define ParamGrid
#paramGrid_gbt = ParamGridBuilder() \
#    .addGrid(gbt.maxDepth,[5, 3, 7]) \
#    .addGrid(gbt.maxIter,[20, 10, 40]) \
#    .build()

In [None]:
# Cross Validate Model
# cv_model_gbt = crossval(pipeline_gbt, paramGrid_gbt)
# cv_model_gbt.avgMetrics

**Note:** durung the development of this project it was found that cross-validation for GBTClassifier was very resource intensive, with several hours passing without result when running a cross-validation of 3 values for each of 2 parameters, even when setting the parallism  parameter to a value higher than default 1. To expediate tuning, it was decided to abandon cross-validation for GBTClassifier and find the best result by manual trial of the grid search parameter values

**Manual Optimisation for GBTClassifier**

In [None]:
# Define GBTClassifier Model Candidates:
candidate_1 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=5,maxIter=10)
candidate_2 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=5,maxIter=40)
candidate_3 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=3,maxIter=20)
candidate_4 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=3,maxIter=10)
candidate_5 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=3,maxIter=40)
candidate_6 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=7,maxIter=20)
candidate_7 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=7,maxIter=10)
candidate_8 = GBTClassifier(labelCol="label", featuresCol="features",maxDepth=7,maxIter=40)

In [None]:
# Build trial Pipelines for each Candidate
trial_model_1 =  pipeline_build(candidate_1)
trial_model_2 =  pipeline_build(candidate_2)
trial_model_3 =  pipeline_build(candidate_3)
trial_model_4 =  pipeline_build(candidate_4)
trial_model_5 =  pipeline_build(candidate_5)
trial_model_6 =  pipeline_build(candidate_6)
trial_model_7 =  pipeline_build(candidate_7)
trial_model_8 =  pipeline_build(candidate_8)

In [None]:
# Run Prediction Try 1
trial_prediction_1 = prediction(trial_model_1)

In [None]:
# Run Prediction Try 2
trial_prediction_2 = prediction(trial_model_2)

In [None]:
# Run Prediction Try 3
trial_prediction_3 = prediction(trial_model_3)

In [None]:
# Run Prediction Try 4
trial_prediction_4 = prediction(trial_model_4)

In [None]:
# Run Prediction Try 5
trial_prediction_5 = prediction(trial_model_5)

In [None]:
# Run Prediction Try 6
trial_prediction_6 = prediction(trial_model_6)

In [None]:
# Run Prediction Try 7
trial_prediction_7 = prediction(trial_model_7)

In [None]:
# Run Prediction Try 8
trial_prediction_8 = prediction(trial_model_8)

In [None]:
# Measure Results Try 1
test_result_1 = test_prediction(trial_prediction_1)

In [None]:
# Measure Results Try 2
test_result_2 = test_prediction(trial_prediction_2)

In [None]:
# Measure Results Try 3
test_result_3 = test_prediction(trial_prediction_3)

In [None]:
# Measure Results Try 4
test_result_4 = test_prediction(trial_prediction_4)

In [None]:
# Measure Results Try 5
test_result_5 = test_prediction(trial_prediction_5)

In [None]:
# Measure Results Try 6
test_result_6 = test_prediction(trial_prediction_6)

In [None]:
# Measure Results Try 7
test_result_7 = test_prediction(trial_prediction_7)

In [None]:
# Measure Results Try 8
test_result_8 = test_prediction(trial_prediction_8)

**Results from GBTClassifier Trials**

In [None]:
# Tabulate the results
tuning_gbt_0_pd = pd.DataFrame.from_dict(base_result_gbt, orient='index', columns=['Default 5,20'])
tuning_gbt_1_pd = pd.DataFrame.from_dict(test_result_1, orient='index', columns=['Trial_1 5,10'])
tuning_gbt_2_pd = pd.DataFrame.from_dict(test_result_2, orient='index', columns=['Trial_2 5,40'])
tuning_gbt_3_pd = pd.DataFrame.from_dict(test_result_3, orient='index', columns=['Trial_3 3,20'])
tuning_gbt_4_pd = pd.DataFrame.from_dict(test_result_4, orient='index', columns=['Trial_4 3,10'])
tuning_gbt_5_pd = pd.DataFrame.from_dict(test_result_5, orient='index', columns=['Trial_5 3,40'])
tuning_gbt_6_pd = pd.DataFrame.from_dict(test_result_6, orient='index', columns=['Trial_6 7,20'])
tuning_gbt_7_pd = pd.DataFrame.from_dict(test_result_7, orient='index', columns=['Trial_7 7,10'])
tuning_gbt_8_pd = pd.DataFrame.from_dict(test_result_8, orient='index', columns=['Trial_8 7,40'])

tuning_gbt_pd = tuning_gbt_0_pd\
    .join(tuning_gbt_1_pd, how='left')\
    .join(tuning_gbt_2_pd, how='left')\
    .join(tuning_gbt_3_pd, how='left')\
    .join(tuning_gbt_4_pd, how='left')\
    .join(tuning_gbt_5_pd, how='left')\
    .join(tuning_gbt_6_pd, how='left')\
    .join(tuning_gbt_7_pd, how='left')\
    .join(tuning_gbt_8_pd, how='left')

tuning_gbt_pd.index.names = [' ']
tuning_gbt_pd.reset_index(level=0, inplace=True)

In [None]:
fig,ax = render_mpl_table(tuning_gbt_pd, header_columns=0, col_width=2.7)
fig.savefig("tuning_gbt_pd.png")

**Select Best Model based on Results from GBTClassifier Trials**

| **Main Parameters** | **Default** | **Grid Search** | <nbsp> | <nbsp> |  **Params for best** |
| :---------- | :---------- | :---------- | :---------- | :---------- | :---------- |
| maxDepth<br>maxIter | 5<br>20 | 5<br>20 | 3<br>10  | 7<br>40 | 3<br>40  |

* select `maxDepth`=3, `maxIter`=40

In [None]:
# best Result
tuned_result_gbt = test_result_5

### 5.4 Summary of Results and Insights for Tuned Models
#### Results:

In [None]:
# Tabulate the results
tuned_result_pd_lr = pd.DataFrame.from_dict(tuned_result_lr, orient='index', columns=['LogisticRegression_Tuned'])
tuned_result_pd_rfc = pd.DataFrame.from_dict(tuned_result_rfc, orient='index', columns=['RandomForestClassifier_Tuned'])
tuned_result_pd_gbt = pd.DataFrame.from_dict(tuned_result_gbt, orient='index', columns=['GBTClassifier_Tuned'])

tuned_result_pd = tuned_result_pd_lr\
    .join(tuned_result_pd_rfc, how='left')\
    .join(tuned_result_pd_gbt, how='left')

tuned_result_pd.index.names = [' ']
tuned_result_pd.reset_index(level=0, inplace=True)

In [None]:
fig,ax = render_mpl_table(tuned_result_pd, header_columns=0, col_width=5.0)
fig.savefig("tuned_result_pd.png")

#### Observations
* Area under the curves for LogisticRegression improved when using the best Paramater Set from the CrossValidation. However the F1 Score and Balanced Accuracy increased.
* Area under the curves for RandomForestClassifier decreased when using the best Paramater Set from the CrossValidation. However the F1 Score and Balanced Accuracy increased.
* Area under the curves for GBTClassifier decreased when using the best Paramater Set from the CrossValidation. However the F1 Score and Balanced Accuracy increased.

#### Interpretations
The results of the CrossValidation are counter-intuitive, in that the so-called best models are performing less well for some indicators compared to the default values, which were also included in the Parameter Grid.However the with the manual process used on GBTClassifier it was possible to find some parameters that led to an all round improvement.<br><br>
It should be remembered however that the `.avgMetrics` that are given are average values measured over the 3 folds of the cross validation.<br><br>
Each fold uses a different subset of the training data. The data has already been shown to be biased, and depending on how these subsets are selected, the bias could be exaggerated, leading to one or more subsets with an unrepresentative proportion of _churn_:_stayed users_, or a difference in this proportion between the subsets. Either way the model will be learning differently for each fold.<br><br>
If one of the folds then delivers an abnormally low value because of this mismatch, it will effect the average in such a way that a misleading `.avgMetric` is given, which in turn leads to an incorrect selection of parameter when tuning.
The training data used in the cross validation consists of 90% of the aggregated dataframe approximately 200 rows, but the testing data is much smaller at 10% or only about 25 rows. This is a very small sample size with a very low statistical significance, which could lead to different outputs each time the prediction is run.<br><br>
The metrics also only apply to the evaluators definitions, in this case `BinaryClassificationEvaluator` which only considers two metrics, Area under PR and Area under ROC. `F1 Score` has been measured "manually".<br><br>
This could be a reason why the tuned models are performing less well for some indicators but better for others, when compared to the default models.<br><br>
With the manual process used to optimise GBTClassifier, no such subsetting took place, which is equivalent to only one fold taking place. Whilst this delivers a desirable result with improved accuracy values for the prediction, it does not guarentee that the result is repeatable, particularly when applying it to a prediction based on the full 12GB dataset. This would need to be verified with a trial by running the prediction as a Spark cluster on the cloud using AWS or IBM Cloud.<br><br>
An additional point to consider is that we are checking our prediction of userIds who will churn against a list of `userIds` who did churn. It is possible, that some of the false positives in the prediction aren't actually false. The data in the sample was collected over months, which is a relatively short period of time. It could be that the model has indeed identified a user who will churn, but who has not done so yet, according to the sample data, because of the short collection period. A false positive may not necessarily be a bad thing.

# Final Steps
# Recommended Model and Further Work
Based on the results such that they are the recommendation is to use the GBTClassifier Model, with Hyperparameters `maxDepth` and `maxIter` tuned to 3 and 40 respectively.<br>
It out performs both other models in the default settings as well as appearing to improve with tuning<br>
However, for the reasons discussed above in _Interpretations_, before deploying the model into production it should first be verified with the full data set using a Spark cluster in the cloud, to prove its reliabilty, if necessary taking steps to tune it further.
Nevertheless it has the potential to deliver a solution for predicting customer _churns_.
