# 1.Introdução

# Etapas do Processo

    1. Introdução
        Explicação sobre os dados e o processo a ser realizado.
        
    2. Importação das Bibliotecas
        Importação das bibliotecas que serão utilizadas no processo.
        
    3. Criação do Ambiente Spark
        A criação de um ambiente Spark é o processo de configurar um ambiente de computação que utiliza o Apache Spark, um framework de processamento de dados em larga escala. Este ambiente pode incluir a instalação do Spark, configuração de variáveis de ambiente, e a integração com ferramentas como Jupyter Notebook, que permitem a interação com os dados de forma eficiente e intuitiva.
        Os motivos para a decisão de trabalhar com spark são:
        
            * Processamento Distribuído: O Apache Spark é projetado para processar grandes volumes de dados de forma distribuída, o que significa que ele pode realizar operações em paralelo em clusters de computação, resultando em maior eficiência e velocidade.
            
           *  Flexibilidade: O Spark suporta diversas linguagens de programação, como Python, Java, Scala e R. Isso permite que os desenvolvedores escolham a linguagem com a qual estão mais confortáveis, facilitando a integração e a implementação de soluções de dados.
            
            * Diversas Funcionalidades: O ambiente Spark oferece uma gama de bibliotecas e módulos, como Spark SQL, MLlib (para aprendizado de máquina), Spark Streaming (para processamento de dados em tempo real) e GraphX (para processamento de gráficos), permitindo uma variedade de aplicações em diferentes áreas.
            
            * Integração com Big Data: O Spark é frequentemente utilizado em conjunto com sistemas de armazenamento de dados grandes, como Hadoop, Apache Cassandra e Amazon S3, permitindo o acesso e o processamento de dados em larga escala.
            
            * Desempenho: O Spark é otimizado para desempenho, com características como execução em memória, o que reduz o tempo necessário para ler e escrever dados em disco. Isso o torna mais rápido em comparação com outras soluções de processamento de dados.
        
            * Análise de Dados Interativa: A criação de um ambiente Spark, especialmente em conjunto com ferramentas como Jupyter Notebook, facilita a análise interativa de dados. Os usuários podem executar comandos e visualizar resultados em tempo real, promovendo uma melhor exploração dos dados.        
            
    4. Importação dos Dados
        Nesta etapa serão importados os dados a serem trabalhados neste notebook.
        
    5. Análise Exploratória dos Dados
        A análise exploratória dos dados é a etapa em que as tabelas importadas anteriormente serão analisadas quanto ao tipo dos metadados, à quantidade de dados, aos dados faltantes, às suas distribuições, entre outras informações fundamentais para iniciar qualquer tipo de trabalho.
        
    6. Tratamento dos Dados
        Nesta etapa iremos realizar a feature selection que é o processo de identificar e selecionar as características (features) mais relevantes de um conjunto de dados para a construção de um modelo de machine learning. Essa etapa é crucial no pré-processamento dos dados, pois impacta diretamente na qualidade e desempenho do modelo. Os principais motivos de realizar a feature selection são: Redução da Dimensionalidade, Melhoria da Performance do Modelo, Interpretação e Explicabilidade, Aumento da Generalização e Facilitação da Visualização.

    7. Criação das Variáveis
        Nesta etapa iremos criar as variáveis explicativas e variáveis históricas.
        A cada variável criada serão realizados as seguintes etapas:

            * Criação da variável
            * Armazenamento do df em .parquet
            * Criação da função desta variável no arquivo feature_store_churn
            * Uso da função neste notebook
        
        7.1. Variáveis explicativas:

    * Tempo desde a Última Transação: A diferença entre a data mais recente no conjunto de dados e a última data de compra de cada cliente.
    * Frequência de Compra: Quantidade de compras que o cliente fez dividida pelo número de meses desde sua inscrição.
    * Total Gasto: Soma de todos os valores gastos pelo cliente em todas as suas transações.
    * Categoria Favorita: Categoria de produto em que o cliente gastou a maior quantia.
    * Gasto Médio por Transação: Total gasto dividido pelo número total de transações.
    * Duração da Assinatura: Número de dias desde que o cliente se inscreveu até a data mais recente no conjunto de dados.
    * Número de Categorias Compradas: Quantidade de categorias diferentes das quais o cliente comprou.
    * Usou Suporte antes da Primeira Compra: Indicador (1 ou 0) se o cliente usou o suporte antes de fazer sua primeira compra.
    * Dias entre Inscrição e Primeira Compra: Diferença em dias entre a data de inscrição do cliente e sua primeira transação.
    * Frequência de Transações por Plano: Número de transações do cliente dividido pelos meses de inscrição, segmentado por plano (Básico, Intermediário, Avançado)

        7.2. Variáveis históricas - 3 meses:

    * Média do Valor Gasto em Esportes nos Últimos 3 Meses: Esta variável calcula a média dos gastos do cliente na categoria "Esportes" nos últimos três meses.
    * Média do Valor Gasto em Eletrônicos nos Últimos 3 Meses: Semelhante à anterior, mas focada na categoria "Eletrônicos".
    * Média do Valor Gasto em Roupas nos Últimos 3 Meses: Foca na categoria "Roupas", representando a média dos gastos do cliente nos últimos três meses.
    * Média do Valor Gasto em Alimentos nos Últimos 3 Meses: Representa a média dos gastos do cliente na categoria "Alimentos" nos últimos três meses.
    * Média do Valor Gasto em Livros nos Últimos 3 Meses: Foca na categoria "Livros", calculando a média dos gastos do cliente nos últimos três meses.
    
        7.3. Repita o passo anterior considerando - últimos 6 meses
        7.4. Repita o passo anterior considerando - últimos 9 meses
        7.5. Repita o passo anterior considerando - últimos 12 meses
    
    8. Criação de Novo Arquivo
        Ao final do processo, iremos gerar novos arquivos no formato .csv e .parquet.

