# Entrega 3

## 1. Objetivo
+ Este notebook tem como objetivo responder:
    + `O que fizeram os clientes darem Churn?`
    + `Qual a importância dos eventos (push, compra, acesso, entre outros) ao longo da vida do cliente?`
    + `Dado um comportamento de compra no mês atual, qual será o volume de pedidos no próximo mês?`

+ As bases originais são:
    + `ORDERS - Informações sobre os pedidos realizados.`
    + `MARKETING PUSH FULL - Notificações PUSH ao longo de 6 meses (Junho-Dezembro/2019).`
    + `CUSTOMER SEGMENTATION - Segmentação do cliente.`
    + `ORDERS WITH COST REVENUE - Informações sobre o pedido relacionado, onde verifica-se se o mesmo gerou custo ou receita.`
    + `SESSION VISITS - Comportamento de uso do app.`

    
Criado por Jaime Mishima e Ariel Vicente </br>

**PROBLEMA 1**: Para a pergunta de Churn, optamos por criar um modelo preditivo usando um classificador (ajustando 5 técnicas de Machine Learning - e posteriormente elegendo a(s) melhor(es)). A importância das variáveis no modelo nos ajudaram a responder a pergunta.</br>
**PROBLEMA 2**: Para o problema de **importância dos eventos**, optamos por fazer uma análise descritiva.</br></br>

**PROBLEMA 3**: Prever a valor total gasto com pedidos (order_total) por cliente no mês seguinte.</br>

**Ideia:** Criar um modelo preditivo para, através de regressão, prever o total gasto por um cliente no próximo mês.

**Motivação:** Pelo fato de se ter uma base histórica em mãos, a ideia foi explorar a possibilidade antecipar uma informação valiosa de alguém já conhecido e, assim, ganhar tempo no desenho de estratégias/ações e na tomada de decisão.

**Impacto potencial para o business:**
- Otimizar pushs: Sabendo-se o total gastos para os clientes de um determinado perfil, o Ifood consegue antecipar a quantidade ideal de pushes no mês.
- Otimizar alocação de entregadores: Sabendo-se o total gasto com pedidos e onde esses pedidos serão entregues, o Ifood pode trabalhar na otimização do posicionamento dos entregadores (através de incentivos, por exemplo).
- Otimizar parcerias com restaurantes: Sabendo-se o total gasto com pedidos, onde esses pedidos serão entregues e qual o tipo de comida (que será entregue e a favorita do comprador), o Ifood ganha informações para estimular/alinhar promoções em determinadas regiões e/ou com determinados tipos de restaurante.
- Identificar potenciais Churn/Inativos: Com a previsão do total gasto com pedidos, consequentemente antecipa-se também o ifood_status do cliente.
- Antecipar Marlin tag: Sabendo-se o total gasto com pedidos, o Ifood consegue antecipar a classificação da qualidade do cliente (marlin_tag), podendo criar promoções/ações/pushes específicos para cara um dos tipos de cliente em potencial para fidelizar ainda mais os melhores e estimular (se fizer sentido) a evolução dos demais.

## 2. Imports

In [6]:
from pyspark.sql.functions import udf, count, when, isnull, col, mean, sum, max, avg, min, stddev, count, trim, lower, split, explode
from pyspark.sql.functions import *
from pyspark.sql.functions import collect_list
from pyspark.mllib.stat import Statistics

# tratamento de datas
from pyspark.sql.functions import datediff, to_date, to_timestamp, from_utc_timestamp, round, dayofweek, month

# para o groupby e lag column
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit

# para a remoção de missing:
from functools import reduce

# para ajuste de type de arrays
from pyspark.sql.types import ArrayType, StringType

# para correlacao
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# Modeling
from pyspark.ml.stat import Correlation
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, NaiveBayes
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor, GeneralizedLinearRegression, RandomForestRegressionModel, GeneralizedLinearRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler, VectorSlicer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.mllib.util import MLUtils

## 3. Help Functions

In [8]:
# Percentual de Missings
from pyspark.sql.functions import count, when, isnull, col
def contar_missing(df):
  """Realiza a contagem da quantidade de missing que existe dentro de um dataframe.
  Args:
    df - Dataframe Spark
  Returns:
    Data Frame spark com apenas 1 linha com a contagem de missing para cada variável.
  """
  aux = []
  for c in df.columns:
    aux.append(count(when(isnull(c), c)).alias(c))
  return df.select(aux)

def percent_missing(df):
  total_linhas = df.count()
  df = contar_missing(df)
  colunas = df.columns
  total_missing = list(df.first().asDict().values())
  valores = zip(colunas, total_missing)
  
  df_aux = spark.createDataFrame(valores, ['variaveis', 'total_missing'])
  df_aux = df_aux.withColumn('perc_missing', col('total_missing') *100/total_linhas)
  return df_aux

# Returns a groupby count of a dataframe (df) column (col)
def percentByCol(df, group_column):
  """Retorna o groupby de uma coluna `col` de um dataframe `df`
  Args:
    df - Dataframe Spark
    group_column - Nome da coluna do dataframe df
  Returns:
    Data Frame spark com o groupby da coluna `col` e uma coluna com o percentual
  """
  return df.groupby(group_column)\
           .count()\
           .withColumnRenamed('count', 'cnt_per_group')\
           .withColumn('percent', f.col('cnt_per_group')*100/f.sum('cnt_per_group').over(Window.partitionBy()))\
           .orderBy('percent', ascending=False)

# Join two dataframes and remove duplicate columns
# Disclaimer: baseado em https://stackoverflow.com/questions/46944493/removing-duplicate-columns-after-a-df-join-in-spark
def join_removing_repeated(df1, df2, cond, how='left'):
    """Retorna o dataframe resultado do join de `df1` e `df2`
    Args:
      df1 - dataframe 1
      df2 - dataframe 2
      cond - chaves para realizar o join
      how - tipo de join (default left)
    Returns:
      Data Frame resultado do join removendo as colunas repetidas
    """
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df
  
# Clears the received string x from unwanted characters
def limpeza(x):
  """Retorna a string x após eliminação de caracteres indesejados
    Args:
      x - string
    Returns:
      String tratada
    """
  return x.replace('"', '').replace('\\', '').replace('[', '').replace(']', '')

udf_limpeza = udf(limpeza, StringType()) # create an udf based on limpeza function

# Classifica o número de pushes por ranges
from pyspark.sql.functions import udf
push_range = udf(lambda pushes: '1- < 20' if pushes < 20 else 
                                 '2- 20-60' if (pushes >= 20 and pushes < 60) else
                                 '3- 60-100' if (pushes >= 60 and pushes < 100) else
                                 '4- 100-140' if (pushes >= 100 and pushes < 140) else
                                 '5- 140-180' if (pushes >= 140 and pushes < 180) else
                                 '6- 180-220' if (pushes >= 180 and pushes < 220) else
                                 '7- 220-260' if (pushes >= 220 and pushes < 260) else
                                 '8- 260-300' if (pushes >= 260 and pushes < 300) else
                                 '9- 300+'  if (pushes >= 300) else '')

# Buckets por número de pedidos
from pyspark.sql.functions import udf
order_range = udf(lambda orders: '1- < 5' if orders < 5 else 
                                 '2- 5-10' if (orders >= 5 and orders < 10) else
                                 '3- 0-15' if (orders >= 10 and orders < 15) else
                                 '4- 15-20' if (orders >= 15 and orders < 20) else
                                 '5- 20-25' if (orders >= 20 and orders < 25) else
                                 '6- 25-30' if (orders >= 25 and orders < 30) else
                                 '7- 30-35' if (orders >= 30 and orders < 35) else
                                 '8- 35-40' if (orders >= 35 and orders < 40) else
                                 '9- 40+'  if (orders >= 40) else '')

# Heatmap matriz de correlação
import seaborn as sns
def plot_corr_matrix(correlations,attr,fig_no, figsize=[15,10]):
  """Retorna o heatmap de uma lista de listas com as correlacoes de variaveis
  Args:
    correlations - lista de lista com as correlacoes
    attr - lista com os nomes das variaveis
    fig_no - If not provided, a new figure will be created, and the figure number will be incremented (para o plt.figure)
    figsize 0- tamanho do heatmap
  Returns:
    Heatmap com a matriz de correlacao
    """
  fig=plt.figure(fig_no, figsize=figsize)
  ax=fig.add_subplot(111)
  ax.set_title("Correlacao Variaveis")
  ax = sns.heatmap(correlations, cmap="YlGnBu")
  indice = list(range(1, len(attr)+1))
  indice = [str(s) + ' - ' for s in indice]
  res = [i + j for i, j in zip(indice, attr)] 
  #ax.set_xticks(range(len(filter_colunas_order)))
  ax.set_yticklabels(res)
  plt.yticks(rotation=0) 
  plt.show()
  
# Função para computar o MAPE
def compute_mape(df, y_true='label', y_pred='prediction'):
  mape = df.withColumn('abserror', col(y_true) - col(y_pred))
  mape = mape.withColumn('relerror', abs(col('abserror') / col(y_true)))
  mape = mape.select(round(mean(col('relerror')) * 100, 4).alias('mape'))
  
  return mape.collect()[0][0]

# Mode function to get most frequent values of a dataframe
# Reference: https://stackoverflow.com/questions/45880089/how-to-get-most-frequent-values-of-a-dataframe-in-pyspark
@f.udf
def most_common_udf(x):
    from collections import Counter
    return Counter(x).most_common(1)[0][0]

# Returns feature importances of model
# Reference: https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
  
  
# Função auxiliar para cálculo e exibição de métricas de avaliação dos modelos de classificação:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def class_model_evaluator(predict, labelCol="target"):
  
  accuracy = MulticlassClassificationEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='accuracy').evaluate(predict)
  weightedPrecision = MulticlassClassificationEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='weightedPrecision').evaluate(predict)
  weightedRecall = MulticlassClassificationEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='weightedRecall').evaluate(predict)
  f1 = MulticlassClassificationEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='f1').evaluate(predict)
  
  areaUnderROC = BinaryClassificationEvaluator(labelCol=labelCol, metricName='areaUnderROC').evaluate(predict)

  print(f'AUC ROC: {areaUnderROC}')
  print(f'Accuracy: {accuracy}')
  print(f'Precision: {weightedPrecision}')
  print(f'Recall: {weightedRecall}')
  print(f'F1-score: {f1}')
  
  if 1==1:
    pass
  else:
    return None
  
  
# Função auxiliar para cálculo e exibição de métricas de avaliação dos modelos de regressão:
from pyspark.ml.evaluation import RegressionEvaluator
def reg_model_evaluator(predict, labelCol="target"):
  
  mae = RegressionEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='mae').evaluate(predict)
  rmse = RegressionEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='rmse').evaluate(predict)
  r2 = RegressionEvaluator(labelCol=labelCol, predictionCol="prediction", metricName='r2').evaluate(predict)

  print(f'MAE: {mae}')
  print(f'RMSE: {rmse}')
  print(f'r2 Ajustado: {r2}')
  if 1==1:
    pass
  else:
    return None
  
  
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Reference: https://stackoverflow.com/questions/52847408/pyspark-extract-roc-curve
# Scala version implements .roc() and .pr()
# Python: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/common.html
# Scala: https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def _to_list(self, rdd):
        points = []
        # Note this collect could be inefficient for large datasets 
        # considering there may be one probability per datapoint (at most)
        # The Scala version takes a numBins parameter, 
        # but it doesn't seem possible to pass this from Python to Java
        for row in rdd.collect():
            # Results are returned as type scala.Tuple2, 
            # which doesn't appear to have a py4j mapping
            points += [(float(row._1()), float(row._2()))]
            
        # Separa em FPR e TPR
        fpr = []
        tpr = []
        for f, t in points:
          fpr.append(f)
          tpr.append(t)
        
        return fpr, tpr

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        return self._to_list(rdd)

def timeline_plot_sum(vars):
  
  t = np.arange(6, 13, 1)
  ax = []
  count = 0
  for var1, var2 in vars:
    # Create some mock data
    data_1 = df_desc_filtro.groupby('segmentation_month').agg(sum(var1).alias(var1)).orderBy('segmentation_month')
    data_2 = df_desc_filtro.groupby('segmentation_month').agg(sum(var2).alias(var2)).orderBy('segmentation_month')

    data1 = np.array(data_1.select(var1).collect())
    data2 = np.array(data_2.select(var2).collect())

    fig, ax_aux = plt.subplots()
    ax.append(ax_aux)

    color = 'tab:red'
    ax[count].set_xlabel('timeline')
    ax[count].set_ylabel(var1, color=color)
    ax[count].plot(t, data1, color=color)
    ax[count].tick_params(axis='y', labelcolor=color)

    ax.append(ax[count].twinx())  # instantiate a second axes that shares the same x-axis

    color = 'tab:blue'
    ax[count+1].set_ylabel(var2, color=color)  # we already handled the x-label with ax1
    ax[count+1].plot(t, data2, color=color)
    ax[count+1].tick_params(axis='y', labelcolor=color)
    
    count = count + 2
  
  aux = list(range(1, len(charts_list) * 2, 2))
  ax[1].get_shared_y_axes().join([ax[x] for x in aux])

  fig.tight_layout()  # otherwise the right y-label is slightly clipped
  plt.show()
  
  
def timeline_plot_avg(vars):
  
  t = np.arange(6, 13, 1)
  ax = []
  count = 0
  for var1, var2 in vars:
    # Create some mock data
    data_1 = df_desc_filtro.groupby('segmentation_month').agg(avg(var1).alias(var1)).orderBy('segmentation_month')
    data_2 = df_desc_filtro.groupby('segmentation_month').agg(avg(var2).alias(var2)).orderBy('segmentation_month')

    data1 = np.array(data_1.select(var1).collect())
    data2 = np.array(data_2.select(var2).collect())

    fig, ax_aux = plt.subplots()
    ax.append(ax_aux)

    color = 'tab:red'
    ax[count].set_xlabel('timeline')
    ax[count].set_ylabel(var1, color=color)
    ax[count].plot(t, data1, color=color)
    ax[count].tick_params(axis='y', labelcolor=color)

    ax.append(ax[count].twinx())  # instantiate a second axes that shares the same x-axis

    color = 'tab:blue'
    ax[count+1].set_ylabel(var2, color=color)  # we already handled the x-label with ax1
    ax[count+1].plot(t, data2, color=color)
    ax[count+1].tick_params(axis='y', labelcolor=color)
    
    count = count + 2
  
  aux = list(range(1, len(charts_list) * 2, 2))
  ax[1].get_shared_y_axes().join([ax[x] for x in aux])

  fig.tight_layout()  # otherwise the right y-label is slightly clipped
  plt.show()
  
