# Neste notebook vamos aprender a:

- Ler dados com spark
- Habilitar uso do Spark.SQL
- Criar variáveis explicativas para Machine Learning
- Montar uma Tabela Analítica de Modelagem (ABT)
- Gerar amostra para estudos e modelagem
- Salvar dados

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

#### Lendo dados com Spark

In [4]:
df_transacoes = spark.read.csv("/content/drive/Shareddrives/PoD Academy/Cursos/Formação em Ciência de Dados/dados/sinteticos/base_transacoes.csv",
                               header=True,
                               inferSchema=True)

## Habilitando uso do SparkSQL
df_transacoes.createOrReplaceTempView("df_transacoes")

In [5]:
df_publico = spark.read.csv("/content/drive/Shareddrives/PoD Academy/Cursos/Formação em Ciência de Dados/dados/sinteticos/base_churn.csv", header=True, inferSchema=True)
df_publico.createOrReplaceTempView("df_publico")

In [6]:
df_publico.show(10)

+---+-----+------+----------------------+------------+-------------+-----+
| ID|Idade|Gênero|Dias desde a Inscrição|Usou Suporte|        Plano|Churn|
+---+-----+------+----------------------+------------+-------------+-----+
|  1|   21|     F|                  1331|           1|Intermediário|    1|
|  2|   21|     M|                  1160|           0|Intermediário|    0|
|  3|   62|     M|                   454|           1|       Básico|    0|
|  4|   64|     M|                   226|           1|Intermediário|    0|
|  5|   61|     M|                   474|           1|     Avançado|    0|
|  6|   18|     M|                   419|           0|       Básico|    0|
|  7|   52|     M|                  1334|           0|       Básico|    0|
|  8|   44|     M|                  1124|           1|Intermediário|    0|
|  9|   52|     M|                  1256|           1|Intermediário|    0|
| 10|   64|     F|                  1197|           0|       Básico|    0|
+---+-----+------+-------

In [11]:
df_transacoes.show(5,False)

+------------+----------+-----------------------------+------------------+-----------+
|ID Transação|ID Cliente|Data                         |Valor             |Categoria  |
+------------+----------+-----------------------------+------------------+-----------+
|1           |1         |2022-11-25 13:50:26.548672560|57.287427536330505|Esportes   |
|2           |1         |2020-01-19 12:27:36.637168141|97.07199340552512 |Alimentos  |
|3           |1         |2021-12-28 12:33:58.938053096|169.10581012381087|Livros     |
|4           |1         |2022-02-05 01:39:49.380530968|199.38694865538451|Roupas     |
|5           |1         |2022-11-16 23:06:54.159292032|160.00228343317622|Eletrônicos|
+------------+----------+-----------------------------+------------------+-----------+
only showing top 5 rows



#### Criando flags de janela para histórico:
- últimos 3 meses
- últimos 6 meses
- últimos 12 meses

In [55]:
df_temp_01 = spark.sql("""
SELECT
    *,
      CASE
        WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -90) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1
        ELSE 0
    END AS ultimos_3_meses_flag,
    CASE
        WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -180) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1
        ELSE 0
    END AS ultimos_6_meses_flag,
    CASE
        WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -365) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1
        ELSE 0
    END AS ultimos_12_meses_flag
FROM df_transacoes
ORDER BY `ID Cliente`;
""")
df_temp_01.createOrReplaceTempView("df_temp_01")
df_temp_01.count()

10171

In [56]:
df_temp_01.show(10,False)

+------------+----------+-----------------------------+------------------+-----------+--------------------+--------------------+---------------------+
|ID Transação|ID Cliente|Data                         |Valor             |Categoria  |ultimos_3_meses_flag|ultimos_6_meses_flag|ultimos_12_meses_flag|
+------------+----------+-----------------------------+------------------+-----------+--------------------+--------------------+---------------------+
|10          |1         |2020-09-06 21:37:41.946902652|56.524638713138636|Alimentos  |0                   |0                   |0                    |
|9           |1         |2022-11-02 12:50:58.407079648|193.00568791656067|Livros     |1                   |1                   |1                    |
|4           |1         |2022-02-05 01:39:49.380530968|199.38694865538451|Roupas     |0                   |0                   |1                    |
|8           |1         |2020-10-06 12:14:52.035398228|53.20607404593595 |Roupas     |0       

#### Criando variáveis explicativas de primeira camada
Apenas algumas variáveis foram criadas, a sugestão é criar muito mais:
- use média, mínimo, soma, máximo
- use os outros tipos de consumo


