#### Import Libraries

In [1]:
import findspark
findspark.init()

In [2]:
# import libraries 

import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt

import plotly.graph_objs as go 
from plotly.subplots import make_subplots

from datetime import datetime, timedelta

from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import avg, concat, countDistinct,\
    col, datediff, date_format, desc, \
    format_number, isnan, lag, lit, udf, split

from pyspark.ml import Pipeline 
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler 
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType

from pyspark.sql.functions import split 
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier 
from pyspark.ml.evaluation  import MulticlassClassificationEvaluator 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# create Spark Session 
spark = SparkSession \
    .builder \
    .appName("Sparkify_churn1")\
    .getOrCreate()

#### 1. Load Dataset

In [None]:
### Load the dataset
path = 'mini_sparkify_event_data.json'
df = spark.read.json(path)

In [None]:
df.printSchema()

In [None]:
df.count()

In [None]:
df.limit(5).toPandas()

In [None]:
df.dtypes

#### Column Split: Categorical  vs Numerical
    => 데이터프레임의 각 컬럼을 분류해준다 

In [None]:
# Split the columns by Spark type: string columns are treated as categorical,
# every other type as numerical.
# df.dtypes is read once instead of on every loop access.
column_types = df.dtypes

categorical_cols = [name for name, dtype in column_types if dtype == 'string']
numerical_cols = [name for name, dtype in column_types if dtype != 'string']

print("Categorical columns:{}".format(categorical_cols))
print("Numerical columns : {}".format(numerical_cols))

### Categorical Columns
- auth
- gender 
- level 
- location 
- method 
- page
- userAgent
- userId

=> 각 컬럼에 데이터를 이해하기 위해서 고유의 데이터들을 확인해본다

In [None]:
# auth 
df.select("auth").distinct().show()

In [None]:
# gender 
df.select("gender").distinct().show()

In [None]:
# level 
df.select("level").distinct().show()

In [None]:
# location 
df.select("location").distinct().show(), df.select("location").distinct().count()

In [None]:
# method 
df.select("method").distinct().show()

In [None]:
# page 
df.select("page").distinct().show(), df.select("page").distinct().count()

In [None]:
# userAgent 
df.select("userAgent").distinct().show()

In [None]:
# userId 
df.select("userId").distinct().show(), df.select("userId").distinct().count()

##### 컬럼 탐색 결과:
- Auth: Authorization 과 관련된 데이터로 로그인, 로그아웃, 서비스탈퇴 등의 데이터
- Page: 유저의 행동과 관련된 데이터로 로그인,NextSong(다음 노래 듣기),Submit upgrade, downgrade 등의 중요도가 높은 컬럼으로 볼 수 있다

#### Page 컬럼 데이터 비율 확인해보기

In [None]:
page_list = df.select("page").distinct().toPandas().values.tolist()

# Share (in percent) of every page type among all events.
# The total number of events does not change inside the loop, so it is
# counted once instead of launching an extra Spark job per page.
total_events = df.count()

# every entry of page_list is a one-element list: [page name]
for (page_name,) in page_list:
    page_count = df.where(df['page'] == page_name).count() / total_events * 100

    print("column {}".format(page_name), page_count)

#### Ratio
- nextsong:79%
- Home: 5%
- Thumbs up: 4%
- Login, logout:1%


#### 결측치 제거

In [None]:
# Count the missing values (empty string, NULL or NaN) of every column.
# The loop variable is deliberately not called `col`: that name would rebind
# the `col` function imported from pyspark.sql.functions for every later cell.
for column_name in df.columns:
    column = df[column_name]
    null_count = df.filter(
        (column == "") | column.isNull() | isnan(column)).count()
    print('Column :{}'.format(column_name), null_count)

#### Page 결측치 체킹 결과:
- artist,length, song :58392
- firstName, gender, lastName, location, registration, userAgent, userId : 8346

page 컬럼에는 Nextsong 과 같은 노래만 듣는 행동만 담긴게 아니라, Thumb up 혹은 submit upgrade 등의 여러가지 행동이 담겨 있으므로 artist, length, 결측치가 생길수 밖에 없다

그러므로 일단 userId가 없는 데이터 제거하고, sessionId 가 존재하는것만 살려본다

In [None]:
# drop null values 
def drop_na(df):
    """
        Keep only the events that carry both a userId and a sessionId.

        A row is discarded when its userId is an empty string, NULL or NaN,
        or when its sessionId is NULL or NaN.

        Input:
            df(spark dataframe) with userId and sessionId columns
        Output:
            filtered spark dataframe
    """
    user_id = df['userId']
    session_id = df['sessionId']

    # conditions a row has to satisfy to be kept
    has_user_id = (user_id != "") & user_id.isNotNull() & ~isnan(user_id)
    has_session_id = session_id.isNotNull() & ~isnan(session_id)

    return df.filter(has_user_id).filter(has_session_id)

In [None]:
# apply drop_na function 
df_clean = drop_na(df)

In [None]:
## null check again 

# Re-count the missing values (empty string, NULL or NaN) per column.
# The loop variable is deliberately not called `col`: that name would rebind
# the `col` function imported from pyspark.sql.functions for every later cell.
for column_name in df_clean.columns:
    column = df_clean[column_name]
    null_count = df_clean.filter(
        (column == "") | column.isNull() | isnan(column)).count()
    print('Column :{}'.format(column_name), null_count)

In [None]:
# count 
df_clean.count()

## 연습용

##### Data preprocessing 
데이터 프레임을 전처리 하여 원하는 데이터 프레임으로 만들어준다
- ts 컬럼을 통해 date, year, month, day, hour, dayofweek, weekofyear을 생성한다
- location 컬럼에서 state를 추출해낸다
- page 컬럼에서 Cancellation Confirmation, Submit downgrade 를 통해 
    유저별로 canceled , downgraded 를 확인하는 컬럼, 그리고 이를 확인하는 churn_service, churn_paid 컬럼, 이탈할 가능성의 단계를 표시하는 phase_cancel, phase_downgrade  컬럼을 생성한다

In [None]:
# derive date, year, month, day, hour, dayofweek and weekofyear from ts
def convert_ts(df):
    """
        Add calendar columns computed from the 'ts' column.

        'ts' is divided by 1000 and cast to a timestamp
        (NOTE(review): presumably epoch milliseconds - confirm with the data source).

        Input:
            df(spark dataframe) with a ts column
        Output:
            spark dataframe with date, year, month, day, hour, dayofweek
            and weekofyear columns added
    """
    event_time = (F.col('ts') / 1000).cast('timestamp')
    event_date = F.col('date')

    # Applied in order: 'date' is written as a formatted string first and
    # then converted to a date, and the later entries read that column.
    derived_columns = [
        ('date', F.date_format(event_time, format='yyyy-MM-dd')),
        ('date', F.to_date(event_date, 'yyyy-MM-dd')),
        ('year', F.year(event_date)),
        ('month', F.month(event_date)),
        ('day', F.dayofmonth(event_date)),
        ('hour', F.hour(event_time)),
        ('dayofweek', F.dayofweek(event_date)),
        ('weekofyear', F.weekofyear(event_date)),
    ]

    result = df
    for column_name, expression in derived_columns:
        result = result.withColumn(column_name, expression)

    return result

In [None]:
# derive regi_date, regi_year, regi_month, regi_day, regi_hour,
# regi_dayofweek and regi_weekofyear from registration
def convert_registration(df):
    """
        Add calendar columns computed from the 'registration' column.

        'registration' is divided by 1000 and cast to a timestamp
        (NOTE(review): presumably epoch milliseconds - confirm with the data source).

        Input:
            df(spark dataframe) with a registration column
        Output:
            spark dataframe with the regi_* columns added
    """
    registered_at = (F.col('registration') / 1000).cast('timestamp')
    registered_on = F.col('regi_date')

    # 'regi_date' goes first (string, then date): the other columns read it.
    result = df.withColumn('regi_date', F.date_format(registered_at, format='yyyy-MM-dd'))
    result = result.withColumn('regi_date', F.to_date(registered_on, 'yyyy-MM-dd'))

    calendar_parts = (
        ('regi_year', F.year(registered_on)),
        ('regi_month', F.month(registered_on)),
        ('regi_day', F.dayofmonth(registered_on)),
        ('regi_hour', F.hour(registered_at)),
        ('regi_dayofweek', F.dayofweek(registered_on)),
        ('regi_weekofyear', F.weekofyear(registered_on)),
    )
    for column_name, expression in calendar_parts:
        result = result.withColumn(column_name, expression)

    return result

In [None]:
# derive the state from the location column
def extract_location_state(df):
    """
        Add a 'state' column: the second element obtained when 'location'
        is split on ", ".

        Input:
            df(spark dataframe) with a location column
        Output:
            spark dataframe with the state column added
    """
    location_parts = F.split(F.col('location'), ", ")

    return df.withColumn('state', location_parts.getItem(1))

In [None]:
def set_churn_label(df):
    """
        Add the event flags and churn labels derived from the 'page' column.

        Columns added:
            cancelled, downgraded, upgraded:
                1 on the row whose page is "Cancellation Confirmation" /
                "Submit Downgrade" / "Submit Upgrade", otherwise 0
            phase_downgrade, phase_upgrade:
                per (userId, level), number of downgrade / upgrade events
                whose ts is greater than or equal to this row's ts
            phase_cancel:
                per userId, number of cancellation events whose ts is
                greater than or equal to this row's ts
            churn_paid, churn_service:
                per userId, total number of downgrade / cancellation events

        Input:
            df(spark dataframe) with userId, level, page and ts columns
        Output:
            spark dataframe with the columns above added
    """
    def page_flag(page_name):
        # Native column expression instead of a Python UDF: same 0/1 integer
        # result (a NULL page still yields 0) without shipping every row to
        # a Python worker.
        return F.when(F.col('page') == page_name, 1).otherwise(0)

    # flag the rows on which the event itself happens
    df_flag = df.withColumn('cancelled', page_flag("Cancellation Confirmation"))\
        .withColumn('downgraded', page_flag("Submit Downgrade"))\
        .withColumn('upgraded', page_flag("Submit Upgrade"))

    # Rows are ordered newest first, so the frame "unbounded preceding .. 0"
    # covers the rows whose ts is >= the current row's ts.
    windowval = Window.partitionBy(['userId','level']).orderBy(F.desc('ts'))\
        .rangeBetween(Window.unboundedPreceding, 0)
    df_phase = df_flag.withColumn('phase_downgrade', F.sum('downgraded').over(windowval))\
        .withColumn('phase_upgrade', F.sum('upgraded').over(windowval))

    # phase_cancel: same frame, but across all levels of the user
    window_cancel = Window.partitionBy(['userId']).orderBy(F.desc('ts'))\
        .rangeBetween(Window.unboundedPreceding, 0)
    df_phase = df_phase.withColumn('phase_cancel', F.sum('cancelled').over(window_cancel))

    # per-user totals over the whole history
    window_user = Window.partitionBy('userId')
    df_churn = df_phase.withColumn('churn_paid', F.sum('downgraded').over(window_user))\
        .withColumn('churn_service', F.sum('cancelled').over(window_user))

    return df_churn

