#Book_Pagamentos

##configurações iniciais

In [1]:
!pip install spark

Collecting spark
  Downloading spark-0.3.2-py3-none-any.whl.metadata (1.3 kB)
Collecting json-repair>=0.52.4 (from spark)
  Downloading json_repair-0.56.0-py3-none-any.whl.metadata (14 kB)
Downloading spark-0.3.2-py3-none-any.whl (351 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m351.7/351.7 kB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading json_repair-0.56.0-py3-none-any.whl (38 kB)
Installing collected packages: json-repair, spark
Successfully installed json-repair-0.56.0 spark-0.3.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, col, count, round, cast, desc,to_timestamp, concat_ws, substring, lit, lpad, countDistinct, row_number, isnan, when, corr, first, max as Fmax, min as Fmin, sum as Fsum, to_timestamp, to_date, concat
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import builtins

In [3]:
spark = (
    SparkSession.builder
    .appName("book_pagamentos")

    # =========================
    # PARALELISMO
    # =========================
    .config("spark.sql.shuffle.partitions", "120")
    .config("spark.default.parallelism", "16")

    # =========================
    # MEMÓRIA (RAM 50GB)
    # =========================
    .config("spark.driver.memory", "32g")
    .config("spark.executor.memory", "32g")
    .config("spark.memory.fraction", "0.85")
    .config("spark.memory.storageFraction", "0.25")

    # =========================
    # EXECUÇÃO
    # =========================
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")

    # =========================
    # JOINS
    # =========================
    .config("spark.sql.autoBroadcastJoinThreshold", "100m")

    # =========================
    # SERIALIZAÇÃO
    # =========================
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "512m")

    # =========================
    # CACHE / COLUMNAR
    # =========================
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "20000")

    # =========================
    # GC
    # =========================
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC")

    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

In [4]:
from pyspark.sql import functions as F

In [5]:
from pyspark.sql import types as T

In [6]:
from pyspark.sql.window import Window

##Importação da base de público válida

In [7]:
# Instalação e configuração de variaveis ambiente para utilizar Google Drive
# Se utilizar o Colab ajustar para True para instalação dos pre-requisitos
colab = True

if colab==True:
    from google.colab import drive
    drive.mount('/content/drive')

Mounted at /content/drive


antes de iniciar o book_pagamentos, traremos a base score full com todos CPFs que iremos considerar, para cruzar com os pagamentos e reduzir registros que não interessam para o público alvo (base pagamentos tem 21,8 MM de registros e cai para 15,8 MM após filtro).

In [8]:
df_base_score_full = spark.read.parquet("/content/drive/MyDrive/hackathon/projeto_hackathon/01_INGESTAO/base_score/stg/base_score_full/SAFRA=202502",
                                        "/content/drive/MyDrive/hackathon/projeto_hackathon/01_INGESTAO/base_score/stg/base_score_full/SAFRA=202503")

###Exploração | tratamentos | **filtros**

In [9]:
#renomear safra para safra_buerau
df_base_score_full = df_base_score_full.withColumnRenamed("SAFRA", "SAFRA_BUREAU")

In [10]:
df_base_score_full.groupBy("NUM_CPF").count().filter("count > 1").count()

19332

In [11]:
print(f"Linhas totais: {df_base_score_full.count():,}")
print(f"CPFs únicos: {df_base_score_full.select('NUM_CPF').distinct().count():,}")

Linhas totais: 1,235,862
CPFs únicos: 1,216,530


In [12]:
#transformação formato variáveis numéricas
df_base_score_full = (
df_base_score_full    .withColumn("NUM_CPF", col("NUM_CPF").cast("string"))
    .withColumn("SCORE_01", col("SCORE_01").cast("double"))
    .withColumn("SCORE_02", col("SCORE_02").cast("double"))
    .withColumn("FPD", col("FPD").cast("int"))
)

In [13]:
df_cpfs_validos = df_base_score_full.select("NUM_CPF").distinct()

In [14]:
#quantidade de CPFs distintos no bureau
df_cpfs_validos.select("NUM_CPF").distinct().count()

1216530

###IMPORTAÇÃO BASE PAGAMENTOS

In [15]:
path_rec = "/content/drive/MyDrive/hackathon/book_pagamento/dados_pagamento"
df_pagamentos = spark.read.parquet(path_rec)

In [16]:
print("✅ Linhas:", df_pagamentos.count())
print("✅ Colunas:", len(df_pagamentos.columns))
df_pagamentos.printSchema()
df_pagamentos.show(5)