In [58]:
df_temp_02 = spark.sql("""
select
    `ID Cliente`,
    round(sum(Valor),2) as VL_TOT_CONSUMO,
    round(avg(Valor),2) as VL_MED_CONSUMO,
    round(max(Valor),2) as VL_MAX_CONSUMO,
    round(min(Valor),2) as VL_MIN_CONSUMO,
    round(sum(case when Categoria = 'Esportes' then Valor else 0 end),2) as VL_TOT_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' then Valor else NULL end),2) as VL_MED_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_ESPORTES,
    round(avg(case when Categoria = 'Alimentos' then Valor else NULL end),2) as VL_MED_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_Alimentos
from df_temp_01
group by `ID Cliente`
order by `ID Cliente`
""")

df_temp_02.createOrReplaceTempView("df_temp_02")
df_temp_02.count()

1000

In [59]:
df_transacoes.show()

+------------+----------+--------------------+------------------+-----------+
|ID Transação|ID Cliente|                Data|             Valor|  Categoria|
+------------+----------+--------------------+------------------+-----------+
|           1|         1|2022-11-25 13:50:...|57.287427536330505|   Esportes|
|           2|         1|2020-01-19 12:27:...| 97.07199340552512|  Alimentos|
|           3|         1|2021-12-28 12:33:...|169.10581012381087|     Livros|
|           4|         1|2022-02-05 01:39:...|199.38694865538451|     Roupas|
|           5|         1|2022-11-16 23:06:...|160.00228343317622|Eletrônicos|
|           6|         1|2020-04-25 12:42:...| 9.842481270765422|  Alimentos|
|           7|         1|2022-10-31 22:05:...| 76.90706330227667|Eletrônicos|
|           8|         1|2020-10-06 12:14:...| 53.20607404593595|     Roupas|
|           9|         1|2022-11-02 12:50:...|193.00568791656067|     Livros|
|          10|         1|2020-09-06 21:37:...|56.524638713138636

In [60]:
df_temp_02.show()

+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+
|ID Cliente|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|VL_MED_CONS_Alimentos|VL_MED_U3M_CONS_Alimentos|VL_MED_U6M_CONS_Alimentos|VL_MED_U12M_CONS_Alimentos|
+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+
|         1|       1976.56|        116.27|        199.39|          5.39|              226.73|              113.37|          

#### Criando variáveis explicativas de segunda camada
- Razão entre variáveis históricas

In [61]:
df_temp_03 = spark.sql("""
Select
*,
round(VL_MED_U3M_CONS_ESPORTES/VL_MED_U6M_CONS_ESPORTES,2) as VL_RAZ_MED_U3M_U6M_CONS_ESP,
round(VL_MED_U6M_CONS_ESPORTES/VL_MED_U12M_CONS_ESPORTES,2) as VL_RAZ_MED_U6M_U12M_CONS_ESP,
round(VL_MED_U3M_CONS_Alimentos/VL_MED_U6M_CONS_Alimentos,2) as VL_RAZ_MED_U3M_U6M_CONS_ALI,
round(VL_MED_U6M_CONS_Alimentos/VL_MED_U12M_CONS_Alimentos,2) as VL_RAZ_MED_U6M_U12M_CONS_ALI
from df_temp_02

""")

df_temp_03.createOrReplaceTempView("df_temp_03")
df_temp_03.count()

1000

In [62]:
df_temp_03.show()

+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+---------------------------+----------------------------+---------------------------+----------------------------+
|ID Cliente|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|VL_MED_CONS_Alimentos|VL_MED_U3M_CONS_Alimentos|VL_MED_U6M_CONS_Alimentos|VL_MED_U12M_CONS_Alimentos|VL_RAZ_MED_U3M_U6M_CONS_ESP|VL_RAZ_MED_U6M_U12M_CONS_ESP|VL_RAZ_MED_U3M_U6M_CONS_ALI|VL_RAZ_MED_U6M_U12M_CONS_ALI|
+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+------------------------

In [63]:
df_publico.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Idade: integer (nullable = true)
 |-- Gênero: string (nullable = true)
 |-- Dias desde a Inscrição: integer (nullable = true)
 |-- Usou Suporte: integer (nullable = true)
 |-- Plano: string (nullable = true)
 |-- Churn: integer (nullable = true)



##### Trazer as variáveis explicativas para a base de público

In [64]:
abt_temp_01 = spark.sql("""
    Select
        pub.*,
        b.*,
        date_format(CURRENT_DATE, 'yyyyMM') as PK_DATREF,
        CURRENT_DATE as PK_DAT_PROC
    from
        df_publico as pub
    left join
        df_temp_03 as b
    on
        pub.ID = b.`ID Cliente`

""")

abt_temp_01.count()

1000

In [65]:
abt_temp_01.show()