# 2. Importação das Bibliotecas

In [1]:
# !pip install pyspark
# !pip install findspark
# !pip install fastparquet

# !pip install nbimporter
import nbimporter

In [2]:
import findspark

from pyspark.sql import SparkSession, functions as F
from pyspark.sql import Window

# Classe com a feature store
from feature_store_churn import FeatureStoreChurn

findspark.init()

# 3. Configurações do Notebook

## 3.1. Configuração do Ambiente Spark

In [3]:
spark = SparkSession.builder \
    .appName("Feature Selection") \
    .master("local[*]") \
    .getOrCreate()

spark

# 3.2. Funções

In [4]:
def rename_columns(columns_dic, df):
    columns = columns_dic
    
    # Renomear as colunas no DataFrame
    for old_column, new_column in columns.items():
        df = df.withColumnRenamed(old_column, new_column)
    return df

In [5]:
def join_dfs(df_01, df_02, join_condition, join_type):
    df_new = df_01.join(df_02
                                   , on = join_condition
                                   , how = join_type
                                  )
    return df_new

In [6]:
def df_to_parquet(df, file_path):
    df_p = df.toPandas()
    df_p.to_parquet(file_path, index=False)
    print("DF Salvo!")

# 4. Importação dos Dados

In [7]:
base_file_path = "../data/base_churn.csv"
transaction_file_path = "../data/base_transacoes.csv"

df_base = spark.read.csv(base_file_path, header=True, inferSchema=True)
df_transaction = spark.read.csv(transaction_file_path, header=True, inferSchema=True)

df_base.show(5)
df_transaction.show(5)

+---+-----+------+----------------------+------------+-------------+-----+
| ID|Idade|Gênero|Dias desde a Inscrição|Usou Suporte|        Plano|Churn|
+---+-----+------+----------------------+------------+-------------+-----+
|  1|   21|     F|                  1331|           1|Intermediário|    1|
|  2|   21|     M|                  1160|           0|Intermediário|    0|
|  3|   62|     M|                   454|           1|       Básico|    0|
|  4|   64|     M|                   226|           1|Intermediário|    0|
|  5|   61|     M|                   474|           1|     Avançado|    0|
+---+-----+------+----------------------+------------+-------------+-----+
only showing top 5 rows

