## Lendo os dados

In [0]:
# Location of the raw installments CSV file.
file_path = "/FileStore/tables/podbank/installments_payments.csv"

# Read the CSV treating its first line as the header and letting Spark infer
# each column's type from the data.
df = (
  spark.read
  .format("csv")
  .options(header="true", inferSchema="true")
  .load(file_path)
)

df.display()

SK_ID_PREV,SK_ID_CURR,NUM_INSTALMENT_VERSION,NUM_INSTALMENT_NUMBER,DAYS_INSTALMENT,DAYS_ENTRY_PAYMENT,AMT_INSTALMENT,AMT_PAYMENT
1054186,161674,1.0,6,-1180.0,-1187.0,6948.36,6948.36
1330831,151639,0.0,34,-2156.0,-2156.0,1716.525,1716.525
2085231,193053,2.0,1,-63.0,-63.0,25425.0,25425.0
2452527,199697,1.0,3,-2418.0,-2426.0,24350.13,24350.13
2714724,167756,1.0,2,-1383.0,-1366.0,2165.04,2160.585
1137312,164489,1.0,12,-1384.0,-1417.0,5970.375,5970.375
2234264,184693,4.0,11,-349.0,-352.0,29432.295,29432.295
1818599,111420,2.0,4,-968.0,-994.0,17862.165,17862.165
2723183,112102,0.0,14,-197.0,-197.0,70.74,70.74
1413990,109741,1.0,4,-570.0,-609.0,14308.47,14308.47


## Verificando quantidade de linhas e colunas no dataframe

In [0]:
# Expose the raw DataFrame to Spark SQL under the view name `dados_instalments`.
df.createOrReplaceTempView("dados_instalments")

# Shape of the DataFrame: number of rows (runs a Spark job) and number of columns.
qtt_rows, qtt_columns = df.count(), len(df.columns)

# Number of distinct SK_ID_PREV values, computed through SQL; the one-row result
# is also registered as a temporary view of the same name.
distinct_id_instalments = spark.sql('''SELECT COUNT(DISTINCT `SK_ID_PREV`) as distinct_id_instalments FROM dados_instalments ''')
distinct_id_instalments.createOrReplaceTempView("distinct_id_instalments")

# Report the three figures.
print(f'Quantidade de linhas do DataFrame: {qtt_rows}')
print(f'Quantidade de colunas do DataFrame: {qtt_columns}')
distinct_id_instalments.show()

Quantidade de linhas do DataFrame: 13605401
Quantidade de colunas do DataFrame: 8
+-----------------------+
|distinct_id_instalments|
+-----------------------+
|                 997752|
+-----------------------+



## Criando flags de janela temporal

In [0]:
# Add three 0/1 time-window flags to every row of the `dados_instalments` view,
# based on DAYS_INSTALMENT:
#   U3M  = 1 when DAYS_INSTALMENT >= -90
#   U6M  = 1 when DAYS_INSTALMENT >= -180
#   U12M = 1 when DAYS_INSTALMENT >= -360
# The windows are nested: a row flagged U3M is also flagged U6M and U12M.
# A NULL DAYS_INSTALMENT fails every comparison and therefore gets 0 in all flags.
# NOTE(review): presumably DAYS_INSTALMENT is a day offset relative to a reference
# date (the sample values are negative), so the flags mean "last 3/6/12 months" --
# confirm against the data dictionary.
# The result is sorted by SK_ID_PREV, which determines the rows shown by limit(5).
df_temp_01 = spark.sql('''
  SELECT
    *,
    CASE WHEN DAYS_INSTALMENT >= -90 THEN 1 ELSE 0 END AS U3M,
    CASE WHEN DAYS_INSTALMENT >= -180 THEN 1 ELSE 0 END AS U6M,
    CASE WHEN DAYS_INSTALMENT >= -360 THEN 1 ELSE 0 END AS U12M
  FROM dados_instalments
  ORDER BY SK_ID_PREV
''')

