In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, Normalizer, StandardScaler, MinMaxScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import to_date, datediff
from pyspark.sql.functions import concat, lit, avg, split, isnan, when, count, col, sum, mean, stddev, min, max, round
from pyspark.sql import Window
from pyspark.ml.classification import LogisticRegression, GBTClassifier, NaiveBayes, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Bucketizer

## 1.) Data Loading

In [2]:
# Creating a spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("DataProcessing") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/20 22:16:30 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/11/20 22:16:30 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/11/20 22:16:31 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/11/20 22:16:31 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [3]:
#reading in the dataframe from GCS bucket
df = spark.read.format("json").options(header="false", inferschema="true").load('gs://bda-project-6893/Streamed_data.json')

                                                                                

## 2.) Preliminary EDA

In [4]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [5]:
df.show(5)

+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|          artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|  Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Boston-Cambridge-...|   PUT|NextSong|1538331630000| 

In [6]:
# Number of rows in the dataframe
df.count()

                                                                                

286500

In [7]:
# Count for empty, None, Null, Nan with string literals.
df2 = df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns])
df2.show()

[Stage 5:>                                                          (0 + 1) / 1]

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId| song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
| 58406|   0|     8346|  8346|            0|    8346| 58392|    0|    8346|     0|   0|        8346|        0|58445|     0|  0|     8346|  8346|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+



                                                                                

In [8]:
num_columns = [item[0] for item in df.dtypes if not item[1].startswith('string')]
df.select(*num_columns).describe().show()

[Stage 8:>                                                          (0 + 1) / 1]

+-------+------------------+-----------------+--------------------+-----------------+------------------+--------------------+
|summary|     itemInSession|           length|        registration|        sessionId|            status|                  ts|
+-------+------------------+-----------------+--------------------+-----------------+------------------+--------------------+
|  count|            286500|           228108|              278154|           286500|            286500|              286500|
|   mean|114.41421291448516|249.1171819778458|1.535358834084427...|1041.526554973822|210.05459685863875|1.540956889810483...|
| stddev|129.76726201140994|99.23517921058361| 3.291321616327586E9|726.7762634630741| 31.50507848842214|1.5075439608226302E9|
|    min|                 0|          0.78322|       1521380675000|                1|               200|       1538352117000|
|    max|              1321|       3024.66567|       1543247354000|             2474|               404|       1543799

                                                                                

## 3.) Data Cleaning and Feature Engineering

### a.) Remove rows which have UserId missing (we remove such rows as we cannot model activity of a user without the User Id identifier and thus, they are redundant for our analysis)

In [9]:
df = df.filter(df["userId"] != "")

In [10]:
df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns]).show()

[Stage 11:>                                                         (0 + 1) / 1]

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId| song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
| 50060|   0|        0|     0|            0|       0| 50046|    0|       0|     0|   0|           0|        0|50099|     0|  0|        0|     0|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+



                                                                                

In [11]:
df.count()

                                                                                

278154

### b.) Add a Churn column to the dataset (for rows where page column value is "Cancellation Confirmation" we mark the row to be indicative of customer churn i.e.1 else 0)

In [12]:
cancellation_udf = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

In [13]:
df = df.withColumn("churn", cancellation_udf("page"))

In [14]:
df.select('page','churn').show(10)

[Stage 17:>                                                         (0 + 1) / 1]

+---------------+-----+
|           page|churn|
+---------------+-----+
|       NextSong|    0|
|       NextSong|    0|
|       NextSong|    0|
|       NextSong|    0|
|       NextSong|    0|
|       NextSong|    0|
|       NextSong|    0|
|       NextSong|    0|
|Add to Playlist|    0|
|       NextSong|    0|
+---------------+-----+
only showing top 10 rows



Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

### c.) For all user id's which have churned, we set all their past entries to churn as well. (As this user's entire past activity logs will now be modelled to understand the basis of their churn)

In [15]:
# list of users who have cancelled their subscription
cancelled_users = df.select(['userId']).where(df.churn == 1).groupby('userId').count().toPandas()['userId'].values

                                                                                

In [16]:
user_cancellation_udf = udf(lambda x: 1 if x in cancelled_users else 0, IntegerType())

In [17]:
#set the churn value for all entries to 1 for all users who have cancelled their subscription at some point in time
df = df.withColumn("churn", user_cancellation_udf("userId"))

### d.) For each user we find their last date of activity and first date of activity to be able to model days spent on the platform as a feature

In [18]:
# set column last_ts with the last event timestamp for the user and first_ts with the first event timestamp for the user
w = Window.partitionBy('userId')
df = df.withColumn('last_ts', max('ts').over(w))
df = df.withColumn('first_ts', min('ts').over(w))