+------------+----------+--------------------+------------------+-----------+
|ID Transação|ID Cliente|                Data|             Valor|  Categoria|
+------------+----------+--------------------+------------------+-----------+
|           1|         1|2022-11-25 13:50:...|57.287427536330505| 

# 5. Análise Exploratória dos Dados

In [8]:
""" 
##### Lista de coisas a serem feitas #####

Ajustar os nomes de cada coluna
, checar se os valores conferem com o tipo de cada coluna ok
, padronizar os nomes das colunas conforme seus tipos de metadados
, 
, 
"""

' \n##### Lista de coisas a serem feitas #####\n\nAjustar os nomes de cada coluna\n, checar se os valores conferem com o tipo de cada coluna ok\n, padronizar os nomes das colunas conforme seus tipos de metadados\n, \n, \n'

In [9]:
df_base.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Idade: integer (nullable = true)
 |-- Gênero: string (nullable = true)
 |-- Dias desde a Inscrição: integer (nullable = true)
 |-- Usou Suporte: integer (nullable = true)
 |-- Plano: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [10]:
df_transaction.printSchema()

root
 |-- ID Transação: integer (nullable = true)
 |-- ID Cliente: integer (nullable = true)
 |-- Data: timestamp (nullable = true)
 |-- Valor: double (nullable = true)
 |-- Categoria: string (nullable = true)



In [11]:
print("df_base:", df_base.count())
print("df_transaction:", df_transaction.count())

df_base: 1000
df_transaction: 10171


In [12]:
df_base.describe().show()

+-------+-----------------+-----------------+------+----------------------+-------------------+-------------+------------------+
|summary|               ID|            Idade|Gênero|Dias desde a Inscrição|       Usou Suporte|        Plano|             Churn|
+-------+-----------------+-----------------+------+----------------------+-------------------+-------------+------------------+
|  count|             1000|             1000|  1000|                  1000|               1000|         1000|              1000|
|   mean|            500.5|           43.038|  NULL|               920.844|              0.536|         NULL|             0.515|
| stddev|288.8194360957494|15.05227979832714|  NULL|     514.1036064668114|0.49895185434363604|         NULL|0.5000250243988043|
|    min|                1|               18|     F|                     1|                  0|     Avançado|                 0|
|    max|             1000|               69|     M|                  1822|                  1|In

In [13]:
df_transaction.describe().show()

+-------+-----------------+------------------+------------------+---------+
|summary|     ID Transação|        ID Cliente|             Valor|Categoria|
+-------+-----------------+------------------+------------------+---------+
|  count|            10171|             10171|             10171|    10171|
|   mean|           5086.0|505.00629239996067|102.66469491915382|     NULL|
| stddev|2936.259127983541|291.90660532737974| 56.36791146684839|     NULL|
|    min|                1|                 1| 5.018187264142361|Alimentos|
|    max|            10171|              1000|199.98043843894678|   Roupas|
+-------+-----------------+------------------+------------------+---------+



In [14]:
# Para o df_base
df_base.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_base.columns]).show()

# Para o df_transaction
df_transaction.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_transaction.columns]).show()


+---+-----+------+----------------------+------------+-----+-----+
| ID|Idade|Gênero|Dias desde a Inscrição|Usou Suporte|Plano|Churn|
+---+-----+------+----------------------+------------+-----+-----+
|  0|    0|     0|                     0|           0|    0|    0|
+---+-----+------+----------------------+------------+-----+-----+

+------------+----------+----+-----+---------+
|ID Transação|ID Cliente|Data|Valor|Categoria|
+------------+----------+----+-----+---------+
|           0|         0|   0|    0|        0|
+------------+----------+----+-----+---------+



# 6. Tratamento dos Dados

In [15]:
columns_dic = {"ID": "NB_ID_CLIENTE"
               , "Idade": "NB_IDADE"
               , "Gênero": "TX_GENERO"
               , "Dias desde a Inscrição": "NB_DIAS_INSCRITO"
               , "Usou Suporte": "NB_SUPORTE"
               , "Plano": "TX_PLANO"
               , "Churn": "FL_CHURN"}
