### Read data and imports

In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import types as T
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import lit, udf, struct, countDistinct, collect_list, avg, count, col
from pyspark.sql.types import ArrayType, BooleanType, LongType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1552102017728_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# the list of all pages in the service 
pages = df.select('page').distinct().toPandas()['page'].values

VBox()

### Feature engineering.

#### Next several functions are needed to detect customer churn from  a sequnce of imteractions

In [3]:
# Auxiliary functions to check if user churned

def churn_state(levels):
    ''' from a list of user account levels (paid/free) creates a list 
        of booleans indicating if user churned after given moment action'''
    if len(levels)>1:
        change = (np.array(levels[1:])!=np.array(levels[:-1]))
        churned = (change & (np.array(levels[:-1])=='paid'))
        churned = np.insert(churned, 0, False)
    else:
        churned = np.array([False])
    return churned

def was_paying(levels):
    ''' determines if user was on paid level'''
    return (np.array(levels)=='paid').any().tolist()

def churned(levels):
    ''' determines if there was a churn'''
    return churn_state(levels).any().tolist()

def num_first_churn(levels):
    ''' returns the index of the interaction after the first churn event happened'''
    did_churn = churned(levels)
    churn_states = churn_state(levels)
    #assert len(churn_states)>0
    if not did_churn:
        num = len(churn_states)-1
    else:    
        num = np.argmax(churn_states)
    return num

def time_first_churn(levels, timestamps):
    ''' returns timestamp of the first churn event'''
    return timestamps[num_first_churn(levels)]

VBox()

#### The next function creates a table with churn informations
For every user it indicates if the user churned and provides the timestamp of the moment of the churn

In [4]:
def get_churn_info(log_df):
    '''
    creates a dataframe of users indicating if user churned and the timestamp of the first churn event.
    args:
        log_df: dataframe of user events
    returns:
        churn_info: dataframe with user churn information
    '''
    convert = udf(churned, BooleanType())
    paid = udf(was_paying, BooleanType())
    _max = udf(lambda x: max(x), LongType())
    _min = udf(lambda x: min(x), LongType())
    get_churn_time = udf(lambda x: time_first_churn(x[0],x[1]), LongType())
    
    churn_info = log_df.groupby('userId').agg({'level':'collect_list','ts':'collect_list'})
    churn_info = churn_info.withColumn('last_ts', _max('collect_list(ts)'))
    churn_info = churn_info.withColumn('first_ts', _min('collect_list(ts)'))
    churn_info = churn_info.withColumn('churned', convert('collect_list(level)'))
    churn_info = churn_info.withColumn('was_paying', paid('collect_list(level)'))
    churn_info = churn_info.withColumn('first_churn_time', get_churn_time(struct('collect_list(level)', 'collect_list(ts)')))
    churn_info = churn_info.filter("was_paying=True")
    churn_info = churn_info.select(['userId','churned','first_churn_time','first_ts','last_ts'])
    return churn_info

VBox()

#### The next function creates the feature dataset from user log data
For every user the dataset contains binary label indicating if user churned and provides user features:
average session duration and fraction of the session time user spends on each of the 22 pages of the service.

In [5]:
def user_stats(log_df):
    '''
    function that creates features dataset from log dataset
    args:
        log_df: dataframe of user events
    returns:
        features_df: dataframe with user service usage statistics:
            'userId': user id
            'avg_session_duration': average session duration
            'avg(page)': for every page in the list of service pages, the avergae time of using the page per session
            'nr_of_sessions': number of sessions
    '''
    churn_info = get_churn_info(log_df)
    relevant_df = log_df.filter("userId != ''")
    relevant_df = relevant_df.join(churn_info, on='userId')
    is_before_period = udf(lambda x: (x[0]<=x[1]), BooleanType())
    
    relevant_df = relevant_df.withColumn('is_before_period', is_before_period(struct('ts', 'first_churn_time')))
    relevant_df = relevant_df.filter("is_before_period = True")
    
    number_of_sessions = relevant_df.groupby('userId').agg(countDistinct('sessionId')).\
                            withColumnRenamed('count(DISTINCT sessionId)', 'nr_of_sessions')
    _length = udf(lambda x: max(x)-min(x), LongType())
    avg_session_duration = df.filter("userId != ''").groupby(['userId','sessionId']).agg(collect_list('ts').alias('ts'))
    avg_session_duration = avg_session_duration.withColumn('duration', _length('ts'))
    avg_session_duration = avg_session_duration.select(['userId','sessionId','duration'])
    avg_session_duration = avg_session_duration.groupby('userId').agg(avg('duration').alias('avg_session_duration'))
    
    pages_per_session = relevant_df.groupby(['userId','sessionId']).agg(collect_list('page').alias('pages'))
    for page in pages:
        count_page = udf(lambda x: x.count(page)/len(x))
        pages_per_session = pages_per_session.withColumn(page, count_page('pages'))
    pages_per_session = pages_per_session.drop('pages')
    expr = {page:'avg' for page in pages}
    pages_per_session = pages_per_session.groupby('userId').agg(expr)
    
    features_df = churn_info.join(pages_per_session, on='userId')
    features_df = features_df.join(avg_session_duration, on='userId')
    features_df = features_df.join(number_of_sessions, on='userId')
    for col in ['was_paying','first_churn_time']:
        features_df = features_df.drop(col)
        
    return features_df

VBox()

### Model training

#### Feature scaling

In [None]:
user_data = user_stats(df)
user_data = user_data.drop('userId')
feature_columns = [col for col in user_data.columns if col!='churned']

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
X = assembler.transform(user_data)['features','churned']
X = X.withColumn('churned_num', X.churned.cast('integer'))['features','churned_num']

train, test = X.randomSplit([0.7, 0.3], seed=42)
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler = scaler.fit(train)
train = scaler.transform(train)['scaled_features','churned_num']

#### Logistic regression training and evaluating on the test set

In [7]:
lr = LogisticRegression(featuresCol="scaled_features", labelCol="churned_num")
lr = lr.fit(train)
test = scaler.transform(test)['scaled_features','churned_num']
predictions = lr.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol='churned_num')
print('Logistic Regression, test set perfromance, Area Under ROC:', evaluator.evaluate(predictions))

VBox()

('Logistic Regression, test set perfromance, Area Under ROC:', 0.7728633058054973)

#### ROC curve for logistic regression classifier

In [None]:
def show_roc(ax, predictions, labels, title='ROC curve'):
    ''' outputs the roc curve on ax axes'''
    prediction_prob = predictions.toPandas()['probability'].apply(lambda x:x[1]).values
    fpr, tpr, _ = roc_curve(labels, prediction_prob)
    ax.plot(fpr, tpr)
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title(title)

fig = plt.figure()
ax = fig.add_subplot(111)
show_roc(ax, predictions, labels)
plt.save_fig('lr_roc.png')

#### Gradient boosted trees training and evaluating on the test set

In [8]:
gbt = GBTClassifier(featuresCol="scaled_features", labelCol="churned_num", maxIter=10)
model = gbt.fit(train)
predictions_gb = model.transform(test)
print('Gradient boosted trees, test set perfromance, Area Under ROC:', evaluator.evaluate(predictions_gb))

fig = plt.figure()
ax = fig.add_subplot(111)
show_roc(ax, predictions_gb, labels)
plt.save_fig('gb_roc.png')

VBox()

('Gradient boosted trees, test set perfromance, Area Under ROC:', 0.7949019888338394)