In this notebook, we will take a look at the event data, convert them into meaningful formats, build features from the data and then build a pipeline for the data cleaning and feature engineering process.

In [2]:
import re
import datetime
from datetime import date
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.style as style
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer

In [3]:
df = spark.read.json("s3://tianyi-wang-data-science-projects/churn-prediction-2020/mini_sparkify_event_data (1).json")

#1 Some basic exploration on the data

In this section we will use a sample of the event data to explore the fields.

##1.1 User ID

In [6]:
print("Numbers of rows in the data: {:,}".format(df.count()))
print("Numbers of rows with N/A user Ids: {:,}".format(df.filter("userId is Null").count()))
print("Numbers of unique customers: {:,}".format(df.select('userId').distinct().count()))

##1.2 userAgent

User agent shows what web browser or app platform the customer used. It's kind of in a messy format. We will only keep the web browser names and discard the versions and other detailed information in the brackets.

In [8]:
display(df.select('userAgent').limit(10))

userAgent
Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0
"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"""
Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0
"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"""
Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0
"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"""
"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"""
Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0
Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0
Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0


In [9]:
def clean_userAgent(x):
  '''
  Extract agents from the userAgent column;
  Example:
  "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"
  --> ['Mozilla','AppleWebKit','Chrome','Safari']
  '''
  try:
    x = re.sub("[\(\[].*?[\)\]]", "", x)
    x = x.replace("\"", "").split()
    x = [i.split('/')[0] for i in x if i != 'like']
    return x
  except:
    return None
  
clean_userAgent = udf(clean_userAgent, ArrayType(StringType()))

In [10]:
def one_hot_encode(df, col):
  '''
  one-hot encode a categorical column;
  the function will add the one hot encoding columns to the original dataframe and will drop the original categorical column;
  the dataframe can have duplicate rows;
  Input:
  df: the dataframe
  col: the categorical column
  '''
  col_unique_values = [i[col] for i in df.select(col).distinct().collect()]
  other_cols = [i for i in df.columns if i != col]
  for i in col_unique_values:
    df = df.withColumn(col + '_' + i, F.when(F.col(col)==i, 1).otherwise(0))
  df = df.drop(col).groupby(other_cols).sum()
  for c in df.columns:
    df = df.withColumnRenamed(c, c.replace("sum(", "").replace(")", ""))
  return df

In [11]:
agent_one_hot = one_hot_encode(df.select('userId', F.explode(clean_userAgent('userAgent'))).distinct().withColumnRenamed('col','agent'), 
                               'agent')

In [12]:
display(agent_one_hot.select(*[F.round(F.mean(c), 2).alias(c) for c in agent_one_hot.columns if c != "userId"]))

agent_Firefox,agent_Mozilla,agent_Safari,agent_Gecko,agent_Version,agent_AppleWebKit,agent_Mobile,agent_Chrome
0.22,1.0,0.72,0.25,0.2,0.72,0.07,0.52


It seems that in our sample all the customers have used Mozilla.

##1.3 Timestamps

In our data, all timestamps are milliseconds unix timestamps. We will utilize their orignial formats to calculate time intervals but will also convert them to 'yyyy-mm-dd' timestamps to calculate dates related features.

In [15]:
def clean_ts(timestamp):
  '''
  Convert unix timestamp to "YYYY-mm-dd" format
  '''
  try:
    return datetime.datetime.fromtimestamp(int(timestamp)/1000).strftime("%Y-%m-%d")
  except:
    return None

clean_ts = udf(clean_ts, StringType())

def clean_ts_hour(timestamp):
  '''
  Get the bucket hour:
  1am ~ 6am
  6am ~ 12pm
  12pm ~ 6pm
  6pm ~ 1am
  '''
  try:
    hour = int(datetime.datetime.fromtimestamp(int(timestamp)/1000).strftime("%H"))
    if hour >= 1 and hour <= 6:
      return "1am ~ 6am"
    elif hour > 6 and hour <= 12:
      return "6am ~ 12pm"
    elif hour > 12 and hour <= 18:
      return "12pm ~ 6pm"
    else:
      return "6pm ~ 1am"
  except:
    return None
  
clean_ts_hour = udf(clean_ts_hour, StringType())

def days_till_today(timestamp):
  '''
  Calculate how many days have passed since the timestamp
  '''
  from datetime import datetime
  try:
      now = pd.to_datetime(datetime.strftime(datetime.now(), '%Y-%m-%d'), format="%Y/%m/%d")
      that_day = pd.to_datetime(timestamp, format="%Y/%m/%d")
      return (now - that_day).days
  except:
      return None

