In [34]:
# import libraries
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
#from pyspark.ml.evaluation import BinaryClassificationEvaluator

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read in data

In [46]:
# Read in full sparkify dataset and show first row
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

## Explore data

In [4]:
# Number of rows 
df.count() # 26259199 entries

# Schema
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [5]:
# Define Churn by user: This will be a binary column called "label". If user has cancelled once, this will be one for this user
df = df.withColumn("Churn", F.when(df.page == 'Cancellation Confirmation',1).otherwise(0))
df.head()

label_user = df.select("Churn", "userId").groupBy("userId").sum()
label_user = label_user.withColumnRenamed("sum(Churn)", "ChurnSum")
label_user.head()

generate_labels = F.udf(lambda x: 1 if x > 0 else 0, IntegerType())
label_user = label_user.withColumn("label", generate_labels(label_user.ChurnSum))
label_user = label_user.drop("ChurnSum")
label_user.head(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(userId='1142513', label=0), Row(userId='1271218', label=0), Row(userId='1972195', label=0), Row(userId='1883991', label=1), Row(userId='1816626', label=0), Row(userId='1766909', label=0), Row(userId='1869054', label=0), Row(userId='1084642', label=0), Row(userId='1344069', label=0), Row(userId='1768352', label=0), Row(userId='1008404', label=1), Row(userId='1670523', label=0), Row(userId='1844759', label=0), Row(userId='1137412', label=0), Row(userId='1274556', label=0), Row(userId='1846298', label=0), Row(userId='1241210', label=1), Row(userId='1011149', label=0), Row(userId='1081344', label=0), Row(userId='1112183', label=0)]

In [6]:
# How many users churned?
label_user.groupBy('label').count().orderBy('count').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|label|count|
+-----+-----+
|    1| 5003|
|    0|17275|
+-----+-----+

In [7]:
# Check for invalid or missing data
for col in df.columns:
    print(f" The column {col} has {df.filter(df[col].isNull()).count()} missing values")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 The column artist has 5408927 missing values
 The column auth has 0 missing values
 The column firstName has 778479 missing values
 The column gender has 778479 missing values
 The column itemInSession has 0 missing values
 The column lastName has 778479 missing values
 The column length has 5408927 missing values
 The column level has 0 missing values
 The column location has 778479 missing values
 The column method has 0 missing values
 The column page has 0 missing values
 The column registration has 778479 missing values
 The column sessionId has 0 missing values
 The column song has 5408927 missing values
 The column status has 0 missing values
 The column ts has 0 missing values
 The column userAgent has 778479 missing values
 The column userId has 0 missing values
 The column Churn has 0 missing values

In [8]:
# Above computation showed, that there are patterns of missing data: 
# One cluster in which song information is missing (columns artist, length, song have 5 408 927 missing rows) and one cluster with missing user information (firstName, gender, lastName, location and userAgent missing 778479 rows).
# What do those missing rows look like, what is happening there? 

df.filter(df.artist.isNull()).head(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=87, lastName=None, length=None, level='paid', location=None, method='GET', page='Home', registration=None, sessionId=8615, song=None, status=200, ts=1538352008000, userAgent=None, userId='1261737', Churn=0), Row(artist=None, auth='Logged In', firstName='Valarie', gender='F', itemInSession=206, lastName='Moore', length=None, level='paid', location='Los Angeles-Long Beach-Anaheim, CA', method='GET', page='Home', registration=1537790336000, sessionId=2948, song=None, status=200, ts=1538352008000, userAgent='"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1009070', Churn=0), Row(artist=None, auth='Logged In', firstName='Sophia', gender='F', itemInSession=9, lastName='Dalton', length=None, level='free', location='El Paso, TX', method='PUT', page='Thumbs Up', registration=1536693084000, sessionId=655, song=None, status=307, ts=1538352011000, userA

In [9]:
df.filter(df.firstName.isNull()).head(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=87, lastName=None, length=None, level='paid', location=None, method='GET', page='Home', registration=None, sessionId=8615, song=None, status=200, ts=1538352008000, userAgent=None, userId='1261737', Churn=0), Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=0, lastName=None, length=None, level='free', location=None, method='PUT', page='Login', registration=None, sessionId=7433, song=None, status=307, ts=1538352041000, userAgent=None, userId='1261737', Churn=0), Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=4, lastName=None, length=None, level='free', location=None, method='GET', page='Home', registration=None, sessionId=25003, song=None, status=200, ts=1538352182000, userAgent=None, userId='1261737', Churn=0), Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=2, lastName=None, length=None, level='free', location=None,

