### Bibliotecas e pacotes

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, sum, avg, max, min, when, countDistinct, count, date_format, current_date

In [2]:
#configurando sessão
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Instalments") \
        .master("local[*]") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .getOrCreate()

### Utilizando spark para ler os dados

In [4]:
dados_instalments = spark.read.csv("../database/raw/installments_payments.csv", header=True, inferSchema=True)
dados_instalments.createOrReplaceTempView("dados_instalments")

In [5]:
dados_instalments.show(5)

+----------+----------+----------------------+---------------------+---------------+------------------+--------------+-----------+
|SK_ID_PREV|SK_ID_CURR|NUM_INSTALMENT_VERSION|NUM_INSTALMENT_NUMBER|DAYS_INSTALMENT|DAYS_ENTRY_PAYMENT|AMT_INSTALMENT|AMT_PAYMENT|
+----------+----------+----------------------+---------------------+---------------+------------------+--------------+-----------+
|   1054186|    161674|                   1.0|                    6|        -1180.0|           -1187.0|       6948.36|    6948.36|
|   1330831|    151639|                   0.0|                   34|        -2156.0|           -2156.0|      1716.525|   1716.525|
|   2085231|    193053|                   2.0|                    1|          -63.0|             -63.0|       25425.0|    25425.0|
|   2452527|    199697|                   1.0|                    3|        -2418.0|           -2426.0|      24350.13|   24350.13|
|   2714724|    167756|                   1.0|                    2|        -1383.0

### Verificando quantidade de linhas e colunas no dataframe

In [6]:
# Verificando a quantidade de linhas e colunas do DataFrame.

# Quantidade de linhas.
qtt_rows = dados_instalments.count()

# Quantidade de colunas.
qtt_columns = len(dados_instalments.columns)

# Quantidade de IDs únicos.
distinct_id_instalments = spark.sql('''SELECT COUNT(DISTINCT `SK_ID_PREV`) as distinct_id_instalments FROM dados_instalments ''')
distinct_id_instalments.createOrReplaceTempView("distinct_id_instalments")

# Imprimir o resultado.
print(f'Quantidade de linhas do DataFrame: {qtt_rows}')
print(f'Quantidade de colunas do DataFrame: {qtt_columns}')
distinct_id_instalments.show()

Quantidade de linhas do DataFrame: 13605401
Quantidade de colunas do DataFrame: 8
+-----------------------+
|distinct_id_instalments|
+-----------------------+
|                 997752|
+-----------------------+



### Criando flags de janela temporal

In [8]:
df_temp_01 = spark.sql('''
  SELECT
    *,
    CASE WHEN DAYS_INSTALMENT >= -90 THEN 1 ELSE 0 END AS U3M,
    CASE WHEN DAYS_INSTALMENT >= -180 THEN 1 ELSE 0 END AS U6M,
    CASE WHEN DAYS_INSTALMENT >= -360 THEN 1 ELSE 0 END AS U12M,
    CASE WHEN DAYS_INSTALMENT >= -720 THEN 1 ELSE 0 END AS U24M,
    CASE WHEN DAYS_INSTALMENT >= -1080 THEN 1 ELSE 0 END AS U36M
  FROM dados_instalments
  ORDER BY SK_ID_PREV
''')

df_temp_01.createOrReplaceTempView('df_temp_01')
df_temp_01.show()

+----------+----------+----------------------+---------------------+---------------+------------------+--------------+-----------+---+---+----+----+----+
|SK_ID_PREV|SK_ID_CURR|NUM_INSTALMENT_VERSION|NUM_INSTALMENT_NUMBER|DAYS_INSTALMENT|DAYS_ENTRY_PAYMENT|AMT_INSTALMENT|AMT_PAYMENT|U3M|U6M|U12M|U24M|U36M|
+----------+----------+----------------------+---------------------+---------------+------------------+--------------+-----------+---+---+----+----+----+
|   1000001|    158271|                   1.0|                    1|         -268.0|            -294.0|       6404.31|    6404.31|  0|  0|   1|   1|   1|
|   1000001|    158271|                   2.0|                    2|         -238.0|            -244.0|     62039.115|  62039.115|  0|  0|   1|   1|   1|
|   1000002|    101962|                   1.0|                    3|        -1540.0|           -1559.0|        6264.0|     6264.0|  0|  0|   0|   0|   0|
|   1000002|    101962|                   1.0|                    1|        

### Criando variáveis de primeira camada

In [9]:
# Definindo as colunas para a agregação.
colunas_agregacao_total = df_temp_01.columns
colunas_agregacao_total.remove('SK_ID_CURR')
colunas_agregacao_total.remove('SK_ID_PREV')
colunas_agregacao_total.remove('NUM_INSTALMENT_VERSION')
colunas_agregacao_total.remove('NUM_INSTALMENT_NUMBER')

# Defindo a lista de colunas de flags.
colunas_flags = ['U3M', 'U6M', 'U12M', 'U24M', 'U36M']

# Criando uma lista vazia.
expressoes_agregacao = []

# Iterando sobre as colunas e criando as variáveis explicativas com as agregações.
for coluna in colunas_agregacao_total:
  # Verifica se a coluna atual não é uma coluna de flag.
  if not any(flag in coluna for flag in colunas_flags):
    for flag in colunas_flags:
      if 'DAYS' in coluna:
        expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MAX_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'))
        expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f'QT_MIN_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'))
      else:
        expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_TOT_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'))
        expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MED_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'))
        expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MAX_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'))
        expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f'VL_MIN_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'))

# Criando uma tupla com as variáveis criadas.
expressoes_agregacao = tuple(expressoes_agregacao)

# Aplicando as expressões de agregação.
df_temp_02 = df_temp_01.groupBy('SK_ID_PREV').agg(*expressoes_agregacao).orderBy('SK_ID_PREV')


# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_02.columns
nomes_cols_novas = nomes_cols[1:]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_02.columns) - 1)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_02.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_02.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
df_temp_02.show(5, False)

Quantidade Total de Variáveis Criadas: 60
Nomes das Variáveis Criadas: ['QT_MAX_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U24M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U24M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U36M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U36M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U24M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U24M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U36M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U36M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U3M

In [10]:
# Adicionando as colunas de data ao DataFrame.
df_temp_03 = df_temp_02.withColumn('PK_DATREF', date_format(current_date(), 'yyyyMMdd')) \
                       .withColumn('PK_DAT_PROC', current_date())


# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_03.columns
nomes_cols_novas = nomes_cols[1:-2]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_03.columns) - 3)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_03.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_03.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
df_temp_03.show(5, False)

Quantidade Total de Variáveis Criadas: 60
Nomes das Variáveis Criadas: ['QT_MAX_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U24M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U24M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U36M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U36M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U24M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U24M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U36M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U36M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U3M

### Salvando a tabela sumarizada

In [11]:
df_temp_03.write.partitionBy("PK_DATREF").parquet('../database/feature_store/book_instalments')