✅ Linhas: 21829628
✅ Colunas: 73
root
 |-- NUM_CPF: string (nullable = true)
 |-- DAT_STATUS_FATURA: string (nullable = true)
 |-- CONTRATO: string (nullable = true)
 |-- SEQ_FATURA: string (nullable = true)
 |-- NUM_SUB_SEQ_FATURA: string (nullable = true)
 |-- NUM_CREDITO_SEQ: string (nullable = true)
 |-- DW_TIPO_FATURA: string (nullable = true)
 |-- IND_STATUS_FATURA: string (nullable = true)
 |-- DW_NUM_CLIENTE: string (nullable = true)
 |-- DW_AREA: string (nullable = true)
 |-- DW_UN_NEGOCIO: string (nullable = true)
 |-- DW_FORMA_PAGAMENTO: string (nullable = true)
 |-- VAL_PAGAMENTO_FATURA: string (nullable = true)
 |-- DAT_CRIACAO_DW: string (nullable = true)
 |-- DW_BANCO: string (nullable = true)
 |-- DW_TIPO_PAGAMENTO: string (nullable = true)
 |-- NUM_BANCO_PAGAMENTO: string (nullable = true)
 |-- NUM_AGENCIA_PAGAMENTO: string (nullable = true)
 |-- NUM_CC_PAGAMENTO: string (nullable = true)
 |-- DW_MOTIVO_ESTORNO: string (nullable = true)
 |-- VAL_DESCONTO_ITEM: string (

In [17]:
df_pagamentos.select("NUM_CPF").distinct().count()

1930502

###JOIN BASES

In [18]:
#join para usar somente os CPFs válidos
df_pagamentos_alvo = df_pagamentos.join(df_cpfs_validos, "NUM_CPF", "inner")

In [19]:
print("✅ Linhas:", df_pagamentos_alvo.count())
print("✅ Colunas:", len(df_pagamentos_alvo.columns))
df_pagamentos_alvo.printSchema()
df_pagamentos_alvo.show(5)

✅ Linhas: 6431083
✅ Colunas: 73
root
 |-- NUM_CPF: string (nullable = true)
 |-- DAT_STATUS_FATURA: string (nullable = true)
 |-- CONTRATO: string (nullable = true)
 |-- SEQ_FATURA: string (nullable = true)
 |-- NUM_SUB_SEQ_FATURA: string (nullable = true)
 |-- NUM_CREDITO_SEQ: string (nullable = true)
 |-- DW_TIPO_FATURA: string (nullable = true)
 |-- IND_STATUS_FATURA: string (nullable = true)
 |-- DW_NUM_CLIENTE: string (nullable = true)
 |-- DW_AREA: string (nullable = true)
 |-- DW_UN_NEGOCIO: string (nullable = true)
 |-- DW_FORMA_PAGAMENTO: string (nullable = true)
 |-- VAL_PAGAMENTO_FATURA: string (nullable = true)
 |-- DAT_CRIACAO_DW: string (nullable = true)
 |-- DW_BANCO: string (nullable = true)
 |-- DW_TIPO_PAGAMENTO: string (nullable = true)
 |-- NUM_BANCO_PAGAMENTO: string (nullable = true)
 |-- NUM_AGENCIA_PAGAMENTO: string (nullable = true)
 |-- NUM_CC_PAGAMENTO: string (nullable = true)
 |-- DW_MOTIVO_ESTORNO: string (nullable = true)
 |-- VAL_DESCONTO_ITEM: string (n

In [20]:
#quantidade de CPFs correspondidos entre bureau e pagamentos
df_pagamentos_alvo.select("NUM_CPF").distinct().count()

420803

In [21]:
#marcação da terminação nas colunas da base da base origem(_PAG)
cols = [F.col("NUM_CPF")] + [
    F.col(c).alias(f"{c}_PAG") for c in df_pagamentos_alvo.columns if c != "NUM_CPF"
]
df_pagamentos_alvo = df_pagamentos_alvo.select(cols)

In [28]:
df_00=df_pagamentos_alvo

In [29]:
def padronizar_df_pagamento(df):
    """
    Padroniza colunas do book de pagamento:
    - Mantém NUM_CPF sem sufixo
    - Aplica '_PAG' às demais
    - Converte valores e datas com tolerância a erros
    """
    # Mantém CPF e renomeia demais
    '''cols = [F.col("NUM_CPF")] + [
        F.col(c).alias(f"{c}_PAG") for c in df.columns if c != "NUM_CPF"
    ]
    df = df.select(cols)'''

    # Colunas numéricas: usar try_cast seguro
    num_cols = [c for c in df.columns if (("VAL_" in c) or ("NUM_" in c)) and c != "NUM_CPF"]

    for c in num_cols:
        df = df.withColumn(c, F.regexp_replace(F.col(c), ",", "."))
        df = df.withColumn(
            c,
            F.when(
                F.col(c).rlike("^[0-9.]+$"),  # mantém apenas se é número
                F.col(c).cast("double")
            ).otherwise(None)
        )

    # Colunas de data: tolerar formatos diferentes
    date_cols = [c for c in df.columns if "DAT_" in c]
    for c in date_cols:
        df = df.withColumn(
            c,
            F.coalesce(
                F.try_to_timestamp(F.col(c), F.lit("ddMMMyyyy:HH:mm:ss")),
                F.try_to_timestamp(F.col(c), F.lit("yyyy-MM-dd HH:mm:ss")),
                F.try_to_timestamp(F.col(c), F.lit("dd/MM/yyyy HH:mm:ss")),
                F.try_to_timestamp(F.col(c), F.lit("yyyyMMddHHmmss"))
            )
        )

    return df

# Reexecutar com segurança
df_00 = padronizar_df_pagamento(df_pagamentos_alvo)

print("✅ Padronização concluída com segurança.")
print("✅ Total de colunas:", len(df_00.columns))
df_00.printSchema()


✅ Padronização concluída com segurança.
✅ Total de colunas: 73
root
 |-- NUM_CPF: string (nullable = true)
 |-- DAT_STATUS_FATURA_PAG: timestamp (nullable = true)
 |-- CONTRATO_PAG: string (nullable = true)
 |-- SEQ_FATURA_PAG: string (nullable = true)
 |-- NUM_SUB_SEQ_FATURA_PAG: double (nullable = true)
 |-- NUM_CREDITO_SEQ_PAG: double (nullable = true)
 |-- DW_TIPO_FATURA_PAG: string (nullable = true)
 |-- IND_STATUS_FATURA_PAG: string (nullable = true)
 |-- DW_NUM_CLIENTE_PAG: double (nullable = true)
 |-- DW_AREA_PAG: string (nullable = true)
 |-- DW_UN_NEGOCIO_PAG: string (nullable = true)
 |-- DW_FORMA_PAGAMENTO_PAG: string (nullable = true)
 |-- VAL_PAGAMENTO_FATURA_PAG: double (nullable = true)
 |-- DAT_CRIACAO_DW_PAG: timestamp (nullable = true)
 |-- DW_BANCO_PAG: string (nullable = true)
 |-- DW_TIPO_PAGAMENTO_PAG: string (nullable = true)
 |-- NUM_BANCO_PAGAMENTO_PAG: double (nullable = true)
 |-- NUM_AGENCIA_PAGAMENTO_PAG: double (nullable = true)
 |-- NUM_CC_PAGAMENTO_PAG

In [30]:
#transformação da variável DAT_CRIACAO_DW_PAG em safra de PAGAMENTO

df_00 = df_00.withColumn("SAFRA_PAGAMENTO", F.date_format(F.col("DAT_CRIACAO_DW_PAG"), "yyyyMM")) #DAT_STATUS_PAGAMENTO_PAG ou DAT_BAIXA_ATIVIDADE_PAG? DAT_CRIACAO_DW
df_00 = df_00.withColumn("SAFRA_VENCIMENTO", F.date_format(F.col("DAT_VENCIMENTO_CREDITO_PAG"), "yyyyMM"))

In [31]:
df_00.groupBy("SAFRA_PAGAMENTO").count().orderBy("SAFRA_PAGAMENTO").show(50, truncate=False)

+---------------+------+
|SAFRA_PAGAMENTO|count |
+---------------+------+
|202310         |278009|
|202311         |282139|
|202312         |332382|
|202401         |302086|
|202402         |311144|
|202403         |359655|
|202404         |332303|
|202405         |355226|
|202406         |351984|
|202407         |357696|
|202408         |371959|
|202409         |367019|
|202410         |368155|
|202411         |363113|
|202412         |390721|
|202501         |358954|
|202502         |369004|
|202503         |546511|
|202504         |33023 |
+---------------+------+



In [26]:
# df_00 = df_00.filter(~F.col("SAFRA_PAGAMENTO").isin(["202502", "202503","202504"]))
# print(f"Linhas totais: {df_00.count():,}")
# print(f"CPFs únicos: {df_00.select('NUM_CPF').distinct().count():,}")

Linhas totais: 5,482,545


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

##Verificação dos nulos

In [32]:
df_00.printSchema()
df_00.show(5)

root
 |-- NUM_CPF: string (nullable = true)
 |-- DAT_STATUS_FATURA_PAG: timestamp (nullable = true)
 |-- CONTRATO_PAG: string (nullable = true)
 |-- SEQ_FATURA_PAG: string (nullable = true)
 |-- NUM_SUB_SEQ_FATURA_PAG: double (nullable = true)
 |-- NUM_CREDITO_SEQ_PAG: double (nullable = true)
 |-- DW_TIPO_FATURA_PAG: string (nullable = true)
 |-- IND_STATUS_FATURA_PAG: string (nullable = true)
 |-- DW_NUM_CLIENTE_PAG: double (nullable = true)
 |-- DW_AREA_PAG: string (nullable = true)
 |-- DW_UN_NEGOCIO_PAG: string (nullable = true)
 |-- DW_FORMA_PAGAMENTO_PAG: string (nullable = true)
 |-- VAL_PAGAMENTO_FATURA_PAG: double (nullable = true)
 |-- DAT_CRIACAO_DW_PAG: timestamp (nullable = true)
 |-- DW_BANCO_PAG: string (nullable = true)
 |-- DW_TIPO_PAGAMENTO_PAG: string (nullable = true)
 |-- NUM_BANCO_PAGAMENTO_PAG: double (nullable = true)
 |-- NUM_AGENCIA_PAGAMENTO_PAG: double (nullable = true)
 |-- NUM_CC_PAGAMENTO_PAG: double (nullable = true)
 |-- DW_MOTIVO_ESTORNO_PAG: string (

In [None]:
df_00.select(F.countDistinct("DSC_NOME_BANCO_PAGAMENTO_PAG")).show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
df_00.groupBy("DSC_NOME_BANCO_PAGAMENTO_PAG").agg(F.count("NUM_CPF").alias("count_cpf")).orderBy(F.col("count_cpf").desc()).show(truncate=False)

In [33]:
cols_nao_comportamentais = [
    # Identificadores técnicos
    "CONTRATO_PAG", "SEQ_FATURA_PAG", "NUM_SUB_SEQ_FATURA_PAG",
    "NUM_CREDITO_SEQ_PAG", "DW_NUM_CLIENTE_PAG",
    "SEQ_ENTIDADE_ATIVIDADE_PAG", "SEQ_ENTIDADE_PAGAMENTO_PAG",
    "SEQ_ARQUIVO_PAGAMENTO_PAG", "SEQ_PAGAMENTO_CREDITO_PAG",
    "SEQ_FATURA_CREDITO_PAG", "SEQ_ENTIDADE_CREDITO_PAG",

    # Códigos de sistemas internos
    "COD_ORIGEM_NETUNO_PAG", "COD_CONTA_ATIVIDADE_PAG", "COD_LOGIN_OPERADOR_ATIVIDADE_PAG",
    "COD_ATIVIDADE_PAG", "COD_RAZAO_ATIVIDADE_PAG", "COD_FUNDO_ATIVIDADE_PAG",
    "COD_BANCO_ATIVIDADE_PAG", "COD_AGENCIA_ATIVIDADE_PAG", "COD_LOGIN_PAGAMENTO_PAG",
    "COD_ARQUIVO_PAGAMENTO_PAG", "COD_NETUNO_PAGAMENTO_PAG", "COD_LOGIN_CREDITO_PAG",
    "COD_ALOCACAO_CREDITO_PAG", "COD_DESALOCACAO_CREDITO_PAG", "DW_MOTIVO_ESTORNO_PAG",

    # Descrições textuais administrativas
    "DSC_PAGAMENTO_PAG", "DSC_NOME_BANCO_PAGAMENTO_PAG",

    # Sequenciais, banco, agência, etc.
    "NUM_BANCO_PAGAMENTO_PAG", "NUM_AGENCIA_PAGAMENTO_PAG", "NUM_CC_PAGAMENTO_PAG",
    "NUM_CONTA_ATIVIDADE_PAG", "NUM_PARCELA_PAGAMENTO_PAG", "NUM_AGRUPADOR_PAGAMENTO_PAG"
]
df_00 = df_00.drop(*cols_nao_comportamentais)

In [34]:
cols_com_mais_80perc_nulos = [
    # colunas
    "DAT_ATUALIZACAO_CREDITO_PAG","DAT_ATUALIZACAO_ATIVIDADE_PAG","DAT_ATUALIZACAO_PAGAMENTO_PAG"

]
df_01 = df_00.drop(*cols_com_mais_80perc_nulos)

In [35]:
df_01.printSchema()
df_01.show(5)

root
 |-- NUM_CPF: string (nullable = true)
 |-- DAT_STATUS_FATURA_PAG: timestamp (nullable = true)
 |-- DW_TIPO_FATURA_PAG: string (nullable = true)
 |-- IND_STATUS_FATURA_PAG: string (nullable = true)
 |-- DW_AREA_PAG: string (nullable = true)
 |-- DW_UN_NEGOCIO_PAG: string (nullable = true)
 |-- DW_FORMA_PAGAMENTO_PAG: string (nullable = true)
 |-- VAL_PAGAMENTO_FATURA_PAG: double (nullable = true)
 |-- DAT_CRIACAO_DW_PAG: timestamp (nullable = true)
 |-- DW_BANCO_PAG: string (nullable = true)
 |-- DW_TIPO_PAGAMENTO_PAG: string (nullable = true)
 |-- VAL_DESCONTO_ITEM_PAG: double (nullable = true)
 |-- VAL_PAGAMENTO_ITEM_PAG: double (nullable = true)
 |-- VAL_JUROS_MULTAS_ITEM_PAG: double (nullable = true)
 |-- VAL_MULTA_EQUIP_ITEM_PAG: double (nullable = true)
 |-- VAL_MULTA_EQUIP_TOTAL_PAG: double (nullable = true)
 |-- VAL_MULTA_FID_ITEM_PAG: double (nullable = true)
 |-- DAT_CRIACAO_ATIVIDADE_PAG: timestamp (nullable = true)
 |-- DAT_BAIXA_ATIVIDADE_PAG: timestamp (nullable = tr

In [36]:
# Filtrar e ordenar df_01
df_filtered_ordered = df_01.filter(F.col("NUM_CPF") == "ZXY9X7ZYXNU") \
                         .orderBy(F.col("DAT_STATUS_PAGAMENTO_PAG").asc())

# Exibir os resultados
df_filtered_ordered.show(truncate=False)

+-------+---------------------+------------------+---------------------+-----------+-----------------+----------------------+------------------------+------------------+------------+---------------------+---------------------+----------------------+-------------------------+------------------------+-------------------------+----------------------+-------------------------+-----------------------+-----------------------+--------------------------+-------------------------+-----------------------+--------------------------+------------------------+----------------------+-----------------------+------------------------+------------------------+------------------------+-----------------------+-------------------------+--------------------+-------------------+-------------------------+--------------------------+---------------+----------------+
|NUM_CPF|DAT_STATUS_FATURA_PAG|DW_TIPO_FATURA_PAG|IND_STATUS_FATURA_PAG|DW_AREA_PAG|DW_UN_NEGOCIO_PAG|DW_FORMA_PAGAMENTO_PAG|VAL_PAGAMENTO_FATURA_PA

In [37]:
len(df_01.columns)

38

In [38]:
from pyspark.sql import functions as F

df = df_01

total_registros = df.count()

diagnostico = (
    df
    .select([
        F.lit(c).alias("coluna"),
        F.count(F.lit(1)).alias("qtd_registros"),
        F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias("qtd_nulos"),
        F.countDistinct(F.col(c)).alias("qtd_distintos")
    ])
    for c in df.columns
)

# União de todas as colunas
df_diagnostico = None
for d in diagnostico:
    if df_diagnostico is None:
        df_diagnostico = d
    else:
        df_diagnostico = df_diagnostico.unionByName(d)

# Métricas percentuais
df_diagnostico = (
    df_diagnostico
    .withColumn(
        "pct_nulos",
        F.round(F.col("qtd_nulos") / F.col("qtd_registros") * 100, 2)
    )
    .withColumn(
        "pct_distintos",
        F.round(F.col("qtd_distintos") / F.col("qtd_registros") * 100, 2)
    )
    .orderBy(F.desc("pct_nulos"))
)

df_diagnostico.show(truncate=False)

+--------------------------+-------------+---------+-------------+---------+-------------+
|coluna                    |qtd_registros|qtd_nulos|qtd_distintos|pct_nulos|pct_distintos|
+--------------------------+-------------+---------+-------------+---------+-------------+
|COD_METODO_PAGAMENTO_PAG  |6431083      |4875103  |6            |75.81    |0.0          |
|IND_STATUS_PAGAMENTO_PAG  |6431083      |2956631  |4            |45.97    |0.0          |
|NUM_FATURA_PAGAMENTO_PAG  |6431083      |2396406  |3140602      |37.26    |48.83        |
|VAL_BAIXA_ATIVIDADE_PAG   |6431083      |2082845  |49252        |32.39    |0.77         |
|DAT_VENCIMENTO_CREDITO_PAG|6431083      |2078882  |1056         |32.33    |0.02         |
|DAT_ATIVIDADE_CREDITO_PAG |6431083      |2078882  |420          |32.33    |0.01         |
|SAFRA_VENCIMENTO          |6431083      |2078882  |95           |32.33    |0.0          |
|COD_TIPO_FATURA_PAG       |6431083      |2078882  |22           |32.33    |0.0          |

##Início criação variáveis

###variáveis de data

1️⃣ Definição da data de pagamento de referência

In [39]:
from pyspark.sql import functions as F

df_base = df_01.withColumn(
    "DATA_PAGAMENTO_REF",
    F.col("DAT_CRIACAO_DW_PAG")
)


2️⃣ Cálculo de atraso (pontualidade)

In [40]:
df_base = df_base.withColumn(
    "DIAS_ATRASO",
    F.datediff(
        F.col("DATA_PAGAMENTO_REF"),
        F.col("DAT_VENCIMENTO_CREDITO_PAG")
    )
)


Interpretação

< 0 → pagamento antecipado

= 0 → no vencimento

'>' 0 → pagamento em atraso

3️⃣ Agregações de atraso por CPF + Safra

In [41]:
agg_atraso = [
    F.avg("DIAS_ATRASO").alias("AVG_DIAS_ATRASO_SAFRA"),
    F.max("DIAS_ATRASO").alias("MAX_DIAS_ATRASO_SAFRA"),
    F.sum(
        F.when(F.col("DIAS_ATRASO") > 0, 1).otherwise(0)
    ).alias("QTD_PAG_ATRASADOS_SAFRA"),
    F.max(
        F.when(F.col("DIAS_ATRASO") > 0, 1).otherwise(0)
    ).alias("FLAG_ATRASO_SAFRA")
]


4️⃣ Regularidade do pagamento dentro da safra

In [42]:
agg_regularidade = [
    F.min("DATA_PAGAMENTO_REF").alias("DT_PRIMEIRO_PAG_SAFRA"),
    F.max("DATA_PAGAMENTO_REF").alias("DT_ULTIMO_PAG_SAFRA"),
    F.datediff(
        F.max("DATA_PAGAMENTO_REF"),
        F.min("DATA_PAGAMENTO_REF")
    ).alias("SPAN_DIAS_PAG_SAFRA")
]


📌 Leitura importante

SPAN = 0 → pagamento único

SPAN > 0 → parcelamento, múltiplos eventos ou regularização

5️⃣ Ciclo operacional (usando status como marco)

In [43]:
agg_ciclo = [
    F.avg(
        F.datediff(
            F.col("DAT_STATUS_PAGAMENTO_PAG"),
            F.col("DAT_CRIACAO_PAGAMENTO_PAG")
        )
    ).alias("AVG_DIAS_CRIACAO_STATUS_SAFRA"),

    F.avg(
        F.datediff(
            F.col("DAT_DEPOSITO_ATIVIDADE_PAG"),
            F.col("DAT_STATUS_PAGAMENTO_PAG")
        )
    ).alias("AVG_DIAS_STATUS_DEPOSITO_SAFRA")
]


6️⃣ Dataset final — Book de Datas (CPF + Safra)

In [44]:
df_book_datas = (
    df_base
    .groupBy("NUM_CPF", "SAFRA_PAGAMENTO")
    .agg(
        *agg_atraso,
        *agg_regularidade,
        *agg_ciclo
    )
)

df_book_datas.show(truncate=False)


+-----------+---------------+---------------------+---------------------+-----------------------+-----------------+---------------------+-------------------+-------------------+-----------------------------+------------------------------+
|NUM_CPF    |SAFRA_PAGAMENTO|AVG_DIAS_ATRASO_SAFRA|MAX_DIAS_ATRASO_SAFRA|QTD_PAG_ATRASADOS_SAFRA|FLAG_ATRASO_SAFRA|DT_PRIMEIRO_PAG_SAFRA|DT_ULTIMO_PAG_SAFRA|SPAN_DIAS_PAG_SAFRA|AVG_DIAS_CRIACAO_STATUS_SAFRA|AVG_DIAS_STATUS_DEPOSITO_SAFRA|
+-----------+---------------+---------------------+---------------------+-----------------------+-----------------+---------------------+-------------------+-------------------+-----------------------------+------------------------------+
|UN9XYNWT8NT|202410         |4.0                  |4                    |1                      |1                |2024-10-12 14:15:02  |2024-10-12 14:15:02|0                  |0.0                          |-1.0                          |
|Z8TXT9UX8YX|202409         |1.0            

7️⃣ Variáveis criadas (para documentar no Book)

| Variável                         | Significado                              |
| -------------------------------- | ---------------------------------------- |
| `AVG_DIAS_ATRASO_SAFRA`          | Atraso médio dos pagamentos na safra     |
| `MAX_DIAS_ATRASO_SAFRA`          | Maior atraso observado na safra          |
| `QTD_PAG_ATRASADOS_SAFRA`        | Quantidade de pagamentos em atraso       |
| `FLAG_ATRASO_SAFRA`              | Indica se houve atraso na safra          |
| `DT_PRIMEIRO_PAG_SAFRA`          | Primeiro pagamento da safra              |
| `DT_ULTIMO_PAG_SAFRA`            | Último pagamento da safra                |
| `SPAN_DIAS_PAG_SAFRA`            | Intervalo entre pagamentos na safra      |
| `AVG_DIAS_CRIACAO_STATUS_SAFRA`  | Tempo médio entre criação e status final |
| `AVG_DIAS_STATUS_DEPOSITO_SAFRA` | Tempo médio entre status e depósito      |


Pensando em enriquecer mais ainda o estudo criaremos também:

*   criar lags temporais por CPF (safra t vs t-1)
*   classificar clientes por perfil de pontualidade
*   gerar flags de risco temporal


Criação de LAGs por CPF (comportamento histórico) - Comparar a safra atual vs safra anterior do mesmo CPF.

In [45]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

window_cpf = (
    Window
    .partitionBy("NUM_CPF")
    .orderBy("SAFRA_PAGAMENTO")
)


Lags principais

In [46]:
df_lag = (
    df_book_datas
    .withColumn("LAG_AVG_DIAS_ATRASO",
                F.lag("AVG_DIAS_ATRASO_SAFRA", 1).over(window_cpf))
    .withColumn("LAG_MAX_DIAS_ATRASO",
                F.lag("MAX_DIAS_ATRASO_SAFRA", 1).over(window_cpf))
    .withColumn("LAG_FLAG_ATRASO",
                F.lag("FLAG_ATRASO_SAFRA", 1).over(window_cpf))
    .withColumn("LAG_QTD_PAG_ATRASADOS",
                F.lag("QTD_PAG_ATRASADOS_SAFRA", 1).over(window_cpf))
)


📌 Book value
Essas variáveis respondem:

“O cliente está melhorando ou piorando?”

“O atraso é recorrente ou pontual?”

2️⃣ Evolução temporal (tendência de comportamento) - Diferença entre safra atual e anterior

In [47]:
df_lag = (
    df_lag
    .withColumn(
        "DELTA_AVG_DIAS_ATRASO",
        F.col("AVG_DIAS_ATRASO_SAFRA") - F.col("LAG_AVG_DIAS_ATRASO")
    )
    .withColumn(
        "DELTA_QTD_ATRASOS",
        F.col("QTD_PAG_ATRASADOS_SAFRA") - F.col("LAG_QTD_PAG_ATRASADOS")
    )
)


Interpretação

DELTA > 0 → piora do comportamento

DELTA < 0 → melhora

NULL → primeira safra do CPF

3️⃣ Classificação de perfil de pontualidade

In [48]:
df_lag = (
    df_lag
    .withColumn(
        "PERFIL_PAGAMENTO",
        F.when(F.col("MAX_DIAS_ATRASO_SAFRA") <= 0, "PONTUAL")
         .when(F.col("MAX_DIAS_ATRASO_SAFRA") <= 10, "LEVE_ATRASO")
         .when(F.col("MAX_DIAS_ATRASO_SAFRA") <= 30, "ATRASO_MODERADO")
         .otherwise("ATRASO_SEVERO")
    )
)


4️⃣ Flags de risco temporal - Cliente reincidente em atraso

In [49]:
df_lag = df_lag.withColumn(
    "FLAG_REINCIDENCIA_ATRASO",
    F.when(
        (F.col("FLAG_ATRASO_SAFRA") == 1) &
        (F.col("LAG_FLAG_ATRASO") == 1),
        1
    ).otherwise(0)
)


Piora clara do comportamento

In [50]:
df_lag = df_lag.withColumn(
    "FLAG_PIORA_ATRASO",
    F.when(F.col("DELTA_AVG_DIAS_ATRASO") > 5, 1).otherwise(0)
)


| Variável                   | O que mede                           |
| -------------------------- | ------------------------------------ |
| `LAG_AVG_DIAS_ATRASO`      | Atraso médio da safra anterior       |
| `DELTA_AVG_DIAS_ATRASO`    | Evolução do atraso médio             |
| `PERFIL_PAGAMENTO`         | Classificação de pontualidade        |
| `FLAG_REINCIDENCIA_ATRASO` | Atraso em safras consecutivas        |
| `FLAG_PIORA_ATRASO`        | Indica deterioração do comportamento |
| `LAG_QTD_PAG_ATRASADOS`    | Histórico de volume de atrasos       |


🧠 Nota metodológica (importante para o Book)

As variáveis de lag e delta são calculadas por CPF, respeitando a ordem temporal da SAFRA_PAGAMENTO, permitindo capturar tendência, reincidência e deterioração do comportamento de pagamento.

In [51]:
df_book_datas_final = df_lag.select(
    "NUM_CPF",
    "SAFRA_PAGAMENTO",

    # Regularidade
    "DT_PRIMEIRO_PAG_SAFRA",
    "DT_ULTIMO_PAG_SAFRA",
    "SPAN_DIAS_PAG_SAFRA",

    # Atraso
    "AVG_DIAS_ATRASO_SAFRA",
    "MAX_DIAS_ATRASO_SAFRA",
    "QTD_PAG_ATRASADOS_SAFRA",
    "FLAG_ATRASO_SAFRA",

    # Ciclo
    "AVG_DIAS_CRIACAO_STATUS_SAFRA",
    "AVG_DIAS_STATUS_DEPOSITO_SAFRA",

    # Histórico
    "LAG_AVG_DIAS_ATRASO",
    "LAG_MAX_DIAS_ATRASO",
    "LAG_QTD_PAG_ATRASADOS",
    "LAG_FLAG_ATRASO",

    # Evolução
    "DELTA_AVG_DIAS_ATRASO",
    "DELTA_QTD_ATRASOS",

    # Perfil e risco
    "PERFIL_PAGAMENTO",
    "FLAG_REINCIDENCIA_ATRASO",
    "FLAG_PIORA_ATRASO"
)


In [52]:
df_book_datas_final.printSchema()

root
 |-- NUM_CPF: string (nullable = true)
 |-- SAFRA_PAGAMENTO: string (nullable = true)
 |-- DT_PRIMEIRO_PAG_SAFRA: timestamp (nullable = true)
 |-- DT_ULTIMO_PAG_SAFRA: timestamp (nullable = true)
 |-- SPAN_DIAS_PAG_SAFRA: integer (nullable = true)
 |-- AVG_DIAS_ATRASO_SAFRA: double (nullable = true)
 |-- MAX_DIAS_ATRASO_SAFRA: integer (nullable = true)
 |-- QTD_PAG_ATRASADOS_SAFRA: long (nullable = true)
 |-- FLAG_ATRASO_SAFRA: integer (nullable = true)
 |-- AVG_DIAS_CRIACAO_STATUS_SAFRA: double (nullable = true)
 |-- AVG_DIAS_STATUS_DEPOSITO_SAFRA: double (nullable = true)
 |-- LAG_AVG_DIAS_ATRASO: double (nullable = true)
 |-- LAG_MAX_DIAS_ATRASO: integer (nullable = true)
 |-- LAG_QTD_PAG_ATRASADOS: long (nullable = true)
 |-- LAG_FLAG_ATRASO: integer (nullable = true)
 |-- DELTA_AVG_DIAS_ATRASO: double (nullable = true)
 |-- DELTA_QTD_ATRASOS: long (nullable = true)
 |-- PERFIL_PAGAMENTO: string (nullable = false)
 |-- FLAG_REINCIDENCIA_ATRASO: integer (nullable = false)
 |-- 

In [53]:
len(df_book_datas_final.columns)

20

###variáveis numéricas

1️⃣ PREMISSAS DO BOOK NUMÉRICO
📌 Base transacional

📌 Uma linha final = 1 CPF + 1 Safra

📌 Foco em:

volume financeiro

recorrência

descontos / baixas

juros e multas

estabilidade

evolução histórica

In [54]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

Agregações financeiras base

In [55]:
df_book_valores = (
    df_01
    .groupBy("NUM_CPF", "SAFRA_PAGAMENTO")
    .agg(

        # Volume pago
        F.sum("VAL_PAGAMENTO_FATURA_PAG").alias("SUM_VAL_PAGO_SAFRA"),
        F.avg("VAL_PAGAMENTO_FATURA_PAG").alias("AVG_VAL_PAGO_SAFRA"),
        F.max("VAL_PAGAMENTO_FATURA_PAG").alias("MAX_VAL_PAGO_SAFRA"),
        F.min("VAL_PAGAMENTO_FATURA_PAG").alias("MIN_VAL_PAGO_SAFRA"),

        # Frequência
        F.count("VAL_PAGAMENTO_FATURA_PAG").alias("QTD_PAGAMENTOS_SAFRA"),

        # Valores de referência
        F.sum("VAL_ORIGINAL_PAGAMENTO_PAG").alias("SUM_VAL_ORIGINAL_SAFRA"),
        F.sum("VAL_ATUAL_PAGAMENTO_PAG").alias("SUM_VAL_ATUAL_SAFRA"),

        # Baixa financeira
        F.sum("VAL_BAIXA_ATIVIDADE_PAG").alias("SUM_VAL_BAIXA_SAFRA"),

        # Descontos
        F.sum("VAL_DESCONTO_ITEM_PAG").alias("SUM_DESCONTO_SAFRA"),

        # Juros e multas
        F.sum("VAL_JUROS_MULTAS_ITEM_PAG").alias("SUM_JUROS_MULTAS_SAFRA"),
        F.sum("VAL_MULTA_EQUIP_ITEM_PAG").alias("SUM_MULTA_EQUIP_ITEM_SAFRA"),
        F.sum("VAL_MULTA_EQUIP_TOTAL_PAG").alias("SUM_MULTA_EQUIP_TOTAL_SAFRA"),
        F.sum("VAL_MULTA_FID_ITEM_PAG").alias("SUM_MULTA_FID_SAFRA")
    )
)


Indicadores derivados (normalizações)

In [56]:
df_book_valores = (
    df_book_valores
    .withColumn(
        "TICKET_MEDIO_SAFRA",
        F.col("SUM_VAL_PAGO_SAFRA") / F.col("QTD_PAGAMENTOS_SAFRA")
    )
    .withColumn(
        "VAL_PAGO_LIQUIDO_SAFRA",
        F.col("SUM_VAL_PAGO_SAFRA") - F.col("SUM_DESCONTO_SAFRA")
    )
    .withColumn(
        "RATIO_BAIXA_SOBRE_PAGO",
        F.col("SUM_VAL_BAIXA_SAFRA") / F.col("SUM_VAL_PAGO_SAFRA")
    )
    .withColumn(
        "RATIO_DESCONTO_SOBRE_PAGO",
        F.col("SUM_DESCONTO_SAFRA") / F.col("SUM_VAL_PAGO_SAFRA")
    )
    .withColumn(
        "RATIO_JUROS_SOBRE_PAGO",
        F.col("SUM_JUROS_MULTAS_SAFRA") / F.col("SUM_VAL_PAGO_SAFRA")
    )
    .withColumn(
        "RATIO_MULTAS_SOBRE_PAGO",
        (
            F.col("SUM_MULTA_EQUIP_TOTAL_SAFRA") +
            F.col("SUM_MULTA_FID_SAFRA")
        ) / F.col("SUM_VAL_PAGO_SAFRA")
    )
    .withColumn(
        "DELTA_VALOR_ORIGINAL_ATUAL",
        F.col("SUM_VAL_ATUAL_SAFRA") - F.col("SUM_VAL_ORIGINAL_SAFRA")
    )
)


Estabilidade financeira

In [57]:
window_spec_cpf = Window.partitionBy("NUM_CPF").orderBy("SAFRA_PAGAMENTO")
df_book_valores = df_book_valores.withColumn(
    "STDDEV_VAL_PAGO_SAFRA",
    F.stddev("AVG_VAL_PAGO_SAFRA").over(window_spec_cpf)
)

Histórico e evolução (lags)

In [58]:
window_cpf = (
    Window
    .partitionBy("NUM_CPF")
    .orderBy("SAFRA_PAGAMENTO")
)

df_book_valores = (
    df_book_valores
    .withColumn(
        "LAG_SUM_VAL_PAGO",
        F.lag("SUM_VAL_PAGO_SAFRA").over(window_cpf)
    )
    .withColumn(
        "DELTA_SUM_VAL_PAGO",
        F.col("SUM_VAL_PAGO_SAFRA") - F.col("LAG_SUM_VAL_PAGO")
    )
)


Perfil financeiro (feature explicável)

In [59]:
df_book_valores = df_book_valores.withColumn(
    "PERFIL_VOLUME_FINANCEIRO",
    F.when(F.col("SUM_VAL_PAGO_SAFRA") < 50, "BAIXO")
     .when(F.col("SUM_VAL_PAGO_SAFRA") < 150, "MEDIO")
     .otherwise("ALTO")
)


Seleção final do Book Numérico

In [60]:
df_book_valores_final = df_book_valores.select(
    "NUM_CPF",
    "SAFRA_PAGAMENTO",

    "SUM_VAL_PAGO_SAFRA",
    "AVG_VAL_PAGO_SAFRA",
    "MAX_VAL_PAGO_SAFRA",
    "MIN_VAL_PAGO_SAFRA",
    "QTD_PAGAMENTOS_SAFRA",
    "TICKET_MEDIO_SAFRA",

    "SUM_VAL_ORIGINAL_SAFRA",
    "SUM_VAL_ATUAL_SAFRA",
    "DELTA_VALOR_ORIGINAL_ATUAL",

    "SUM_VAL_BAIXA_SAFRA",
    "RATIO_BAIXA_SOBRE_PAGO",

    "SUM_DESCONTO_SAFRA",
    "RATIO_DESCONTO_SOBRE_PAGO",
    "VAL_PAGO_LIQUIDO_SAFRA",

    "SUM_JUROS_MULTAS_SAFRA",
    "RATIO_JUROS_SOBRE_PAGO",

    "SUM_MULTA_EQUIP_TOTAL_SAFRA",
    "SUM_MULTA_FID_SAFRA",
    "RATIO_MULTAS_SOBRE_PAGO",

    "STDDEV_VAL_PAGO_SAFRA",

    "LAG_SUM_VAL_PAGO",
    "DELTA_SUM_VAL_PAGO",

    "PERFIL_VOLUME_FINANCEIRO"
)


BOOK NUMÉRICO DE PAGAMENTOS

| Variável                    | Significado                 | Pergunta / Contribuição ao negócio  |
| --------------------------- | --------------------------- | ----------------------------------- |
| SUM_VAL_PAGO_SAFRA          | Total pago na safra         | Quanto o cliente gera de receita?   |
| AVG_VAL_PAGO_SAFRA          | Média dos pagamentos        | Qual o ticket típico do cliente?    |
| MAX_VAL_PAGO_SAFRA          | Maior pagamento             | Pagamento concentrado ou parcelado? |
| MIN_VAL_PAGO_SAFRA          | Menor pagamento             | Existem valores residuais?          |
| QTD_PAGAMENTOS_SAFRA        | Nº de pagamentos            | Cliente recorrente ou pontual?      |
| TICKET_MEDIO_SAFRA          | Ticket médio calculado      | Segmentação financeira              |
| SUM_VAL_ORIGINAL_SAFRA      | Valor original da dívida    | Exposição inicial ao risco          |
| SUM_VAL_ATUAL_SAFRA         | Valor atualizado            | Houve acréscimos financeiros?       |
| DELTA_VALOR_ORIGINAL_ATUAL  | Diferença original vs atual | Endividamento crescente?            |
| SUM_VAL_BAIXA_SAFRA         | Valor efetivamente baixado  | Qualidade da liquidação             |
| RATIO_BAIXA_SOBRE_PAGO      | Baixa ÷ pago                | Pagamento integral ou parcial?      |
| SUM_DESCONTO_SAFRA          | Total de descontos          | Dependência de incentivo            |
| RATIO_DESCONTO_SOBRE_PAGO   | Desconto ÷ pago             | Fragilidade de margem               |
| VAL_PAGO_LIQUIDO_SAFRA      | Pago menos desconto         | Receita real do cliente             |
| SUM_JUROS_MULTAS_SAFRA      | Juros e multas pagos        | Custo financeiro do atraso          |
| RATIO_JUROS_SOBRE_PAGO      | Juros ÷ pago                | Penalização financeira recorrente   |
| SUM_MULTA_EQUIP_TOTAL_SAFRA | Multas contratuais          | Quebra de contrato                  |
| SUM_MULTA_FID_SAFRA         | Multa de fidelidade         | Indício de churn                    |
| RATIO_MULTAS_SOBRE_PAGO     | Multas ÷ pago               | Pressão financeira                  |
| STDDEV_VAL_PAGO_SAFRA       | Variabilidade dos valores   | Estabilidade financeira             |
| LAG_SUM_VAL_PAGO            | Total pago safra anterior   | Histórico financeiro                |
| DELTA_SUM_VAL_PAGO          | Evolução do valor pago      | Crescimento ou retração             |
| PERFIL_VOLUME_FINANCEIRO    | Classificação de volume     | Regras e estratégias                |


###variáveis categóricas

In [61]:
df_cat_agg = (
    df_01
    .groupBy("NUM_CPF", "SAFRA_PAGAMENTO")
    .agg(
        # Volume
        F.count("*").alias("QTD_PAGAMENTOS_SAFRA"),

        # Diversidade
        F.countDistinct("DW_FORMA_PAGAMENTO_PAG").alias("QTD_FORMAS_PAGAMENTO"),
        F.countDistinct("COD_METODO_PAGAMENTO_PAG").alias("QTD_METODOS_PAGAMENTO"),
        F.countDistinct("DW_BANCO_PAG").alias("QTD_BANCOS_UTILIZADOS"),
        F.countDistinct("DW_TIPO_PAGAMENTO_PAG").alias("QTD_TIPOS_PAGAMENTO"),
        F.countDistinct("DW_TIPO_FATURA_PAG").alias("QTD_TIPOS_FATURA"),

        # Status
        F.sum(F.when(F.col("IND_STATUS_PAGAMENTO_PAG") == "C", 1).otherwise(0)).alias("QTD_STATUS_PAGO"),
        F.sum(F.when(F.col("IND_STATUS_FATURA_PAG") == "C", 1).otherwise(0)).alias("QTD_FATURA_PAGA"),

        # Crédito
        F.count(
            F.when(F.col("IND_TIPO_CREDITO_PAG").isNotNull(), 1)
        ).alias("QTD_EVENTOS_CREDITO")
    )
)


In [62]:
df_cat_agg = df_cat_agg.withColumn(
    "RATIO_PAG_PAGOS",
    F.col("QTD_STATUS_PAGO") / F.col("QTD_PAGAMENTOS_SAFRA")
)


In [63]:
w_moda = Window.partitionBy("NUM_CPF", "SAFRA_PAGAMENTO") \
               .orderBy(F.desc("freq"))


In [64]:
def calcular_moda(df, col, alias):
    return (
        df.groupBy("NUM_CPF", "SAFRA_PAGAMENTO", col)
          .agg(F.count("*").alias("freq"))
          .withColumn("rn", F.row_number().over(w_moda))
          .filter(F.col("rn") == 1)
          .select("NUM_CPF", "SAFRA_PAGAMENTO", F.col(col).alias(alias))
    )


In [65]:
df_forma_dom = calcular_moda(df_01, "DW_FORMA_PAGAMENTO_PAG", "FORMA_PAGAMENTO_DOMINANTE")
df_banco_dom = calcular_moda(df_01, "DW_BANCO_PAG", "BANCO_DOMINANTE")
df_tipo_fatura_dom = calcular_moda(df_01, "DW_TIPO_FATURA_PAG", "TIPO_FATURA_DOMINANTE")
df_area_dom = calcular_moda(df_01, "DW_AREA_PAG", "AREA_DOMINANTE")
df_un_negocio_dom = calcular_moda(df_01, "DW_UN_NEGOCIO_PAG", "UN_NEGOCIO_DOMINANTE")
df_metodo_pagamento_dom = calcular_moda(df_01, "COD_METODO_PAGAMENTO_PAG", "METODO_PAGAMENTO_DOMINANTE")
df_tipo_pagamento_dom = calcular_moda(df_01, "DW_TIPO_PAGAMENTO_PAG", "TIPO_PAGAMENTO_DOMINANTE")


In [66]:
df_cat = (
    df_cat_agg
    .join(df_forma_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_banco_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_tipo_fatura_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_area_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_un_negocio_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_metodo_pagamento_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_tipo_pagamento_dom, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
)


In [67]:
w_time = Window.partitionBy("NUM_CPF").orderBy("SAFRA_PAGAMENTO")


In [68]:
df_cat = (
    df_cat
    .withColumn("FORMA_PAGAMENTO_ANT",
                F.lag("FORMA_PAGAMENTO_DOMINANTE").over(w_time))
    .withColumn("BANCO_ANT",
                F.lag("BANCO_DOMINANTE").over(w_time))

    .withColumn(
        "FLAG_MUDANCA_FORMA_PAG",
        F.when(
            (F.col("FORMA_PAGAMENTO_ANT").isNotNull()) &
            (F.col("FORMA_PAGAMENTO_DOMINANTE") != F.col("FORMA_PAGAMENTO_ANT")),
            1
        ).otherwise(0)
    )

    .withColumn(
        "FLAG_MUDANCA_BANCO",
        F.when(
            (F.col("BANCO_ANT").isNotNull()) &
            (F.col("BANCO_DOMINANTE") != F.col("BANCO_ANT")),
            1
        ).otherwise(0)
    )
)


In [69]:
df_cat = (
    df_cat
    .withColumn("FLAG_MULTIPLAS_FORMAS_PAG",
                F.when(F.col("QTD_FORMAS_PAGAMENTO") > 1, 1).otherwise(0))
    .withColumn("FLAG_MULTIPLOS_BANCOS",
                F.when(F.col("QTD_BANCOS_UTILIZADOS") > 1, 1).otherwise(0))
)


In [70]:
df_book_categorico = df_cat.select(
    "NUM_CPF",
    "SAFRA_PAGAMENTO",

    "QTD_FORMAS_PAGAMENTO",
    "QTD_METODOS_PAGAMENTO",
    "QTD_BANCOS_UTILIZADOS",
    "QTD_TIPOS_PAGAMENTO",
    "QTD_TIPOS_FATURA",

    "QTD_STATUS_PAGO",
    "QTD_FATURA_PAGA",
    "RATIO_PAG_PAGOS",

    "QTD_EVENTOS_CREDITO",

    "FORMA_PAGAMENTO_DOMINANTE",
    "BANCO_DOMINANTE",
    "TIPO_FATURA_DOMINANTE",
    "AREA_DOMINANTE",
    "UN_NEGOCIO_DOMINANTE",
    "TIPO_PAGAMENTO_DOMINANTE",
    "METODO_PAGAMENTO_DOMINANTE",


    "FLAG_MULTIPLAS_FORMAS_PAG",
    "FLAG_MULTIPLOS_BANCOS",
    "FLAG_MUDANCA_FORMA_PAG",
    "FLAG_MUDANCA_BANCO"
)


VOLUME & DIVERSIDADE

| Variável              | Significado                              | Pergunta de negócio que responde                     |
| --------------------- | ---------------------------------------- | ---------------------------------------------------- |
| QTD_PAGAMENTOS_SAFRA  | Total de registros de pagamento na safra | Cliente teve quantos eventos de pagamento?           |
| QTD_FORMAS_PAGAMENTO  | Nº de formas distintas usadas            | Cliente é consistente ou alterna forma de pagamento? |
| QTD_METODOS_PAGAMENTO | Nº de métodos distintos                  | Pagamentos são simples ou complexos?                 |
| QTD_BANCOS_UTILIZADOS | Nº de bancos distintos                   | Cliente centraliza ou pulveriza pagamentos?          |
| QTD_TIPOS_PAGAMENTO   | Nº de tipos distintos de pagamento       | Pagamentos recorrentes ou variados?                  |
| QTD_TIPOS_FATURA      | Nº de tipos distintos de fatura          | Complexidade da estrutura de cobrança                |


DOMINÂNCIA (MODA)

| Variável                   | Significado                  | Pergunta de negócio                            |
| -------------------------- | ---------------------------- | ---------------------------------------------- |
| FORMA_PAGAMENTO_DOMINANTE  | Forma mais usada na safra    | Qual meio o cliente prefere pagar?             |
| METODO_PAGAMENTO_DOMINANTE | Método mais frequente        | Preferência operacional (PIX, débito, boleto)? |
| BANCO_DOMINANTE            | Banco mais utilizado         | Existe relacionamento bancário principal?      |
| TIPO_PAGAMENTO_DOMINANTE   | Tipo de pagamento mais comum | Pagamento é recorrente ou eventual?            |
| TIPO_FATURA_DOMINANTE      | Tipo de fatura predominante  | Estrutura principal de cobrança                |
| AREA_DOMINANTE             | Área responsável             | Contexto institucional do pagamento            |
| UN_NEGOCIO_DOMINANTE       | Unidade de negócio dominante | Segmento principal do cliente                  |


STATUS & QUALIDADE

| Variável        | Significado                      | Pergunta de negócio                   |
| --------------- | -------------------------------- | ------------------------------------- |
| QTD_STATUS_PAGO | Nº de pagamentos com status pago | Histórico de adimplência              |
| QTD_FATURA_PAGA | Nº de faturas pagas              | Efetividade da cobrança               |
| RATIO_PAG_PAGOS | % de pagamentos pagos            | Qualidade do comportamento financeiro |


CRÉDITO & EVENTOS

| Variável            | Significado                 | Pergunta de negócio                  |
| ------------------- | --------------------------- | ------------------------------------ |
| QTD_EVENTOS_CREDITO | Nº de registros com crédito | Cliente recebeu crédito/ajuste?      |
| FLAG_EVENTO_CREDITO | 1 se houve crédito na safra | Evento financeiro relevante ocorreu? |


FLAGS COMPORTAMENTAIS

| Variável                  | Significado                   | Pergunta de negócio                  |
| ------------------------- | ----------------------------- | ------------------------------------ |
| FLAG_MULTIPLAS_FORMAS_PAG | 1 se usou mais de uma forma   | Indício de instabilidade operacional |
| FLAG_MULTIPLOS_BANCOS     | 1 se usou mais de um banco    | Fragmentação financeira              |
| FLAG_MUDANCA_FORMA_PAG    | Mudou forma vs safra anterior | Alteração recente de hábito?         |
| FLAG_MUDANCA_BANCO        | Mudou banco vs safra anterior | Mudança estrutural financeira?       |


#Book de pagamentos

In [71]:
# Consolidation of the book, avoiding duplicate columns
df_book = (
    df_book_datas_final
    .join(df_book_valores_final, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")
    .join(df_book_categorico, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")

)

In [72]:
df_book.printSchema()

root
 |-- NUM_CPF: string (nullable = true)
 |-- SAFRA_PAGAMENTO: string (nullable = true)
 |-- DT_PRIMEIRO_PAG_SAFRA: timestamp (nullable = true)
 |-- DT_ULTIMO_PAG_SAFRA: timestamp (nullable = true)
 |-- SPAN_DIAS_PAG_SAFRA: integer (nullable = true)
 |-- AVG_DIAS_ATRASO_SAFRA: double (nullable = true)
 |-- MAX_DIAS_ATRASO_SAFRA: integer (nullable = true)
 |-- QTD_PAG_ATRASADOS_SAFRA: long (nullable = true)
 |-- FLAG_ATRASO_SAFRA: integer (nullable = true)
 |-- AVG_DIAS_CRIACAO_STATUS_SAFRA: double (nullable = true)
 |-- AVG_DIAS_STATUS_DEPOSITO_SAFRA: double (nullable = true)
 |-- LAG_AVG_DIAS_ATRASO: double (nullable = true)
 |-- LAG_MAX_DIAS_ATRASO: integer (nullable = true)
 |-- LAG_QTD_PAG_ATRASADOS: long (nullable = true)
 |-- LAG_FLAG_ATRASO: integer (nullable = true)
 |-- DELTA_AVG_DIAS_ATRASO: double (nullable = true)
 |-- DELTA_QTD_ATRASOS: long (nullable = true)
 |-- PERFIL_PAGAMENTO: string (nullable = false)
 |-- FLAG_REINCIDENCIA_ATRASO: integer (nullable = false)
 |-- 

In [73]:
len(df_book.columns)

63

#Construção das janelas temporárias de 1, 3. 6, 9, 12 meses

## Função genérica para janelas temporais VERSÃO COMPLETA

In [74]:
def window_features(df, months):

    w = Window.partitionBy("NUM_CPF") \
              .orderBy("SAFRA_PAGAMENTO") \
              .rowsBetween(-months + 1, 0)

    df_w = df.select (
        "NUM_CPF",
        "SAFRA_PAGAMENTO",

        # =========================
        # INTENSIDADE/MÉDIA (nível médio)
        F.avg("QTD_PAG_ATRASADOS_SAFRA").over(w).alias(f"mean_QTD_PAG_ATRASADOS_SAFRA_u{months}m"),
        F.avg("QTD_PAGAMENTOS_SAFRA").over(w).alias(f"mean_QTD_PAGAMENTOS_SAFRA_u{months}m"),



        # =========================
        #  FREQUENCIA/CONTAGEM
        # =========================
        F.count("QTD_PAG_ATRASADOS_SAFRA").over(w).alias(f"freq_QTD_PAG_ATRASADOS_SAFRA_u{months}m"),
        F.count("QTD_PAGAMENTOS_SAFRA").over(w).alias(f"freq_QTD_PAGAMENTOS_SAFRA_u{months}m"),

        # =========================
        # VOLATILIDADE/VARIABILIDADE
        # =========================


        # =========================
        # VOLUNE/SOMA
        # =========================
        # QTD_FATURAS_ATRASO
        F.sum("QTD_PAG_ATRASADOS_SAFRA").over(w).alias(f"cumsum_QTD_PAG_ATRASADOS_SAFRA_u{months}m"),
        F.sum("QTD_PAGAMENTOS_SAFRA").over(w).alias(f"cumsum_QTD_PAGAMENTOS_SAFRA_u{months}m"),



        # =========================
        # EXTREMOS/MÍNIMO MÁXIMO
        # =========================


        # =========================
        # DISTRIBUIÇÃO (cauda)
        # =========================
        F.skewness("QTD_PAG_ATRASADOS_SAFRA").over(w).alias(f"skewness_QTD_PAG_ATRASADOS_SAFRA_u{months}m"),
        F.skewness("QTD_PAGAMENTOS_SAFRA").over(w).alias(f"skewness_QTD_PAGAMENTOS_SAFRA_u{months}m"),




            # =========================
            # QUARTIZ
            # =========================


            # =========================
            # PERCENTIL
            # =========================

        # =========================
        # PERSISTÊNCIA
        # =========================
        F.sum("FLAG_ATRASO_SAFRA").over(w).alias(f"qtd_meses_FLAG_ATRASO_SAFRA_u{months}m"),
        #FLAG_REINCIDENCIA_ATRASO
        F.sum("FLAG_REINCIDENCIA_ATRASO").over(w).alias(f"qtd_meses_FLAG_REINCIDENCIA_ATRASO_u{months}m"),




        # =========================
        # FLAGS TEMPORAIS
        # =========================
        F.max("FLAG_ATRASO_SAFRA").over(w).alias(f"flg_FLAG_ATRASO_SAFRA_u{months}m"),
        F.max("FLAG_REINCIDENCIA_ATRASO").over(w).alias(f"flg_FLAG_REINCIDENCIA_ATRASO_u{months}m"),


        # =========================
        # RANKING
        # =========================

        # =========================
        # FEATURES CRUZADAS
        # =========================

        # =========================
        # VARIAVEIS DE COMPARAÇÃO
        # Comparamos janelas curtas com janelas longas para identificar mudanças de comportamento.
        # =========================

        # =========================
        #       DETERIORAÇÃO/MELHORA
        # =========================


        # =========================
        #       ACELERAÇÃO
        #       Exemplo: Se a razão for > 1, o cliente está gastando mais do que o seu padrão histórico.
        # =========================


        # =========================
        #       DESACELERAÇÃO
        #       Exemplo: Se a razão for < 1, o cliente está gastando menos do que o seu padrão histórico.
        # =========================



    )

    return df_w

## Construção de janela de 1 mês do datraframe df_book

In [75]:
df_m1 = window_features(df_book, 1)

In [76]:
df_m1.printSchema()

root
 |-- NUM_CPF: string (nullable = true)
 |-- SAFRA_PAGAMENTO: string (nullable = true)
 |-- mean_QTD_PAG_ATRASADOS_SAFRA_u1m: double (nullable = true)
 |-- mean_QTD_PAGAMENTOS_SAFRA_u1m: double (nullable = true)
 |-- freq_QTD_PAG_ATRASADOS_SAFRA_u1m: long (nullable = false)
 |-- freq_QTD_PAGAMENTOS_SAFRA_u1m: long (nullable = false)
 |-- cumsum_QTD_PAG_ATRASADOS_SAFRA_u1m: long (nullable = true)
 |-- cumsum_QTD_PAGAMENTOS_SAFRA_u1m: long (nullable = true)
 |-- skewness_QTD_PAG_ATRASADOS_SAFRA_u1m: double (nullable = true)
 |-- skewness_QTD_PAGAMENTOS_SAFRA_u1m: double (nullable = true)
 |-- qtd_meses_FLAG_ATRASO_SAFRA_u1m: long (nullable = true)
 |-- qtd_meses_FLAG_REINCIDENCIA_ATRASO_u1m: long (nullable = true)
 |-- flg_FLAG_ATRASO_SAFRA_u1m: integer (nullable = true)
 |-- flg_FLAG_REINCIDENCIA_ATRASO_u1m: integer (nullable = true)



In [77]:
# numero de colunas do dataframe pyspark
len(df_m1.columns)


14

## Construção de janela de 3 meses do datraframe df_book

In [78]:
df_m3 = window_features(df_book, 3)

## Construção de janela de 6 meses do datraframe df_book

In [79]:
df_m6 = window_features(df_book, 6)

## Construção de janela de 9 meses do datraframe df_book

---



In [80]:
df_m9 = window_features(df_book, 9)

## Construção de janela de 12 meses do datraframe df_book



---



In [81]:
df_m12 = window_features(df_book, 12)

## Book final com as janelas

In [82]:
df_features = df_book \
    .select("*") \
    .join(df_m1, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left") \
    .join(df_m3, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left") \
    .join(df_m6, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left") \
    .join(df_m9, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left") \
    .join(df_m12, ["NUM_CPF", "SAFRA_PAGAMENTO"], "left")

In [None]:
df_features.printSchema()

In [84]:
len (df_features.columns)

123

In [83]:
df_book_pagamentos_202502 = df_features.filter(F.col('SAFRA_PAGAMENTO') == 202502)
df_book_pagamentos_202503 = df_features.filter(F.col('SAFRA_PAGAMENTO') == 202503)
# df_book_pagamentos_202411 = df_features.filter(F.col('SAFRA_PAGAMENTO') == 202411)
# df_book_pagamentos_202410 = df_features.filter(F.col('SAFRA_PAGAMENTO') == 202410)

In [None]:
# spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
# spark.conf.set("spark.sql.parquet.mergeSchema", "false")
# spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

In [86]:
from google.colab import drive
drive.mount("/content/drive")
from pyspark.sql import functions as F
from pyspark.sql.types import NullType
import math

def save_parquet_wide_df(
    df,
    base_path,
    block_size=300,
    n_partitions=24,
    mode="overwrite"
):
    """
    Salva DataFrame Spark muito largo (ex: 3000+ colunas)
    em blocos de colunas no Google Drive (ou FS estável).

    Cada bloco é um parquet separado.
    """

    # =========================
    # 1. Sanear schema
    # =========================
    for field in df.schema.fields:
        if isinstance(field.dataType, NullType):
            df = df.withColumn(field.name, F.lit(None).cast("string"))

    # =========================
    # 2. Repartition (NUNCA coalesce)
    # =========================
    df = df.repartition(n_partitions)

    # =========================
    # 3. Preparar blocos
    # =========================
    cols = df.columns
    total_cols = len(cols)
    n_blocks = math.ceil(total_cols / block_size)

    print(f"→ Total colunas: {total_cols}")
    print(f"→ Blocos: {n_blocks} (≈ {block_size} colunas/bloco)")
    print(f"→ Partições: {n_partitions}")

    # =========================
    # 4. Salvar metadados
    # =========================
    spark.createDataFrame(
        [(i, c) for i, c in enumerate(cols)],
        ["ordem", "coluna"]
    ).write.mode(mode).parquet(f"{base_path}/metadata")

    # =========================
    # 5. Salvar blocos
    # =========================
    for i in range(n_blocks):
        start = i * block_size
        end = min(start + block_size, total_cols)
        block_cols = cols[start:end]

        block_path = f"{base_path}/block{i:03d}"

        print(f"→ Salvando bloco {i+1}/{n_blocks} | cols {start}:{end}")

        (
            df.select(*block_cols)
              .write
              .mode(mode)
              .parquet(block_path)
        )

    print("✅ Salvamento concluído com sucesso.")



save_parquet_wide_df(
    df=df_book_pagamentos_202503,
    #base_path="/content/drive/MyDrive/HACKATHON_2025/recargas_book_202501",
    base_path="/content/drive/MyDrive/HACKATHON_2025/book_pagamentos_202503",
    block_size=300,     # ajuste se necessário (300–500)
    n_partitions=24     # ajuste c
)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
→ Total colunas: 123
→ Blocos: 1 (≈ 300 colunas/bloco)
→ Partições: 24
→ Salvando bloco 1/1 | cols 0:123
✅ Salvamento concluído com sucesso.