days_till_today = udf(days_till_today, IntegerType())

In [16]:
display(df.select(clean_ts('registration').alias("registration")).select('registration',days_till_today('registration').alias('customer_age')).limit(10))

registration,customer_age
2018-09-28,596
2018-09-30,594
2018-09-28,596
2018-09-30,594
2018-09-28,596
2018-09-30,594
2018-09-30,594
2018-09-28,596
2018-09-28,596
2018-09-28,596


##1.4 Song popularities

In [18]:
display(df.filter("song is not null").select(F.concat(F.col('artist'), F.lit(" - ") , F.col('song')).alias("singer-song")).groupby('singer-song')\
        .count().orderBy('count', ascending=False).limit(10))

singer-song,count
Dwight Yoakam - You're The One,1122
BjÃÂ¶rk - Undo,1026
Kings Of Leon - Revelry,854
Harmonia - Sehr kosmisch,728
Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner - Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile),641
Florence + The Machine - Dog Days Are Over (Radio Edit),574
OneRepublic - Secrets,463
Kings Of Leon - Use Somebody,459
Five Iron Frenzy - Canada,434
Tub Ring - Invalid,424


##1.5 Pages

Pages represent the actions the customers can take on the website. Main actions are `Thumbs Down`, `Downgrade`, `Add to Playlist`, `Add Friend`, `NextSong`, `Thumbs Up`, `Upgrade`.

In [20]:
display(df.groupby('page').count())

page,count
Cancel,52
Submit Downgrade,63
Thumbs Down,2546
Home,14457
Downgrade,2055
Roll Advert,3933
Logout,3226
Save Settings,310
Cancellation Confirmation,52
About,924


##1.6 Auth

In [22]:
display(df.groupby('auth').count())

auth,count
Logged Out,8249
Cancelled,52
Guest,97
Logged In,278102


#2. Build the first-level cleaned data

Now we want to build a first-level cleaned data -- we won't do any group by calculations yet but will choose the useful columns and clean some fields so it's easier for us to build the features later.

In [24]:
user_activities = df\
                  .filter("auth = 'Logged In'")\
                  .select('userId', 
                          'registration',
                          clean_ts('registration').alias("registration_ts"),
                          'level',
                          F.col('ts').alias('time'),
                          clean_ts('ts').alias("timestamp"),
                          clean_ts_hour('ts').alias("time_in_the_day"),
                          'sessionId',
                          'itemInSession',
                          'page',
                          'artist',
                          F.concat(F.col('artist'), F.lit(" - ") , F.col('song')).alias("singer-song"),
                          'length'
                          )\
                  .withColumn('days_since_registration', 
                              F.datediff(F.to_date('timestamp',"yyyy-MM-dd"), F.to_date('registration_ts',"yyyy-MM-dd")))

#use the maximum time as the current time
max_time = user_activities.groupby().max('time').select(clean_ts('max(time)').alias('max_time')).collect()[0]['max_time']

user_activities = user_activities.withColumn('days_before_today', 
                                             F.datediff(F.to_date(F.lit(max_time),"yyyy-MM-dd"), F.to_date('timestamp',"yyyy-MM-dd")))

In [25]:
display(user_activities.limit(10))

userId,registration,registration_ts,level,time,timestamp,time in the day,sessionId,itemInSession,page,singer-song,days_since_registration,days_before_today
30,1538173362000,2018-09-28,paid,1538352117000,2018-10-01,6pm ~ 1am,29,50,NextSong,Martha Tilston - Rockpools,3,63
9,1538331630000,2018-09-30,free,1538352180000,2018-10-01,6pm ~ 1am,8,79,NextSong,Five Iron Frenzy - Canada,1,63
30,1538173362000,2018-09-28,paid,1538352394000,2018-10-01,6pm ~ 1am,29,51,NextSong,Adam Lambert - Time For Miracles,3,63
9,1538331630000,2018-09-30,free,1538352416000,2018-10-01,6pm ~ 1am,8,80,NextSong,Enigma - Knocking On Forbidden Doors,1,63
30,1538173362000,2018-09-28,paid,1538352676000,2018-10-01,6pm ~ 1am,29,52,NextSong,Daft Punk - Harder Better Faster Stronger,3,63
9,1538331630000,2018-09-30,free,1538352678000,2018-10-01,6pm ~ 1am,8,81,NextSong,The All-American Rejects - Don't Leave Me,1,63
9,1538331630000,2018-09-30,free,1538352886000,2018-10-01,6pm ~ 1am,8,82,NextSong,The Velvet Underground / Nico - Run Run Run,1,63
30,1538173362000,2018-09-28,paid,1538352899000,2018-10-01,6pm ~ 1am,29,53,NextSong,Starflyer 59 - Passengers (Old Album Version),3,63
30,1538173362000,2018-09-28,paid,1538352905000,2018-10-01,6pm ~ 1am,29,54,Add to Playlist,,3,63
30,1538173362000,2018-09-28,paid,1538353084000,2018-10-01,6pm ~ 1am,29,55,NextSong,Frumpies - Fuck Kitty,3,63