# Register the flagged data as a temporary view and preview the first five rows.
df_temp_01.createOrReplaceTempView('df_temp_01')
display(df_temp_01.limit(5))

SK_ID_PREV,SK_ID_CURR,NUM_INSTALMENT_VERSION,NUM_INSTALMENT_NUMBER,DAYS_INSTALMENT,DAYS_ENTRY_PAYMENT,AMT_INSTALMENT,AMT_PAYMENT,U3M,U6M,U12M
1000001,158271,2.0,2,-238.0,-244.0,62039.115,62039.115,0,0,1
1000001,158271,1.0,1,-268.0,-294.0,6404.31,6404.31,0,0,1
1000002,101962,1.0,3,-1540.0,-1559.0,6264.0,6264.0,0,0,0
1000002,101962,2.0,4,-1510.0,-1554.0,18443.565,18443.565,0,0,0
1000002,101962,1.0,1,-1600.0,-1611.0,6264.0,6264.0,0,0,0


## Criando variáveis de primeira camada

In [0]:
from pyspark.sql.functions import col, round, sum, avg, max, min, when, countDistinct, count, date_format, current_date
# Build the first-layer features: for every measure column and every time-window
# flag, aggregate the measure per SK_ID_PREV over the rows where the flag is 1.
#
# NOTE: `round`, `sum`, `avg`, `max`, `min`, `when` and `col` below are the
# pyspark.sql.functions imported at the top of this cell (they shadow Python's
# built-ins of the same name), i.e. they build Spark column expressions.

# Identifier columns: they are grouping keys / instalment identifiers, not measures.
colunas_excluidas = {'SK_ID_CURR', 'SK_ID_PREV', 'NUM_INSTALMENT_VERSION', 'NUM_INSTALMENT_NUMBER'}

# Every remaining column of df_temp_01 (this still includes the flag columns,
# which are skipped in the loop below).
colunas_agregacao_total = [coluna for coluna in df_temp_01.columns if coluna not in colunas_excluidas]

# Time-window flag columns (0/1) present in df_temp_01.
colunas_flags = ['U3M', 'U6M', 'U12M']

# (feature-name prefix, aggregate function) pairs, in output order.
# Columns whose name contains 'DAYS' only get max/min; all others get
# total, mean, max and min.
agregacoes_dias = [('QT_MAX', max), ('QT_MIN', min)]
agregacoes_valores = [('VL_TOT', sum), ('VL_MED', avg), ('VL_MAX', max), ('VL_MIN', min)]

expressoes_agregacao = []
for coluna in colunas_agregacao_total:
  # Skip the flag columns themselves. Exact membership is used on purpose: a
  # substring test would also discard any real measure whose name merely
  # contains 'U3M', 'U6M' or 'U12M'.
  if coluna in colunas_flags:
    continue

  agregacoes = agregacoes_dias if 'DAYS' in coluna else agregacoes_valores

  for flag in colunas_flags:
    # Value of the column inside the window, NULL outside it (no `otherwise`),
    # so the aggregates only see rows where the flag is 1. A group with no row
    # in the window yields NULL for that feature.
    valor_na_janela = when(col(flag) == 1, col(coluna))
    for prefixo, funcao in agregacoes:
      nome_variavel = f'{prefixo}_{coluna.upper()}_{flag.upper()}_INSTALLMENTS'
      expressoes_agregacao.append(round(funcao(valor_na_janela), 2).alias(nome_variavel))

# One row per SK_ID_PREV with all features, sorted by the key so that the
# preview below is deterministic.
df_temp_02 = df_temp_01.groupBy('SK_ID_PREV').agg(*expressoes_agregacao).orderBy('SK_ID_PREV')


# Number and names of the created features (every column except the key).
nomes_cols = df_temp_02.columns
nomes_cols_novas = nomes_cols[1:]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_02.columns) - 1)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Number of rows of the aggregated DataFrame (runs a Spark job).
num_rows_df = df_temp_02.count()

# Number of columns of the aggregated DataFrame.
num_columns_df = len(df_temp_02.columns)

