# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
# NOTE: this max is pyspark's column aggregate and shadows Python's built-in max
from pyspark.sql.functions import col, max, udf
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from datetime import datetime

import numpy as np

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1618058617081_0010,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.



In [2]:
#Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()


# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 
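
The validity rule described above can be illustrated without Spark. The following is a minimal plain-Python sketch, not the notebook's implementation: the sample events and the helper `is_valid` are hypothetical, and only the field names are taken from the Sparkify schema.

```python
# Hypothetical sample events; only the field names follow the Sparkify schema.
events = [
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "", "sessionId": 8, "page": "Home"},          # empty userId
    {"userId": "9", "sessionId": None, "page": "NextSong"},  # missing sessionId
    {"userId": "9", "sessionId": 12, "page": "Thumbs Up"},
]

REQUIRED = ("userId", "sessionId", "page")


def is_valid(event):
    """Keep an event only if every required field is present and non-empty."""
    return all(event.get(key) not in (None, "") for key in REQUIRED)


valid_events = [event for event in events if is_valid(event)]
print(len(valid_events), "of", len(events), "events kept")
```

The Spark version below applies the same idea with `dropna` for missing values and a `filter` for empty user ids.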

In [18]:
def load_data():
    # specify file path of the dataset
    #sparkify_data = "mini_sparkify_event_data.json"
    #sparkify_data = "medium_sparkify_event_data.json"
    sparkify_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
    #sparkify_data = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"
    
    df = spark.read.json(sparkify_data)
    return df


In [19]:
def clean_data(df):
    '''
    Columns in which NA values must be prevented:
    - userId -> identifies the user
    - page -> which page was accessed, e.g. downgrade/upgrade/next song
    - registration -> the date of the registration
    - ts -> event timestamp: needed for analyzing the user activity during the day
    - level -> free/paid: needed for analyzing the churn rate
    - status -> HTTP status code: check if the user is unsatisfied with Sparkify's availability
    - method -> HTTP request type: check if the user is unsatisfied with Sparkify's availability
    - userAgent -> difference between Win/Mac users
    - gender -> check if behaviour differs between genders (NA values are replaced with "null/other")
    - sessionId -> ensures the session was valid

    Useful but not necessarily relevant:
    - auth -> "Cancelled" / "Logged In"
    - firstName -> first name of the user
    - lastName -> last name of the user
    - location -> user geographical location
    - itemInSession

    Depending on the page event, NA values in the following columns are acceptable:
    - song -> name of the played song
    - artist -> name of the artist
    - length -> length of the song (do not remove NAs here, otherwise the page column only contains "next song" events)
    '''

    df_clean = df.dropna(how = "any", subset = ["userId", "sessionId", "method", "page", "ts",
                                                "registration", "level", "userAgent", "status"])
    # clean the gender column - a null value can mean either missing or another gender
    # later these groups will be converted into numerical values
    df_clean = df_clean.fillna("null/other", subset=["gender"])

    # remove rows whose userId is an empty string
    df_clean = df_clean.filter(df_clean["userId"] != "")

    # user defined function to extract the hour from the timestamp (ts is given in milliseconds)
    get_hour = udf(lambda x: datetime.fromtimestamp(x / 1000.0).hour, IntegerType())
    # create a new column "hour" that holds the hour at which a user interacted with the system
    df_clean = df_clean.withColumn("hour", get_hour(df_clean.ts))

    return df_clean


In [20]:
def calc_churn_feature(df):

    # user defined function that flags the cancellation event
    get_churn = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

    # create temporary column churnEvent: 1 for the cancellation event, 0 otherwise
    df_churn = df.withColumn("churnEvent", get_churn(df.page))

    # create a window over all events of a user - this is needed to extract the churned users
    user_window = Window.partitionBy("userId").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # create a new column churn, where the same churn value is assigned to every event of a user
    df_churn = df_churn.withColumn("churn", F.max("churnEvent").over(user_window))

    # drop temporary column
    df_churn = df_churn.drop("churnEvent")

    return df_churn
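
The window logic above (flag the cancellation event, then take the maximum of that flag over all events of a user) can be followed on a toy example. This is a plain-Python sketch with hypothetical sample events, not a replacement for the Spark code:

```python
from collections import defaultdict

# Hypothetical (userId, page) events.
events = [
    ("u1", "NextSong"),
    ("u1", "Cancellation Confirmation"),
    ("u2", "NextSong"),
    ("u2", "Thumbs Up"),
]

# Per user: 1 if any event is the cancellation confirmation, else 0.
# This mirrors taking max("churnEvent") over a window partitioned by userId.
churn_by_user = defaultdict(int)
for user_id, page in events:
    churn_event = 1 if page == "Cancellation Confirmation" else 0
    if churn_event > churn_by_user[user_id]:
        churn_by_user[user_id] = churn_event

# Every event of a user carries the same churn label.
labelled = [(user_id, page, churn_by_user[user_id]) for user_id, page in events]
for row in labelled:
    print(row)
```

Note that the label is attached to all events of a churned user, including those that happened before the cancellation.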


In [21]:
def load_and_clean_data():
    df = load_data()
    df = clean_data(df)
    df = calc_churn_feature(df)
    return df


In [22]:
df = load_and_clean_data()
df.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = false)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- churn: integer (nullable = true)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow these steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

#### Features
The features below are created in this section:

Categorical features:
* Level (paid / free membership)
* Gender
* Downgrade (user performed a downgrade)
* Location (state)
* Page (event)

Numerical features:
* Number of friends
* Number of Thumbs Up
* Number of Thumbs Down
* Number of songs added to playlists
* Like ratio (Thumbs Up / Thumbs Down)
* Number of artists
* Number of songs per user
* Churn time (time from registration to the cancellation event)
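
Two of the numerical features are simple formulas. The sketch below shows them in plain Python. The helper names `like_ratio` and `days_since_registration` are hypothetical, and returning `None` for a user without dislikes is an assumption made here to avoid a division by zero.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 milliseconds per day


def like_ratio(num_likes, num_dislikes):
    """Thumbs Up divided by Thumbs Down; None when the user has no dislikes."""
    if num_dislikes == 0:
        return None
    return num_likes / num_dislikes


def days_since_registration(registration_ms, last_event_ms):
    """Whole days between two millisecond timestamps, truncated like an integer cast."""
    return int((last_event_ms - registration_ms) / MS_PER_DAY)


registration = 1_538_000_000_000
last_event = registration + 907_200_000  # 10.5 days later

print(like_ratio(30, 4))
print(days_since_registration(registration, last_event))
```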

#### Feature Creation

In [23]:
def get_downgrade_feature(df):
    # create a window over all events of a user - this is needed to find users with a downgrade event
    user_window = Window.partitionBy("userId").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    # user defined function for indicating a downgrade event
    get_downgrade = udf(lambda x: 1 if x == "Downgrade" else 0, IntegerType())

    # create temporary downgradeEvent column
    df_downgrade = df.withColumn("downgradeEvent", get_downgrade(df.page)) 

    # create new column downgrade 
    df_downgrade = df_downgrade.withColumn("downgrade", F.max("downgradeEvent").over(user_window))
  
    return df_downgrade.select(["userId", "downgrade"]).dropDuplicates(["userId"])


In [24]:
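def extract_os(userAgent):
    # NOTE: despite its name, this function returns the browser family (Firefox, Chrome, ...),
    # so the column built from it holds browser names
    if "Firefox" in userAgent:
        return "Firefox"
    elif "Safari" in userAgent:
        # Chrome user agents also contain "Safari", so check for Chrome first
        if "Chrome" in userAgent:
            return "Chrome"
        else:
            return "Safari"
    elif "Trident" in userAgent:
        return "InternetExplorer"
    else:
        # return a string placeholder instead of np.nan: the udf produces a string column
        # and StringIndexer rejects null values by default
        return "Unknown"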


In [25]:
def get_os_feature(df):
    get_os = udf(lambda x: extract_os(x))
    return df.withColumn("os", get_os(df.userAgent)).select(["userId", "os"]).dropDuplicates(["userId"]) 


In [26]:
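def extract_browser(userAgent):
    # NOTE: despite its name, this function returns the operating system / device,
    # so the column built from it holds platform names
    if "Windows" in userAgent:
        return "Windows"
    elif "Macintosh" in userAgent:
        return "MacOS"
    elif "iPhone" in userAgent:
        return "iPhone"
    elif "iPad" in userAgent:
        return "iPad"
    elif "Linux" in userAgent:
        return "Linux"
    else:
        # the original branch evaluated np.nan without returning it (implicit None);
        # return a string placeholder, since StringIndexer rejects null values by default
        return "Unknown"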


In [27]:
def get_browser_feature(df):
    get_browser = udf(lambda x: extract_browser(x))
    return df.withColumn("browser", get_browser(df.userAgent)).select(["userId", "browser"]).dropDuplicates(["userId"])
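
To see how the substring checks behave, the sketch below runs a simplified variant on three typical user-agent strings. The helper names `browser_of` and `platform_of` and the sample strings are hypothetical and only illustrate the matching order. They are not the functions used in the cells above.

```python
def browser_of(user_agent):
    """Coarse browser family. Chrome is checked before Safari because
    Chrome user agents also contain the token "Safari"."""
    if "Firefox" in user_agent:
        return "Firefox"
    if "Chrome" in user_agent:
        return "Chrome"
    if "Safari" in user_agent:
        return "Safari"
    if "Trident" in user_agent:
        return "InternetExplorer"
    return "Unknown"


def platform_of(user_agent):
    """Coarse operating system / device, first matching token wins."""
    tokens = (("Windows", "Windows"), ("Macintosh", "MacOS"), ("iPhone", "iPhone"),
              ("iPad", "iPad"), ("Linux", "Linux"))
    for token, name in tokens:
        if token in user_agent:
            return name
    return "Unknown"


samples = [
    "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 "
    "(KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53",
]

for ua in samples:
    print(browser_of(ua), "/", platform_of(ua))
```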


In [28]:
def get_feature_dataframe(df):
    '''
    INPUT: 
    df - (pyspark dataframe) cleaned dataframe
    
    OUTPUT:
    df_result - (pyspark dataframe) dataframe with numerical features
    
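    Description:
    This function computes one row of features per user:
    - downgrade: 1 if the user triggered a downgrade page event, else 0
    - browser, os: values derived from the user agent
    - numFriends: number of friends added per Sparkify user
    - numDislikes: number of dislikes (Thumbs Down)
    - numLikes: number of likes (Thumbs Up)
    - like-ratio: ratio between likes and dislikes
    - playlistSize: number of songs added to a playlist
    - days: time from registration to the last recorded event in days
    - avg_song_session: average number of songs a user played in a session
    - numAdvert: number of adverts
    The columns churn, gender and level are carried over from the input dataframe.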
    '''
    
    df_downgrade = get_downgrade_feature(df)
    
    df_browser = get_browser_feature(df)
     
    df_os = get_os_feature(df)
        
    # count events per user; groupby already yields exactly one row per userId
    df_friends = df.where("page = 'Add Friend'").groupby("userId").count()\
                   .withColumnRenamed("count", "numFriends")

    df_dislike = df.where("page = 'Thumbs Down'").groupby("userId").count()\
                   .withColumnRenamed("count", "numDislikes")

    df_like = df.where("page = 'Thumbs Up'").groupby("userId").count()\
                .withColumnRenamed("count", "numLikes")

    df_playlist = df.where("page = 'Add to Playlist'").groupby("userId").count()\
                    .withColumnRenamed("count", "playlistSize")

    # ratio of likes to dislikes; users without any Thumbs Down event are missing from df_dislike
    df_likeRatio = df_dislike.join(df_like, on="userId", how="left")\
                             .withColumn("like-ratio", col("numLikes")/col("numDislikes"))

    # days between the registration and the last recorded event of a user
    ms_per_day = 86400000 # milliseconds per day
    df_time = df.groupby("userId")\
                .agg(((F.max("ts") - F.first("registration")) / ms_per_day).cast(IntegerType()).alias("days"))

    # largest itemInSession per session, averaged per user
    # (grouping by ts instead of sessionId would treat every event as its own session)
    df_songs_in_session = df.groupby("userId", "sessionId").agg(F.max("itemInSession").alias("items"))\
                            .groupby("userId").agg(F.avg("items").alias("avg_song_session"))

    df_advert = df.where("page = 'Roll Advert'").groupby("userId").count()\
                  .withColumnRenamed("count", "numAdvert")
    
    df_result = df_downgrade.join(df_friends, on="userId", how="inner")\
                            .join(df_browser, on="userId", how="inner")\
                            .join(df_os, on="userId", how="inner")\
                            .join(df_likeRatio, on="userId", how="inner")\
                            .join(df_playlist, on="userId", how="inner")\
                            .join(df_time, on="userId", how="inner")\
                            .join(df_songs_in_session, on="userId", how="inner")\
                            .join(df_advert, on="userId", how="inner")
    
    # add data for categorical features and add churn value
    df_result = df_result.join(df.select(["userId", "churn", "gender", "level"]), on="userId", how="inner")

    return df_result.dropDuplicates(["userId"])


In [29]:
# calculate numerical features
df_features = get_feature_dataframe(df)


In [30]:
# print dataframe schema
df_features.printSchema()


root
 |-- userId: string (nullable = true)
 |-- downgrade: integer (nullable = true)
 |-- numFriends: long (nullable = false)
 |-- browser: string (nullable = true)
 |-- os: string (nullable = true)
 |-- numDislikes: long (nullable = false)
 |-- numLikes: long (nullable = true)
 |-- like-ratio: double (nullable = true)
 |-- playlistSize: long (nullable = false)
 |-- days: integer (nullable = true)
 |-- avg_song_session: double (nullable = true)
 |-- numAdvert: long (nullable = false)
 |-- churn: integer (nullable = true)
 |-- gender: string (nullable = false)
 |-- level: string (nullable = true)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
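
Why F1 is preferred over accuracy for a small churn class can be shown with a toy example. The following plain-Python sketch uses hypothetical labels and a hypothetical helper `precision_recall_f1`. It computes the F1 score of the churn class only.

```python
def precision_recall_f1(y_true, y_pred, positive=1):
    """Precision, recall and F1 for the positive class (0.0 where undefined)."""
    pairs = list(zip(y_true, y_pred))
    tp = len([1 for t, p in pairs if t == positive and p == positive])
    fp = len([1 for t, p in pairs if t != positive and p == positive])
    fn = len([1 for t, p in pairs if t == positive and p != positive])
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# Hypothetical data: 2 of 10 users churned, the model never predicts churn.
y_true = [0] * 8 + [1] * 2
y_pred = [0] * 10

accuracy = len([1 for t, p in zip(y_true, y_pred) if t == p]) / len(y_true)
precision, recall, f1 = precision_recall_f1(y_true, y_pred)

print("accuracy:", accuracy)
print("churn-class F1:", f1)
```

A model that never predicts churn looks good on accuracy but has a churn-class F1 of zero. Note that Spark's `MulticlassClassificationEvaluator` with `metricName="f1"` reports an F1 averaged over both classes and weighted by class frequency, so its value differs from the churn-class F1 computed here.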

Classification problem
* Logistic Regression
* Decision Trees
* Gradient Boosting Trees
* SVM
* Naive Bayes

#### Create dataset for the Model
Convert the dataset ```df_features``` into a dataset which can directly be used in ML models using a Vector Assembler and Features Scaler (using StandardScaler).
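
What the scaling step does to a single numerical column can be sketched in plain Python. This is an illustration under the assumption that the scaler centers on the mean and divides by the sample standard deviation, which corresponds to `withMean = True, withStd = True`. The helper `standardize` is hypothetical.

```python
from statistics import mean, stdev


def standardize(values):
    """Center on the mean and divide by the sample standard deviation (n - 1)."""
    mu = mean(values)
    sigma = stdev(values)
    if sigma == 0:
        # constant column: map every value to 0.0 instead of dividing by zero
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]


scaled = standardize([2.0, 4.0, 6.0])
print(scaled)
```

After scaling, every numerical feature has zero mean and unit standard deviation, so features with large raw values such as `days` do not dominate the others.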

In [31]:
# list of numerical features
# ("downgrade" is not listed here because it is used as the categorical feature "downgradeFeat")
numerical_features = ["numFriends", "numDislikes", "numLikes", "like-ratio", "playlistSize", "days",
                      "avg_song_session", "numAdvert"]

# list of categorical features (source column name plus the suffix "Feat")
categorical_features = ["genderFeat", "levelFeat", "downgradeFeat", "osFeat", "browserFeat"]


In [32]:
def create_categorical_features(df, columns_list):
    '''
    INPUT:
    df - (pyspark dataframe) dataframe
    columns_list - (list) names of the categorical feature columns to create; each name is the
                   source column name plus the suffix "Feat" (e.g. "genderFeat" for "gender")

    OUTPUT:
    df - (pyspark dataframe) dataframe with the categorical feature columns added

    Description:
    For each element in columns_list a categorical feature is created with a StringIndexer.
    The indexed values are added as new columns; the original columns are kept.
    '''
    for feature_col in columns_list:
        # strip the "Feat" suffix to get the name of the source column
        indexer = StringIndexer(inputCol=feature_col[:-4], outputCol=feature_col)
        df = indexer.fit(df).transform(df)
    return df
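
The indexing idea can be sketched in plain Python: each distinct label is mapped to a numeric index, with the most frequent label receiving index 0. This assumes the frequency-descending ordering that `StringIndexer` uses by default. The helper `fit_string_index` and the alphabetical tie-break are choices made for this illustration.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


levels = ["paid", "free", "free", "paid", "free"]
mapping = fit_string_index(levels)
indexed = [mapping[level] for level in levels]

print(mapping)
print(indexed)
```

The resulting indices are arbitrary codes, not measurements. Tree-based models can split on them directly, while linear models interpret them as ordered numbers.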


In [33]:
def create_preprocessing_pipeline(numerical_features = numerical_features, categorical_features = categorical_features):
    '''
    INPUT:
    numerical_features - (list) a list of strings naming the columns of numerical features
    categorical_features - (list) a list of strings naming the columns of categorical features
    
    OUTPUT:
    pipeline - (pyspark pipeline) a pipeline which includes all necessary processing steps, including a 
               VectorAssembler and a StandardScaler
               
    Description:
    Build a pyspark pipeline for transforming the data in the desired layout.
    Important note: the categorical features were already converted using a String Indexer in a previous step.
    '''
    # numeric features: feature vectorizer and scaler 
    assembler_num = VectorAssembler(inputCols = numerical_features, 
                                    outputCol = "numerical_features")
    
    feature_scaler_num = StandardScaler(withMean = True, withStd = True, 
                                   inputCol = "numerical_features", 
                                   outputCol = "scaled_features")

    # combine the indexed categorical features with the scaled numerical features
    assembler_cat = VectorAssembler(inputCols = categorical_features+["scaled_features"], 
                                    outputCol = "features")
    
    # create pipeline
    pipeline = Pipeline(stages = [assembler_num, feature_scaler_num, assembler_cat])
    
    return pipeline


In [34]:
def create_model_dataset(df, numerical_features = numerical_features, categorical_features = categorical_features):
    '''
    INPUT:
    df - (pyspark dataframe) a dataframe which includes all features
    numerical_features - (list) a list of strings naming the columns of numerical features
    categorical_features - (list) a list of strings naming the columns of categorical features
    
    OUTPUT:
    df - (pyspark dataframe) which includes all columns from the input dataset, but extended 
         with the features in an additional representation (defined with pipeline)
    '''
    # create categorical features
    df = create_categorical_features(df, categorical_features)
    
    # create the pipeline
    pipeline = create_preprocessing_pipeline(numerical_features, categorical_features)

    # fit and transform the data
    df = pipeline.fit(df).transform(df)
    
    # rename column churn into label (label is more common usage in ML algorithms) 
    df = df.withColumnRenamed("churn", "label")
    
    return df


In [None]:
df_model = create_model_dataset(df_features, numerical_features, categorical_features)


In [None]:
df_model.printSchema()

In [None]:
df_model.select("features").head(1)

#### Model creation

In [None]:
def create_and_evaluate_model(classifier, train, test):

    if classifier == "LogisticRegression":
        clf = LogisticRegression(maxIter=10,regParam=0.0,elasticNetParam=0)
    elif classifier == "RandomForestClassifier":
        clf = RandomForestClassifier()
    elif classifier == "GBTClassifier":
        clf = GBTClassifier(maxIter=10,seed=42)
    elif classifier == "LinearSVC":
        clf = LinearSVC(maxIter=100)
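    else:
        # fail loudly instead of returning an error string that callers could mistake for a score
        raise ValueError("Invalid model: {}".format(classifier))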
    
    # fit (train) model
    model = clf.fit(train)

    # make prediction for unknown test data
    predictions = model.transform(test)
    
    # Select (prediction, true label) and compute test error
    # based on pyspark documentation: https://spark.apache.org/docs/latest/ml-classification-regression.html
    # https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.MulticlassClassificationEvaluator.html
    evaluator = MulticlassClassificationEvaluator(metricName = "f1",
                                                  labelCol = "label",
                                                  predictionCol = "prediction")
    
    f1_score = evaluator.evaluate(predictions)
    
    evaluator.setMetricName("accuracy")
    accuracy = evaluator.evaluate(predictions)

    print("Classifier: {} - F1-Score: {:.2f} - accuracy: {:.2f}".format(classifier, f1_score, accuracy))
    
    return f1_score
    

In [None]:
# split the model data into train and test-set
train, test = df_model.randomSplit([0.8, 0.2], seed=42)

In [None]:
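def find_best_model(train, test, classifier=("LogisticRegression", "RandomForestClassifier", "GBTClassifier", "LinearSVC")):
    # train and evaluate every classifier, collecting the F1 score of each
    scores = {clf: create_and_evaluate_model(clf, train, test) for clf in classifier}
    # sorted() is used because the built-in max is shadowed by the pyspark.sql.functions import
    best = sorted(scores, key=scores.get)[-1]
    # return the name of the classifier with the highest F1 score together with all scores
    return best, scores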
        

In [None]:
find_best_model(train, test, ["LinearSVC"])

In [None]:
find_best_model(train, test, ["GBTClassifier"])

In [None]:
find_best_model(train, test, ["RandomForestClassifier"])

In [None]:
find_best_model(train, test, ["LogisticRegression"])

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.