#3. Build the features

We have 2 general types of features: `Static features` and `Dynamic features`. Static features are the features that won't change as we choose different time frames. Examples are gender, states, agents.... yes they can also change but that change is not very meaningful... Dynamic features are usually behavior related features such as the numbers of unique songs the customer has listened to and the numbers of active days.

Since there are many features that can be built from the data. We define these several general aspects:

* Use time: features that are related to the time the customer spent on the website/app. 
* Product actions: features that are specific to the product (music streaming app) -- e.g. numbers of unique artist the customer listened to; numbers of songs the customer added to playlist;...
* Membership: features about which tier (free or paid) the customer is in
* demographic / device features: we will treat these features as static features

##3.1 Use time

* total sessions
* total items
* active days
* active days/total customer life (we will calculate this metrics in the end when we have the customer_age feature)
* total active time (milliseconds) 
<br>
<br>
* average sessions per day
* average time per day (milliseconds) 
* distributions of time in the day (distribution of time spent in these 4 time blocks: 1am ~ 6am, 6am ~ 12pm, 12pm ~ 6pm, 6pm ~ 1am)
<br>
<br>
* average items per session
* average active time per session

In [28]:
total_sessions = user_activities\
                 .select('userId','sessionId')\
                 .distinct()\
                 .groupby('userId')\
                 .count()\
                 .withColumnRenamed('count','sessions')

total_items = user_activities\
              .groupby('userId')\
              .count()\
              .withColumnRenamed('count','items')

active_days = user_activities\
              .select('userId','timestamp')\
              .distinct()\
              .groupby('userId')\
              .count()\
              .withColumnRenamed('count','active_days')

total_active_time = user_activities\
                    .groupby(['userId','sessionId'])\
                    .agg(F.max('time').alias('max'), F.min('time').alias('min'))\
                    .selectExpr('userId','max - min as active_time')\
                    .groupby('userId')\
                    .agg(F.sum('active_time').alias('active_time'))

sessions_per_day = user_activities\
                   .select('userId','timestamp','sessionId')\
                   .distinct()\
                   .groupby('userId','timestamp')\
                   .count()\
                   .groupby('userId').agg(F.avg('count').alias('sessions_per_day'))

time_per_day = user_activities\
              .groupby(['userId','timestamp','sessionId'])\
              .agg(F.max('time').alias('max'), F.min('time').alias('min'))\
              .selectExpr('userId','timestamp','max - min as active_time')\
              .groupby(['userId','timestamp'])\
              .agg(F.sum('active_time'))\
              .groupby('userId')\
              .agg(F.mean('sum(active_time)').alias('time_per_day'))

avg_items_per_session = user_activities\
                       .groupby(['userId','sessionId'])\
                       .count()\
                       .groupby('userId')\
                       .agg(F.avg('count').alias('avg_items_per_session'))

average_time_per_session = user_activities\
                           .groupby(['userId','sessionId'])\
                           .agg(F.max('time').alias('max'), F.min('time').alias('min'))\
                           .selectExpr('userId','max - min as active_time')\
                           .groupby('userId')\
                           .agg(F.avg('active_time').alias('time_per_session'))

