<a href="https://colab.research.google.com/github/wagnermoraesjr/Etapa_Feature_Engineering/blob/main/Notebook_Feature_Engineering_Problema_Churn_github.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Feature Engineering**
A Feature Engineering é um processo fundamental para ciência de dados. Ela se refere ao processo de criação e transformação de variáveis para melhorar o desempenho dos modelos de Machine Learning.<br><br>
**Objetivo**

Meu objetivo aqui é colocar em prática o aprendizado sobre Feature Engineering, o processo da criação de variáveis explicativas para serem utilizadas nos algoritmos de Machine Learning.<br><br>
**Ferramentas**

Quando se trata de dados volumosos, que geralmente é o que encontramos nessa etapa do processo, o ideal é trabalhar com o Spark, devido à sua capacidade de processamento distribuído e paralelismo. Sendo assim irei trabalhar com o **Spark em conjunto com o SQL**, pois é uma interface familiar e fácil de se trabalhar.<br><br>

**Problema de Negócio**

Para esse trabalho vou usar dados fictícios que simulam as operações de uma empresa que gostaria de avaliar a perda de clientes (Churn).<br><br>
**Base de Dados**

- **Base de Churn:** Com informações cadastrais de 1.000 pessoas.
- **Base de Transações:** Contendo as transações históricas de compras feitas por essas pessoas, incluindo detalhes como data da compra, valor gasto e categoria do produto.

<br>

##**Instalação e configuração do Apache Spark em nosso ambiente do Google Colab.**

In [54]:
# Este comando instala o OpenJDK 8, que é uma implementação de código aberto da plataforma Java, necessária para executar o Apache Spark.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Baixa silenciosamente (sem imprimir mensagens no console) o arquivo tarball (.tgz) do Apache Spark versão 3.1.2 pré-compilado para Hadoop 3.2.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

# Descompacta o arquivo tarball que foi baixado, extraindo os arquivos do Spark.
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

# Instala silenciosamente o pacote Python chamado "findspark". Este pacote é útil para configurar o Spark em um ambiente Python, adicionando automaticamente o Spark ao caminho do sistema.
!pip install -q findspark

In [55]:
# Configurações essenciais para que o sistema reconheça onde o Java e o Spark estão instalados, permitindo a execução correta de aplicativos Spark no ambiente Python.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [56]:
# Configura e inicializa o ambiente Spark para ser utilizado em um ambiente Python, permitindo a criação de DataFrames distribuídos e a execução de operações distribuídas usando o Apache Spark.
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

<br>

## **Leitura da nossa base de dados.**

In [57]:
# Lendo nossa base de público e criando um DataFrame no Spark.
df_publico = spark.read.csv('/content/drive/MyDrive/Projetos_Big_Data_Analytics/Ciencia_de_Dados/Etapa_Feature_Engineering/base_churn.csv', header=True, inferSchema=True)
df_publico.createOrReplaceTempView('df_publico')

In [58]:
# Lendo nossa base de transações e criando um DataFrame no Spark.
df_transacoes = spark.read.csv('/content/drive/MyDrive/Projetos_Big_Data_Analytics/Ciencia_de_Dados/Etapa_Feature_Engineering/base_transacoes.csv', header=True, inferSchema=True)
df_transacoes.createOrReplaceTempView('df_transacoes')

<br>

## **Visualização dos DataFrames criados.**

In [59]:
# Exibindo a estrutura do schema do DataFrame ´df_publico´.
df_publico.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Idade: integer (nullable = true)
 |-- Gênero: string (nullable = true)
 |-- Dias desde a Inscrição: integer (nullable = true)
 |-- Usou Suporte: integer (nullable = true)
 |-- Plano: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [60]:
# Mostrando as 10 primeiras linhas do DataFrame ´df_publico´.
df_publico.show(10,False)

+---+-----+------+----------------------+------------+-------------+-----+
|ID |Idade|Gênero|Dias desde a Inscrição|Usou Suporte|Plano        |Churn|
+---+-----+------+----------------------+------------+-------------+-----+
|1  |21   |F     |1331                  |1           |Intermediário|1    |
|2  |21   |M     |1160                  |0           |Intermediário|0    |
|3  |62   |M     |454                   |1           |Básico       |0    |
|4  |64   |M     |226                   |1           |Intermediário|0    |
|5  |61   |M     |474                   |1           |Avançado     |0    |
|6  |18   |M     |419                   |0           |Básico       |0    |
|7  |52   |M     |1334                  |0           |Básico       |0    |
|8  |44   |M     |1124                  |1           |Intermediário|0    |
|9  |52   |M     |1256                  |1           |Intermediário|0    |
|10 |64   |F     |1197                  |0           |Básico       |0    |
+---+-----+------+-------

In [61]:
# Checando quantas linhas tem o DataFrame ´df_publico´.
df_publico.count()

1000

In [62]:
# Exibindo a estrutura do schema do DataFrame ´df_transacoes´.
df_transacoes.printSchema()

root
 |-- ID Transação: integer (nullable = true)
 |-- ID Cliente: integer (nullable = true)
 |-- Data: string (nullable = true)
 |-- Valor: double (nullable = true)
 |-- Categoria: string (nullable = true)



In [63]:
# Mostrando as 10 priemeiras linhas do DataFrame ´df_transacoes´.
df_transacoes.show(10,False)