# Report the shape.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Preview the aggregated DataFrame.
display(df_temp_02.limit(5))

Quantidade Total de Variáveis Criadas: 36
Nomes das Variáveis Criadas: ['QT_MAX_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_MED_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_MAX_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_MIN_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_MED_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_MAX_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_MIN_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U12M_INSTALLMENTS', 'VL_MED_AMT

SK_ID_PREV,QT_MAX_DAYS_INSTALMENT_U3M_INSTALLMENTS,QT_MIN_DAYS_INSTALMENT_U3M_INSTALLMENTS,QT_MAX_DAYS_INSTALMENT_U6M_INSTALLMENTS,QT_MIN_DAYS_INSTALMENT_U6M_INSTALLMENTS,QT_MAX_DAYS_INSTALMENT_U12M_INSTALLMENTS,QT_MIN_DAYS_INSTALMENT_U12M_INSTALLMENTS,QT_MAX_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS,QT_MIN_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS,QT_MAX_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS,QT_MIN_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS,QT_MAX_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS,QT_MIN_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS,VL_TOT_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_MED_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_MAX_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_MIN_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_TOT_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_MED_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_MAX_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_MIN_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_TOT_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_MED_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_MAX_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_MIN_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_TOT_AMT_PAYMENT_U3M_INSTALLMENTS,VL_MED_AMT_PAYMENT_U3M_INSTALLMENTS,VL_MAX_AMT_PAYMENT_U3M_INSTALLMENTS,VL_MIN_AMT_PAYMENT_U3M_INSTALLMENTS,VL_TOT_AMT_PAYMENT_U6M_INSTALLMENTS,VL_MED_AMT_PAYMENT_U6M_INSTALLMENTS,VL_MAX_AMT_PAYMENT_U6M_INSTALLMENTS,VL_MIN_AMT_PAYMENT_U6M_INSTALLMENTS,VL_TOT_AMT_PAYMENT_U12M_INSTALLMENTS,VL_MED_AMT_PAYMENT_U12M_INSTALLMENTS,VL_MAX_AMT_PAYMENT_U12M_INSTALLMENTS,VL_MIN_AMT_PAYMENT_U12M_INSTALLMENTS
1000001,,,,,-238.0,-268.0,,,,,-244.0,-294.0,,,,,,,,,68443.43,34221.71,62039.12,6404.31,,,,,,,,,68443.43,34221.71,62039.12,6404.31
1000002,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1000003,-34.0,-64.0,-34.0,-94.0,-34.0,-94.0,-49.0,-81.0,-49.0,-108.0,-49.0,-108.0,9902.7,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,9902.7,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35
1000004,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1000005,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:


# Append the two date columns after the existing ones: PK_DATREF holds the
# current date formatted as a 'yyyyMMdd' string, PK_DAT_PROC holds it as a date.
df_temp_03 = df_temp_02.select(
  '*',
  date_format(current_date(), 'yyyyMMdd').alias('PK_DATREF'),
  current_date().alias('PK_DAT_PROC'),
)


# Feature columns are everything except the key (first column) and the two
# date columns (last two columns).
nomes_cols = df_temp_03.columns
nomes_cols_novas = nomes_cols[1:-2]

# Shape of the DataFrame: the row count runs a Spark job.
num_rows_df = df_temp_03.count()
num_columns_df = len(df_temp_03.columns)

# Report how many features were created and their names.
print('Quantidade Total de Variáveis Criadas:', len(df_temp_03.columns) - 3)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Report the number of rows and columns.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Preview the DataFrame with the date columns added.
display(df_temp_03.limit(5))

Quantidade Total de Variáveis Criadas: 36
Nomes das Variáveis Criadas: ['QT_MAX_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_INSTALMENT_U12M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS', 'QT_MAX_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'QT_MIN_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_MED_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_MAX_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_MIN_AMT_INSTALMENT_U3M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_MED_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_MAX_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_MIN_AMT_INSTALMENT_U6M_INSTALLMENTS', 'VL_TOT_AMT_INSTALMENT_U12M_INSTALLMENTS', 'VL_MED_AMT

SK_ID_PREV,QT_MAX_DAYS_INSTALMENT_U3M_INSTALLMENTS,QT_MIN_DAYS_INSTALMENT_U3M_INSTALLMENTS,QT_MAX_DAYS_INSTALMENT_U6M_INSTALLMENTS,QT_MIN_DAYS_INSTALMENT_U6M_INSTALLMENTS,QT_MAX_DAYS_INSTALMENT_U12M_INSTALLMENTS,QT_MIN_DAYS_INSTALMENT_U12M_INSTALLMENTS,QT_MAX_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS,QT_MIN_DAYS_ENTRY_PAYMENT_U3M_INSTALLMENTS,QT_MAX_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS,QT_MIN_DAYS_ENTRY_PAYMENT_U6M_INSTALLMENTS,QT_MAX_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS,QT_MIN_DAYS_ENTRY_PAYMENT_U12M_INSTALLMENTS,VL_TOT_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_MED_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_MAX_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_MIN_AMT_INSTALMENT_U3M_INSTALLMENTS,VL_TOT_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_MED_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_MAX_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_MIN_AMT_INSTALMENT_U6M_INSTALLMENTS,VL_TOT_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_MED_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_MAX_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_MIN_AMT_INSTALMENT_U12M_INSTALLMENTS,VL_TOT_AMT_PAYMENT_U3M_INSTALLMENTS,VL_MED_AMT_PAYMENT_U3M_INSTALLMENTS,VL_MAX_AMT_PAYMENT_U3M_INSTALLMENTS,VL_MIN_AMT_PAYMENT_U3M_INSTALLMENTS,VL_TOT_AMT_PAYMENT_U6M_INSTALLMENTS,VL_MED_AMT_PAYMENT_U6M_INSTALLMENTS,VL_MAX_AMT_PAYMENT_U6M_INSTALLMENTS,VL_MIN_AMT_PAYMENT_U6M_INSTALLMENTS,VL_TOT_AMT_PAYMENT_U12M_INSTALLMENTS,VL_MED_AMT_PAYMENT_U12M_INSTALLMENTS,VL_MAX_AMT_PAYMENT_U12M_INSTALLMENTS,VL_MIN_AMT_PAYMENT_U12M_INSTALLMENTS,PK_DATREF,PK_DAT_PROC
1000001,,,,,-238.0,-268.0,,,,,-244.0,-294.0,,,,,,,,,68443.43,34221.71,62039.12,6404.31,,,,,,,,,68443.43,34221.71,62039.12,6404.31,20241210,2024-12-10
1000002,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,20241210,2024-12-10
1000003,-34.0,-64.0,-34.0,-94.0,-34.0,-94.0,-49.0,-81.0,-49.0,-108.0,-49.0,-108.0,9902.7,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,9902.7,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,14854.05,4951.35,4951.35,4951.35,20241210,2024-12-10
1000004,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,20241210,2024-12-10
1000005,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,20241210,2024-12-10


## Salvando os dados em parquet

In [0]:
# Collapse the result into a single Spark partition so that a single Parquet
# file is written for each PK_DATREF value.
df_temp_04 = df_temp_03.repartition(1)

# Write the features under PK_DATREF=<yyyyMMdd>/ sub-directories.
#
# With Spark's default (static) partition overwrite mode, mode("overwrite")
# combined with partitionBy removes EVERY existing partition under the target
# path before writing, so each run would erase the snapshots of all previous
# reference dates. Dynamic partition overwrite replaces only the partitions
# present in this DataFrame (the current PK_DATREF): re-running on the same day
# is still idempotent, and earlier dates are preserved.
# NOTE(review): if a full refresh of the directory is actually intended, drop
# the partitionOverwriteMode option.
(
  df_temp_04.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("PK_DATREF")
  .parquet('/FileStore/tables/podbank/instalments_payments')
)