## Imports

In [0]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import DataFrame,SparkSession

## Spine Inicial

In [0]:
# Load the spine/target table and keep only rows whose paid amount and plan
# length are both non-zero (rows where either value is null are dropped too,
# because the predicate evaluates to null for them).
# The equivalent filter on `plan_list_price` is intentionally left disabled.
base = (
    spark.table('sand_riscos_pm_pf.T789778_spine_target_dm_v3')
    .where(
        (~F.col('actual_amount_paid').isin(0))
        & (~F.col('payment_plan_days').isin(0))
    )
)
base

## Construção de Variáveis de Janelas Temporais

In [0]:
def _pct(ratio):
    """Turn a ratio Column into a percentage rounded to two decimals."""
    return F.round(ratio * 100, 2)


# Total interactions = sum of all play-count buckets plus the unique-song count.
_play_buckets = ["num_25", "num_50", "num_75", "num_985", "num_100", "num_unq"]
interaction_total = F.col(_play_buckets[0])
for _bucket in _play_buckets[1:]:
    interaction_total = interaction_total + F.col(_bucket)

# NOTE(review): every ratio below divides by `interaction` or `avg_music_time`;
# rows where those are 0 rely on the cluster's division-by-zero behaviour — confirm.
base = (
    base
    .withColumn('interaction', interaction_total)
    .withColumn('avg_music_time', F.round(F.col('total_secs') / F.col('interaction'), 2))
)

# Weighted completion rates for the partial-play buckets.
for _suffix, _weight in (('25', .25), ('50', .50), ('75', .75), ('985', .985)):
    base = base.withColumn(
        f'completion_rate_{_suffix}',
        _pct(F.col(f'num_{_suffix}') * _weight / F.col('avg_music_time')),
    )

# Fully played songs carry no weight factor.
base = base.withColumn('completion_rate_100', _pct(F.col('num_100') / F.col('avg_music_time')))

# Shares of total interactions, all expressed as percentages.
_interactions = F.col('interaction')
base = (
    base
    .withColumn("repeat_rate", _pct((F.col("interaction") - F.col("num_unq")) / _interactions))
    .withColumn("diversity_score", _pct(F.col('num_unq') / _interactions))
    .withColumn("low_intensity_score", _pct(F.col("num_25") / _interactions))
    .withColumn("medium_intensity_score", _pct((F.col("num_50") + F.col("num_75")) / _interactions))
    .withColumn("hight_intensity_score", _pct((F.col("num_100") + F.col("num_985")) / _interactions))
)

In [0]:
# Look-back windows: per member (`msno`), ordered by `safra_date`, covering the
# 2, 3 and 4 rows immediately before the current row (current row excluded).
# NOTE(review): frames are row-based, so "N months" only holds when each member
# has exactly one row per consecutive month — confirm against the spine.
_member_timeline = Window.partitionBy("msno").orderBy("safra_date")

window_2_months = _member_timeline.rowsBetween(-2, -1)
window_3_months = _member_timeline.rowsBetween(-3, -1)
window_4_months = _member_timeline.rowsBetween(-4, -1)

In [0]:
# Features that receive rolling min/max/avg/median aggregates.
# Built from three groups; the final order matches the order in which the
# aggregate columns are later appended.
_raw_listening = ["num_25", "num_50", "num_75", "num_985", "num_100", "num_unq", "total_secs"]

_derived_listening = (
    ['interaction', 'avg_music_time']
    + [f'completion_rate_{bucket}' for bucket in ('25', '50', '75', '985', '100')]
    + ['diversity_score', 'repeat_rate']
    + ['low_intensity_score', 'medium_intensity_score', 'hight_intensity_score']
)

_pricing = ['plan_list_price', 'actual_amount_paid']

profile_members = _raw_listening + _derived_listening + _pricing

In [0]:
# Show summary statistics for each profile feature, one table per column.
for feature in profile_members:
    feature_summary = base.select(feature).describe()
    feature_summary.display()

In [0]:
# Build a first-of-month date from the YYYYMM `safra` value; the look-back
# window specs order rows by this column, so it must exist before they are used.
safra_text = F.col("safra").cast("string")
base = base.withColumn(
    "safra_date",
    F.to_date(
        F.concat(safra_text.substr(1, 4), F.lit("-"), safra_text.substr(5, 2), F.lit("-01")),
        "yyyy-MM-dd",
    ),
)

# Apply the look-back windows.
# For every profile feature, compute min / max / avg / approximate median over
# each window. All expressions are collected first and added in ONE projection:
# chaining hundreds of `withColumn` calls in a loop grows the logical plan with
# one projection per call, which the PySpark docs warn can cause performance
# problems and even StackOverflowException.
lookback_windows = (
    ("2m", window_2_months),
    ("3m", window_3_months),
    ("4m", window_4_months),
)