def timeline_plot_mixed(vars):
  
  t = np.arange(6, 13, 1)
  ax = []
  count = 0
  for var1, var2 in vars:
    # Create some mock data
    data_1 = df_desc_filtro.groupby('segmentation_month').agg(sum(var1).alias("sum_of_" + var1)).orderBy('segmentation_month')
    data_2 = df_desc_filtro.groupby('segmentation_month').agg(avg(var2).alias("avg_of_" + var2)).orderBy('segmentation_month')

    data1 = np.array(data_1.select("sum_of_" + var1).collect())
    data2 = np.array(data_2.select("avg_of_" + var2).collect())

    fig, ax_aux = plt.subplots()
    ax.append(ax_aux)

    color = 'tab:red'
    ax[count].set_xlabel('timeline')
    ax[count].set_ylabel("sum_of_" + var1, color=color)
    ax[count].plot(t, data1, color=color)
    ax[count].tick_params(axis='y', labelcolor=color)

    ax.append(ax[count].twinx())  # instantiate a second axes that shares the same x-axis

    color = 'tab:blue'
    ax[count+1].set_ylabel("avg_of_" + var2, color=color)  # we already handled the x-label with ax1
    ax[count+1].plot(t, data2, color=color)
    ax[count+1].tick_params(axis='y', labelcolor=color)
    
    count = count + 2
  
  aux = list(range(1, len(charts_list) * 2, 2))
  ax[1].get_shared_y_axes().join([ax[x] for x in aux])

  fig.tight_layout()  # otherwise the right y-label is slightly clipped
  plt.show()

## 4. Base Treatment
Bases Geradas na entrega 1

### 4.1 Read Raw Bases

In [11]:
root_dir = '/dbfs/FileStore/ifood'
dbutils.fs.ls(f'{root_dir}')
df_customer_segmentation = spark.read.parquet(f'{root_dir}/customer_segmentation')
df_orders = spark.read.parquet(f'{root_dir}/orders')
df_orders_with_cost_revenue = spark.read.parquet(f'{root_dir}/orders_with_cost_revenue')
df_sessions_visits = spark.read.parquet(f'{root_dir}/sessions_visits')
df_marketing_push_full = spark.read.parquet(f'{root_dir}/marketing_push_full')

### 4.2 Treatment

In [13]:
# ---------------------------
#          PUSHES
# ---------------------------

# Missing Values Treatment
colunas_pushes_missing = percent_missing(df_marketing_push_full).orderBy('perc_missing', ascending=False)
thresholdMissingPushes = 22
df_columns_to_drop = colunas_pushes_missing.filter(colunas_pushes_missing['perc_missing'] > thresholdMissingPushes).select('variaveis')
list_columns_to_drop = list([row[0] for row in df_columns_to_drop.collect()])
df_mpf = df_marketing_push_full.drop(*list_columns_to_drop)
df_mpf = df_mpf.dropDuplicates()

# Para as colunas que nao remover, separar em categoricas e numericas
df_columns_missing = colunas_pushes_missing.filter(
                                                   (colunas_pushes_missing['perc_missing'] <= thresholdMissingPushes) &
                                                   (colunas_pushes_missing['perc_missing'] > 0)
                                                  ).select('variaveis')
list_df_columns_missing = list([row[0] for row in df_columns_missing.collect()])

filter_colunas_numericas = [x[0] for x in df_mpf[list_df_columns_missing].dtypes if x[1] in ('double', 'int', 'long')]
filter_colunas_categoricas = [x[0] for x in df_mpf[list_df_columns_missing].dtypes if x[1] not in ('double', 'int', 'long')]

# Inputar unknown para colunas categoricas
for coluna in filter_colunas_categoricas:
  df_mpf = df_mpf.fillna('unknown', subset=[coluna])

# Inputar media para colunas numericas
for coluna in filter_colunas_numericas:
  media = df_mpf.agg(mean(coluna)).collect()[0][0]
  df_mpf = df_mpf.fillna(media, subset=[coluna])

# Date Treatment
# importante: transformar primeiro para timestamp, depois para date!!!
df_mpf = df_mpf.withColumn('event_time_utc3', from_utc_timestamp('event_time_utc3', 'UTC'))\
               .withColumn('event_date', from_utc_timestamp('event_date', 'UTC'))\
               .withColumn('event_date', to_date('event_date', 'YYYY-MM-DD'))\
               .withColumn('event_month', month('event_date'))\
               .withColumn('event_dayofweek', dayofweek('event_date'))


# ---------------------------
#    CUSTOMER SEGMENTATION
# ---------------------------

# Remove duplicates
df_customer_segmentation = df_customer_segmentation.distinct()

# Missing Treatment
perc_miss = percent_missing(df_customer_segmentation) # Calcula o percentual de missings para todas as colunas
col_drop = perc_miss.filter((perc_miss['perc_missing'] <= 0.1) & (perc_miss['perc_missing'] > 0)).select('variaveis').rdd.flatMap(lambda x: x).collect() # Seleciona colunas com menos de 0.1% de missings
col_drop.append('customer_id') # Acrescenta a coluna de customer_id na lista de colunas com missing
aux1 = df_customer_segmentation.select(col_drop) # Criar dataframe auxiliar apenas com as colunas em col_drop
# Cria lista com o customer_id de quem tem algum dado nulo para alguma das colunas em col_drop:
c_id_drop = aux1.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in aux1.columns))).select('customer_id').distinct().rdd.flatMap(lambda x: x).collect() 
# Exclui de df_customer_segmentation todos os registros de quem teve informação nula identificada: (Decisão tomada por conta da base ser histórica e o impacto em número absoluto de resgistros ser pequeno)
df_customer_segmentation = df_customer_segmentation.filter(~df_customer_segmentation.customer_id.isin(c_id_drop))

# Array Treatment
#Ajuste coluna preferred_dishes (tipo array):
df_customer_segmentation = df_customer_segmentation.withColumn('aux_1', udf_limpeza(col('preferred_dishes')))
df_customer_segmentation = df_customer_segmentation.withColumn('preferred_dishes_ar', split(col('aux_1'), ',').cast(ArrayType(StringType())))
#Ajuste coluna top_3_merchants_code (tipo array):
df_customer_segmentation = df_customer_segmentation.withColumn('aux_2', udf_limpeza(col('top_3_merchants_code')))
df_customer_segmentation = df_customer_segmentation.withColumn('top_3_merchants_code_ar', split(col('aux_2'), ',').cast(ArrayType(StringType())))

list_columns_to_drop = ['aux_1','aux_2']
df_customer_segmentation = df_customer_segmentation.drop(*list_columns_to_drop) # Exclusão de colunas auxiliares

# Date Treatment
df_customer_segmentation = df_customer_segmentation.withColumn('registration_date', from_utc_timestamp('registration_date', 'UTC'))\
                                                   .withColumn('registration_date', to_date('registration_date', 'YYYY-MM-DD'))\
                                                   .withColumn('last_valid_order_date', from_utc_timestamp('last_valid_order_date', 'UTC'))\
                                                   .withColumn('last_valid_order_date', to_date('last_valid_order_date', 'YYYY-MM-DD'))\
                                                   .withColumn('last_invalid_order_date', from_utc_timestamp('last_invalid_order_date', 'UTC'))\
                                                   .withColumn('last_invalid_order_date', to_date('last_invalid_order_date', 'YYYY-MM-DD'))\
                                                   .withColumn('first_order_date', from_utc_timestamp('first_order_date', 'UTC'))\
                                                   .withColumn('first_order_date', to_date('first_order_date', 'YYYY-MM-DD'))\
                                                   .withColumn('last_order_date', from_utc_timestamp('last_order_date', 'UTC'))\
                                                   .withColumn('last_order_date', to_date('last_order_date', 'YYYY-MM-DD'))\
                                                   .withColumn('segmentation_month', from_utc_timestamp('segmentation_month', 'UTC'))\
                                                   .withColumn('segmentation_month', to_date('segmentation_month', 'YYYY-MM-DD'))\
                                                   .withColumn('registration_month', month('registration_date'))\
                                                   .withColumn('registration_dayofweek', dayofweek('registration_date'))\
                                                   .withColumn('first_order_month', month('first_order_date'))\
                                                   .withColumn('first_order_dayofweek', dayofweek('first_order_date'))\
                                                   .withColumn('segmentation_month_month', month('segmentation_month'))\
                                                   .withColumn('segmentation_month_dayofweek', dayofweek('segmentation_month'))\
                                                   .withColumn('last_order_month', month('last_order_date'))\
                                                   .withColumn('last_order_dayofweek', dayofweek('last_order_date'))


# ---------------------------
#  ORDERS + ORDERS w/ COSTS
# ---------------------------

# Join Orders and Orders with Cost/Revenue bases
df_orders_total = join_removing_repeated(df_orders, df_orders_with_cost_revenue, df_orders.order_number == df_orders_with_cost_revenue.order_number, 'left')

# Missing Treatment
colunas_orders_missing = percent_missing(df_orders_total).orderBy('perc_missing', ascending=False)
# Para as colunas com missing, separar em categoricas e numericas
df_columns_orders_missing = colunas_orders_missing.filter(colunas_orders_missing['perc_missing'] > 0
                                                  ).select('variaveis')
list_df_columns_orders_missing = list([row[0] for row in df_columns_orders_missing.collect()])

filter_colunas_orders_numericas = [x[0] for x in df_orders_total[list_df_columns_orders_missing].dtypes if x[1] in ('double', 'long')]
filter_colunas_orders_numericas_int = [x[0] for x in df_orders_total[list_df_columns_orders_missing].dtypes if x[1] in ('int')]
filter_colunas_orders_categoricas = [x[0] for x in df_orders_total[list_df_columns_orders_missing].dtypes if x[1] not in ('double', 'int', 'long', 'boolean')]
filter_colunas_orders_booleanas = [x[0] for x in df_orders_total[list_df_columns_orders_missing].dtypes if x[1] in ('boolean')]

# Inputar unknown para colunas categoricas
for coluna in filter_colunas_orders_categoricas:
  df_orders_total = df_orders_total.fillna('unknown', subset=[coluna])

# Transformar colunas booleanas para string, depois inputar unknown
for coluna in filter_colunas_orders_booleanas:
  df_orders_total = df_orders_total.withColumn(coluna,col(coluna).cast('string'))
  df_orders_total = df_orders_total.fillna('unknown', subset=[coluna])

# Inputar media para colunas numericas
for coluna in filter_colunas_orders_numericas:
  media = df_orders_total.agg(mean(coluna)).collect()[0][0]
  df_orders_total = df_orders_total.fillna(media, subset=[coluna])
  
# Inputar media para colunas numericas inteiras arredondando para o integer mais perto
for coluna in filter_colunas_orders_numericas:
  media = df_orders_total.agg(mean(coluna)).collect()[0][0]
  df_orders_total = df_orders_total.fillna(media, subset=[coluna])
  df_orders_total = df_orders_total.withColumn(coluna, f.round(df_orders_total[coluna], 0))

# Date Treatment  
# importante: transformar primeiro para timestamp, depois para date!!!
df_orders_total = df_orders_total.withColumn('order_timestamp_local', from_utc_timestamp('order_timestamp_local', 'UTC'))\
                                 .withColumn('last_status_date_local', from_utc_timestamp('last_status_date_local', 'UTC'))\
                                 .withColumn('scheduled_creation_date_local', from_utc_timestamp('scheduled_creation_date_local', 'UTC'))\
                                 .withColumn('order_date_local', from_utc_timestamp('order_date_local', 'UTC'))\
                                 .withColumn('cohort_month', from_utc_timestamp('cohort_month', 'UTC'))\
                                 .withColumn('first_order_date', from_utc_timestamp('first_order_date', 'UTC'))\
                                 .withColumn('order_date_local', to_date('order_date_local', 'YYYY-MM-DD'))\
                                 .withColumn('cohort_month', to_date('cohort_month', 'YYYY-MM-DD'))\
                                 .withColumn('first_order_date', to_date('first_order_date', 'YYYY-MM-DD'))\
                                 .withColumn('order_date_local_month', month('order_date_local'))\
                                 .withColumn('order_date_local_dayofweek', dayofweek('order_date_local'))

# ---------------------------
#           VISITS
# ---------------------------

# Missing Treatment
colunas_visits_missing = percent_missing(df_sessions_visits).orderBy('perc_missing', ascending=False)
# Para as colunas com missing, separar em categoricas e numericas
df_columns_visits_missing = colunas_visits_missing.filter(colunas_visits_missing['perc_missing'] > 0
                                                  ).select('variaveis')
list_df_columns_visits_missing = list([row[0] for row in df_columns_visits_missing.collect()])

filter_colunas_visits_numericas = [x[0] for x in df_sessions_visits[list_df_columns_visits_missing].dtypes if x[1] in ('double', 'long')]
filter_colunas_visits_numericas_int = [x[0] for x in df_sessions_visits[list_df_columns_visits_missing].dtypes if x[1] in ('int')]
filter_colunas_visits_categoricas = [x[0] for x in df_sessions_visits[list_df_columns_visits_missing].dtypes if x[1] not in ('double', 'int', 'long', 'boolean')]
filter_colunas_visits_booleanas = [x[0] for x in df_sessions_visits[list_df_columns_visits_missing].dtypes if x[1] in ('boolean')]

# Inputar unknown para colunas categoricas
for coluna in filter_colunas_visits_categoricas:
  df_sessions_visits = df_sessions_visits.fillna('unknown', subset=[coluna])

# Transformar colunas booleanas para string, depois inputar unknown
for coluna in filter_colunas_visits_booleanas:
  df_sessions_visits = df_sessions_visits.withColumn(coluna,col(coluna).cast('string'))
  df_sessions_visits = df_sessions_visits.fillna('unknown', subset=[coluna])
  
# Inputar media para colunas numericas inteiras arredondando para o integer mais perto
for coluna in filter_colunas_visits_numericas:
  media = df_sessions_visits.agg(mean(coluna)).collect()[0][0]
  df_sessions_visits = df_sessions_visits.fillna(media, subset=[coluna])
  df_sessions_visits = df_sessions_visits.withColumn(coluna, f.round(df_sessions_visits[coluna], 0))
  

# Dates Conversion
df_sessions_visits = df_sessions_visits.withColumn('session_started_at_amsp', from_utc_timestamp('session_started_at_amsp', 'UTC'))\
                                       .withColumn('session_ended_at_amsp', from_utc_timestamp('session_ended_at_amsp', 'UTC'))\
                                       .withColumn('session_started_at_utc0', from_utc_timestamp('session_started_at_utc0', 'UTC'))\
                                       .withColumn('session_ended_at_utc0', from_utc_timestamp('session_ended_at_utc0', 'UTC'))\
                                       .withColumn('session_started_at_utc0', from_utc_timestamp('session_started_at_utc0', 'UTC'))\
                                       .withColumn('session_started_date', to_date('session_started_at_amsp', 'YYYY-MM-DD'))\
                                       .withColumn('session_started_month', month('session_started_date'))\
                                       .withColumn('session_started_dayofweek', dayofweek('session_started_date'))

### 4.3 Save Tables

In [15]:
%fs rm -r /dbfs/FileStore/treated/

In [16]:
# Create folder
treat_dir = '/dbfs/FileStore/treated'
dbutils.fs.mkdirs(f'{treat_dir}')

df_mpf.write.parquet('/dbfs/FileStore/treated/df_mpf.parquet')
df_customer_segmentation.write.parquet('/dbfs/FileStore/treated/df_customer_segmentation.parquet')
df_orders_total.write.parquet('/dbfs/FileStore/treated/df_orders_total.parquet')
df_sessions_visits.write.parquet('/dbfs/FileStore/treated/df_sessions_visits.parquet')

## 5. ABT Generation

