In [1]:
# import libraries
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf, isnan, when, count
from pyspark.sql.types import DateType,ArrayType, DoubleType, FloatType
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from matplotlib import rcParams
import matplotlib.pylab as pylab
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import csv
import seaborn as sns
from sklearn.metrics import accuracy_score, classification_report
pd.set_option('display.max_columns', 100)

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [2]:
# create a Spark session
ss = SparkSession.builder.appName("spark_project").getOrCreate()
sqlContext = SQLContext(ss)
df = ss.read.format('json').options(header='true').load('mini_sparkify_event_data.json')

In [3]:
def basic_info(df):
    '''
    input:
    df: dataframe
    
    output:
    i)   number of rows and columns
    ii)  top 3 rows of the dataframe
    iii) data type
    iv)  null value

    '''
    print('The dataframe has {} rows and {} columns'.format(df.count(), len(df.columns)))
    check_null = df.toPandas()
    display(df.limit(3).toPandas())
    return df.printSchema(), (check_null.isnull().sum().sort_values(ascending = False))/df.count()

In [4]:
basic_info(df)

The dataframe has 286500 rows and 18 columns


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Martha Tilston,Logged In,Colin,M,50,Freeman,277.89016,paid,"Bakersfield, CA",PUT,NextSong,1538173362000,29,Rockpools,200,1538352117000,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,30
1,Five Iron Frenzy,Logged In,Micah,M,79,Long,236.09424,free,"Boston-Cambridge-Newton, MA-NH",PUT,NextSong,1538331630000,8,Canada,200,1538352180000,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",9
2,Adam Lambert,Logged In,Colin,M,51,Freeman,282.8273,paid,"Bakersfield, CA",PUT,NextSong,1538173362000,29,Time For Miracles,200,1538352394000,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,30


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



(None, artist           0.203812
 song             0.203812
 length           0.203812
 userAgent        0.029131
 firstName        0.029131
 gender           0.029131
 lastName         0.029131
 location         0.029131
 registration     0.029131
 method           0.000000
 page             0.000000
 level            0.000000
 sessionId        0.000000
 itemInSession    0.000000
 status           0.000000
 ts               0.000000
 auth             0.000000
 userId           0.000000
 dtype: float64)

In [5]:
# showing the statistics of non-string columns
df.describe().toPandas().loc[:,['summary', 'itemInSession','length', 'registration', 'sessionId', 'ts']]

Unnamed: 0,summary,itemInSession,length,registration,sessionId,ts
0,count,286500.0,228108.0,278154.0,286500.0,286500.0
1,mean,114.41421291448516,249.1171819778458,1535358834084.4272,1041.526554973822,1540956889810.4834
2,stddev,129.76726201140994,99.2351792105836,3291321616.327586,726.7762634630741,1507543960.8226302
3,min,0.0,0.78322,1521380675000.0,1.0,1538352117000.0
4,max,1321.0,3024.66567,1543247354000.0,2474.0,1543799476000.0


In [6]:
print('There are {} userId, {} artist, {} song, {} page, {} itemInSession, {} location, {} registration, {} sessionId, {} ts and {} userAgent.'\
      .format(df.agg(F.countDistinct(df.userId)).collect()[0][0]
              , df.agg(F.countDistinct(df.artist)).collect()[0][0]
              , df.agg(F.countDistinct(df.song)).collect()[0][0]               
              , df.agg(F.countDistinct(df.page)).collect()[0][0]
              , df.agg(F.countDistinct(df.itemInSession)).collect()[0][0]             
              , df.agg(F.countDistinct(df.location)).collect()[0][0]
              , df.agg(F.countDistinct(df.registration)).collect()[0][0]
              , df.agg(F.countDistinct(df.sessionId)).collect()[0][0]
              , df.agg(F.countDistinct(df.ts)).collect()[0][0]
              , df.agg(F.countDistinct(df.userAgent)).collect()[0][0]               
             ))

There are 226 userId, 17655 artist, 58480 song, 22 page, 1322 itemInSession, 114 location, 225 registration, 2354 sessionId, 277447 ts and 56 userAgent.