In [None]:
# full preparation pipeline
def cleaned_df(df):
    """
        Run every preparation step on the raw event dataframe.

        Steps, in order: drop_na, convert_ts, convert_registration,
        extract_location_state, set_churn_label.

        Input:
            df(spark dataframe): raw events
        Output:
            spark dataframe returned by the last step
    """
    steps = (
        drop_na,
        convert_ts,
        convert_registration,
        extract_location_state,
        set_churn_label,
    )

    prepared = df
    for step in steps:
        prepared = step(prepared)

    return prepared

In [None]:
def load_data_format(spark_session, data_path):
    """
        Load a dataset with the reader that matches the file extension.

        Input:
            spark_session: session whose `read` attribute offers the
                json, csv and parquet readers
            data_path(str): path ending in .json, .csv or .parquet
                (the extension is matched case-insensitively)
        Output:
            whatever the selected reader returns for data_path
        Raises:
            ValueError: the extension is not one of the supported formats
    """
    supported_formats = ("json", "csv", "parquet")

    # text after the last dot, lowercased so "DATA.JSON" is accepted too
    data_format = data_path.rsplit(".", 1)[-1].lower()

    if data_format not in supported_formats:
        # fail here with a clear message instead of returning None and
        # breaking later, far away from the cause
        raise ValueError(
            "Unsupported data format for '{}': expected one of {}".format(
                data_path, ", ".join(supported_formats)))

    # the reader method has the same name as the format
    return getattr(spark_session.read, data_format)(data_path)

In [None]:
# load data 
df = load_data_format(spark,'mini_sparkify_event_data.json')

# # apply clean function 
# df_clean = cleaned_df(df)

In [None]:
# apply clean function 
df_clean = cleaned_df(df)

In [None]:
# checking 
df_clean

In [None]:
df_clean.count()

## 2. Exploratory Data Analysis 
- 유저의 이탈율(churn rate)에 대해 분석해본다.
- 유저 데이터를 통해 이탈율(churn rate)와 관련이 있는 데이터를 분석하고, 추출해본다

##### Define Churn 
- 1. Churn from service 
    => page 컬럼에서 Cancellation confirmation 한것을 서비스에서 이탈했다고 정의한다.
- 2. Churn from paid membership 
    => page 컬럼에서 Submit downgrade 한것을 서비스 유료 결제 멤버쉽에서 이탈했다고 정의한다. 


##### Auth 데이터 분석

In [None]:
# auth checking 
df_clean.select('auth').distinct().show()

In [None]:
df_clean.select(['auth','page'])\
    .filter(df_clean['auth'] == 'Cancelled').distinct().show()

page 컬럼의 Cancellation confirmation 하면
그 이후로는 auth상태는 Cancelled 로 변환이 된다.
즉 auth:Cancelled 를 최종적으로 churn 되었다고 볼 수 있다.

#### 이탈한 유저 데이터 탐색하기
- userId:32

In [None]:
# user 32 

df_clean.select(['userId','auth','level','page','year','month','day','hour'])\
    .filter(df_clean['userId'] == '32').orderBy('hour' ,ascending=False).show()

#### userId:32 의 관찰 결과 
- paid 유저가 Cancellation confirmation 하고 auth가 Cancelled 된것을 확인 가능 

#### 유료 결제 해지를 한 유저의 데이터 탐색 
- userId:11

In [None]:
# user 11 
# explore user 11 data 
# 2018 10 11 15
df_clean.select(['userId','auth','level','ts','page','year','month','day','hour'])\
    .filter(df_clean['userId']=='11')\
    .filter(df_clean['month'] == 10)\
    .filter((df_clean['day'] >= 10) & (df_clean['day'] <=11))\
    .filter((df_clean['ts']>='1539239509000')&(df_clean['ts']<='1539240639000'))\
    .orderBy('ts', ascending=False).show()

Submit downgrade 요청하면 , level이 free로 바뀌는걸 확인 가능하다

#### Customer segmentation
고객들을 유형별로 나눠서 분류하기 위한 상태체크 데이터가 필요하다고 생각해서 
1. Churn from service (플랫폼에서 벗어난 유저상태)
    - cancelled (cancel confirmation 할 경우에 cancelled 체크 컬럼)
    - churn_service (이미 플랫폼에서 벗어난 유저그룹 체크 컬럼)
        :0일경우 벗어나지 않았고, 1일 경우 True
    - phase_cancel(서비스에서 이탈할때까지의 구간동안에 일어난 이벤트 데이터 체크)
        : 1일 경우에 이탈하기전까지의 일어난 데이터라고 보면된다

2. Churn from paid (유료결제를 해지한 유저)
    - downgraded (downgrade, submit Downgrade 적용된경우)
        : 적용 되면 1
    - churn_paid (한번이라도 downgrade, submit downgrade 가 적용되었을 경우에 churn_paid 컬럼을 통해 확인 가능)
       : 유료 결제 해지했을 경우에 1, 2번 해지했을 경우에 2
    - phase_downgrade(유료 결제에서 이탈하기까지의 구간동안에 일어난 이벤트 데이터 체크) 
        : 1일 경우에 유료결제에서 이탈하기 전까지의 일어난 데이터, 2일 경우에 최근 시간에서 역순으로 유료결제 해지하기전까지의 구간데이터

#### 이탈 유저 데이터 탐색 
- userId:18

In [None]:
label_cols = ['userId','level','auth','page','ts','cancelled','phase_cancel','churn_service','downgraded','phase_downgrade','churn_paid']
event_cols = ['userId','level','auth','cancelled','phase_cancel','churn_service','downgraded','phase_downgrade','churn_paid']
## check: user 18

df_clean.filter(df_clean['userId']=='18')\
    .select(label_cols)\
    .limit(5).toPandas()

In [None]:
df_clean.filter(df_clean['userId']=='18')\
    .select(event_cols)\
    .distinct().show()

###### 유저 18 결과
- 유저18은 항상 유료결제
- 유료결제로 이용하다가 마지막에 서비스에서 이탈한것을 확인 할수 있다 
- 그러므로 churn_service = 1 로 유저 18의 모든 데이터가 이탈이용 구간으로 체킹 된것을 확인 가능

##### 유료결제 이탈 유저 탐색
- user 11

In [None]:
# event_cols 
df_clean.filter(df_clean['userId']=='11')\
    .select(event_cols)\
    .distinct().show()

- 위의 데이터에서 유저 11은 일단 유료결제 멤버쉽 해지를 1번 한적이 있는 유저로, churn_paid=1 로써 확인가능
- 유료결제 해지까지 구간은 phase_downgrade=1 로써 구분 가능.
- 하지만 유저 11은 무료에서 다시 유료결제 멤버쉽을 가입한것을 확인 할 수 있다.
- 유저 11이 다시 paid 에서 한번더 이탈한다면 => churn_paid=2 가 될것이다

In [None]:
# user 11

df_clean.filter(df_clean['userId']=='11')\
    .select(label_cols)\
    .filter(df_clean['month'] == 10)\
    .filter((df_clean['day'] >= 10) & (df_clean['day'] <=11))\
    .filter((df_clean['ts']>='1539239509000')&(df_clean['ts']<='1539240639000'))\
    .orderBy('ts', ascending=False).show()

#### 결제 이탈 + 서비스 이탈 유저 탐색 
- 결제에서 이탈하기도 하고 , 그리고 최종적으로 서비스에서도 이탈한 유저를 탐색해본다
- userId:12

In [None]:
# event_cols 
# No orderBy here: 'ts' is not one of event_cols and distinct() reshuffles
# the rows, so a sort placed before it does not determine the output order.
df_clean.filter(df_clean['userId']=='12')\
    .select(event_cols)\
    .distinct().show()

- 위의 데이터를 보면 최종적으로 churn_service=1, churn_paid=1을 통해 결제와 서비스에서 이탈한것을 확인 가능
- 1. 무료=> 유료 결제로 해지하기 까지의 phase_downgrade=1 컬럼 구간을 통해 구분 가능하고
- 2. 다시 free=> paid로 결제
- 3. 하지만 최종적으로는 cancelled=1 로 서비스에서 이탈하였으므로 churn_service=1로 최종적으로 구분가능
- 최종적으로 모든 유저 12의 데이터는 서비스에서 이탈한 유저의 데이터로써 볼 수 있음과 동시에 결제에서 이탈했다고도 볼수 있음

##### 이탈하지 않은 유저 탐색 
- 결제 및 서비스에서 이탈하지 않은 유저 데이터 탐색해본다
- userId:104


In [None]:
filter_churn_service = F.col('churn_service') !=0
filter_churn_paid = F.col('churn_paid') !=0

In [None]:
# event_cols 
# No orderBy here: 'ts' is not one of event_cols and distinct() reshuffles
# the rows, so a sort placed before it does not determine the output order.
df_clean.filter(df_clean['userId']=='104')\
    .select(event_cols)\
    .distinct().show()

- 위의 유저 104데이터를 통해 free에서 paid로 유료결제 한 후로 쭉 이용하고 있는 유저로 확인 할 수 있다.

#### Churn user count analysis
- 이탈 유저 수 분석

In [None]:
## total number of users 
total_num_users = df_clean.select('userId').distinct().count()

In [None]:
## 1. number of churn_paid users 
# churn_paid =1 , userId 
num_churn_paid = df_clean.select('userId')\
    .filter(df_clean['churn_paid'] !=0)\
    .distinct().count()

print('{} users Cancelled paid level from the service'.format(num_churn_paid))
print('{:.2f}% of users are downgraded'.format(num_churn_paid/total_num_users*100))

In [None]:
# 2. number of users with at least one cancellation (churn_service != 0)
num_churn_service = df_clean.select('userId')\
    .filter(df_clean['churn_service'] !=0)\
    .distinct().count()

print('{} users unsubscribed from the service'.format(num_churn_service))
# the value is a percentage, so it carries the '%' sign like the sibling messages
print('{:.2f}% of users are unsubscribed'.format(num_churn_service/total_num_users*100))

In [None]:
# 3. number of churn_paid then, chrun_service either 
filter_churn_service = F.col('churn_service')!=0
filter_churn_paid = F.col('churn_paid') !=0

num_churn_paid_service = df_clean.select('userId')\
    .filter(filter_churn_service&filter_churn_paid).distinct().count()

print('{} users downgraded and unsubscribed'.format(num_churn_paid_service))
print('{:.2f}% of users are downgraded and unsubscribed'.format(num_churn_paid_service/total_num_users*100))

#### Explore Data 
- Impact of gender
- Impact of the subscription level(paid vs free)
- Impact of location
- Impact of avg number of repeat
- Impact of the avg number of ads 
- Impact of the number of logins and time between two logins
- Impact of login count 
- Impact of daily behavior
- Impact of the listening time per session
- Impact of the time of the activity (count of actions)
    - week
    - month
- impact of the time between registration, upgrade and downgrade events

#### Impact of the gender
- Cancellation analysis per gender 
    - churned_user vs active_user 
- Downgrade analysis 
    - churned_paid user vs active paid user


In [None]:
# row_number column = recent value
windowval = Window.partitionBy('userId').orderBy(F.desc('ts'))
df_clean = df_clean.withColumn('row_number',F.row_number().over(windowval))

In [None]:
# set segmetation data 
filter_churn_service = F.col('churn_service') !=0
filter_churn_paid = F.col('churn_paid') !=0
filter_no_churn = ~(filter_churn_service|filter_churn_paid)
filter_churn_either = (filter_churn_service&filter_churn_paid)


In [None]:
# customer segmentation
gender_cols = ['userId','gender']