In [18]:
# Read from Parquet
root_dir = '/dbfs/FileStore/treated'
dbutils.fs.ls(f'{root_dir}')
df_customer_segmentation = spark.read.parquet(f'{root_dir}/df_customer_segmentation.parquet')
df_orders_total_tratado = spark.read.parquet(f'{root_dir}/df_orders_total.parquet')
df_sessions_visits_tratado = spark.read.parquet(f'{root_dir}/df_sessions_visits.parquet')
df_mpf_tratado = spark.read.parquet(f'{root_dir}/df_mpf.parquet')

### Pushes Grouped by 'external_user_id' and 'event_month'
grouping_cols = ["external_user_id", "event_month"]
other_cols = [c for c in df_mpf_tratado.columns if c not in grouping_cols]
exclude_cols = ['event_channel', 'event_name', 'brand', 'sample_type', 'user_id', 'campaign_id', 'event_dayofweek', 'message_variation_channel']
mpf_cols = list(set(other_cols) - set(exclude_cols))

# Group Pushes base counting distinct events of a customer in a month
df_mpf_grouped = df_mpf_tratado.filter(df_mpf_tratado['event_name'] == 'received')\
                               .groupBy(grouping_cols).agg(*[countDistinct(c).alias("pushes_count_distinct_"+c) for c in mpf_cols])
df_mpf_grouped = df_mpf_grouped.withColumn('pushes_changed_platform',\
                                           when(df_mpf_grouped.pushes_count_distinct_platform >= 2,1).otherwise(0))
df_mpf_grouped = df_mpf_grouped[[list(set(df_mpf_grouped.columns) - set(['pushes_count_distinct_platform']))]]

# Most common platform in a month where a user received most pushes
w = Window.partitionBy("external_user_id", "event_month")
most_common_platform = df_mpf_tratado.filter(df_mpf_tratado['event_name'] == 'received')\
                                     .groupBy("external_user_id", "event_month", "platform")\
                                     .agg(countDistinct("event_time_utc3").alias("pushes_count"))\
                                     .withColumn("rn",row_number().over(w.orderBy(col("pushes_count").desc())))\
                                     .where(col("rn") == 1)\
                                     .drop("rn")\
                                     .withColumnRenamed('platform', 'most_common_platform')\
                                     .select(["external_user_id", "event_month", "most_common_platform"])
cond = [df_mpf_grouped.external_user_id == most_common_platform.external_user_id, df_mpf_grouped.event_month == most_common_platform.event_month]
df_mpf_grouped = join_removing_repeated(df_mpf_grouped, most_common_platform, cond, 'left')



### Orders and Sessions join via session_id
df_orders_and_sessions = join_removing_repeated(df_orders_total_tratado, df_sessions_visits_tratado, df_orders_total_tratado.session_id == df_sessions_visits_tratado.session_id, 'left')
df_orders_and_sessions = df_orders_and_sessions.withColumn('order_timestamp_local', from_utc_timestamp('order_timestamp_local', 'UTC'))\
                                               .withColumn('order_timestamp_local', to_date('order_timestamp_local', 'YYYY-MM-DD'))\
                                               .withColumn('order_timestamp_local_month', month('order_timestamp_local'))
# Orders and Sessions grouped by 'customer_id' and 'order_timestamp_local_month'
grouping_cols = ['customer_id','order_timestamp_local_month'] # 'payment_method', 'platform', 'device_model']
sum_cols = ['order_total'
            ,'credit'
            ,'paid_amount'
            ,'distance_merchant_customer'
            ,'promo_is_promotion'
            ,'normal_items_quantity'
            ,'promo_items_quantity'
            ,'session_duration_seconds'
            ,'sum_event_open'
            ,'sum_view_restaurant_screen'
            ,'sum_view_dish_screen'
            ,'sum_click_add_item'
            ,'sum_view_checkout'
            ,'sum_callback_purchase'
            ,'order_session_quantity'
            ,'valid_order'
            ,'general_net_profit']
avg_cols = ['session_duration_seconds'
            ,'distance_merchant_customer'
            ,'general_net_profit'
            ,'order_total'
            ,'credit'
            ,'paid_amount'
            ,'sum_event_open'
            ,'sum_view_restaurant_screen'
            ,'sum_view_dish_screen'
            ,'sum_click_add_item'
            ,'sum_view_checkout'
            ,'sum_callback_purchase']

# Group Orders/Sessions base summing events of a customer in a month
df_orders_and_sessions_group = df_orders_and_sessions.groupBy(grouping_cols).agg(*[sum(c).alias("sum_"+c) for c in sum_cols],\
                                                                                 *[avg(c).alias("avg_"+c) for c in avg_cols],\
                                                                                 count('order_id').alias('number_of_orders'))

# Most common categorical features on Orders/Sessions base
import pyspark.sql.functions as f
@f.udf
def most_common_udf(x):
    from collections import Counter
    return Counter(x).most_common(1)[0][0]

grouping_cols = ['customer_id','order_timestamp_local_month']  
cols = ['order_shift'
        , 'delivery_type'
        #, 'device_type'
        , 'device_platform'
        , 'payment_method'
        , 'customer_state_label'
        , 'customer_seg_recency_bucket'
        , 'customer_seg_merchant_offer_bucket'
        , 'customer_seg_benefits_sensitivity_bucket'
        , 'customer_seg_frequency_bucket'
        , 'customer_seg_gross_income_bucket'
        , 'merchant_dish_type'
       ]
agg_expr = [most_common_udf(f.collect_list(col)).alias('most_common_'+col) for col in cols]
most_common_categorical = df_orders_and_sessions.groupBy(grouping_cols).agg(*agg_expr)

cond = [df_orders_and_sessions_group.customer_id == most_common_categorical.customer_id, 
        df_orders_and_sessions_group.order_timestamp_local_month == most_common_categorical.order_timestamp_local_month]
df_orders_and_sessions_group = join_removing_repeated(df_orders_and_sessions_group, most_common_categorical, cond, 'left')


# Unifies Pushes, Orders and Sessions user actions in a month
cond = [df_orders_and_sessions_group.customer_id == df_customer_segmentation.customer_id, 
        df_orders_and_sessions_group.order_timestamp_local_month == df_customer_segmentation.segmentation_month_month]
df_join = join_removing_repeated(df_customer_segmentation, df_orders_and_sessions_group, cond, 'left')

cond2 = [df_join.customer_id == df_mpf_grouped.external_user_id,
        df_join.segmentation_month_month == df_mpf_grouped.event_month]
df_final = join_removing_repeated(df_join, df_mpf_grouped, cond2, how='left')

In [19]:
# Saves table
#df_final.write.saveAsTable('df_final_200609')

In [20]:
df_final = spark.table("df_final_200609")

# Cria a Target com 1 para cliente que foi Churn. 0 caso contrário
df_final = df_final.withColumn('target_current',when(df_final.ifood_status == 'Churn',1).otherwise(0))