### Check if userId has only 1 firstName & lastName

In [7]:
df.write.mode("overwrite").saveAsTable('userId_Name')
check_name_userId = ss.sql('''select distinct userId, concat(firstName, LastName) as Name from userId_Name''')

In [8]:
check_name_userId.write.mode("overwrite").saveAsTable('userId_Name_2')

In [9]:
check_name_userId2 = ss.sql('''select userId, count(*) from userId_Name_2 group by userId having count(*) > 1''')
check_name_userId2.show(3)
#there's no result which mean each userId has only 1 Name, thus we can drop firstName & lastName

+------+--------+
|userId|count(1)|
+------+--------+
+------+--------+



In [10]:
# dropping some columns
columns_to_drop = ['firstName', 'lastName', 'ts']
df = df.drop(*columns_to_drop)

In [11]:
#checker
df.limit(2).toPandas()

Unnamed: 0,artist,auth,gender,itemInSession,length,level,location,method,page,registration,sessionId,song,status,userAgent,userId
0,Martha Tilston,Logged In,M,50,277.89016,paid,"Bakersfield, CA",PUT,NextSong,1538173362000,29,Rockpools,200,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,30
1,Five Iron Frenzy,Logged In,M,79,236.09424,free,"Boston-Cambridge-Newton, MA-NH",PUT,NextSong,1538331630000,8,Canada,200,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",9


In [12]:
# check_null[check_null['page'] == 'Cancellation Confirmation']

In [13]:
# check_null[check_null.userId.isin(check_null[check_null['page'] == 'Cancellation Confirmation']['userId'].unique())]

In [14]:
# df.describe().toPandas().T

In [15]:
# value counts of some columns
for column in ['auth', 'gender', 'level', 'method', 'status', 'page']:
    df.groupBy(column).count().orderBy('count', ascending=False).show()

+----------+------+
|      auth| count|
+----------+------+
| Logged In|278102|
|Logged Out|  8249|
|     Guest|    97|
| Cancelled|    52|
+----------+------+

+------+------+
|gender| count|
+------+------+
|     F|154578|
|     M|123576|
|  null|  8346|
+------+------+

+-----+------+
|level| count|
+-----+------+
| paid|228162|
| free| 58338|
+-----+------+

+------+------+
|method| count|
+------+------+
|   PUT|261064|
|   GET| 25436|
+------+------+

+------+------+
|status| count|
+------+------+
|   200|259812|
|   307| 26430|
|   404|   258|
+------+------+

+--------------------+------+
|                page| count|
+--------------------+------+
|            NextSong|228108|
|                Home| 14457|
|           Thumbs Up| 12551|
|     Add to Playlist|  6526|
|          Add Friend|  4277|
|         Roll Advert|  3933|
|               Login|  3241|
|              Logout|  3226|
|         Thumbs Down|  2546|
|           Downgrade|  2055|
|                Help|  1726|
|    

### For those gender is null, fill as `unknown` <br> dropping status = 404 since it represents page not found

In [16]:
df = df.fillna( { 'gender':'unknown'})#, 'b':0 } )
df = df.filter(df.status != 404)
print((df.count(), len(df.columns)))

(286242, 15)


In [17]:
df_valid = df.dropna(how = "any", subset = ["userId", "sessionId"])

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [18]:
#Define Churn, observe from above value counts, there should be 52 rows
df = df.withColumn('Churn', F.when(F.col('page')=='Cancellation Confirmation', 1)\
                             .otherwise(0))
df.groupBy('page').sum('Churn').orderBy('sum(Churn)', ascending=False).show(3)
# df.toPandas().Churn.value_counts()

+--------------------+----------+
|                page|sum(Churn)|
+--------------------+----------+
|Cancellation Conf...|        52|
|                Home|         0|
|    Submit Downgrade|         0|
+--------------------+----------+
only showing top 3 rows



In [19]:
#check if there's any song without length
df.filter((df.song.isNull()) & (df.length.isNotNull())).show()