+------------+----------+-----------------------------+------------------+-----------+
|ID Transação|ID Cliente|Data                         |Valor             |Categoria  |
+------------+----------+-----------------------------+------------------+-----------+
|1           |1         |2022-11-25 13:50:26.548672560|57.287427536330505|Esportes   |
|2           |1         |2020-01-19 12:27:36.637168141|97.07199340552512 |Alimentos  |
|3           |1         |2021-12-28 12:33:58.938053096|169.10581012381087|Livros     |
|4           |1         |2022-02-05 01:39:49.380530968|199.38694865538451|Roupas     |
|5           |1         |2022-11-16 23:06:54.159292032|160.00228343317622|Eletrônicos|
|6           |1         |2020-04-25 12:42:28.672566372|9.842481270765422 |Alimentos  |
|7           |1         |2022-10-31 22:05:18.584070800|76.90706330227667 |Eletrônicos|
|8           |1         |2020-10-06 12:14:52.035398228|53.20607404593595 |Roupas     |
|9           |1         |2022-11-02 12:50:5

In [64]:
# Checando quantas linhas tem o DataFrame ´df_transacoes´.
df_transacoes.count()

10171

<br>

## **Criação de variáveis explicativas.**

**1. Tempo desde a Última Transação:** A diferença entre a data mais recente do conjunto de dados e a última data de compra de cada cliente.

