In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


<br>
<br>
<br>
<br>

# **Feature Engineering: POS_CASH_balance**

A Feature Engineering é um processo fundamental para ciência de dados. Ela se refere ao processo de criação e transformação de variáveis para melhorar o desempenho dos modelos de Machine Learning.
<br>
<br>
Nesse notebook vamos trabalhar na criação de variáveis para o dataset **`POS_CASH_balance`**. Esse conjunto de dados refere-se a dados relacionados aos pagamentos feitos com cartão de crédito ou débito em maquininhas (POS) ou a empréstimos em dinheiro que foram feitos na PoD Bank.
<br>
<br>
O histórico de pagamentos significa a informação sobre como uma pessoa tem lidado com seus pagamentos no passado. Se ela costuma pagar suas contas em dia, isso é um bom histórico. Se atrasa frequentemente, é um histórico não tão bom.
<br>
<br>
Por se tratar de criação de variáveis, nessa etapa geralmente trabalhamos com muito volume de dados e grande demanda de processamento, por essa razão iremos utilizar o **Spark** e **Spark.SQL** para essa etapa.

<br>

## **Pacotes e Bibliotecas**

In [None]:
# Instalando o PySpark.
!pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Importando bibliotecas.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, sum, avg, max, min, when, countDistinct, count, date_format, current_date
import os
import sys

In [None]:
# Configurando a sessão Spark.
spark = SparkSession.builder \
    .appName('app_spark') \
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '4g') \
    .getOrCreate()

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

<br>
<br>
<br>
<br>

## **Funções**

In [None]:
# Função para substituir espaço por underline e colocar a palavra em letras maiúsculas.
import re

def tratamento_palavra(palavra):
    palavra_transformada = re.sub(r'\s', '_', palavra).upper()

    return palavra_transformada

<br>
<br>
<br>
<br>

## **Leitura e visualização do conjunto de dados**

In [None]:
# Lendo nosso conjunto de dados e criando um DataFrame no Spark.
dados = spark.read.csv('/content/drive/MyDrive/Projetos_Big_Data_Analytics/Ciencia_de_Dados/Etapa_Modelagem_Credito/pod-academy-analise-de-credito-para-fintech/database/POS_CASH_balance.csv', header=True, inferSchema=True)
dados.createOrReplaceTempView('dados')

# Mostra os dados.
dados.show()

+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+
|SK_ID_PREV|SK_ID_CURR|MONTHS_BALANCE|CNT_INSTALMENT|CNT_INSTALMENT_FUTURE|NAME_CONTRACT_STATUS|SK_DPD|SK_DPD_DEF|
+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+
|   1803195|    182943|           -31|          48.0|                 45.0|              Active|     0|         0|
|   1715348|    367990|           -33|          36.0|                 35.0|              Active|     0|         0|
|   1784872|    397406|           -32|          12.0|                  9.0|              Active|     0|         0|
|   1903291|    269225|           -35|          48.0|                 42.0|              Active|     0|         0|
|   2341044|    334279|           -35|          36.0|                 35.0|              Active|     0|         0|
|   2207092|    342166|           -32|          12.0|                 12.0|     

In [None]:
# Checando a quantidade de linhas e colunas do DataFrame.

# Quantidade de linhas.
num_rows = dados.count()

# Quantidade de colunas.
num_columns = len(dados.columns)

# Quantidade de IDs únicos.
num_id_prev = dados.select(countDistinct('SK_ID_PREV').alias('num_id_prev')).collect()[0]['num_id_prev']

# Imprimir o resultado.
print(f'Quantidade de linhas do DataFrame: {num_rows}')
print(f'Quantidade de colunas do DataFrame: {num_columns}')
print(f'Quantidade de "SK_ID_PREV" únicos do DataFrame: {num_id_prev}')

Quantidade de linhas do DataFrame: 10001358
Quantidade de colunas do DataFrame: 8
Quantidade de "SK_ID_PREV" únicos do DataFrame: 936325


In [None]:
# Exibindo a estrutura do schema do DataFrame.
dados.printSchema()