window_features = []
for feature in profile_members:
    for suffix, lookback in lookback_windows:
        # Same per-window order as before: min, max, avg, median.
        aggregates = (
            ("min", F.min(feature)),
            ("max", F.max(feature)),
            ("avg", F.avg(feature)),
            ("median", F.expr(f"percentile_approx({feature}, 0.5)")),
        )
        for stat, aggregate in aggregates:
            window_features.append(
                aggregate.over(lookback).alias(f"{feature}_{stat}_{suffix}")
            )

# Exclude any pre-existing column with one of the generated names so that
# re-running the cell replaces the aggregates (as `withColumn` did) instead of
# producing duplicate column names.
generated_names = {
    f"{feature}_{stat}_{suffix}"
    for feature in profile_members
    for suffix, _ in lookback_windows
    for stat in ("min", "max", "avg", "median")
}
kept_columns = [name for name in base.columns if name not in generated_names]

base = base.select(*kept_columns, *window_features)

In [0]:
# First-of-month date for the member's initial `safra_init` (YYYYMM).
safra_init_text = F.col("safra_init")
safra_init_date = F.to_date(
    F.concat(safra_init_text.substr(1, 4), F.lit("-"), safra_init_text.substr(5, 2), F.lit("-01")),
    "yyyy-MM-dd",
)

# Whole years elapsed between the initial and the current reference month.
years_as_member = F.floor(F.months_between("safra_date", "safra_init_date") / 12)

base = (
    base
    .withColumn("safra_init_date", safra_init_date)
    .withColumn("account_time", years_as_member)
    .withColumn("current_age", F.col("bd") + F.col("account_time"))
)

# Inclusive age brackets; anything outside them (or null) becomes "Desconhecido".
age_brackets = [(13, 18), (19, 29), (30, 45), (46, 65), (66, 76), (77, 88)]
age = F.col("current_age")

first_low, first_high = age_brackets[0]
age_group_expr = F.when(age.between(first_low, first_high), f"{first_low}-{first_high}")
for low, high in age_brackets[1:]:
    age_group_expr = age_group_expr.when(age.between(low, high), f"{low}-{high}")
age_group_expr = age_group_expr.otherwise("Desconhecido")

base = base.withColumn("age_group", age_group_expr)

# Recode gender: "male" -> "M", "female" -> "F", anything else (including null)
# -> "Desconhecido". The column is overwritten in place, so running this cell a
# second time on the already-recoded frame maps every row to "Desconhecido".
gender_raw = F.col("gender")
gender_code = (
    F.when(gender_raw == "male", F.lit("M"))
    .when(gender_raw == "female", F.lit("F"))
    .otherwise(F.lit("Desconhecido"))
)
base = base.withColumn("gender", gender_code)

# Attributes for which the previous row's value (per member) is kept as `<name>_1m`.
profile_change = [
    "payment_method_id",
    'payment_plan_days',
    'plan_list_price',
    'is_auto_renew',
    'city',
    'actual_amount_paid',
    'age_group',
    'current_age',
    'gender',
    'account_time',
]

# One ordering per member, shared by every lag expression.
window_spec = Window.partitionBy("msno").orderBy("safra_date")

for attribute in profile_change:
    previous_value = F.lag(attribute, 1).over(window_spec)
    base = base.withColumn(f"{attribute}_1m", previous_value)

In [0]:
# Number of columns in `base` at this point (displayed as the cell output).
len(base.columns)

## Excluir Variáveis - Data Leakage

Como temos fechamentos mensais, o que queremos neste processo de modelagem é prever o churn no mês atual usando variáveis de meses anteriores, pois não temos o fechamento do mês atual. Ou seja, para prever o churn de um usuário no mês 201601, não terei as variáveis fechadas referentes a 201601, somente as variáveis até 201512 (<=201512).

Basicamente, excluir variáveis do M0

In [0]:
# Columns removed from the modelling spine: current-month (M0) values and
# helper columns. Grouped by origin; the concatenation order is unchanged.
_transaction_cols = [
    'payment_method_id',
    'payment_plan_days',
    'plan_list_price',
    'actual_amount_paid',
    'is_auto_renew',
    'transaction_date',
    'membership_expire_date',
    'is_cancel',
]

_listening_cols = [
    'num_25',
    'num_50',
    'num_75',
    'num_985',
    'num_100',
    'num_unq',
    'total_secs',
]

# 'is_ativo' is deliberately NOT dropped.
_member_cols = [
    'registration_init_time',
    'city',
    'bd',
    'gender',
    'registered_via',
]

_helper_cols = ['safra_init', 'safra_date']

# The overlap with `profile_members` / `profile_change` yields repeated names;
# they are kept so the list is identical to the previous definition.
drop = (
    _transaction_cols
    + _listening_cols
    + _member_cols
    + _helper_cols
    + profile_members
    + profile_change
)