+------+----+------+-------------+------+-----+--------+------+----+------------+---------+----+------+---------+------+-----+
|artist|auth|gender|itemInSession|length|level|location|method|page|registration|sessionId|song|status|userAgent|userId|Churn|
+------+----+------+-------------+------+-----+--------+------+----+------------+---------+----+------+---------+------+-----+
+------+----+------+-------------+------+-----+--------+------+----+------------+---------+----+------+---------+------+-----+



In [20]:
#check what the page is for row with value under column 'song'
df.filter((df.song.isNotNull()) & (df.length.isNotNull())).groupBy('page').count().orderBy('count', ascending=False).show()

+--------+------+
|    page| count|
+--------+------+
|NextSong|228108|
+--------+------+



### Check if userId has only 1 gender & location

In [21]:
df.write.mode("overwrite").saveAsTable('df_genderlocation')
df_gender_location_check = ss.sql('''select distinct userId, gender, location from df_genderlocation ''')

In [22]:
df_gender_location_check.write.mode("overwrite").saveAsTable('df_genderlocation2')
df_gender_location_check2 = ss.sql('''
select userId, gender, location, count(*) from df_genderlocation2 
group by userId, gender, location having count(*) >1
''')
df_gender_location_check2.show()

#each userId has just 1 gender and 1 location

+------+------+--------+--------+
|userId|gender|location|count(1)|
+------+------+--------+--------+
+------+------+--------+--------+



## Aggregation at userId level

In [23]:
df_group = df.withColumn('is_song', F.when(F.col('song').isNotNull(), 1).otherwise(0)) \
.withColumn('paid_level', F.when(F.col('level')=='paid', 1).otherwise(0)) \
.withColumn('free_level', F.when(F.col('level')=='free', 1).otherwise(0)) \
.withColumn('Submit_Downgrade_page', F.when(F.col('page')=='Submit Downgrade', 1).otherwise(0)) \
.withColumn('Downgrade', F.when(F.col('page')=='Downgrade', 1).otherwise(0)) \
.withColumn('Thumbs_Down', F.when(F.col('page')=='Thumbs Down', 1).otherwise(0)) \
.withColumn('Add_to_Playlist', F.when(F.col('page')=='Add to Playlist', 1).otherwise(0)) \
.withColumn('NextSong', F.when(F.col('page')=='NextSong', 1).otherwise(0)) \
.withColumn('Thumbs_Up', F.when(F.col('page')=='Thumbs Up', 1).otherwise(0)) \
.withColumn('Upgrade', F.when(F.col('page')=='Upgrade', 1).otherwise(0)) \
.withColumn('Add_Friend', F.when(F.col('page')=='Add Friend', 1).otherwise(0)) \
.withColumn('Help', F.when(F.col('page')=='Help', 1).otherwise(0)) \
.groupBy(['userId', 'gender', 'location'])\
.agg(
#     F.max('InvoiceDate').alias('Last_Transaction_Date'),                                                                                         
     F.sum('length').alias('sum_length_time'),
     F.sum('is_song').alias('sum_song'), 
     F.countDistinct('artist').alias('Numofdifferentartist'), 
     F.countDistinct('song').alias('Numofdifferentsong'),                                            
     F.sum('paid_level').alias('sum_paid_level'),  
     F.sum('free_level').alias('sum_free_level'),  
     F.sum('Submit_Downgrade_page').alias('sum_Submit_Downgrade_page'),  
     F.sum('Downgrade').alias('sum_Downgrade'),  
     F.sum('Thumbs_Down').alias('sum_Thumbs_Down'),  
     F.sum('Add_to_Playlist').alias('sum_Add_to_Playlist'),  
     F.sum('NextSong').alias('sum_NextSong'),  
     F.sum('Thumbs_Up').alias('sum_Thumbs_Up'),  
     F.sum('Upgrade').alias('sum_Upgrade'),
     F.sum('Add_Friend').alias('sum_Add_Friend'),  
     F.sum('Help').alias('sum_Help'),    
     F.sum('Churn').alias('sum_Churn')
    #  F.mean('RetailPrice').alias('MeanProductSpend'),
  )\