time_distribution = user_activities\
                    .groupby('userId','sessionId','time_in_the_day')\
                    .agg(F.max('time').alias('max'), F.min('time').alias('min'))\
                    .selectExpr('userId','time_in_the_day','max - min as active_time')\
                    .groupby(['userId','time_in_the_day'])\
                    .agg(F.sum('active_time'))\
                    .withColumn('1am ~ 6am', F.when(F.col('time_in_the_day') == '1am ~ 6am', F.col('`sum(active_time)`')).otherwise(0))\
                    .withColumn('6am ~ 12pm', F.when(F.col('time_in_the_day') == '6am ~ 12pm', F.col('`sum(active_time)`')).otherwise(0))\
                    .withColumn('12pm ~ 6pm', F.when(F.col('time_in_the_day') == '12pm ~ 6pm', F.col('`sum(active_time)`')).otherwise(0))\
                    .withColumn('6pm ~ 1am', F.when(F.col('time_in_the_day') == '6pm ~ 1am', F.col('`sum(active_time)`')).otherwise(0))\
                    .withColumn('x', F.col('1am ~ 6am')+F.col('6am ~ 12pm')+F.col('12pm ~ 6pm')+F.col('6pm ~ 1am'))\
                    .groupby('userId')\
                    .sum()\
                    .selectExpr('userId', 
                                '`sum(1am ~ 6am)`/ `sum(x)` as perc_1_6',
                                '`sum(6am ~ 12pm)` / `sum(x)` as perc_6_12',
                                '`sum(12pm ~ 6pm)` / `sum(x)` as perc_12_18',
                                '`sum(6pm ~ 1am)` / `sum(x)` as perc_18_1')

In [29]:
display(time_distribution.limit(10))

userId,perc_1_6,perc_6_12,perc_12_18,perc_18_1
200002,0.0134594742491528,0.216656494012165,0.3774335445036095,0.3924504872350725
100010,0.2348490769543401,0.2607662607662607,0.5043846622793992,0.0
125,1.0,0.0,0.0,0.0
51,0.2275806777560965,0.2495519282820218,0.2889954911230449,0.2338719028388366
124,0.1402659737992998,0.2165851820264387,0.3016518407595095,0.3414970034147518
7,0.516182776609401,0.1569934176385437,0.1991915439871084,0.1276322617649468
54,0.2603587610232361,0.106301577309481,0.3027164095919779,0.3306232520753049
15,0.1306360349145907,0.2518826413211611,0.3946646954484549,0.2228166283157931
155,0.3262183235867446,0.2956191648712424,0.0825228275366779,0.2956396840053349
100014,0.0992790216597712,0.0036961547821854,0.7639784619128742,0.1330463616451691


In [30]:
display(total_sessions\
.join(total_items, on="userId")\
.join(active_days, on="userId")\
.join(total_active_time, on="userId")\
.join(sessions_per_day, on="userId")\
.join(time_per_day, on="userId")\
.join(avg_items_per_session, on="userId")\
.join(average_time_per_session, on="userId")\
.join(time_distribution, on="userId")\
.limit(10))

userId,sessions,items,active_days,sum(active_time),sessions_per_day,time_per_day,avg_items_per_session,avg(active_time),perc_1_6,perc_6_12,perc_12_18,perc_18_1
100010,7,381,7,64883000,1.0,9269000.0,54.42857142857143,9269000.0,0.1,0.4,0.5,0.0
200002,6,474,7,95904000,1.0,13668571.42857143,79.0,15984000.0,0.0909090909090909,0.1818181818181818,0.3636363636363636,0.3636363636363636
125,1,10,1,1774000,1.0,1774000.0,10.0,1774000.0,1.0,0.0,0.0,0.0
124,29,4825,33,1007450000,1.2424242424242424,30411212.12121212,166.3793103448276,34739655.1724138,0.1772151898734177,0.2025316455696202,0.2531645569620253,0.3670886075949367
51,10,2463,13,523272000,1.2307692307692308,40147384.615384616,246.3,52327200.0,0.2162162162162162,0.2162162162162162,0.2432432432432432,0.3243243243243243
7,7,201,8,36809000,1.0,4577500.0,28.714285714285715,5258428.571428572,0.2,0.3,0.2,0.3
15,15,2278,19,475455000,1.105263157894737,24945736.842105266,151.86666666666667,31697000.0,0.1627906976744186,0.1860465116279069,0.2790697674418604,0.3720930232558139
54,37,3436,31,715452000,1.5806451612903225,22948483.870967746,92.86486486486488,19336540.54054054,0.25,0.1710526315789473,0.25,0.3289473684210526
155,6,1002,8,197486000,1.125,24584500.0,167.0,32914333.33333333,0.1764705882352941,0.2352941176470588,0.1176470588235294,0.4705882352941176
100014,6,309,6,66526000,1.0,11087666.666666666,51.5,11087666.666666666,0.2222222222222222,0.1111111111111111,0.4444444444444444,0.2222222222222222


