In [111]:
# Starter code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf, isnan, count, when, desc, sort_array, asc, avg, lag, floor
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import sum as Fsum
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler #used because won't distort binary vars
from pyspark.sql.types import DoubleType
import datetime

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import numpy as np

# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
data = spark.read.json(event_data)
data.head()

VBox()

An error was encountered:
Invalid status code '404' from https://172.31.46.197:18888/sessions/0 with error payload: "Session '0' not found."


In [89]:
#define necessary functions
def feature_engineering(df):
    '''
    Create necessary features to use machine learning algorithms.
    First loads data set from file
    
    Resulting DF Strucutre:
    
    root
     |-- userId: string
     |-- downgraded: long
     |-- cancelled: long
     |-- visited_cancel: long
     |-- visited_downgrade: long
     |-- dailyHelpVisits: double
     |-- dailyErrors: double
     |-- free: integer
     |-- paid: integer
     |-- avgThumbsUp: double
     |-- avgThumbsDOwn: double
     |-- numFriends: long
     |-- avgSongsTillHome: double
     |-- avgTimeSkipped: double
     |-- skipRate: double
    
    Inputs
        filepath (str) - path to json dataset on file
        
    Outputs
        data - engineered dataset
    '''
    #dataframe of user ids to merge onto
    users = df.where((df.userId != "") | (df.sessionId != ""))\
        .select('userId').dropDuplicates()
    
    #define custom functions
    churn = udf(lambda x: int(x=="Cancellation Confirmation"), IntegerType())
    downgrade_churn = udf(lambda x: int(x=="Submit Downgrade"), IntegerType())
    visited_downgrade = udf(lambda x: int(x=='Downgrade'), IntegerType())
    visited_cancel = udf(lambda x: int(x=='Cancel'), IntegerType())
    song = udf(lambda x: int(x=='NextSong'), IntegerType())
    days = lambda i: i * 86400 
    get_day = udf(lambda x: datetime.datetime.fromtimestamp(x/1000), DateType())
    skipped = udf(lambda x: int(x!=0), IntegerType())
    free = udf(lambda x: int(x=='free'), IntegerType())
    paid = udf(lambda x: int(x=='paid'), IntegerType())
    home_visit=udf(lambda x : int(x=='Home'), IntegerType())
    
    #define windows
    windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)
    session = Window.partitionBy("userId", "sessionId").orderBy(desc("ts"))
    daywindow = Window.partitionBy('userId', 'date').orderBy(desc('ts'))\
        .rangeBetween(Window.unboundedPreceding, 0)

    avgThumbsUp = df.filter(df.page=='Thumbs Up')\
        .select('userId', 'page', 'ts')\
        .withColumn('date', get_day(col('ts')))\
        .groupBy('userId', 'date').agg({'page':'count'}).groupBy('userId')\
        .mean().withColumnRenamed('avg(count(page))', 'avgThumbsUp')
    
    avgThumbsDown = df.filter(df.page=='Thumbs Down')\
        .select('userId', 'page', 'ts')\
        .withColumn('date', get_day(col('ts')))\
        .groupBy('userId', 'date').agg({'page':'count'})\
        .groupBy('userId').mean()\
        .withColumnRenamed('avg(count(page))', 'avgThumbsDown')
    
    numFriends = df.filter(df.page=='Add Friend')\
        .select('userId', 'page')\
        .groupBy('userId').count().withColumnRenamed('count', 'numFriends')
    
    '''
    process for calculating skipping variables

    1. dont include thumbs up and down pages because that usually occurs 
        while playing and does not change song
    2. create variable for if action is song
    3. check if next action is song - this will check to see if someone is 
        skipping song or just leaving page
    4. get the difference in timestamp for next action song playing
    5. subtract the difference in timestamp from song length to see 
        how much of song was skipped
    6. get descriptive stats
    '''

    skipping = df.select('userId', 'page', 'ts', 'length', 'sessionId', 'itemInSession')\
        .where((df.page != 'Thumbs Up') & (df.page != 'Thumbs Down'))\
        .withColumn('song', song('page')).orderBy('userId', 'sessionId', 'itemInSession')\
        .withColumn('nextActSong', lag(col('song')).over(session))\
        .withColumn('tsDiff', (lag('ts').over(session)-col('ts'))/1000)\
        .withColumn('timeSkipped', (floor('length')-col('tsDiff')))\
        .withColumn('roundedLength', floor('length'))\
        .where((col('song')==1) & ((col('nextActSong')!=0)&(col('timeSkipped')>=0)))\
        .withColumn('skipped', skipped('timeSkipped'))\
        .select('userId', 'timeSkipped', 'skipped', 'length', 'ts', 'tsDiff')\
        .groupBy('userId').agg({'skipped':'avg', 'timeSkipped':'avg'})\
        .withColumnRenamed('avg(skipped)', 'skipRate')\
        .withColumnRenamed('avg(timeSkipped)', 'avgTimeSkipped')
    