df_base_01 = rename_columns(columns_dic, df_base)

In [16]:
columns_dic = {"ID Transação": "NB_ID_TRANSACAO"
               , "ID Cliente": "NB_ID_CLIENTE"
               , "Data": "DT_DATA"
               , "Valor": "NB_VALOR"
               , "Categoria": "TX_CATEGORIA"}
df_transaction_01 = rename_columns(columns_dic, df_transaction)

In [17]:
df_base_01.show(5)

+-------------+--------+---------+----------------+----------+-------------+--------+
|NB_ID_CLIENTE|NB_IDADE|TX_GENERO|NB_DIAS_INSCRITO|NB_SUPORTE|     TX_PLANO|FL_CHURN|
+-------------+--------+---------+----------------+----------+-------------+--------+
|            1|      21|        F|            1331|         1|Intermediário|       1|
|            2|      21|        M|            1160|         0|Intermediário|       0|
|            3|      62|        M|             454|         1|       Básico|       0|
|            4|      64|        M|             226|         1|Intermediário|       0|
|            5|      61|        M|             474|         1|     Avançado|       0|
+-------------+--------+---------+----------------+----------+-------------+--------+
only showing top 5 rows



In [18]:
df_transaction_01.show(5)

+---------------+-------------+--------------------+------------------+------------+
|NB_ID_TRANSACAO|NB_ID_CLIENTE|             DT_DATA|          NB_VALOR|TX_CATEGORIA|
+---------------+-------------+--------------------+------------------+------------+
|              1|            1|2022-11-25 13:50:...|57.287427536330505|    Esportes|
|              2|            1|2020-01-19 12:27:...| 97.07199340552512|   Alimentos|
|              3|            1|2021-12-28 12:33:...|169.10581012381087|      Livros|
|              4|            1|2022-02-05 01:39:...|199.38694865538451|      Roupas|
|              5|            1|2022-11-16 23:06:...|160.00228343317622| Eletrônicos|
+---------------+-------------+--------------------+------------------+------------+
only showing top 5 rows



# 7. Feature Enginnering

## 7.1. Variáveis Explicativas

Criar as seguintes variáveis explicativas:

    * Tempo desde a Última Transação: A diferença entre a data mais recente no conjunto de dados e a última data de compra de cada cliente.
    * Frequência de Compra: Quantidade de compras que o cliente fez dividida pelo número de meses desde sua inscrição.
    * Total Gasto: Soma de todos os valores gastos pelo cliente em todas as suas transações.
    * Categoria Favorita: Categoria de produto em que o cliente gastou a maior quantia.
    * Gasto Médio por Transação: Total gasto dividido pelo número total de transações.
    * Duração da Assinatura: Número de dias desde que o cliente se inscreveu até a data mais recente no conjunto de dados.
    * Número de Categorias Compradas: Quantidade de categorias diferentes das quais o cliente comprou.
    * Usou Suporte antes da Primeira Compra: Indicador (1 ou 0) se o cliente usou o suporte antes de fazer sua primeira compra.
    * Dias entre Inscrição e Primeira Compra: Diferença em dias entre a data de inscrição do cliente e sua primeira transação.
    * Frequência de Transações por Plano: Número de transações do cliente dividido pelos meses de inscrição, segmentado por plano (Básico, Intermediário, Avançado)

Variáveis históricas - 3 meses:

    * Média do Valor Gasto em Esportes nos Últimos 3 Meses: Esta variável calcula a média dos gastos do cliente na categoria "Esportes" nos últimos três meses.
    * Média do Valor Gasto em Eletrônicos nos Últimos 3 Meses: Semelhante à anterior, mas focada na categoria "Eletrônicos".
    * Média do Valor Gasto em Roupas nos Últimos 3 Meses: Foca na categoria "Roupas", representando a média dos gastos do cliente nos últimos três meses.
    * Média do Valor Gasto em Alimentos nos Últimos 3 Meses: Representa a média dos gastos do cliente na categoria "Alimentos" nos últimos três meses.
    * Média do Valor Gasto em Livros nos Últimos 3 Meses: Foca na categoria "Livros", calculando a média dos gastos do cliente nos últimos três meses.
    Repita o passo anterior considerando - últimos 6 meses, 9 meses e 12 meses.