gender_no_churn = df_clean.select(['userId','gender']).filter(filter_no_churn).distinct().toPandas()
gender_churn_paid = df_clean.select(gender_cols)\
    .filter(filter_churn_paid).distinct().toPandas()
gender_churn_service = df_clean.select(gender_cols)\
    .filter(filter_churn_service).distinct().toPandas()

gender_active_paid = df_clean.filter(F.col('row_number')==1)\
    .filter(F.col('level')=='paid').filter(filter_no_churn)\
    .select('userId','gender').distinct().toPandas()

In [None]:
# unique user df
df_total_user = df_clean.select(['userId','gender']).distinct().toPandas()

In [None]:
# total male, female count
num_total_male = df_total_user[df_total_user['gender']=='M'].gender.count()
num_total_female = df_total_user[df_total_user['gender']=='F'].gender.count()

In [None]:
# cancellation analysis 
# male 
print('churnd user')
print('male ratio:',gender_churn_service[gender_churn_service.gender=='M'].gender.count()/num_total_male)
print('female ratio:',gender_churn_service[gender_churn_service.gender=='F'].gender.count()/num_total_female)

In [None]:
def get_ratio_by_gender(df, title, df_total_user=None):
    """
        Print, per gender, the share of all users that belongs to a segment.

        Input:
            df(pandas dataframe): one row per user of the segment,
                with a 'gender' column ('M' / 'F')
            title(str): heading printed before the two ratios
            df_total_user(pandas dataframe, optional): one row per user of
                the whole population, with a 'gender' column. When omitted
                it is collected from the global df_clean spark dataframe,
                which runs a Spark job on every call.
        Output:
            None (the ratios are printed; nan when the population has no
            user of that gender)
    """
    if df_total_user is None:
        df_total_user = df_clean.select(['userId','gender']).distinct().toPandas()

    def gender_ratio(gender):
        # Count rows through a boolean mask; indexing the result of
        # DataFrame.count() by position ([0]) is deprecated in pandas 2.x.
        num_segment = int((df['gender'] == gender).sum())
        num_total = int((df_total_user['gender'] == gender).sum())
        # no user of that gender at all -> the ratio is undefined
        return num_segment / num_total if num_total else float('nan')

    print(title)
    print('male ratio:', gender_ratio('M'))
    print('female ratio:', gender_ratio('F'))

In [None]:
get_ratio_by_gender(gender_no_churn,'active no churn user')

In [None]:
get_ratio_by_gender(gender_churn_paid,'churn paid user')

In [None]:
get_ratio_by_gender(gender_active_paid,'active paid user')

In [None]:
trace1 = go.Bar(
    x=['Male','Female'],
    y=[sum(gender_churn_service.gender == 'M'),sum(gender_churn_service.gender=='F')],
    name='churned user'
)

trace2 = go.Bar(
    x=['Male','Female'],
    y=[sum(gender_no_churn.gender == 'M'),sum(gender_no_churn.gender=='F')],
    name='active user'
)

trace3 = go.Bar(
    x=['Male','Female'],
    y=[sum(gender_churn_paid.gender=='M'),sum(gender_churn_paid.gender=='F')],
    name='churn paid user'
)
trace4 = go.Bar(
    x=['Male','Female'],
    y=[sum(gender_active_paid.gender=='M'), sum(gender_active_paid.gender=='F')],
    name='active paid user'
)


In [None]:
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,1,2)
fig.append_trace(trace4,1,2)

fig.layout.update(height=500, width=700, )
fig.update_xaxes(
    title_text = 'Gender'
)
fig.update_yaxes(
    title_text = 'Count of users'
)
fig.show()

- 위의 그래프로 보아서 성별의 영향이 큰지는 딱히 잘 모르겠지만 그래프를 통해서 
- 남성이 서비스에서 이탈할 확률이 근소하게 더 높은것 같고 
- 결제에서의 이탈 확률은 여성이 조금 더 높다는걸 확인할수 있다. 
- 하지만 유저 수가 225명으로 얼마 되지 않기 때문에 큰 영향은 확신할 수 없다

#### Impact of level (free vs paid)

In [None]:
level_no_churn = df_clean.select(['userId','level'])\
    .filter(F.col('row_number')==1)\
    .filter(filter_no_churn).distinct().toPandas()

level_churn_service= df_clean.select(['userId','level'])\
    .filter(F.col('row_number')==1)\
    .filter(filter_churn_service).distinct().toPandas()

In [None]:
trace1 = go.Bar(
    x=['Paid', 'Free'],
    y=[100*len(level_churn_service[level_churn_service['level']=='paid'])/level_churn_service.shape[0],
      100*len(level_churn_service[level_churn_service['level']=='free'])/level_churn_service.shape[0]],
    name='churned user'
)

trace2 = go.Bar(
    x=['Paid', 'Free'],
    y=[100* len(level_no_churn[level_no_churn['level']=='paid'])/level_no_churn.shape[0],
      100* len(level_no_churn[level_no_churn['level']=='free'])/level_no_churn.shape[0]],
    name='active user'
)


In [None]:
# Grouped bars: share of paid / free users among churned and active users.
fig = make_subplots(
    rows=1, cols=1,
    subplot_titles=['Service cancellation analysis']
)

fig.add_trace(trace1, row=1, col=1)
fig.add_trace(trace2, row=1, col=1)


fig.layout.update(height=500, width=700, )
fig.update_xaxes(
    title_text = 'Level'
)
# trace1 / trace2 hold percentages (100 * share within each group),
# not raw user counts, so the axis is labelled accordingly
fig.update_yaxes(
    title_text = 'Percentage of users (%)'
)
fig.show()

Level결과
- 위의 결과로 서비스 이탈 유저는 paid 의 비율이 높은걸 확인 가능 
- 하지만 현재 사용 유저중에서의 paid의 비율이 높은걸로도 확인가능 

#### Impact of the location of the user

In [None]:
df_clean.select(['location','state']).distinct().show()

In [None]:
location_cols = ['userId','state']

location_no_churn = df_clean.select(['userId','state','level'])\
    .filter(filter_no_churn).distinct().toPandas()
location_churn_paid = df_clean.select(location_cols)\
    .filter(filter_churn_paid).distinct().toPandas()
location_churn_service = df_clean.select(location_cols)\
    .filter(filter_churn_service).distinct().toPandas()

In [None]:
locations = df_clean.select('state').distinct().toPandas()['state']

In [None]:
# Users per state for every segment.
# x and y of a trace come from the same grouped Series, so each bar is
# labelled with its own state. A separately collected list of distinct states
# has a different order and length than the grouped counts.
# nunique() counts a user once even when location_no_churn lists that user
# with more than one level.
churn_service_by_state = location_churn_service.groupby('state')['userId'].nunique()
no_churn_by_state = location_no_churn.groupby('state')['userId'].nunique()
churn_paid_by_state = location_churn_paid.groupby('state')['userId'].nunique()
active_paid_by_state = location_no_churn[location_no_churn['level']=='paid']\
    .groupby('state')['userId'].nunique()

trace1 = go.Bar(
    x=churn_service_by_state.index,
    y=churn_service_by_state.values,
    name='churned user'
)

trace2 = go.Bar(
    x=no_churn_by_state.index,
    y=no_churn_by_state.values,
    name='active user'
)

trace3 = go.Bar(
    x=churn_paid_by_state.index,
    y=churn_paid_by_state.values,
    name='churn paid user'
)
trace4 = go.Bar(
    x=active_paid_by_state.index,
    y=active_paid_by_state.values,
    name='active paid user'
)


In [None]:
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,1,2)
fig.append_trace(trace4,1,2)

fig.layout.update(height=500, width=1000, )
fig.update_xaxes(
    title_text = 'location'
)
fig.update_yaxes(
    title_text = 'Count of users'
)
fig.show()

location 결과
- 특정 지역에서 서비스 이탈이 많은걸 확인 할수 있지만, 유저가 그리 많지 않아서 큰 의미는 잘 모르겠다.
- 하지만 유료결제 이탈에서의 LA 지역이 뚜렷하게 나타난다

#### Impact of avg number of repeat
- 같은노래의 반복 횟수를 분석해본다

In [None]:
df_clean.select(['userId']).distinct().count()

In [None]:
repeat_cols = ['userId','level','churn_service','churn_paid']

df_count_repeat = df_clean.groupby(repeat_cols)\
    .agg(F.count('song').alias('total_num_song'),\
        F.countDistinct('song').alias('unique_num_song'),\
        F.countDistinct('artist').alias('distinct_artist'))

In [None]:
df_count_repeat.orderBy('userId').show()

In [None]:
# compute ratio of repeat song, distinct artist 

df_count_repeat = df_count_repeat\
    .withColumn('repeat_ratio',F.round(100-(100 * df_count_repeat['unique_num_song']/df_count_repeat['total_num_song'])))\
    .withColumn('repeat_distinct_artist', F.round(100*df_count_repeat['distinct_artist']/df_count_repeat['total_num_song']))

In [None]:
# segmentation
repeat_cols = ['userId','level','repeat_ratio','repeat_distinct_artist','distinct_artist']

count_repeat_no_churn = df_count_repeat.filter(filter_no_churn)\
    .select(repeat_cols).distinct().dropna().toPandas()

count_repeat_churn_paid = df_count_repeat.filter(filter_churn_paid)\
    .select(repeat_cols).distinct().dropna().toPandas()
count_repeat_churn_service = df_count_repeat.filter(filter_churn_service)\
    .select(repeat_cols).distinct().dropna().toPandas()

In [None]:
# active user ratio
count_repeat_no_churn.repeat_ratio.mean(),count_repeat_no_churn.repeat_distinct_artist.mean()

In [None]:
# churn_service ratio
count_repeat_churn_service.repeat_ratio.mean(), count_repeat_churn_service.repeat_distinct_artist.mean()

In [None]:
# churn paid
count_repeat_churn_paid.repeat_ratio.mean(),count_repeat_churn_paid.repeat_distinct_artist.mean(),

In [None]:
# active paid user 
count_repeat_active_paid = count_repeat_no_churn[count_repeat_no_churn['level']=='paid']

count_repeat_active_paid.repeat_ratio.mean(), count_repeat_active_paid.repeat_distinct_artist.mean()

##### visualization : number of repeat

In [None]:
trace1 = go.Bar(
    x=['repeat','distinct_artist'],
    y=[# churn_service ratio
count_repeat_churn_service.repeat_ratio.mean(), count_repeat_churn_service.repeat_distinct_artist.mean()],
    name='churned user'
)

trace2 = go.Bar(
    x=['repeat','distinct_artist'],
    y=[count_repeat_no_churn.repeat_ratio.mean(),count_repeat_no_churn.repeat_distinct_artist.mean()],
    name='active user'
)

trace3 = go.Bar(
    x=['repeat','distinct_artist'],
    y=[count_repeat_churn_paid.repeat_ratio.mean(),count_repeat_churn_paid.repeat_distinct_artist.mean()],
    name='churn paid user'
)
trace4 = go.Bar(
    x=['repeat','distinct_artist'],
    y=[count_repeat_active_paid.repeat_ratio.mean(), count_repeat_active_paid.repeat_distinct_artist.mean()],
    name='active paid user'
)


In [None]:
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,1,2)
fig.append_trace(trace4,1,2)

fig.layout.update(height=500, width=700, )
fig.update_xaxes(
    title_text = 'repeat'
)
fig.update_yaxes(
    title_text = 'avg ratio of the user'
)
fig.show()

