<a href="https://colab.research.google.com/github/nikyan/spark_customer_churn/blob/master/sparkify_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sparkify Project Pipeline

This Jupyter notebook loads the full Sparkify dataset (12 GB), extracts features, trains machine learning models to predict churn, and tunes the best model.

The notebook is run on an AWS EMR Spark cluster.

## Import Dependencies & Libraries 

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirrors.hoobly.com/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
# import libraries
from pyspark.sql import SparkSession
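# NOTE: the wildcard import shadows Python built-ins such as max and min with
# Spark column functions; the feature functions below rely on this
# (max, min, avg, countDistinct, regexp_extract, regexp_replace).
from pyspark.sql.functions import *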
from pyspark.sql.functions import udf, col, concat, count, lit, isnan, when, desc, split, trim
import pyspark.sql.functions as F
from pyspark.sql.types import *
import datetime

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import Normalizer, MinMaxScaler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#%matplotlib inline

## Feature Extraction

The functions below extract features from the dataset and build a dataframe ready for the machine learning algorithms.

In [0]:
def load_data(filename):
    
    '''
    Function to load data and drop rows whose userId is null or an empty string.
    INPUT
    filename - path to the JSON file
    OUTPUT
    df - a spark dataframe with no null or empty userId rows
    '''
    
    df = spark.read.json(filename)
    # remove empty string from userId
    df = df.filter(df.userId != "")
    print("Count of rows in dataframe: {}".format(df.count()))
          
    return df

In [0]:
def add_date_columns(df):
    '''
    Function to add date/time related columns to the dataframe.
    INPUT - a spark dataframe
    OUTPUT - a spark dataframe with calculated fields: hour, month, year, day
    '''
    # create a function to get hour, month, year, day from timestamp
    get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
    get_month = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).month)
    get_year = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).year)
    get_day = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day)
    
    # add hour, month, year, day columns to the dataframe
    df = df.withColumn("hour", get_hour(df.ts))
    df = df.withColumn("month", get_month(df.ts))
    df = df.withColumn("year", get_year(df.ts))
    df = df.withColumn("day", get_day(df.ts))
    
    
    return df
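
`datetime.fromtimestamp` without a `tz` argument converts to the local time zone of whatever machine runs the UDF, so the derived hour and day can differ between Colab and a cluster. A minimal sketch of the same conversion pinned to UTC follows. The helper name `ts_to_parts` is hypothetical, UTC is an assumption (the dataset does not state its reference zone), and the timestamp is the `ts` value of the sample row printed in the Execution section.

```python
import datetime

def ts_to_parts(ts_ms):
    """Split a millisecond epoch timestamp into hour, month, year and day (UTC)."""
    dt = datetime.datetime.fromtimestamp(ts_ms / 1000.0, tz=datetime.timezone.utc)
    return {"hour": dt.hour, "month": dt.month, "year": dt.year, "day": dt.day}

# ts of the sample event row shown later in this notebook
parts = ts_to_parts(1538352117000)
print(parts)
```

Passing an explicit time zone keeps the features reproducible regardless of where the notebook is executed.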

In [0]:
def get_flag_churn(df):
    '''
    Function to add churn flag that identifies whether a user has churned.
    INPUT - a spark dataframe
    OUTPUT - a spark dataframe with userId of each user, churn flag and gender
    '''
    
    # function to flag 'cancellation confirmation' event
    flag_cancel_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
    df = df.withColumn("churn", flag_cancel_event("page"))
    user_churn = df.groupBy("userId").agg({"churn":"max", "gender":"max"})\
                           .withColumnRenamed("max(churn)", "label")\
                           .withColumnRenamed("max(gender)", "gender")
    
    #print("Count of rows: {}".format(user_churn.count()))
    return user_churn

In [0]:
def get_latest_level(df):
    '''
    Function to find the latest level of each user.
    INPUT - a spark dataframe
    OUTPUT - a spark dataframe with userId of each user and latest level
    '''
    # use timestamp to identify the latest record for a user
    # sort timestamp in descending order and then drop duplicates to get the last row for each user
    latest_level = df.select(["userId", "level", "ts"])\
                        .orderBy(desc("ts"))\
                        .dropDuplicates(["userId"])\
                        .select(["userId", "level"])
    #print("Count of rows: {}".format(latest_level.count()))
    return latest_level
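
As far as I know, Spark does not document which row `dropDuplicates` keeps, so sorting first may not always return the newest record once the data spans several partitions. The intended logic is "for each user, keep the event with the largest `ts`". The plain-Python sketch below shows that rule explicitly; the helper `latest_level_per_user` and the sample events are hypothetical.

```python
def latest_level_per_user(events):
    """Return {userId: level} taken from each user's highest-ts event."""
    latest = {}
    for event in events:
        best = latest.get(event["userId"])
        if best is None or event["ts"] > best["ts"]:
            latest[event["userId"]] = event
    return {user_id: event["level"] for user_id, event in latest.items()}

# made-up events: user "30" upgrades from free to paid
events = [
    {"userId": "30", "level": "free", "ts": 1538352117000},
    {"userId": "30", "level": "paid", "ts": 1538352394000},
    {"userId": "9", "level": "free", "ts": 1538352180000},
]
levels = latest_level_per_user(events)
print(levels)
```

In Spark the same rule is usually expressed by ranking rows per user on `ts` and keeping the top-ranked row.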

In [0]:
def get_states(df):
    '''
    Function to clean location column and return cleaned state names.
    INPUT - a spark dataframe
    OUTPUT - a spark dataframe with userId of each user and state from location
    '''
    # get location of each user
    state_data = df.groupBy("userId").agg({"location":"max"}).withColumnRenamed("max(location)", "state")
    # extract state
    state_data = state_data.withColumn("state", split(col("state"),',').getItem(1))
    
    #print("Count of rows: {}".format(state_data.count()))
    
    return state_data
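
Splitting `'Bakersfield, CA'` on the comma keeps the space after it, which is why the state values printed later in this notebook look like `' CT'`. A small sketch of the same extraction with the whitespace stripped; `extract_state` is a hypothetical helper, and the `trim` function already imported above would do the same job on the Spark column.

```python
def extract_state(location):
    """Return the text after the first comma of a 'City, ST' string, stripped."""
    parts = location.split(",")
    return parts[1].strip() if len(parts) > 1 else None

raw = "Bakersfield, CA".split(",")[1]    # what split(...).getItem(1) keeps
clean = extract_state("Bakersfield, CA")
print(repr(raw), repr(clean))
```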

In [0]:
def get_device(df):
    '''
    Function to clean userAgent column and return cleaned device/os names.
    INPUT:
    df - a spark dataframe
    OUTPUT:
    device_data - a spark dataframe with userId of each user and device/os name
    '''
      
    device_data = df.groupBy("userId").agg({"userAgent":"max"}).withColumnRenamed("max(userAgent)", "device")
    # keep the text inside the first pair of parentheses of the user agent
    device_data = device_data.withColumn("device", regexp_extract(col("device"), r'\((.*?)\)', 1))
    
    device_data = device_data.withColumn("device", split(col("device"),';').getItem(0))
    device_data = device_data.withColumn("device", split(col("device"),'NT').getItem(0))
    
    device_data = device_data.withColumn("device", trim(device_data.device))
    
    #print("Count of rows: {}".format(device_data.count()))
    
    return device_data
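
To make the chain of string operations easier to follow, here is the same sequence in plain Python applied to the `userAgent` of the sample row printed in the Execution section. `extract_device` is a hypothetical helper that mirrors the Spark steps; it is a sketch, not part of the pipeline.

```python
import re

def extract_device(user_agent):
    """First parenthesised group, cut at ';' and at 'NT', then trimmed."""
    match = re.search(r"\((.*?)\)", user_agent)
    inside = match.group(1) if match else ""   # regexp_extract yields '' on no match
    return inside.split(";")[0].split("NT")[0].strip()

ua = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0"
device = extract_device(ua)
print(device)
```

The `'NT'` split collapses all Windows versions into a single `Windows` category.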

In [0]:
def get_days_since_reg(df):
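    '''
    Function to get the number of days since registration.
    INPUT:
    df - a spark dataframe
    OUTPUT:
    days_since_reg - a spark dataframe with userId of each user and days since registration
    '''

    # subtract registration from the latest timestamp (both in milliseconds);
    # note that .day returns the day-of-month (1-31) of that offset counted from
    # 1970-01-01 in local time, not the total number of elapsed days
    get_subtract_ts = udf(lambda x, y: datetime.datetime.fromtimestamp((y - x) / 1000.0).day, IntegerType())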

    days_since_reg = df.groupBy("userId").agg({"registration":"max", "ts":"max"})\
                                         .withColumnRenamed("max(registration)", "max_reg")\
                                         .withColumnRenamed("max(ts)", "max_ts")
    
    days_since_reg = days_since_reg.withColumn("days_since_reg", when(col('max_reg') != 0, get_subtract_ts(col('max_reg'), col('max_ts'))).otherwise(0))
    
    days_since_reg = days_since_reg.drop("max_reg")
    days_since_reg = days_since_reg.drop("max_ts")
    
    #print("Count of rows: {}".format(days_since_reg.count()))
    
    return days_since_reg
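
Taking `.day` of a time offset gives a calendar day of month rather than a duration, so the value wraps around after 31 and is off by one for short spans. The sketch below contrasts that with integer division by the number of milliseconds per day. Both helper names are hypothetical, UTC is used here only to make the result deterministic, and the first pair of timestamps comes from the sample row printed in the Execution section.

```python
import datetime

MS_PER_DAY = 24 * 60 * 60 * 1000

def elapsed_days(registration_ms, last_ts_ms):
    """Whole days between registration and the last event."""
    return (last_ts_ms - registration_ms) // MS_PER_DAY

def day_of_month_of_offset(registration_ms, last_ts_ms):
    """What .day on the offset returns: a calendar day counted from 1970-01-01."""
    offset_s = (last_ts_ms - registration_ms) / 1000.0
    return datetime.datetime.fromtimestamp(offset_s, tz=datetime.timezone.utc).day

reg, ts = 1538173362000, 1538352117000   # registration and ts of the sample row
forty_days = 40 * MS_PER_DAY             # a made-up 40-day span

print(elapsed_days(reg, ts), day_of_month_of_offset(reg, ts))
print(elapsed_days(0, forty_days), day_of_month_of_offset(0, forty_days))
```

For the 40-day span the day-of-month approach lands on 10 February 1970 and reports 10, while the division reports 40.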

In [0]:
def get_avg_count_session(df):
    '''
    Function to get the average number of sessions per month for a user
    INPUT:
    df - a spark dataframe
    OUTPUT:
    avg_count_session - a spark dataframe with userId of each user and average monthly count of sessions
    '''
    mon_count_session = df.groupBy("userId", "month").agg(countDistinct("sessionId"))\
                          .groupBy("userId").agg(avg("count(DISTINCT sessionId)"))\
                          .withColumnRenamed("avg(count(DISTINCT sessionId))", "avg_mon_session_count")
    
    #print("Count of rows: {}".format(mon_count_session.count()))
    
    return mon_count_session

In [0]:
def get_avg_duration_session(df):
    '''
    Function to get the average monthly duration for a user
    (seconds between the user's first and last event in a month)
    INPUT:
    df - a spark dataframe
    OUTPUT:
    avg_duration_session - a spark dataframe with userId of each user and average monthly duration in seconds
    '''
    
    mon_sess_duration = df.groupBy("userId", "month").agg(max("ts"), min("ts"))\
                          .withColumn("duration", (col("max(ts)") - col("min(ts)"))/1000)\
                          .groupBy("userId").agg(avg("duration"))\
                          .withColumnRenamed("avg(duration)", "avg_mon_sess_duration")
    
    #print("Count of rows: {}".format(mon_sess_duration.count()))
    
    return mon_sess_duration

In [0]:
def get_avg_page_views(df):
    '''
    Function to get the average number of page views per month for a user
    INPUT:
    df - a spark dataframe
    OUTPUT:
    avg_page_views - a spark dataframe with userId of each user and average monthly page views with each page pivoted
    '''
    # get avg of page views per month
    avg_mon_page_views = df.groupBy("userId", "page", "month").agg(count("page"))\
                           .groupBy("userId", "page").agg(avg("count(page)"))\
                           .withColumnRenamed("avg(count(page))", "avg_page_count")
    
    # clean up page column
    avg_mon_page_views = avg_mon_page_views.withColumn("page", trim(avg_mon_page_views.page))
    avg_mon_page_views = avg_mon_page_views.withColumn("page", regexp_replace(avg_mon_page_views.page, " ", "_"))
    
    # add prefix to each page name
    add_prefix = udf(lambda x: "avg_mon_" + x)
    avg_mon_page_views = avg_mon_page_views.withColumn("page", add_prefix(avg_mon_page_views.page))
    
    # pivot page names to get columns
    avg_mon_page_views = avg_mon_page_views.groupBy("userId").pivot("page").max("avg_page_count")
    avg_mon_page_views = avg_mon_page_views.fillna(0)
    
    #print("Count of rows: {}".format(avg_mon_page_views.count()))
    
    return avg_mon_page_views

## Modeling

### Data transformation for ML algorithms

In [0]:
def transform_data(df, numerical_col, categorical_col):
    '''
    Function to transform categorical fields and create the feature vector.
    INPUT:
    df - a spark dataframe
    numerical_col - list of numerical column names
    categorical_col - list of categorical (string) column names
    OUTPUT:
    df - a spark dataframe with transformed data containing the feature vector
    '''
    
    # StringIndexer converts each categorical column to a numerical index
    # (one-hot encoding with OneHotEncoderEstimator is left commented out)
    # VectorAssembler then combines all inputs into a single feature vector
    
    stages = []
    for c in categorical_col:
        stringIndexer = StringIndexer(inputCol = c, outputCol = c + '_index', handleInvalid = 'keep')
        #encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[c + "_classVec"])
        #stages += [stringIndexer, encoder]
        stages += [stringIndexer]

    assemblerInputs = [c + "_index" for c in categorical_col] + numerical_col
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    stages += [assembler]
    
    # # Normalize each Vector
    # normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
    # stages += [normalizer]

    # Rescale feature vector
    # scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",   withStd=True, withMean=False)
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    stages += [scaler]
    
    # use pipeline to transform data
    cols = df.columns
    pipeline = Pipeline(stages = stages)
    pipelineModel = pipeline.fit(df)
    df = pipelineModel.transform(df)
    
    # select new 'feature' column and rest of columns
    selectedCols = ['features', 'scaledFeatures'] + cols
    df = df.select(selectedCols)
    
    return df
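
The `MinMaxScaler` stage rescales every feature to the range [0, 1] using the column minimum and maximum seen during fitting. A plain-Python sketch of that arithmetic for a single column follows; `min_max_scale` is a hypothetical helper, the input values are made up, and mapping a constant column to the midpoint is my understanding of Spark's behaviour rather than something verified here.

```python
import builtins  # explicit, because the notebook's wildcard import shadows min/max

def min_max_scale(values, lo=0.0, hi=1.0):
    """Rescale numbers linearly so the smallest maps to lo and the largest to hi."""
    v_min, v_max = builtins.min(values), builtins.max(values)
    if v_max == v_min:
        # constant column: no spread to rescale, use the midpoint of the target range
        return [0.5 * (lo + hi) for _ in values]
    return [(v - v_min) / (v_max - v_min) * (hi - lo) + lo for v in values]

scaled = min_max_scale([25.0, 5.0, 45.0])
print(scaled)
```

Because the minimum and maximum come from the data the pipeline is fitted on, fitting on the full dataset before splitting lets test rows influence the scaling.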

### Baseline models testing

In [0]:
def model_testing(model, train, test):
    '''
    Function to test baseline machine learning algorithms
    INPUT:
    model - instantiated model object
    train - training dataset
    test - testing dataset
    OUTPUT:
    predict - predictions on the test dataset
    score - F1 score to measure performance of the algorithm
    '''
    
    
    cl_model = model.fit(train)
    predict = cl_model.transform(test)

    # f1, weightedPrecision, weightedRecall, accuracy
    evaluator = MulticlassClassificationEvaluator(metricName='f1')
    score = evaluator.evaluate(predict)

    return predict, score
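
The `f1` metric of `MulticlassClassificationEvaluator` is, to my understanding, a weighted F1: the per-class F1 scores averaged with weights equal to each class's share of the true labels. The sketch below reproduces that calculation in plain Python so the number is easier to interpret; `weighted_f1` is a hypothetical helper and the label lists are made up.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Per-class F1 averaged with weights equal to each class's share of true labels."""
    support = Counter(labels)            # true count per class
    predicted = Counter(predictions)     # predicted count per class
    correct = Counter(y for y, p in zip(labels, predictions) if y == p)
    score = 0.0
    for cls, n_true in support.items():
        precision = correct[cls] / predicted[cls] if predicted[cls] else 0.0
        recall = correct[cls] / n_true
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        score += f1 * n_true / len(labels)
    return score

one_miss = weighted_f1([1, 1, 0, 0, 0], [1, 0, 0, 0, 0])
always_zero = weighted_f1([1, 0, 0, 0], [0, 0, 0, 0])
print(one_miss, always_zero)
```

The second call shows why the weighted score should be read with care on imbalanced data: a model that never predicts churn still scores about 0.64.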

### Parameter tuning

In [0]:
def model_tuning(train, test, model, paramGrid, numFolds):
    '''
    Function to tune a machine learning algorithm using cross validation
    INPUT:
    train - training dataset
    test - testing dataset
    model - instantiated model object
    paramGrid - hyperparameter grid used for tuning
    numFolds - number of cross validation folds
    
    OUTPUT:
    score - F1 score to measure performance of the algorithm
    predict - predictions of the best model on the test dataset
    bestModel - best model found by cross validation
    '''
    
    
    evaluator = MulticlassClassificationEvaluator(metricName="f1")

    # cross-validate the estimator passed in as `model`
    cv = CrossValidator(estimator=model, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=numFolds)
    
    # fit and transform
    cvmodel = cv.fit(train)
    predict = cvmodel.transform(test)
    
    #.select("features", "label", "prediction")
    
    # get the f1 score
    score = evaluator.evaluate(predict)
    

    
    bestModel = cvmodel.bestModel
    
    #model_stages = cvmodel.stages[2]
    #print(gbtModel)  # summary only
    
    return score, predict, bestModel
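
Cross validation cost grows with the product of the grid size and the number of folds, which matters on the full 12 GB dataset. A short sketch of how to estimate the number of model fits before launching a run; `maxDepth` and `maxIter` are `GBTClassifier` parameters, but the candidate values and the fold count are made up for illustration.

```python
from itertools import product

# hypothetical search space for a GBT classifier
search_space = {"maxDepth": [3, 5], "maxIter": [10, 20, 30]}

# every combination of candidate values, as a list of {param: value} dicts
param_maps = [dict(zip(search_space, combo)) for combo in product(*search_space.values())]

num_folds = 3
cv_fits = len(param_maps) * num_folds   # one fit per candidate per fold
total_fits = cv_fits + 1                # plus the final refit of the best candidate

print(len(param_maps), total_fits)
```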
    

## Execution

### 1. Load dataset

In [20]:
# link to mini dataset
path = "/content/drive/My Drive/ML data/mini_sparkify_event_data.json"

# link to full dataset
# path = "/content/drive/My Drive/ML data/sparkify_event_data.json"
# load Spark dataframe
event_data = load_data(path)

Count of rows in dataframe: 278154


In [21]:
# check first row of dataframe
event_data.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [22]:
# check for nans and nulls
event_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in event_data.columns]).show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId| song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
| 50046|   0|        0|     0|            0|       0| 50046|    0|       0|     0|   0|           0|        0|50046|     0|  0|        0|     0|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+



In [23]:
event_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [24]:
# 'registration' field is used for calculating number of days since registration on the platform.
event_data.select("registration").dropDuplicates().sort("registration").show()

+-------------+
| registration|
+-------------+
|1521380675000|
|1526739206000|
|1526838391000|
|1528403713000|
|1528560242000|
|1528772084000|
|1528780738000|
|1529027541000|
|1529252604000|
|1529643103000|
|1529934689000|
|1530306321000|
|1530333492000|
|1530514394000|
|1530674962000|
|1530763799000|
|1530789251000|
|1531281160000|
|1531350022000|
|1531679695000|
+-------------+
only showing top 20 rows



In [0]:
# replace null in 'registration' field with 0 since this field is used in calculation.
event_data = event_data.fillna(0, subset=['registration'])

### 2. Extract features

In [26]:
%%time
# add date columns
event_data = add_date_columns(event_data)
#event_data.count()

# get churn and gender columns
user_label = get_flag_churn(event_data)

# get latest level of the user
latest_level = get_latest_level(event_data)

# extract states from location
states = get_states(event_data)

# extract device/os from userAgent
device = get_device(event_data)

# get days since registration
days_since_reg = get_days_since_reg(event_data)

# get average monthly count of sessions per user
avg_count_session = get_avg_count_session(event_data)

# get average monthly duration of sessions per user
avg_duration_session = get_avg_duration_session(event_data)

# get average monthly page views per page per user
avg_page_views = get_avg_page_views(event_data)

CPU times: user 120 ms, sys: 45.2 ms, total: 165 ms
Wall time: 9.22 s


In [27]:
%%time
# join all extracted tables
user_data_all = user_label.join(latest_level, on="userId")\
                          .join(states, on="userId")\
                          .join(device, on="userId")\
                          .join(days_since_reg, on="userId")\
                          .join(avg_count_session, on="userId")\
                          .join(avg_duration_session, on="userId")\
                          .join(avg_page_views, on="userId")

CPU times: user 9.17 ms, sys: 1.93 ms, total: 11.1 ms
Wall time: 222 ms


In [0]:
# save parquet file - save data
# user_data_all.write.parquet("/content/drive/My Drive/ML data/user_data_all_06-01.parquet")

In [28]:
# check data
user_data_all.head()

Row(userId='100010', gender='F', label=0, level='free', state=' CT', device='iPhone', days_since_reg=25, avg_mon_session_count=3.5, avg_mon_sess_duration=1232078.0, avg_mon_About=1.0, avg_mon_Add_Friend=2.0, avg_mon_Add_to_Playlist=3.5, avg_mon_Cancel=0.0, avg_mon_Cancellation_Confirmation=0.0, avg_mon_Downgrade=0.0, avg_mon_Error=0.0, avg_mon_Help=1.0, avg_mon_Home=5.5, avg_mon_Logout=2.5, avg_mon_NextSong=137.5, avg_mon_Roll_Advert=26.0, avg_mon_Save_Settings=0.0, avg_mon_Settings=0.0, avg_mon_Submit_Downgrade=0.0, avg_mon_Submit_Upgrade=0.0, avg_mon_Thumbs_Down=2.5, avg_mon_Thumbs_Up=8.5, avg_mon_Upgrade=1.0)

In [29]:
# check count
user_data_all.count()

225

In [30]:
# check for unique userId in the dataset
event_data.select("userId").dropDuplicates().count()

225

In [31]:
# check schema for all columns
user_data_all.printSchema()

root
 |-- userId: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- state: string (nullable = true)
 |-- device: string (nullable = true)
 |-- days_since_reg: integer (nullable = true)
 |-- avg_mon_session_count: double (nullable = true)
 |-- avg_mon_sess_duration: double (nullable = true)
 |-- avg_mon_About: double (nullable = false)
 |-- avg_mon_Add_Friend: double (nullable = false)
 |-- avg_mon_Add_to_Playlist: double (nullable = false)
 |-- avg_mon_Cancel: double (nullable = false)
 |-- avg_mon_Cancellation_Confirmation: double (nullable = false)
 |-- avg_mon_Downgrade: double (nullable = false)
 |-- avg_mon_Error: double (nullable = false)
 |-- avg_mon_Help: double (nullable = false)
 |-- avg_mon_Home: double (nullable = false)
 |-- avg_mon_Logout: double (nullable = false)
 |-- avg_mon_NextSong: double (nullable = false)
 |-- avg_mon_Roll_Advert: double (nullable = false)
 |-- avg_mon_Sav

In [32]:
# check for nans and nulls
user_data_all.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in user_data_all.columns]).show()

+------+------+-----+-----+-----+------+--------------+---------------------+---------------------+-------------+------------------+-----------------------+--------------+---------------------------------+-----------------+-------------+------------+------------+--------------+----------------+-------------------+---------------------+----------------+------------------------+----------------------+-------------------+-----------------+---------------+
|userId|gender|label|level|state|device|days_since_reg|avg_mon_session_count|avg_mon_sess_duration|avg_mon_About|avg_mon_Add_Friend|avg_mon_Add_to_Playlist|avg_mon_Cancel|avg_mon_Cancellation_Confirmation|avg_mon_Downgrade|avg_mon_Error|avg_mon_Help|avg_mon_Home|avg_mon_Logout|avg_mon_NextSong|avg_mon_Roll_Advert|avg_mon_Save_Settings|avg_mon_Settings|avg_mon_Submit_Downgrade|avg_mon_Submit_Upgrade|avg_mon_Thumbs_Down|avg_mon_Thumbs_Up|avg_mon_Upgrade|
+------+------+-----+-----+-----+------+--------------+---------------------+---------

### 3. Data Prep before ML

Check for imbalance in 'label' class.

In [33]:
major_df = user_data_all.filter(col("label") == 0)
minor_df = user_data_all.filter(col("label") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

ratio: 3


The data is skewed towards label == 0: there are roughly three times as many active users as churned users. Because the prediction classes are imbalanced, the ML models will lean towards predicting label 0. To counter this, I will create a 'weights' column that gives more weight to the minority class.

In [0]:
# add a new 'weights' column
# use a user-defined ratio to weight the minority class (0.9) above the majority class (0.1)
ratio = 0.9
user_data_all = user_data_all.withColumn("weights", when(col('label') == 1, ratio).otherwise(1*(1-ratio)))
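
The 0.9 / 0.1 split is hand-picked. A sketch of deriving the weights from the class counts instead, so that both classes carry the same total weight, is shown below. `class_weights` is a hypothetical helper; the counts (225 users, 31 + 21 = 52 churned) are the ones reported elsewhere in this notebook.

```python
def class_weights(n_negative, n_positive):
    """Weight each class by the other class's share of the data."""
    total = n_negative + n_positive
    return {0: n_positive / total, 1: n_negative / total}

# 225 users in total, 52 of them churned
weights = class_weights(n_negative=225 - 52, n_positive=52)
print(weights)
```

A weight column only takes effect when the estimator is told to use it, for example through the `weightCol` parameter of `LogisticRegression`; none of the estimators defined later in this notebook set it.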

In [35]:
user_data_all.head()

Row(userId='100010', gender='F', label=0, level='free', state=' CT', device='iPhone', days_since_reg=25, avg_mon_session_count=3.5, avg_mon_sess_duration=1232078.0, avg_mon_About=1.0, avg_mon_Add_Friend=2.0, avg_mon_Add_to_Playlist=3.5, avg_mon_Cancel=0.0, avg_mon_Cancellation_Confirmation=0.0, avg_mon_Downgrade=0.0, avg_mon_Error=0.0, avg_mon_Help=1.0, avg_mon_Home=5.5, avg_mon_Logout=2.5, avg_mon_NextSong=137.5, avg_mon_Roll_Advert=26.0, avg_mon_Save_Settings=0.0, avg_mon_Settings=0.0, avg_mon_Submit_Downgrade=0.0, avg_mon_Submit_Upgrade=0.0, avg_mon_Thumbs_Down=2.5, avg_mon_Thumbs_Up=8.5, avg_mon_Upgrade=1.0, weights=0.09999999999999998)

In [0]:
# get list of numerical and categorical columns
num_cols = [item[0] for item in user_data_all.dtypes if not item[1].startswith('string')]
str_cols = [item[0] for item in user_data_all.dtypes if item[1].startswith('string')]

# drop 'label' (the response variable) and 'userId' (an identifier) from the lists
num_cols.remove('label')
str_cols.remove('userId')

In [37]:
num_cols

['days_since_reg',
 'avg_mon_session_count',
 'avg_mon_sess_duration',
 'avg_mon_About',
 'avg_mon_Add_Friend',
 'avg_mon_Add_to_Playlist',
 'avg_mon_Cancel',
 'avg_mon_Cancellation_Confirmation',
 'avg_mon_Downgrade',
 'avg_mon_Error',
 'avg_mon_Help',
 'avg_mon_Home',
 'avg_mon_Logout',
 'avg_mon_NextSong',
 'avg_mon_Roll_Advert',
 'avg_mon_Save_Settings',
 'avg_mon_Settings',
 'avg_mon_Submit_Downgrade',
 'avg_mon_Submit_Upgrade',
 'avg_mon_Thumbs_Down',
 'avg_mon_Thumbs_Up',
 'avg_mon_Upgrade',
 'weights']
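
Some entries in this list are derived from the label itself: `weights` is computed directly from `label`, and `avg_mon_Cancellation_Confirmation` counts the very event that defines churn (`avg_mon_Cancel` most likely accompanies it). Feeding them into the feature vector lets a model read the answer from its inputs, which may explain scores that look too good. A minimal sketch of filtering such columns out before assembling features; `drop_leaky` is a hypothetical helper and the choice of columns is my assumption.

```python
# columns assumed to encode the outcome rather than behaviour leading up to it
LEAKY = {"weights", "avg_mon_Cancel", "avg_mon_Cancellation_Confirmation"}

def drop_leaky(columns, leaky=LEAKY):
    """Return the feature columns with label-derived ones removed, order preserved."""
    return [c for c in columns if c not in leaky]

# shortened version of the list above, for illustration
num_cols_example = ["days_since_reg", "avg_mon_Cancel",
                    "avg_mon_Cancellation_Confirmation", "avg_mon_NextSong", "weights"]
safe_cols = drop_leaky(num_cols_example)
print(safe_cols)
```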

In [38]:
str_cols

['gender', 'level', 'state', 'device']

In [39]:
stages = []
# use `c` as the loop variable so that pyspark's col() function is not overwritten
for c in str_cols:
  stringIndexer = StringIndexer(inputCol = c, outputCol = c + '_index', handleInvalid = 'keep')
  print(stringIndexer)
  encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[c + "_classVec"])
  print(encoder)
  stages += [stringIndexer, encoder]
  print(stages)

StringIndexer_7e6112d29ea8
OneHotEncoderEstimator_d75b98f05302
[StringIndexer_7e6112d29ea8, OneHotEncoderEstimator_d75b98f05302]
StringIndexer_86e144f22aa3
OneHotEncoderEstimator_f8a194e34764
[StringIndexer_7e6112d29ea8, OneHotEncoderEstimator_d75b98f05302, StringIndexer_86e144f22aa3, OneHotEncoderEstimator_f8a194e34764]
StringIndexer_cbc591a810ef
OneHotEncoderEstimator_cff2481fab9d
[StringIndexer_7e6112d29ea8, OneHotEncoderEstimator_d75b98f05302, StringIndexer_86e144f22aa3, OneHotEncoderEstimator_f8a194e34764, StringIndexer_cbc591a810ef, OneHotEncoderEstimator_cff2481fab9d]
StringIndexer_01f5f85dca20
OneHotEncoderEstimator_dd9d63b08d45
[StringIndexer_7e6112d29ea8, OneHotEncoderEstimator_d75b98f05302, StringIndexer_86e144f22aa3, OneHotEncoderEstimator_f8a194e34764, StringIndexer_cbc591a810ef, OneHotEncoderEstimator_cff2481fab9d, StringIndexer_01f5f85dca20, OneHotEncoderEstimator_dd9d63b08d45]


In [0]:
assemblerInputs = [c + "_classVec" for c in str_cols] + num_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

### 4. Transform data using Pipelines

In [0]:
#user_data_all = user_data_all.drop('features', 'scaledFeatures')

In [0]:
user_data_all = transform_data(user_data_all, num_cols, str_cols)

In [42]:
# check schema for all columns
user_data_all.printSchema()

root
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- userId: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- state: string (nullable = true)
 |-- device: string (nullable = true)
 |-- days_since_reg: integer (nullable = true)
 |-- avg_mon_session_count: double (nullable = true)
 |-- avg_mon_sess_duration: double (nullable = true)
 |-- avg_mon_About: double (nullable = false)
 |-- avg_mon_Add_Friend: double (nullable = false)
 |-- avg_mon_Add_to_Playlist: double (nullable = false)
 |-- avg_mon_Cancel: double (nullable = false)
 |-- avg_mon_Cancellation_Confirmation: double (nullable = false)
 |-- avg_mon_Downgrade: double (nullable = false)
 |-- avg_mon_Error: double (nullable = false)
 |-- avg_mon_Help: double (nullable = false)
 |-- avg_mon_Home: double (nullable = false)
 |-- avg_mon_Logout: double (nullable = false)
 |-- avg_mon_NextSong: double (

In [43]:
user_data_all.head()

Row(features=DenseVector([1.0, 1.0, 4.0, 2.0, 25.0, 3.5, 1232078.0, 1.0, 2.0, 3.5, 0.0, 0.0, 0.0, 0.0, 1.0, 5.5, 2.5, 137.5, 26.0, 0.0, 0.0, 0.0, 0.0, 2.5, 8.5, 1.0, 0.1]), scaledFeatures=DenseVector([1.0, 1.0, 0.0702, 0.4, 0.8, 0.0472, 0.4696, 0.08, 0.028, 0.0292, 0.0, 0.0, 0.0, 0.0, 0.0435, 0.031, 0.0403, 0.0336, 0.4062, 0.0, 0.0, 0.0, 0.0, 0.0667, 0.0389, 0.1333, 0.0]), userId='100010', gender='F', label=0, level='free', state=' CT', device='iPhone', days_since_reg=25, avg_mon_session_count=3.5, avg_mon_sess_duration=1232078.0, avg_mon_About=1.0, avg_mon_Add_Friend=2.0, avg_mon_Add_to_Playlist=3.5, avg_mon_Cancel=0.0, avg_mon_Cancellation_Confirmation=0.0, avg_mon_Downgrade=0.0, avg_mon_Error=0.0, avg_mon_Help=1.0, avg_mon_Home=5.5, avg_mon_Logout=2.5, avg_mon_NextSong=137.5, avg_mon_Roll_Advert=26.0, avg_mon_Save_Settings=0.0, avg_mon_Settings=0.0, avg_mon_Submit_Downgrade=0.0, avg_mon_Submit_Upgrade=0.0, avg_mon_Thumbs_Down=2.5, avg_mon_Thumbs_Up=8.5, avg_mon_Upgrade=1.0, weights=

In [0]:
# save parquet file - save data
# user_data_all.write.parquet("/content/drive/My Drive/ML data/user_data_all_06-01_2.parquet")

In [0]:
# # # using SQLContext to read parquet file
# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

# # # to read parquet file
# user_data_all = sqlContext.read.parquet("/user_data_all_05-30.parquet")

### 5. Split data into training and testing datasets

In [44]:
# split data for training and testing datasets
train, test = user_data_all.randomSplit([0.7, 0.3], seed=1104)
print(train.count())
print(test.count())

141
84


In [45]:
train[train.label == 1].count()

31

In [46]:
test[test.label == 1].count()

21

#### 5.1 Save data for later load.

In [0]:
# # save parquet file - save data
# train.write.parquet("/train_05-30.parquet")

In [0]:
# # save parquet file - save data
# test.write.parquet("/test_05-30.parquet")

In [0]:
# # using SQLContext to read parquet file
# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

# # to read parquet file
# train = sqlContext.read.parquet("/train_05-30.parquet")
# test = sqlContext.read.parquet("/test_05-30.parquet")

### 6. Run baseline models

#### Instantiate model objects

In [0]:
# Logistic Regression
lr_model = LogisticRegression(featuresCol = "features", labelCol = "label", maxIter=10)

# Gradient Boosting Trees (GBT)
gbt_model = GBTClassifier(featuresCol = "scaledFeatures", labelCol = "label", maxIter=10)

# Decision Tree Classifier
dt_model = DecisionTreeClassifier(featuresCol = 'scaledFeatures', labelCol = 'label', maxDepth = 3)

# RandomForest Classifier
rf_model = RandomForestClassifier(featuresCol = 'scaledFeatures', labelCol = 'label', maxDepth = 3)

#### F1 score for each model

Run each model and get the f1 score.

In [58]:
lr_predict, lr_score = model_testing(lr_model, train, test)
print("f1 score for Logistic Regression: {}".format(lr_score))

f1 score for Logistic Regression: 1.0


In [59]:
lr_predict.select("label", "prediction").collect()

[Row(label=1, prediction=1.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(lab

In [50]:
gbt_predict, gbt_score = model_testing(gbt_model, train, test)
print("f1 score for GBT Classifier: {}".format(gbt_score))

f1 score for GBT Classifier: 1.0


In [51]:
gbt_predict.select("label", "prediction").collect()

[Row(label=1, prediction=1.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(lab

In [52]:
dt_predict, dt_score = model_testing(dt_model, train, test)
print("f1 score for Decision Tree Classifier: {}".format(dt_score))

f1 score for Decision Tree Classifier: 1.0


In [53]:
dt_predict.select("label", "prediction").collect()

[Row(label=1, prediction=1.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(lab

In [54]:
rf_predict, rf_score = model_testing(rf_model, train, test)
print("f1 score for RandomForest Classifier: {}".format(rf_score))

f1 score for RandomForest Classifier: 1.0


In [55]:
rf_predict.select("label", "prediction").collect()

[Row(label=1, prediction=1.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=0, prediction=0.0),
 Row(lab

### 7. Parameter tuning

In [0]:
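from pyspark.ml.tuning import ParamGridBuilder  # harmless if already imported earlier

gbt = GBTClassifier(featuresCol="features", labelCol="label")

# Search grid: 2 depths x 2 bin counts x 2 iteration counts = 8 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4])
             .addGrid(gbt.maxBins, [20, 30])
             .addGrid(gbt.maxIter, [10, 15])
             .build())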
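
To get a feel for the cost of this search, the grid can be enumerated in plain Python. This is only a sketch: it mirrors the three value lists above with `itertools.product` and assumes that the `5` passed to `model_tuning` in the next cell is the number of cross-validation folds, which this section does not confirm.

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder call in this section.
grid = {
    "maxDepth": [2, 4],
    "maxBins": [20, 30],
    "maxIter": [10, 15],
}

# One dict per combination, taking one value from each parameter list.
combos = [dict(zip(grid, values)) for values in product(*grid.values())]

# Assumption: the last argument of model_tuning (5) is the number of CV folds.
num_folds = 5
fits = len(combos) * num_folds

print(len(combos))  # 8
print(fits)         # 40
print(combos[0])    # {'maxDepth': 2, 'maxBins': 20, 'maxIter': 10}
```

If the tuning helper wraps Spark's `CrossValidator`, one further fit on the whole training set would follow for the winning combination.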

In [0]:
score, predict, bestModel = model_tuning(train, test, gbt, paramGrid, 5)

In [113]:
print("f1 score: {}".format(score))
# Read the tuned hyperparameters from the underlying JVM model via the private _java_obj attribute.
print("Best parameter for depth: {}".format(bestModel._java_obj.getMaxDepth()))
print("Best parameter for bins: {}".format(bestModel._java_obj.getMaxBins()))
print("Best parameter for iterations: {}".format(bestModel._java_obj.getMaxIter()))

f1 score: 1.0
Best parameter for depth: 2
Best parameter for bins: 20
Best parameter for iterations: 10


In [114]:
predict.select("features", "label", "prediction").show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(91,[0,2,12,64,68...|    0|       0.0|
|(91,[0,2,50,62,68...|    0|       0.0|
|(91,[0,2,6,62,68,...|    1|       1.0|
|(91,[1,2,16,63,68...|    0|       0.0|
|(91,[0,3,5,63,68,...|    1|       1.0|
|(91,[0,3,28,62,68...|    0|       0.0|
|(91,[1,2,39,62,68...|    0|       0.0|
|(91,[0,2,4,64,68,...|    0|       0.0|
|(91,[0,3,53,62,68...|    0|       0.0|
|(91,[1,3,13,62,68...|    0|       0.0|
|(91,[0,2,12,63,68...|    0|       0.0|
|(91,[1,2,56,64,68...|    0|       0.0|
|(91,[0,2,11,65,68...|    0|       0.0|
|(91,[1,2,8,62,68,...|    0|       0.0|
|(91,[1,3,6,63,68,...|    1|       1.0|
|(91,[0,3,12,63,68...|    0|       0.0|
|(91,[1,3,24,62,68...|    0|       0.0|
|(91,[1,2,43,63,68...|    0|       0.0|
|(91,[0,2,9,65,68,...|    1|       1.0|
|(91,[0,3,9,62,68,...|    0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows
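
The `features` column above is printed in Spark's sparse-vector notation, `(size,[indices],[values])`: each row is a 91-element vector of which only the listed positions are non-zero. The plain-Python sketch below expands such a triple into a dense list. The helper `to_dense` and the example indices and values are hypothetical, since the output above truncates the real ones.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Hypothetical values for illustration only; the notebook output truncates the real ones.
size, indices, values = 91, [0, 2, 12], [1.0, 1.0, 3.5]
dense = to_dense(size, indices, values)

print(len(dense))                      # 91
print(dense[0], dense[1], dense[12])   # 1.0 0.0 3.5
```

On a real row, `row.features.toArray()` gives the same dense view as a NumPy array.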