### Tempo desde a Última Transação

In [19]:
# Tempo desde a Última Transação: A diferença entre a data mais recente no conjunto de dados e a última data de compra de cada cliente.

max_data_value = df_transaction_01.agg(F.max("DT_DATA")).collect()[0][0]

df_feat_eng_00 = (df_transaction_01.groupBy("NB_ID_CLIENTE")
               .agg(
                    F.datediff(F.lit(max_data_value), F.max(F.col("DT_DATA"))).alias("NB_TEMPO_ULTIMA_TRANSACAO")
               )
              ).orderBy(F.col("NB_ID_CLIENTE"))

In [20]:
df_feat_eng_00.show()

+-------------+-------------------------+
|NB_ID_CLIENTE|NB_TEMPO_ULTIMA_TRANSACAO|
+-------------+-------------------------+
|            1|                       13|
|            2|                      389|
|            3|                       40|
|            4|                       19|
|            5|                        9|
|            6|                      116|
|            7|                      103|
|            8|                      104|
|            9|                      123|
|           10|                       91|
|           11|                      135|
|           12|                      212|
|           13|                       75|
|           14|                      500|
|           15|                      492|
|           16|                       73|
|           17|                       48|
|           18|                       71|
|           19|                      199|
|           20|                      100|
+-------------+-------------------

#### Armazenando feature em parquet

In [21]:
# file_path = "../data/feature_store/FEATURE_NB_TEMPO_ULTIMA_TRANSACAO.parquet"
# df_to_parquet(df_feat_eng, file_path)

#### Função

In [22]:
fs = FeatureStoreChurn()

df_feat_eng_00 = fs.feat_tempo_ultima_transacao(df_transaction)
df_feat_eng_00.show()

+-------------+-------------------------+
|NB_ID_CLIENTE|NB_TEMPO_ULTIMA_TRANSACAO|
+-------------+-------------------------+
|            1|                       13|
|            2|                      389|
|            3|                       40|
|            4|                       19|
|            5|                        9|
|            6|                      116|
|            7|                      103|
|            8|                      104|
|            9|                      123|
|           10|                       91|
|           11|                      135|
|           12|                      212|
|           13|                       75|
|           14|                      500|
|           15|                      492|
|           16|                       73|
|           17|                       48|
|           18|                       71|
|           19|                      199|
|           20|                      100|
+-------------+-------------------

### Frequência de Compra


In [23]:
# Frequência de Compra: Quantidade de compras que o cliente fez dividida pelo número de meses desde sua inscrição.
columns_dic = {"ID": "NB_ID_CLIENTE"
               , "Idade": "NB_IDADE"
               , "Gênero": "TX_GENERO"
               , "Dias desde a Inscrição": "NB_DIAS_INSCRITO"
               , "Usou Suporte": "NB_SUPORTE"
               , "Plano": "TX_PLANO"
               , "Churn": "FL_CHURN"}
df_base_01 = rename_columns(columns_dic, df_base)

columns_dic = {"ID Transação": "NB_ID_TRANSACAO"
               , "ID Cliente": "NB_ID_CLIENTE"
               , "Data": "DT_DATA"
               , "Valor": "NB_VALOR"
               , "Categoria": "TX_CATEGORIA"}
df_transaction_01 = rename_columns(columns_dic, df_transaction)

df_max_compras = df_base_01.withColumn("DT_PRIMEIRO_DIA"
                                           , F.date_sub(
                                               F.current_timestamp()
                                               , F.col("NB_DIAS_INSCRITO")
                                           )
                                      )

df_max_compras = (df_max_compras.withColumn("NB_MESES_INSCRITO"
                                           , F.months_between(
                                               F.current_timestamp()
                                               , F.col("DT_PRIMEIRO_DIA")
                                           ).cast("int")
                                          )
                 )