number of repeat 결과
- 서비스나 유료결제에서의 이탈 유저의 평균 같은 노래 반복 비율은 상대적으로, 새로운 노래를 듣는 비율보다 낮으며 활성화된 이용 유저들보다 낮다.
- 같은 노래를 듣는 반복 비율이 낮으면 => 서비스, 유료결제에서의 이탈에 영향을 준다고 볼 수 있다

### Impact of the avg number of ads
- 평균 광고 시청 회수 분석을 통해 이탈율에게 영향을 주는지 확인

In [None]:
is_ads = udf(lambda x: 1 if x == 'Roll Advert' else 0 , IntegerType())
is_song = udf(lambda x:1 if x == 'NextSong' else 0, IntegerType())

# apply udf to on page column 

df_count_ads = df_clean.withColumn('is_song', is_song(F.col('page')))\
    .withColumn('is_ads', is_ads(F.col('page')))


In [None]:
df_count_ads = df_count_ads.groupby(['userId','level','churn_service','churn_paid'])\
    .agg(F.sum('is_song').alias('num_song'),\
        F.sum('is_ads').alias('num_ads'))

In [None]:
df_count_ads = df_count_ads.withColumn('ratio_song_ads',F.round(100*df_count_ads['num_ads']/df_count_ads['num_song'],2))

In [None]:
# 422 count
df_count_ads.show()

In [None]:
ads_cols = ['userId','ratio_song_ads','level']

count_ads_no_churn = df_count_ads.filter(filter_no_churn)\
    .select(ads_cols).distinct().toPandas()
count_ads_churn_paid = df_count_ads.filter(filter_churn_paid)\
    .select(ads_cols).distinct().toPandas()
count_ads_churn_service = df_count_ads.filter(filter_churn_service)\
    .select(ads_cols).distinct().toPandas()

In [None]:
# 1.churn_service_paid vs active_paid
# churn_Service_paid_user
trace1 = go.Box(
    x=count_ads_churn_service[count_ads_churn_service['level']=='paid']['ratio_song_ads'],
    name='churned service paid user'
)
# active paid user
trace2 = go.Box(
    x=count_ads_no_churn[count_ads_no_churn['level']=='paid']['ratio_song_ads'],
    name='active paid user'
)

# 2. churn_Service free user vs active free user
# churn_service_free_user
trace3 = go.Box(
    x=count_ads_churn_service[count_ads_churn_service['level']=='free']['ratio_song_ads'],
    name='churn service free user'
)
trace4 = go.Box(
    x=count_ads_no_churn[count_ads_no_churn['level']=='free']['ratio_song_ads'],
    name='active free user'
)

### 3. downgrade: churn_paid user vs active paid user

trace5 = go.Box(
    x=count_ads_churn_paid[count_ads_churn_paid['level']=='paid']['ratio_song_ads'],
    name='churn paid user'
)
trace6 = go.Box(
    x=count_ads_no_churn[count_ads_no_churn['level']=='paid']['ratio_song_ads'],
    name='active paid user'
)

In [None]:
# Three stacked panels need three titles; with only two, the free-user
# cancellation panel was titled as the downgrade analysis and the real
# downgrade panel had no title.
fig = make_subplots(
    rows=3, cols=1,
    subplot_titles=['Service cancellation analysis: paid users',
                    'Service cancellation analysis: free users',
                    'Service downgrade analysis']
)

# row 1: churned vs active paid users
fig.add_trace(trace1, row=1, col=1)
fig.add_trace(trace2, row=1, col=1)
# row 2: churned vs active free users
fig.add_trace(trace3, row=2, col=1)
fig.add_trace(trace4, row=2, col=1)
# row 3: downgraded vs active paid users
fig.add_trace(trace5, row=3, col=1)
fig.add_trace(trace6, row=3, col=1)


fig.layout.update(height=900, width=700, )
fig.update_xaxes(
    title_text = 'ratio of ads listened : Active vs churned user'
)
fig.update_yaxes(
    title_text = 'ratio of ads'
)
fig.show()

광고 횟수 분석 결과
- 전체적으로 서비스나 유료 멤버쉽에서의 이탈한 유저들은 상대적으로 노래 대비 광고 시청 비율이 높았다.
- free 유저가 paid 유저보다 평균적으로 약 10배정도 광고를 더 시청한다
- 광고 시청 횟수 비율이 이탈율에 영향을 주는것 같다

#### Impact of the number of logins 

##### count login

In [None]:
# login count df
df_count_login = df_clean.groupby(['userId','level','churn_service','churn_paid'])\
    .agg(F.countDistinct('sessionId').alias('count_login'))

In [None]:
# checking 
df_count_login.toPandas()

In [None]:
login_cols = ['userId','count_login','level']

login_no_churn = df_count_login.filter(filter_no_churn)\
    .select(login_cols).toPandas()
login_churn_paid = df_count_login.filter(filter_churn_paid)\
    .select(login_cols).toPandas()
login_churn_service = df_count_login.filter(filter_churn_service)\
    .select(login_cols).toPandas()

In [None]:
print("active user",login_no_churn.count_login.mean())
print("churn service user", login_churn_service.count_login.mean())
print("active paid user",login_no_churn[login_no_churn['level']=='paid'].count_login.mean())
print("churn paid user",login_churn_paid.count_login.mean())

In [None]:
# 1.churn_service_paid vs active_paid
# churn_Service_paid_user
trace1 = go.Box(
    x=login_churn_service[login_churn_service['level']=='paid']['count_login'],
    name='churned service paid user'
)
# active paid user
trace2 = go.Box(
    x=login_no_churn[login_no_churn['level']=='paid']['count_login'],
    name='active paid user'
)

# 2. churn_Service free user vs active free user
# churn_service_free_user
trace3 = go.Box(
    x=login_churn_service[login_churn_service['level']=='free']['count_login'],
    name='churn service free user'
)
trace4 = go.Box(
    x=login_no_churn[login_no_churn['level']=='free']['count_login'],
    name='active free user'
)

### 3. downgrade: churn_paid user vs active paid user

trace5 = go.Box(
    x=login_churn_paid[login_churn_paid['level']=='paid']['count_login'],
    name='churn paid user'
)
trace6 = go.Box(
    x=login_no_churn[login_no_churn['level']=='paid']['count_login'],
    name='active paid user'
)

In [None]:
# Three stacked panels need three titles; with only two, the free-user
# cancellation panel was titled as the downgrade analysis and the real
# downgrade panel had no title.
fig = make_subplots(
    rows=3, cols=1,
    subplot_titles=['Service cancellation analysis: paid users',
                    'Service cancellation analysis: free users',
                    'Service downgrade analysis']
)

# row 1: churned vs active paid users
fig.add_trace(trace1, row=1, col=1)
fig.add_trace(trace2, row=1, col=1)
# row 2: churned vs active free users
fig.add_trace(trace3, row=2, col=1)
fig.add_trace(trace4, row=2, col=1)
# row 3: downgraded vs active paid users
fig.add_trace(trace5, row=3, col=1)
fig.add_trace(trace6, row=3, col=1)


fig.layout.update(height=900, width=700, )
fig.update_xaxes(
    title_text = 'count of login : Active vs churned user'
)
fig.show()

#### avg time between two login(delta time)
- 세션 사이의 평균 시간차

In [None]:
recent_user_level = df_clean.filter(F.col('row_number')==1)\
    .select(['userId','level']).distinct()

In [None]:
# one window per user, events ordered from the newest to the oldest ts
windowval = Window.partitionBy('userId').orderBy(F.desc('ts'))

# next_date: 'date' of the previous row in that descending order, i.e. of the
# event that follows the current one in time (NULL on the user's newest event)
df_delta_login = df_clean.withColumn('next_date',lag('date',1).over(windowval))

# delta_time: number of days from the current event's date to next_date
df_delta_login = df_delta_login.withColumn("delta_time", F.datediff(F.col('next_date'),F.col('date')))


In [None]:
# compute the avg delta_time between login for each user 

delta_cols = ['userId','level','churn_service','churn_paid','delta_time']

df_delta_login = df_delta_login.select(delta_cols)\
    .filter(df_delta_login['delta_time'] != 0)
distinct_session = df_clean.groupby('userId').agg(F.countDistinct('sessionId').alias('user_num_session'))

# join: df_delta_login + distinct_session
df_delta_login = df_delta_login.join(distinct_session, on=['userId'], how='inner')

# calculate avg delta time , total_avg_delta
df_delta_login = df_delta_login.withColumn('avg_delta',F.col('delta_time')/F.col('user_num_session'))\
    .withColumn('total_avg_delta',F.sum('avg_delta').over(Window.partitionBy('userId')))


In [None]:
delta_login = df_delta_login.groupby(['userId','level','churn_service','churn_paid'])\
    .agg(F.max('total_avg_delta').alias('total_avg_delta'))

In [None]:
# function 
def set_recent_user_level(df):
    """
        Keep only the rows of `df` that match a user's recent level.

        The (userId, level) pairs are taken from the rows of the global
        `df_clean` whose `row_number` equals 1 (row_number is defined
        elsewhere in the notebook; presumably 1 marks the latest event of a
        user - confirm) and are inner-joined with `df` on both columns.

        Input:
            df(spark dataframe): must contain `userId` and `level`
        Output:
            spark dataframe: result of the inner join
    """
    is_first_ranked = F.col('row_number') == 1
    latest_levels = df_clean.where(is_first_ranked).select('userId', 'level').distinct()

    return latest_levels.join(df, on=['userId', 'level'], how='inner')

In [None]:
res_delta_login = set_recent_user_level(delta_login)

In [None]:
delta_login_no_churn = res_delta_login.filter(filter_no_churn).toPandas()
delta_login_churn_paid = res_delta_login.filter(filter_churn_paid).toPandas()
delta_login_churn_service = res_delta_login.filter(filter_churn_service).toPandas()

In [None]:
res_delta_login.toPandas().head()

In [None]:
# avg 
print("churned service user avg delta time of login:",delta_login_churn_service.total_avg_delta.mean())
print("churned paid user avg delta time of login:",delta_login_churn_paid.total_avg_delta.mean())
print("no churned user avg delta time of login:",delta_login_no_churn.total_avg_delta.mean())