.withColumn('Mean_length_time', F.col('sum_length_time') / F.col('sum_song'))\
.withColumn('Weight_1', F.when(F.col('sum_Churn')==1, 2.17).otherwise(0.65))\
.withColumn('Weight_2', F.when(F.col('sum_Churn')==1, 3).otherwise(1))\
.withColumn('Weight_3', F.when(F.col('sum_Churn')==1, 3.5).otherwise(1))\
.drop('sum_length_time')

In [27]:
# recap: There are 226 userId, 17655 artist, 58480 song, 22 page, 1322 itemInSession
# , 114 location, 225 registration, 2354 sessionId, 277447 ts and 56 userAgent.
# Just make sure we are getting 226 rows (or userId)

print((df_group.count(), len(df_group.columns)))

#It outputs the correct number of userid

(226, 22)


#### Checker begins

In [28]:
df_checker = df_group.toPandas()
df_checker['sum_Churn'].value_counts()

0    174
1     52
Name: sum_Churn, dtype: int64

In [29]:
df_checker.gender.value_counts()

# will drop the single unknown gender record later

M          121
F          104
unknown      1
Name: gender, dtype: int64

#### Checker ends

## Comparison of churn group vs. non-churn group

In [32]:
df_group_churn = df_group.groupBy('sum_churn').agg(
# .withColumn('is_song', F.when(F.col('song').isNotNull(), 1).otherwise(0)) \
# .withColumn('paid_level', F.when(F.col('level')=='paid', 1).otherwise(0)) \
# .withColumn('free_level', F.when(F.col('level')=='free', 1).otherwise(0)) \
# .withColumn('Submit_Downgrade_page', F.when(F.col('page')=='Submit Downgrade', 1).otherwise(0)) \
# .withColumn('Downgrade', F.when(F.col('page')=='Downgrade', 1).otherwise(0)) \
# .withColumn('Thumbs_Down', F.when(F.col('page')=='Thumbs Down', 1).otherwise(0)) \
# .withColumn('Add_to_Playlist', F.when(F.col('page')=='Add to Playlist', 1).otherwise(0)) \
# .withColumn('NextSong', F.when(F.col('page')=='NextSong', 1).otherwise(0)) \
# .withColumn('Thumbs_Up', F.when(F.col('page')=='Thumbs Up', 1).otherwise(0)) \
# .withColumn('Upgrade', F.when(F.col('page')=='Upgrade', 1).otherwise(0)) \
# .withColumn('Add_Friend', F.when(F.col('page')=='Add Friend', 1).otherwise(0)) \
# .withColumn('Help', F.when(F.col('page')=='Help', 1).otherwise(0)) \


#     F.max('InvoiceDate').alias('Last_Transaction_Date'),   
     F.countDistinct('userId').alias('NumofuserId'),    
#      F.sum('length').alias('sum_length_time'),
     F.sum('sum_song').alias('sum_song2'), 
     F.countDistinct('Numofdifferentartist').alias('Numofdifferentartist2'), 
     F.countDistinct('Numofdifferentsong').alias('Numofdifferentsong2'),                                            
     F.sum('sum_paid_level').alias('sum_paid_level2'),  
     F.sum('sum_free_level').alias('sum_free_level2'),  
     F.sum('sum_Submit_Downgrade_page').alias('sum_Submit_Downgrade_page2'),  
     F.sum('sum_Downgrade').alias('sum_Downgrade2'),  
     F.sum('sum_Thumbs_Down').alias('sum_Thumbs_Down2'),  
     F.sum('sum_Add_to_Playlist').alias('sum_Add_to_Playlist2'),  
     F.sum('sum_NextSong').alias('sum_NextSong2'),  
     F.sum('sum_Thumbs_Up').alias('sum_Thumbs_Up2'),  
     F.sum('sum_Upgrade').alias('sum_Upgrade2'),
     F.sum('sum_Add_Friend').alias('sum_Add_Friend2'),  
     F.sum('sum_Help').alias('sum_Help2'),    
     F.sum('sum_Churn').alias('sum_Churn2')
    #  F.mean('RetailPrice').alias('MeanProductSpend'),
  )\