In [19]:
# convert timestamps to date (string)
def get_date_from_ts(ts):
    return str(datetime.utcfromtimestamp(ts / 1000).strftime('%Y-%m-%d'))

get_date_from_ts_udf = udf(get_date_from_ts, StringType())
df = df.withColumn('last_date', get_date_from_ts_udf(col('last_ts')))
df = df.withColumn('first_date', get_date_from_ts_udf(col('first_ts')))
df = df.withColumn('date', get_date_from_ts_udf(col('ts')))

In [20]:
# set column last_level to level when timestamp is last timestamp to identify the last activity log for each user
df = df.withColumn('last_level',when(df.last_ts == df.ts, df.level))

In [21]:
# calculate days that a user was active on the platform
df = df.withColumn("days_active", 
              datediff(to_date("last_date","yyyy-MM-dd"),
                       to_date("first_date","yyyy-MM-dd")))

In [22]:
df.show(20)

22/11/20 22:17:18 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 21:>                                                         (0 + 1) / 1]

+--------------------+---------+---------+------+-------------+--------+---------+-----+----------+------+----------+-------------+---------+--------------------+------+-------------+--------------------+------+-----+-------------+-------------+----------+----------+----------+----------+-----------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|  location|method|      page| registration|sessionId|                song|status|           ts|           userAgent|userId|churn|      last_ts|     first_ts| last_date|first_date|      date|last_level|days_active|
+--------------------+---------+---------+------+-------------+--------+---------+-----+----------+------+----------+-------------+---------+--------------------+------+-------------+--------------------+------+-----+-------------+-------------+----------+----------+----------+----------+-----------+
|            Sea Wolf|Logged In|   Brycen|     M|            0| Bradley|265.53424| paid|Laurel

                                                                                

### e.) Model a feature to find average number of songs a user listens to each day

In [23]:
w = Window.partitionBy('userId','date')
songs = df.where(df.page == 'NextSong').select('userId', 'date', count('userId').over(w).alias('n_songs')).distinct()

In [24]:
w = Window.partitionBy('userId')
songs = songs.withColumn('avg_songs', avg('n_songs').over(w))
songs = songs.select(col("userId").alias("songs_userId"), 'avg_songs')
songs = songs.withColumn("avg_songs", round(songs["avg_songs"], 2)).distinct()

In [25]:
songs.show(10)

[Stage 26:>                                                         (0 + 1) / 1]

+------------+---------+
|songs_userId|avg_songs|
+------------+---------+
|          10|    84.13|
|         100|    81.27|
|      100001|     66.5|
|      100002|     39.0|
|      100003|     25.5|
|      100004|    49.58|
|      100005|     38.5|
|      100006|     26.0|
|      100007|     47.0|
|      100008|     96.5|
+------------+---------+
only showing top 10 rows



                                                                                

### f.) Model a feature to find average number of events for each user each day

In [26]:
w = Window.partitionBy('userId', 'date')
events = df.select('userId', 'date', count('userId').over(w).alias('events')).distinct()
w = Window.partitionBy('userId')
events = events.withColumn('avg_events', avg('events').over(w))
events = events.select(col("userId").alias("events_userId"), 'avg_events')
events = events.withColumn("avg_events", round(events["avg_events"], 2)).distinct()

In [27]:
events.show()

[Stage 30:>                                                         (0 + 1) / 1]

+-------------+----------+
|events_userId|avg_events|
+-------------+----------+
|           10|     99.38|
|          100|     97.39|
|       100001|      93.5|
|       100002|      43.6|
|       100003|      39.0|
|       100004|     65.53|
|       100005|      54.0|
|       100006|      44.0|
|       100007|     57.78|
|       100008|     117.5|
|       100009|      67.1|
|       100010|     54.43|
|       100011|      23.0|
|       100012|     85.71|
|       100013|      92.8|
|       100014|     51.67|
|       100015|      70.0|
|       100016|      63.8|
|       100017|      75.0|
|       100018|     67.79|
+-------------+----------+
only showing top 20 rows



                                                                                

### g.) Model a feature to find total number of thumbs up and thumbs down for each user

In [28]:
w = Window.partitionBy('userId')
thumbsup = df.where(df.page == 'Thumbs Up').select('userId', count('userId').over(w).alias('thumbs_up')).distinct()
thumbsup = thumbsup.select(col("userId").alias("thumbsup_userId"), 'thumbs_up')

In [29]:
thumbsup.show(10)

[Stage 36:>                                                         (0 + 1) / 1]

+---------------+---------+
|thumbsup_userId|thumbs_up|
+---------------+---------+
|             10|       37|
|            100|      148|
|         100001|        8|
|         100002|        5|
|         100003|        3|
|         100004|       35|
|         100005|        7|
|         100006|        2|
|         100007|       19|
|         100008|       37|
+---------------+---------+
only showing top 10 rows



                                                                                