df_qtd_compras = (df_transaction_01.groupBy("NB_ID_CLIENTE")
                                    .agg(F.count("*")
                                    .alias("NB_TOTAL_COMPRAS")
                                    )
                 )

df_max_compras_01 = join_dfs(df_max_compras, df_qtd_compras, "NB_ID_CLIENTE", "left")

df_max_compras_01 = (df_max_compras_01.withColumn("NB_FREQ_COMPRAS"
                                                 , F.round(
                                                     F.col("NB_TOTAL_COMPRAS") / F.col("NB_MESES_INSCRITO")
                                                 , 2)
                                                )
                    ).select("NB_ID_CLIENTE", "NB_FREQ_COMPRAS")

In [24]:
df_max_compras_01.show()

+-------------+---------------+
|NB_ID_CLIENTE|NB_FREQ_COMPRAS|
+-------------+---------------+
|            1|            0.4|
|            2|           0.26|
|            3|           0.64|
|            4|           1.14|
|            5|            1.0|
|            6|            1.0|
|            7|            0.4|
|            8|           0.25|
|            9|           0.27|
|           10|            0.1|
|           11|           0.12|
|           12|           0.17|
|           13|           0.21|
|           14|           0.06|
|           15|           0.02|
|           16|           0.29|
|           17|           0.27|
|           18|            5.0|
|           19|           0.21|
|           20|           0.32|
+-------------+---------------+
only showing top 20 rows



In [25]:
# Frequência de Compra: Quantidade de compras que o cliente fez dividida pelo número de meses desde sua inscrição.
"""
Esta seria uma maneira simplificada de realizar o mesmo código acima. 
Deixando aqui para poder ter como referência.

df_max_compras_01 = (
    df_base_01
    .join(
        df_transaction_02.groupBy("NB_ID_CLIENTE")
        .agg(F.count("*").alias("NB_TOTAL_COMPRAS")),
        on="NB_ID_CLIENTE",
        how="left"
    )
    .withColumn(
        "DT_PRIMEIRO_DIA",
        F.date_sub(F.current_timestamp(), F.col("NB_DIAS_INSCRITO"))
    )
    .withColumn(
        "NB_MESES_INSCRITO",
        F.months_between(F.current_timestamp(), F.col("DT_PRIMEIRO_DIA")).cast("int")
    )
    .withColumn(
        "NB_FREQ_COMPRAS",
            F.round(F.col("NB_TOTAL_COMPRAS") / F.col("NB_MESES_INSCRITO"), 2)
        )
    ).select("NB_ID_CLIENTE", "NB_FREQ_COMPRAS")

# Unir df_max_compras_01 com df_base_01
df_base_02 = df_base_01.join(df_max_compras_01, on="NB_ID_CLIENTE", how="left")
"""


'\nEsta seria uma maneira simplificada de realizar o mesmo código acima. \nDeixando aqui para poder ter como referência.\n\ndf_max_compras_01 = (\n    df_base_01\n    .join(\n        df_transaction_02.groupBy("NB_ID_CLIENTE")\n        .agg(F.count("*").alias("NB_TOTAL_COMPRAS")),\n        on="NB_ID_CLIENTE",\n        how="left"\n    )\n    .withColumn(\n        "DT_PRIMEIRO_DIA",\n        F.date_sub(F.current_timestamp(), F.col("NB_DIAS_INSCRITO"))\n    )\n    .withColumn(\n        "NB_MESES_INSCRITO",\n        F.months_between(F.current_timestamp(), F.col("DT_PRIMEIRO_DIA")).cast("int")\n    )\n    .withColumn(\n        "NB_FREQ_COMPRAS",\n            F.round(F.col("NB_TOTAL_COMPRAS") / F.col("NB_MESES_INSCRITO"), 4)\n        )\n    ).select("NB_ID_CLIENTE", "NB_FREQ_COMPRAS")\n\n# Unir df_max_compras_01 com df_base_01\ndf_base_02 = df_base_01.join(df_max_compras_01, on="NB_ID_CLIENTE", how="left")\n'

#### Salvando feature em parquet

