In [0]:
import pandas as pd
from pyspark.sql import SparkSession

# Criar sessão Spark (opcional se estiver no notebook Databricks)
spark = SparkSession.builder.appName("fs_bureau_balance").getOrCreate()

In [0]:
fs_bureau_balance = spark.table("podbank.raw.bureau_balance")
fs_bureau_balance.display()

In [0]:
num_rows = fs_bureau_balance.count()
num_columns = len(fs_bureau_balance.columns)

print(f'Quantidade de linhas: {num_rows}')
print(f'Quantidade de variaveis (colunas): {num_columns}')

In [0]:
df_temp_01 = spark.sql("""
SELECT
    *,
    CASE WHEN MONTHS_BALANCE >= -3 THEN 1 ELSE 0 END AS U3M,
    CASE WHEN MONTHS_BALANCE >= -6 THEN 1 ELSE 0 END AS U6M,
    CASE WHEN MONTHS_BALANCE >= -9 THEN 1 ELSE 0 END AS U9M,
    CASE WHEN MONTHS_BALANCE >= -12 THEN 1 ELSE 0 END AS U12M
FROM podbank.raw.bureau_balance
ORDER BY SK_ID_BUREAU
""")
df_temp_01.createOrReplaceTempView("df_temp_01")
display(df_temp_01)

In [0]:
df_temp_02 = spark.sql("""
SELECT
    SK_ID_BUREAU,
    MONTHS_BALANCE,
    STATUS,
    CASE
        WHEN STATUS = "C" THEN 1
        ELSE 0
    END AS STATUS_C,
    CASE
        WHEN STATUS = "0" THEN 1
        ELSE 0
    END AS STATUS_0,
    CASE
        WHEN STATUS = "X" THEN 1
        ELSE 0
    END AS STATUS_X,
    CASE
        WHEN STATUS = "1" THEN 1
        ELSE 0
    END AS STATUS_1,
    U3M,
    U6M,
    U9M,
    U12M
FROM df_temp_01
ORDER BY `SK_ID_BUREAU`;
""")
df_temp_02.createOrReplaceTempView("df_temp_01")
df_temp_02.display(5)

In [0]:
from pyspark.sql.functions import col, round, sum, avg, max, min, when, count, lit

colunas_agregacao_total = ['STATUS_C','STATUS_0','STATUS_X','STATUS_1']

colunas_flags = ['U3M','U6M', 'U9M', 'U12M']
expressoes_agregacao = []

for flag in colunas_flags:
  for coluna in colunas_agregacao_total:
    expressoes_agregacao.append(round(count(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_TT_{coluna.upper()}_{flag.upper()}_BUREAU_BL"))
    expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna)).otherwise(lit(None))), 2).alias(f"QT_MED_{coluna.upper()}_{flag.upper()}_BUREAU_BL"))
    expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_MAX_{coluna.upper()}_{flag.upper()}_BUREAU_BL"))
    expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_MIN_{coluna.upper()}_{flag.upper()}_BUREAU_BL"))

expressoes_agregacao = tuple(expressoes_agregacao)

# Aplicar as expressões de agregação
df_temp_03 = df_temp_02.groupBy("SK_ID_BUREAU").agg(*expressoes_agregacao).orderBy("SK_ID_BUREAU")

# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_03.columns
nomes_cols_novas = nomes_cols[1:]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_03.columns) - 1)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_03.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_03.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
display(df_temp_03)

In [0]:
from pyspark.sql.functions import current_date, date_format

# Adicionando as colunas de data ao DataFrame.
df_temp_04 = df_temp_03.withColumn('PK_DATREF', date_format(current_date(), 'yyyyMMdd')) \
                       .withColumn('PK_DAT_PROC', current_date())

# Quantidade e nome das variáveis criadas.
nomes_cols = df_temp_04.columns
nomes_cols_novas = nomes_cols[1:-2]
print('Quantidade Total de Variáveis Criadas:', len(df_temp_04.columns) - 3)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
print('')
print('')

# Quantidade de linhas do DataFrame.
num_rows_df = df_temp_04.count()

# Quantidade de colunas do DataFrame.
num_columns_df = len(df_temp_04.columns)

# Imprimir o resultado de número de linhas e colunas.
print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
print('')
print('')

# Mostrando o novo DataFrame com as variáveis criadas.
display(df_temp_04)

In [0]:
        df_temp_04.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", True) \
            .saveAsTable("podbank.feature_store.bureau_balance")