In [30]:
w = Window.partitionBy('userId')
thumbsdown = df.where(df.page == 'Thumbs Down').select('userId', count('userId').over(w).alias('thumbs_down')).distinct()
thumbsdown = thumbsdown.select(col("userId").alias("thumbsdown_userId"), 'thumbs_down')

In [31]:
thumbsdown.show(10)

[Stage 39:>                                                         (0 + 1) / 1]

+-----------------+-----------+
|thumbsdown_userId|thumbs_down|
+-----------------+-----------+
|               10|          4|
|              100|         27|
|           100001|          2|
|           100004|         11|
|           100005|          3|
|           100006|          2|
|           100007|          6|
|           100008|          6|
|           100009|          8|
|           100010|          5|
+-----------------+-----------+
only showing top 10 rows



                                                                                

### h.) Model a feature to find the state in which the user was when the last activity was logged

In [32]:
def get_state(location):
    location = location.split(',')[-1].strip()
    if (len(location) > 2):
        location = location.split('-')[-1].strip()
    
    return location

In [33]:
get_state_udf = udf(get_state, StringType())
df = df.withColumn('state', get_state_udf(col('location')))

In [34]:
#add column with last location of the user
df = df.withColumn('last_state',when(df.last_ts == df.ts, df.state))

In [35]:
df.tail(2)

                                                                                

[Row(artist='Camila', auth='Logged In', firstName='Madison', gender='F', itemInSession=135, lastName='Atkinson', length=252.29016, level='paid', location='New Haven-Milford, CT', method='PUT', page='NextSong', registration=1531811983000, sessionId=2274, song='Besame', status=200, ts=1543303237000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='99', churn=0, last_ts=1543303277000, first_ts=1538604854000, last_date='2018-11-27', first_date='2018-10-03', date='2018-11-27', last_level=None, days_active=55, state='CT', last_state=None),
 Row(artist=None, auth='Logged In', firstName='Madison', gender='F', itemInSession=136, lastName='Atkinson', length=None, level='paid', location='New Haven-Milford, CT', method='PUT', page='Add to Playlist', registration=1531811983000, sessionId=2274, song=None, status=200, ts=1543303277000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gec

### j.) Model a feature to find the number of friends each user had

In [36]:
w = Window.partitionBy('userId')
addfriend = df.where(df.page == 'Add Friend').select('userId', count('userId').over(w).alias('addfriend')).distinct()
addfriend = addfriend.select(col("userId").alias("addfriend_userId"), 'addfriend')

## 4.) Creating the final dataset from the engineered features

In [37]:
df_ml = df.select('userId', 'gender', 'churn', 'last_level', 'days_active', 'last_state')\
    .dropna().drop_duplicates()
df_ml = df_ml.join(songs, df_ml.userId == songs.songs_userId).distinct()
df_ml = df_ml.join(events, df_ml.userId == events.events_userId).distinct()
df_ml = df_ml.join(thumbsup, df_ml.userId == thumbsup.thumbsup_userId, how='left').distinct()
df_ml = df_ml.fillna(0, subset=['thumbs_up'])
df_ml = df_ml.join(thumbsdown, df_ml.userId == thumbsdown.thumbsdown_userId, how='left').distinct()
df_ml = df_ml.fillna(0, subset=['thumbs_down'])
df_ml = df_ml.join(addfriend, df_ml.userId == addfriend.addfriend_userId, how='left').distinct()
df_ml = df_ml.fillna(0, subset=['addfriend'])
df_ml = df_ml.drop('songs_userId','events_userId', 'thumbsup_userId', 'thumbsdown_userId', 'addfriend_userId')

In [38]:
df_ml.show(10)

                                                                                

+------+------+-----+----------+-----------+----------+---------+----------+---------+-----------+---------+
|userId|gender|churn|last_level|days_active|last_state|avg_songs|avg_events|thumbs_up|thumbs_down|addfriend|
+------+------+-----+----------+-----------+----------+---------+----------+---------+-----------+---------+
|    10|     M|    0|      paid|         42|        MS|    84.13|     99.38|       37|          4|       12|
|   100|     M|    0|      paid|         59|        TX|    81.27|     97.39|      148|         27|       49|
|100001|     F|    1|      free|          1|        FL|     66.5|      93.5|        8|          2|        2|
|100002|     F|    0|      paid|         56|        CA|     39.0|      43.6|        5|          0|        1|
|100003|     F|    1|      free|          2|        FL|     25.5|      39.0|        3|          0|        0|
|100004|     F|    0|      paid|         57|        NY|    49.58|     65.53|       35|         11|       19|
|100005|     M|    

In [40]:
# Save the final dataframe to GCS bucket to be consumed by the ML models
df_ml.write.csv(r'gs://bda-project-6893/Processed_data.csv')

                                                                                