In [26]:
# file_path = "../data/feature_store/FEATURE_NB_FREQ_COMPRAS.parquet"
# df_to_parquet(df_max_compras_01, file_path)

#### Função

In [27]:
fs = FeatureStoreChurn()

df_feat_eng_01 = fs.feat_freq_compras(df_transaction, df_base)
df_feat_eng_01.show()

+-------------+---------------+
|NB_ID_CLIENTE|NB_FREQ_COMPRAS|
+-------------+---------------+
|            1|            0.4|
|            2|           0.26|
|            3|           0.64|
|            4|           1.14|
|            5|            1.0|
|            6|            1.0|
|            7|            0.4|
|            8|           0.25|
|            9|           0.27|
|           10|            0.1|
|           11|           0.12|
|           12|           0.17|
|           13|           0.21|
|           14|           0.06|
|           15|           0.02|
|           16|           0.29|
|           17|           0.27|
|           18|            5.0|
|           19|           0.21|
|           20|           0.32|
+-------------+---------------+
only showing top 20 rows



### Total Gasto

In [None]:
# Total Gasto: Soma de todos os valores gastos pelo cliente em todas as suas transações.

window_spec = Window.partitionBy("NB_ID_CLIENTE")
df_gasto_total = df_transaction_01.withColumn("NB_GASTO_TOTAL_CLIENTE", 
                                                 F.round(F.sum("NB_VALOR").over(window_spec), 2)
                                             ).select("NB_ID_CLIENTE", "NB_GASTO_TOTAL_CLIENTE")

df_feat_eng = join_dfs(df_feat_eng, df_feat_eng, df_gasto_total, "NB_ID_CLIENTE", "left")

In [None]:
# Categoria Favorita: Categoria de produto em que o cliente gastou a maior quantia.

window_spec = Window.partitionBy("NB_ID_CLIENTE", "TX_CATEGORIA")
df_rank_gasto = df_transaction_01.withColumn("NB_GASTO_TOTAL_CLI_CAT", 
                                                 F.round(F.sum("NB_VALOR").over(window_spec), 2))

window_rank = Window.partitionBy("NB_ID_CLIENTE").orderBy(F.desc("NB_GASTO_TOTAL_CLI_CAT"))

df_rank_gasto = df_rank_gasto.withColumn("NB_RANK_GASTO_CLI_CAT", F.row_number().over(window_rank))

df_rank_gasto = df_rank_gasto.filter(F.col("NB_RANK_GASTO_CLI_CAT") == 1).select("NB_ID_CLIENTE", "TX_CATEGORIA")

df_rank_gasto = df_rank_gasto.withColumnRenamed("TX_CATEGORIA", "TX_MAIOR_CATEGORIA")

df_feat_eng = join_dfs(df_feat_eng, df_feat_eng, df_rank_gasto, "NB_ID_CLIENTE", "left")

In [None]:
"""
Esta seria uma maneira simplificada de realizar o mesmo código acima. 
Deixando aqui para poder ter como referência.
"""

# window_spec = Window.partitionBy("NB_ID_CLIENTE", "TX_CATEGORIA")
# window_rank = Window.partitionBy("NB_ID_CLIENTE").orderBy(F.desc("NB_GASTO_TOTAL_CLI_CAT"))

# df_rank_gasto = (df_transaction_02
#                  .withColumn("NB_GASTO_TOTAL_CLI_CAT", F.round(F.sum("NB_VALOR").over(window_spec), 2))
#                  .withColumn("NB_RANK_GASTO_CLI_CAT", F.row_number().over(window_rank))
#                  .filter(F.col("NB_RANK_GASTO_CLI_CAT") == 1)
#                  .select("NB_ID_CLIENTE", F.col("TX_CATEGORIA").alias("TX_MAIOR_CATEGORIA"))
#                 )

# # Juntar os resultados com df_transaction_02
# df_transaction_02 = df_transaction_02.join(df_rank_gasto, on="NB_ID_CLIENTE", how="left")

In [None]:
# Gasto Médio por Transação: Total gasto dividido pelo número total de transações.