#avg daily visits to help site
    dailyHelpVisit = df.filter(df.page=='Help')\
        .select('userId', 'page', 'ts', 'length')\
        .withColumn('date', get_day(col('ts')))\
        .groupBy('userId', 'date').agg({'page':'count'})\
        .groupBy('userId').mean()\
         .withColumnRenamed('avg(count(page))', 'dailyHelpVisits')

    dailyErrors = df.filter(df.page=='Error')\
        .select('userId', 'page', 'ts', 'length')\
        .withColumn('date', get_day(col('ts')))\
        .groupBy('userId', 'date').agg({'page':'count'})\
        .groupBy('userId').mean()\
        .withColumnRenamed('avg(count(page))', 'dailyErrors')
    
    #whether a user has downgraded
    churn = df.withColumn("downgraded", downgrade_churn("page"))\
        .withColumn("cancelled", churn("page"))\
        .withColumn('visited_cancel', visited_cancel('page'))\
        .withColumn('visited_downgrade', visited_downgrade('page'))\
        .select(['userId', 'downgraded', 'cancelled', 'visited_cancel', 'visited_downgrade'])\
        .groupBy('userId').sum()\
        .withColumnRenamed('sum(downgraded)', 'downgraded')\
        .withColumnRenamed('sum(cancelled)', 'cancelled')\
        .withColumnRenamed('sum(visited_cancel)', 'visited_cancel')\
        .withColumnRenamed('sum(visited_downgrade)', 'visited_downgrade')

    user_level = df.select('userId', 'level')\
        .where((df.level=='free')|(df.level=='paid'))\
        .dropDuplicates()\
        .withColumn('free', free('level'))\
        .withColumn('paid', paid('level')).drop('level')

    cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
        .select('userID', 'page', 'ts') \
        .withColumn('homevisit', home_visit(col('page'))) \
        .withColumn('songPeriod', Fsum('homevisit').over(windowval))\
    
    avgSongsTillHome = cusum.filter((cusum.page=='NextSong'))\
        .groupBy('userId', 'songPeriod')\
        .agg({'songPeriod':'count'}).drop('songPeriod')\
        .groupby('userId').mean()\
        .withColumnRenamed('avg(count(songPeriod))', 'avgSongsTillHome')
    
    df = users.join(churn, on='userId')\
        .join(dailyHelpVisit, on='userId')\
        .join(dailyErrors, on='userId')\
        .join(user_level, on='userId')\
        .join(avgThumbsUp, on='userId')\
        .join(avgThumbsDown, on='userId')\
        .join(numFriends, on='userId')\
        .join(avgSongsTillHome, on='userId')\
        .join(skipping, on='userId')
    return df

def feature_scaling(df):
    feature_cols = df.drop('userId', 'cancelled').columns
    assembler = VectorAssembler(inputCols=feature_cols,\
                                outputCol='feature_vec')
    
    #pyspark.ml expects target column to be names: 'labelCol', w/ type: Double
    df = df.withColumn("label", df["cancelled"].cast(DoubleType()))
    
    #pyspark default name for features vector column: 'featuresCol'
    minmaxscaler = MinMaxScaler(inputCol="feature_vec", outputCol="features")
    
    df = assembler.transform(df)
    minmaxscaler_model = minmaxscaler.fit(df)
    scaled_df = minmaxscaler_model.transform(df)
    return scaled_df

