# Group Level Sparkify

In [1]:
# find spark content
import findspark
findspark.init()

# Pre-processing packages
import numpy as np
import pandas as pd
import datetime
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, desc, col, max as scala_max, sum as scala_sum
from pyspark.sql.functions import sum as scala_sum, unix_timestamp, avg as scala_avg, count as scala_count
from pyspark.sql.functions import from_unixtime, col
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql.types import StringType, BooleanType, TimestampType
from pyspark.sql import Window

# Machine Learning Packages
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

# # Logistic Regression
# from pyspark.ml.classification import LogisticRegression

# Random Forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator

# # Gradient-bossted Tree Regression
# from pyspark.ml.regression import GBTRegressor

In [2]:
# create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

---

# Load and Clean Dataset

In [3]:
# Read in full sparkify dataset
event_data = "mini_sparkify_event_data.json"
df = spark.read.json(event_data)

---

# Pre-processing

## Cleaning Date

In [4]:
# date addition
to_datetime = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).strftime("%Y-%m-%d"))
df = df.withColumn("date", to_datetime(df.ts))

# datime addition
to_datetime = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).isoformat())
df = df.withColumn("datetime", to_datetime(df.ts))

---

## Cancellation Phase

In [5]:
# window function for values unbounded preceding the label
window_fun = (Window.partitionBy("userId")
             .orderBy(desc("ts"))
             .rangeBetween(Window.unboundedPreceding, 0))

# flagging cancellations
def user_defined_function(phase): 
    return udf(lambda x: 
               1 if x == phase
               else 0, IntegerType())
    
flag_cancel_func = user_defined_function("Cancellation Confirmation")

# adding downgrade indicator indicator
df = df.withColumn("cancel_ind", flag_cancel_func("page"))

# adding churn flag
df = df.withColumn("cancel_user", 
                   scala_sum("cancel_ind").over(window_fun))

---

## Downgrade Phase

In [6]:
# flagging downgrades
flag_downgraded_func = user_defined_function("Submit Downgrade")

# adding indicator
df = df.withColumn("downgrade_ind", flag_downgraded_func("page"))

# sum flags over the window statement
df = df.withColumn("downgrade_phase", 
                   scala_sum("downgrade_ind").over(window_fun))

---

## Period Between Downgrade

### Period Spark Version

In [7]:
period_function = udf(lambda page: int(page=='Downgrade'), IntegerType())

over_ts_func = (Window.partitionBy("userId")
    .orderBy(desc("ts"))
    .rangeBetween(Window.unboundedPreceding, 0))

df_ready = (df.filter((df.page == 'NextSong') | (df.page == 'Downgrade'))
    .select(["userId", "page", "gender", "length", "ts"])
    .withColumn("downgrade_ind", period_function(col("page")))
    .withColumn("period", scala_sum("downgrade_ind").over(over_ts_func))
).filter(df.userId == '200011')

### Period SQL Version

In [8]:
df.createOrReplaceTempView("sparkify_table")

downgrade_indicator = spark.sql("""
SELECT 
    userId, 
    page, 
    gender,
    level,
    length,
    date,
    ts,
    CASE 
        WHEN page = 'Downgrade' 
        THEN 1 
        ELSE 0 
    END AS downgrade_ind,
    cancel_user
FROM sparkify_table
WHERE (page = 'NextSong') or (page = 'Downgrade')
""")

downgrade_indicator.createOrReplaceTempView('downgrade_table')

df_ready = spark.sql("""
SELECT
    *,
    SUM(downgrade_ind)
    OVER(PARTITION BY userId 
        ORDER BY ts DESC 
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS downgrade_period
FROM downgrade_table
--WHERE userId = 200011
""")

df_ready = df_ready.drop('downgrade_ind')

df_ready.head()

Row(userId='100010', page='NextSong', gender='F', level='free', length=185.25995, date='2018-11-21', ts=1542823951000, cancel_user=0, downgrade_period=0)

## Gender Endoder

In [9]:
df.select('gender').distinct().show()

+------+
|gender|
+------+
|     F|
|  null|
|     M|
+------+



*null genders will have 0 and 0 for both Male and Female columns.

In [10]:
male_encoder = udf(lambda x: 1 if x == 'M' else 0, IntegerType())
df_ready = df_ready.withColumn("Male", male_encoder(df_ready.gender))

female_encoder = udf(lambda x: 1 if x == 'F' else 0, IntegerType())
df_ready = df_ready.withColumn("Female", female_encoder(df_ready.gender))

df_ready = df_ready.drop('gender')

## Level Encoder

In [11]:
df.select('level').distinct().show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [12]:
free_encoder = udf(lambda x: 1 if x == 'free' else 0, IntegerType())
df_ready = df_ready.withColumn('free', free_encoder(df_ready.level))

