## Sample Run

This is a sample run on the mini Sparkify dataset, using a single-node AWS EMR cluster with a PySpark kernel. I will use it as a reference when training on the full Sparkify dataset.

`1.` __Installing and Importing Packages__

`1.1` Installing Packages

In [1]:
# Installing the packages pandas and matplotlib

sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") 


Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 10 | application_1619138142649_0011 | pyspark | idle | Link | Link | ✔ |



SparkSession available as 'spark'.



Collecting pandas==0.25.1
  Using cached https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.1

Collecting matplotlib
  Using cached https://files.pythonhosted.org/packages/ce/63/74c0b6184b6b169b121bb72458818ee60a7d7c436d7b1907bd5874188c55/matplotlib-3.4.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting pyparsing>=2.2.1 (from matplotlib)
  Using cached https://files.pythonhosted.org/packages/8a/bb/488841f56197b13700afd5658fc279a2025a39e22449b7cf29864669b15d/pyparsing-2.4.7-py2.py3-none-any.whl
Collecting pillow>=6.2.0 (from matplotlib)
  Using cached https://file

In [15]:
# pyspark2pmml may help with saving a PySpark model as a PMML file
sc.install_pypi_package("pyspark2pmml")


Collecting pyspark2pmml
  Downloading https://files.pythonhosted.org/packages/78/48/e3b2c8f52132eb1807d986f74b035cfc6114f42a0310d6d905b90f63a3cc/pyspark2pmml-0.5.1.tar.gz
Collecting py4j (from pyspark2pmml)
  Downloading https://files.pythonhosted.org/packages/2b/e2/543019a6e620b759a59f134158b4595766f9bf520a1081a2ba1a1809ba32/py4j-0.10.9.2-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark2pmml
  Running setup.py bdist_wheel for pyspark2pmml: started
  Running setup.py bdist_wheel for pyspark2pmml: finished with status 'done'
  Stored in directory: /var/lib/livy/.cache/pip/wheels/fa/f2/e9/e2370733daa2c5fc3271a64eba149a57d44a607901760d17b3
Successfully built pyspark2pmml
Installing collected packages: py4j, pyspark2pmml
Successfully installed py4j-0.10.9.2 pyspark2pmml-0.5.1

`1.2` Importing general libraries and packages

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from datetime import datetime

import re


`1.3` Importing pyspark based libraries

In [4]:
# import libraries for spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DateType, IntegerType
from pyspark.sql.functions import concat, lit, avg, split, isnan, when, count, col, sum, mean, stddev, min, max, round, udf, to_date, datediff 
from pyspark.sql import Window

from pyspark.ml.feature import StringIndexer, VectorAssembler, Normalizer, StandardScaler, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, GBTClassifier, NaiveBayes, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.mllib.util import MLUtils


In [3]:
# List the current packages available
sc.list_packages()


Package                    Version  
-------------------------- ---------
beautifulsoup4             4.9.3    
boto                       2.49.0   
click                      7.1.2    
cycler                     0.10.0   
jmespath                   0.10.0   
joblib                     1.0.1    
kiwisolver                 1.3.1    
lxml                       4.6.2    
matplotlib                 3.4.1    
mysqlclient                1.4.2    
nltk                       3.5      
nose                       1.3.4    
numpy                      1.16.5   
pandas                     0.25.1   
Pillow                     8.2.0    
pip                        9.0.1    
py-dateutil                2.2      
pyparsing                  2.4.7    
python-dateutil            2.8.1    
python37-sagemaker-pyspark 1.4.1    
pytz                       2021.1   
PyYAML                     5.4.1    
regex                      2021.3.17
setuptools                 28.8.0   
six                        1.13.0   
t

`2.` __Setting up a spark session__

In [20]:


# getOrCreate() returns the session the kernel already started, if there is one
spark = SparkSession.builder \
    .master("local") \
    .appName("spark_app") \
    .getOrCreate()


In [5]:
# getting information about the current session configuration

%%info

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 10 | application_1619138142649_0011 | pyspark | idle | Link | Link | ✔ |


In [1]:
# configuring the session

%%configure -f 

{
    "driverMemory": "6000M",
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
    }
}

`3.` __Feature Engineering__

`3.1` Extracting useful features

In [6]:

def clean_data(df):
    
    """
    This function removes all the rows where userId is empty and 
    returns the dataframe.
    
    Input : DataFrame
    Output : cleaned DataFrame
    """
    
    df_new = df.filter(df["userId"] != "")
    
    return df_new


def prepare_dataset(df): 
    
    """
    This function prepares the DataFrame for machine learning
    by extracting the useful features and engineering more
    relevant ones. The unique users are kept in the second returned
    DataFrame, which is the one used for machine learning.
    
    Input : DataFrame
    Output : the modified input DataFrame and the DataFrame to be used for machine learning
    """
    
    
    
    df = clean_data(df)
    
    
    # Defining churn: flag the rows that are "Cancellation Confirmation" events
    cancellation_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
    df = df.withColumn("churn", cancellation_event("page"))
    # Collect the ids of the users who cancelled (a set gives fast membership tests)
    cancelled_users = set(df.select('userId').where(df.churn == 1).distinct().toPandas()['userId'].values)
    
    # Set churn to 1 on every row of a user who reached 'Cancellation Confirmation'
    def fill_array(userId):
        return 1 if userId in cancelled_users else 0
        
    fill_array_udf = udf(fill_array, IntegerType())
    df = df.withColumn("churn", fill_array_udf(col("userId")))
    
    
    
    
    w = Window.partitionBy('userId') #Partitioning the Data by User Id
    df = df.withColumn('last_ts', max('ts').over(w)) #create last timestamp
    df = df.withColumn('first_ts', min('ts').over(w)) #create first timestamp
    
    #This function will convert timestamp to date
    def get_date_from_ts(ts):
        return str(datetime.utcfromtimestamp(ts / 1000).strftime('%Y-%m-%d'))
    
    get_date_from_ts_udf = udf(get_date_from_ts, StringType())
    df = df.withColumn('last_date', get_date_from_ts_udf(col('last_ts'))) #converting last timestamp to date
    df = df.withColumn('first_date', get_date_from_ts_udf(col('first_ts'))) #converting first timestamp to date
    
    
    df = df.withColumn('date', get_date_from_ts_udf(col('ts'))) #converting all timestamps to date
    
    df = df.withColumn('last_level',when(df.last_ts == df.ts, df.level)) #defining the last level (paid or free) of a user
    
    # create column avg_songs to calculate average number of songs per day
    # first grouping on unique (userId, date) pair and then taking average 
    # over all the dates for a particular user
    w = Window.partitionBy('userId', 'date')
    songs = df.where(df.page == 'NextSong').select('userId', 'date', count('userId').over(w).alias('songs')).distinct()
    w = Window.partitionBy('userId')
    songs = songs.withColumn('avg_songs', avg('songs').over(w))
    songs = songs.select(col("userId").alias("songs_userId"), 'avg_songs')
    songs = songs.withColumn("avg_songs", round(songs["avg_songs"], 2))
    
    # create column avg_events to calculate average number of events per day
    # first grouping on unique (userId, date) pair and then taking average 
    # over all the dates for a particular user
    w = Window.partitionBy('userId', 'date')
    events = df.select('userId', 'date', count('userId').over(w).alias('events')).distinct()
    w = Window.partitionBy('userId')
    events = events.withColumn('avg_events', avg('events').over(w))
    events = events.select(col("userId").alias("events_userId"), 'avg_events')
    events = events.withColumn("avg_events", round(events["avg_events"], 2))
    
    # calculate number of thumbs up for a user
    w = Window.partitionBy('userId')
    thumbsup = df.where(df.page == 'Thumbs Up').select('userId', count('userId').over(w).alias('thumbs_up')).distinct()
    thumbsup = thumbsup.select(col("userId").alias("thumbsup_userId"), 'thumbs_up')
    
    # calculate number of thumbs down for a user
    w = Window.partitionBy('userId')
    thumbsdown = df.where(df.page == 'Thumbs Down').select('userId', count('userId').over(w).alias('thumbs_down')).distinct()
    thumbsdown = thumbsdown.select(col("userId").alias("thumbsdown_userId"), 'thumbs_down')
    
    # calculate the number of days between the user's first event and the run date (today)
    df = df.withColumn("days_active", 
              datediff(to_date(lit(datetime.now().strftime("%Y-%m-%d %H:%M"))),
                       to_date("first_date","yyyy-MM-dd")))
    
    # add column with state of the event based on location column
    def get_state(location):
        location = location.split(',')[-1].strip()
        if (len(location) > 2):
            location = location.split('-')[-1].strip()
    
        return location
    
    get_state_udf = udf(get_state, StringType())
    df = df.withColumn('state', get_state_udf(col('location')))
    
    #add column with last location of the user
    df = df.withColumn('last_state',when(df.last_ts == df.ts, df.state))
    
    # calculate number of add friends for a user
    w = Window.partitionBy('userId')
    addfriend = df.where(df.page == 'Add Friend').select('userId', count('userId').over(w).alias('addfriend')).distinct()
    addfriend = addfriend.select(col("userId").alias("addfriend_userId"), 'addfriend')

    # assemble everything into resulting dataset
    df_ml = df.select('userId', 'gender', 'churn', 'last_level', 'days_active', 'last_state')\
    .dropna().drop_duplicates()
    df_ml = df_ml.join(songs, df_ml.userId == songs.songs_userId).distinct()
    df_ml = df_ml.join(events, df_ml.userId == events.events_userId).distinct()
    df_ml = df_ml.join(thumbsup, df_ml.userId == thumbsup.thumbsup_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['thumbs_up'])
    df_ml = df_ml.join(thumbsdown, df_ml.userId == thumbsdown.thumbsdown_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['thumbs_down'])
    df_ml = df_ml.join(addfriend, df_ml.userId == addfriend.addfriend_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['addfriend'])
    df_ml = df_ml.drop('songs_userId','events_userId', 'thumbsup_userId', 'thumbsdown_userId', 'addfriend_userId')
    
    return df, df_ml
    
df = spark.read.json('s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json')
df.persist()

df, df_ml = prepare_dataset(df)
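
To make the feature logic in `prepare_dataset` easier to check by hand, here is a minimal plain-Python sketch of four of its steps: timestamp-to-date conversion, state extraction, churn labelling, and average songs per day. It does not use Spark. The function names `ts_to_date`, `churned_users`, and `avg_songs_per_day`, as well as the sample `events` list, are hypothetical and only meant to mirror what the cell above does on a few rows.

```python
from collections import defaultdict
from datetime import datetime, timezone


def ts_to_date(ts_ms):
    """Convert a millisecond epoch timestamp to a 'YYYY-MM-DD' string (UTC)."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")


def get_state(location):
    """Keep the text after the last comma; for multi-state areas keep the last state."""
    state = location.split(",")[-1].strip()
    if len(state) > 2:
        state = state.split("-")[-1].strip()
    return state


def churned_users(events):
    """Ids of users with at least one 'Cancellation Confirmation' event."""
    return {e["userId"] for e in events if e["page"] == "Cancellation Confirmation"}


def avg_songs_per_day(events):
    """Count NextSong events per (user, day), then average the daily counts per user."""
    per_day = defaultdict(int)
    for e in events:
        if e["page"] == "NextSong":
            per_day[(e["userId"], ts_to_date(e["ts"]))] += 1
    per_user = defaultdict(list)
    for (user, _day), n_songs in per_day.items():
        per_user[user].append(n_songs)
    return {user: round(sum(c) / len(c), 2) for user, c in per_user.items()}


# Hypothetical sample events (not taken from the Sparkify data)
events = [
    {"userId": "1", "page": "NextSong", "ts": 1538352000000},
    {"userId": "1", "page": "NextSong", "ts": 1538352060000},
    {"userId": "1", "page": "NextSong", "ts": 1538438400000},
    {"userId": "1", "page": "Cancellation Confirmation", "ts": 1538438460000},
    {"userId": "2", "page": "NextSong", "ts": 1538352000000},
]

cancelled = churned_users(events)
# Every row of a cancelled user gets churn = 1, as in the notebook
labels = [1 if e["userId"] in cancelled else 0 for e in events]
songs = avg_songs_per_day(events)
```

User `1` plays two songs on the first day and one on the second, so the daily average is 1.5. This is the same two-step aggregation the window functions perform: first per `(userId, date)`, then per `userId`.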


`3.2` Indexing of String columns and One Hot Encoding

In [7]:
# index categorical features gender, level and state (like Label Encoder in sklearn)

stringIndexerGender = StringIndexer(inputCol="gender", outputCol="genderIndex", handleInvalid = 'skip')
stringIndexerLevel = StringIndexer(inputCol="last_level", outputCol="levelIndex", handleInvalid = 'skip')
stringIndexerState = StringIndexer(inputCol="last_state", outputCol="stateIndex", handleInvalid = 'skip')



In [9]:
# OneHotEncoding these features

encoder_gender = OneHotEncoder(inputCol="genderIndex", outputCol="genderVec")
encoder_level = OneHotEncoder(inputCol="levelIndex", outputCol="levelVec")
encoder_state = OneHotEncoder(inputCol="stateIndex", outputCol="stateVec")
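
As a rough illustration of what the indexing and encoding stages produce, the plain-Python sketch below mimics them on a tiny list. It assumes the Spark defaults: `StringIndexer` gives index 0 to the most frequent label, and `OneHotEncoder` drops the last category (`dropLast=True`). The helper names `fit_string_index` and `one_hot` are hypothetical, and the alphabetical tie-break is an assumption made only to keep the sketch deterministic.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically here (an assumption for determinism).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_labels, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the last label is all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Hypothetical values of the last_level column
levels = ["paid", "free", "paid", "paid", "free"]

level_index = fit_string_index(levels)
level_vecs = [one_hot(level_index[v], len(level_index)) for v in levels]
```

With two labels and the last one dropped, each encoded vector has a single entry: `paid` becomes `[1.0]` and `free` becomes `[0.0]`.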


`4.` __Applying Machine Learning__

In [10]:
# create vector for features
features = ['genderVec', 'levelVec', 'stateVec', 'days_active', 'avg_songs', 'avg_events', 'thumbs_up', 'thumbs_down', 'addfriend']
assembler = VectorAssembler(inputCols=features, outputCol="features")

# initialize random forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# assemble pipeline
pipeline = Pipeline(stages = [stringIndexerGender, stringIndexerLevel, stringIndexerState, encoder_gender,encoder_level,encoder_state, assembler, rf])
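
The assembler stage concatenates the encoded vectors and the numeric columns into one feature vector per user. The plain-Python sketch below shows the idea on one row. The `assemble` helper and the values in `row` are hypothetical, and the three-entry `stateVec` is only for illustration (the real length depends on how many states appear in the data).

```python
def assemble(row, input_cols):
    """Concatenate list-valued and scalar columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


feature_cols = ['genderVec', 'levelVec', 'stateVec', 'days_active', 'avg_songs',
                'avg_events', 'thumbs_up', 'thumbs_down', 'addfriend']

# Hypothetical row after indexing and one-hot encoding
row = {
    'genderVec': [1.0],
    'levelVec': [0.0],
    'stateVec': [0.0, 1.0, 0.0],
    'days_active': 120,
    'avg_songs': 35.5,
    'avg_events': 44.25,
    'thumbs_up': 12,
    'thumbs_down': 3,
    'addfriend': 4,
}

vector = assemble(row, feature_cols)
```

The column order in `feature_cols` fixes the position of each feature in the vector, which matters later when reading feature importances from the random forest.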


In [11]:
# train-test-validation split

df_ml = df_ml.withColumnRenamed("churn", "label")

train, test_valid = df_ml.randomSplit([0.6, 0.4], seed = 42)
test, validation = test_valid.randomSplit([0.5, 0.5], seed = 42)
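
The two calls above give roughly a 60/20/20 split: 60% for training, and the remaining 40% halved into test and validation. The sizes are only approximate because each row is assigned at random. The plain-Python sketch below imitates this with a hypothetical `random_split` helper. It is not Spark's implementation, so the same seed will not reproduce the same rows.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    buckets = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket also catches any floating-point rounding gap
            if r < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


rows = list(range(1000))  # hypothetical stand-in for the per-user rows
train_rows, rest = random_split(rows, [0.6, 0.4], seed=42)
test_rows, validation_rows = random_split(rest, [0.5, 0.5], seed=42)

fractions = [len(part) / len(rows) for part in (train_rows, test_rows, validation_rows)]
```

Every row ends up in exactly one of the three parts, and `fractions` comes out close to, but not exactly, 0.6, 0.2 and 0.2.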


In [12]:
# fitting the model

model = pipeline.fit(train)


In [17]:
# saving the model in a S3 bucket
model.save('s3://aws-emr-resources-816555935147-us-east-2/notebooks/model')


In [None]:
model.save('s3://bucket-motua16/model')