.withColumn('Mean_song', F.col('sum_song2') / F.col('NumofuserId'))\
.withColumn('Mean_differentartist', F.col('Numofdifferentartist2') / F.col('NumofuserId'))\
.withColumn('Mean_differentsong', F.col('Numofdifferentsong2') / F.col('NumofuserId'))\
.withColumn('Mean_paid_level', F.col('sum_paid_level2') / F.col('NumofuserId'))\
.withColumn('Mean_free_level', F.col('sum_free_level2') / F.col('NumofuserId'))\
.withColumn('Mean_Submit_Downgrade_page', F.col('sum_Submit_Downgrade_page2') / F.col('NumofuserId'))\
.withColumn('Mean_Downgrade', F.col('sum_Downgrade2') / F.col('NumofuserId'))\
.withColumn('Mean_Thumbs_Down', F.col('sum_Thumbs_Down2') / F.col('NumofuserId'))\
.withColumn('Mean_Add_to_Playlist', F.col('sum_Add_to_Playlist2') / F.col('NumofuserId'))\
.withColumn('Mean_NextSong', F.col('sum_NextSong2') / F.col('NumofuserId'))\
.withColumn('Mean_Thumbs_Up', F.col('sum_Thumbs_Up2') / F.col('NumofuserId'))\
.withColumn('Mean_Upgrade', F.col('sum_Upgrade2') / F.col('NumofuserId'))\
.withColumn('Mean_Add_Friend', F.col('sum_Add_Friend2') / F.col('NumofuserId'))\
.withColumn('Mean_Help', F.col('sum_Help2') / F.col('NumofuserId'))\
.drop('sum_paid_level2')\
.drop('sum_free_level2')\
.drop('sum_song2')\
.drop('Numofdifferentartist2')\
.drop('Numofdifferentsong2')\
.drop('sum_Submit_Downgrade_page2')\
.drop('sum_Downgrade2')\
.drop('sum_Thumbs_Down2')\
.drop('sum_Add_to_Playlist2')\
.drop('sum_NextSong2')\
.drop('sum_Thumbs_Up2')\
.drop('sum_Thumbs_Up2')\
.drop('sum_Upgrade2')\
.drop('sum_Help2')\
.drop('sum_Churn2')

In [33]:
churn_nonchurn_comparions = df_group_churn.toPandas()
churn_nonchurn_comparions

Unnamed: 0,sum_churn,NumofuserId,sum_Add_Friend2,Mean_song,Mean_differentartist,Mean_differentsong,Mean_paid_level,Mean_free_level,Mean_Submit_Downgrade_page,Mean_Downgrade,Mean_Thumbs_Down,Mean_Add_to_Playlist,Mean_NextSong,Mean_Thumbs_Up,Mean_Upgrade,Mean_Add_Friend,Mean_Help
0,0,174,3641,1101.804598,0.971264,0.95977,1123.591954,263.821839,0.310345,9.873563,11.781609,31.54023,1101.804598,61.448276,2.224138,20.925287,8.545977
1,1,52,636,699.884615,0.980769,0.980769,624.153846,238.0,0.173077,6.480769,9.538462,19.961538,699.884615,35.75,2.153846,12.230769,4.596154


In [34]:
churn_nonchurn_comparions.columns

Index(['sum_churn', 'NumofuserId', 'sum_Add_Friend2', 'Mean_song',
       'Mean_differentartist', 'Mean_differentsong', 'Mean_paid_level',
       'Mean_free_level', 'Mean_Submit_Downgrade_page', 'Mean_Downgrade',
       'Mean_Thumbs_Down', 'Mean_Add_to_Playlist', 'Mean_NextSong',
       'Mean_Thumbs_Up', 'Mean_Upgrade', 'Mean_Add_Friend', 'Mean_Help'],
      dtype='object')

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [36]:
ID_cols = ['userId']
cat_cols = ['gender']
non_cat_cols = [column for column in df_group.columns if column not in ('userId','gender','sum_Churn', 'location',
                                                                       'Mean_length_time', 'Weight_1', 'Weight_2', 'Weight_3')]

In [37]:
column_string = ', '.join(ID_cols + cat_cols + non_cat_cols)
column_string