paid_encoder = udf(lambda x: 1 if x == 'paid' else 0, IntegerType())
df_ready = df_ready.withColumn('paid', paid_encoder(df_ready.level))

df_ready = df_ready.drop('level')

## Feature Engineering

### Daily Seconds Used

In [13]:
df_ready.printSchema()

root
 |-- userId: string (nullable = true)
 |-- page: string (nullable = true)
 |-- length: double (nullable = true)
 |-- date: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- cancel_user: long (nullable = true)
 |-- downgrade_period: long (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Female: integer (nullable = true)
 |-- free: integer (nullable = true)
 |-- paid: integer (nullable = true)



In [14]:
over_date_func = (Window.partitionBy(["userId", "date"]))

# also filtering only on songs
df_ready = df_ready.filter((df.page == 'NextSong'))\
    .withColumn("DailySeconds", scala_sum("length").over(over_date_func))

df_ready = df_ready.drop('length').drop('page')

### Daily Use Count

In [15]:
df_ready = df_ready.withColumn("DailyUse", scala_count("userId").over(over_date_func))

# Processing

### Feature Columns

In [16]:
# get available columns
feature_columns = df_ready.columns

# removed column
for col in ['userId', 'date', 'cancel_user', 'ts']:
    feature_columns.remove(col)

print("Feature Columns")
feature_columns

Feature Columns


['downgrade_period',
 'Male',
 'Female',
 'free',
 'paid',
 'DailySeconds',
 'DailyUse']

## Random Forest Classifier Model

### Table Prep

In [17]:
data = df_ready

# modeling the data
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data = feature_assembler.transform(data)

data.printSchema()

root
 |-- userId: string (nullable = true)
 |-- date: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- cancel_user: long (nullable = true)
 |-- downgrade_period: long (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Female: integer (nullable = true)
 |-- free: integer (nullable = true)
 |-- paid: integer (nullable = true)
 |-- DailySeconds: double (nullable = true)
 |-- DailyUse: long (nullable = false)
 |-- features: vector (nullable = true)



In [18]:
labelIndexer = StringIndexer(inputCol="cancel_user", outputCol="indexedLabel").fit(data)
labelIndexer.labels

['0', '1']

In [19]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

featureIndexer.categoryMaps

{1: {0.0: 0, 1.0: 1}, 2: {0.0: 0, 1.0: 1}, 3: {0.0: 0, 1.0: 1}, 4: {0.0: 0, 1.0: 1}}

In [20]:
# modeling predictions
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

data.printSchema()

root
 |-- userId: string (nullable = true)
 |-- date: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- cancel_user: long (nullable = true)
 |-- downgrade_period: long (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Female: integer (nullable = true)
 |-- free: integer (nullable = true)
 |-- paid: integer (nullable = true)
 |-- DailySeconds: double (nullable = true)
 |-- DailyUse: long (nullable = false)
 |-- features: vector (nullable = true)



### Train & Test Split

In [21]:
trainingData, testData = data.randomSplit([0.7, 0.3], seed=42)
trainingData.count(), testData.count()

(160001, 68107)

### Pipelines

In [22]:
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

### No Validation Model Implementation

In [23]:
model = pipeline.fit(trainingData)

In [24]:
pred = model.transform(testData)

In [25]:
for metric_name in ['accuracy', 'weightedPrecision', 'weightedRecall', 'f1']:
    evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName=metric_name)
    accuracy = evaluator.evaluate(pred)
    print(f"{metric_name} = {accuracy:2.2%}")

accuracy = 84.08%
weightedPrecision = 86.62%
weightedRecall = 84.08%
f1 = 77.00%


### Cross Validation Implementation 

In [26]:
paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [10, 20, 30])\
    .addGrid(rf.maxDepth, [4, 8, 12])\
    .build()

In [27]:
crossval = CrossValidator(estimator=pipeline, 
                          estimatorParamMaps=paramGrid, 
                          evaluator=MulticlassClassificationEvaluator(labelCol='indexedLabel', 
                                                                      predictionCol='prediction', 
                                                                      metricName='accuracy'), 
                          numFolds=3)

In [None]:
cvmodel = crossval.fit(trainingData)

In [None]:
cv_pred = cvmodel.transform(testData)

In [None]:
cv_pred.printSchema()

In [None]:
cv_pred.select("predictedLabel", "indexedLabel", "features").show(5)

# Testing MulticlassMetrics

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
metrics = MulticlassMetrics(cv_pred.rdd.map(lambda x: (x.prediction, x.indexedLabel)))

In [None]:
# Overall statistics
accuracy = metrics.accuracy
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Accuracy = %s" % accuracy)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

In [None]:
# Weighted stats
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)