def custom_evaluation(pred, model_name):
    '''
    Perform custom evaluation of predictions
    
    1.inspect with PySpark.ML evaluator (will use for pipeline)
    2. use RDD-API; PySpark.MLLib to get metrics based on predictions 
    3. display confusion matrix
    
    Inspiration from: https://chih-ling-hsu.github.io/2018/09/17/spark-mllib
    https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html
    https://stackoverflow.com/questions/35572000/how-can-i-plot-a-confusion-matrix
    
    Inputs
        preds(PySpark.ml.DataFrame) - predictions from model
    '''
    #want to evaluate binary class, auc_pr is best for imbalanced classes
    tn_sum = pred.filter((pred.label == 0)&(pred.prediction==0)).count() #true negative
    fn_sum = pred.filter((pred.label == 1)&(pred.prediction==0)).count() #false negative
    fp_sum = pred.filter((pred.label == 0)&(pred.prediction==1)).count() #false positive
    tp_sum = pred.filter((pred.label == 1)&(pred.prediction==1)).count() #true positive

    print("{} \n | tn:{}| fn:{}| fp:{}| tp:{}".format(model_name, tn_sum, fn_sum, fp_sum, tp_sum))

VBox()

In [12]:
#prepare data for ML
df = feature_engineering(data)
df_scaled = feature_scaling(df)
df.persist()
df_scaled.persist() #keep operations in memory to reduce computational time when testing

VBox()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 54034)
----------------------------------------
DataFrame[userId: string, downgraded: bigint, cancelled: bigint, visited_cancel: bigint, visited_downgrade: bigint, dailyHelpVisits: double, dailyErrors: double, free: int, paid: int, avgThumbsUp: double, avgThumbsDown: double, numFriends: bigint, avgSongsTillHome: double, avgTimeSkipped: double, skipRate: double, label: double, feature_vec: vector, features: vector]
Traceback (most recent call last):
  File "/usr/lib64/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python2.7/SocketServer.py"

In [18]:
#split data for training
train, rest = df_scaled.randomSplit([0.85, 0.15], seed=42)
validation, test = rest.randomSplit([0.5,0.5], seed=42)

VBox()

In [93]:
#random forest classifier model
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=10)
rf_model = rf.fit(train)
rf_preds = rf_model.transform(validation)
custom_evaluation(rf_preds, 'Random Forest')

VBox()

Random Forest 
 | tn:1043| fn:0| fp:0| tp:279

In [94]:
#gradient boosted trees (ie ada boost)
from pyspark.ml.classification import GBTClassifier
gbtrees = GBTClassifier(maxIter=10)
gbtree_model = gbtrees.fit(train)
gbtree_preds = gbtree_model.transform(validation)
custom_evaluation(gbtree_preds, 'Gradient Boosted Trees')

VBox()

Gradient Boosted Trees 
 | tn:1043| fn:0| fp:0| tp:279

In [104]:
gbt_paramMap=gbtrees.extractParamMap()
gbt_paramexpl = gbtrees.explainParams()

VBox()

In [95]:
#SVM: https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-support-vector-machine
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(maxIter=10, regParam=0.1)
svm_model=svm.fit(train)
svm_preds=svm_model.transform(validation)
custom_evaluation(svm_preds, 'Support Vector Machine')

VBox()

Support Vector Machine 
 | tn:1043| fn:0| fp:0| tp:279

In [92]:
#logistic regression model
from pyspark.ml.classification import LogisticRegression
logReg = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = logReg.fit(train)
lr_preds = lrModel.transform(validation)

custom_evaluation(lr_preds, 'Logistic Regression')

VBox()

Logistic Regression 
 | tn:1043| fn:279| fp:0| tp:0

In [107]:
import time
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#gbtree cross val
#https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.GBTClassifier
gbtrees_paramGrid = ParamGridBuilder() \
    .addGrid(gbtrees.featureSubsetStrategy,['all', 'onethird', 'sqrt', 'log2'])\
    .addGrid(gbtrees.maxDepth,[1,5,10])\
    .addGrid(gbtrees.maxIter,[5,10,20])\
    .addGrid(gbtrees.stepSize,[0.1, 0.25, 0.5])\
    .addGrid(gbtrees.minInstancesPerNode,[1,3,5])\
    .addGrid(gbtrees.maxBins,[20,32,64])\
    .build()

gbtrees_crossval = CrossValidator(estimator=gbtrees,
                          estimatorParamMaps=gbtrees_paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

start=time.time()
gbt_cv_model = gbtrees_crossval.fit(train)
end=time.time()
print("time for cv: {}".format(end-start))

gbt_cv_pred = gbt_cv_model.transform(test)

VBox()

An error was encountered:
Invalid status code '400' from https://172.31.46.197:18888/sessions/0/statements/106 with error payload: "requirement failed: Session isn't active."