##3.2 Product actions

* numbers of unique songs
* numbers of unique singers
* largest song time percentage (calculate the time distribution among the songs that the customer had listened to and pick the largest percentage)
* largest singer percentage 
<br>
<br>
* numbers thumbsup
* numbers thumbsdown
* numbers add playlist
* numbers add friend
* numbers error
* perc thumbsup
* perc thumbsdown
* perc add playlist
* perc add friend
<br>
<br>
* perc popular songs (percentage of time spent on popular songs; popular songs: ranked by the numbers of unique listeners and pick the ones ranked in the top 50%)

In [32]:
unique_songs = user_activities\
               .select('userId','singer-song')\
               .distinct()\
               .groupby('userId')\
               .agg(F.count('singer-song').alias('unique_songs'))

unique_singers = user_activities\
               .select('userId','artist')\
               .distinct()\
               .groupby('userId')\
               .agg(F.count('artist').alias('unique_artists'))

song_perc =  user_activities\
            .groupby(['userId','singer-song'])\
            .agg(F.sum('length').alias('song_time'))\
            .select('userId',
                    'song_time',
                    F.sum('song_time').over(Window.partitionBy('userId')).alias('total_time'))\
            .selectExpr('userId',
                        'song_time/total_time as perc')\
            .groupby('userId')\
            .agg(F.max('perc').alias('max_song_perc'))

singer_perc = user_activities\
              .filter("`singer-song` is not Null and page == 'NextSong'")\
              .groupby(['userId','artist'])\
              .agg(F.sum('length').alias('artist_time'))\
              .select('userId',
                      'artist_time',
                      F.sum('artist_time').over(Window.partitionBy('userId')).alias('total_time'))\
              .selectExpr('userId',
                          'artist_time/total_time as perc')\
              .groupby('userId')\
              .agg(F.max('perc').alias('max_artist_perc'))

numbers_actions = user_activities\
                  .select('userId',
                          F.when(F.col('page') == 'Thumbs Up', 1).otherwise(0).alias('thumbsup'),
                          F.when(F.col('page') == 'Thumbs Down', 1).otherwise(0).alias('thumbsdown'),
                          F.when(F.col('page') == 'Add to Playlist', 1).otherwise(0).alias('add_playlist'),
                          F.when(F.col('page') == 'Add Friend', 1).otherwise(0).alias('add_friend'),
                          F.when(F.col('page') == 'Error', 1).otherwise(0).alias('error'),
                          F.lit(1).alias('x'))\
                  .groupby('userId')\
                  .sum()\
                  .selectExpr('userId',
                              '`sum(thumbsup)` as numbers_thup',
                              '`sum(thumbsdown)` as numbers_thdn',
                              '`sum(add_playlist)` as numbers_addlist',
                              '`sum(add_friend)` as numbers_addfrd',
                              '`sum(error)` as numbers_error',
                              '`sum(thumbsup)`/`sum(x)` as perc_thup',
                              '`sum(thumbsdown)`/`sum(x)` as perc_thdn',
                              '`sum(add_playlist)`/`sum(x)` as perc_addlist',
                              '`sum(add_friend)`/`sum(x)` as perc_addfrd')

song_popularity = user_activities\
                  .filter("`singer-song` is not Null and page == 'NextSong'")\
                  .select('userId','singer-song')\
                  .distinct()\
                  .groupby('singer-song')\
                  .count()
most_popular_bar = song_popularity.approxQuantile('count', [0.5], 0.25)[0]
popular_songs = song_popularity.filter("count >= {}".format(most_popular_bar))
popular_songs_perc = user_activities\
                    .filter("`singer-song` is not Null and page == 'NextSong'")\
                    .join(popular_songs, on="singer-song", how="left")\
                    .select('userId',
                            F.when(F.col('count').isNotNull(), 1).otherwise(0).alias('popular'),
                            F.lit(1).alias('x'))\
                    .groupby('userId')\
                    .sum()\
                    .selectExpr('userId','`sum(popular)`/`sum(x)` as perc_popular_songs')