In [65]:
df_temp_01 = spark.sql('''

SELECT
  `ID Cliente`,
  MAX(CAST(Data AS DATE)) AS UTL_DATA_COMPRA,
  DATEDIFF(current_date(), MAX(CAST(Data AS DATE))) AS DIAS_DESDE_ULT_TRANS
FROM
  df_transacoes
GROUP BY
  `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_01.createOrReplaceTempView('df_temp_01')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_01.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_01.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_01.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 2

+----------+---------------+--------------------+
|ID Cliente|UTL_DATA_COMPRA|DIAS_DESDE_ULT_TRANS|
+----------+---------------+--------------------+
|1         |2022-12-18     |332                 |
|2         |2021-12-07     |708                 |
|3         |2022-11-21     |359                 |
|4         |2022-12-12     |338                 |
|5         |2022-12-22     |328                 |
|6         |2022-09-06     |435                 |
|7         |2022-09-19     |422                 |
|8         |2022-09-18     |423                 |
|9         |2022-08-30     |442                 |
|10        |2022-10-01     |410                 |
+----------+---------------+--------------------+
only showing top 10 rows



**2. Duração da Assinatura:** Número de dias desde que o cliente fez a primeira compra até a data da compra mais recente.

In [66]:
df_temp_02 = spark.sql('''

SELECT
  `ID Cliente`,
  DATEDIFF(MAX(CAST(Data AS DATE)), MIN(CAST(Data AS DATE))) AS QTD_DIAS_DESDE_PRIM_COMPRA
FROM
  df_transacoes
GROUP BY
  `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_02.createOrReplaceTempView('df_temp_02')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_02.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_02.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_02.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 1

+----------+--------------------------+
|ID Cliente|QTD_DIAS_DESDE_PRIM_COMPRA|
+----------+--------------------------+
|1         |1064                      |
|2         |701                       |
|3         |1033                      |
|4         |837                       |
|5         |1064                      |
|6         |915                       |
|7         |975                       |
|8         |901                       |
|9         |925                       |
|10        |799                       |
+----------+--------------------------+
only showing top 10 rows



**3. Frequência de Compra:** Quantidade de compras que o cliente fez dividida pelo número de meses desde sua primeira compra.

In [67]:
df_temp_03 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(COUNT(*) / (DATEDIFF(current_date(), MIN(CAST(Data AS DATE))) / 30), 2) AS QTD_COMPRAS_MES
FROM
  df_transacoes
GROUP BY
  `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_03.createOrReplaceTempView('df_temp_03')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_03.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_03.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_03.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 1

+----------+---------------+
|ID Cliente|QTD_COMPRAS_MES|
+----------+---------------+
|1         |0.37           |
|2         |0.21           |
|3         |0.19           |
|4         |0.2            |
|5         |0.32           |
|6         |0.29           |
|7         |0.37           |
|8         |0.2            |
|9         |0.24           |
|10        |0.1            |
+----------+---------------+
only showing top 10 rows



**4. Valor Total, Valor Médio, Valor Máximo e Valor Mínimo Gasto:** Soma, média, máximo e mínimo de todos os valores gastos pelo cliente em todas as suas transações.

In [68]:
df_temp_04 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(SUM(Valor), 2) AS VL_TOT_GASTO,
  ROUND(AVG(Valor), 2) AS VL_MED_GASTO,
  ROUND(MAX(Valor), 2) AS VL_MAX_GASTO,
  ROUND(MIN(Valor), 2) AS VL_MIN_GASTO
FROM
  df_transacoes
GROUP BY
  `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_04.createOrReplaceTempView('df_temp_04')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_04.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_04.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_04.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 4

+----------+------------+------------+------------+------------+
|ID Cliente|VL_TOT_GASTO|VL_MED_GASTO|VL_MAX_GASTO|VL_MIN_GASTO|
+----------+------------+------------+------------+------------+
|1         |1976.56     |116.27      |199.39      |5.39        |
|2         |1017.8      |101.78      |192.63      |47.58       |
|3         |997.81      |110.87      |191.09      |41.91       |
|4         |1009.77     |126.22      |199.35      |41.01       |
|5         |1755.18     |117.01      |170.78      |16.25       |
|6         |1841.53     |141.66      |199.29      |8.9         |
|7         |1671.35     |98.31       |180.02      |21.47       |
|8         |841.03      |93.45       |164.54      |22.74       |
|9         |1019.13     |92.65       |196.06      |14.62       |
|10        |416.41      |104.1       |185.66      |15.92       |
+----------+------------+------------+------------+------------+
only showing top 10 rows



**5. Valor Total e Valor Médio Gasto por Categoria:** Soma e média dos valores gastos em cada categoria.

In [69]:
df_temp_05 = spark.sql('''

  SELECT
  `ID Cliente`,
    ROUND(SUM(CASE WHEN Categoria = 'Alimentos' THEN Valor ELSE 0 END),2) AS VL_TOT_GASTO_ALIMENTOS,
    ROUND(AVG(CASE WHEN Categoria = 'Alimentos' THEN Valor ELSE NULL END),2) AS VL_MED_GASTO_ALIMENTOS,
    ROUND(SUM(CASE WHEN Categoria = 'Eletrônicos' THEN Valor ELSE 0 END),2) AS VL_TOT_GASTO_ELETRONICOS,
    ROUND(AVG(CASE WHEN Categoria = 'Eletrônicos' THEN Valor ELSE NULL END),2) AS VL_MED_GASTO_ELETRONICOS,
    ROUND(SUM(CASE WHEN Categoria = 'Esportes' THEN Valor ELSE 0 END),2) AS VL_TOT_GASTO_ESPORTES,
    ROUND(AVG(CASE WHEN Categoria = 'Esportes' THEN Valor ELSE NULL END),2) AS VL_MED_GASTO_ESPORTES,
    ROUND(SUM(CASE WHEN Categoria = 'Livros' THEN Valor ELSE 0 END),2) AS VL_TOT_GASTO_LIVROS,
    ROUND(AVG(CASE WHEN Categoria = 'Livros' THEN Valor ELSE NULL END),2) AS VL_MED_GASTO_LIVROS,
    ROUND(SUM(CASE WHEN Categoria = 'Roupas' THEN Valor ELSE 0 END),2) AS VL_TOT_GASTO_ROUPAS,
    ROUND(AVG(CASE WHEN Categoria = 'Roupas' THEN Valor ELSE NULL END),2) AS VL_MED_GASTO_ROUPAS
  FROM
    df_transacoes
  GROUP BY
    `ID Cliente`
  ORDER BY
    `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_05.createOrReplaceTempView('df_temp_05')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_05.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_05.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_05.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 10

+----------+----------------------+----------------------+------------------------+------------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+
|ID Cliente|VL_TOT_GASTO_ALIMENTOS|VL_MED_GASTO_ALIMENTOS|VL_TOT_GASTO_ELETRONICOS|VL_MED_GASTO_ELETRONICOS|VL_TOT_GASTO_ESPORTES|VL_MED_GASTO_ESPORTES|VL_TOT_GASTO_LIVROS|VL_MED_GASTO_LIVROS|VL_TOT_GASTO_ROUPAS|VL_MED_GASTO_ROUPAS|
+----------+----------------------+----------------------+------------------------+------------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+
|1         |351.26                |87.81                 |396.72                  |132.24                  |226.73               |113.37               |551.06             |183.69             |450.79             |90.16              |
|2         |1

**6. Categoria Favorita:** Categoria de produto em que o cliente gastou a maior quantia.

In [70]:
df_temp_06 = spark.sql('''

WITH ranked_categories AS (
  SELECT
    `ID Cliente`,
    Categoria,
    RANK() OVER (PARTITION BY `ID Cliente` ORDER BY SUM(Valor) DESC) AS RNK
  FROM
    df_transacoes
  GROUP BY
    `ID Cliente`,
    Categoria
)
SELECT
  `ID Cliente`,
  Categoria AS CATEG_FAVORITA
FROM
  ranked_categories
WHERE
  RNK = 1
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_06.createOrReplaceTempView('df_temp_06')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_06.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_06.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_06.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 1

+----------+--------------+
|ID Cliente|CATEG_FAVORITA|
+----------+--------------+
|1         |Livros        |
|2         |Livros        |
|3         |Esportes      |
|4         |Roupas        |
|5         |Eletrônicos   |
|6         |Roupas        |
|7         |Eletrônicos   |
|8         |Eletrônicos   |
|9         |Livros        |
|10        |Alimentos     |
+----------+--------------+
only showing top 10 rows



**7. Número de Categorias Compradas:** Quantidade de categorias diferentes das quais o cliente comprou.

In [71]:
df_temp_07 = spark.sql('''

SELECT
  `ID Cliente`,
  COUNT(DISTINCT Categoria) AS QTD_CATEG_DIF
FROM
  df_transacoes
GROUP BY
  `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_07.createOrReplaceTempView('df_temp_07')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_07.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_07.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_07.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 1

+----------+-------------+
|ID Cliente|QTD_CATEG_DIF|
+----------+-------------+
|1         |5            |
|2         |4            |
|3         |4            |
|4         |5            |
|5         |5            |
|6         |5            |
|7         |5            |
|8         |5            |
|9         |4            |
|10        |4            |
+----------+-------------+
only showing top 10 rows



<br>

## **Criação de variáveis históricas (3, 6, 9 e 12 meses).**

**Criando uma flag que marca as transações feitas nos últimos 3, 6, 9 e 12 meses (janela temporal).**

Usamos **Window Functions** `MAX(Data) OVER (PARTITION BY 'ID Cliente')` para obter a data de compra mais recente para cada cliente. Comparamos então a data de cada transação com a data mais recente, para determinar se ela está dentro do intervalo dos últimos 3, 6, 9 ou 12 meses. Atribuímos 1 se a transação estiver dentro do intervado ou 0 se estiver fora.

In [72]:
df_temp_flag_hist = spark.sql('''

SELECT
  *,
  CASE WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -90) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1 ELSE 0 END AS ultimos_3_meses_flag,
  CASE WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -180) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1 ELSE 0 END AS ultimos_6_meses_flag,
  CASE WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -270) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1 ELSE 0 END AS ultimos_9_meses_flag,
  CASE WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -365) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1 ELSE 0 END AS ultimos_12_meses_flag
FROM
  df_transacoes
ORDER BY
  `ID Cliente`,
  Data

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_flag_hist.createOrReplaceTempView('df_temp_flag_hist')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_flag_hist.count()}\n')

# Mostrando o novo DataFrame.
df_temp_flag_hist.show(20, False)

Total de Linhas: 10171

+------------+----------+-----------------------------+------------------+-----------+--------------------+--------------------+--------------------+---------------------+
|ID Transação|ID Cliente|Data                         |Valor             |Categoria  |ultimos_3_meses_flag|ultimos_6_meses_flag|ultimos_9_meses_flag|ultimos_12_meses_flag|
+------------+----------+-----------------------------+------------------+-----------+--------------------+--------------------+--------------------+---------------------+
|2           |1         |2020-01-19 12:27:36.637168141|97.07199340552512 |Alimentos  |0                   |0                   |0                   |0                    |
|6           |1         |2020-04-25 12:42:28.672566372|9.842481270765422 |Alimentos  |0                   |0                   |0                   |0                    |
|10          |1         |2020-09-06 21:37:41.946902652|56.524638713138636|Alimentos  |0                   |0        

**8. Total e Média do Valor Gasto em Alimentos:** Esta variável calcula o total e a média dos gastos do cliente na categoria "Alimentos" nos últimos três, seis, nove e doze meses.

In [73]:
df_temp_08 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(SUM(CASE WHEN Categoria = 'Alimentos' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U3M_CONS_ALIMENTOS,
  ROUND(SUM(CASE WHEN Categoria = 'Alimentos' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U6M_CONS_ALIMENTOS,
  ROUND(SUM(CASE WHEN Categoria = 'Alimentos' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U9M_CONS_ALIMENTOS,
  ROUND(SUM(CASE WHEN Categoria = 'Alimentos' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U12M_CONS_ALIMENTOS,
  ROUND(AVG(CASE WHEN Categoria = 'Alimentos' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U3M_CONS_ALIMENTOS,
  ROUND(AVG(CASE WHEN Categoria = 'Alimentos' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U6M_CONS_ALIMENTOS,
  ROUND(AVG(CASE WHEN Categoria = 'Alimentos' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U9M_CONS_ALIMENTOS,
  ROUND(AVG(CASE WHEN Categoria = 'Alimentos' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U12M_CONS_ALIMENTOS
FROM
  df_temp_flag_hist
GROUP
  BY `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_08.createOrReplaceTempView('df_temp_08')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_08.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_08.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_08.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 8

+----------+-------------------------+-------------------------+-------------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------------------------+
|ID Cliente|VL_TOT_U3M_CONS_ALIMENTOS|VL_TOT_U6M_CONS_ALIMENTOS|VL_TOT_U9M_CONS_ALIMENTOS|VL_TOT_U12M_CONS_ALIMENTOS|VL_MED_U3M_CONS_ALIMENTOS|VL_MED_U6M_CONS_ALIMENTOS|VL_MED_U9M_CONS_ALIMENTOS|VL_MED_U12M_CONS_ALIMENTOS|
+----------+-------------------------+-------------------------+-------------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------------------------+
|1         |null                     |187.82                   |187.82                   |187.82                    |null                     |187.82                   |187.82                   |187.82                    |
|2         |47.58                    |109.58          

**9. Total e Média do Valor Gasto em Eletrônicos:** Esta variável calcula o total e a média dos gastos do cliente na categoria "Eletrônicos" nos últimos três, seis, nove e doze meses.

In [74]:
df_temp_09 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(SUM(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U3M_CONS_ELETRONICOS,
  ROUND(SUM(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U6M_CONS_ELETRONICOS,
  ROUND(SUM(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U9M_CONS_ELETRONICOS,
  ROUND(SUM(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U12M_CONS_ELETRONICOS,
  ROUND(AVG(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U3M_CONS_ELETRONICOS,
  ROUND(AVG(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U6M_CONS_ELETRONICOS,
  ROUND(AVG(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U9M_CONS_ELETRONICOS,
  ROUND(AVG(CASE WHEN Categoria = 'Eletrônicos' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U12M_CONS_ELETRONICOS
FROM
  df_temp_flag_hist
GROUP
  BY `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_09.createOrReplaceTempView('df_temp_09')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_09.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_09.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_09.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 8

+----------+---------------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_TOT_U3M_CONS_ELETRONICOS|VL_TOT_U6M_CONS_ELETRONICOS|VL_TOT_U9M_CONS_ELETRONICOS|VL_TOT_U12M_CONS_ELETRONICOS|VL_MED_U3M_CONS_ELETRONICOS|VL_MED_U6M_CONS_ELETRONICOS|VL_MED_U9M_CONS_ELETRONICOS|VL_MED_U12M_CONS_ELETRONICOS|
+----------+---------------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+---------------------------+----------------------------+
|1         |236.91                     |236.91                     |396.72                     |396.72                      |118.45                     |118.45                     |132.24                     |132.24              

**10. Total e Média do Valor Gasto em Esportes:** Esta variável calcula o total e a média dos gastos do cliente na categoria "Esportes" nos últimos três, seis, nove e doze meses.

In [75]:
df_temp_10 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(SUM(CASE WHEN Categoria = 'Esportes' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U3M_CONS_ESPORTES,
  ROUND(SUM(CASE WHEN Categoria = 'Esportes' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U6M_CONS_ESPORTES,
  ROUND(SUM(CASE WHEN Categoria = 'Esportes' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U9M_CONS_ESPORTES,
  ROUND(SUM(CASE WHEN Categoria = 'Esportes' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U12M_CONS_ESPORTES,
  ROUND(AVG(CASE WHEN Categoria = 'Esportes' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U3M_CONS_ESPORTES,
  ROUND(AVG(CASE WHEN Categoria = 'Esportes' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U6M_CONS_ESPORTES,
  ROUND(AVG(CASE WHEN Categoria = 'Esportes' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U9M_CONS_ESPORTES,
  ROUND(AVG(CASE WHEN Categoria = 'Esportes' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U12M_CONS_ESPORTES
FROM
  df_temp_flag_hist
GROUP
  BY `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_10.createOrReplaceTempView('df_temp_10')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_10.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_10.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_10.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 8

+----------+------------------------+------------------------+------------------------+-------------------------+------------------------+------------------------+------------------------+-------------------------+
|ID Cliente|VL_TOT_U3M_CONS_ESPORTES|VL_TOT_U6M_CONS_ESPORTES|VL_TOT_U9M_CONS_ESPORTES|VL_TOT_U12M_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U9M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|
+----------+------------------------+------------------------+------------------------+-------------------------+------------------------+------------------------+------------------------+-------------------------+
|1         |57.29                   |57.29                   |57.29                   |57.29                    |57.29                   |57.29                   |57.29                   |57.29                    |
|2         |57.35                   |57.35                   |169.25                  

**11. Total e Média do Valor Gasto em Livros:** Esta variável calcula o total e a média dos gastos do cliente na categoria "Livros" nos últimos três, seis, nove e doze meses.

In [76]:
df_temp_11 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(SUM(CASE WHEN Categoria = 'Livros' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U3M_CONS_LIVROS,
  ROUND(SUM(CASE WHEN Categoria = 'Livros' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U6M_CONS_LIVROS,
  ROUND(SUM(CASE WHEN Categoria = 'Livros' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U9M_CONS_LIVROS,
  ROUND(SUM(CASE WHEN Categoria = 'Livros' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U12M_CONS_LIVROS,
  ROUND(AVG(CASE WHEN Categoria = 'Livros' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U3M_CONS_LIVROS,
  ROUND(AVG(CASE WHEN Categoria = 'Livros' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U6M_CONS_LIVROS,
  ROUND(AVG(CASE WHEN Categoria = 'Livros' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U9M_CONS_LIVROS,
  ROUND(AVG(CASE WHEN Categoria = 'Livros' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U12M_CONS_LIVROS
FROM
  df_temp_flag_hist
GROUP
  BY `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_11.createOrReplaceTempView('df_temp_11')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_11.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_11.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_11.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 8

+----------+----------------------+----------------------+----------------------+-----------------------+----------------------+----------------------+----------------------+-----------------------+
|ID Cliente|VL_TOT_U3M_CONS_LIVROS|VL_TOT_U6M_CONS_LIVROS|VL_TOT_U9M_CONS_LIVROS|VL_TOT_U12M_CONS_LIVROS|VL_MED_U3M_CONS_LIVROS|VL_MED_U6M_CONS_LIVROS|VL_MED_U9M_CONS_LIVROS|VL_MED_U12M_CONS_LIVROS|
+----------+----------------------+----------------------+----------------------+-----------------------+----------------------+----------------------+----------------------+-----------------------+
|1         |193.01                |193.01                |193.01                |551.06                 |193.01                |193.01                |193.01                |183.69                 |
|2         |null                  |null                  |null                  |null                   |null                  |null                  

**12. Total e Média do Valor Gasto em Roupas:** Esta variável calcula o total e a média dos gastos do cliente na categoria "Roupas" nos últimos três, seis, nove e doze meses.

In [77]:
df_temp_12 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(SUM(CASE WHEN Categoria = 'Roupas' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U3M_CONS_ROUPAS,
  ROUND(SUM(CASE WHEN Categoria = 'Roupas' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U6M_CONS_ROUPAS,
  ROUND(SUM(CASE WHEN Categoria = 'Roupas' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U9M_CONS_ROUPAS,
  ROUND(SUM(CASE WHEN Categoria = 'Roupas' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_TOT_U12M_CONS_ROUPAS,
  ROUND(AVG(CASE WHEN Categoria = 'Roupas' AND ultimos_3_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U3M_CONS_ROUPAS,
  ROUND(AVG(CASE WHEN Categoria = 'Roupas' AND ultimos_6_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U6M_CONS_ROUPAS,
  ROUND(AVG(CASE WHEN Categoria = 'Roupas' AND ultimos_9_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U9M_CONS_ROUPAS,
  ROUND(AVG(CASE WHEN Categoria = 'Roupas' AND ultimos_12_meses_flag = 1 THEN Valor ELSE NULL END), 2) AS VL_MED_U12M_CONS_ROUPAS
FROM
  df_temp_flag_hist
GROUP
  BY `ID Cliente`
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_12.createOrReplaceTempView('df_temp_12')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_12.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_12.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_12.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 8

+----------+----------------------+----------------------+----------------------+-----------------------+----------------------+----------------------+----------------------+-----------------------+
|ID Cliente|VL_TOT_U3M_CONS_ROUPAS|VL_TOT_U6M_CONS_ROUPAS|VL_TOT_U9M_CONS_ROUPAS|VL_TOT_U12M_CONS_ROUPAS|VL_MED_U3M_CONS_ROUPAS|VL_MED_U6M_CONS_ROUPAS|VL_MED_U9M_CONS_ROUPAS|VL_MED_U12M_CONS_ROUPAS|
+----------+----------------------+----------------------+----------------------+-----------------------+----------------------+----------------------+----------------------+-----------------------+
|1         |192.81                |198.2                 |198.2                 |397.59                 |96.4                  |66.07                 |66.07                 |99.4                   |
|2         |null                  |null                  |null                  |null                   |null                  |null                  

<br>

## **Criação de variáveis de segunda camada.**

**13. Razão entre variáveis históricas (uma variável dividida pela outra) - Categoria Alimentos.**

In [78]:
df_temp_13 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(VL_TOT_U3M_CONS_ALIMENTOS/VL_TOT_U6M_CONS_ALIMENTOS, 2) AS VL_RAZ_TOT_U3M_U6M_CONS_ALI,
  ROUND(VL_TOT_U6M_CONS_ALIMENTOS/VL_TOT_U9M_CONS_ALIMENTOS, 2) AS VL_RAZ_TOT_U6M_U9M_CONS_ALI,
  ROUND(VL_TOT_U9M_CONS_ALIMENTOS/VL_TOT_U12M_CONS_ALIMENTOS, 2) AS VL_RAZ_TOT_U9M_U12M_CONS_ALI,
  ROUND(VL_MED_U3M_CONS_ALIMENTOS/VL_MED_U6M_CONS_ALIMENTOS, 2) AS VL_RAZ_MED_U3M_U6M_CONS_ALI,
  ROUND(VL_MED_U6M_CONS_ALIMENTOS/VL_MED_U9M_CONS_ALIMENTOS, 2) AS VL_RAZ_MED_U6M_U9M_CONS_ALI,
  ROUND(VL_MED_U9M_CONS_ALIMENTOS/VL_MED_U12M_CONS_ALIMENTOS, 2) AS VL_RAZ_MED_U9M_U12M_CONS_ALI
FROM
  df_temp_08
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_13.createOrReplaceTempView('df_temp_13')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_13.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_13.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_13.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 6

+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_RAZ_TOT_U3M_U6M_CONS_ALI|VL_RAZ_TOT_U6M_U9M_CONS_ALI|VL_RAZ_TOT_U9M_U12M_CONS_ALI|VL_RAZ_MED_U3M_U6M_CONS_ALI|VL_RAZ_MED_U6M_U9M_CONS_ALI|VL_RAZ_MED_U9M_U12M_CONS_ALI|
+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|1         |null                       |1.0                        |1.0                         |null                       |1.0                        |1.0                         |
|2         |0.43                       |1.0                        |1.0                         |0.87                       |1.0                        |1.0                         |
|3         |null               

**14. Razão entre variáveis históricas (uma variável dividida pela outra) - Categoria Eletrônicos.**

In [79]:
df_temp_14 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(VL_TOT_U3M_CONS_ELETRONICOS/VL_TOT_U6M_CONS_ELETRONICOS, 2) AS VL_RAZ_TOT_U3M_U6M_CONS_ELE,
  ROUND(VL_TOT_U6M_CONS_ELETRONICOS/VL_TOT_U9M_CONS_ELETRONICOS, 2) AS VL_RAZ_TOT_U6M_U9M_CONS_ELE,
  ROUND(VL_TOT_U9M_CONS_ELETRONICOS/VL_TOT_U12M_CONS_ELETRONICOS, 2) AS VL_RAZ_TOT_U9M_U12M_CONS_ELE,
  ROUND(VL_MED_U3M_CONS_ELETRONICOS/VL_MED_U6M_CONS_ELETRONICOS, 2) AS VL_RAZ_MED_U3M_U6M_CONS_ELE,
  ROUND(VL_MED_U6M_CONS_ELETRONICOS/VL_MED_U9M_CONS_ELETRONICOS, 2) AS VL_RAZ_MED_U6M_U9M_CONS_ELE,
  ROUND(VL_MED_U9M_CONS_ELETRONICOS/VL_MED_U12M_CONS_ELETRONICOS, 2) AS VL_RAZ_MED_U9M_U12M_CONS_ELE
FROM
  df_temp_09
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_14.createOrReplaceTempView('df_temp_14')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_14.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_14.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_14.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 6

+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_RAZ_TOT_U3M_U6M_CONS_ELE|VL_RAZ_TOT_U6M_U9M_CONS_ELE|VL_RAZ_TOT_U9M_U12M_CONS_ELE|VL_RAZ_MED_U3M_U6M_CONS_ELE|VL_RAZ_MED_U6M_U9M_CONS_ELE|VL_RAZ_MED_U9M_U12M_CONS_ELE|
+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|1         |1.0                        |0.6                        |1.0                         |1.0                        |0.9                        |1.0                         |
|2         |null                       |1.0                        |1.0                         |null                       |1.0                        |1.0                         |
|3         |1.0                

**15. Razão entre variáveis históricas (uma variável dividida pela outra) - Categoria Esportes.**

In [80]:
df_temp_15 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(VL_TOT_U3M_CONS_ESPORTES/VL_TOT_U6M_CONS_ESPORTES, 2) AS VL_RAZ_TOT_U3M_U6M_CONS_ESP,
  ROUND(VL_TOT_U6M_CONS_ESPORTES/VL_TOT_U9M_CONS_ESPORTES, 2) AS VL_RAZ_TOT_U6M_U9M_CONS_ESP,
  ROUND(VL_TOT_U9M_CONS_ESPORTES/VL_TOT_U12M_CONS_ESPORTES, 2) AS VL_RAZ_TOT_U9M_U12M_CONS_ESP,
  ROUND(VL_MED_U3M_CONS_ESPORTES/VL_MED_U6M_CONS_ESPORTES, 2) AS VL_RAZ_MED_U3M_U6M_CONS_ESP,
  ROUND(VL_MED_U6M_CONS_ESPORTES/VL_MED_U9M_CONS_ESPORTES, 2) AS VL_RAZ_MED_U6M_U9M_CONS_ESP,
  ROUND(VL_MED_U9M_CONS_ESPORTES/VL_MED_U12M_CONS_ESPORTES, 2) AS VL_RAZ_MED_U9M_U12M_CONS_ESP
FROM
  df_temp_10
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_15.createOrReplaceTempView('df_temp_15')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_15.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_15.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_15.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 6

+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_RAZ_TOT_U3M_U6M_CONS_ESP|VL_RAZ_TOT_U6M_U9M_CONS_ESP|VL_RAZ_TOT_U9M_U12M_CONS_ESP|VL_RAZ_MED_U3M_U6M_CONS_ESP|VL_RAZ_MED_U6M_U9M_CONS_ESP|VL_RAZ_MED_U9M_U12M_CONS_ESP|
+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|1         |1.0                        |1.0                        |1.0                         |1.0                        |1.0                        |1.0                         |
|2         |1.0                        |0.34                       |1.0                         |1.0                        |0.68                       |1.0                         |
|3         |1.0                

**16. Razão entre variáveis históricas (uma variável dividida pela outra) - Categoria Livros.**

In [81]:
df_temp_16 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(VL_TOT_U3M_CONS_LIVROS/VL_TOT_U6M_CONS_LIVROS, 2) AS VL_RAZ_TOT_U3M_U6M_CONS_LIV,
  ROUND(VL_TOT_U6M_CONS_LIVROS/VL_TOT_U9M_CONS_LIVROS, 2) AS VL_RAZ_TOT_U6M_U9M_CONS_LIV,
  ROUND(VL_TOT_U9M_CONS_LIVROS/VL_TOT_U12M_CONS_LIVROS, 2) AS VL_RAZ_TOT_U9M_U12M_CONS_LIV,
  ROUND(VL_MED_U3M_CONS_LIVROS/VL_MED_U6M_CONS_LIVROS, 2) AS VL_RAZ_MED_U3M_U6M_CONS_LIV,
  ROUND(VL_MED_U6M_CONS_LIVROS/VL_MED_U9M_CONS_LIVROS, 2) AS VL_RAZ_MED_U6M_U9M_CONS_LIV,
  ROUND(VL_MED_U9M_CONS_LIVROS/VL_MED_U12M_CONS_LIVROS, 2) AS VL_RAZ_MED_U9M_U12M_CONS_LIV
FROM
  df_temp_11
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_16.createOrReplaceTempView('df_temp_16')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_16.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_16.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_16.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 6

+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_RAZ_TOT_U3M_U6M_CONS_LIV|VL_RAZ_TOT_U6M_U9M_CONS_LIV|VL_RAZ_TOT_U9M_U12M_CONS_LIV|VL_RAZ_MED_U3M_U6M_CONS_LIV|VL_RAZ_MED_U6M_U9M_CONS_LIV|VL_RAZ_MED_U9M_U12M_CONS_LIV|
+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|1         |1.0                        |1.0                        |0.35                        |1.0                        |1.0                        |1.05                        |
|2         |null                       |null                       |null                        |null                       |null                       |null                        |
|3         |null               

**17. Razão entre variáveis históricas (uma variável dividida pela outra) - Categoria Roupas.**

In [82]:
df_temp_17 = spark.sql('''

SELECT
  `ID Cliente`,
  ROUND(VL_TOT_U3M_CONS_ROUPAS/VL_TOT_U6M_CONS_ROUPAS, 2) AS VL_RAZ_TOT_U3M_U6M_CONS_ROU,
  ROUND(VL_TOT_U6M_CONS_ROUPAS/VL_TOT_U9M_CONS_ROUPAS, 2) AS VL_RAZ_TOT_U6M_U9M_CONS_ROU,
  ROUND(VL_TOT_U9M_CONS_ROUPAS/VL_TOT_U12M_CONS_ROUPAS, 2) AS VL_RAZ_TOT_U9M_U12M_CONS_ROU,
  ROUND(VL_MED_U3M_CONS_ROUPAS/VL_MED_U6M_CONS_ROUPAS, 2) AS VL_RAZ_MED_U3M_U6M_CONS_ROU,
  ROUND(VL_MED_U6M_CONS_ROUPAS/VL_MED_U9M_CONS_ROUPAS, 2) AS VL_RAZ_MED_U6M_U9M_CONS_ROU,
  ROUND(VL_MED_U9M_CONS_ROUPAS/VL_MED_U12M_CONS_ROUPAS, 2) AS VL_RAZ_MED_U9M_U12M_CONS_ROU
FROM
  df_temp_12
ORDER BY
  `ID Cliente`

''')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_17.createOrReplaceTempView('df_temp_17')

# Checando o número de linhas desse DataFrame.
print(f'Total de Linhas: {df_temp_17.count()}\n')

# Contando o número de novas variáveis criadas.
print(f'Total de Variáveis Criadas: {len(df_temp_17.columns)-1}\n')

# Mostrando o novo DataFrame.
df_temp_17.show(10, False)

Total de Linhas: 1000

Total de Variáveis Criadas: 6

+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_RAZ_TOT_U3M_U6M_CONS_ROU|VL_RAZ_TOT_U6M_U9M_CONS_ROU|VL_RAZ_TOT_U9M_U12M_CONS_ROU|VL_RAZ_MED_U3M_U6M_CONS_ROU|VL_RAZ_MED_U6M_U9M_CONS_ROU|VL_RAZ_MED_U9M_U12M_CONS_ROU|
+----------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+----------------------------+
|1         |0.97                       |1.0                        |0.5                         |1.46                       |1.0                        |0.66                        |
|2         |null                       |null                       |null                        |null                       |null                       |null                        |
|3         |null               

<br>

##**Checando a quantidade de variáveis novas criadas.**

In [83]:
# Lista com os nomes dos DataFrames.
view_names = ['df_temp_01', 'df_temp_02', 'df_temp_03', 'df_temp_04', 'df_temp_05', 'df_temp_06', 'df_temp_07', 'df_temp_08', 'df_temp_09', 'df_temp_10', 'df_temp_11', 'df_temp_12', 'df_temp_13', 'df_temp_14', 'df_temp_15', 'df_temp_16', 'df_temp_17']

# Inicializando a variável de soma.
resultado_soma = 0

# Iterando sobre os nomes dos DataFrames.
for nome_df in view_names:
    # Obtendo o DataFrame usando globals().
    df_atual = globals()[nome_df]

    # Calculando o resultado e adicionando à soma.
    resultado_soma += len(df_atual.columns) - 1

# Exibindo o resultado.
print('Quantidade de Variáveis Novas Criadas:', resultado_soma)

Quantidade de Variáveis Novas Criadas: 90


<br>

## **Criação e tratamento da Tabela Analítica de Modelagem (ABT).**

**Juntando todas as variáveis criadas, em um único DataFrame.**

In [84]:
from functools import reduce

# Lista com os nomes das views temporárias.
view_names = ['df_temp_01', 'df_temp_02', 'df_temp_03', 'df_temp_04', 'df_temp_05', 'df_temp_06', 'df_temp_07', 'df_temp_08', 'df_temp_09', 'df_temp_10', 'df_temp_11', 'df_temp_12', 'df_temp_13', 'df_temp_14', 'df_temp_15', 'df_temp_16', 'df_temp_17']

# Criando um DataFrame inicial.
df_temp_combined = spark.table(view_names[0])

# Iterarando sobre as views temporárias e realizar joins sucessivos.
for name in view_names[1:]:
    df_temp_combined = df_temp_combined.join(spark.table(name), 'ID Cliente', 'inner')

# Esse código cria ou substitui uma view temporária no Apache Spark.
df_temp_combined.createOrReplaceTempView('df_temp_combined')

# Checando o número de linhas desse DataFrame.
num_rows = df_temp_combined.count()
print(f'Número de Linhas: {num_rows}\n')

# Contando o número total de novas variáveis criadas.
num_columns = len(df_temp_combined.columns)-1
print(f'Número de Variáveis Novas Criadas: {num_columns}\n')

# Exibindo o novo DataFrame resultante do join.
df_temp_combined.show(10, False)

Número de Linhas: 1000

Número de Variáveis Novas Criadas: 90

+----------+---------------+--------------------+--------------------------+---------------+------------+------------+------------+------------+----------------------+----------------------+------------------------+------------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+-------------------------+-------------------------+-------------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------------------------+---------------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+---------------------------+----------------------------+------------------------+------------------------+------------------------+-------------------------+-------

**Juntando as variáveis explicativas com a base de público.**

In [85]:
# Fazendo um Left Join para incluir as variáveis explicativas que foram criadas, na tabela de público.
abt_temp_01 = spark.sql('''

SELECT
  pub.*,
  com.*,
  date_format(CURRENT_DATE, 'yyyyMM') AS PK_DATREF,
  CURRENT_DATE AS PK_DAT_PROC
FROM
  df_publico AS pub
  left join
  df_temp_combined AS com
  on
  pub.ID = com.`ID Cliente`
ORDER BY
  ID

''')

# Contando o número de linhas dessa ABT.
# Importante fazer essa checagem (check de sanidade) para ver se o número de linhas bate com o número de linhas da tabela de público original. Se não bater tem algo de errado com a chave.
print(f'Total de Linhas: {abt_temp_01.count()}\n')

# Mostrando a ABT.
abt_temp_01.show(10, False)

Total de Linhas: 1000

+---+-----+------+----------------------+------------+-------------+-----+----------+---------------+--------------------+--------------------------+---------------+------------+------------+------------+------------+----------------------+----------------------+------------------------+------------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+-------------------------+-------------------------+-------------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------------------------+---------------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+---------------------------+----------------------------+------------------------+------------------------+------------------------+

**Renomeando campos com nomes fora do padrão.**
- Retirar espaços.
- Retirar caracteres especiais.
- Retirar qualquer coisa "estranha".
- Padronizar os nomes das features para que fiquem intuitivos.

In [86]:
# Dicionário de renomeação.
rename_dict = {
    'ID': 'ID_CLIENTE',
    'Idade': 'IDADE',
    'Gênero': 'NM_GENERO',
    'Dias desde a Inscrição': 'QT_DIAS_DESDE_INSCR',
    'Usou Suporte': 'FL_USOU_SUPORTE',
    'Plano': 'PLANO',
    'Churn': 'CHURN',
}

# Aplicando renomeações com SPARK puro.
for old_name, new_name in rename_dict.items():
    abt_temp_01 = abt_temp_01.withColumnRenamed(old_name, new_name)

In [87]:
abt_temp_01.show(10, False)

+----------+-----+---------+-------------------+---------------+-------------+-----+----------+---------------+--------------------+--------------------------+---------------+------------+------------+------------+------------+----------------------+----------------------+------------------------+------------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+-------------------------+-------------------------+-------------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------------------------+---------------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+---------------------------+----------------------------+------------------------+------------------------+------------------------+-------------

**Dropando a coluna ID Cliente porque ela já está sendo representada por ID_CLIENTE.**

In [88]:
# Descartando a coluna "ID Cliente".
abt_churn = abt_temp_01.drop('ID Cliente')
abt_churn.count()

1000

In [89]:
abt_churn.show()

+----------+-----+---------+-------------------+---------------+-------------+-----+---------------+--------------------+--------------------------+---------------+------------+------------+------------+------------+----------------------+----------------------+------------------------+------------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+-------------------------+-------------------------+-------------------------+--------------------------+-------------------------+-------------------------+-------------------------+--------------------------+---------------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------+---------------------------+----------------------------+------------------------+------------------------+------------------------+------------------------

**Salvando a tabela no diretório do Drive.**

In [90]:
# Salvando um diretório ABT com as partições em parquet.
abt_churn.write.partitionBy('PK_DATREF').parquet('/content/drive/MyDrive/abt_churn')