In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, lit, min, max, split, udf, countDistinct, sum, count, isnan, when, count, col, size
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import array, lit
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoderEstimator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier,RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml import Pipeline

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1618666845340_0008,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Starter code
from pyspark.sql import SparkSession

# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [3]:
def clean_dataset(event_data):
    
    '''
    Take a event_data as a path to a spark dataframe in the json format. Selects the columns that will be used
    in the model, cleans the data, create dummies for the categoricals, create the date columns, balance
    the dataframe and outputs a model dataframe.

    Parameters:
    event_data : the path to the dataframe in the spark session in the format json (ex: 'mini_sparkify_event_data.json')

    Returns cleaned and balanced spark dataframe.
    '''

    
    df = spark.read.json(event_data)
    #selecting only the columns that will be used
    df=df.select(['gender', 'auth','page', 'ts', 'level', 'userId', 'page','registration'])

    #removing the guest and logged Out
    df_filtered = df.filter((df.auth != 'Guest') & (df.auth != 'Logged Out'))

    #Creating the churn column as a dummy
    cancelled = udf(lambda x : 0 if  x == 'Logged In' else 1, IntegerType())
    df_clean = df_filtered.withColumn('churn',cancelled('auth'))

    # Creating a new column with the page column as 1 if the user is downgraded
    downgrade = udf(lambda x : 1 if  x == 'Submit Downgrade' else 0, IntegerType())
    df_clean = df_clean.withColumn('Downgrade',downgrade('page'))


    #creating the date column
    df_clean = df_clean.withColumn("date", F.to_date(F.from_unixtime(col('ts')/lit(1000))))
    df_clean = df_clean.withColumn("date_created", F.to_date(F.from_unixtime(col('registration')/lit(1000))))
    df_clean = df_clean.withColumn("time_from_creation", F.datediff('date','date_created'))

    # Convert gender to binary. 1 = female
    female = udf(lambda x: 1 if x == 'F' else 0, IntegerType())
    df_clean = df_clean.withColumn('gender_binary', female('gender'))

    # Convert level to binary. 1 = paid
    level = udf(lambda x: 1 if x == 'paid' else 0, IntegerType())
    df_clean = df_clean.withColumn('paid_user', level('level'))

    # Convert songs played to binary.
    songs = udf(lambda x: 1 if x == 'NextSong' else 0, IntegerType())
    df_clean = df_clean.withColumn('played_song', songs('page'))

    # Get user interactions except for playing songs
    inter = ['Thumbs Up', 'Thumbs Down', 'Add to Playlist', 'Add Friend']
    get_interactions = udf(lambda x: 1 if x in inter else 0, IntegerType())
    df_clean = df_clean.withColumn('interactions', get_interactions('page'))


    # Get the model
    model_df = df_clean.groupby('userId').agg(\
                                                              max('churn').alias('churn_rate'),\
                                                              max('Downgrade').alias('Downgrade'),\
                                                              avg('time_from_creation').alias('time_from_creation'),\
                                                              max('gender_binary').alias('gender'),\
                                                              max('paid_user').alias('paid_user'),\
                                                              sum('played_song').alias('total_songs_played'),\
                                                              sum('interactions').alias('interactions'),\
                                             max('date').alias('date'),\
                                             max('date_created').alias('date_created'))



    #Create a column with the session time and dropping the date column
    model_df = model_df.withColumn("session_time", F.datediff('date','date_created'))
    model_df = model_df.drop('date')
    
    #Ensuring that users with low interactions are considered as churned
    model_df = model_df.withColumn("churn_rate", \
              F.when( (model_df["churn_rate"]==1) | (model_df["interactions"] < 2), 1).otherwise(0))
    
    #Balancing the dataframe by the churn rate
    numerator = model_df.filter(model_df.churn_rate == 1).count()
    churned =  model_df.filter(model_df.churn_rate == 1)
    not_churned= model_df.filter(model_df.churn_rate == 0)
    denominator = not_churned.count()
    sample_df = not_churned.sample(withReplacement=False, fraction=numerator/denominator, seed=42)
    model_df = churned.union(sample_df)
    

    return model_df
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def model_preparation(model):
        
    '''
    Take a model as a spark dataframe with this columns = ['Downgrade','time_from_creation','gender','paid_user',
    'total_songs_played','interactions','session_time']. Uses the VectorAssembler on this columns and then uses
    the StandardScaler on this numeric features.

    Parameters:
    model : a spark dataframe

    Returns - Data prepared for machine learning with features and label columns.
    '''
    
    # Assemble num features
    assembler = VectorAssembler(inputCols=['Downgrade','time_from_creation','gender',\
                                       'paid_user','total_songs_played','interactions',\
                                      'session_time'],\
                            outputCol='numerical_features',handleInvalid = 'skip')
    
    #temp DF
    temp = assembler.transform(model)
    
    #Create Scaler
    scaler = StandardScaler(withMean=True, withStd=True, inputCol='numerical_features', outputCol='features')
    scaler_fit = scaler.fit(temp)
    
    data = scaler_fit.transform(temp)
    
    # Use 'churn' as model prediction label
    data = data.withColumnRenamed('churn_rate','label')
    
    return data

    
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def baseline_ml(data):
            
    '''
    Take the data as a processed spark dataframe. Splits the data into train and test. Use 4 machine learnirg
    algorithms with the default settings: LogisticRegression, DecisionTreeClassifier(seed=42),
    RandomForestClassifier(seed=42),LinearSVC. Outputs prints for the baseline parameters : accuracy. f1-score,
    precision and recall.  
    

    Parameters:
    model : a processed spark dataframe

    Returns - Prints for the parameters
    '''
    
    
    # Split train test set
    train, test = data.randomSplit([0.8, 0.2], seed=42)
    
    # Initialize four models
    clf_LR = LogisticRegression()
    clf_DT = DecisionTreeClassifier(seed=42)
    clf_RF = RandomForestClassifier(seed=42)
    clf_SVM = LinearSVC()
    
    #Instanciate the evaluator
    evaluator= MulticlassClassificationEvaluator(predictionCol="prediction")
    
    #Empty dic for the results
    results = {}

    for clf in [clf_LR, clf_DT, clf_RF, clf_SVM]:
        model_results = {}
        # get the classifier name
        clf_name = clf.__class__.__name__

        #Train
        model = clf.fit(train)

        #Predict
        pred = model.transform(test)

        #Get each model result, print and append to the results
        model_results['f1_test'] = evaluator.evaluate(pred.select('label','prediction'),{evaluator.metricName: 'f1'})
        model_results['precision'] = evaluator.evaluate(pred.select('label','prediction'),{evaluator.metricName: 'weightedPrecision'})
        model_results['recall'] = evaluator.evaluate(pred.select('label','prediction'),{evaluator.metricName: 'weightedRecall'})
        model_results['accuracy'] = evaluator.evaluate(pred.select('label','prediction'),{evaluator.metricName: 'accuracy'})

        print(clf_name)
        print('Test F1-score: ',model_results['f1_test'])
        results[clf_name] = model_results
    
    
    return results
    
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:

model = clean_dataset(event_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
data = model_preparation(model)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
baseline_ml(data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-8:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1258



LogisticRegression
Test F1-score:  0.7472126221248947
DecisionTreeClassifier
Test F1-score:  0.7320726580584135
RandomForestClassifier
Test F1-score:  0.7237304651485719
LinearSVC
Test F1-score:  0.7536315575929698
{'LogisticRegression': {'f1_test': 0.7472126221248947, 'precision': 0.7479786013340562, 'recall': 0.7484076433121019, 'accuracy': 0.7429481346678799}, 'DecisionTreeClassifier': {'f1_test': 0.7320726580584135, 'precision': 0.7599060310646791, 'recall': 0.7333939945404914, 'accuracy': 0.7402183803457689}, 'RandomForestClassifier': {'f1_test': 0.7237304651485719, 'precision': 0.7561078172995626, 'recall': 0.732484076433121, 'accuracy': 0.7333939945404914}, 'LinearSVC': {'f1_test': 0.7536315575929698, 'precision': 0.7495968605071163, 'recall': 0.7506824385805277, 'accuracy': 0.7493175614194723}}

In [9]:
# Get the params

clf_LR = LogisticRegression()
paramGrid = ParamGridBuilder().\
            addGrid(clf_LR.maxIter, [10, 100, 1000]).\
            addGrid(clf_LR.regParam, [0.01,0.1,10.0,100.0]).\
            build()

#Use the crossvalidation
crossval = CrossValidator(estimator=clf_LR,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                      numFolds=3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
#Train model
train, test = data.randomSplit([0.8, 0.2], seed=42)


cvModel_stack = crossval.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-10:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1495



In [11]:
#Predict

pred = cvModel_stack.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Results
evaluator= MulticlassClassificationEvaluator(predictionCol="prediction")
print('F-1 Score:{}'.format(evaluator.evaluate(pred.select('label','prediction'), {evaluator.metricName: "f1"})))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

F-1 Score:0.7435218879141079

In [13]:
# Get the best params
bestModel = cvModel_stack.bestModel
print ('Best Param (regParam): ', bestModel._java_obj.getRegParam())

print ('Best Param (MaxIter): ', bestModel._java_obj.getMaxIter())

print ('Best Param (elasticNetParam): ', bestModel._java_obj.getElasticNetParam())



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best Param (regParam):  0.01
Best Param (MaxIter):  10
Best Param (elasticNetParam):  0.0

In [8]:
# Get the params

clf_SVM = LinearSVC()
paramGrid = ParamGridBuilder().\
            addGrid(clf_SVM.maxIter, [10, 100, 1000]).\
            addGrid(clf_SVM.regParam, [0.01,0.1,10.0,100.0]).\
            build()

#Use the crossvalidation
crossval = CrossValidator(estimator=clf_SVM,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                      numFolds=3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
#Train model
train, test = data.randomSplit([0.8, 0.2], seed=42)


cvModel_stack = crossval.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-9:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1596



In [10]:
#Predict

pred = cvModel_stack.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Results
evaluator= MulticlassClassificationEvaluator(predictionCol="prediction")
print('F-1 Score:{}'.format(evaluator.evaluate(pred.select('label','prediction'), {evaluator.metricName: "f1"})))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

F-1 Score:0.755621576663529

In [14]:
# Get the best params
bestModel = cvModel_stack.bestModel
print ('Best Param (regParam): ', bestModel._java_obj.getRegParam())

print ('Best Param (MaxIter): ', bestModel._java_obj.getMaxIter())





VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best Param (regParam):  0.01
Best Param (MaxIter):  100