'userId, gender, sum_song, Numofdifferentartist, Numofdifferentsong, sum_paid_level, sum_free_level, sum_Submit_Downgrade_page, sum_Downgrade, sum_Thumbs_Down, sum_Add_to_Playlist, sum_NextSong, sum_Thumbs_Up, sum_Upgrade, sum_Add_Friend, sum_Help'

In [38]:
indexers = [
    StringIndexer(
        inputCol=col, 
        outputCol=col+'_Index') 
    for col in cat_cols]

In [39]:
encoders = [
    OneHotEncoder(
        dropLast=False, 
        inputCol=indexer.getOutputCol(),
        outputCol=indexer.getOutputCol()+'_Encoded') 
    for indexer in indexers]  

In [41]:
scaler_assembler = VectorAssembler(
    inputCols=non_cat_cols, 
    outputCol='numFeatures')

In [42]:
scaler = StandardScaler(
            inputCol='numFeatures',
            outputCol='numFeatures_Scaled',
            withStd=True,
            withMean=True)

In [43]:
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders] + ['numFeatures_Scaled'], 
    outputCol='features')

In [44]:
df_group.write.mode("overwrite").saveAsTable('user_add_features')

In [45]:
# select the required features 
df_features_query = ('''
    SELECT {0}, sum_Churn, Weight_1, Weight_2, Weight_3
    FROM user_add_features''' ).format(column_string)

df_features = ss.sql(df_features_query)

In [46]:
df_features.count(), len(df_features.columns)

(226, 20)

In [47]:
# Dropping that 1 row with unknown gender
df_features = df_features.filter( df_features.gender !='unknown')

In [48]:
df_features.count(), len(df_features.columns)

(225, 20)

# Modeling: Random Forest
###### Starting with numTrees = 15

In [49]:
# train test split: since it's an imbalanced dataset, we need to split the data by stratified sampling,
# to make sure that each label has the same proportion in both train and test data
train_data = df_features.sampleBy("sum_Churn", fractions={0: 0.7, 1: 0.7}, seed=42)
# subtract function: Return a new DataFrame containing rows in this DataFrame but not in another DataFrame.
test_data = df_features.subtract(train_data)

display(train_data.groupBy("sum_Churn").count().show())
test_data.groupBy("sum_Churn").count().show()

# train_data, test_data = df_features.randomSplit([0.7,0.3], seed = 42)

+---------+-----+
|sum_Churn|count|
+---------+-----+
|        0|  119|
|        1|   41|
+---------+-----+



None

+---------+-----+
|sum_Churn|count|
+---------+-----+
|        0|   54|
|        1|   11|
+---------+-----+



In [50]:
rfc = RandomForestClassifier(labelCol='sum_Churn',featuresCol='features', seed = 42, numTrees = 15)

tmp_rfc = [[i,j] for i,j in zip(indexers, encoders)]
tmp_rfc = [i for row in tmp_rfc for i in row]
tmp_rfc += [scaler_assembler]
tmp_rfc += [scaler]
tmp_rfc += [assembler]
tmp_rfc += [rfc]
pl_rfc = Pipeline(stages=tmp_rfc)

# fit
rfc_model = pl_rfc.fit(train_data)
#predict
rfc_predictions = rfc_model.transform(test_data)

In [51]:
eval_acc_rfc = rfc_predictions.select(['prediction','sum_Churn']).withColumn('sum_Churn', F.col('sum_Churn').cast(FloatType())).orderBy('prediction')
eval_acc_rfc = eval_acc_rfc.select(['prediction','sum_Churn'])

In [52]:
metrics_rfc = MulticlassMetrics(eval_acc_rfc.rdd.map(tuple))

In [53]:
print(metrics_rfc.confusionMatrix().toArray())
print("Accuracy = %s" % metrics_rfc.accuracy)

[[ 50.   4.]
 [ 10.   1.]]
Accuracy = 0.7846153846153846


