# Feature engineering using Spark
- The data covers 2017-03-30 to 2017-05-12 (44 days). We use the first 30 days as the feature window and the last 14 days as the label window.
- First, we define churned users as those who are active in the feature window (the first 30 days) and have no activity in the label window.
- Then we create frequency features for play (P), download (D) and search (S) events over trailing windows of 1, 3, 7, 14 and 30 days that end on the last day of the feature window. Examples are the number of plays in the last 7 days and the number of searches in the last 30 days.
- We also have the play time and song length for every played song, so we can build features from these as well. Here a song counts as finished if more than 80% of its length was played, and we count the number of finished songs in the same trailing windows [1, 3, 7, 14, 30].
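To make the nested trailing windows concrete, here is a plain-Python sketch (no Spark needed) of the date range each window covers. It assumes the snapshot date is the last day of the feature window, 2017-04-28; the helper name `trailing_window` is hypothetical and not part of the notebook.

```python
import datetime

def trailing_window(snapshot_date, days):
    """Return (start, end) of the `days`-day window ending on snapshot_date, both inclusive."""
    start = snapshot_date - datetime.timedelta(days - 1)
    return start, snapshot_date

# Assumed snapshot: last day of the 30-day feature window
snapshot = datetime.date(2017, 4, 28)
for days in [1, 3, 7, 14, 30]:
    start, end = trailing_window(snapshot, days)
    print(f'last {days:>2} days: {start} .. {end}')
```

The 30-day window starts on 2017-03-30, the first day of the data, so the longest trailing window coincides with the whole feature window.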

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
spark = SparkSession \
        .builder \
        .appName("Feature engineering") \
        .getOrCreate()

In [4]:
df = spark.read.csv('data/event.csv', header=True).cache()

In [5]:
df = df.select('uid', 'event', 'date')

In [6]:
df = df.filter((F.col('date')>='2017-03-30') & (F.col('date')<='2017-05-12'))

In [7]:
df.show(5)

+---------+-----+----------+
|      uid|event|      date|
+---------+-----+----------+
|168050395|    D|2017-03-30|
|168340709|    S|2017-03-30|
|168419556|    S|2017-03-30|
|168447876|    S|2017-03-30|
|168507164|    S|2017-03-30|
+---------+-----+----------+
only showing top 5 rows



In [8]:
print('total # of records: ', df.count())
print('total # of users:', df.select('uid').distinct().count())

total # of records:  9939116
total # of users: 30771


### Some basic Spark DataFrame operations: groupBy, orderBy and aggregation

In [9]:
df.groupBy('event').count().show()

+-----+-------+
|event|  count|
+-----+-------+
|    D| 470162|
|    S| 605193|
|    P|8863761|
+-----+-------+



In [10]:
# groupBy, orderBy
df.groupBy('date', 'event').count().orderBy('date', 'event').show()

+----------+-----+------+
|      date|event| count|
+----------+-----+------+
|2017-03-30|    D|106912|
|2017-03-30|    S| 79143|
|2017-03-31|    D| 44768|
|2017-03-31|    P|272971|
|2017-03-31|    S| 44322|
|2017-04-01|    D| 34480|
|2017-04-01|    P|484054|
|2017-04-01|    S| 33110|
|2017-04-02|    D| 26775|
|2017-04-02|    P|449231|
|2017-04-02|    S| 30566|
|2017-04-03|    D| 15231|
|2017-04-03|    P|233018|
|2017-04-03|    S| 22226|
|2017-04-04|    D| 15188|
|2017-04-04|    P|327309|
|2017-04-04|    S| 18433|
|2017-04-05|    D| 12376|
|2017-04-05|    P|276376|
|2017-04-05|    S| 15705|
+----------+-----+------+
only showing top 20 rows



In [11]:
# aggregate
df.groupBy('uid').agg(
    F.count(F.col('event')).alias('total # of activities for the user'),
    F.max(F.col('date')).alias('last active date')
).show()

+---------+----------------------------------+----------------+
|      uid|total # of activities for the user|last active date|
+---------+----------------------------------+----------------+
|159183409|                               720|      2017-05-12|
|165374841|                                62|      2017-04-04|
|167587629|                                24|      2017-04-03|
|167589113|                               214|      2017-04-17|
|167595244|                                51|      2017-04-08|
|167612678|                                13|      2017-04-02|
|167614373|                                54|      2017-05-11|
|167614845|                                20|      2017-04-01|
|167622696|                                11|      2017-04-01|
|167631789|                                 9|      2017-04-01|
|167635050|                              3293|      2017-05-12|
|167635731|                               297|      2017-05-12|
|167639944

In [12]:
# filter
df.filter((F.col('date')>='2017-04-01') & (F.col('date')<='2017-04-15')) \
            .groupBy('date', 'event').count() \
            .orderBy('date', 'event').show()

+----------+-----+------+
|      date|event| count|
+----------+-----+------+
|2017-04-01|    D| 34480|
|2017-04-01|    P|484054|
|2017-04-01|    S| 33110|
|2017-04-02|    D| 26775|
|2017-04-02|    P|449231|
|2017-04-02|    S| 30566|
|2017-04-03|    D| 15231|
|2017-04-03|    P|233018|
|2017-04-03|    S| 22226|
|2017-04-04|    D| 15188|
|2017-04-04|    P|327309|
|2017-04-04|    S| 18433|
|2017-04-05|    D| 12376|
|2017-04-05|    P|276376|
|2017-04-05|    S| 15705|
|2017-04-06|    D| 10357|
|2017-04-06|    P|278176|
|2017-04-06|    S| 15317|
|2017-04-07|    D|  8575|
|2017-04-07|    P|267820|
+----------+-----+------+
only showing top 20 rows



## Define the churn label
- Churned users are those who are active in the first 30 days (the feature window) and have no activity in the last 14 days (the label window).
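The labelling logic can be sketched in plain Python with sets: users active in the feature window form the modelling population, and a user is labelled 0 if they also appear in the label window, otherwise 1. This mirrors the left join followed by `fillna(1)` in the Spark cells; the function name `label_churn` and the toy events are hypothetical.

```python
import datetime

def label_churn(events, feature_start, feature_end, label_start, label_end):
    """
    Plain-Python sketch of the churn label.
    events: iterable of (uid, date) pairs, with date as datetime.date.
    Returns {uid: churn} for every user active in the feature window,
    where churn = 0 if the user also has an event in the label window, else 1.
    """
    events = list(events)  # allow two passes over the data
    model_uids = {uid for uid, d in events if feature_start <= d <= feature_end}
    active_in_label = {uid for uid, d in events if label_start <= d <= label_end}
    # Users in model_uids but not in active_in_label correspond to the nulls
    # produced by the left join, which fillna(1) turns into churn = 1.
    return {uid: 0 if uid in active_in_label else 1 for uid in model_uids}

D = datetime.date
labels = label_churn(
    [('a', D(2017, 4, 1)), ('a', D(2017, 5, 1)),   # active in both windows
     ('b', D(2017, 4, 10)),                        # feature window only
     ('c', D(2017, 5, 2))],                        # label window only, so not modelled
    D(2017, 3, 30), D(2017, 4, 28), D(2017, 4, 29), D(2017, 5, 12))
print(labels)
```

User `c` is excluded entirely, just as a user active only in the label window would be missing from `df_model_uid`.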

In [13]:
import datetime
from dateutil import parser

label_window_size = 14
label_window_end_date = parser.parse('2017-05-12').date()
label_window_start_date = label_window_end_date - datetime.timedelta(label_window_size - 1)
print('label window: ', label_window_start_date, '-', label_window_end_date, ' days: ', label_window_size)

feature_window_size = 30
feature_window_end_date = label_window_start_date - datetime.timedelta(1)
feature_window_start_date = feature_window_end_date - datetime.timedelta(feature_window_size - 1)
print('feature window: ', feature_window_start_date, '-', feature_window_end_date, ' days: ', feature_window_size)

label window:  2017-04-29 - 2017-05-12  days:  14
feature window:  2017-03-30 - 2017-04-28  days:  30


In [14]:
# all the uid we will model
df_model_uid = df.filter((F.col('date')>=feature_window_start_date) & (F.col('date')<=feature_window_end_date))\
                    .select('uid').distinct()
# active in label window (active label=0)
df_active_uid_in_label_window = df.filter((F.col('date')>=label_window_start_date) & (F.col('date')<=label_window_end_date))\
                            .select('uid').distinct().withColumn('churn',F.lit(0))

In [15]:
df_model_uid

DataFrame[uid: string]

In [16]:
df_churn = df_model_uid.join(df_active_uid_in_label_window, on=['uid'], how='left')
df_churn = df_churn.fillna(1, subset=['churn'])  # users absent from the label window are churned

In [17]:
df_churn.groupBy('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1|16036|
|    0|14735|
+-----+-----+



The churned (16,036) and non-churned (14,735) classes are roughly balanced (about 52% vs. 48%), so no downsampling or upsampling is needed and the labels can be used directly as targets.
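The balance claim is simple arithmetic on the two counts from the `churn` table. A minimal sketch (the helper name `class_shares` is hypothetical):

```python
def class_shares(counts):
    """Return each class's share of the total number of examples."""
    total = sum(counts.values())
    return {label: n / total for label, n in counts.items()}

# Counts from the churn table: churn=1 -> 16036 users, churn=0 -> 14735 users
shares = class_shares({1: 16036, 0: 14735})
print({label: round(share, 3) for label, share in shares.items()})  # {1: 0.521, 0: 0.479}
```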

## Create frequency features for play, download and search

In [18]:
df_feature_window = df.filter((F.col('date')>=feature_window_start_date) & (F.col('date')<=feature_window_end_date))
df_feature_window.show()

+---------+-----+----------+
|      uid|event|      date|
+---------+-----+----------+
|168050395|    D|2017-03-30|
|168340709|    S|2017-03-30|
|168419556|    S|2017-03-30|
|168447876|    S|2017-03-30|
|168507164|    S|2017-03-30|
|168486888|    S|2017-03-30|
|168315563|    S|2017-03-30|
|168446240|    S|2017-03-30|
|168507164|    S|2017-03-30|
|168506014|    S|2017-03-30|
|168501374|    S|2017-03-30|
|168508002|    S|2017-03-30|
|168463535|    S|2017-03-30|
|168515699|    S|2017-03-30|
|168507164|    S|2017-03-30|
|168507164|    S|2017-03-30|
|168119029|    S|2017-03-30|
|168108703|    S|2017-03-30|
|168314770|    S|2017-03-30|
|168520382|    S|2017-03-30|
+---------+-----+----------+
only showing top 20 rows



In [19]:
# define a function to generate frequency features for a list of time windows
# using when().otherwise() inside a list comprehension
def frequency_feature_generation_time_windows(df, event, time_window_list, snapshot_date):
    """
    Generate frequency features for one event type and a list of time windows.
    Each feature counts the user's events of that type in the last `time_window`
    days, up to and including snapshot_date.
    """
    df_feature = df \
        .filter(F.col('event') == event) \
        .groupBy('uid') \
        .agg(*[F.sum(F.when((F.col('date') >= snapshot_date - datetime.timedelta(time_window - 1))
                            & (F.col('date') <= snapshot_date), 1).otherwise(0))
               .alias('freq_' + event + '_last_' + str(time_window))
               for time_window in time_window_list]
             )  # * unpacks the list so each expression becomes a separate argument to agg()
    return df_feature
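The conditional-sum pattern (`sum(when(in_window, 1).otherwise(0))`) can be checked without a cluster. Below is a plain-Python analogue; the name `frequency_features_py` and the toy events are hypothetical. For each user it counts events of one type whose date falls in each trailing window. As in the Spark version, a user gets a row only if they have at least one event of that type.

```python
import datetime
from collections import defaultdict

def frequency_features_py(events, event_type, time_window_list, snapshot_date):
    """
    Plain-Python analogue of the Spark aggregation.
    events: iterable of (uid, event, date) tuples with date as datetime.date.
    Returns {uid: {'freq_<event>_last_<w>': count, ...}}.
    """
    features = defaultdict(lambda: {f'freq_{event_type}_last_{w}': 0 for w in time_window_list})
    for uid, event, d in events:
        if event != event_type:
            continue  # same role as .filter(F.col('event') == event)
        row = features[uid]  # starts at all zeros, like summing otherwise(0)
        for w in time_window_list:
            if snapshot_date - datetime.timedelta(w - 1) <= d <= snapshot_date:
                row[f'freq_{event_type}_last_{w}'] += 1
    return dict(features)

snap = datetime.date(2017, 4, 28)
toy_events = [('u1', 'S', datetime.date(2017, 4, 28)),
              ('u1', 'S', datetime.date(2017, 4, 20)),
              ('u1', 'P', datetime.date(2017, 4, 28)),   # ignored: not a search
              ('u2', 'S', datetime.date(2017, 4, 1))]
result = frequency_features_py(toy_events, 'S', [1, 3, 7, 14, 30], snap)
print(result)
```

Because every window ends on the same snapshot date, the counts are non-decreasing from `last_1` to `last_30` for each user.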

In [20]:
# generate one event type, all time windows 
event = 'S'
time_window_list = [1,3,7,14,30]
snapshot_date = feature_window_end_date
df_feature = frequency_feature_generation_time_windows(df_feature_window,event,time_window_list,snapshot_date)
df_feature.show(20)

+---------+-------------+-------------+-------------+--------------+--------------+
|      uid|freq_S_last_1|freq_S_last_3|freq_S_last_7|freq_S_last_14|freq_S_last_30|
+---------+-------------+-------------+-------------+--------------+--------------+
|168473056|            0|            0|            0|             0|            35|
|168476045|            0|            0|            0|             0|             2|
|168238056|            0|            0|            0|             0|            36|
|168447589|            0|            0|            0|             0|            11|
|168480574|            0|            0|            0|             1|            11|
|168363625|            3|            5|            5|             5|            14|
|168506583|            0|            0|            0|            21|            39|
|168109701|            0|            0|            0|             1|             3|
|168149184|            0|            0|            0|             0|        

In [21]:
event_list = ['P','D','S']
time_window_list = [1,3,7,14,30]
df_feature_list = []
for event in event_list:
    df_feature_list.append(frequency_feature_generation_time_windows(df_feature_window,event,time_window_list,snapshot_date))

In [22]:
def join_feature_data(df_master,df_feature_list):
    for df_feature in df_feature_list:
        df_master = df_master.join(df_feature,on='uid',how='left')
        # df_master.persist()  # uncomment to cache intermediate results when chaining many joins
    return df_master
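A left join keeps every user in `df_churn`. Users with no events of a given type (for example, users who never searched) get nulls in that table's columns, which is why the final export calls `fillna(0)`. The dictionary-based sketch below is hypothetical (`left_join_features` is not part of the notebook). It models those nulls as `fill_value=None`, and `fill_value=0` approximates a join followed by `fillna(0)`.

```python
def left_join_features(master, feature_tables, fill_value=None):
    """
    master: {uid: {col: value}}; feature_tables: list of {uid: {col: value}}.
    Mimics a chain of left joins on uid: every master uid is kept, and users
    missing from a feature table get `fill_value` for that table's columns.
    """
    result = {uid: dict(row) for uid, row in master.items()}  # copy, leave master untouched
    for table in feature_tables:
        columns = sorted({col for row in table.values() for col in row})
        for uid, row in result.items():
            feats = table.get(uid, {})
            for col in columns:
                row[col] = feats.get(col, fill_value)
    return result

master = {'a': {'churn': 0}, 'b': {'churn': 1}}
search_features = {'a': {'freq_S_last_30': 3}}   # user 'b' never searched
joined = left_join_features(master, [search_features])
print(joined)  # 'b' gets None, the analogue of a null after the left join
```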

In [23]:
df_model = join_feature_data(df_churn,df_feature_list)

In [24]:
df_model.show()

+---------+-----+-------------+-------------+-------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+
|      uid|churn|freq_P_last_1|freq_P_last_3|freq_P_last_7|freq_P_last_14|freq_P_last_30|freq_D_last_1|freq_D_last_3|freq_D_last_7|freq_D_last_14|freq_D_last_30|freq_S_last_1|freq_S_last_3|freq_S_last_7|freq_S_last_14|freq_S_last_30|
+---------+-----+-------------+-------------+-------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+
|159183409|    0|            0|            0|            1|            79|           547|            0|            0|            0|             0|             1|            0|            0|            2|             2|             9|
|165374841|    1|            0|            0|            0|     

## Create play-time features

In [25]:
df_time = spark.read.csv('data/play/play.csv', header=True).cache()

In [26]:
df_time.show()

+---------+------+----------+---------+--------+----------+
|      uid|device|   song_id|play_time|song_len|      date|
+---------+------+----------+---------+--------+----------+
|168551607|    ar| 5517668.0|      231|   245.0|2017-03-30|
|168542809|    ar| 7149583.0|      214|   215.0|2017-03-30|
|168541712|    ip| 7665801.0|      259|   259.0|2017-03-30|
|168537267|    ip|  492830.0|        1|   215.0|2017-03-30|
|168537689|    ar|   60333.0|       57|   186.0|2017-03-30|
|168537376|    ar| 1119983.0|       48|   194.0|2017-03-30|
|168537267|    ip|   55722.0|        1|   273.0|2017-03-30|
|168542540|    ar|15249349.0|      333|   333.0|2017-03-30|
|168542981|    ar| 8762277.0|        4|   232.0|2017-03-30|
|168543067|    ar| 5517668.0|      192|   245.0|2017-03-30|
|168537267|    ip|  388733.0|        2|   263.0|2017-03-30|
|168540947|    ar| 6651913.0|      251|   251.0|2017-03-30|
|168541542|    ar| 6972794.0|       41|    47.0|2017-03-30|
|168537267|    ip|  625294.0|        0| 

In [27]:
df_time.printSchema()

root
 |-- uid: string (nullable = true)
 |-- device: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- play_time: string (nullable = true)
 |-- song_len: string (nullable = true)
 |-- date: string (nullable = true)



In [28]:
df_time = df_time.withColumn('play_time', F.col('play_time').cast('float'))

In [29]:
df_time = df_time.withColumn('song_len', F.col('song_len').cast('float'))

In [30]:
df_time = df_time.withColumn('finished', F.when(F.col('play_time') / F.col('song_len') > 0.8, 1).otherwise(0))
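The finished-song rule uses a strict comparison: a play counts as finished only if `play_time / song_len` is greater than 0.8, so exactly 80% does not qualify. A plain-Python sketch of the rule follows; the name `is_finished` is hypothetical, and treating a missing or zero song length as "not finished" is an assumption of this sketch.

```python
def is_finished(play_time, song_len, threshold=0.8):
    """
    Return 1 if strictly more than `threshold` of the song was played, else 0,
    matching the strict `>` in the Spark expression.
    Assumption: a missing play time, or a missing or zero song length, counts as not finished.
    """
    if play_time is None or not song_len:
        return 0
    return 1 if play_time / song_len > threshold else 0

# (play_time, song_len) pairs taken from the df_time rows shown above
for play_time, song_len in [(231, 245.0), (1, 215.0), (41, 47.0), (192, 245.0)]:
    print(play_time, song_len, is_finished(play_time, song_len))
```

For example, 192 seconds of a 245-second song is about 78%, so that play is not counted as finished.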

In [31]:
df_time.groupBy('finished').count().show()

+--------+-------+
|finished|  count|
+--------+-------+
|       1|4718103|
|       0|4743080|
+--------+-------+



In [32]:
df_time = df_time.filter(F.col('finished') == 1)

In [33]:
df_time.count()

4718103

In [34]:
time_window_list = [1,3,7,14,30]

In [35]:
def finished_feature_generation_time_windows(df, time_window_list, snapshot_date):
    """
    Generate finished-song frequency features for a list of time windows:
    the number of finished songs each user played in the last `time_window`
    days, up to and including snapshot_date.
    """
    df_feature = df \
        .groupBy('uid') \
        .agg(*[F.sum(F.when((F.col('date') >= snapshot_date - datetime.timedelta(time_window - 1))
                            & (F.col('date') <= snapshot_date), 1).otherwise(0))
               .alias('freq_finished_last_' + str(time_window))
               for time_window in time_window_list]
             )  # * unpacks the list into separate arguments to agg()
    return df_feature

In [36]:
df_time = finished_feature_generation_time_windows(df_time, time_window_list, feature_window_end_date)

In [37]:
df_time.show()

+---------+--------------------+--------------------+--------------------+---------------------+---------------------+
|      uid|freq_finished_last_1|freq_finished_last_3|freq_finished_last_7|freq_finished_last_14|freq_finished_last_30|
+---------+--------------------+--------------------+--------------------+---------------------+---------------------+
|168541323|                   0|                   0|                   2|                    8|                   95|
|168896704|                 106|                 331|                 792|                 1346|                 3034|
|168966725|                   0|                   0|                 113|                  115|                  663|
|168970658|                   0|                   0|                   0|                    5|                   14|
|168812339|                 126|                 321|                 633|                 1299|                 2141|
|168944638|                   0|                

In [40]:
df_model_final = df_model.join(df_time, on='uid', how='left')

In [46]:
df_model_final.fillna(0).toPandas().to_csv('data/df_model_final.csv', index=False)