+---+-----+------+----------------------+------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+---------------------------+----------------------------+---------------------------+----------------------------+---------+-----------+
| ID|Idade|Gênero|Dias desde a Inscrição|Usou Suporte|        Plano|Churn|ID Cliente|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|VL_MED_CONS_Alimentos|VL_MED_U3M_CONS_Alimentos|VL_MED_U6M_CONS_Alimentos|VL_MED_U12M_CONS_Alimentos|VL_RAZ_MED_U3M_U6M_CONS_ESP|VL_RAZ_MED_U6M_U12M_CONS_ESP|VL_RAZ_MED_U3M_U6M_CONS_ALI|VL_RAZ_MED_U6M_U12M_CONS_ALI|PK_DATREF|PK_DAT_PROC

#### Renomeando campos com nomes fora do padrão
- retirar espaços
- retirar caracteres especiais
- retirar qualquer coisa "estranha"
- padronizar os nomes das features para que fiquem intuitivos

In [71]:
# Dicionário de renomeação
rename_dict = {
    "Gênero": "NM_GENERO",
    "Dias desde a Inscrição": "QT_DIAS_DESDE_INSCR",
    "Usou Suporte": "FL_USOU_SUPORTE",
    "ID Cliente": "ID_CLIENTE",
}

# Aplicando renomeações
for old_name, new_name in rename_dict.items():
    abt_temp_01 = abt_temp_01.withColumnRenamed(old_name, new_name)

In [72]:
abt_temp_01.show()

+---+-----+---------+-------------------+---------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+---------------------------+----------------------------+---------------------------+----------------------------+---------+-----------+
| ID|Idade|NM_GENERO|QT_DIAS_DESDE_INSCR|FL_USOU_SUPORTE|        Plano|Churn|ID_CLIENTE|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|VL_MED_CONS_Alimentos|VL_MED_U3M_CONS_Alimentos|VL_MED_U6M_CONS_Alimentos|VL_MED_U12M_CONS_Alimentos|VL_RAZ_MED_U3M_U6M_CONS_ESP|VL_RAZ_MED_U6M_U12M_CONS_ESP|VL_RAZ_MED_U3M_U6M_CONS_ALI|VL_RAZ_MED_U6M_U12M_CONS_ALI|PK_DATREF|PK_DA

#### Dropar a coluna ID porque ela já esta sendo representada por ID_CLIENTE

In [73]:
# Descartando as colunas "idade" e "cidade"
abt = abt_temp_01.drop("ID")
abt.count()

1000

In [74]:
abt.show()

+-----+---------+-------------------+---------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+---------------------------+----------------------------+---------------------------+----------------------------+---------+-----------+
|Idade|NM_GENERO|QT_DIAS_DESDE_INSCR|FL_USOU_SUPORTE|        Plano|Churn|ID_CLIENTE|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|VL_MED_CONS_Alimentos|VL_MED_U3M_CONS_Alimentos|VL_MED_U6M_CONS_Alimentos|VL_MED_U12M_CONS_Alimentos|VL_RAZ_MED_U3M_U6M_CONS_ESP|VL_RAZ_MED_U6M_U12M_CONS_ESP|VL_RAZ_MED_U3M_U6M_CONS_ALI|VL_RAZ_MED_U6M_U12M_CONS_ALI|PK_DATREF|PK_DAT_PROC|


#### Salvando tabela no diretório do Drive

In [75]:
abt.write.partitionBy("PK_DATREF").parquet("/content/drive/Shareddrives/PoD Academy/Cursos/Formação em Ciência de Dados/dados/sinteticos/ABT")

#### Gerando amostra aleatória

In [76]:
# Amostra sem reposição: 10% da população total
sampled_abt = abt.sample(False, 0.1)
sampled_abt.count()

98

In [78]:
sampled_abt.show()

+-----+---------+-------------------+---------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+-------------------------+-------------------------+--------------------------+---------------------------+----------------------------+---------------------------+----------------------------+---------+-----------+
|Idade|NM_GENERO|QT_DIAS_DESDE_INSCR|FL_USOU_SUPORTE|        Plano|Churn|ID_CLIENTE|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MED_U3M_CONS_ESPORTES|VL_MED_U6M_CONS_ESPORTES|VL_MED_U12M_CONS_ESPORTES|VL_MED_CONS_Alimentos|VL_MED_U3M_CONS_Alimentos|VL_MED_U6M_CONS_Alimentos|VL_MED_U12M_CONS_Alimentos|VL_RAZ_MED_U3M_U6M_CONS_ESP|VL_RAZ_MED_U6M_U12M_CONS_ESP|VL_RAZ_MED_U3M_U6M_CONS_ALI|VL_RAZ_MED_U6M_U12M_CONS_ALI|PK_DATREF|PK_DAT_PROC|


In [80]:
abt.count()

1000