df_media_gasto = df_feat_eng.withColumn("NB_MEDIA_GASTO_CLI"
                                              , F.round(
                                                  F.col("NB_GASTO_TOTAL_CLIENTE") / F.col("NB_TOTAL_COMPRAS")
                                                  , 2)
                                             ).select("NB_ID_CLIENTE", "NB_MEDIA_GASTO_CLI")

df_feat_eng = join_dfs(df_feat_eng, df_feat_eng, df_media_gasto, "NB_ID_CLIENTE", "left")

In [None]:
# Duração da Assinatura: Número de dias desde que o cliente se inscreveu até a data mais recente no conjunto de dados.

df_feat_eng = df_feat_eng.withColumn("NB_DIAS_ASSINATURA"
                                                   , F.datediff(
                                                           F.col("DT_PRIMEIRO_DIA"), F.lit(max_data_value)
                                                       ).cast("int")
                                                   )

In [None]:
# Número de Categorias Compradas: Quantidade de categorias diferentes das quais o cliente comprou.
df_num_cat = (df_transaction_01.groupBy("NB_ID_CLIENTE")
                                    .agg(F.countDistinct("TX_CATEGORIA").alias("NB_TOTAL_CATEGORIAS")
                                        )
             )
df_feat_eng = join_dfs(df_feat_eng, df_feat_eng, df_num_cat, "NB_ID_CLIENTE", "left")

In [None]:
df_feat_eng.show()

In [None]:
# Usou Suporte antes da Primeira Compra: Indicador (1 ou 0) se o cliente usou o suporte antes de fazer sua primeira compra.
df_compra = (df_feat_eng
              .join(df_base_01, on='NB_ID_CLIENTE', how='inner')
              .join(df_transaction_01, on='NB_ID_CLIENTE', how='left')
              .groupBy("NB_ID_CLIENTE", "DT_PRIMEIRO_DIA", "NB_SUPORTE")
              .agg(F.min("DT_DATA").alias("DT_MIN_COMPRA"))
              .withColumn("FL_NUNCA_COMPROU", F.when(F.col("DT_MIN_COMPRA").isNull(), 1).otherwise(0))
              .withColumn("FL_SUPORTE_ANTES_COMPRA", F.when((F.col("FL_NUNCA_COMPROU") == 1) & (F.col("NB_SUPORTE") == 1), 1).otherwise(0))
            ).select("NB_ID_CLIENTE", "FL_SUPORTE_ANTES_COMPRA", "DT_MIN_COMPRA")

df_feat_eng = join_dfs(df_feat_eng, df_feat_eng, df_compra, "NB_ID_CLIENTE", "left")

In [None]:
# Dias entre Inscrição e Primeira Compra: Diferença em dias entre a data de inscrição do cliente e sua primeira transação.
df_diff_insc_compra = (df_feat_eng
                       .join(df_transaction_01, on='NB_ID_CLIENTE', how='left')
                       .groupBy("NB_ID_CLIENTE", "DT_PRIMEIRO_DIA", "DT_MIN_COMPRA")
                       .withColumn("NB_DIFF_INSC_COMPRA", F.date_diff(F.to_date(F.col("DT_MIN_COMPRA")), F.col("DT_PRIMEIRO_DIA")))

)

df_feat_eng = join_dfs(df_feat_eng, df_feat_eng, df_diff_insc_compra, "NB_ID_CLIENTE", "left")

In [None]:
# Frequência de Transações por Plano: Número de transações do cliente dividido pelos meses de inscrição, segmentado por plano (Básico, Intermediário, Avançado)
"NB_MESES_INSCRITO"


## 7.2. Variáveis Históricas

### 7.2.1. 3 Meses

### 7.2.2. 6 Meses

### 7.2.3. 9 Meses

### 7.2.4. 12 Meses

# 8. Criação de Novo Arquivo

In [None]:
"""
RETIRAR A COLUNA NB_TOTAL_COMPRAS, "DT_PRIMEIRO_DIA" de df_transaction ao final...
"""

## 8.1. Criando CSV

## 8.2. Criando PARQUET