In [48]:
# Illustrate different outcomes in "page" column
df.groupBy('page').count().orderBy('count').show(50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------+
|                page|   count|
+--------------------+--------+
| Submit Registration|     401|
|            Register|     802|
|Cancellation Conf...|    5003|
|              Cancel|    5003|
|    Submit Downgrade|    6494|
|      Submit Upgrade|   15135|
|               Error|   25962|
|       Save Settings|   29516|
|             Upgrade|   50507|
|               About|   92759|
|            Settings|  147074|
|                Help|  155100|
|           Downgrade|  184240|
|         Thumbs Down|  239212|
|              Logout|  296005|
|               Login|  296350|
|          Add Friend|  381664|
|         Roll Advert|  385212|
|     Add to Playlist|  597921|
|           Thumbs Up| 1151465|
|                Home| 1343102|
|            NextSong|20850272|
+--------------------+--------+

In [11]:
# Those missing data seems to contain still relevant info in the "page" section:
# Users are visiting the Help section, getting an error, down- or upgrading songs.
# In order to keep this useful information, we count those occurences by user and then compute summary statistics about positive and negative interactions
# Prepare pages by user field 

pages_by_user = df.groupBy(['userId']).pivot('page').count().fillna(0)
pages_by_user.head()
pages_by_user.printSchema()


cols_to_sum_over = [element for element in pages_by_user.columns if element != 'userId']



pages_by_user = pages_by_user.withColumn("TotalInteractions", sum(pages_by_user[col] for col in cols_to_sum_over))
pages_by_user = pages_by_user.withColumn("PositiveInteractions", F.col('Add Friend') + F.col('Add to Playlist') + F.col('NextSong') + F.col('Thumbs Up') + F.col('Upgrade'))
pages_by_user = pages_by_user.withColumn("NegativeInteractions", F.col('Submit Downgrade') + F.col('Error') + F.col('Help') + F.col('Downgrade') + F.col('Thumbs Down'))
pages_by_user = pages_by_user.withColumn("SharePositiveInteractions", F.col("PositiveInteractions") / F.col("TotalInteractions"))
pages_by_user = pages_by_user.withColumn("ShareNegativeInteractions", F.col("NegativeInteractions") / F.col("TotalInteractions"))
pages_by_user = pages_by_user.select(F.col('userId'), F.col('SharePositiveInteractions'), F.col('ShareNegativeInteractions'))
pages_by_user.head(3)




VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userId: string (nullable = true)
 |-- About: long (nullable = true)
 |-- Add Friend: long (nullable = true)
 |-- Add to Playlist: long (nullable = true)
 |-- Cancel: long (nullable = true)
 |-- Cancellation Confirmation: long (nullable = true)
 |-- Downgrade: long (nullable = true)
 |-- Error: long (nullable = true)
 |-- Help: long (nullable = true)
 |-- Home: long (nullable = true)
 |-- Login: long (nullable = true)
 |-- Logout: long (nullable = true)
 |-- NextSong: long (nullable = true)
 |-- Register: long (nullable = true)
 |-- Roll Advert: long (nullable = true)
 |-- Save Settings: long (nullable = true)
 |-- Settings: long (nullable = true)
 |-- Submit Downgrade: long (nullable = true)
 |-- Submit Registration: long (nullable = true)
 |-- Submit Upgrade: long (nullable = true)
 |-- Thumbs Down: long (nullable = true)
 |-- Thumbs Up: long (nullable = true)
 |-- Upgrade: long (nullable = true)

[Row(userId='1114507', SharePositiveInteractions=0.8880382775119617, ShareNega

## Remove missing user data


In [12]:
df = df.dropna(how='any', subset=['artist', 'firstName'])



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Feature Engineering 



In [14]:
# Timestamp column - extract number of days registered
df = df.withColumn("TimeRegistered", df.ts - df.registration)
df = df.withColumn("TimeRegisteredSeconds", df.TimeRegistered / 1000)
df = df.withColumn("DaysRegistered", F.ceil(df.TimeRegisteredSeconds / (60*60*24)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Count the number of different songs a user has listend to
songs_by_user = df.select('userId', 'song').dropDuplicates().groupBy('userId').count()
songs_by_user = songs_by_user.withColumnRenamed("count", "NumberOfDifferentSongs")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Compute total, mean and std of the length of songs a user has listened to 
total_length_by_user = df.select('userId', 'length').groupBy('userId').sum()
total_length_by_user = total_length_by_user.withColumnRenamed("sum(length)", "TotalLength")

avg_length_by_user = df.select('userId', 'length').groupBy('userId').mean()
avg_length_by_user = avg_length_by_user.withColumnRenamed("avg(length)", "AvgLength")

std_length_by_user = df.select('userId', 'length').groupBy('userId').agg(F.stddev('length'))
std_length_by_user = std_length_by_user.withColumnRenamed("stddev_samp(length)", "StdLength")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Extract the US state of the user (occurs after the comma, as in 'Bakersfield, CA')
extract_state = F.udf(lambda x: "Missing" if x == '' else (x.split(", ")[1]), StringType())
df = df.withColumn("State", extract_state(df.location))
df.groupBy('State').count().orderBy(F.desc('count')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-------+
|      State|  count|
+-----------+-------+
|         CA|2718538|
|         TX|1657413|
|   NY-NJ-PA|1416186|
|         FL|1315374|
|         MI| 658378|
|         OH| 628231|
|   IL-IN-WI| 593556|
|         GA| 567738|
|         AZ| 536478|
|         PA| 507109|
|PA-NJ-DE-MD| 502817|
|         WA| 497018|
|DC-VA-MD-WV| 436256|
|         NY| 433677|
|         NC| 428223|
|         IN| 365177|
|      MA-NH| 337747|
|         CO| 313123|
|         WI| 304415|
|         AL| 276120|
+-----------+-------+
only showing top 20 rows

In [18]:
df.filter(df.State.isNull()).count()
df.filter(df.DaysRegistered.isNull()).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [19]:
# Only keep relevant columns for analysis and consoliate dataframe by user
# Keep only the latest level data for a user - the last access by a user is with high likelihood also when he churned
df_by_user = df.select(['userId', 'userAgent', 'State', 'level', 'itemInSession', 'gender', 'DaysRegistered']).orderBy(F.desc('ts')).dropDuplicates(['userId'])
df_by_user.head(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(userId='1001393', userAgent='"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', State='OH', level='paid', itemInSession=151, gender='M', DaysRegistered=19), Row(userId='1002143', userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', State='NY-NJ-PA', level='paid', itemInSession=70, gender='M', DaysRegistered=74), Row(userId='1002493', userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', State='PA-NJ-DE-MD', level='free', itemInSession=232, gender='M', DaysRegistered=8), Row(userId='1002749', userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', State='MD', level='paid', itemInSession=24, gender='M', DaysRegistered=78), Row(userId='1004060', userAgent='"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 li

In [20]:
# Define string indexer to convert level, state and gender into numeric columns

level_indexer = StringIndexer(inputCol="level", outputCol="NumLevel")
state_indexer = StringIndexer(inputCol="State", outputCol="NumState")
gender_indexer = StringIndexer(inputCol="gender", outputCol="NumGender")
agent_indexer = StringIndexer(inputCol="userAgent", outputCol="NumAgent")

df_by_user = level_indexer.fit(df_by_user).transform(df_by_user)
df_by_user = state_indexer.fit(df_by_user).transform(df_by_user)
df_by_user = gender_indexer.fit(df_by_user).transform(df_by_user)
df_by_user = agent_indexer.fit(df_by_user).transform(df_by_user)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Merge dataframes into one dataframe in which one row corresponds to a user with features and label ("churn")

df_modelling = df_by_user.join(songs_by_user, ['userId'], 'left')

for dataframe in [total_length_by_user, avg_length_by_user, std_length_by_user, pages_by_user, label_user]: 
    
    df_modelling = df_modelling.join(dataframe, ['userId'], 'left')

df_modelling.head(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(userId='1001393', userAgent='"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', State='OH', level='paid', itemInSession=134, gender='M', DaysRegistered=60, NumLevel=0.0, NumState=4.0, NumGender=0.0, NumAgent=25.0, NumberOfDifferentSongs=475, TotalLength=124086.29356, AvgLength=251.18682906882592, StdLength=81.79764237586046, SharePositiveInteractions=0.8901098901098901, ShareNegativeInteractions=0.0282574568288854, label=0), Row(userId='1002143', userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', State='NY-NJ-PA', level='free', itemInSession=74, gender='M', DaysRegistered=60, NumLevel=1.0, NumState=2.0, NumGender=0.0, NumAgent=2.0, NumberOfDifferentSongs=350, TotalLength=91167.15798, AvgLength=253.94751526462397, StdLength=90.25001263937163, SharePositiveInteractions=0.8613445378151261, ShareNegativeInteractions=0.023109243697478993, label=0), Row(userI

In [22]:
## Modelling

# Split data into train and test 

train, test = df_modelling.randomSplit([0.9, 0.1], seed=42)
train.head()
train.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['userId', 'userAgent', 'State', 'level', 'itemInSession', 'gender', 'DaysRegistered', 'NumLevel', 'NumState', 'NumGender', 'NumAgent', 'NumberOfDifferentSongs', 'TotalLength', 'AvgLength', 'StdLength', 'SharePositiveInteractions', 'ShareNegativeInteractions', 'label']

## Define class for evaluation metric in CrossValidator and function to compute evaluations

In [23]:
# Define class for F1Binary Evaluator (taken from https://stackoverflow.com/a/68280339 )

class F1BinaryEvaluator():

    def __init__(self, predCol="prediction", labelCol="label", metricLabel=1.0):
        self.labelCol = labelCol
        self.predCol = predCol
        self.metricLabel = metricLabel

    def isLargerBetter(self):
        return True

    def evaluate(self, dataframe):
        tp = dataframe.filter(self.labelCol + ' = ' + str(self.metricLabel) + ' and ' + self.predCol + ' = ' + str(self.metricLabel)).count()
        fp = dataframe.filter(self.labelCol + ' != ' + str(self.metricLabel) + ' and ' + self.predCol + ' = ' + str(self.metricLabel)).count()
        fn = dataframe.filter(self.labelCol + ' = ' + str(self.metricLabel) + ' and ' + self.predCol + ' != ' + str(self.metricLabel)).count()
        return tp / (tp + (.5 * (fn +fp)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
def evaluation_metrics(model_obj,df):
    
    '''
    
    Returns evaluation metrics (precision, recall,accuracy and F score) for a fitted model object on a test dataframe
    
    Input Arguments:
    
        - model_ob: A Spark model object
        - df: the dataframe to evalute (usually the test dataset from the randomSplit)
  
    
    Output:
    
        - None, but prints the evaluation metrics
    
  
    '''
    
    results = model_obj.transform(df).select('label','prediction')

    
    TP = results.filter('label == 1 and prediction == 1').count()
    FN = results.filter('label == 1 and prediction == 0').count()
    FP = results.filter('label == 0 and prediction == 1').count()
    TN = results.filter('label == 0 and prediction == 0').count()
    
    recall = TP/ (TP+FN)
    precision = TP / (TP + FP)
    fscore = 2*(precision*recall)/ (precision + recall)
    accuracy = (TP + TN)/(TP + TN + FN + FP)
    
    print(f"Accuracy: {accuracy}")
    print(f"F1-Score: {fscore}")
    print(f"Recall: {recall}")
    print(f"Precision: {precision}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Logistic Regression 

In [25]:


assembler = VectorAssembler(inputCols=['DaysRegistered', 'NumLevel', 'NumState', 'NumGender', 'NumAgent',
                                       'NumberOfDifferentSongs', 'TotalLength', 'AvgLength', 'StdLength', 
                                       'SharePositiveInteractions', 'ShareNegativeInteractions'], 
                                       outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledfeatures", withStd=True)
#df_modelling = assembler.transform(df_modelling)

lr =  LogisticRegression(maxIter=10, labelCol='label', featuresCol='scaledfeatures')

#pipeline_lr = Pipeline(stages=[level_indexer.setHandleInvalid("skip"), state_indexer.setHandleInvalid("skip"), gender_indexer.setHandleInvalid("skip"), assembler.setHandleInvalid("skip"), lr])
#pipeline_lr = Pipeline(stages=[level_indexer, state_indexer, gender_indexer, assembler.setHandleInvalid("skip"), lr])
pipeline_lr = Pipeline(stages=[assembler.setHandleInvalid('skip'), scaler, lr])

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()


crossval_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid_lr,
                          #evaluator=BinaryEvaluator,
                          evaluator=F1BinaryEvaluator(),
                          numFolds=3)

cval_model_lr = crossval_lr.fit(train)
cval_model_lr.avgMetrics



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-25:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1293



[0.15251329881742506, 0.08455487836315763, 0.06971457349565183, 0.04177339719000879, 0.0, 0.0]

In [31]:
evaluation_metrics(cval_model_lr, test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-31:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 24493



Accuracy: 0.7753690036900369
F1-Score: 0.15597920277296362
Recall: 0.08928571428571429
Precision: 0.6164383561643836

In [35]:
cval_model_lr.getEstimatorParamMaps()[np.argmax(cval_model_lr.avgMetrics)]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='LogisticRegression_42cf828ce197', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_42cf828ce197', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}

## Random forest

In [39]:
rf = RandomForestClassifier()

# need to remove Numstate and NumAgent because number of categorical features exceeds maxBins

assembler = VectorAssembler(inputCols=['DaysRegistered', 'NumLevel', 'NumGender', 
                                       'NumberOfDifferentSongs', 'TotalLength', 'AvgLength', 'StdLength', 
                                       'SharePositiveInteractions', 'ShareNegativeInteractions'], 
                                       outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledfeatures", withStd=True)

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.maxDepth,[2, 3, 4]) \
    .build()

pipeline_rf = Pipeline(stages=[assembler.setHandleInvalid('skip'), scaler, rf])

crossval_rf = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator=F1BinaryEvaluator(),
                          numFolds=3)

cval_model_rf = crossval_rf.fit(train)
cval_model_rf.avgMetrics



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-39:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 24690



[0.0, 0.09334487221315038, 0.24975701476687057]

In [41]:
evaluation_metrics(cval_model_rf, test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-41:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 26787



Accuracy: 0.7878228782287823
F1-Score: 0.222972972972973
Recall: 0.13253012048192772
Precision: 0.7021276595744681

In [42]:
cval_model_rf.getEstimatorParamMaps()[np.argmax(cval_model_rf.avgMetrics)]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='RandomForestClassifier_d7666e060778', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4}

## Gradient Boosting Tree Classifier

In [29]:


gbt = GBTClassifier(subsamplingRate=0.5, minInstancesPerNode=10, seed=42)

# need to remove Numstate and NumAgent because number of categorical features exceeds maxBins

assembler = VectorAssembler(inputCols=['DaysRegistered', 'NumLevel', 'NumGender', 
                                       'NumberOfDifferentSongs', 'TotalLength', 'AvgLength', 'StdLength', 
                                       'SharePositiveInteractions', 'ShareNegativeInteractions'], 
                                       outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledfeatures", withStd=True)

paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth,[2, 3, 4]) \
    .build()

pipeline_gbt = Pipeline(stages=[assembler.setHandleInvalid('skip'), scaler, gbt])

crossval_gbt = CrossValidator(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid_gbt,
                          evaluator=F1BinaryEvaluator(),
                          numFolds=3)

cval_model_gbt = crossval_gbt.fit(train)

cval_model_gbt.avgMetrics

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-29:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 8448



[0.39400079885027894, 0.4316506225443716, 0.4424942817245162]

In [40]:
evaluation_metrics(cval_model_gbt, test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.7998159226875288
F1-Score: 0.42384105960264895
Recall: 0.32
Precision: 0.6274509803921569

In [37]:
cval_model_gbt.getEstimatorParamMaps()[np.argmax(cval_model_gbt.avgMetrics)]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='GBTClassifier_8b8a2cfcf2b7', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4}