In [None]:
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=delta_login_churn_service['total_avg_delta'],
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=delta_login_no_churn['total_avg_delta'],
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=delta_login_churn_paid['total_avg_delta'],
    name='churn paid ser'
)
trace4 = go.Box(
    x=delta_login_no_churn[delta_login_no_churn['level']=='paid']['total_avg_delta'],
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=900, width=700, )
fig.update_xaxes(
    title_text = 'avg delta time of login : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

avg delta time of login 결과 
- 서비스와 유료 멤버쉽에서 이탈한 유저들은 로그인 간격의 평균 시간차가 활성 유저보다 더 짧은 것을 확인할 수 있다. 즉, 더 자주 로그인하는 유저들이 이탈하는 경향이 있다.

### Impact of daily behavior
- page에 해당하는 다른 여러 액션이 이탈율에 영향을 주는지 분석해본다 
- 노래를 듣는지, thumb up 같은 요소 분석
- 주간 단위로 나눠서 분석한다

In [None]:
# udf=> is_song, is_thumb_up, is_thumb_down 
is_song = udf(lambda x: 1 if x == 'NextSong' else 0 , IntegerType())
is_thumb_up = udf(lambda x:1 if x =='Thumbs Up' else 0, IntegerType())
is_thumb_down  = udf(lambda x:1 if x== 'Thumbs Down' else 0, IntegerType())

# add column using udf 
# daily_action
df_daily_action = df_clean.withColumn('is_song',is_song(F.col('page')))\
    .withColumn('is_thumb_up',is_thumb_up(F.col('page')))\
    .withColumn('is_thumb_down',is_thumb_down(F.col('page')))\
    .withColumn('weekofyear',F.weekofyear(F.col('date')))

In [None]:
# create window per week 
windowval = Window.partitionBy(['userId','year','weekofyear']).orderBy('ts')\
    .rangeBetween(Window.unboundedPreceding,0)

# compute total_num_song_week, thumb_up_week
df_daily_action = df_daily_action.withColumn('total_song_week',F.sum('is_song').over(windowval))\
    .withColumn('total_thumb_up_week',F.sum('is_thumb_up').over(windowval))\
    .withColumn('total_thumb_down_week', F.sum('is_thumb_down').over(windowval))


In [None]:
# groupby 

daily_cols = ['userId','year','weekofyear','level','churn_service','churn_paid']

df_daily_filt = df_daily_action.groupby(daily_cols)\
    .agg(F.max('total_song_week').alias('max_song_week'), \
        F.max('total_thumb_up_week').alias('max_thumb_up_week'),\
        F.max('total_thumb_down_week').alias('max_thumb_down_week'))

# calculate avg 
df_daily_filt = df_daily_filt.withColumn('avg_song_week',F.col('max_song_week')/7)\
    .withColumn('avg_thumb_up_week',F.col('max_thumb_up_week')/7)\
    .withColumn('avg_thumb_down_week',F.col('max_thumb_down_week')/7)

In [None]:
# apply segmentation 
avg_week_no_churn = df_daily_filt.filter(filter_no_churn).distinct().toPandas()
avg_week_churn_paid = df_daily_filt.filter(filter_churn_paid).distinct().toPandas()
avg_week_churn_service = df_daily_filt.filter(filter_churn_service).distinct().toPandas()

In [None]:
avg_week_no_churn.columns

###### avg_num_song_week

In [None]:
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=avg_week_churn_service['avg_song_week'].unique(),
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=avg_week_no_churn['avg_song_week'].unique(),
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=avg_week_churn_paid['avg_song_week'].unique(),
    name='churn paid ser'
)
trace4 = go.Box(
    x=avg_week_no_churn[avg_week_no_churn['level']=='paid']['avg_song_week'].unique(),
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=900, width=700, )
fig.update_xaxes(
    title_text = 'avg song count per week : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

avg_song_count per week 결과 
- 서비스에서 이탈한 유저가 상대적으로 주간 평균 노래 듣는 횟수가 활성화된 유저보다 적은 편이다.
- 유료 멤버쉽에서 이탈한 유저가 활성화된 유료 유저보다 주간 평균 노래 듣는 횟수가 근소하게 많다.

###### avg_thumb_up_week

In [None]:
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=avg_week_churn_service['avg_thumb_up_week'].unique(),
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=avg_week_no_churn['avg_thumb_up_week'].unique(),
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=avg_week_churn_paid['avg_thumb_up_week'].unique(),
    name='churn paid ser'
)
trace4 = go.Box(
    x=avg_week_no_churn[avg_week_no_churn['level']=='paid']['avg_thumb_up_week'].unique(),
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=900, width=700, )
fig.update_xaxes(
    title_text = 'avg thumb up count per week : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

avg thumb up week 결과
- 서비스 이탈 유저가 평균적으로 따봉주는 횟수가 적다 
- 유료 멤버쉽 이탈 유저가 평균적으로 따봉주는 횟수가 많다 

- 결과적으로 서비스 이탈 유저는 노래를 듣거나, 따봉을 주거나 하는 daily action 에서의 빈도가 낮고, 유료 멤버쉽 이탈 유저는 daily action 에서의 빈도가 잦다

#### Impact of the listening time per session
- 세션마다 노래를 듣는 시간을 분석해보자

In [None]:
# window parititon by userId and sorting by descending timestamp
window_user_ts = Window.partitionBy('userId').orderBy(desc('ts'))

# session window 
window_session = Window.partitionBy(['userId','sessionId']).orderBy('ts')\
    .rangeBetween(Window.unboundedPreceding,0)

# new column next_ts, next_action

df_listen_session = df_clean.withColumn('next_ts',lag('ts',1).over(window_user_ts))\
    .withColumn('next_action',lag('page',1).over(window_user_ts))\
    .filter(F.col('page')=='NextSong')

# calculate the difference between two timestamp
df_listen_session = df_listen_session.withColumn('diff_ts',(F.col('next_ts')-F.col('ts'))/1000)

# add column with total listening per session 
# 세션별로 노래 들은 시간 ts => list_session
df_listen_session = df_listen_session.withColumn('listen_session',F.sum('diff_ts').over(window_session))

# 즉 list_Session의 총합 max로 해서 
# 그룹으로 묶어버리기 

df_listen_session_filt = df_listen_session.groupby(['userId','sessionId','level','churn_service','churn_paid'])\
    .agg(F.max('listen_session').alias('total_listen_session'),\
        F.max('iteminSession').alias('item_session'))

# avg listen time session 
df_listen_session_filt = df_listen_session_filt.withColumn('avg_listen_session',
                                                        F.round(F.col('total_listen_session')/F.col('item_session')))

In [None]:
# segmentation 

listen_session_no_churn = df_listen_session_filt.filter(filter_no_churn).toPandas()
listen_session_churn_paid= df_listen_session_filt.filter(filter_churn_paid).toPandas()
listen_session_churn_service = df_listen_session_filt.filter(filter_churn_service).toPandas()

In [None]:
listen_session_no_churn.head()

In [None]:
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=listen_session_churn_service['avg_listen_session'].unique(),
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=listen_session_no_churn['avg_listen_session'].unique(),
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=listen_session_churn_paid['avg_listen_session'].unique(),
    name='churn paid ser'
)
trace4 = go.Box(
    x=listen_session_no_churn[listen_session_no_churn['level']=='paid']['avg_listen_session'].unique(),
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=900, width=1200, )
fig.update_xaxes(
    title_text = 'avg listen time per session : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

In [None]:
trace1 = go.Bar(
    x=['avg_listen_time','avg_item_in_session'],
    y=[listen_session_churn_service.avg_listen_session.mean(),listen_session_churn_service.item_session.mean()],
    name='churn service user'
)

trace2 = go.Bar(
    x=['avg_listen_time','avg_item_in_session'],
    y=[listen_session_no_churn.avg_listen_session.mean(),listen_session_no_churn.item_session.mean()],
    name='active user'
)

trace3 = go.Bar(
    x=['avg_listen_time','avg_item_in_session'],
    y=[listen_session_churn_paid.avg_listen_session.mean(),listen_session_churn_paid.item_session.mean()],
    name='churn paid user'
)
trace4 = go.Bar(
    x=['avg_listen_time','avg_item_in_session'],
    y=[listen_session_no_churn[listen_session_no_churn['level']=='paid'].avg_listen_session.mean(),
      listen_session_no_churn[listen_session_no_churn['level']=='paid'].item_session.mean()],
    name='active paid user'
)


In [None]:
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,1,2)
fig.append_trace(trace4,1,2)

fig.layout.update(height=600, width=900, )
# fig.update_xaxes(
#     title_text = 'avg listen time'
# )
fig.update_yaxes(
    title_text = 'avg listen time per session'
)
fig.show()

avg_listen_time per session+ avg_number_item per session 결과
- 서비스 이탈 유저보다 현재 활성화 되어 있는 유저가 세션마다 듣는 시간이 평균적으로 높다 
- 서비스 이탈 유저보다 활성화 유저의 세션별 아이템 수가 평균적으로 높다
- 유료 멤버쉽 이탈 유저의 평균 세션별 듣는 시간이 현재 유료 활성화 유저보다 높다 
- 유료 멤버쉽 이탈 유저의 평균 세션별 아이템 수는 현재 유료 활성화 유저보다 낮은데도 세션별 듣는 시간은 더 높으므로,
    유료 멤버쉽 이탈 유저는 세션당 평균적으로 노래를 더 오래 듣는다고 파악할 수 있다.

#### Impact of the time of the activity (count of actions)


- 각 유저의 행동 횟수를 page 컬럼을 통해 구해본다

###### avg_action_per_day

In [None]:
# get the total number of actions per user 
df_action_user = df_clean.groupby(['userId','level','churn_service','churn_paid'])\
    .agg(F.count('page').alias('action_per_user')).toPandas()

In [None]:
# count number of actions per day 
df_action_user_day = df_clean.groupby(['userId','year','month','day'])\
    .agg(F.count('page').alias('action_per_day')).toPandas()

In [None]:
avg_action_day = pd.merge(df_action_user, df_action_user_day,on='userId')

In [None]:
avg_action_day['avg_action_per_day'] = avg_action_day['action_per_day']/avg_action_day['action_per_user']

In [None]:
avg_action_day.head()

In [None]:
# Split the per-day rows into three disjoint segments.

where_churn_paid = avg_action_day['churn_paid'].ne(0)
where_churn_service = avg_action_day['churn_service'].ne(0)
where_no_churn = ~(where_churn_paid | where_churn_service)

# no churn     : neither flag set
# churn paid   : paid flag set, service flag not set
# churn service: service flag set (whatever the paid flag is)
action_day_no_churn = avg_action_day.loc[where_no_churn]
action_day_churn_paid = avg_action_day.loc[where_churn_paid & ~where_churn_service]
action_day_churn_service = avg_action_day.loc[where_churn_service]


In [None]:
# data quality count check 
assert avg_action_day.shape[0] == (action_day_no_churn.shape[0]+action_day_churn_paid.shape[0]+action_day_churn_service.shape[0])

