In [1]:
from pyspark.sql import SparkSession
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from pyspark.sql import functions as F
from pyspark.sql import Window
import pandas as py

# Start (or reuse) a Spark session that runs on the local master for this notebook.
spark = (
    SparkSession.builder
    .master('local')
    .appName('first_model')
    .getOrCreate()
)

In [2]:
# Load the raw transactions partition; the first row is the header and the
# column types are inferred from the data.
trans = spark.read.csv(
    '/Users/xuetong/customer_churn/data_partition/p0/transactions.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Both date columns are encoded as yyyyMMdd values. Cast each one to a string,
# parse it with that pattern, and store the result back as a proper date column.
for date_col in ('transaction_date', 'membership_expire_date'):
    as_text = trans[date_col].cast('string')
    parsed = F.to_date(F.unix_timestamp(as_text, 'yyyyMMdd').cast('timestamp'))
    trans = trans.withColumn(date_col, parsed)

## Labeling data

In [4]:
def create_lable(df,churn_days):
    '''
    Label each transaction with whether the customer churned after it.

    A transaction is a churn when the customer's next transaction happens more
    than ``churn_days`` days after this transaction's membership expiry.

    params
    -------
    df: spark dataframe with columns ``msno``, ``transaction_date`` and
        ``membership_expire_date``; the two date columns must be date type
    churn_days: number of days without membership after which the customer
        counts as churned

    returns:
    -------
    df: spark dataframe with four extra columns
        next_trans_date: the same customer's next transaction date (null for
            the customer's last transaction)
        diff_time: days from membership_expire_date to next_trans_date
        churn: diff_time > churn_days (null when there is no next transaction)
        churn_date: membership_expire_date + churn_days + 1 days for churned
            rows, null otherwise
    '''
    # No global orderBy is needed here: the window below already partitions by
    # customer and orders each partition, so sorting the whole frame first only
    # adds an extra shuffle.
    w = Window.partitionBy('msno').orderBy('transaction_date','membership_expire_date')
    df = df.withColumn('next_trans_date', F.lead('transaction_date').over(w))
    df = df.withColumn(
        'diff_time',
        F.datediff(F.col('next_trans_date'), F.col('membership_expire_date')))
    df = df.withColumn('churn', F.col('diff_time') > churn_days)
    # First day on which the customer has been without membership for more
    # than churn_days days.
    df = df.withColumn(
        'churn_date',
        F.when(F.col('churn'), F.date_add(F.col('membership_expire_date'), churn_days + 1)))
    return df

In [5]:
# Label every transaction, treating more than 30 days without membership as churn.
trans = create_lable(trans, churn_days=30)
# Preview the first five labelled rows.
(
    trans
    .limit(5)
    .toPandas()
)

Unnamed: 0,msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel,next_trans_date,diff_time,churn,churn_date
0,+uYayEzlryVEc1b148hc46DU6sr/YHnPE4OCgPZsQGw=,35,7,0,0,0,2015-06-13,2015-06-15,0,,,,
1,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,35,7,0,0,0,2016-11-20,2016-11-27,0,,,,
2,37rQxo+XZR9P0KnW4yVweOqJDpG3vdOX4sIfwCUGGUc=,23,0,0,149,1,2015-03-31,2015-04-30,0,2015-06-30,61.0,True,2015-05-31
3,37rQxo+XZR9P0KnW4yVweOqJDpG3vdOX4sIfwCUGGUc=,23,0,0,149,1,2015-06-30,2015-07-31,0,2015-07-31,0.0,False,
4,37rQxo+XZR9P0KnW4yVweOqJDpG3vdOX4sIfwCUGGUc=,23,0,0,149,1,2015-07-31,2015-08-31,0,2015-08-31,0.0,False,


In [6]:
# Rename the user key on the transaction side so it stays distinct from the
# `msno` column of the user logs when the two frames are joined.
trans = trans.withColumnRenamed(existing='msno', new='msno1')

## Feature engineering

In [7]:
############ features from the transaction dataset
# Per-user ordering of transactions, used to look at the previous transaction.
w2 = Window.partitionBy('msno1').orderBy('transaction_date','membership_expire_date')

trans = (
    trans
    # list price minus the amount actually paid
    .withColumn('price_diff', F.col('plan_list_price') - F.col('actual_amount_paid'))
    # amount paid per plan day
    .withColumn('amt_per_day', F.col('actual_amount_paid') / F.col('payment_plan_days'))
    # 1 when the customer paid less than the list price, else 0
    .withColumn('is_discount', F.when(F.col('price_diff') > 0, 1).otherwise(0))
    # date of the user's previous transaction
    .withColumn('prev_trans_date', F.lag('transaction_date').over(w2))
    # is_cancel flag of the user's previous transaction
    .withColumn('prev_canceled', F.lag('is_cancel').over(w2))
    # days between the previous transaction and this one
    .withColumn('prev_tran_diff', F.datediff(F.col('transaction_date'), F.col('prev_trans_date')))
)

In [8]:
########## features from the user logs dataset
logs = spark.read.csv(
    '/Users/xuetong/customer_churn/data_partition/p0/logs.csv',
    header=True,
    inferSchema=True,
)

# `date` is encoded as a yyyyMMdd value: cast to string, then parse into a date.
logs = logs.withColumn(
    "date",
    F.to_date(F.unix_timestamp(logs.date.cast('string'), 'yyyyMMdd').cast('timestamp')))


def days(i):
    """Return the number of seconds in ``i`` days (range windows below are in seconds)."""
    return i * 86400


# Trailing windows are ranges over the log date expressed in epoch seconds,
# each ending at (and including) the current row.
log_epoch_secs = logs.date.cast("timestamp").cast("long")
per_user_by_time = Window.partitionBy(logs.msno).orderBy(log_epoch_secs)

## trailing two weeks
window3 = per_user_by_time.rangeBetween(-days(14), 0)
## trailing 30 days
window4 = per_user_by_time.rangeBetween(-days(30), 0)
## whole user history up to the current row
window5 = Window.partitionBy('msno').orderBy('date')
## trailing 60 days
window6 = per_user_by_time.rangeBetween(-days(60), 0)

#######  unique songs played
# NOTE(review): window3/window4/window6 end at the current row inclusive, so they
# span 15/31/61 calendar days while the averages divide by 14/30/60 -- confirm
# whether that off-by-one is intended.

# sum of daily unique songs over the last two weeks
logs = logs.withColumn('fourtheen_sum_uniq', F.sum(logs.num_unq).over(window3))
# max of daily unique songs over the last two weeks
logs = logs.withColumn('fourtheen_max_uniq', F.max(logs.num_unq).over(window3))
# average of daily unique songs over the last two weeks
logs = logs.withColumn('fourtheen_avg_uniq', logs.fourtheen_sum_uniq/14)
# standard deviation of daily unique songs over the last two weeks
logs = logs.withColumn('std_uniq_foutheen_days', F.stddev(logs.num_unq).over(window3))


# sum of daily unique songs over the last 30 days
logs = logs.withColumn('thirty_sum_uniq', F.sum(logs.num_unq).over(window4))
# max of daily unique songs over the last 30 days
logs = logs.withColumn('thirty_max_uniq', F.max(logs.num_unq).over(window4))
# average of daily unique songs over the last 30 days
logs = logs.withColumn('thirty_avg_uniq', logs.thirty_sum_uniq/30)
# standard deviation of daily unique songs over the last 30 days
logs = logs.withColumn('std_uniq_thirty_days', F.stddev(logs.num_unq).over(window4))

# sum of daily unique songs over the last 60 days
logs = logs.withColumn('sixty_sum_uniq', F.sum(logs.num_unq).over(window6))
# max of daily unique songs over the last 60 days
logs = logs.withColumn('sixty_max_uniq', F.max(logs.num_unq).over(window6))
# average of daily unique songs over the last 60 days
# (the 60-day sum is divided by 60; dividing by 30 doubled the average)
logs = logs.withColumn('sixty_avg_uniq', logs.sixty_sum_uniq/60)
# standard deviation of daily unique songs over the last 60 days
# (stored under its own name so it no longer overwrites std_uniq_thirty_days)
logs = logs.withColumn('std_uniq_sixty_days', F.stddev(logs.num_unq).over(window6))

# difference of the average number of unique songs between the last 14 and the last 30 days
logs = logs.withColumn('diff_uniq_avg_14_30', logs.fourtheen_avg_uniq - logs.thirty_avg_uniq)
# difference of the average number of unique songs between the last 30 and the last 60 days
logs = logs.withColumn('diff_uniq_avg_30_60', logs.thirty_avg_uniq - logs.sixty_avg_uniq)
# cumulative sum of unique songs over the user's history
logs = logs.withColumn('unique_song_cum_sum', F.sum(logs.num_unq).over(window5))


#######  total seconds played
# sum / max / average / standard deviation of daily total seconds over the last 14 days
logs = logs.withColumn('sum_secs_fourtheen_days',F.sum(logs.total_secs).over(window3))
logs = logs.withColumn('max_secs_fourtheen_days',F.max(logs.total_secs).over(window3))
logs = logs.withColumn('avg_secs_foutheen_days',logs.sum_secs_fourtheen_days/14)
logs = logs.withColumn('std_sec_foutheen_days', F.stddev(logs.total_secs).over(window3))
# sum / max / average / standard deviation of daily total seconds over the last 30 days
logs = logs.withColumn('sum_secs_thirty_days',F.sum(logs.total_secs).over(window4))
logs = logs.withColumn('max_secs_thirty_days',F.max(logs.total_secs).over(window4))
logs = logs.withColumn('avg_secs_thirty_days',logs.sum_secs_thirty_days/30)
logs = logs.withColumn('std_sec_thirty_days', F.stddev(logs.total_secs).over(window4))
# sum / max / average / standard deviation of daily total seconds over the last 60 days
logs = logs.withColumn('total_secs_sixty_days',F.sum(logs.total_secs).over(window6))
logs = logs.withColumn('max_secs_sixty_days',F.max(logs.total_secs).over(window6))
# the 60-day sum is divided by 60; dividing by 30 doubled the average
logs = logs.withColumn('avg_secs_sixty_days',logs.total_secs_sixty_days/60)
logs = logs.withColumn('std_sec_sixty_days', F.stddev(logs.total_secs).over(window6))
# change in average daily total seconds: last 14 vs last 30 days, and last 30 vs last 60 days
logs = logs.withColumn('diff_sec_avg_14_30', logs.avg_secs_foutheen_days - logs.avg_secs_thirty_days)
logs = logs.withColumn('diff_sec_avg_30_60', logs.avg_secs_thirty_days - logs.avg_secs_sixty_days)

# window covering 60 to 30 days before the current log date (the month before last)
window7 = Window.partitionBy(logs.msno).orderBy(logs.date.cast("timestamp").cast("long")).rangeBetween(-days(60),-days(30))

# sum of total seconds in that earlier month
logs = logs.withColumn('sum_secs_tow_month',F.sum(logs.total_secs).over(window7))

## number of log days in the user's history up to the current row
logs = logs.withColumn('total_login_day',F.count(logs.date).over(window5))

## days since the previous log day (null for the user's first log row)
logs=logs.withColumn('last_login', F.lag(logs.date).over(window5))
# datediff(end, start) returns end - start, so the current date must come first;
# the reversed order produced non-positive values.
logs=logs.withColumn('days_since_last_login',F.datediff(logs.date,logs.last_login))

In [9]:
## Rolling features for each of the num_25 / num_50 / num_75 / num_985 / num_100
## log columns: two-week and 30-day sums and averages, the difference of those
## averages, and the 30-day standard deviation.
for pct in ('25', '50', '75', '985', '100'):
    played = F.col('num_' + pct)
    sum_14, sum_30 = 'fourtheen_sum_' + pct, 'thirty_sum_' + pct
    avg_14, avg_30 = 'fourtheen_avg_' + pct, 'thirty_avg_' + pct
    logs = (
        logs
        .withColumn(sum_14, F.sum(played).over(window3))
        .withColumn(sum_30, F.sum(played).over(window4))
        .withColumn(avg_14, F.col(sum_14) / 14)
        .withColumn(avg_30, F.col(sum_30) / 30)
        .withColumn('diff_2week_' + pct, F.col(avg_14) - F.col(avg_30))
        .withColumn('std_' + pct + '_one_month', F.stddev(played).over(window4))
    )



In [10]:
# Preview the first five rows of the engineered log features.
(
    logs
    .limit(5)
    .toPandas()
)

Unnamed: 0,msno,date,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs,fourtheen_sum_uniq,...,fourtheen_avg_985,thirty_avg_985,diff_2week_985,std_985_one_month,fourtheen_sum_100,thirty_sum_100,fourtheen_avg_100,thirty_avg_100,diff_2week_100,std_100_one_month
0,+uYayEzlryVEc1b148hc46DU6sr/YHnPE4OCgPZsQGw=,2015-06-13,3,0,0,0,1,4,390.891,4,...,0.0,0.0,0.0,,1,1,0.071429,0.033333,0.038095,
1,/nTKo6fPYX88w+22j72VcZvY0FRR6OqerS9JHcGCD9A=,2015-11-14,0,1,1,0,1,2,520.045,2,...,0.0,0.0,0.0,,1,1,0.071429,0.033333,0.038095,
2,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,2016-06-16,1,0,0,0,17,17,3873.695,17,...,0.0,0.0,0.0,,17,17,1.214286,0.566667,0.647619,
3,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,2016-06-23,0,0,0,1,1,2,489.802,19,...,0.071429,0.033333,0.038095,0.707107,18,18,1.285714,0.6,0.685714,11.313708
4,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,2016-11-20,2,2,0,0,0,2,150.792,2,...,0.0,0.0,0.0,,0,0,0.0,0.0,0.0,


In [11]:
## keep only transactions dated on or before 2017-01-29 (later ones have no known label)
label_cutoff = F.lit('2017-01-29').cast('date')
trans = trans.filter(trans.transaction_date <= label_cutoff)

## attach to every transaction the same user's log rows dated within
## [transaction_date, membership_expire_date]; transactions without logs are kept
same_user = trans.msno1 == logs.msno
within_membership = (logs.date >= trans.transaction_date) & (logs.date <= trans.membership_expire_date)
trans_logs = trans.join(logs, same_user & within_membership, how='left')

In [12]:
# Preview the first five rows of the transaction/log join.
(
    trans_logs
    .limit(5)
    .toPandas()
)

Unnamed: 0,msno1,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel,next_trans_date,...,fourtheen_avg_985,thirty_avg_985,diff_2week_985,std_985_one_month,fourtheen_sum_100,thirty_sum_100,fourtheen_avg_100,thirty_avg_100,diff_2week_100,std_100_one_month
0,+uYayEzlryVEc1b148hc46DU6sr/YHnPE4OCgPZsQGw=,35,7,0,0,0,2015-06-13,2015-06-15,0,,...,0.0,0.0,0.0,,1,1,0.071429,0.033333,0.038095,
1,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,35,7,0,0,0,2016-11-20,2016-11-27,0,,...,0.0,0.0,0.0,,0,0,0.0,0.0,0.0,
2,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,35,7,0,0,0,2016-11-20,2016-11-27,0,,...,0.071429,0.033333,0.038095,0.707107,0,0,0.0,0.0,0.0,0.0
3,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,35,7,0,0,0,2016-11-20,2016-11-27,0,,...,0.142857,0.066667,0.07619,0.57735,3,3,0.214286,0.1,0.114286,1.732051
4,1vBzVoPyEXo5ehJkkb27ebI9zrgDwAt31cjFM2HY62k=,35,7,0,0,0,2016-11-20,2016-11-27,0,,...,0.142857,0.066667,0.07619,0.57735,3,3,0.214286,0.1,0.114286,1.5


In [14]:
### build aggregated log features per transaction
# Log columns that get summarised for every transaction.
agg_col = [
    'num_25', 'num_50', 'num_75', 'num_985', 'num_100', 'num_unq', 'total_secs',
    'fourtheen_sum_uniq', 'fourtheen_max_uniq', 'fourtheen_avg_uniq', 'std_uniq_foutheen_days',
    'thirty_sum_uniq', 'thirty_max_uniq', 'thirty_avg_uniq', 'std_uniq_thirty_days',
    'sixty_sum_uniq', 'sixty_max_uniq', 'sixty_avg_uniq',
    'diff_uniq_avg_14_30', 'diff_uniq_avg_30_60', 'unique_song_cum_sum',
    'sum_secs_fourtheen_days', 'max_secs_fourtheen_days', 'avg_secs_foutheen_days', 'std_sec_foutheen_days',
    'sum_secs_thirty_days', 'max_secs_thirty_days', 'avg_secs_thirty_days', 'std_sec_thirty_days',
    'total_secs_sixty_days', 'max_secs_sixty_days', 'avg_secs_sixty_days', 'std_sec_sixty_days',
    'diff_sec_avg_14_30', 'diff_sec_avg_30_60',
    'sum_secs_tow_month', 'total_login_day', 'days_since_last_login',
]
funcs = [F.mean, F.stddev, F.max, F.min]

# One aggregate per (function, column) pair, function-major, followed by the
# number of matched log rows and the latest matched log date.
exprs = []
for agg_func in funcs:
    for col_name in agg_col:
        exprs.append(agg_func(F.col(col_name)))
exprs.append(F.count(F.col('date')))
exprs.append(F.max(F.col('date')))

# A transaction is identified by user, transaction date and expiry date.
transaction_keys = ['msno1', 'transaction_date', 'membership_expire_date']
trans_logs_agg = trans_logs.groupby(*transaction_keys).agg(*exprs)

# Give the key columns distinct names on the aggregated side before joining back.
renamed_keys = [
    ('msno1', 'msno2'),
    ('transaction_date', 'transaction_date2'),
    ('membership_expire_date', 'membership_expire_date2'),
]
for old_name, new_name in renamed_keys:
    trans_logs_agg = trans_logs_agg.withColumnRenamed(old_name, new_name)

## join the aggregated features back to the transaction dataframe
same_transaction = (
    (trans.msno1 == trans_logs_agg.msno2)
    & (trans.transaction_date == trans_logs_agg.transaction_date2)
    & (trans.membership_expire_date == trans_logs_agg.membership_expire_date2)
)
training = trans.join(trans_logs_agg, same_transaction, how='left')

In [None]:
## save data
# Write a header row: the frame has many generated column names (e.g. the
# aggregate columns) that would otherwise be lost in the CSV output.
training.write.csv('/Users/xuetong/Downloads/train_df.csv', header=True)