root
 |-- SK_ID_PREV: integer (nullable = true)
 |-- SK_ID_CURR: integer (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- CNT_INSTALMENT: double (nullable = true)
 |-- CNT_INSTALMENT_FUTURE: double (nullable = true)
 |-- NAME_CONTRACT_STATUS: string (nullable = true)
 |-- SK_DPD: integer (nullable = true)
 |-- SK_DPD_DEF: integer (nullable = true)



In [None]:
# Extraindo os valores únicos da coluna 'NAME_CONTRACT_STATUS'.
status = dados.select('NAME_CONTRACT_STATUS').distinct().rdd.flatMap(lambda x: x).collect()

# Aplicando a função de tratamento de palavra a cada valor único.
nova_lista = [tratamento_palavra(status) for status in status]

# Exibindo a lista resultante.
print(nova_lista)

['DEMAND', 'APPROVED', 'COMPLETED', 'RETURNED_TO_THE_STORE', 'XNA', 'ACTIVE', 'SIGNED', 'CANCELED', 'AMORTIZED_DEBT']


<br>
<br>
<br>
<br>

## **Criação de flags**

A criação de flags binárias 1 ou 0 será útil para nos auxiliar na visão temporal dos dados e na criação das variáveis.
<br>
<br>
Serão criadas flags para os últimos 3, 6, 12, 24 e 36 meses.

In [None]:
df_temp_01 = spark.sql('''

SELECT
  *,
  CASE
    WHEN MONTHS_BALANCE >=  -3 THEN 1 ELSE 0 END AS U3M,
  CASE
    WHEN MONTHS_BALANCE >=  -6 THEN 1 ELSE 0 END AS U6M,
  CASE
    WHEN MONTHS_BALANCE >= -12 THEN 1 ELSE 0 END AS U12M,
  CASE
    WHEN MONTHS_BALANCE >= -24 THEN 1 ELSE 0 END AS U24M,
  CASE
    WHEN MONTHS_BALANCE >= -36 THEN 1 ELSE 0 END AS U36M
FROM
  dados
ORDER BY
  `SK_ID_PREV`;

''')
df_temp_01.createOrReplaceTempView('df_temp_01')
df_temp_01.show()

+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+---+---+----+----+----+
|SK_ID_PREV|SK_ID_CURR|MONTHS_BALANCE|CNT_INSTALMENT|CNT_INSTALMENT_FUTURE|NAME_CONTRACT_STATUS|SK_DPD|SK_DPD_DEF|U3M|U6M|U12M|U24M|U36M|
+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+---+---+----+----+----+
|   1000001|    158271|            -8|           2.0|                  0.0|           Completed|     0|         0|  0|  0|   1|   1|   1|
|   1000001|    158271|            -9|          12.0|                 11.0|              Active|     0|         0|  0|  0|   1|   1|   1|
|   1000001|    158271|           -10|          12.0|                 12.0|              Active|     0|         0|  0|  0|   1|   1|   1|
|   1000002|    101962|           -52|           6.0|                  2.0|              Active|     0|         0|  0|  0|   0|   0|   0|
|   1000002|    101962|           

<br>
<br>
<br>
<br>

## **Criação de variáveis explicativas**

Vamos criar as variáveis explicativas com base nas variáveis já existentes, usando a sumarização para as janelas temporais criadas na etapa anterior. Já iremos também agregar na granularidade indivíduo usando o ID **`SK_ID_PREV`**, para que depois a gente possa fazer o join com a tabela de **previous_application**.

### **Variáveis Explicativas Gerais**

In [None]:
# Definindo as colunas para a agregação.
colunas_agregacao_total = df_temp_01.columns
colunas_agregacao_total.remove('SK_ID_CURR')
colunas_agregacao_total.remove('SK_ID_PREV')
colunas_agregacao_total.remove('MONTHS_BALANCE')
colunas_agregacao_total.remove('NAME_CONTRACT_STATUS')

# Defindo a lista de colunas de flags.
colunas_flags = ['U3M', 'U6M', 'U12M', 'U24M', 'U36M']

# Criando uma lista vazia.
expressoes_agregacao = []

# Iterando sobre as colunas e criando as variáveis explicativas com as agregações.
for coluna in colunas_agregacao_total:
  # Verifica se a coluna atual não é uma coluna de flag.
  if not any(flag in coluna for flag in colunas_flags):
    for flag in colunas_flags:
      if 'DPD' in coluna:
        expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_TOT_{coluna.upper()}_{flag.upper()}_POS_CASH'))
        expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MED_{coluna.upper()}_{flag.upper()}_POS_CASH'))
        expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MAX_{coluna.upper()}_{flag.upper()}_POS_CASH'))
        expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MIN_{coluna.upper()}_{flag.upper()}_POS_CASH'))
      else:
        expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_TOT_{coluna.upper()}_{flag.upper()}_POS_CASH'))
        expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MED_{coluna.upper()}_{flag.upper()}_POS_CASH'))
        expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MAX_{coluna.upper()}_{flag.upper()}_POS_CASH'))
        expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MIN_{coluna.upper()}_{flag.upper()}_POS_CASH'))

# Criando uma tupla com as variáveis criadas.
expressoes_agregacao = tuple(expressoes_agregacao)

# Aplicando as expressões de agregação.
df_temp_02 = df_temp_01.groupBy('SK_ID_PREV').agg(*expressoes_agregacao).orderBy('SK_ID_PREV')


# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_02.columns
nomes_cols_novas = nomes_cols[1:]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_02.columns) - 1)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_02.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_02.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
df_temp_02.show(20, False)

