# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [100]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import avg, col, desc, min, max, udf
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType, StringType, FloatType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.feature import StringIndexerModel, VectorAssembler, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import datetime

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline


In [101]:
# create a Spark session locally

spark = SparkSession.builder \
    .master('local') \
    .appName('Churn Prediction') \
    .getOrCreate()

# Load and Clean Dataset
Clean your dataset, checking for invalid or missing data. For example, records without userids or sessionids. In this workspace, the filename is `mini_sparkify_event_data.json`.

In [102]:
event_data = 'mini_sparkify_event_data.json'

In [103]:
df = spark.read.json(event_data)
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [None]:
# Inspect the data 
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [None]:
# Inspect the schema
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
# Find a total number of rows
df.count()

286500

In [None]:
# Find the statistics
df.describe().show()

+-------+------------------+----------+---------+------+------------------+--------+-----------------+------+-----------------+------+-------+--------------------+-----------------+--------------------+------------------+--------------------+--------------------+-----------------+
|summary|            artist|      auth|firstName|gender|     itemInSession|lastName|           length| level|         location|method|   page|        registration|        sessionId|                song|            status|                  ts|           userAgent|           userId|
+-------+------------------+----------+---------+------+------------------+--------+-----------------+------+-----------------+------+-------+--------------------+-----------------+--------------------+------------------+--------------------+--------------------+-----------------+
|  count|            228108|    286500|   278154|278154|            286500|  278154|           228108|286500|           278154|286500| 286500|            

In [None]:
# Look at the counts of each column
df.describe().show(1, vertical=True)

In [None]:
# Inspect unique available userIds
df.select('userId').dropDuplicates().sort('userId').show()

In [None]:
df.select('userId').groupby(df.userId).count().sort('userId').show()

In [None]:
# Drop rows with a missing userId (stored as an empty string)
df_valid = df.filter(df.userId != '')
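
The filter above treats an empty string as a missing userId. The same check can be sketched in plain Python, with no Spark needed. The sample records and the `is_valid` helper are made up for illustration, and the extra sessionId check is an assumption that the cell above does not apply:

```python
# Hypothetical event records; only the keys used here are shown.
events = [
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "", "sessionId": 8, "page": "Home"},
    {"userId": "9", "sessionId": None, "page": "NextSong"},
]

def is_valid(event):
    """Keep events that carry a non-empty userId and a sessionId."""
    has_user = event.get("userId") not in (None, "")
    has_session = event.get("sessionId") is not None
    return has_user and has_session

valid_events = [e for e in events if is_valid(e)]
print(len(valid_events), "of", len(events), "events kept")
```

Comparing the row count before and after the filter is a quick way to see how many events were dropped.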

In [None]:
# Inspect the counts of each column again
df_valid.describe().show(1, vertical=True)

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
# Create a udf function to identify churn by flagging an event 
# with the "Cancellation Confirmation" page

is_churned = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())


In [None]:
# Add a new column 'isChurned' by passing a value in the 'page' column to the udf function
# created above (is_churned)
df_valid = df_valid.withColumn('isChurned', is_churned("page"))

In [None]:
# Then, inspect the data
df_valid.head()

In [None]:
# Then, identify the events of users who churned:
# 1. Create a window partitioned by userId and ordered by the timestamp 'ts' in descending order
# 2. Use pyspark.sql.functions.sum (Fsum) to compute a cumulative sum of 'isChurned'
#    within the window and assign it to a new column 'willBeChurned'
# 3. As a result, every event at or before a user's cancellation has willBeChurned == 1,
#    while events of users who never cancel have willBeChurned == 0

windowval = Window.partitionBy("userId") \
                .orderBy(desc("ts")) \
                .rangeBetween(Window.unboundedPreceding, 0)

df_valid = df_valid.withColumn("willBeChurned", Fsum("isChurned").over(windowval))
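
To see why the descending window marks every earlier event of a churned user, here is a plain-Python sketch of the same running sum for a single user. The function name and the sample timestamps are hypothetical, and the sketch assumes timestamps are distinct within a user:

```python
def flag_user_events(events):
    """events: list of (ts, page) tuples for ONE user.

    Returns (ts, page, will_be_churned) tuples, newest first, where the flag
    is the running sum of cancellation events seen so far.
    """
    running = 0
    flagged = []
    for ts, page in sorted(events, key=lambda e: e[0], reverse=True):
        running += 1 if page == "Cancellation Confirmation" else 0
        flagged.append((ts, page, running))
    return flagged

# Hypothetical users: one cancels at ts=30, the other never cancels.
churner = [(10, "NextSong"), (20, "Downgrade"), (30, "Cancellation Confirmation")]
stayer = [(10, "NextSong"), (20, "Home")]

churner_flags = flag_user_events(churner)
stayer_flags = flag_user_events(stayer)
print(churner_flags)
print(stayer_flags)
```

Because the newest event is visited first, the cancellation is counted before any older event, so the flag carries back to the user's whole history.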


In [None]:
# Create a datetime to help with the analysis
get_datetime = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
df_valid = df_valid.withColumn('datetime', get_datetime(df_valid.ts))
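
The `ts` values are epoch timestamps in milliseconds, which is why the udf divides by 1000 before converting. A standalone sketch of the conversion follows. It pins the time zone to UTC so the result does not depend on the machine, whereas the udf above uses the local time zone. The helper name is hypothetical:

```python
from datetime import datetime, timezone

def ms_to_datetime_string(ts_ms):
    """Convert epoch milliseconds to a 'YYYY-MM-DD HH:MM:SS' string in UTC."""
    moment = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")

# ts of the first row returned by df.head() earlier in this notebook
formatted = ms_to_datetime_string(1538352117000)
print(formatted)
```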


In [None]:
# Inspect the value
df_valid.filter('willBeChurned == 1') \
    .select('userId', 'datetime', 'page', 'level', 'willBeChurned') \
    .show()

### Compare page counts of events with and without churns

The comparison plot below shows the percentage of each page type for churned and non-churned users. Most events in both groups are 'NextSong', and the plot shows little difference between the groups for any page type.

In [None]:
# Get page count of events with and without churns
df_churn_page = df_valid.groupby('willBeChurned', 'page') \
    .count() \
    .orderBy(desc('willBeChurned'), desc('count'))

In [None]:
# And inspect the raw data
df_churn_page.show(50)

In [None]:
# Visualize the results by converting data to pandas first
pd_churn_page = df_churn_page.toPandas()

In [None]:
# Then, find a total counts of each type
total_churn_page = pd_churn_page.groupby('willBeChurned')['count'].sum()
total_churn_page

In [None]:
# Then, create a new column called 'percentage' to store a percentage number of each page count
def get_page_count_percentage(row):
    return (row['count'] / total_churn_page[row['willBeChurned']])*100

pd_churn_page['percentage'] = pd_churn_page.apply(lambda row: get_page_count_percentage(row), axis=1)
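
The percentage is computed within each group, so the two groups can be compared even though they differ in size. A plain-Python sketch of the same normalization, using made-up counts shaped like the rows of `pd_churn_page`:

```python
from collections import defaultdict

# Hypothetical (willBeChurned, page, count) rows.
rows = [
    (0, "NextSong", 800), (0, "Home", 150), (0, "Thumbs Up", 50),
    (1, "NextSong", 160), (1, "Home", 30), (1, "Thumbs Up", 10),
]

# Total events per group, the equivalent of total_churn_page.
totals = defaultdict(int)
for group, _page, count in rows:
    totals[group] += count

# Share of each page within its own group, in percent.
percentages = {
    (group, page): 100.0 * count / totals[group]
    for group, page, count in rows
}
print(percentages[(0, "NextSong")], percentages[(1, "NextSong")])
```

In this made-up data the non-churned group has five times as many events, yet both groups end up with the same NextSong share.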

In [None]:
# Show a pivot table to better visualize the data
pd_churn_page.pivot(index='willBeChurned', columns='page', values='percentage')


In [None]:
# Finally, use the newly created pandas data frame to create a comparison plot
# to show a difference between a percentage of each page type by churn
plt.figure(figsize=(8,6))

sns.barplot(x='percentage', y='page', hue='willBeChurned', data=pd_churn_page);
plt.title("Comparison of a percentage of event counts of each page type for events with/without churn");

### Inspect a number of songs played in each hour

The plots below show a similar hourly trend in the number of songs played for users who churned and users who did not.

In [None]:
# Create an 'hour' column by extracting the hour of the day from the 'ts' column
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
df_valid = df_valid.withColumn('hour', get_hour('ts'))

In [None]:
def plot_NumberOfSongsInHour(willBeChurned):
    '''
    Plot a number of songs in hour for events with the specified willBeChurned value
    '''
    
    # Count the songs played (NextSong events) in each hour for the selected group
    songs_in_hour = df_valid.filter((df_valid.willBeChurned == willBeChurned) & (df_valid.page == "NextSong")) \
        .groupby('hour') \
        .count() \
        .orderBy(df_valid.hour.cast("float"))

    
    # Convert to pandas and create a scatter plot

    pd_songs_in_hour = songs_in_hour.toPandas()
    pd_songs_in_hour.hour = pd.to_numeric(pd_songs_in_hour.hour)

    plt.scatter(pd_songs_in_hour["hour"], pd_songs_in_hour["count"]);
    plt.xlim(-1, 24);
    plt.ylim(0, 1.2 * pd_songs_in_hour["count"].max())
    plt.xlabel("Hour")
    plt.ylabel("Songs played");

In [None]:
# Create a scatter plot to inspect The number of songs in hours of events of not churned users
plot_NumberOfSongsInHour(0)
plt.title('The number of songs in hours of events of NOT churned users');

In [None]:
# Create a scatter plot to inspect the number of songs in each hour for events of churned users
plot_NumberOfSongsInHour(1)
plt.title('The number of songs in hours of events of CHURNED users');

### Inspect a ratio of each gender

The data below suggests that male users are more likely to churn than female users

In [None]:
df_gender = df_valid.select("userId", "gender", 'willBeChurned') \
    .dropDuplicates() \
    .groupby('willBeChurned', 'gender') \
    .count() 

In [None]:
df_gender.orderBy('willBeChurned', 'gender').show()

In [None]:
pd_gender = df_gender.toPandas()

In [None]:
pd_gender_pivot = pd_gender.pivot(index='willBeChurned', columns='gender', values='count')
total_counts = pd_gender_pivot.sum(axis=1)
pd_gender_pivot = pd_gender_pivot.div(total_counts, axis=0)
pd_gender_pivot

In [None]:
pd_gender_pivot.plot.barh();
plt.title('Comparison of ratio of gender counts of users who are/are not churned');

### Inspect session time

The session time statistics below show that 
* The average session time of users who churned is around 283 minutes/session 
* The average session time of users who did NOT churn is around 302 minutes/session

In [None]:
# Filter data based on its willBeChurned status first 
df_churn = df_valid.filter('willBeChurned == 1')
df_not_churn = df_valid.filter('willBeChurned == 0')

In [None]:
def get_session_time(df, showSummary=False):
    '''
    Get the session time (in minutes) of each session in the given data
    
    Input Argument:
        df:   Sparkify dataframe
        
    Optional Input Argument:
        showSummary: Show summary statistics of the output data frame. Default is False.
        
    Output Argument:
        df_sessionTime: Spark dataframe contains sessionTime information of each userId and sessionId
    '''
    
    # Find a session time by grouping events by userId and sessionId
    # and then find a difference between the min and max timestamp
    df_sessionTime = df.groupby("userId", "sessionId") \
        .agg(((max(df.ts)-min(df.ts))/(1000*60)) \
        .alias("sessionTime"))
    
    if showSummary:
        # Print the statistics
        df_sessionTime.select('sessionTime').describe().show();

        # Create a box plot
        df_sessionTime.select('sessionTime').toPandas().boxplot();
    
    return df_sessionTime;
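
The session time is the gap between the first and last event of a session, converted from milliseconds to minutes. A standalone plain-Python sketch of that calculation follows. The function name and the sample events are hypothetical:

```python
def session_minutes(events):
    """events: iterable of (user_id, session_id, ts_ms) tuples.

    Returns {(user_id, session_id): duration in minutes}, where the duration
    is the gap between the first and last event of the session.
    """
    bounds = {}
    for user_id, session_id, ts_ms in events:
        key = (user_id, session_id)
        first, last = bounds.get(key, (ts_ms, ts_ms))
        bounds[key] = (min(first, ts_ms), max(last, ts_ms))
    return {key: (last - first) / (1000 * 60) for key, (first, last) in bounds.items()}

# Hypothetical events: one session with three events, one with a single event.
sample_events = [
    ("30", 29, 0),
    ("30", 29, 90_000),
    ("30", 29, 600_000),
    ("9", 8, 1_000),
]
durations = session_minutes(sample_events)
print(durations)
```

A session with a single event has a duration of zero, which pulls the averages down. Note that this sketch uses Python's built-in `min` and `max`; inside the notebook those names refer to the Spark functions imported at the top, so the sketch is meant to be run on its own.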
    

In [None]:
df_sessionTime_churn = get_session_time(df_churn, showSummary=True)
plt.title('Distribution of session time of users who are churned');

In [None]:
df_sessionTime_not_churn = get_session_time(df_not_churn, showSummary=True)
plt.title('Distribution of session time of users who are NOT churned');

### Inspect a number of songs per session

The songs-per-session statistics below show that 
* The average of songs played in each session of users who are churned is around 70 songs
* The average of songs played in each session of users who are NOT churned is around 75 songs

In [None]:
def get_songs_per_session(df, showSummary=False):
    '''
    Get a number of songs played in each userId and sessionId
    
    Input Argument:
        df:   Sparkify dataframe
        
    Optional Input Argument:
        showSummary: Show summary statistics of the output data frame. Default is False.
        
    Output Argument:
        df_songs_per_session: Spark dataframe contains information of each userId and sessionId
    '''
    
    # Filter only events with the "NextSong" page
    # Then, group them by userId and sessionId and find the counts
    df_songs_per_session = df.filter(df.page=="NextSong") \
        .groupby("userId", "sessionId") \
        .count()
    
    if showSummary:
        # Print the statistics
        df_songs_per_session.select('count').describe().show();

        # Create a box plot
        df_songs_per_session.select('count').toPandas().boxplot();
    
    return df_songs_per_session;



In [None]:
df_songs_per_session_churn = get_songs_per_session(df_churn, showSummary=True);
plt.title('Distribution of a number of songs per session of users who are churned');

In [None]:
df_songs_per_session_not_churn = get_songs_per_session(df_not_churn, showSummary=True)
plt.title('Distribution of a number of songs per session of users who are NOT churned');

### Inspect a number of registration days

The data below shows that users who churned had been registered for around 57 days

In [None]:
def get_user_reg_days(df, showSummary=False):
    '''
    Get a number of registration days of the given data
    
    Input Argument:
        df:   Sparkify dataframe
        
    Optional Input Argument:
        showSummary: Show summary statistics of the output data frame. Default is False.
        
    Output Argument:
        user_reg_days: Spark dataframe contains information of a number of registration days
    '''
    
    # Get a max timestamp of each ID first
    user_max_ts = df.groupby("userId").max("ts").sort("userId")

    # Get the registration timestamp. We only need to get it from the first data 
    # since they will all be the same
    user_reg_ts = df.select("userId", "registration").dropDuplicates().sort("userId")

    # Join the max timestamp and registration timestamp table
    join_reg_max_ts = user_reg_ts.join(user_max_ts, (user_reg_ts.userId == user_max_ts.userId))

    # Then, select
    # 1) The userId column
    # 2) Computed a number of registration days from the max and registration columns
    user_reg_days = join_reg_max_ts.select(user_reg_ts["userId"], 
                                           ((user_max_ts["max(ts)"]-user_reg_ts["registration"])/(1000*60*60*24))
                                            .alias("NumRegDays"))
    
    if showSummary:
        # Print the statistics
        user_reg_days.select('NumRegDays').describe().show();

        # Create a boxplot of the statistics
        user_reg_days.select('NumRegDays').toPandas().boxplot();

    return user_reg_days
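
The number of registration days is the difference between two millisecond timestamps divided by the number of milliseconds in a day. A small worked example uses the `registration` and `ts` values of the first row shown earlier. That row is only an illustration: the function above uses each user's latest `ts`, and `registration_days` is a hypothetical helper:

```python
MS_PER_DAY = 1000 * 60 * 60 * 24  # milliseconds in one day

def registration_days(registration_ms, last_event_ms):
    """Days elapsed between registration and the given event timestamp."""
    return (last_event_ms - registration_ms) / MS_PER_DAY

days = registration_days(1538173362000, 1538352117000)
print(round(days, 2))
```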

In [None]:
# Get a number of registration days and statistics of users who are churned
user_reg_days = get_user_reg_days(df_churn, showSummary=True)
plt.title('The number of registration days of users who are churned ');

In [None]:
# Get a number of registration days and statistics of users who are NOT churned
user_reg_days = get_user_reg_days(df_not_churn, showSummary=True)
plt.title('The number of registration days of users who are NOT churned ')

### Inspect the last level

The data below shows that the ratio of free to paid users is similar (around 55/45) for users who churned and users who did not

In [None]:
def get_user_level(df, showSummary=False):
    '''
    Get the last user level (paid/free) of each user in the given data
    
    Input Argument:
        df:   Sparkify dataframe
    
    Optional Input Argument:
        showSummary: Show summary statistics of the output data frame. Default is False.
        
    Output Argument:
        user_level: Spark dataframe containing the last level (paid/free) of each userId
    '''
    
    # Get the max timestamp of each user first
    user_max_ts = df.groupby("userId").agg(max("ts").alias("ts"))

    # Keep the level recorded on each user's most recent event, so that users
    # who switched between free and paid appear only once
    user_level = df.join(user_max_ts, ["userId", "ts"]) \
        .select("userId", "level") \
        .dropDuplicates(["userId"])
    
    
    if showSummary:
        # we will show a summary of counts grouped by 'level' here
        total_user_level = user_level.count()

        group_level = user_level.groupby('level').count()
        get_ratio = udf(lambda x: x*1.0/total_user_level, FloatType())
        df_level = group_level.withColumn('ratio', get_ratio("count"))
        df_level.show()
    
    return user_level;

In [None]:
# Get a number of last level of users who are churned
get_user_level(df_churn, showSummary=True)

In [None]:
# Get a number of last level of users who are NOT churned
get_user_level(df_not_churn, showSummary=True)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

Based on the Exploratory Data Analysis (EDA) above, we will use the following information to create features that will be used to train our model:
* Gender of the user
* Average session time of each user
* Average number of songs played in each session of each user
* How long the user has been registered
* The last level (paid/free)

**Gender of the user**


In [None]:
# Keep one row per user first, because a user's gender does not change
df_gender = df_valid.dropDuplicates(['userId']).sort('userId').select(['userId','gender'])

# Then, use StringIndexerModel to create an index for gender

fromlabelsModel = StringIndexerModel.from_labels(["F", "M"], \
    inputCol="gender", outputCol="gender_index", handleInvalid="error")
df_gender = fromlabelsModel.transform(df_gender)

# Select only the columns of interest
df_gender = df_gender.select('userId', df_gender.gender_index.cast("int"))
df_gender.head()

**Average session time of each user**


In [None]:
def scale_column_with_StandardScaler(df, column_name):
    '''
    Scale the specified column name with the StandardScaler
    
    Input arguments:
        df:           Spark dataframe
        column_name:  Column name with numerical value to be scaled with the StandardScaler
    
    Output argument:
        df_scaled:    New Spark dataframe with the "<column_name>_vec" and "<column_name>_scaled" columns
    '''
    
    # Assemble the column into a vector first, because StandardScaler works on vector columns
    column_vec = column_name + '_vec'
    assembler = VectorAssembler(inputCols=[column_name], outputCol=column_vec)
    df_scaled = assembler.transform(df)

    # Use the StandardScaler to scale the vector column to unit standard deviation
    column_scaled = column_name + '_scaled'
    scaler = StandardScaler(inputCol=column_vec, 
                            outputCol=column_scaled, 
                            withStd=True)

    # Then, fit the scaler and add the scaled column
    df_scaled = scaler.fit(df_scaled).transform(df_scaled)
    
    return df_scaled;
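
With `withStd=True` and the default `withMean=False`, the scaler divides each value by the column's sample standard deviation and does not center it. A plain-Python sketch of that arithmetic follows, as an illustration of the idea rather than Spark's implementation. The function name and sample values are hypothetical:

```python
import statistics

def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation (no centering)."""
    std = statistics.stdev(values)  # sample (n - 1) standard deviation
    if std == 0:
        # A constant column carries no information; map it to zeros.
        return [0.0 for _ in values]
    return [value / std for value in values]

scaled = scale_to_unit_std([2.0, 4.0, 6.0])
print(scaled)
```

Because the values are not centered, the scaled column keeps its sign and relative ordering; only its spread changes.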
    

In [None]:
def get_avg_column_by_user(df, column_name):
    '''
    Get average values of a particular column grouped by userId
    
    Input argument:
    df :  Sparkify dataframe
    column_name: The interested column name that we want to get average values for
    
    Output argument:
    df_avg: Spark dataframe with average values information stored in a column "avg_<column_name>"
    '''
    df_avg = df.groupby('userId') \
                .agg(avg(column_name)) \
                .select('userId', col('avg(' + column_name + ')').alias('avg_' + column_name))
    
    return df_avg
    

In [None]:
# Find the average session time first
df_sessionTime = get_session_time(df_valid)
df_sessionTime_avg = get_avg_column_by_user(df_sessionTime, 'sessionTime')


In [None]:
# Inspect the results
df_sessionTime_avg.show(5)

In [None]:
# Then, scale it
df_sessionTime_avg_scaled = scale_column_with_StandardScaler(df_sessionTime_avg, 'avg_sessionTime')
df_sessionTime_avg_scaled = df_sessionTime_avg_scaled.select('userId', 'avg_sessionTime_scaled')


In [None]:
# Inspect the result
df_sessionTime_avg_scaled.show(5)

**Average number of songs played in each session of each user**

In [None]:
# Get data of a number of songs per session
df_song = get_songs_per_session(df_valid)

In [None]:
# Get an average number of songs per session of each userId
df_song_avg = get_avg_column_by_user(df_song, 'count')
# Rename the averaged column ('avg_count') to be more specific
df_song_avg = df_song_avg.select('userId', col('avg_count').alias('num_songs'))

In [None]:
# Then, scale data in the 'num_songs' column with the StandardScaler
df_song_avg_scaled = scale_column_with_StandardScaler(df_song_avg, 'num_songs')

# And only select the userId and the scaled column
df_song_avg_scaled = df_song_avg_scaled.select('userId', 'num_songs_scaled')

In [None]:
# Inspect the data
df_song_avg_scaled.show(5)


**How long the user has been registered**


In [None]:
# Get data of a number of days since each user registered
df_reg_days = get_user_reg_days(df_valid)

In [None]:
# Then, scale it with the StandardScaler
df_reg_days_scaled = scale_column_with_StandardScaler(df_reg_days, 'NumRegDays')
df_reg_days_scaled = df_reg_days_scaled.select('userId', 'NumRegDays_scaled')


In [None]:
# Inspect the data
df_reg_days_scaled.show(5)

**The last level (paid/free)**

In [None]:
# Get data of the last level of each user
df_level = get_user_level(df_valid)

In [None]:
# Then, transform it

fromlabelsModel = StringIndexerModel.from_labels(["free", "paid"], \
    inputCol="level", outputCol="level_index", handleInvalid="error")
df_level = fromlabelsModel.transform(df_level)

# Select only the columns of interest
df_level = df_level.select('userId', df_level.level_index.cast("int"))


In [None]:
# Inspect data
df_level.show(5)

After transforming/scaling the columns of interest, join them with the churn label dataframe using the 'userId' column

In [None]:
# Create the label (churn) data first
df_combined = df_valid.dropDuplicates(['userId']) \
                .sort('userId') \
                .select(['userId', col('willBeChurned').alias('label').cast("int")])


In [None]:
# Iterate through the feature data frames created earlier and join each one to the label data frame
feature_list = [    
    df_gender, 
    df_sessionTime_avg_scaled, 
    df_song_avg_scaled, 
    df_reg_days_scaled, 
    df_level
]

for feature in feature_list:
    df_combined = df_combined.join(feature,'userId')
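
A join on 'userId' without a `how` argument is an inner join, so a user who is missing from any one feature table drops out of the combined data. The plain-Python sketch below models each table as a dictionary keyed by userId, which assumes one row per user per table. All names and values are hypothetical:

```python
def inner_join_on_user(left, right):
    """left, right: dicts mapping userId -> dict of column values."""
    return {
        user_id: {**left[user_id], **right[user_id]}
        for user_id in left
        if user_id in right
    }

labels = {"10": {"label": 0}, "20": {"label": 1}, "30": {"label": 0}}
gender = {"10": {"gender_index": 1}, "20": {"gender_index": 0}, "30": {"gender_index": 1}}
# User "20" has no row here, e.g. a user with no NextSong events.
songs = {"10": {"num_songs_scaled": 1.2}, "30": {"num_songs_scaled": 0.4}}

combined = labels
for feature in (gender, songs):
    combined = inner_join_on_user(combined, feature)
print(sorted(combined))
```

Comparing the number of users before and after the joins shows whether any labelled users were lost this way.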

In [None]:
# Inspect the schema of the combined data frame first
df_combined.printSchema()

Now, let's create a dataframe that will be used in the modeling step by 
1. Combining the values in the feature columns into a single vector column named 'features'
2. Creating a data frame 'df_final' by selecting only the 'features' and 'label' columns

In [None]:
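# Use every column after 'userId' and 'label' as a feature. Slicing with [2:-1]
# would silently drop the last joined column ('level_index')
assembler = VectorAssembler(inputCols=df_combined.columns[2:], outputCol="features")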
df_combined = assembler.transform(df_combined)

In [None]:
df_combined.head()

In [None]:
df_final = df_combined.select('features', 'label')

In [None]:
df_final.show(5)

In [None]:
# Write out the data for future use. The 'features' column is a vector,
# which the CSV writer cannot serialize, so store it as Parquet instead
out_path = "final_data.parquet"
df_final.write.save(out_path, format="parquet")

In [None]:
# df_final = spark.read.parquet(out_path)
df_final.persist()

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

First, let's split the data set into 80% of training data and set aside 20%. Set random seed to 42.

In [None]:
train, test = df_final.randomSplit([0.8, 0.2], seed=42)
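
`randomSplit` assigns rows to the splits at random, so the 80/20 proportions hold only approximately, which is noticeable when the dataset has few users. The plain-Python sketch below shows weighted random assignment as an illustration of the idea, not Spark's actual sampling code. The `random_split` helper and the 200 sample rows are hypothetical:

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point round-off in the cumulative sum.
            splits[-1].append(row)
    return splits

train_rows, test_rows = random_split(list(range(200)), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable, but the exact sizes still depend on the random draws rather than on a strict 80/20 cut.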

#### LogisticRegression

Let's first start with the LogisticRegression

In [None]:
lr =  LogisticRegression()

In [None]:
# Since we already transform data in the previous step, 
# we will not need to do it in the pipeline

pipeline = Pipeline(stages=[lr])

In [None]:
# Then, create a ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0]) \
    .addGrid(lr.regParam,[0.0, 0.01, 0.1]) \
    .build()

# And use pipeline and paramGrid to construct a CrossValidator object
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)
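
The grid above combines three values of `elasticNetParam` with three values of `regParam`, and every combination is trained once per fold. A quick plain-Python count of the resulting model fits, using `itertools.product` to mirror the grid (the variable names are illustrative):

```python
from itertools import product

elastic_net_values = [0.0, 0.5, 1.0]
reg_values = [0.0, 0.01, 0.1]
num_folds = 3

# One dictionary per parameter combination, like the entries of paramGrid.
grid = [
    {"elasticNetParam": elastic_net, "regParam": reg}
    for elastic_net, reg in product(elastic_net_values, reg_values)
]
fits_during_search = len(grid) * num_folds
print(len(grid), "combinations,", fits_during_search, "fits during the search")
```

Each extra value or fold multiplies the training time, which is worth keeping in mind before moving to the full dataset.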

In [None]:
# Fit with the train data
print('[{}] Start'.format(datetime.datetime.now()))

cvModel_lr = crossval.fit(train)

print('[{}] Done'.format(datetime.datetime.now()))

In [None]:
# Find the average metrics
cvModel_lr.avgMetrics

In [None]:
# Now, get a predicted value of the test data
results_lr = cvModel_lr.transform(test)

In [None]:
# Next, check the accuracy of the results
print(results_lr.filter(results_lr.label == results_lr.prediction).count())
print(results_lr.count())
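
Since churned users are a small share of the data, accuracy alone can look good even when most churners are missed, which is why the F1 score was suggested as the metric. The plain-Python sketch below computes both from (label, prediction) pairs. The helper and the ten sample predictions are hypothetical, and F1 here is the binary F1 of the churned class:

```python
def accuracy_and_f1(pairs):
    """pairs: list of (label, prediction) with 1 = churned, 0 = stayed."""
    true_pos = sum(1 for label, pred in pairs if label == 1 and pred == 1)
    false_pos = sum(1 for label, pred in pairs if label == 0 and pred == 1)
    false_neg = sum(1 for label, pred in pairs if label == 1 and pred == 0)
    correct = sum(1 for label, pred in pairs if label == pred)

    accuracy = correct / len(pairs)
    precision = true_pos / (true_pos + false_pos) if true_pos + false_pos else 0.0
    recall = true_pos / (true_pos + false_neg) if true_pos + false_neg else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return accuracy, f1

# Hypothetical results: 8 stayers predicted correctly, 1 churner missed, 1 churner caught.
sample_pairs = [(0, 0)] * 8 + [(1, 0), (1, 1)]
accuracy, f1 = accuracy_and_f1(sample_pairs)
print(round(accuracy, 3), round(f1, 3))
```

In this made-up example the accuracy is high even though half of the churners are missed, and the F1 score reflects that gap.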

#### Gradient-boosted tree classifier

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.