In [None]:
# box plot unique 
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=action_day_churn_service['avg_action_per_day'].unique(),
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=action_day_no_churn['avg_action_per_day'].unique(),
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=action_day_churn_paid['avg_action_per_day'].unique(),
    name='churn paid ser'
)
trace4 = go.Box(
    x=action_day_no_churn[action_day_no_churn['level']=='paid']['avg_action_per_day'].unique(),
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=700, width=700, )
fig.update_xaxes(
    title_text = 'avg count of action per day : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

##### avg_action per week, weekend

In [None]:
df_action_user

In [None]:
# udf returning 1 when the value is one of the integers 1-5 and 0 otherwise.
# NOTE(review): if `dayofweek` was built with pyspark's F.dayofweek, 1 is
# Sunday and 7 is Saturday, so 1-5 would mean Sunday-Thursday - confirm how
# the column is created. get_week_from_df compares against the strings
# '1'-'5' instead, so the column type should be checked as well.
is_weekday = udf(lambda x:1 if x in [1,2,3,4,5] else 0, IntegerType())

In [None]:
df_week_action = df_clean.withColumn('is_weekday',is_weekday('dayofweek'))

In [None]:
df_week_action_weekday = df_week_action.filter(F.col('is_weekday')==1)\
    .groupby(['userId','level'])\
    .agg(F.count('page').alias('count_action_weekday'))
df_week_action_weekend = df_week_action.filter(F.col('is_weekday')!=1)\
    .groupby(['userId','level'])\
    .agg(F.count('page').alias('count_action_weekend'))

In [None]:
count_week_action=df_week_action.groupby(['userId','level','churn_service','churn_paid','is_weekday'])\
    .agg(F.count('page').alias('count_action_week'))

In [None]:
# set segmentation 
filter_weekday = F.col('is_weekday') == 1
filter_weekend = ~(filter_weekday)
# where_weekday = 
count_action_weekday = count_week_action.filter(filter_weekday)
count_action_weekend = count_week_action.filter(filter_weekend)


action_weekday_no_churn = count_action_weekday.filter(filter_no_churn).toPandas()
action_weekday_churn_paid = count_action_weekday.filter(filter_churn_paid).toPandas()
action_weekday_churn_service = count_action_weekday.filter(filter_churn_service).toPandas()

action_weekend_no_churn = count_action_weekend.filter(filter_no_churn).toPandas()
action_weekend_churn_paid = count_action_weekend.filter(filter_churn_paid).toPandas()
action_weekend_churn_service = count_action_weekend.filter(filter_churn_service).toPandas()


In [None]:
trace1 = go.Bar(
    x=['weekday','weekend'],
    y=[action_weekday_churn_service.count_action_week.mean(),
       action_weekend_churn_service.count_action_week.mean()],
    name='churn service user'
)

trace2 = go.Bar(
    x=['weekday','weekend'],
    y=[action_weekday_no_churn.count_action_week.mean(),
       action_weekend_no_churn.count_action_week.mean()],
    name='active user'
)

trace3 = go.Bar(
    x=['weekday','weekend'],
    y=[action_weekday_churn_paid.count_action_week.mean(),
       action_weekend_churn_paid.count_action_week.mean()],
    name='churn paid user'
)
trace4 = go.Bar(
    x=['weekday','weekend'],
    y=[action_weekday_no_churn[action_weekday_no_churn['level']=='paid'].count_action_week.mean(),
      action_weekend_no_churn[action_weekend_no_churn['level']=='paid'].count_action_week.mean()],
    name='active paid user'
)


In [None]:
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,1,2)
fig.append_trace(trace4,1,2)

fig.layout.update(height=600, width=900, )
# fig.update_xaxes(
#     title_text = 'avg listen time'
# )
fig.update_yaxes(
    title_text = 'avg count of action per weekday of weekend'
)
fig.show()

###### 주말 vs 평일 비교 결과 
- 전체적으로 평일에 이용을 많이 한다 
- 서비스 ,유료 멤버쉽 이탈 유저들이 평균적으로 평일이나 주말이나 action의 횟수가 적다

In [None]:
# # set segmentation 
# filter_weekday = F.col('is_weekday') == 1
# filter_weekend = ~(filter_weekday)


action_week_no_churn = count_week_action.filter(filter_no_churn).toPandas()
action_week_churn_paid = count_week_action.filter(filter_churn_paid).toPandas()
action_week_churn_service = count_week_action.filter(filter_churn_service).toPandas()