In [55]:
target_names = ['not_churn', 'churn']
print(classification_report(eval_acc_rfc.toPandas().sum_Churn, eval_acc_rfc.toPandas().prediction, target_names=target_names))

             precision    recall  f1-score   support

  not_churn       0.83      0.93      0.88        54
      churn       0.20      0.09      0.13        11

avg / total       0.73      0.78      0.75        65



#### Let's try tuning the parameter of Random Forest

In [56]:
for rate in (0.75, 1):
    for tree in range(20,40,6):
        for depth in range(3,5,1):
            print('subsamplingRate = {} , numTrees = {} , maxDepth = {}'.format(rate,tree,depth))
            rfc_tune = RandomForestClassifier(labelCol='sum_Churn',featuresCol='features', seed = 42, subsamplingRate = rate, numTrees = tree, maxDepth = depth )

            tmp_rfc_tune = [[i,j] for i,j in zip(indexers, encoders)]
            tmp_rfc_tune = [i for row in tmp_rfc_tune for i in row]
            tmp_rfc_tune += [scaler_assembler]
            tmp_rfc_tune += [scaler]
            tmp_rfc_tune += [assembler]
            tmp_rfc_tune += [rfc_tune]

            pl_rfc_tune = Pipeline(stages=tmp_rfc_tune)
            rfc_model_tune = pl_rfc_tune.fit(train_data)
            rfc_predictions_tune = rfc_model_tune.transform(test_data)

            eval_acc_rfc_tune = rfc_predictions_tune.select(['prediction','sum_Churn']).withColumn('sum_Churn', F.col('sum_Churn').cast(FloatType())).orderBy('prediction')
            eval_acc_rfc_tune = eval_acc_rfc_tune.select(['prediction','sum_Churn'])
            metrics_rfc_tune = MulticlassMetrics(eval_acc_rfc_tune.rdd.map(tuple))
            print(metrics_rfc_tune.confusionMatrix().toArray())
            print("Accuracy = %s" % metrics_rfc_tune.accuracy)
            print('==========================================')

subsamplingRate = 0.75 , numTrees = 20 , maxDepth = 3
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 0.75 , numTrees = 20 , maxDepth = 4
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 0.75 , numTrees = 26 , maxDepth = 3
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 0.75 , numTrees = 26 , maxDepth = 4
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 0.75 , numTrees = 32 , maxDepth = 3
[[ 54.   0.]
 [ 11.   0.]]
Accuracy = 0.8307692307692308
subsamplingRate = 0.75 , numTrees = 32 , maxDepth = 4
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 0.75 , numTrees = 38 , maxDepth = 3
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 0.75 , numTrees = 38 , maxDepth = 4
[[ 53.   1.]
 [ 11.   0.]]
Accuracy = 0.8153846153846154
subsamplingRate = 1 , numTrees = 20 , maxDepth = 3
[[ 54.   0.]
 [ 11.   0.]]
Accuracy = 0.8307692307692308
subs

### From the parameters we tried, subsamplingRate = 1, numTrees = 26, maxDepth 4 output the best model

In [57]:
rfc_tune = RandomForestClassifier(labelCol='sum_Churn',featuresCol='features', seed = 42, subsamplingRate = 1, numTrees = 26, maxDepth = 4 )

tmp_rfc_tune = [[i,j] for i,j in zip(indexers, encoders)]
tmp_rfc_tune = [i for row in tmp_rfc_tune for i in row]
tmp_rfc_tune += [scaler_assembler]
tmp_rfc_tune += [scaler]
tmp_rfc_tune += [assembler]
tmp_rfc_tune += [rfc_tune]

pl_rfc_tune = Pipeline(stages=tmp_rfc_tune)
rfc_model_tune = pl_rfc_tune.fit(train_data)
rfc_predictions_tune = rfc_model_tune.transform(test_data)

eval_acc_rfc_tune = rfc_predictions_tune.select(['prediction','sum_Churn']).withColumn('sum_Churn', F.col('sum_Churn').cast(FloatType())).orderBy('prediction')
eval_acc_rfc_tune = eval_acc_rfc_tune.select(['prediction','sum_Churn'])
metrics_rfc_tune = MulticlassMetrics(eval_acc_rfc_tune.rdd.map(tuple))
print(metrics_rfc_tune.confusionMatrix().toArray())
print("Accuracy = %s" % metrics_rfc_tune.accuracy)
print(classification_report(eval_acc_rfc_tune.toPandas().sum_Churn, eval_acc_rfc_tune.toPandas().prediction, target_names=target_names))