df_final = df_final.withColumn('target',
                  f.lag(df_final['target_current'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy(desc("segmentation_month_month"))))


# Para o lagging do problema 3, o churn/inativo do mês seguinte significa que o cliente nao fez nenhum pedido. Por exemplo, quem ficou inativo/churn tem sum_order_total null. Nesse caso, queremos prever que esse usuário vai ter um sum_order_total de zero.
df_final = df_final.fillna(0, subset=['sum_order_total'])
df_final = df_final.withColumn('target_3',
                  f.lag(df_final['sum_order_total'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy(desc("segmentation_month_month"))))

# df_final = df_final.fillna(0, subset=['number_of_orders'])
# df_final = df_final.withColumn('target_3',
#                   f.lag(df_final['number_of_orders'])
#                    .over(Window.partitionBy("customer_id")
#                    .orderBy(desc("segmentation_month_month"))))

df_final = df_final.filter(~df_final.ifood_status.isin(['Churn', 'Inactive'])) # como o objetivo é prever churn, mantemos somente quem pode dar churn

In [21]:
# PREPARATION:
# Listas de variaveis por tipo: categorico, numerico e data
categorical_columns = df_final.select(*[x[0] for x in df_final.dtypes if x[1] not in ('double', 'long', 'int', 'bigint', 'date')]).columns
numerical_columns = df_final.select(*[x[0] for x in df_final.dtypes if x[1] in ('double', 'long', 'int', 'bigint')]).columns
date_columns = df_final.select(*[x[0] for x in df_final.dtypes if x[1] in ('date')]).columns

# Exclusão de colunas com dados referentes ao mês.
excluded_columns = [
  'last_invalid_order_date'
  ,'preferred_shift_bucket_description' # redundante pois preferred_shift_bucket foi tratado
  ,'external_user_id' # chave da tabela de pushes
  ,'event_month' # chave da tabela de pushes
  ,'days_to_reorder_at_concluded' # variavel da tabela de segmentation que optamos por nao usar
  ,'days_to_reorder_at_datasource' # variavel da tabela de segmentation que optamos por nao usar
  ,'registration_month'
  ,'registration_dayofweek'
  ,'first_order_month'
  ,'first_order_dayofweek'
  ,'segmentation_month_dayofweek'
  ,'segmentation_month' # redundante
  ,'last_order_month'
  ,'last_order_dayofweek'
  ,'order_timestamp_local_month' # chave do join da base de order
  ,'count_distinct_event_dayofweek' # nao tem uma interpretacao. Um 7 diz somente se um usuario recebeu num mes pushes todos os dias da semana
  ,'event_month' # chave do join da base de pushes
  ,'pushes_count_distinct_event_time_utc3' # timestamp da base de pushes
  ,'sum_distance_merchant_customer' # from Orders+Visits: Nulo não faz sentido
  ,'avg_distance_merchant_customer'
  ,'most_common_merchant_dish_type'
  ,'most_common_customer_state_label'
  ,'most_common_platform'
]

included_categorical = [
 'marlin_tag',
 'last_nps',
 'benefits_sensitivity_bucket',
 'merchant_variety_bucket',
 'most_common_order_shift',
 'most_common_delivery_type',
 'most_common_device_platform',
 'most_common_payment_method',
 'most_common_customer_seg_recency_bucket',
 'most_common_customer_seg_merchant_offer_bucket',
 'most_common_customer_seg_benefits_sensitivity_bucket',
 'most_common_customer_seg_frequency_bucket',
 'most_common_customer_seg_gross_income_bucket'
]

excluded_columns = list(set(excluded_columns + date_columns + categorical_columns))
included_columns = list(set(df_final.columns) - set(excluded_columns)) + ['customer_id'] + included_categorical

df_filtrado = df_final[included_columns]

# Inputar zero para colunas numericas de pushes (clientes ativos que nao receberam push no mes) [~10050]
pushes_fillna_list = [
  'pushes_changed_platform'
  ,'pushes_count_distinct_event_date'
  ,'pushes_count_distinct_campaign_name'
]
for coluna in pushes_fillna_list:
  df_filtrado = df_filtrado.fillna(0, subset=[coluna])

# Drop em variaveis relacionadas a pedidos/sessões de clientes que nao fizeram compras (~3903)
df_filtrado = df_filtrado.dropna()

In [22]:
# Temos uma base para modelagem com 110k registros
df_filtrado.count()

In [23]:
df_filtrado.write.saveAsTable('df_filtrado_200610')

## 6. [PROBLEMA 1] O que fizeram os clientes darem Churn?

### 6.1 Pipeline

In [26]:
included_categorical = [
 'marlin_tag',
 'last_nps',
 'benefits_sensitivity_bucket',
 'merchant_variety_bucket',
 'most_common_order_shift',
 'most_common_delivery_type',
 'most_common_device_platform',
 'most_common_payment_method',
 'most_common_customer_seg_recency_bucket',
 'most_common_customer_seg_merchant_offer_bucket',
 'most_common_customer_seg_benefits_sensitivity_bucket',
 'most_common_customer_seg_frequency_bucket',
 'most_common_customer_seg_gross_income_bucket'
]

In [27]:
# PIPELINE BUILDING:
# Based on https://gist.github.com/colbyford/83978917799dbcab6293521a60f29e94

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler, MinMaxScaler, Normalizer

df_filtrado = spark.table("df_filtrado_200610")

categoricalColumns = included_categorical

numericalColumns = list(set(df_filtrado.columns) - set(['customer_id','segmentation_month_month','ifood_status','ifood_status_last_month','target','target_3']) - set(categoricalColumns))

categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

stages = []

for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  # Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  # Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  # Add stages.  These are not run here, but will run all at once later on.
  stages += [stringIndexer, encoder]

# Convert label into label indices using the StringIndexer
#label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
#stages += [label_stringIndexer]

# Transform all features into a vector using VectorAssembler
assemblerInputs = categoricalColumnsclassVec + numericalColumns 
# assembler only considers 'classVec' columns (it already did not consider stringIndexer)
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="features")
# assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="featuresAssembled")
stages += [assembler]

prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(df_filtrado)
dataset = pipelineModel.transform(df_filtrado)

#dataset.write.saveAsTable('dataset_grupo_15')
#dataset = spark.table("dataset_grupo_15")

# scaler = MinMaxScaler(inputCol="featuresAssembled", outputCol="features")
# scalerModel = scaler.fit(dataset_assembled)
# dataset = scalerModel.transform(dataset_assembled)

In [28]:
dataset.write.saveAsTable('dataset_churn_200610')

###6.2 Modelagem

#### 6.2.1 Modelo 1: Regressão Logística

In [31]:
# TREINAMENTO E TESTE DO MODELO:
dataset = spark.table("dataset_churn_200610")

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
reglog = LogisticRegression(labelCol='target')

modelo_rl = reglog.fit(treino)
predicoes_rl = modelo_rl.transform(teste)

In [32]:
#em caso de erro no import abaixo, instalar o pacote
#!pip install handyspark

In [33]:
# AUC ROC:
# !pip install handyspark
from handyspark import *
targetColumn = 'target'
#predicoes_rl.toHandy().cols[['probability', 'prediction', targetColumn]][:5]

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output from the classifier
targetColumn = 'target'
bcm_rl = BinaryClassificationMetrics(predicoes_rl, scoreCol='probability', labelCol=targetColumn)

# We still can get the same metrics as the evaluator...
print("Area under ROC Curve: {:.4f}".format(bcm_rl.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm_rl.areaUnderPR))

# But now we can PLOT both ROC and PR curves!
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm_rl.plot_roc_curve(ax=axs[0])
bcm_rl.plot_pr_curve(ax=axs[1])

# # We can also get all metrics (FPR, Recall and Precision) by threshold
# bcm.getMetricsByThreshold().filter('fpr between 0.19 and 0.21').toPandas().head(5)
# # And get the confusion matrix for any threshold we want
# bcm.print_confusion_matrix(.415856)

In [34]:
# MATRIZ DE CONFUSÃO:

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
import seaborn as sns

y_true = predicoes_rl.select('target')
y_true = y_true.toPandas()

y_pred = predicoes_rl.select('prediction')
y_pred = y_pred.toPandas()
cm = confusion_matrix(y_true, y_pred)

sns.set(font_scale=1.4)
sns.heatmap(cm, annot=True, annot_kws={"size": 12}, fmt='g')

In [35]:
# MÉTRICAS:

print('Acuracia Treino: ', modelo_rl.summary.accuracy)
print('Precision Treino: ', modelo_rl.summary.precisionByLabel)
print('Recall Treino: ', modelo_rl.summary.recallByLabel)
print('areaUnderRoc Treino: ', modelo_rl.summary.areaUnderROC)

resultado_teste = modelo_rl.evaluate(teste)

print('Acuracia: ', resultado_teste.accuracy)
print('Precision: ', resultado_teste.precisionByLabel)
print('Recall: ', resultado_teste.recallByLabel)
print('areaUnderRoc: ', resultado_teste.areaUnderROC)

#### 6.2.2 Modelo 2: Decision Tree

In [37]:
# TREINAMENTO E TESTE DO MODELO:

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
decTree = DecisionTreeClassifier(labelCol='target')

modelo_dt = decTree.fit(treino)

predicoes_dt = modelo_dt.transform(teste)
resultado_dt = predicoes_dt.select("target", "prediction", "probability")

In [38]:
ExtractFeatureImp(modelo_dt.featureImportances, predicoes_dt, "features")

Unnamed: 0,idx,name,score
10,56,qtt_orders_last_year,0.901243
33,79,number_of_orders,0.035849
56,1,marlin_tagclassVec_2. Tilapia,0.021835
12,58,qtt_valid_orders,0.019388
3,49,recency_days,0.012742
16,62,customer_lifetime_months,0.004361
58,3,last_npsclassVec_Sem Avaliacoes,0.003663
5,51,pushes_count_distinct_event_date,0.000919
0,46,avg_order_total,0.000000
69,14,most_common_order_shiftclassVec_weekend lunch,0.000000


In [39]:
# AUC ROC:

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output from the classifier
targetColumn = 'target'
bcm_dt = BinaryClassificationMetrics(predicoes_dt, scoreCol='probability', labelCol=targetColumn)

# But now we can PLOT both ROC and PR curves!
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm_dt.plot_roc_curve(ax=axs[0])
bcm_dt.plot_pr_curve(ax=axs[1])

In [40]:
# MATRIZ DE CONFUSÃO:

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
import seaborn as sns

y_trueDT = predicoes_dt.select('target')
y_trueDT = y_trueDT.toPandas()

y_predDT = predicoes_dt.select('prediction')
y_predDT = y_predDT.toPandas()
cm = confusion_matrix(y_trueDT, y_predDT)

sns.set(font_scale=1.4)
sns.heatmap(cm, annot=True, annot_kws={"size": 12}, fmt='g')

In [41]:
# MÉTRICAS:

evaluator = BinaryClassificationEvaluator(labelCol='target', metricName='areaUnderROC')
class_model_evaluator(predicoes_dt, labelCol='target')

#### 6.2.3 Modelo 3 - Random Forest

In [43]:
# TREINAMENTO E TESTE DO MODELO:

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
rf = RandomForestClassifier(labelCol='target', featuresCol="features", numTrees=10)

modelo_rf = rf.fit(treino)

predicoes_rf = modelo_rf.transform(teste)
resultado_rf = predicoes_rf.select("target", "prediction", "probability")

In [44]:
# AUC ROC:

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output from the classifier
targetColumn = 'target'
bcm_rf = BinaryClassificationMetrics(predicoes_rf, scoreCol='probability', labelCol=targetColumn)

# But now we can PLOT both ROC and PR curves!
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm_rf.plot_roc_curve(ax=axs[0])
bcm_rf.plot_pr_curve(ax=axs[1])

In [45]:
# MATRIZ DE CONFUSÃO:

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
import seaborn as sns

y_trueRF = predicoes_rf.select('target')
y_trueRF = y_trueRF.toPandas()

y_predRF = predicoes_rf.select('prediction')
y_predRF = y_predRF.toPandas()
cm = confusion_matrix(y_trueRF, y_predRF)

sns.set(font_scale=1.4)
sns.heatmap(cm, annot=True, annot_kws={"size": 12}, fmt='g')

In [46]:
# MÉTRICAS:

class_model_evaluator(predicoes_rf, labelCol='target')

#### 6.2.4 Modelo 4 - Gradient Boosted Tree

In [48]:
# TREINAMENTO E TESTE DO MODELO:

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
gbt = GBTClassifier(labelCol="target", featuresCol="features", maxIter=10)
modelo_gbt = gbt.fit(treino)
predicoes_gbt = modelo_gbt.transform(teste)
resultado_gbt = predicoes_gbt.select("target", "prediction", "probability")

In [49]:
# AUC ROC:

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output from the classifier
targetColumn = 'target'
bcm_gbt = BinaryClassificationMetrics(predicoes_gbt, scoreCol='probability', labelCol=targetColumn)

# But now we can PLOT both ROC and PR curves!
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm_gbt.plot_roc_curve(ax=axs[0])
bcm_gbt.plot_pr_curve(ax=axs[1])

In [50]:
# MATRIZ DE CONFUSÃO:

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
import seaborn as sns

y_trueGB = predicoes_gbt.select('target')
y_trueGB = y_trueGB.toPandas()

y_predGB = predicoes_gbt.select('prediction')
y_predGB = y_predGB.toPandas()
cm = confusion_matrix(y_trueGB, y_predGB)

sns.set(font_scale=1.4)
sns.heatmap(cm, annot=True, annot_kws={"size": 12}, fmt='g')

In [51]:
# MÉTRICAS:

class_model_evaluator(predicoes_gbt, labelCol='target')

#### 6.2.5 Modelo 5 - Naive Bayes

In [53]:
# TREINAMENTO E TESTE DO MODELO:

# Reference: http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
scalerModel = scaler.fit(dataset)
scalerData = scalerModel.transform(dataset)
treino, teste = scalerData.randomSplit([0.8, 0.2], seed = 42)

nb = NaiveBayes(smoothing=1.0, modelType="multinomial", featuresCol='features_scaled', labelCol='target')
modelo_nb = nb.fit(treino)
predicoes_nb = modelo_nb.transform(teste)
resultado_nb = predicoes_nb.select("target", "prediction", "probability")

In [54]:
# AUC ROC:

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output from the classifier
targetColumn = 'target'
bcm_nb = BinaryClassificationMetrics(predicoes_nb, scoreCol='probability', labelCol=targetColumn)

# But now we can PLOT both ROC and PR curves!
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm_nb.plot_roc_curve(ax=axs[0])
bcm_nb.plot_pr_curve(ax=axs[1])

In [55]:
# MATRIZ DE CONFUSÃO:

In [56]:
# MÉTRICAS:

class_model_evaluator(predicoes_nb, labelCol='target')

### 6.3 Comparação dos Modelos

In [58]:
import matplotlib.pyplot as plt

plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')


plt.plot(modelo_rl.summary.roc.select('FPR').collect(),
         modelo_rl.summary.roc.select('TPR').collect(), label='Regressão Logistica')

# Decision Tree
preds_dt = predicoes_dt.select('target','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['target'])))
fpr_dt, tpr_dt = CurveMetrics(preds_dt).get_curve('roc')
plt.plot(fpr_dt, tpr_dt, label='Árvore Decisão')

# Random Forest
preds_rf = predicoes_rf.select('target','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['target'])))
fpr_rf, tpr_rf = CurveMetrics(preds_rf).get_curve('roc')
plt.plot(fpr_rf, tpr_rf, label='Random Forest')

# Gradient Boosting Tree
preds_gbt = predicoes_gbt.select('target','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['target'])))
fpr_gbt, tpr_gbt = CurveMetrics(preds_gbt).get_curve('roc')
plt.plot(fpr_gbt, tpr_gbt, label='Gradient Boosting')

# Naive Bayes
preds_nb = predicoes_nb.select('target','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['target'])))
fpr_nb, tpr_nb = CurveMetrics(preds_nb).get_curve('roc')
plt.plot(fpr_nb, tpr_nb, label='Naive Bayes')

plt.title('ROC')
plt.legend(loc="best")
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()

In [59]:
print("Area under ROC: ")
print("Logistic Regression   - Area under ROC Curve: {:.4f}".format(bcm_rl.areaUnderROC))
print("Decision Tree         - Area under ROC Curve: {:.4f}".format(bcm_dt.areaUnderROC))
print("Random Forest         - Area under ROC Curve: {:.4f}".format(bcm_rf.areaUnderROC))
print("Gradient Boosting Tree - Area under ROC Curve: {:.4f}".format(bcm_gbt.areaUnderROC))
print("Naive Bayes           - Area under ROC Curve: {:.4f}".format(bcm_nb.areaUnderROC))
print("")
print("Area under PR: ")
print("Logistic Regression   - Area under PR Curve: {:.4f}".format(bcm_rl.areaUnderPR))
print("Decision Tree         - Area under PR Curve: {:.4f}".format(bcm_dt.areaUnderPR))
print("Random Forest         - Area under PR Curve: {:.4f}".format(bcm_rf.areaUnderPR))
print("Gradient Boosting Tree - Area under PR Curve: {:.4f}".format(bcm_gbt.areaUnderPR))
print("Naive Bayes           - Area under PR Curve: {:.4f}".format(bcm_nb.areaUnderPR))

**Resultados:** Portanto, baseando-se na *AUC ROC* vê-se que a **Regressão Logística** e o **Gradiente Boosting Tree** foram os modelos com melhores desempenhos (0.8062 e 0.8048 respectivamente). O modelo com o pior desempenho foi a árvore de decisão (0.7075) e, inclusive, salta aos olhos sua curva no gráfico comparativo recentemente plotado (Cmd 59). Em termos de *Acurácia*, **Regressão Logística** e **Gradiente Boosting Tree** também foram os modelos com o melhor desempenho (0.8038 e 0,8045 respectivamente).

Com isso, a seguir aplicaremos técnicas de seleção de variáveis, otimização dos hiperparâmetros e validação cruzada nos dois modelos selecionados na expectativa de melhorar ainda mais os resultados obtidos.

### 6.4 Feature Selection using Random Forest

In [62]:
# Reference: http://people.stat.sc.edu/haigang/improvement.html
# https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
# Random Forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

dataset = spark.table("dataset_churn_200610")
treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
rf = RandomForestClassifier(labelCol='target', featuresCol="features", numTrees=10)

modelo_rf = rf.fit(treino)

predicoes_rf = modelo_rf.transform(teste)
resultado_rf = predicoes_rf.select("target", "prediction", "probability")

#class_model_evaluator(predicoes_rf,labelCol='target')

ExtractFeatureImp(modelo_rf.featureImportances, predicoes_rf, "features").head(10)
varlist = ExtractFeatureImp(modelo_rf.featureImportances, predicoes_rf, "features")
varidx = [x for x in varlist['idx'][0:75]]

varlist[varlist['score'] > 0].count()

In [63]:
# Plot feature importances (Para slide)
import seaborn as sns

fig=plt.figure(figsize=[10,5])
ax=fig.add_subplot(111)
ax.set_title("Top 10 Features Random Forest")
ax = sns.barplot(x='score', y='name',data=varlist.head(10), color=(0.2, 0.4, 0.6, 0.6))
plt.xticks(rotation=0) 
plt.show()

### 6.5 Hyperparameters and Cross Validation

#### 6.5.1 Logistic Regression

In [66]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorSlicer
#import mlflow

dataset_renamed = dataset.withColumnRenamed('target','label')
treino, teste = dataset_renamed.randomSplit([0.8, 0.2], seed = 42)

vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
modeloRegLog = LogisticRegression(featuresCol='features_subset', labelCol='label')

treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

paramGrid = ParamGridBuilder() \
    .addGrid(modeloRegLog.regParam, [0.1, 0.01])\
    .addGrid(modeloRegLog.maxIter, [30, 60])\
    .build()

evaluator = BinaryClassificationEvaluator()

crossval = CrossValidator(estimator=modeloRegLog,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Deve demorar ~16 min
import time
ti = time.time()
cvModelo = crossval.fit(treino_subset)
tf = time.time()
print("Demorou {} segundos".format(tf - ti))

melhorModelo = cvModelo.bestModel
print("Max Iter - ", melhorModelo._java_obj.parent().getMaxIter())
print("Reg Param - ", melhorModelo._java_obj.parent().getRegParam())

cvPrevisoes_RL = melhorModelo.transform(teste_subset)
print("areaUnderROC:", evaluator.setMetricName("areaUnderROC").evaluate(cvPrevisoes_RL))
print("areaUnderPR:", evaluator.setMetricName("areaUnderPR").evaluate(cvPrevisoes_RL))

In [67]:
# Removendo versao antiga do modelo salvo
modelpath = "/dbfs/FileStore/models/model_problema1_cv_lr"
dbutils.fs.rm(modelpath, True)
modelpath = "/dbfs/FileStore/models/model_problema1_cv_lr"
melhorModelo.write().overwrite().save(modelpath)

In [68]:
from pyspark.ml.classification import LogisticRegressionModel
dataset = spark.table("dataset_churn_200610").repartition(2).cache()
target = 'target'
dataset_renamed = dataset.withColumnRenamed('target','label')
treino, teste = dataset_renamed.randomSplit([0.8, 0.2], seed = 42)
vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

modelpath = "/dbfs/FileStore/models/model_problema1_cv_lr"
saved_melhorModelo_lr = LogisticRegressionModel.load(modelpath)
cv_predicoes_lr = saved_melhorModelo_lr.transform(teste_subset)

In [69]:
# MATRIZ DE CONFUSÃO:
# rodei no notebook cópia dessa entrega em: https://community.cloud.databricks.com/?o=763780990988853#notebook/2051424721805341/command/1208499454968453
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
import seaborn as sns

y_true = cvPrevisoes_RL.select('label')
y_true = y_true.toPandas()

y_pred = cvPrevisoes_RL.select('prediction')
y_pred = y_pred.toPandas()
cm = confusion_matrix(y_true, y_pred)

sns.set(font_scale=1.4)
sns.heatmap(cm, annot=True, annot_kws={"size": 12}, fmt='g')

print('Acurácia:', accuracy_score(y_true, y_pred))
print('Acurácia Balanceada:', balanced_accuracy_score(y_true, y_pred))
print(classification_report(y_true, y_pred))

In [70]:
from pyspark.ml.classification import LogisticRegressionModel
dataset = spark.table("dataset_churn_200610").repartition(2).cache()
target = 'target'
dataset_renamed = dataset.withColumnRenamed('target','label')
treino, teste = dataset_renamed.randomSplit([0.8, 0.2], seed = 42)
vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

modelpath = "/dbfs/FileStore/models/model_problema1_cv_lr"
saved_melhorModelo_lr = LogisticRegressionModel.load(modelpath)
cv_predicoes_lr = saved_melhorModelo_lr.transform(teste_subset)

#### 6.5.2 Gradient Boosting Tree

In [72]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import GBTClassifier
import time

dataset_renamed = dataset.withColumnRenamed('target','label')
treino, teste = dataset_renamed.randomSplit([0.8, 0.2], seed = 42)

vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
gbt = GBTClassifier(labelCol="label", featuresCol="features_subset", maxIter=10)

treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [6])
             .addGrid(gbt.maxBins, [30])
             .addGrid(gbt.maxIter, [15])
             .build())

# paramGrid = (ParamGridBuilder()
#              .addGrid(gbt.maxDepth, [2, 4, 6])
#              .addGrid(gbt.maxBins, [20, 30])
#              .addGrid(gbt.maxIter, [10, 15])
#              .build())

evaluator = BinaryClassificationEvaluator()

cv_gb = CrossValidator(estimator=gbt
                       ,estimatorParamMaps=paramGrid
                       ,evaluator=evaluator
                       ,numFolds=5)

ti = time.time()
cvModelo_gb = cv_gb.fit(treino_subset)
tf = time.time()
print("Demorou {} segundos".format(tf - ti))

melhorModelo_gb = cvModelo_gb.bestModel
print("MaxDepth - ", melhorModelo_gb._java_obj.getMaxDepth())
print("MaxBins - ", melhorModelo_gb._java_obj.getMaxBins())
print("MaxBins - ", melhorModelo_gb._java_obj.getMaxIter())

cvPrevisoes_gb = melhorModelo_gb.transform(teste_subset)
print("areaUnderROC:", evaluator.setMetricName("areaUnderROC").evaluate(cvPrevisoes_gb))
print("areaUnderPR:", evaluator.setMetricName("areaUnderPR").evaluate(cvPrevisoes_gb))

In [73]:
# Removendo versao antiga do modelo salvo
modelpath = "/dbfs/FileStore/models/model_problema1_cv_gbt"
dbutils.fs.rm(modelpath, True)

In [74]:
modelpath = "/dbfs/FileStore/models/model_problema1_cv_gbt"
melhorModelo_gb.write().overwrite().save(modelpath)

In [75]:
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
dataset = spark.table("dataset_churn_200610").repartition(2).cache()
target = 'target'
dataset_renamed = dataset.withColumnRenamed('target','label')
treino, teste = dataset_renamed.randomSplit([0.8, 0.2], seed = 42)
vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

modelpath = "/dbfs/FileStore/models/model_problema1_cv_gbt"
saved_melhorModelo_gbt = GBTClassificationModel.load(modelpath)
cv_predicoes_gbt = saved_melhorModelo_gbt.transform(teste_subset)

In [76]:
# https://stackoverflow.com/questions/42549200/how-to-get-all-parameters-of-estimator-in-pyspark
{param[0].name: param[1] for param in saved_melhorModelo_gbt.extractParamMap().items()}

In [77]:
# Plot feature importances (Para slide)
import seaborn as sns
varlist = ExtractFeatureImp(saved_melhorModelo_gbt.featureImportances, cv_predicoes_gbt, "features_subset")

fig=plt.figure(figsize=[10,5])
ax=fig.add_subplot(111)
ax.set_title("Top 10 Features Random Forest")
ax = sns.barplot(x='score', y='name',data=varlist.head(15), color=(0.2, 0.4, 0.6, 0.6))
plt.xticks(rotation=0) 
plt.show()

In [78]:
# AUC ROC:

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output from the classifier
targetColumn = 'label'
bcm_gbt_cv = BinaryClassificationMetrics(cvPrevisoes_gb, scoreCol='probability', labelCol=targetColumn)

# But now we can PLOT both ROC and PR curves!
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm_gbt_cv.plot_roc_curve(ax=axs[0])
bcm_gbt_cv.plot_pr_curve(ax=axs[1])

In [79]:
# MATRIZ DE CONFUSÃO:

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, balanced_accuracy_score, roc_curve
import seaborn as sns

y_true = cvPrevisoes_gb.select('label')
y_true = y_true.toPandas()

y_pred = cvPrevisoes_gb.select('prediction')
y_pred = y_pred.toPandas()
cm = confusion_matrix(y_true, y_pred)

sns.set(font_scale=1.4)
sns.heatmap(cm, annot=True, annot_kws={"size": 12}, fmt='g')

print('Acurácia:', accuracy_score(y_true, y_pred))
print('Acurácia Balanceada:', balanced_accuracy_score(y_true, y_pred))
print(classification_report(y_true, y_pred))

#### 6.5.3 Comparação Resultados

In [81]:
import matplotlib.pyplot as plt

plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')

plt.plot(melhorModelo.summary.roc.select('FPR').collect(),
         melhorModelo.summary.roc.select('TPR').collect(), label='Regressão Logistica')

# Gradient Boosting Tree
labelColumn = 'label' # pois renomeei a 'target' para 'label' para o cv funcionar
cv_preds_gbt = cvPrevisoes_gb.select(labelColumn,'probability').rdd.map(lambda row: (float(row['probability'][1]), float(row[labelColumn])))
fpr_gbt_cv, tpr_gbt_cv = CurveMetrics(cv_preds_gbt).get_curve('roc')
plt.plot(fpr_gbt_cv, tpr_gbt_cv, label='Gradient Boosting')

plt.title('ROC')
plt.legend(loc="best")
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()

In [82]:
print("Area under ROC: ")
print("Logistic Regression CV   - Area under ROC Curve: {:.4f}".format(bcm_rl_cv.areaUnderROC))
print("Gradient Boosted Tree CV - Area under ROC Curve: {:.4f}".format(bcm_gbt_cv.areaUnderROC))
print("")
print("Area under PR: ")
print("Logistic Regression CV   - Area under PR Curve: {:.4f}".format(bcm_rl_cv.areaUnderPR))
print("Gradient Boosted Tree CV - Area under PR Curve: {:.4f}".format(bcm_gbt_cv.areaUnderPR))

### 6.6 Conclusão e Resposta do Problema

**Resultados:** Por fim, os dois modelos tiveram desempenhos bastante parecidos, com ligeira vantagem para o **Gradiente Boosting Tree**:
- GBT: AUC ROC = 0.8074.
- Regressão Logística: AUC ROC = 0.8025.

A otimização também gerou em uma melhoria mínima de 0.0026. Isso se deu principalmente por conta de limitações dos recursos computacionais disponíveis. Entretanto, dado que o objetivo principal dessa abordagem foi identificar as variáveis mais importantes para o modelo para responder a pergunta do problema, o obtido foi suficiente.
  
**Conclusão e Resposta para o problema inicial:**

Recaptulando, para responder a pergunta do que fez os clientes darem churn, decidiu-se criar um modelo para prever o churn e então usar essencialmente as *features importances* do melhor modelo para como os fatores que mais influenciaram no churn. Com o modelo criado e otimizado, o próximo *Cmd* trás então um plot das 15 features mais importantes para o modelo GBT. No nosso caso, esses então são os principais fatores que influenciaram para os clientes darem churn.

Indo mais a fundo nos resultado, pode-se ver que o modelo identificou como principais variáveis *orders_last_91d, recency_days* e *qtt_orders_last_year*, e isso faz total sentido dado que elas dão contexto de sazonalidade. Por exemplo, como *recency_days* é a diferença em dias entre hoje e a última compra do usuário, faz sentido pensar que se alguém tem poucas compras nos últimos 91 dias o indivíduo demonstra estar menos engajado com a plataforma do que alguém com muitos pedidos nesse mesmo período. Nesse caso, além de intuitivamente, vê-se que esse tipo de informação é um indicativo forte para alguém que vai dar churn.

Em seguida, vemos que as variáveis quantitativas relacionadas ao comportamento de compra no mês (*sum_order_total, sum_paid_amount*) também são relevantes para prever se um cliente vira churn no mês seguinte.

É interessante notar também que variáveis qualitativas são bem relevantes para o modelo, tais como a *marlin_tagclassVec_4. Retention Carp*.

In [85]:
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
dataset = spark.table("dataset_churn_200610").repartition(2).cache()
target = 'target'
dataset_renamed = dataset.withColumnRenamed('target','label')
treino, teste = dataset_renamed.randomSplit([0.8, 0.2], seed = 42)
vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

modelpath = "/dbfs/FileStore/models/model_problema1_cv_gbt"
saved_melhorModelo_gbt = GBTClassificationModel.load(modelpath)
cv_predicoes_gbt = saved_melhorModelo_gbt.transform(teste_subset)
# Plot feature importances (Para slide)
import seaborn as sns
varlist = ExtractFeatureImp(saved_melhorModelo_gbt.featureImportances, cv_predicoes_gbt, "features_subset")

fig=plt.figure(figsize=[10,5])
ax=fig.add_subplot(111)
ax.set_title("Top Features Gradient Boosted Tree")
ax = sns.barplot(x='score', y='name',data=varlist.head(15), color=(0.2, 0.4, 0.6, 0.6))
plt.xticks(rotation=0) 
plt.show()

## 7. [PROBLEMA 2] Qual a importância dos eventos ao longo da vida do cliente?

**Idéia:** Dado que o conceito de importância não foi previamente estabelecido, decidimos então inicialmente explorar a relação entre os eventos ao longo do tempo e, partindo daí, tentar responder algumas questões levantadas na etapada de exploração (nessa a seguir e algumas até mesmo na Entrega 1). Tais exploração e questões serão abordados ao longo do código.
  
**Definições e Assumptions:**
- *Importância*: partindo do pressuposto de que o Ifood deseja que seus clientes permaneçam ativos e comprando e gastando cada vez mais, define-se como a importância de um evento o quanto ele demonstra ter colaborado para que o Ifood atinja seus objetivos citados com seus clientes (individual ou coletivamente) ao longo de suas vidas.

**Observações:**
- É sabido que só se tem informações de visitas/acessos que geraram pedidos. Isso faz com que a relação entre visitas e pedidos seja proporcional e, dado o conceito tomado de importância, impossibilita argumentações muito mais específicas.

### 7.1 Attribution Window

Como primeiro ponto de partida, queremos saber o quanto um push influencia em uma compra e o quanto isso varia sob um olhar de diferentes segmentos da base do Ifood. </br></br>
Assim, abaixo montamos uma tabela que associa para todo push recebido a um pedido/sessão em uma janela de até 30min (1800s). Por exemplo, se um push foi enviado as 12h toda sessão que se iniciou até 30min após o horário de envio do push (até 12h30) vai ser atribuido ao push. </br> </br>
Idealmente seria necessário ter a base de sessões que não geraram pedidos para termos uma visão mais completa sobre o quanto o excesso de pushes causa de percepção negativa no usuário.

In [90]:
# Read from Parquet
root_dir = '/dbfs/FileStore/treated'
dbutils.fs.ls(f'{root_dir}')
df_customer_segmentation = spark.read.parquet(f'{root_dir}/df_customer_segmentation.parquet')
df_orders_total_tratado = spark.read.parquet(f'{root_dir}/df_orders_total.parquet')
df_sessions_visits_tratado = spark.read.parquet(f'{root_dir}/df_sessions_visits.parquet')
df_mpf_tratado = spark.read.parquet(f'{root_dir}/df_mpf.parquet')

pushes_received = df_mpf_tratado[df_mpf_tratado['event_name'] == 'received'].select('external_user_id', 'event_time_utc3')
df_orders_and_sessions = join_removing_repeated(df_orders_total_tratado, df_sessions_visits_tratado, 
                                                df_orders_total_tratado.session_id == df_sessions_visits_tratado.session_id, 'left')
app_sessions = df_orders_and_sessions.withColumn('order_timestamp_local', from_utc_timestamp('order_timestamp_local', 'UTC'))\
                                     .withColumn('order_timestamp_local', to_date('order_timestamp_local', 'YYYY-MM-DD'))\
                                     .withColumn('order_timestamp_local_month', month('order_timestamp_local'))\
                                     .select('customer_id'
                                             ,'order_timestamp_local_month'
                                             ,'session_started_at_amsp'
                                             ,'session_ended_at_amsp'
                                             ,'session_started_at_utc0'
                                             ,'session_ended_at_utc0'
                                             ,'customer_seg_marlin_tag'
                                             ,'customer_seg_gross_income_bucket'
                                             ,'customer_seg_benefits_sensitivity_bucket'
                                            )

# Associa toda sessao que foi iniciada até 30min depois de um envio de push
groupedByCommunications = pushes_received.join(app_sessions, [(pushes_received.external_user_id == app_sessions.customer_id),\
                                                              (app_sessions.session_started_at_amsp > pushes_received.event_time_utc3),\
                                                              (f.col("session_started_at_amsp").cast('long') - \
                                                               f.col("event_time_utc3").cast('long')
                                                              < 1800)]
                                               , how = 'left')
# Pushes diarios e pushes diários que geraram pedidos
aggregated = groupedByCommunications.withColumn('push_date', to_date('event_time_utc3', 'YYYY-MM-DD'))\
 .withColumn('push_month', month('push_date'))\
 .groupBy("external_user_id", "push_date", "push_month")\
 .agg(count('event_time_utc3').alias('daily_pushes'),\
      count('session_started_at_amsp').alias('pushes_generated_order')
     )

# get last customer_seg status in a month
window = Window.partitionBy('customer_id','order_timestamp_local_month').\
                             orderBy(col('session_started_at_amsp').desc())
customer_segment_month = app_sessions.withColumn("rn",row_number().over(window).alias('rn'))\
                                     .where(col("rn") == 1)\
                                     .drop("rn")\
                                     .select('customer_id'
                                             ,'order_timestamp_local_month'
                                             ,'customer_seg_marlin_tag'
                                             ,'customer_seg_gross_income_bucket'
                                             ,'customer_seg_benefits_sensitivity_bucket'
                                            )

# Associa os customer_segments (marlin tag, income_bucket e benefits sensitivity do usuario) do mes do push
cond = [aggregated.external_user_id == customer_segment_month.customer_id, aggregated.push_month == customer_segment_month.order_timestamp_local_month]
aggregatedWithSegmentation = aggregated.join(customer_segment_month, cond\
                                             , how = 'left')

Aqui temos uma base que mostra para cada mês e segmentos escolhidos (marlin_tag, benefits_sensitivity) qual foi a efetividade dos pushes numa janela de 30min. </br>
**`Efectiveness_rate`** = `(# Pushes que geraram pedido até 30 min após envio do push)`/`(# Pushes enviados)`

In [92]:
comparisonSegmentationMonthly = aggregatedWithSegmentation\
 .groupBy("push_month","customer_seg_marlin_tag","customer_seg_benefits_sensitivity_bucket")\
 .agg(sum('daily_pushes').alias('pushes'),
      sum('pushes_generated_order').alias('push_attributed_orders')
     )\
 .withColumn("efectiveness_rate", (col('push_attributed_orders')/col('pushes')) *100)\
 .withColumnRenamed('customer_seg_marlin_tag', 'marlin_tag')\
 .withColumnRenamed('customer_seg_benefits_sensitivity_bucket', 'benefits_sensitivity')
display(comparisonSegmentationMonthly)

push_month,marlin_tag,benefits_sensitivity,pushes,push_attributed_orders,efectiveness_rate
9,2. Tilapia,Baixa,31721,82,0.258503830270168
12,1. Marlin,Alta,149974,2937,1.958339445503888
12,unknown,Alta,402687,4618,1.1467963952151423
9,4. Retention Carp,Media,22407,78,0.348105502744678
6,4. Retention Carp,Alta,251285,1175,0.4675965537139104
7,1. Marlin,Media,150849,1198,0.7941716550988074
10,3. Subsidy Carp,Media,1085,5,0.4608294930875576
8,4. Retention Carp,Alta,88990,385,0.4326328800988875
12,,,495045,0,0.0
9,1. Marlin,Media,172691,1157,0.6699828016515047


In [93]:
df = comparisonSegmentationMonthly.toPandas()

In [94]:
# Removing pushes of users that were inactive in the respective month
sensitivity_bucket = ['Baixa', 'Alta', 'Media']
marlin_tag = ['2. Tilapia','1. Marlin', '4. Retention Carp','3. Subsidy Carp']
df_filtered = df[(df['benefits_sensitivity'].isin(sensitivity_bucket)) &\
                  (df['marlin_tag'].isin(marlin_tag))]

O heatmap abaixo mostra o percentual de sessões/pedidos que foram realizado após 30 min de um envio de push. </br>Podemos ver que alguns grupos tem maior sensibilidade ao envio dos pushes. Destaque para `3. Subsidy Carp` com sensibilidade a benefícios `alta`, *as expected*. </br>
Interessante observar o efeito de sazonalidade nos meses de `Novembro` e `Dezembro` em que tiveram altas no índice de efetividade do push chegando a `6.4%` da base `1 Marlin` com sensibilidade a benefícios `Média`.

In [96]:
pt = pd.pivot_table(df_filtered, values='efectiveness_rate', 
                    index=['marlin_tag','benefits_sensitivity'],
                    columns=['push_month'])
fig, ax = plt.subplots(figsize=(10,10))
ax = plt.axes()
ax = sns.heatmap(pt, cmap="YlGnBu", annot=True, linewidths=.5, fmt='.2g', ax=ax)
ax.set_title('Percentual de Pushes que geraram pedidos numa janela de 30min',fontsize=16, pad=15)
plt.show()

Vale entender o que aconteceu nos meses de Novembro e Dezembro e o quanto isso afetou as receitas/lucros do Ifood.

### 7.2 EDA

#### 7.2.1 Análise de evolução

O objetivo dessa etapa é identificar o comportamento das variáveis ao longo do tempo e, através de como elas se relacionam, extrair insights e hipóteses que serão úteis para responder a pergunta do problema.

In [100]:
df_desc = spark.table("df_final_200609")

df_desc_filtro = df_desc.select(col('customer_id')
                                ,col('segmentation_month_month').alias('segmentation_month')
                                ,col('ifood_status')
                                ,col('ifood_status_last_month')
                                ,col('marlin_tag')
                                ,col('last_nps').alias('nps')
                                ,col('number_of_orders').alias('orders') # Compras
                                ,col('sum_order_total').alias('payments') # Pagamento
                                ,col('sum_promo_is_promotion').alias('promotional_orders')
                                ,col('sum_normal_items_quantity')
                                ,col('sum_promo_items_quantity')
                                ,col('sum_general_net_profit').alias('profit')
                                ,col('sum_credit').alias('payment_discount')
                                ,col('pushes_count_distinct_campaign_name').alias('campaingns') # Campanhas
                                ,col('pushes_count_distinct_event_time_utc3').alias('pushes') # Pushs
                                ,col('sum_sum_event_open').alias('visits') # Acessos
                                                  )

In [101]:
charts_list = [['orders', 'pushes']
              ,['orders', 'payments']
              ,['orders', 'campaingns']
              ,['profit', 'payment_discount']
              ,['orders', 'visits']
              ,['visits', 'campaingns']
              ,['visits', 'pushes']]

timeline_plot_sum(charts_list)

**Highlights**:
1. O total de pedidos por mês foi caindo com o passar dos meses;
2. O total de pushes e campanhas mensal está aumentando com o passar dos meses;
3. O total de payments, o montante gasto, foi caindo com o passar dos meses.
4. Verifica-se, como antecipado, que o conceito de visitas está estritamente relacionado com o conceito de orders, uma vez que só se tem os dados das visitas que se tornaram pedidos.

**Insights**:
1. Dado o highlight 1, o número de pessoas fazendo pedidos pode estar proporcionalmente diminuindo com o passar dos meses;

In [103]:
charts_list = [['orders', 'pushes']
              ,['orders', 'payments']
              ,['orders', 'campaingns']
              ,['profit', 'payment_discount']
              ,['orders', 'payment_discount']
              ,['orders', 'visits']
              ,['visits', 'campaingns']
              ,['visits', 'pushes']]

timeline_plot_avg(charts_list)

**Highlights**:
1. A quantidade média de pedidos por usuário está aumentando;
2. A quantidade média de pushes e campanhas por usuário está aumentando com o passar dos meses;
3. O receita mensal média por usuário (payments) está aumentando.

**Insights**:
1. O montante total de pedidos está diminuindo, mas a quantidade média de pedidos está aumentando. Portanto, o cenário que se desenha é o de menos pessoas comprando, porém, quem está comprando, está comprando mais em quantidade. Mas o que está gerando isso? 

**Hipóteses**: 
1. usuário naturalmente está se engajando mais com o Ifood (fora de escopo, validar com pesquisa)
2. migração de serviço (ex. troca do Uber Eats para Ifood)  (fora de escopo, validar com pesquisa)
3. Usuário está recebendo mais promoções e incentivos. Como isso afeta a margem de lucro? E receita?

In [105]:
charts_list = [['orders', 'orders']
              ,['payments', 'payments']
              ,['campaingns', 'campaingns']
              ,['profit', 'profit']
              ,['payment_discount', 'payment_discount']
              ,['visits', 'visits']
              ,['pushes', 'pushes']]

timeline_plot_mixed(charts_list)

In [106]:
df_desc_filtro_aux = df_desc_filtro.filter((df_desc_filtro.orders > 0) & (df_desc_filtro.payments > 0))
df_ticket = df_desc_filtro_aux.groupby('segmentation_month').agg(sum('orders').alias('orders'),sum('payments').alias('payments'),count('customer_id').alias('customers'))
df_ticket = df_ticket.withColumn('ticket_medio',col('payments')/col('orders'))
display(df_ticket.orderBy('segmentation_month'))

segmentation_month,orders,payments,customers,ticket_medio
6,88052,4882892.110000031,29726,55.45464168900231
7,69838,3829942.490000008,18411,54.84038045190309
8,67281,3753502.740000007,17301,55.78845052838108
9,65700,3654919.8700000057,16839,55.63043942161348
10,66549,3689285.3200000087,16606,55.437126327969
11,69398,3894036.630000004,16761,56.111654946828494
12,64588,3730970.560000005,16250,57.765692698334135


#### 7.2.2 Validação de idéias e insights

In [108]:
df_desc_filtro = df_desc_filtro.fillna(0, subset=['payments'])
df_desc_filtro = df_desc_filtro.fillna(0, subset=['promotional_orders'])
df_desc_filtro = df_desc_filtro.fillna(0, subset=['orders'])


df_desc_filtro = df_desc_filtro.withColumn('payments_last_month',
                  f.lag(df_desc_filtro['payments'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy("segmentation_month")))

df_desc_filtro = df_desc_filtro.withColumn('marlin_tag_last_month',
                  f.lag(df_desc_filtro['marlin_tag'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy("segmentation_month")))

df_desc_filtro = df_desc_filtro.withColumn('ifood_status_group',\
                                           when(df_desc_filtro.ifood_status == 'Inactive','Inactive').when(df_desc_filtro.ifood_status == 'Churn','Inactive').otherwise('Active'))

df_desc_filtro = df_desc_filtro.withColumn('marlin_tag_group',\
                                           when(df_desc_filtro.marlin_tag == '1. Marlin','Marlin').otherwise('Other'))

df_desc_filtro = df_desc_filtro.withColumn('ifood_status_group_next_month',
                  f.lag(df_desc_filtro['ifood_status_group'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy(desc("segmentation_month"))))

df_desc_filtro = df_desc_filtro.withColumn('nps_next_month',
                  f.lag(df_desc_filtro['nps'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy(desc("segmentation_month"))))

df_desc_filtro = df_desc_filtro.withColumn('normal_orders',\
                                           col('orders') - col('promotional_orders'))

##### Ifood Status

Foi levantada a hipótese de que a quantidade de pessoas comprando está diminuindo e o primeiro gráfico abaixo comprova isso, dado que **vê-se a proporção de pessoas Inativas aumentar com o passar dos meses**.

Já o segundo e terceiro gráficos abaixo mostram que a quantidade de pushes e campanhas recebidas pelos ativos é maior para ativos do que para inativos. Isso parece fazer sentido quando o objetivo é fazer com que as pessoas se mantenham ativas. Por outro lado, mesmo não entrando no mérito da quantidade ideal de pushes/campanhas, faz sentido o Ifood querer que os inativos voltem a ser ativos. De qualquer maneira, nesse caso, isso parece estar relacionado com decisões de negócio do Ifood e não diretamente com a vida do cliente.

Por fim, mas definitivamente não menos importante, omparando quem é ativos no mês atual e que se tornarão inativos no mês seguinte com quem ativo no atual e se mantém ativo no mês seguinte, vemos que o primeiro grupo (curva em laranja) recebe, em média, menos pushes que o segundo grupo em todos os meses. Isso é, os **clientes ativos que permanecem ativos rebecem mais pushes que os que decidem dar Churn**.

In [110]:
display(df_desc_filtro.groupby('ifood_status_group','segmentation_month').count().orderBy('segmentation_month'))

ifood_status_group,segmentation_month,count
Active,6,29282
Inactive,6,444
Active,7,18253
Inactive,7,11710
Active,8,17018
Inactive,8,12945
Inactive,9,13245
Active,9,16718
Inactive,10,13357
Active,10,16606


In [111]:
display(df_desc_filtro.groupby('ifood_status_group','segmentation_month').agg(avg('pushes')).orderBy('segmentation_month'))

ifood_status_group,segmentation_month,avg(pushes)
Inactive,6,34.86479591836735
Active,6,46.434839141095175
Active,7,73.35063703703703
Inactive,7,54.12698587819947
Active,8,56.26104981185024
Inactive,8,27.12707182320442
Inactive,9,29.773767505408173
Active,9,58.62583102046085
Active,10,78.48642136112015
Inactive,10,43.31853020739405


In [112]:
display(df_desc_filtro.groupby('ifood_status_group','segmentation_month').agg(avg('campaingns')).orderBy('segmentation_month'))

ifood_status_group,segmentation_month,avg(campaingns)
Inactive,6,31.742346938775512
Active,6,36.33096812541793
Inactive,7,48.50794351279788
Active,7,55.05232592592593
Active,8,37.55284138019006
Inactive,8,23.135244014732965
Inactive,9,25.77536149379483
Active,9,43.07603433808817
Inactive,10,39.61451758340848
Active,10,64.57225659394334


In [113]:
data_1 = df_desc_filtro.filter((df_desc_filtro.ifood_status_group != 'Inactive') & (df_desc_filtro.ifood_status_group_next_month == 'Inactive')).groupby('segmentation_month').agg(avg('pushes').alias('pushes_active_inactive')).orderBy('segmentation_month')
data_2 = df_desc_filtro.filter((df_desc_filtro.ifood_status_group != 'Inactive') & (df_desc_filtro.ifood_status_group_next_month != 'Inactive')).groupby('segmentation_month').agg(avg('pushes').alias('pushes_active_active')).orderBy('segmentation_month')

df_aux_status = join_removing_repeated(data_1,data_2,data_1.segmentation_month == data_2.segmentation_month,'left')

display(df_aux_status)

pushes_active_inactive,segmentation_month,pushes_active_active
36.53080111828786,6,52.64424297370807
63.16398865784499,7,76.7604207862058
45.6720741599073,8,59.25059295002863
46.32355665328805,9,61.87759099069692
62.18986938515451,10,82.67395219384414
58.71873985060085,11,70.55897815015607


**Promotions**

Foi levantada a hipótese de que o aumento na quantidade média e total de campanhas/pushes poderia estar contribuindo para a diminuição total e média de payments. Vê-se que **a proporção de pedidos promocionais (promotional_orders) aumentou 7p.p (25%) com o passar do tempo**.

In [115]:
timeline_plot_avg([['promotional_orders','normal_orders']])

In [116]:
display(df_desc_filtro.groupby('segmentation_month').agg(sum('promotional_orders').alias('promotional_orders'),sum('normal_orders').alias('normal_orders')).orderBy('segmentation_month'))

segmentation_month,promotional_orders,normal_orders
6,24231.0,63821.0
7,20510.0,49328.0
8,21882.0,45399.0
9,22539.0,43161.0
10,23022.0,43527.0
11,24613.0,44785.0
12,21778.0,42810.0


**Marlin Tag**

Dado o conceito de que os Marlins são os melhores clientes, decidiu-se testar se eles fazem mais pedidos e gastam mais que os demais para justificar a tag recebida. E sim, os dois primeiros gráficos abaixo evidenciam justamente isso. Já o terceito gráfico mostra que, **além de gastar e pedir mais, os Marlin também recebem mais pushes/campanhas**.

Não há evidências para afirmar se são esses pushes que os incentivam a comprar/gastar mais, ou se eles gastarem/comprarem mais é que influencia o Ifood a enviar mais pushes e campanhas para eles.

In [118]:
display(df_desc_filtro.groupby('marlin_tag_group','segmentation_month').agg(avg('orders')).orderBy('segmentation_month'))

marlin_tag_group,segmentation_month,avg(orders)
Marlin,6,6.294819949932601
Other,6,2.2566746830799334
Other,7,1.4858355119358848
Marlin,7,5.304524886877828
Marlin,8,5.050147856086742
Other,8,1.203551975099556
Marlin,9,4.583664090244412
Other,9,1.0699887194075235
Other,10,1.0411755930180515
Marlin,10,4.628780190785468


In [119]:
display(df_desc_filtro.groupby('marlin_tag_group','segmentation_month').agg(avg('payments')).orderBy('segmentation_month'))

marlin_tag_group,segmentation_month,avg(payments)
Other,6,118.13597399421198
Marlin,6,382.1802927017137
Other,7,74.33763596622781
Marlin,7,316.0516485671189
Other,8,59.74296837094322
Marlin,8,301.6631481025136
Marlin,9,271.5816868602465
Other,9,51.73362107018482
Marlin,10,272.91229247006305
Other,10,49.72935451787747


In [120]:
display(df_desc_filtro.groupby('marlin_tag_group','segmentation_month').agg(avg('pushes')).orderBy('segmentation_month'))

marlin_tag_group,segmentation_month,avg(pushes)
Other,6,43.72164402892926
Marlin,6,58.73284419507984
Marlin,7,75.73600271462504
Other,7,63.95664754302819
Marlin,8,58.40099972229936
Other,8,40.61712787649287
Marlin,9,60.25962104272096
Other,9,41.68622853159263
Other,10,56.93451304572462
Marlin,10,80.96843188836783


**NPS**

Comparando quem é classificado como promotor no mês atual e detrator no mês seguinte com quem é promotor no atual e se mantém promotor no mês seguinte, vemos que o primeiro grupo (curva em azul) recebe, em média, mais pushes que o segundo grupo em praticamente todos os meses. Isso é, **receber pushes não parece ter feito com que os clientes ficassem mais felizes, pelo contrário**.

In [122]:
data_1 = df_desc_filtro.filter((df_desc_filtro.nps != 'Sem Avaliacoes') & (df_desc_filtro.nps != 'Detractor')& (df_desc_filtro.nps_next_month == 'Detractor')).groupby('segmentation_month').agg(avg('pushes').alias('pushes_promoter_detractor')).orderBy('segmentation_month')
data_2 = df_desc_filtro.filter((df_desc_filtro.nps != 'Sem Avaliacoes') & (df_desc_filtro.nps != 'Detractor') & (df_desc_filtro.nps_next_month != 'Detractor') & (df_desc_filtro.nps_next_month != 'Sem Avaliacoes')).groupby('segmentation_month').agg(avg('pushes').alias('pushes_promoter_promoter')).orderBy('segmentation_month')

df_aux_nps = join_removing_repeated(data_1,data_2,data_1.segmentation_month == data_2.segmentation_month,'left')

display(df_aux_nps)

pushes_promoter_detractor,segmentation_month,pushes_promoter_promoter
53.13461538461539,6,49.19781410893325
73.34482758620689,7,68.25280952838344
52.57303370786517,8,49.63928029481899
58.025,9,51.573493126542125
69.75342465753425,10,70.35210571816722
68.39344262295081,11,63.49813941949888


### 7.3 Respostas e conclusões

**Introdução**: Como destacado inicialmente, o conceito de importância não está definido na pergunta, então buscamos formas de identificar como os eventos se relacionam e quais as principais argumentações que poderiam ser propostas para o conceito de importância que tomamos. Vale evidenciar também que não foram realizados testes estatísticos/matemáticos para testar/validar as hipóteses levantadas dado que isso, de fato, não é o que nos propusemos a trazer com essa análise, além das limitações relacionadas aos dados disponíveis.

**Principais Insights**:
- O total de pedidos por mês foi caindo com o passar dos meses porém a quantidade média de medidos por usuário aumentou. Ou seja, com o passar do tempo os que compram acabam fazendo mais pedidos.
- Podemos observar um comportamento semelhante em receita: apesar do total de payments e o montante gasto cair com o passar dos meses, a receita mensal média por usuário aumentou. Porém, o ticket médio se manteve constante. Ou seja, o aumento de receita ocorre pelo aumento de pedidos médios do usuário no mês.
- Na perspectiva de pushes, o total de pushes e campanhas mensal (total e médio) está aumentando com o passar dos meses. Além disso, o usuário também está recebendo mais promoções e incentivos.
- Olhando pela taxa de efetividade do push (percentual de pushes que geraram um pedido em uma janela de até 30min), vemos que em junho a taxa é maior e com o passar dos meses ela cai. Há uma recuperação de outubro para frente.
- A partir de outubro vemos um aumento de mais +33% no total e na média de pushes mensais. Além disso, em novembro e dezembro há um aumento no total e na média de desconto no mês (de ~R$420k para ~R$440k, ou ~R$25 para quase R$27 de desconto/mês).
- Em termos de lucro, isso se refletiu positivamente em novembro, com uma média de lucro por cliente de ~+R$32. Já em dezembro, temos a máxima histórica de pushes (+1.8M, média de 75 pushes/mês). A curva de lucro acompanhou novembro pois o mês teve maior quantidade de inativos (46%), uma média de pedidos/usuário menor (de 4.2 para 4) e um desconto médio maior (~R$27).



**Apêndice**:
- É possível também ver para todos os meses que usuários que se tornaram inativos no mês seguinte, receberam em média menos pushes no mês atual.
- Isso implica que devemos enviar mais pushes? Não necessariamente, pois usuários que se tornaram detratores receberam em média mais pushes no mês atual. Há um trade-off em relação a percepção de marca e serviço (clientes se tornam detratores) e o churn.

## 8. [PROBLEMA 3] Prever a valor total de pedidos por cliente no mês seguinte.

### 8.1 Pipeline

In [127]:
df_final = spark.table("df_final_200609")

# Cria a Target com 1 para cliente que foi Churn. 0 caso contrário
df_final = df_final.withColumn('target_current',when(df_final.ifood_status == 'Churn',1).otherwise(0))

df_final = df_final.withColumn('target',
                  f.lag(df_final['target_current'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy(desc("segmentation_month_month"))))

# Para o lagging do problema 3, o churn/inativo do mês seguinte significa que o cliente nao fez nenhum pedido. Por exemplo, quem ficou inativo/churn tem sum_order_total null. Nesse caso, queremos prever que esse usuário vai ter um sum_order_total de zero.
df_final = df_final.fillna(0, subset=['sum_order_total'])
df_final = df_final.withColumn('target_3',
                  f.lag(df_final['sum_order_total'])
                   .over(Window.partitionBy("customer_id")
                   .orderBy(desc("segmentation_month_month"))))

# df_final = df_final.fillna(0, subset=['number_of_orders'])
# df_final = df_final.withColumn('target_3',
#                   f.lag(df_final['number_of_orders'])
#                    .over(Window.partitionBy("customer_id")
#                    .orderBy(desc("segmentation_month_month"))))

df_final = df_final.filter(~df_final.ifood_status.isin(['Churn', 'Inactive'])) # como o objetivo é prever churn, mantemos somente quem pode dar churn

# PREPARATION:
# Listas de variaveis por tipo: categorico, numerico e data
categorical_columns = df_final.select(*[x[0] for x in df_final.dtypes if x[1] not in ('double', 'long', 'int', 'bigint', 'date')]).columns
numerical_columns = df_final.select(*[x[0] for x in df_final.dtypes if x[1] in ('double', 'long', 'int', 'bigint')]).columns
date_columns = df_final.select(*[x[0] for x in df_final.dtypes if x[1] in ('date')]).columns

# Exclusão de colunas com dados referentes ao mês.
excluded_columns = [
  'last_invalid_order_date'
  ,'preferred_shift_bucket_description' # redundante pois preferred_shift_bucket foi tratado
  ,'external_user_id' # chave da tabela de pushes
  ,'event_month' # chave da tabela de pushes
  ,'days_to_reorder_at_concluded' # variavel da tabela de segmentation que optamos por nao usar
  ,'days_to_reorder_at_datasource' # variavel da tabela de segmentation que optamos por nao usar
  ,'registration_month'
  ,'registration_dayofweek'
  ,'first_order_month'
  ,'first_order_dayofweek'
  ,'segmentation_month_dayofweek'
  ,'segmentation_month' # redundante
  ,'last_order_month'
  ,'last_order_dayofweek'
  ,'order_timestamp_local_month' # chave do join da base de order
  ,'count_distinct_event_dayofweek' # nao tem uma interpretacao. Um 7 diz somente se um usuario recebeu num mes pushes todos os dias da semana
  ,'event_month' # chave do join da base de pushes
  ,'pushes_count_distinct_event_time_utc3' # timestamp da base de pushes
  ,'sum_distance_merchant_customer' # from Orders+Visits: Nulo não faz sentido
  ,'avg_distance_merchant_customer'
  ,'most_common_platform'
  ,'most_common_merchant_dish_type'
  ,'most_common_customer_state_label'
]

included_categorical = [
 'marlin_tag',
 'last_nps',
 'benefits_sensitivity_bucket',
 'merchant_variety_bucket',
 'most_common_order_shift',
 'most_common_delivery_type',
 'most_common_device_platform',
 'most_common_payment_method',
 'most_common_customer_seg_recency_bucket',
 'most_common_customer_seg_merchant_offer_bucket',
 'most_common_customer_seg_benefits_sensitivity_bucket',
 'most_common_customer_seg_frequency_bucket',
 'most_common_customer_seg_gross_income_bucket'
]

excluded_columns = list(set(excluded_columns + date_columns + categorical_columns))
included_columns = list(set(df_final.columns) - set(excluded_columns)) + ['customer_id'] + included_categorical

df_filtrado = df_final[included_columns]

# Inputar zero para colunas numericas de pushes (clientes ativos que nao receberam push no mes) [~10050]
pushes_fillna_list = [
  'pushes_changed_platform'
  ,'pushes_count_distinct_event_date'
  ,'pushes_count_distinct_campaign_name'
]
for coluna in pushes_fillna_list:
  df_filtrado = df_filtrado.fillna(0, subset=[coluna])

# Drop em variaveis relacionadas a pedidos/sessões de clientes que nao fizeram compras (~3903)
df_filtrado = df_filtrado.dropna()

In [128]:
# df_filtrado_200610: target_3 is sum_order_total next month
# df_filtrado_200610b: target_3 is number_of_orders next month
# df_filtrado_200614: version with state and favorite dishes.
df_filtrado.write.saveAsTable('df_filtrado_200614')

In [129]:
included_categorical = [
 'marlin_tag',
 'last_nps',
 'benefits_sensitivity_bucket',
 'merchant_variety_bucket',
 'most_common_order_shift',
 'most_common_delivery_type',
 'most_common_device_platform',
 'most_common_payment_method',
 'most_common_customer_seg_recency_bucket',
 'most_common_customer_seg_merchant_offer_bucket',
 'most_common_customer_seg_benefits_sensitivity_bucket',
 'most_common_customer_seg_frequency_bucket',
 'most_common_customer_seg_gross_income_bucket'
]

In [130]:
df_filtrado = spark.table("df_filtrado_200610")

In [131]:
stop_aux = 4
count_aux = 1
#excluido_count = 999999

#while (count_aux <= stop_aux) | (excluido_count == 0):
while count_aux <= stop_aux:
  
  count_aux = count_aux + 1
  
  resumo = df_filtrado.select('target_3').summary().collect()
  q1 = float(resumo[4][1])
  q3 = float(resumo[6][1])

  lim_inf = q1 - 1.5 * (q3 - q1)
  lim_sup = q3 + 1.5 * (q3 - q1)

  df_filtrado = df_filtrado.filter(
    (df_filtrado['target_3'] < lim_sup) &
    (df_filtrado['target_3'] > lim_inf)
  )

  resumo = df_filtrado.select('sum_order_total').summary().collect()
  q1 = float(resumo[4][1])
  q3 = float(resumo[6][1])

  lim_inf = q1 - 1.5 * (q3 - q1)
  lim_sup = q3 + 1.5 * (q3 - q1)

  df_filtrado = df_filtrado.filter(
    (df_filtrado['sum_order_total'] < lim_sup) &
    (df_filtrado['sum_order_total'] > lim_inf)
  )

In [132]:
df_filtrado.count()

In [133]:
# PIPELINE BUILDING:
# Based on https://gist.github.com/colbyford/83978917799dbcab6293521a60f29e94

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler, MinMaxScaler, Normalizer

#df_filtrado = spark.table("df_filtrado_200610")

categoricalColumns = included_categorical

numericalColumns = list(set(df_filtrado.columns) - set(['customer_id','segmentation_month_month','ifood_status','ifood_status_last_month','target','target_3']) - set(categoricalColumns))

categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

stages = []

for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  # Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  # Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  # Add stages.  These are not run here, but will run all at once later on.
  stages += [stringIndexer, encoder]

# Convert label into label indices using the StringIndexer
#label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
#stages += [label_stringIndexer]

# Transform all features into a vector using VectorAssembler
assemblerInputs = categoricalColumnsclassVec + numericalColumns 
# assembler only considers 'classVec' columns (it already did not consider stringIndexer)
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="features")
# assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="featuresAssembled")
stages += [assembler]

prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(df_filtrado)
dataset = pipelineModel.transform(df_filtrado)

In [134]:
# dataset_churn_200610: target_3 is sum_order_total next month
# dataset_churn_200610b: target_3 is number_of_orders next month
# dataset_churn_200614: version with states and favorite dishes
# dataset_churn_200617_sem_outliers: version without states and favorite dishes, removing outliers
dataset.write.saveAsTable('dataset_churn_200617_sem_outliers')

### 8.2 Modelagem

In [136]:
target = 'target_3'
dataset = spark.table("dataset_churn_200617_sem_outliers").repartition(2).cache()

#### 8.2.1 Regressão Logística

In [138]:
# TREINAMENTO E TESTE DO MODELO:
from pyspark.ml.regression import LinearRegression

treino, teste = dataset.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(labelCol=target)

modelo_lr = lr.fit(treino)

print('==============================')
print('Métricas no conjunto de TREINO')
print('==============================')
#print(f'Coeficientes: {modelo_lr.coefficients}')
#print(f'pValues: {modelo.summary.pValues}')
print(f'Intercepto: {modelo_lr.intercept}')
print(f'MAE: {modelo_lr.summary.meanAbsoluteError}')
print(f'RMSE: {modelo_lr.summary.rootMeanSquaredError}')
print(f'r2 Ajustado: {modelo_lr.summary.r2adj}')

predictions_lr = modelo_lr.transform(treino)

print('=============================')
print('Métricas no conjunto de TESTE')
print('=============================')
resultado_teste = modelo_lr.evaluate(teste)

print(f'MAE: {resultado_teste.meanAbsoluteError}')
print(f'RMSE: {resultado_teste.rootMeanSquaredError}')
print(f'r2 Ajustado: {resultado_teste.r2adj}')

predictions_lr = modelo_lr.transform(teste)
#mape = compute_mape(predicoes, 'number_of_orders')
#print(f'MAPE: {mape}')

In [139]:
mape = compute_mape(predictions_lr, y_true=target, y_pred='prediction')
print(f'MAPE: {mape}')

Com outliers </br>
MAE: 107.17590288465296</br>
RMSE: 168.8219963148754</br>
r2 Ajustado: 0.6077634317813403</br>

In [141]:
reg_model_evaluator(predictions_lr, labelCol=target)

#### 8.2.2 Decision Tree Regressor

In [143]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.util import MLUtils

treino, teste = dataset.randomSplit([0.8, 0.2], seed=42)

dt = DecisionTreeRegressor(labelCol=target)

modelo_dt = dt.fit(treino)

print('==============================')
print('Métricas no conjunto de TREINO')
print('==============================')

predictions_dt = modelo_dt.transform(treino)

reg_model_evaluator(predictions_dt, labelCol=target)

print(modelo_dt)

print('=============================')
print('Métricas no conjunto de TESTE')
print('=============================')

predictions_dt = modelo_dt.transform(teste)

# Exemplos de predições feitas:
predictions_dt.select("prediction", target, "features").show(5)

reg_model_evaluator(predictions_dt, labelCol=target)

Sem outliers:</br>
MAE: 112.54892549073273</br>
RMSE: 181.87758983896944</br>
r2 Ajustado: 0.5468231792861152

In [145]:
ExtractFeatureImp(modelo_dt.featureImportances, predictions_dt, "features").head(10)

Unnamed: 0,idx,name,score
7,53,orders_last_91d,0.541936
47,93,sum_paid_amount,0.176984
10,56,qtt_orders_last_year,0.161193
48,94,sum_order_total,0.052837
12,58,qtt_valid_orders,0.047376
26,72,avg_aov_last_91d,0.009982
3,49,recency_days,0.009691
71,16,most_common_order_shiftclassVec_weekend dawn,0.0
68,13,most_common_order_shiftclassVec_weekday lunch,0.0
69,14,most_common_order_shiftclassVec_weekend lunch,0.0


#### 8.2.3 Random Forest Regressor

In [147]:
from pyspark.ml.regression import RandomForestRegressor

treino, teste = dataset.randomSplit([0.8, 0.2], seed=42)

rf = RandomForestRegressor(labelCol=target)

modelo_rf = rf.fit(treino)

print('==============================')

print('Métricas no conjunto de TREINO')
print('==============================')

predictions_rf = modelo_rf.transform(treino)

reg_model_evaluator(predictions_rf, labelCol=target)

print(modelo_rf)

print('=============================')
print('Métricas no conjunto de TESTE')
print('=============================')

predictions_rf = modelo_rf.transform(teste)

reg_model_evaluator(predictions_rf, labelCol=target)

# Exemplos de predições feitas:
predictions_rf.select("prediction", target, "features").show(5)
print(modelo_rf)

In [148]:
ExtractFeatureImp(modelo_rf.featureImportances, predictions_rf, "features").head(10)

Unnamed: 0,idx,name,score
10,56,qtt_orders_last_year,0.229773
7,53,orders_last_91d,0.174459
48,94,sum_order_total,0.139125
47,93,sum_paid_amount,0.127746
38,84,freq_last_91d,0.077298
12,58,qtt_valid_orders,0.055115
18,64,maturity_orders,0.050119
35,81,sum_valid_order,0.031323
33,79,number_of_orders,0.030156
27,73,sum_general_net_profit,0.019965


#### 8.2.4 Gradient-boosted Tree Regression

In [150]:
from pyspark.ml.regression import GBTRegressor

treino, teste = dataset.randomSplit([0.8, 0.2], seed=42)

gbt = GBTRegressor(labelCol=target, maxIter=10)

modelo_gbt = gbt.fit(treino)

print('==============================')
print('Métricas no conjunto de TREINO')
print('==============================')

predictions_gbt = modelo_gbt.transform(treino)

reg_model_evaluator(predictions_gbt, labelCol=target)

print(modelo_gbt)

print('=============================')
print('Métricas no conjunto de TESTE')
print('=============================')

predictions_gbt = modelo_gbt.transform(teste)

# Exemplos de predições feitas:
predictions_gbt.select("prediction", target, "features").show(5)

reg_model_evaluator(predictions_gbt, labelCol=target)

print(modelo_gbt)

In [151]:
ExtractFeatureImp(modelo_gbt.featureImportances, predictions_gbt, "features").head(10)

Unnamed: 0,idx,name,score
3,49,recency_days,0.127832
26,72,avg_aov_last_91d,0.112334
7,53,orders_last_91d,0.091387
47,93,sum_paid_amount,0.041
48,94,sum_order_total,0.040154
10,56,qtt_orders_last_year,0.040108
31,77,avg_sum_view_checkout,0.036223
38,84,freq_last_91d,0.034188
27,73,sum_general_net_profit,0.03017
25,71,avg_paid_amount,0.030046


#### 8.2.5 Generalize Linear Regression

In [153]:
from pyspark.ml.regression import GeneralizedLinearRegression

treino, teste = dataset.randomSplit([0.8, 0.2], seed=42)

glr = GeneralizedLinearRegression(labelCol=target, family="gaussian", link="identity", maxIter=10, regParam=0.3)

modelo_glr = glr.fit(treino)

print('==============================')
print('Métricas no conjunto de TREINO')
print('==============================')

predictions_glr = modelo_glr.transform(treino)

reg_model_evaluator(predictions_glr, labelCol=target)

print(modelo_glr)

print('=============================')
print('Métricas no conjunto de TESTE')
print('=============================')

predictions_glr = modelo_glr.transform(teste)

# Exemplos de predições feitas:
predictions_glr.select("prediction", target, "features").show(5)

reg_model_evaluator(predictions_glr, labelCol=target)

print(modelo_glr)

#### 8.2.6 Comparação dos Modelos

In [155]:
print('Métricas Linear Regression')
print('=============================')
reg_model_evaluator(predictions_lr, labelCol=target)

print('Métricas Decision Tree')
print('=============================')
reg_model_evaluator(predictions_dt, labelCol=target)

print('Métricas Random Forest')
print('=============================')
reg_model_evaluator(predictions_rf, labelCol=target)

print('Métricas Gradient Boosted Tree')
print('=============================')
reg_model_evaluator(predictions_gbt, labelCol=target)

print('Métricas Generalized Linear Regression')
print('=============================')
reg_model_evaluator(predictions_glr, labelCol=target)

### 8.3 Feature Selection using Random Forest

In [157]:
target = 'target_3'
dataset = spark.table("dataset_churn_200617_sem_outliers").repartition(2).cache()
dataset.count()

In [158]:
# Reference: http://people.stat.sc.edu/haigang/improvement.html
# https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
# Random Forest
from pyspark.ml.regression import RandomForestRegressor

treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
modelo_rf = RandomForestRegressor(labelCol=target, featuresCol="features")

# ParamGrid para Cross Validation
# DecisionTree currently only supports maxDepth &lt;= 30, but was given maxDepth = 40
paramGrid = ParamGridBuilder()\
               .addGrid(modelo_rf.maxDepth, [15])\
               .addGrid(modelo_rf.maxBins, [32])\
               .addGrid(modelo_rf.numTrees, [20])\
               .build()

# Evaluate Model
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=target, metricName='rmse')

# Cria um 5-fold CrossValidator
crossval = CrossValidator(estimator=modelo_rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Deve demorar ~4 min
import time
ti = time.time()
# Roda cross validations
cvModelo = crossval.fit(treino)
tf = time.time()
print("Demorou {} segundos".format(tf - ti))

melhorModelo_rf = cvModelo.bestModel
cv_predicoes_rf = melhorModelo_rf.transform(teste)

In [159]:
# Removendo versao antiga do modelo salvo
modelpath = "/dbfs/FileStore/models/model_problema3_feature_selection_melhorModelo_rf"
dbutils.fs.rm(modelpath, True)

In [160]:
modelpath = "/dbfs/FileStore/models/model_problema3_feature_selection_melhorModelo_rf"
melhorModelo_rf.write().overwrite().save(modelpath)

In [161]:
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
target = 'target_3'
dataset = spark.table("dataset_churn_200617_sem_outliers").repartition(2).cache()
treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)
modelpath = "/dbfs/FileStore/models/model_problema3_feature_selection_melhorModelo_rf"
saved_melhorModelo_rf = RandomForestRegressionModel.load(modelpath)
cv_predicoes_rf = saved_melhorModelo_rf.transform(teste)

In [162]:
# https://stackoverflow.com/questions/42549200/how-to-get-all-parameters-of-estimator-in-pyspark
{param[0].name: param[1] for param in saved_melhorModelo_rf.extractParamMap().items()}

In [163]:
varlist = ExtractFeatureImp(saved_melhorModelo_rf.featureImportances, cv_predicoes_rf, "features")
varidx = [x for x in varlist['idx'][0:100]]

varlist[varlist['score'] > 0].count()

In [164]:
# Plot feature importances (Para slide)
import seaborn as sns

fig=plt.figure(figsize=[10,5])
ax=fig.add_subplot(111)
ax.set_title("Top 10 Features Random Forest")
ax = sns.barplot(x='score', y='name',data=varlist.head(10), color=(0.2, 0.4, 0.6, 0.6))
plt.xticks(rotation=0) 
plt.show()

### 8.4 Hyperparameters and Cross Validation

#### 8.4.1 Generalized Linear Regression

In [167]:
target = 'target_3'
dataset = spark.table("dataset_churn_200617_sem_outliers")#.repartition(2).cache()

In [168]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorSlicer

vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

modeloGLR = GeneralizedLinearRegression(featuresCol='features_subset', labelCol=target)

paramGrid = ParamGridBuilder() \
    .addGrid(modeloGLR.family, ['gaussian'])\
    .addGrid(modeloGLR.link, ['identity'])\
    .addGrid(modeloGLR.maxIter, [25,40])\
    .addGrid(modeloGLR.regParam, [0.1])\
    .build()
#, 'gamma', 'poisson'])\
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=target, metricName='mae')

crossval = CrossValidator(estimator=modeloGLR,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Deve demorar ~4 min
import time
ti = time.time()
cvModelo = crossval.fit(treino_subset)
tf = time.time()
print("Demorou {} segundos".format(tf - ti))

melhorModelo = cvModelo.bestModel

cvPrevisoes_GLR = melhorModelo.transform(teste_subset)
reg_model_evaluator(cvPrevisoes_GLR, labelCol=target)

In [169]:
{param[0].name: param[1] for param in melhorModelo.extractParamMap().items()}

In [170]:
# Removendo versao antiga do modelo salvo
modelpath = "/dbfs/FileStore/models/model_problema3_cvModelo_GLR_v1"
dbutils.fs.rm(modelpath, True)

In [171]:
# model_problema3_cvModelo_GLR_v1: com RF como método de FeatureSelction; sem estado/favorite_dishes.
modelpath = "/dbfs/FileStore/models/model_problema3_cvModelo_GLR_v2"
cvModelo.write().overwrite().save(modelpath)

In [172]:
target = 'target_3'
dataset = spark.table("dataset_churn_200617_sem_outliers")
treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)

vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

modelpath = "/dbfs/FileStore/models/model_problema3_cvModelo_GLR_v2"
saved_cvModelo = CrossValidatorModel.load(modelpath)
melhorModelo = saved_cvModelo.bestModel
cvPrevisoes_GLR = melhorModelo.transform(teste_subset)
reg_model_evaluator(cvPrevisoes_GLR, labelCol=target)

Removendo Outliers: </br>
MAE: 68.6333663102972</br>
RMSE: 88.25310127166915</br>
r2 Ajustado: 0.2618314685137697

Com outliers: </br>
MAE: 107.74259499274247 </br>
RMSE: 168.929983814752 </br>
r2 Ajustado: 0.5860352598210408 </br>

#### 8.4.2 Gradient Boosted Tree

In [176]:
target = 'target_3'
dataset = spark.table("dataset_churn_200617_sem_outliers").repartition(2).cache()

In [177]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorSlicer

treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)

vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
modeloGBT = GBTRegressor(featuresCol='features_subset', labelCol=target)

treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

paramGrid = ParamGridBuilder() \
    .addGrid(modeloGBT.stepSize, [0.1])\
    .addGrid(modeloGBT.maxDepth, [5])\
    .addGrid(modeloGBT.maxIter, [20])\
    .build()
#     .addGrid(modeloGBT.maxBins, [30])\
#     .addGrid(modeloGBT.maxDepth, [10,20])\
#     .addGrid(modeloGBT.maxIter, [30,60])\

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=target, metricName='rmse')

crossval = CrossValidator(estimator=modeloGBT,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)

# Deve demorar ~16 min
import time
ti = time.time()
cvModeloGBT = crossval.fit(treino_subset)
tf = time.time()
print("Demorou {} segundos".format(tf - ti))

melhorModelo = cvModeloGBT.bestModel
cvPrevisoes_GBT = melhorModelo.transform(teste_subset)
reg_model_evaluator(cvPrevisoes_GBT, labelCol=target)

Max depth = 5, maxIter = 20, cv = 2 </br>
MAE: 68.89573169900193</br>
RMSE: 88.71178330885456</br>
r2 Ajustado: 0.2541384920188542

Com maxIter = 40</br>
MAE: 73.40236201755101 </br>
RMSE: 96.2552937598589</br>
r2 Ajustado: 0.1279477498234911

Com maxIter = 30</br>
Demorou 3139.39408659935 segundos</br>
MAE: 72.85990046883227</br>
RMSE: 95.94414358577018</br>
r2 Ajustado: 0.1335765446651931

In [181]:
# Removendo versao antiga do modelo salvo
modelpath = "/dbfs/FileStore/models/model_problema3_cvModelo_GBT_v2"
dbutils.fs.rm(modelpath, True)

In [182]:
# model_problema3_cvModelo_GBT_v1: com RF como método de FeatureSelction; sem estado/favorite_dishes.
modelpath = "/dbfs/FileStore/models/model_problema3_cvModelo_GBT_v3"
cvModeloGBT.write().overwrite().save(modelpath)

In [183]:
dataset = spark.table("dataset_churn_200617_sem_outliers").repartition(2).cache()
treino, teste = dataset.randomSplit([0.8, 0.2], seed = 42)

modelpath = "/dbfs/FileStore/models/model_problema3_cvModelo_GBT_v3"
vector_slicer = VectorSlicer(inputCol= "features", indices= varidx, outputCol= "features_subset")
treino_subset = vector_slicer.transform(treino)
teste_subset = vector_slicer.transform(teste)

saved_cvModeloGBT = CrossValidatorModel.load(modelpath)
melhorModeloGBT = saved_cvModeloGBT.bestModel
cvPrevisoes_GBT = melhorModeloGBT.transform(teste_subset)
reg_model_evaluator(cvPrevisoes_GBT, labelCol=target)

In [184]:
# https://stackoverflow.com/questions/42549200/how-to-get-all-parameters-of-estimator-in-pyspark
{param[0].name: param[1] for param in melhorModeloGBT.extractParamMap().items()}

In [185]:
varlist = ExtractFeatureImp(melhorModeloGBT.featureImportances, cvPrevisoes_GBT, "features_subset")
varidx = [x for x in varlist['idx'][0:100]]

varlist[varlist['score'] > 0].count()
# Plot feature importances (Para slide)
import seaborn as sns

fig=plt.figure(figsize=[10,5])
ax=fig.add_subplot(111)
ax.set_title("Top 15 Features Gradient Boosted Tree")
ax = sns.barplot(x='score', y='name',data=varlist.head(15), color=(0.2, 0.4, 0.6, 0.6))
plt.xticks(rotation=0) 
plt.show()

### 8.5 Conclusão e Resposta do Problema

Abaixo temos para a variável target, `sum_order_total`, as seguintes estatísticas:
- número de pedidos mínimo: R$14
- número de pedidos máximo: R$7.6k
- mediana: R$128.7
- média: R$207.79

In [188]:
import numpy as np
def median(values_list):
    med = np.median(values_list)
    return float(med)
udf_median = f.udf(median)

df_filtrado = spark.table("df_filtrado_200610")

display(df_filtrado.agg(min('sum_order_total'),udf_median(f.collect_list(col('sum_order_total'))),max('sum_order_total'),avg('sum_order_total')))

min(sum_order_total),"median(collect_list(sum_order_total, 0, 0))",max(sum_order_total),avg(sum_order_total)
14.0,128.7,7597.38,207.7945444706828


**Resultados:** O melhor modelo para a predição do valor mensal gasto no próximo mês (resultados avaliação do modelo na base de teste) foi:
- GBT: MAE = 68.6; RSME = 88.3

Para a base, a média da target é de R$207.79  e a mediana é R$128.7. O `MAE` obtido, R$69.7, é "apenas" aproximadamente 25% maior que o ticket médio mensal (~R$55). Isto é, para uma base onde as pessoas gastam, em média, 4 tickets médios por mês, o modelo construído erra 1.25 pedidos.

O `RMSE` é uma métrica útil pois penaliza erros muito grandes. Sem o tratamento de outliers, tínhamos um MAE de R$107 e RMSE de R$170 (RMSE 58% maior que o MAE). Ao remover os outliers e fazer o CV e Hyperparâmetros, houve uma diminuição dos erros para MAE de R$69.7 e RMSE de R$89 (RMSE 28% maior que o MAE). Ou seja, tivemos um ganho de 30p.p. apenas na comparação, além de uma diminuição de ~35% do MAE.

Como o `RSME` (~R$89) ficou mais próximo do `MAE`, apesar de estarmos errando, a magnitude da distribuição dos erros não indica que temos erros muito elevados.

Sob uma perspectiva de negócios, ao prever qual vai ser o possível valor gasto no mês seguinte, como destacado na motivação da escolha por essa abordagem, podemos calibrar com maior assertividade o volume de envios de pushes de forma a evitar que um eventual excesso de envios de comunicações resulte em churn. Além disso, podemos também estimar melhor a demanda por região de forma a alocar melhor recursos e dar insights para o time comercial sobre regiões com potencial aumento de demanda.