In [None]:
# box plot unique 
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=action_week_churn_service['count_action_week'].unique(),
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=action_week_no_churn['count_action_week'].unique(),
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=action_week_churn_paid['count_action_week'].unique(),
    name='churn paid ser'
)
trace4 = go.Box(
    x=action_week_no_churn[action_week_no_churn['level']=='paid']['count_action_week'].unique(),
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=700, width=700, )
fig.update_xaxes(
    title_text = 'avg count of action on weekday : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

##### impact of the time between registration, upgrade and downgrade events
- 회원 가입 시간부터 최근 활동 시간까지의 기간 분석

In [None]:
# Derive registration-date columns from the `registration` epoch value,
# which is divided by 1000 before the timestamp cast.

regi_ts = (F.col('registration')/1000).cast('timestamp')

# Every regi_* calendar column is read from regi_date (not from the event
# `date` column), so it describes the registration and not the event.
df_regi = df_clean.withColumn('regi_date', F.to_date(regi_ts))\
    .withColumn('regi_year', F.year(F.col('regi_date')))\
    .withColumn('regi_month', F.month(F.col('regi_date')))\
    .withColumn('regi_day', F.dayofmonth(F.col('regi_date')))\
    .withColumn('regi_hour', F.hour(regi_ts))\
    .withColumn('regi_dayofweek', F.dayofweek(F.col('regi_date')))\
    .withColumn('regi_weekofyear', F.weekofyear(F.col('regi_date')))

In [None]:
# add last_interaction, day_from_reg columns
df_from_reg = df_regi.filter(F.col('row_number')==1)\
    .groupby(['userId','level','churn_service','churn_paid','regi_date'])\
    .agg(F.max('date').alias('last_interaction'))\
    .withColumn('day_from_reg',F.datediff('last_interaction','regi_date'))

In [None]:
df_from_reg.orderBy('userId').show()

In [None]:
# box plot 

from_reg_no_churn = df_from_reg.filter(filter_no_churn).toPandas()
from_reg_churn_paid = df_from_reg.filter(filter_churn_paid).toPandas()
from_reg_churn_service = df_from_reg.filter(filter_churn_service).toPandas()

In [None]:
# avg 
print("Avg of day_from_reg: Active vs churned from service\n",\
     from_reg_no_churn['day_from_reg'].mean(), from_reg_churn_service['day_from_reg'].mean())
print("Avg of day_from_reg: Active paid vs churned from paid\n",\
     from_reg_no_churn[from_reg_no_churn['level']=='paid']['day_from_reg'].mean(), from_reg_churn_paid['day_from_reg'].mean())


In [None]:
# Box plots of day_from_reg; every series is de-duplicated with .unique().
# 1. churn_service vs active
# churned service users
trace1 = go.Box(
    x=from_reg_churn_service['day_from_reg'].unique(),
    name='churned service user'
)
# active users
trace2 = go.Box(
    x=from_reg_no_churn['day_from_reg'].unique(),
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# users who churned from the paid level
trace3 = go.Box(
    x=from_reg_churn_paid['day_from_reg'].unique(),
    name='churn paid ser'
)
# Active paid users come from the non-churned frame restricted to
# level == 'paid', the same selection the printed averages use for this group.
trace4 = go.Box(
    x=from_reg_no_churn[from_reg_no_churn['level']=='paid']['day_from_reg'].unique(),
    name='active paid user'
)

In [None]:
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=['Service cancellation analysis','Service downgrade analysis']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,2,1)
fig.append_trace(trace4,2,1)

fig.layout.update(height=700, width=700, )
fig.update_xaxes(
    title_text = 'distribution of days from registration : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

day_from_reg 분석 결과 
- 서비스 이탈 유저는 가입한 시간으로부터 최근 활동시간까지의 기간이 짧다
- 유료 멤버십 이탈 유저와 활성화된 유료 유저와의 큰 차이는 없다

##### upgrade and downgrade
- from upgrade ~ downgrade_date
- from reg ~ to upgrade

#### from_reg ~ to upgrade
- 회원등록부터 유료결제 까지의 걸리는 평균 시간을 분석해본다
- 유료결제에서 유료 해지까지의 걸리는 평균 시간을 분석해본다


In [None]:
df_clean

In [None]:
### upgrade_df 

df_upgrade = df_regi.select(['userId','level','page','upgraded','phase_upgrade','churn_paid','churn_service','ts','date','regi_date'])\
    .filter(F.col('page')=='Submit Upgrade')\
    .filter(F.col('level')=='free').dropDuplicates()\
    .withColumn('reg_to_upgrade',F.datediff('date','regi_date'))\
    .withColumn('upgrade_date',F.col('date'))\
    .orderBy('userId',F.desc('phase_upgrade'),F.asc('date'))
    

In [None]:
windowval = Window.partitionBy('userId').orderBy(F.asc('ts'))
upgrade_user = df_upgrade.withColumn('row_number',F.row_number().over(windowval))

In [None]:
# row number == 1
upgrade_user = upgrade_user.filter(F.col('row_number')==1)

In [None]:
# upgrade segmentation 
upgrade_no_churn = upgrade_user.filter(filter_no_churn).toPandas()
upgrade_churn_paid = upgrade_user.filter(filter_churn_paid).toPandas()
upgrade_churn_service = upgrade_user.filter(filter_churn_service).toPandas()

In [None]:
# box plot unique 
# 1.churn_service vs active
# churn service
trace1 = go.Box(
    x=upgrade_churn_service['reg_to_upgrade'],
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=upgrade_no_churn['reg_to_upgrade'],
    name='active user'
)

# 2. churn paid: churn_paid vs active paid
# churn_service_free_user
trace3 = go.Box(
    x=upgrade_churn_paid['reg_to_upgrade'],
    name='churn paid ser'
)


In [None]:
fig = make_subplots(
    rows=1, cols=1,
    subplot_titles=['Distribution upgrade time from registration']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
fig.append_trace(trace3,1,1)
# fig.append_trace(trace4,2,1)

fig.layout.update(height=700, width=700, )
fig.update_xaxes(
    title_text = 'distribution of days to upgrade from registration : Active vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

from_registration_to_upgrade 결과:
- 서비스에서 이탈한 유저는 유료결제 멤버쉽에도 빠르게 반응 하는 경향이 있다 

In [None]:
df_upgrade.toPandas()

In [None]:
# Upgrade / downgrade submissions only, reduced to the columns needed to
# time them.
up_down_cols = ['userId', 'level', 'page', 'phase_upgrade', 'phase_downgrade',
                'churn_paid', 'churn_service', 'date', 'ts']

df_up_down = (
    df_regi
    .select(up_down_cols)
    .filter(F.col('page').isin('Submit Upgrade', 'Submit Downgrade'))
)

In [None]:
# create next_date, next_action, up_to_downgrade columns
# The window is ordered newest-first, so lag(..., 1) reads the user's
# following upgrade/downgrade event.
windowval = Window.partitionBy('userId').orderBy(F.desc('ts'))

# up_to_downgrade is only filled when the row is an upgrade whose following
# event is a downgrade. Any other pair (for example downgrade -> upgrade)
# is left null so it does not enter the upgrade-to-downgrade distribution.
df_up_down = df_up_down.withColumn('next_date',lag('date',1).over(windowval))\
    .withColumn('next_action',lag('page',1).over(windowval))\
    .withColumn('up_to_downgrade',
                F.when((F.col('page')=='Submit Upgrade')&(F.col('next_action')=='Submit Downgrade'),
                       F.datediff('next_date','date')))

In [None]:
# customer segmentation 

# 1. churn_Service !=0
up_down_churn_paid = df_up_down.filter(filter_churn_paid).toPandas()
up_down_churn_service = df_up_down.filter(filter_churn_service).toPandas()

In [None]:
# box plot 
# box plot unique 

trace1 = go.Box(
    x=up_down_churn_service['up_to_downgrade'],
    name='churned service user'
)
# active user
trace2 = go.Box(
    x=up_down_churn_paid['up_to_downgrade'],
    name='churn paid user'
)



In [None]:
fig = make_subplots(
    rows=1, cols=1,
    subplot_titles=['Distribution downgrade period from upgrade']
)

fig.append_trace(trace1,1,1)
fig.append_trace(trace2,1,1)
# fig.append_trace(trace3,1,1)
# fig.append_trace(trace4,2,1)

fig.layout.update(height=700, width=700, )
fig.update_xaxes(
    title_text = 'distribution of days to downgrade from upgrade : churn_paid vs churned user'
)
# fig.update_yaxes(
#     title_text = ''
# )
fig.show()

from_upgrade to downgrade 결과
- 서비스에서 이탈하고, 유료멤버쉽에서 이탈한 유저가 짧은시간동안 빠르게 이용하고, 빠르게 서비스에서 이탈하는 
경향을 확인할 수 있다

In [None]:
def get_week_from_df(df, week_day_start, week_day_last):
    """
        Extract the rows of a given week and flag weekdays and adverts.

        Input:
            df(spark dataframe): needs the columns date, ts, dayofweek, page
            week_day_start: first date of the week (inclusive)
            week_day_last: last date of the week (inclusive)
        Output:
            spark dataframe: rows with week_day_start <= date <= week_day_last,
            sorted by ts descending, plus two integer columns
                is_weekday: 1 when dayofweek is one of 1-5, otherwise 0
                is_ads: 1 when page is "Roll Advert", otherwise 0
    """
    # filter by week_day_start, week_day_last (both bounds included)
    filter_week = (F.col('date') >= week_day_start) & (F.col('date') <= week_day_last)

    # The day number is compared on its string form so the flag is the same
    # whether dayofweek is stored as an integer or as a string; a plain
    # comparison against '1'..'5' never matches an integer column.
    # NOTE(review): 1-5 are treated as weekdays; with pyspark's F.dayofweek
    # 1 is Sunday and 7 is Saturday - confirm how dayofweek is created.
    is_weekday = F.when(
        F.col('dayofweek').cast('string').isin('1', '2', '3', '4', '5'), 1
    ).otherwise(0)
    is_ads = F.when(F.col('page') == 'Roll Advert', 1).otherwise(0)

    # Native column expressions are used instead of Python udfs; null inputs
    # fall into otherwise(0), as they did with the udfs.
    df_week = df.filter(filter_week).orderBy('ts', ascending=False)\
        .withColumn('is_weekday', is_weekday)\
        .withColumn('is_ads', is_ads)
    return df_week

##### week summary Class

    1. 주간별 노래, 아티스트, 로그인, 광고 카운트
    2. 주간별, 평일 카운트+ 주말 카운트
    3. 주간별 평균 
        - song count 
        - artist distinct count 
        - login count 
        - ads count 
        - song repeat
    4. 주간별 delta_login time 
    5. 주간별 listen_time session
    6. 주간별 num_actions_per_session
    

In [None]:
class WeekSummary:
    """
    
    """
    
    def __init__(self, df_week):
        # get df from given weeek
        self.df_week = df_week
        # get unique userId ,level
        self.users = self.get_users()
        # get week summary 
        self.week_summary = self.get_week_summary()
    
    def get_users(self):
        """
            Extract the list of unique combination of userId, level
        """
        service_users = self.df_week.select(['userId','level']).distinct()
        return service_users
    
    #last_interaction
    def get_last_interaction(self):
        """
            Return the largest `ts` of every (userId, level) pair in the
            week as column `last_interaction`.
        """
        latest_ts = F.max('ts').alias('last_interaction')
        return self.df_week.groupby('userId', 'level').agg(latest_ts)
    
    
    def compute_count_song(self):
        """
            Count the songs of every (userId, level) pair in the week.

            Output (tuple of three spark dataframes):
                song_week: column count_song, the number of non-null
                    `song` values (repeats included)
                song_weekday: column count_song_weekday, distinct songs on
                    rows where is_weekday == 1
                song_weekend: column count_song_weekend, distinct songs on
                    rows where is_weekday != 1
        """
        keys = ['userId', 'level']

        # `count` is not among the names imported from pyspark.sql.functions
        # in this file, so the aggregate is reached through the F alias.
        song_week = self.df_week.groupby(keys)\
            .agg(F.count('song').alias('count_song'))

        song_weekday = self.df_week.filter(F.col('is_weekday') == 1)\
            .groupby(keys)\
            .agg(countDistinct('song').alias('count_song_weekday'))

        song_weekend = self.df_week.filter(F.col('is_weekday') != 1)\
            .groupby(keys)\
            .agg(countDistinct('song').alias('count_song_weekend'))

        return song_week, song_weekday, song_weekend
    
    def compute_count_artist(self):
        """
            Count the distinct artists of every (userId, level) pair in the
            week, returned as column `count_distinct_artist`.
        """
        distinct_artists = countDistinct('artist').alias('count_distinct_artist')
        return self.df_week.groupby('userId', 'level').agg(distinct_artists)
    
    def compute_count_login(self):
        """
            Count the distinct sessionId values of every (userId, level)
            pair in the week, returned as column `count_login`.
        """
        distinct_sessions = countDistinct('sessionId').alias('count_login')
        return self.df_week.groupby('userId', 'level').agg(distinct_sessions)
    
    def compute_count_ads(self):
        """
            Sum the `is_ads` flag of every (userId, level) pair in the
            week, returned as column `count_ads`.
        """
        ads_total = F.sum('is_ads').alias('count_ads')
        return self.df_week.groupby('userId', 'level').agg(ads_total)
    
    #get_last_interaction
    #compute_counts
    def compute_count_merge(self):
        """
            Merge all per-user weekly counts into one dataframe.

            Starting from self.users, the song, artist, login and advert
            counts are full-joined on (userId, level), then `count_repeat`
            is added as count_song - (count_song_weekday + count_song_weekend).

            Output:
                spark dataframe with one column per count plus count_repeat
        """
        keys = ['userId', 'level']
        df_count = self.users

        # add songs on weekdays and weekend
        song_week, song_weekday, song_weekend = self.compute_count_song()

        df_count = df_count.join(song_week, on=keys, how='full')\
            .join(song_weekday, on=keys, how='full')\
            .join(song_weekend, on=keys, how='full')

        # add distinct artists
        artist_week = self.compute_count_artist()
        df_count = df_count.join(artist_week, on=keys, how='full')

        # add login (distinct sessionId)
        login_week = self.compute_count_login()
        df_count = df_count.join(login_week, on=keys, how='full')

        # add ads
        ads_week = self.compute_count_ads()
        df_count = df_count.join(ads_week, on=keys, how='full')

        # repeat: after the full joins a user without weekday (or weekend)
        # rows has a null count there; a missing count is read as 0 so
        # count_repeat is still a number. The count columns themselves are
        # left untouched.
        def _or_zero(name):
            return F.coalesce(F.col(name), F.lit(0))

        df_count = df_count.withColumn(
            'count_repeat',
            _or_zero('count_song')
            - (_or_zero('count_song_weekday') + _or_zero('count_song_weekend')))

        return df_count
        
    def compute_delta_login(self):
        """
            Calculate avg delta time between two login in the given week

            Output:
                spark dataframe with userId, level and time_inter_login
                (sum of the day gaps of a user divided by the user's number
                of distinct sessions in the week)
        """

        # window per user, newest event first: lag(..., 1) is the following event
        window_login = Window.partitionBy(['userId']).orderBy(F.desc('ts'))

        # add new column next date
        df_delta = self.df_week.withColumn('next_date', lag('date', 1).over(window_login))

        # calculate delta between two login
        df_delta = df_delta.withColumn('delta_time', F.datediff(F.col('next_date'), F.col('date')))

        # compute the avg delta_time between login for each user

        delta_cols = ['userId', 'level', 'churn_service', 'churn_paid', 'delta_time']

        # The filter refers to the column by name: the local df_delta_login
        # does not exist yet at this point. Rows with a null or zero gap
        # are dropped.
        df_delta_login = df_delta.select(delta_cols)\
            .filter(F.col('delta_time') != 0)

        # Sessions are counted on the week held by this object, so the
        # method does not depend on a dataframe defined outside the class.
        distinct_session = self.df_week.groupby('userId')\
            .agg(F.countDistinct('sessionId').alias('user_num_session'))

        # join: df_delta_login + distinct_session
        df_delta_login = df_delta_login.join(distinct_session, on=['userId'], how='inner')

        # calculate avg delta time , total_avg_delta
        df_delta_login = df_delta_login.withColumn('avg_delta', F.col('delta_time')/F.col('user_num_session'))\
            .withColumn('total_avg_delta', F.sum('avg_delta').over(Window.partitionBy('userId')))

        # group by, keep the relevant data only
        df_delta_login = df_delta_login.groupby(['userId', 'level'])\
            .agg(F.max('total_avg_delta').alias('time_inter_login'))

        return df_delta_login
    
    def compute_listen_session(self):
        """
            Calculate the average listening per session per user

            Output:
                spark dataframe with the distinct columns userId, level and
                avg_listen_time_session (rounded to 2 decimals; in minutes
                if ts is in milliseconds - confirm the unit of ts)
        """
        # window per user by desc timestamp: lag(..., 1) is the following event
        window_user = Window.partitionBy("userId").orderBy(F.desc('ts'))
        # running window per user, session
        window_session = Window.partitionBy(["userId", "sessionId"]).orderBy("ts")\
            .rangeBetween(Window.unboundedPreceding, 0)

        # add two new columns: next_ts, next_action
        df_listen_session = self.df_week.withColumn('next_ts', lag('ts', 1).over(window_user))\
            .withColumn('next_action', lag('page', 1).over(window_user))

        # calculate the diff between two timestamps; cast to long because
        # epoch values of this size do not fit a 32-bit integer
        df_listen_session = df_listen_session.withColumn(
            "diff_ts",
            (F.col('next_ts').cast('long') - F.col('ts').cast('long'))/1000)

        # keep only the NextSong action (F.col is a function: call it, do not subscript it)
        df_listen_session = df_listen_session.filter(F.col('page') == 'NextSong')
        # add a column with the running total of listening inside the session
        df_listen_session = df_listen_session.withColumn("listen_session", F.sum("diff_ts").over(window_session))

        # extract max value only for each session per user
        df_listen_session = df_listen_session.groupby(['userId', 'sessionId', 'level'])\
            .agg(F.max('listen_session').alias('total_listen_session'),\
                F.max('itemInSession').alias('item_session'))

        # per-item listening of the session, divided by 60; the column read
        # here is the total_listen_session created by the aggregation above
        df_listen_session = df_listen_session.withColumn('avg_listen_session',
            F.round((F.col('total_listen_session')/F.col('item_session'))/60, 2))

        # add a column with total number of session, avg_listen_time per session
        num_session = self.df_week.groupby(['userId', 'level'])\
            .agg(countDistinct('sessionId').alias('num_session'))

        df_listen_session = df_listen_session.join(num_session, on=['userId', 'level'], how='full')

        df_listen_session = df_listen_session.withColumn("week_total_listen",
                            F.sum('avg_listen_session').over(Window.partitionBy('userId')))\
            .withColumn('avg_listen_time_session', F.round((F.col('week_total_listen')/F.col('num_session')), 2))

        # keep relevant columns and distinct
        df_listen_session = df_listen_session.select(['userId', 'level', 'avg_listen_time_session']).distinct()

        return df_listen_session
        
        
    def compute_action_session(self):
        """
            Calculate the average number of actions per session per user for the given week.

            Steps:
            1. Count 'page' events per (userId, level, sessionId).
            2. Sum those counts over each userId to get total_action_week.
            3. Count distinct sessions per (userId, level).
            4. avg_num_action_session = round(total_action_week / num_session, 2).

            Returns:
                DataFrame with the distinct rows of
                ['userId', 'level', 'avg_num_action_session'].
        """
        # The weekly events live on the instance, like in compute_listen_session.
        df_week = self.df_week

        # actions per session, then total actions of the user over the week
        df_action_session = df_week.groupby(['userId','level','sessionId'])\
            .agg(F.count('page').alias('action_per_session'))
        df_action_session = df_action_session.withColumn('total_action_week',\
                                        F.sum('action_per_session').over(Window.partitionBy('userId')))

        # add column with total number of session in the week
        num_session = df_week.groupby(['userId','level'])\
            .agg(F.countDistinct('sessionId').alias('num_session'))

        df_action_session = df_action_session.join(num_session, on=['userId','level'], how='full')

        # The rounding scale (2) belongs to F.round, not to withColumn.
        df_action_session = df_action_session.withColumn('avg_num_action_session',\
                                        F.round(F.col('total_action_week')/F.col('num_session'),2))

        # keep only the relevant columns
        df_action_session = df_action_session.select(['userId','level','avg_num_action_session']).distinct()

        return df_action_session
    
    
    def compute_avg(self):
        """
            Attach the weekly average features to the user table.

            Starts from self.users and full-outer-joins, on ['userId', 'level'],
            the results of (in this order):
              - compute_delta_login    (delta time between logins)
              - compute_listen_session (listening time per session)
              - compute_action_session (actions per session)

            Returns:
                The joined DataFrame.
        """
        join_keys = ['userId','level']

        df_user_avg = self.users

        # Each builder is invoked right before its join, in the listed order.
        feature_builders = (
            self.compute_delta_login,
            self.compute_listen_session,
            self.compute_action_session,
        )
        for build_feature in feature_builders:
            df_user_avg = df_user_avg.join(build_feature(), on=join_keys, how='full')

        return df_user_avg
    
    #replace_na
    def replace_na(self, df):
        """
            Set default values for nulls left over after the count/average joins.

            Args:
                df: DataFrame to fill.

            Returns:
                The DataFrame after two na.fill passes: first 'time_inter_login'
                with '7', then a general fill with 0.
        """
        # 7 stands for the number of days in the week (no second login observed).
        login_gap_default = {'time_inter_login':'7'}

        return df.na.fill(login_gap_default).na.fill(0)
    
    
    
    def get_week_summary(self):
        """
            Produce the final week_summary.

            1. last interaction time
            2. counts (song, ads, login, distinct artist, song repeat)
            3. averages
            4. full outer joins on ['userId','level']
            5. after the joins, replace null values

            Returns:
                The result of replace_na applied to the joined DataFrame.
        """
        join_keys = ['userId','level']

        # most recent activity time joined with the various counts
        week_summary = self.get_last_interaction().join(
            self.compute_count_merge(), on=join_keys, how='full')

        # add the averages
        week_summary = week_summary.join(self.compute_avg(), on=join_keys, how='full')

        # replace null values
        return self.replace_na(week_summary)

#### UserSummary:
    - 매주 새로운 이벤트 데이터가 들어오므로
        => 기존에 저장된 summary를 불러와 해당 주의 데이터로 갱신(update)한 뒤 다시 저장한다
    - 
    - 
    - 
    

In [None]:
class UserSummary:
    """
    Keep the persisted per-user summary table up to date with each new week.

    The summary is stored as parquet under ./spark-warehouse/<table_name>.
    update_user_summary() is the entry point: it joins a week summary with the
    user attributes found in ``df``, merges it with the stored summary (if any)
    and writes the result back.

    Attributes:
        df: event DataFrame; the methods read userId, level, regi_date, gender,
            state, churn_service and churn_paid from it.
        table_name: folder name of the summary inside ./spark-warehouse/.
        user_summary: summary currently held in memory (None until loaded/computed).

    Relies on the module-level ``spark`` session.
    """

    def __init__(self, df, table_name):
        self.df = df 
        self.table_name = table_name
        self.user_summary = None
    
    def create_column_with_type(self,column_list, target_type, df=None):
        """
            Add every column of column_list as an all-null column of target_type.

            Args:
                column_list: names of the columns to create.
                target_type: Spark data type the null columns are cast to.
                df: DataFrame to extend; defaults to self.df.

            Returns:
                The DataFrame with all requested columns added.
        """
        result = self.df if df is None else df
        # Accumulate on the same DataFrame so every column survives, not just the last.
        for column in column_list:
            result = result.withColumn(column, lit(None).cast(target_type))

        return result
    
    def load_summary(self):
        """
            Load existing summary. If there is no existing table, create an empty
            dataframe with the expected columns and types.

            Returns:
                The stored summary, or an empty DataFrame with typed columns.
        """
        # Local import: the notebook does not import AnalysisException at the top.
        from pyspark.sql.utils import AnalysisException

        try:
            return spark.read.parquet('./spark-warehouse/{}'.format(self.table_name))
        except AnalysisException:
            # Spark reports a missing/unreadable table path as AnalysisException:
            # treat it as "no summary saved yet".
            user_summary = spark.createDataFrame([],StructType([]))
            
            str_cols = ['userId','level','gender','state','last_interaction']
            int_cols = ['churn_service','churn_paid','count_song','count_song_weekday','count_song_weekend','count_distinct_artist','count_login','count_ads','count_repeat','day_from_reg']
            float_cols = ['time_inter_login','avg_listen_time_session','avg_num_action_session']
            
            # add the typed, all-null columns group by group
            user_summary = self.create_column_with_type(str_cols,StringType(),df=user_summary)
            user_summary = self.create_column_with_type(int_cols,IntegerType(),df=user_summary)
            user_summary = self.create_column_with_type(float_cols,FloatType(),df=user_summary)
            
            return user_summary
    
    def save_summary(self,mode=""):
        """
            Save self.user_summary as parquet in the data warehouse folder.

            Args:
                mode: Spark save mode (e.g. 'append', 'overwrite'). When left
                    empty, no mode is set and the writer's own default applies.
        """
        writer = self.user_summary.write
        if mode:
            writer = writer.mode(mode)
        writer.parquet("./spark-warehouse/{}".format(self.table_name))
        
        print("Saved summary!")
        
    
    # update 1.day_from_reg
    def compute_day_from_reg(self, updated_summary):
        """
            Compute 'day_from_reg' as datediff(last_interaction, regi_date).

            Args:
                updated_summary: DataFrame holding userId, level and last_interaction.

            Returns:
                updated_summary with 'day_from_reg' set and 'regi_date' removed.
        """
        regi_summary = self.df.select(['userId','level','regi_date']).distinct()
        
        # bring the registration date next to each summary row
        updated_summary = updated_summary.join(regi_summary,on=['userId','level'])
        
        updated_summary = updated_summary.withColumn('day_from_reg',\
                                                    F.datediff('last_interaction','regi_date'))
        
        # regi_date was only needed for the computation
        updated_summary = updated_summary.drop('regi_date')
        
        return updated_summary 
    
    
    # init_user_summary 
    def init_user_summary(self, updated_summary):
        """
            Initialise the summary of users seen for the first time by adding
            'day_from_reg' through compute_day_from_reg.
        """
        updated_summary = self.compute_day_from_reg(updated_summary)
        
        return updated_summary
    
    def get_last_summary(self):
        """
            Return the latest saved row per (userId, level), with the weekly
            metric columns renamed to 'last_<name>'.

            Registers self.user_summary as the temp view 'user_summary'.
            The result also carries the helper column 'max_last'.
        """
        # createOrReplaceTempView returns None; it is called for its side effect only.
        self.user_summary.createOrReplaceTempView("user_summary")
        
        last_user_summary = spark.sql("""
            SELECT * FROM (
                SELECT *, MAX(last_interaction) OVER (PARTITION BY userId,level) AS max_last FROM user_summary
            ) table1 \
            WHERE last_interaction = table1.max_last
        """)
        
        # rename count_cols => last_count_cols 
        count_cols =['count_song','count_song_weekday','count_song_weekend',\
                    'count_distinct_artist','count_login','count_ads','count_repeat',\
                    'time_inter_login','avg_listen_time_session','avg_num_action_session']
        # 'column' (not 'col') so the imported pyspark col() is not shadowed
        for column in count_cols:
            last_user_summary = last_user_summary.withColumnRenamed(column, 'last_{}'.format(column))
            
        return last_user_summary
    
    # update old summary 
    def update_old_summary(self, updated_summary):
        """
            Build the rows to append for the new week, given a previously saved summary.

            Users absent from the saved summary are initialised through
            init_user_summary; known users get 'day_from_reg' recomputed.

            Args:
                updated_summary: week summary joined with the user attributes.

            Returns:
                Union of the known-user rows and the new-user rows.
        """
        last_user_summary = self.get_last_summary()
        
        # drop the columns that would clash with the fresh week (drop takes *cols)
        drop_old_cols = ['last_interaction','gender','state','churn_service','churn_paid','max_last']
        last_user_summary = last_user_summary.drop(*drop_old_cols)
        
        # split the summary into 2 df: one with known users, one with new users
        # new users: no row in the saved summary
        new_user_summary = updated_summary.join(last_user_summary, on=['userId','level'],\
                                               how='left_anti')
        
        new_user_summary = new_user_summary.select(['userId','level','last_interaction',\
                                                   'count_song','count_song_weekday','count_song_weekend',\
                                                   'count_distinct_artist','count_login','count_ads','count_repeat',\
                                                   'time_inter_login','avg_listen_time_session','avg_num_action_session',\
                                                   'gender','state','churn_service','churn_paid'])
        
        # create new user summary with day_from_reg
        new_user_summary = self.init_user_summary(new_user_summary)
        
        # known users: present in the saved summary
        old_user_summary = updated_summary.join(last_user_summary, on=['userId','level'],how='inner')
        
        # drop the last_{} columns brought in by the join
        count_cols = ['count_song', 'count_song_weekday', 'count_song_weekend',\
                     'count_distinct_artist', 'count_login', 'count_ads', 'count_repeat',\
                     'time_inter_login', 'avg_listen_time_session', 'avg_num_action_session']
        
        for column in count_cols:
            old_user_summary = old_user_summary.drop('last_{}'.format(column))
        
        # recompute day_from_reg for known users too
        old_user_summary = self.compute_day_from_reg(old_user_summary)
        
        # union is positional, so align the column order before stacking
        old_user_summary = old_user_summary.select(new_user_summary.columns)
        updated_summary = old_user_summary.union(new_user_summary)
        
        return updated_summary
      
    # final update
    def update_user_summary(self, week_sum):
        """
            Update the stored user summary with one week summary and save it.

            If a non-empty summary already exists the new rows are appended,
            otherwise the table is (over)written with the initial summary.

            Args:
                week_sum: week summary DataFrame keyed by userId and level.
        """
        # Registered for its side effect only: the call returns None, so the
        # DataFrame itself keeps being used below.
        week_sum.createOrReplaceTempView("week_sum")
        
        # attach the user attributes to the week summary
        user_info = self.df.select(['userId','level','gender','state','churn_service','churn_paid']).distinct()
        updated_summary = week_sum.join(user_info, on=['userId','level'], how='inner')
        
        # load the existing summary 
        self.user_summary = self.load_summary()
        
        if self.user_summary.count() >0:
            # a summary exists: merge, then append the new rows
            self.user_summary = self.update_old_summary(updated_summary)
            self.save_summary('append')
        else:
            # nothing saved yet (first week): initialise and write the table
            self.user_summary = self.init_user_summary(updated_summary)
            self.save_summary('overwrite')

##### MonthSummary:
    -
    - 
    - 
    -
    -
    -
    