[[ 54.   0.]
 [ 10.   1.]]
Accuracy = 0.8461538461538461
             precision    recall  f1-score   support

  not_churn       0.84      1.00      0.92        54
      churn       1.00      0.09      0.17        11

avg / total       0.87      0.85      0.79        65



# Let's try another supervised learning model - Logistic Regression

In [58]:
log_reg = LogisticRegression(featuresCol='features',labelCol='sum_Churn')

tmp = [[i,j] for i,j in zip(indexers, encoders)]
tmp = [i for row in tmp for i in row]
tmp += [scaler_assembler]
tmp += [scaler]
tmp += [assembler]
tmp += [log_reg]

pl = Pipeline(stages=tmp)

fit_model = pl.fit(train_data)

results = fit_model.transform(test_data)

In [59]:
eval_acc = results.select(['prediction','sum_Churn']).withColumn('sum_Churn', F.col('sum_Churn').cast(FloatType())).orderBy('prediction')
eval_acc = eval_acc.select(['prediction','sum_Churn'])

metrics = MulticlassMetrics(eval_acc.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())
print("Accuracy = %s" % metrics.accuracy)

[[ 51.   3.]
 [  9.   2.]]
Accuracy = 0.8153846153846154


In [60]:
print(classification_report(eval_acc.toPandas().sum_Churn, eval_acc.toPandas().prediction, target_names=target_names))

             precision    recall  f1-score   support

  not_churn       0.85      0.94      0.89        54
      churn       0.40      0.18      0.25        11

avg / total       0.77      0.82      0.79        65



### Since it's an imbalanced dataset with churn = 52 & non-churn = 173, let's place more weight on churn and see the change of accuracy and recall.

In [61]:
for column in ['Weight_1','Weight_2','Weight_3']:
    log_reg = LogisticRegression(featuresCol='features',labelCol='sum_Churn', weightCol=column)

    tmp = [[i,j] for i,j in zip(indexers, encoders)]
    tmp = [i for row in tmp for i in row]
    tmp += [scaler_assembler]
    tmp += [scaler]
    tmp += [assembler]
    tmp+=[log_reg]

    pl = Pipeline(stages=tmp)

    fit_model = pl.fit(train_data)

    results = fit_model.transform(test_data)
    eval_acc = results.select(['prediction','sum_Churn']).withColumn('sum_Churn', F.col('sum_Churn').cast(FloatType())).orderBy('prediction')
    eval_acc = eval_acc.select(['prediction','sum_Churn'])

    metrics = MulticlassMetrics(eval_acc.rdd.map(tuple))
    print('weightCol = {} \n'.format(column))
    print(metrics.confusionMatrix().toArray())
    print("\nAccuracy = %s" % metrics.accuracy)
    print(classification_report(eval_acc.toPandas().sum_Churn, eval_acc.toPandas().prediction, target_names=target_names))
    print('=============================================================================================')

weightCol = Weight_1 

[[ 28.  26.]
 [  2.   9.]]

Accuracy = 0.5692307692307692
             precision    recall  f1-score   support

  not_churn       0.93      0.52      0.67        54
      churn       0.26      0.82      0.39        11

avg / total       0.82      0.57      0.62        65

weightCol = Weight_2 

[[ 33.  21.]
 [  2.   9.]]

Accuracy = 0.6461538461538462
             precision    recall  f1-score   support

  not_churn       0.94      0.61      0.74        54
      churn       0.30      0.82      0.44        11

avg / total       0.83      0.65      0.69        65

weightCol = Weight_3 

[[ 28.  26.]
 [  2.   9.]]

Accuracy = 0.5692307692307692
             precision    recall  f1-score   support

  not_churn       0.93      0.52      0.67        54
      churn       0.26      0.82      0.39        11

avg / total       0.82      0.57      0.62        65