In [0]:
# Remove every column named in `drop`; the trailing expression displays the
# resulting DataFrame as the cell output.
base_spine = base.drop(*drop)
base_spine

In [0]:
# Number of columns left in `base_spine` after the drop (displayed as the cell output).
len(base_spine.columns)

## Variáveis Categóricas

In [0]:
# Identifier columns kept at the front of the final table.
ident_vars = ['msno', 'safra']

# Feature columns = every spine column except identifiers, the helper date and
# the target. `list.remove` raises ValueError if one of them is missing, which
# doubles as a schema sanity check.
init_vars = list(base_spine.columns)
for non_feature in ('msno', 'safra', 'safra_init_date', 'target'):
    init_vars.remove(non_feature)

In [0]:
# Cast the target to int and reorder: identifiers, features, then target.
# Columns not listed (e.g. `safra_init_date`) are left out by the select.
final_column_order = ident_vars + init_vars + ['target']
target_as_int = F.col('target').cast('int')

base_spine = base_spine.withColumn('target', target_as_int)
base_spine = base_spine.select(final_column_order)

In [0]:
# Numeric features, selected by column-name prefix.
# Fixes over the previous definition:
#   * the prefix 'intensity' matched nothing, because the columns are named
#     low_/medium_/hight_intensity_score_*; those features were therefore
#     silently skipped by the numeric null-fill/cast step;
#   * 'plan_list' and 'plan_list_price' both matched the same columns, so they
#     were listed (and processed) twice.
# A single pass with a tuple of prefixes lists each column exactly once.
# NOTE(review): columns starting with 'interaction', 'avg_music_time' or
# 'current_age' match no prefix in either list and are left untouched
# downstream — confirm whether that is intended.
numeric_prefixes = (
    'num_',
    'total',
    'completion',
    'repeat_',
    'diversity',
    'low_intensity',
    'medium_intensity',
    'hight_intensity',
    'plan_list',
    'actual_amount',
    'payment_plan_days',
    'account',
)
num_vars = [k for k in base_spine.columns if k.startswith(numeric_prefixes)]

# Categorical features, grouped by prefix in the same order as before.
categorical_prefixes = ('payment_method', 'city', 'is_auto_renew', 'age_group', 'gender')
categorical_columns = [
    k
    for prefix in categorical_prefixes
    for k in base_spine.columns
    if k.startswith(prefix)
]

In [0]:
# Show the row count per level of each categorical feature.
for cat_col in categorical_columns:
    level_counts = base_spine.groupBy(cat_col).count()
    level_counts.display()

In [0]:
# Replace nulls in numeric features with 0 and cast them to double.
# Done as ONE projection over all columns instead of two `withColumn` calls per
# feature: with hundreds of window features the chained calls add one
# projection each to the plan, which the PySpark docs warn against.
# Column order is preserved and non-numeric columns pass through unchanged.
numeric_names = set(num_vars)

base_spine = base_spine.select([
    F.when(F.col(name).isNull(), 0).otherwise(F.col(name)).cast("double").alias(name)
    if name in numeric_names
    else F.col(name)
    for name in base_spine.columns
])

In [0]:
# Replace nulls in categorical features with the "Desconhecido" label.
for cat_col in categorical_columns:
    current_value = F.col(cat_col)
    filled_value = F.when(current_value.isNull(), 'Desconhecido').otherwise(current_value)
    base_spine = base_spine.withColumn(cat_col, filled_value)

## Salvar Base

In [0]:
# #VERSÃO OFICIAL
# spark.sql('drop table if exists sand_riscos_pm_pf.T789778_spine_final_variaveis_dm')
# base_spine.write.mode("overwrite").saveAsTable('sand_riscos_pm_pf.T789778_spine_final_variaveis_dm')
# print('sand_riscos_pm_pf.T789778_spine_final_variaveis_dm')

In [0]:
# VERSION V2 — the drop/write statements below are commented out, so this cell
# does not write anything.
# spark.sql('drop table if exists sand_riscos_pm_pf.T789778_spine_final_variaveis_dm_v2')
# base_spine.write.mode("overwrite").saveAsTable('sand_riscos_pm_pf.T789778_spine_final_variaveis_dm_v2')
# NOTE(review): the print still runs and names the v2 table even though it is
# not written by this cell.
print('sand_riscos_pm_pf.T789778_spine_final_variaveis_dm_v2')

In [0]:
#VERSÃO V3
# VERSION V3 — drop the existing table (if any), rewrite it from `base_spine`,
# then print the table name.
final_table_v3 = 'sand_riscos_pm_pf.T789778_spine_final_variaveis_dm_v3'

spark.sql(f'drop table if exists {final_table_v3}')
base_spine.write.saveAsTable(final_table_v3, mode="overwrite")
print(final_table_v3)