Quantidade Total de Variáveis Criadas: 80
Nomes das Variáveis Criadas: ['VL_TOT_CNT_INSTALMENT_U3M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U3M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U3M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U3M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U6M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U6M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U6M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U6M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U12M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U12M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U12M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U12M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U24M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U24M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U24M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U24M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U36M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U36M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U36M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U36M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_FUTURE_U3M_POS_CASH', 'VL_MED_CNT_INSTALMENT_FUTURE_U3M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_FUTURE_U3M_POS_CASH', 'VL_MIN_CNT_INSTALMEN

<br>

### **Variáveis Explicativas com base no "NAME_CONTRACT_STATUS"**

In [None]:
# Definindo as colunas para agregação.
colunas_agregacao_total = df_temp_01.columns
colunas_agregacao_total.remove('SK_ID_CURR')
colunas_agregacao_total.remove('SK_ID_PREV')
colunas_agregacao_total.remove('MONTHS_BALANCE')
colunas_agregacao_total.remove('NAME_CONTRACT_STATUS')

# Defindo a lista de colunas de flags.
colunas_flags = ['U3M', 'U6M', 'U12M', 'U24M', 'U36M']

# Criando uma lista vazia.
expressoes_agregacao = []

# Iterando sobre as colunas e criando as variáveis explicativas com as agregações.
for categoria in nova_lista:
  for coluna in colunas_agregacao_total:
    # Verifica se a coluna atual não é uma coluna de flag.
    if not any(flag in coluna for flag in colunas_flags):
      for flag in colunas_flags:
        if 'DPD' in coluna:
          expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_TOT_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
          expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MED_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
          expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MAX_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
          expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MIN_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
        else:
          expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_TOT_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
          expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MED_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
          expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MAX_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))
          expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MIN_{coluna.upper()}_{flag.upper()}_{categoria}_POS_CASH'))

# Criando uma tupla com as variáveis criadas.
expressoes_agregacao = tuple(expressoes_agregacao)

# Aplicando as expressões de agregação.
df_temp_03 = df_temp_01.groupBy('SK_ID_PREV').agg(*expressoes_agregacao).orderBy('SK_ID_PREV')


# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_03.columns
nomes_cols_novas = nomes_cols[1:]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_03.columns) - 1)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_03.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_03.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
df_temp_03.show(20, False)

Quantidade Total de Variáveis Criadas: 720
Nomes das Variáveis Criadas: ['VL_TOT_CNT_INSTALMENT_U3M_DEMAND_POS_CASH', 'VL_MED_CNT_INSTALMENT_U3M_DEMAND_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U3M_DEMAND_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U3M_DEMAND_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U6M_DEMAND_POS_CASH', 'VL_MED_CNT_INSTALMENT_U6M_DEMAND_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U6M_DEMAND_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U6M_DEMAND_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U12M_DEMAND_POS_CASH', 'VL_MED_CNT_INSTALMENT_U12M_DEMAND_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U12M_DEMAND_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U12M_DEMAND_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U24M_DEMAND_POS_CASH', 'VL_MED_CNT_INSTALMENT_U24M_DEMAND_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U24M_DEMAND_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U24M_DEMAND_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U36M_DEMAND_POS_CASH', 'VL_MED_CNT_INSTALMENT_U36M_DEMAND_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U36M_DEMAND_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U36M_DEMAND_POS_CASH', 'VL_TOT_CNT_INS

<br>

### **Join das tabelas criadas**

In [None]:
# Fazendo o join das duas tabelas criadas.
df_temp_04 = df_temp_02.join(df_temp_03, 'SK_ID_PREV')

# Adicionando as colunas de data ao DataFrame.
df_temp_04 = df_temp_04.withColumn('PK_DATREF', date_format(current_date(), 'yyyyMMdd')) \
                       .withColumn('PK_DAT_PROC', current_date())


# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_04.columns
nomes_cols_novas = nomes_cols[1:-2]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_04.columns) - 3)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_04.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_04.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
df_temp_04.show(20, False)

Quantidade Total de Variáveis Criadas: 800
Nomes das Variáveis Criadas: ['VL_TOT_CNT_INSTALMENT_U3M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U3M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U3M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U3M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U6M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U6M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U6M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U6M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U12M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U12M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U12M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U12M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U24M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U24M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U24M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U24M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_U36M_POS_CASH', 'VL_MED_CNT_INSTALMENT_U36M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_U36M_POS_CASH', 'VL_MIN_CNT_INSTALMENT_U36M_POS_CASH', 'VL_TOT_CNT_INSTALMENT_FUTURE_U3M_POS_CASH', 'VL_MED_CNT_INSTALMENT_FUTURE_U3M_POS_CASH', 'VL_MAX_CNT_INSTALMENT_FUTURE_U3M_POS_CASH', 'VL_MIN_CNT_INSTALME

<br>
<br>
<br>
<br>

## **Salvando a tabela sumarizada**

In [None]:
df_temp_04.write.mode('overwrite').parquet('/content/drive/MyDrive/Projetos_Big_Data_Analytics/Ciencia_de_Dados/Etapa_Modelagem_Credito/pod-academy-analise-de-credito-para-fintech/feature_engineering/book_vars/book_pos_cash_balance')