In [33]:
a = unique_songs\
        .join(unique_singers, on="userId")\
        .join(song_perc, on='userId')\
        .join(singer_perc, on='userId')\
        .join(numbers_actions, on='userId')\
        .join(popular_songs_perc, on='userId')

In [34]:
display(a.limit(10))

userId,unique_songs,unique_artists,max_song_perc,max_artist_perc,numbers_thup,numbers_thdn,numbers_addlist,numbers_addfrd,numbers_error,perc_thup,perc_thdn,perc_addlist,perc_addfrd,perc_popular_songs
100010,270,252,0.010414485726938,0.0114571774559577,17,5,7,4,0,0.0446194225721784,0.0131233595800524,0.0183727034120734,0.0104986876640419,0.1636363636363636
200002,378,339,0.0062979675491267,0.0108147649871793,21,6,8,4,0,0.0443037974683544,0.0126582278481012,0.0168776371308016,0.0084388185654008,0.1033591731266149
125,8,8,0.1872859779587806,0.1872859779587806,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
124,3392,2232,0.0103648274427722,0.0103648274427722,171,41,118,74,6,0.035440414507772,0.0084974093264248,0.0244559585492228,0.0153367875647668,0.1480755087031135
51,1868,1385,0.0150385885919975,0.0150385885919975,100,21,52,28,1,0.0406008932196508,0.0085261875761266,0.0211124644742184,0.0113682501015022,0.1473235433443865
7,148,142,0.0344836729366379,0.0344836729366379,7,1,5,1,1,0.0348258706467661,0.0049751243781094,0.0248756218905472,0.0049751243781094,0.18
15,1719,1302,0.0080332945951972,0.0098634729735986,81,14,59,31,2,0.0355575065847234,0.006145741878841,0.0258999122036874,0.0136084284460052,0.1394984326018808
54,2445,1744,0.0110626081719804,0.0110626081719804,163,29,72,33,1,0.0474388824214202,0.0084400465657741,0.0209545983701979,0.009604190919674,0.1474832805350228
155,762,643,0.013196093088608,0.0141237261847796,58,3,24,11,3,0.0578842315369261,0.0029940119760479,0.0239520958083832,0.0109780439121756,0.15
100014,248,233,0.0383270302139576,0.0383270302139576,17,3,7,6,0,0.0550161812297734,0.0097087378640776,0.0226537216828478,0.0194174757281553,0.1322957198443579


##3.3 Membership 

* last_status (whether the customer is in the 'paid' level on the last day)
* perc_paid_days 
* paid_days

In [36]:
last_status = user_activities\
              .withColumn('rownum', F.row_number().over(Window.partitionBy("userId").orderBy(F.col('time').desc())))\
              .filter('rownum==1')\
              .select('userId',
                      F.when(F.col('level')=='paid', 1).otherwise(0).alias('last_status_paid'))

perc_paid_days = user_activities\
                 .select('userId',
                         'timestamp',
                         F.when(F.col('level')=='paid', 1).otherwise(0).alias('paid'),
                         F.lit(1).alias('x'))\
                 .distinct()\
                 .groupby('userId')\
                 .agg(F.sum('paid').alias('paid_days'), F.sum('x').alias('total_days'))\
                 .selectExpr('userId', 'paid_days', 'paid_days/total_days as perc_paid_days')

##3.4 Static Features
* gender
* state
* useragent
* customer_age (for how long the customer has joined)

In [38]:
def extract_state(x):
  '''
  Extract states from location column
  '''
  try:
    return x.split(",")[1].split("-")
  except:
    return None

extract_state = udf(extract_state, ArrayType(StringType()))

gender = df.select(F.when(F.col('gender')=="F", 1).otherwise(0).alias('female'))
state = one_hot_encode(df.select('userId', F.explode(extract_state('location')).alias('state')), 'state')
customer_age = df\
               .select('userId',clean_ts('registration').alias("registration"))\
               .select('userId',days_till_today('registration').alias('customer_age'))

# 4. Productionize the feature engineering process

In [40]:
%run Users/tw2567@columbia.edu/Udacity/churn_prediction/util

In [41]:
df = spark.read.json("s3://tianyi-wang-data-science-projects/churn-prediction-2020/mini_sparkify_event_data (1).json")

In [42]:
dp = DataPreparation(df)

In [43]:
final_table = dp.run()

In [44]:
final_table.coalesce(1).write.csv("s3://tianyi-wang-data-science-projects/churn-prediction-2020/sample_features_table", mode="overwrite", header=True) 