# Produ√ß√£o Tem√°tica PSI ‚Äì Assistente Executivo Master
### Tema: Empr√©stimos e Contratos Financeiros (Open Finance / CSVs PSI)
**Autor:** Sandro Kazanoski Bartz  \
**Notebook (.ipynb) ‚Äì compat√≠vel com Databricks**  \
**Data de gera√ß√£o:** 2026-01-29 01:51:31\n
> Este notebook foi preparado para rodar em **Databricks** (pyspark dispon√≠vel) **ou** localmente (fallback com *SparkSession* local / Pandas). Inclui: limpeza, checagens de qualidade, modelagem anal√≠tica, EDA, constru√ß√£o de *features*, KPIs e *storytelling*.

## Objetivos do desafio
- Carregar e compreender os dados de empr√©stimos/contratos.
- Realizar **limpeza** e **verifica√ß√£o de qualidade**.
- Construir **tabelas curadas** (camada *silver/gold*) para an√°lise.
- Identificar **padr√µes/tend√™ncias** e extrair **insights de neg√≥cio**.
- Entregar **visualiza√ß√µes** e **storytelling** para gestores.

### Arquivos esperados (CSV)
- `psi_convenentes_2026-01-13_17-09-53.csv`
- `psi_emprestimo_contrato_2026-01-13_17-09-53.csv`
- `psi_emprestimo_contrato_encargo_financeiro_2026-01-13_17-09-53.csv`
- `psi_emprestimo_contrato_tarifa_2026-01-13_17-09-53.csv`
- `psi_emprestimo_contrato_taxa_juros_2026-01-13_17-09-53.csv`
- `psi_emprestimo_garantia_2026-01-13_17-09-53.csv`
- `psi_emprestimo_pagamento_2026-01-15.csv`
- `psi_emprestimo_pagamento_lancamento_2026-01-15.csv`
- `psi_emprestimo_parcela_programada_2026-01-15.csv`

> **Observa√ß√£o:** o notebook √© resiliente a pequenas varia√ß√µes de esquema: detecta cabe√ßalhos, usa delimitador `;` e tenta *inferSchema* com tratamento de datas t√≠picas (ISO/BR).

In [None]:
# ‚õèÔ∏è Setup b√°sico (funciona em Databricks e fora)
import os, sys, math, json, textwrap, datetime as dt
from typing import List, Dict

try:
    spark
except NameError:
    # Se n√£o estiver em Databricks, cria uma SparkSession local
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("PSI_Analise_Emprestimos").getOrCreate()

from pyspark.sql import functions as F, types as T

print(f'Spark versao: {spark.version}')

# Widgets (Databricks) - tolerante a aus√™ncia do dbutils
def get_widget_or_env(name, default):
    try:
        dbutils.widgets.text(name, default)
        return dbutils.widgets.get(name)
    except Exception:
        return os.environ.get(name, default)

BASE_PATH = get_widget_or_env('BASE_PATH', '.')
DATE_START = get_widget_or_env('DATE_START', '2018-01-01')
DATE_END   = get_widget_or_env('DATE_END',   '2036-12-31')
print('BASE_PATH =', BASE_PATH)
print('Intervalo =', DATE_START, '‚Üí', DATE_END)


In [None]:
# üìÑ Mapeamento de arquivos CSV
files = {
  'convenentes': 'psi_convenentes_2026-01-13_17-09-53.csv',
  'contrato'  : 'psi_emprestimo_contrato_2026-01-13_17-09-53.csv',
  'encargo'   : 'psi_emprestimo_contrato_encargo_financeiro_2026-01-13_17-09-53.csv',
  'tarifa'    : 'psi_emprestimo_contrato_tarifa_2026-01-13_17-09-53.csv',
  'taxa'      : 'psi_emprestimo_contrato_taxa_juros_2026-01-13_17-09-53.csv',
  'garantia'  : 'psi_emprestimo_garantia_2026-01-13_17-09-53.csv',
  'pagamento' : 'psi_emprestimo_pagamento_2026-01-15.csv',
  'lancamento': 'psi_emprestimo_pagamento_lancamento_2026-01-15.csv',
  'parcela'   : 'psi_emprestimo_parcela_programada_2026-01-15.csv'
}

def path(name):
    return os.path.join(BASE_PATH, files[name])

files


In [None]:
# üì• Fun√ß√µes utilit√°rias de leitura com tratamento padr√£o (delimitador ';', header, inferSchema)
def read_csv_semicolon(fp):
    return (spark.read
            .option('header','true')
            .option('delimiter',';')
            .option('inferSchema','true')
            .csv(fp))

dfs = {}
for k in files:
    fp = path(k)
    try:
        df = read_csv_semicolon(fp)
        dfs[k] = df
        print(k, '‚Üí', df.count(), 'linhas |', len(df.columns), 'colunas')
    except Exception as e:
        print(f'AVISO: falha ao ler {fp}: {e}')

# Exibe schemas detectados (amostras)
for k, df in dfs.items():
    print('
Schema:', k)
    df.printSchema()


## Qualidade dos Dados (DQ)
Ser√£o aplicados *checks* essenciais: nulidade, duplicidade de chaves, tipos e faixas v√°lidas, datas inconsistentes e reconcilia√ß√µes simples.

In [None]:
# üîç Fun√ß√µes de DQ
def null_report(df, name):
    rows = df.count()
    exprs = [F.sum(F.when(F.col(c).isNull() | F.col(c).isin('','null','NULL'), 1).otherwise(0)).alias(c) for c in df.columns]
    nr = df.select(exprs).collect()[0].asDict()
    out = [(k, int(v), round(int(v)/rows,4) if rows else 0) for k,v in nr.items()]
    return spark.createDataFrame(out, ['coluna','n_null','perc_null'])

def dup_count(df, cols:List[str]):
    return df.groupBy([F.col(c) for c in cols]).count().filter('count>1')

# Exemplos de chaves prov√°veis
key_contrato = ['contractId'] if 'contrato' in dfs and 'contractId' in dfs['contrato'].columns else None
key_pagto    = ['paymentId'] if 'pagamento' in dfs and 'paymentId' in dfs['pagamento'].columns else None
key_parcela  = ['contractId','instalmentNumber'] if 'parcela' in dfs else None

reports = {}
for name, df in dfs.items():
    reports[name] = null_report(df, name)
    print(f'Relat√≥rio de nulos: {name}')
    reports[name].orderBy(F.desc('perc_null')).show(10, truncate=False)

if key_contrato:
    print('Duplicatas em contrato:')
    dup_count(dfs['contrato'], key_contrato).show(5, False)
if key_parcela and set(key_parcela).issubset(set(dfs['parcela'].columns)):
    print('Duplicatas em parcela:')
    dup_count(dfs['parcela'], key_parcela).show(5, False)


## Limpeza e Padroniza√ß√µes
- *Trim* e *upper* em campos de texto de classifica√ß√£o (produto, tipo/subtipo).
- Convers√£o de datas (`contractDate`, `dueDate`, `firstInstalmentDueDate`, etc.).
- Normaliza√ß√£o de valores num√©ricos (substitui v√≠rgula por ponto se necess√°rio).
- Harmoniza√ß√£o de chaves (`contractId`, `ipocCode`).

In [None]:
def to_date(col):
    # Tenta m√∫ltiplos formatos
    return F.coalesce(
        F.to_date(col, 'yyyy-MM-dd'),
        F.to_date(col, 'yyyy-MM-dd'T'HH:mm:ss.SSS'Z''),
        F.to_date(col, 'dd/MM/yyyy'),
        F.to_date(col, 'dd-MM-yyyy')
    )

def normalize_numeric(c):
    return F.regexp_replace(F.regexp_replace(F.col(c), ',', '.'), ' ', '').cast('double')

contrato = dfs.get('contrato')
if contrato is not None:
    cols_up = [c for c in ['productName','productType','productSubType','productSubTypeCategory','amortizationScheduled','instalmentPeriodicity','currency','nomeInstituicao'] if c in contrato.columns]
    for c in cols_up:
        contrato = contrato.withColumn(c, F.upper(F.trim(F.col(c))))
    # Datas
    for c in ['contractDate','settlementDate','dueDate','firstInstalmentDueDate']:
        if c in contrato.columns:
            contrato = contrato.withColumn(c, to_date(F.col(c)))
    # Num√©ricos
    for c in ['contractAmount','CET','nextInstalmentAmount']:
        if c in contrato.columns:
            contrato = contrato.withColumn(c, normalize_numeric(c))
    dfs['contrato'] = contrato.cache()
    print('Contrato limpo:', contrato.count(), 'linhas')

# Limpeza similar para parcelas e pagamentos (se existirem)
for name in ['parcela','pagamento','lancamento','encargo','tarifa','taxa']:
    df = dfs.get(name)
    if df is None:
        continue
    for c in df.columns:
        if df.schema[c].dataType.simpleString() == 'string':
            df = df.withColumn(c, F.trim(F.col(c)))
    # tentativa gen√©rica de converter campos de data conhecidos
    date_like = [c for c in df.columns if 'date' in c.lower() or 'data' in c.lower()]
    for c in date_like:
        df = df.withColumn(c, to_date(F.col(c)))
    dfs[name] = df.cache()
    print(f'{name} limpo:', df.count(), 'linhas')


## Modelo Anal√≠tico (Curado)
Construiremos *facts* e *dims*:
- **dim_convenente** (setor, prazos e faixas)
- **dim_produto** (tipo/subtipo/categoria/amortiza√ß√£o/periodicidade)
- **f_contrato** (gr√£o: `contractId`)
- **f_parcela_programada** (gr√£o: `contractId` + n¬∫ parcela)
- **f_pagamento** e **f_lancamento** (fluxos)
- *lookups* de **tarifa/encargo/taxa/garantia** ligadas ao contrato.

In [None]:
contrato = dfs.get('contrato')
conven   = dfs.get('convenentes')
parcela  = dfs.get('parcela')
pagto    = dfs.get('pagamento')
lcto     = dfs.get('lancamento')
encargo  = dfs.get('encargo')
tarifa   = dfs.get('tarifa')
taxa     = dfs.get('taxa')
garantia = dfs.get('garantia')

# dim_produto
dim_produto = None
if contrato is not None:
    attrs = [c for c in ['productName','productType','productSubType','productSubTypeCategory','amortizationScheduled','instalmentPeriodicity'] if c in contrato.columns]
    dim_produto = (contrato
        .select(attrs)
        .dropDuplicates()
        .withColumn('produto_id', F.sha2(F.concat_ws('||', *[F.coalesce(F.col(c), F.lit('')) for c in attrs]), 256))
    )

# dim_convenente
dim_convenente = None
if conven is not None:
    csel = [c for c in ['convenente','cpf_cnpj','codigo_situacao','situacao','setor','prazo_inicial','prazo_final','faixa_b','faixa_c'] if c in conven.columns]
    dim_convenente = (conven
        .select(csel)
        .withColumnRenamed('cpf_cnpj','cpfCnpj_convenente')
        .withColumn('convenente_id', F.sha2(F.concat_ws('||', *[F.coalesce(F.col(c), F.lit('')) for c in csel]), 256))
        .dropDuplicates())

# f_contrato
f_contrato = None
if contrato is not None:
    f_contrato = contrato
    if dim_produto is not None:
        f_contrato = (f_contrato
            .join(dim_produto, on=attrs, how='left'))
    # chave convenente por cpfCnpj quando houver
    if dim_convenente is not None and 'cpfCnpj' in f_contrato.columns:
        f_contrato = (f_contrato
            .join(dim_convenente.select('convenente_id','cpfCnpj_convenente'),
                  f_contrato.cpfCnpj == dim_convenente.cpfCnpj_convenente, 'left'))
    # Janela para prazo do contrato em meses (se dueDate/contractDate dispon√≠veis)
    if set(['contractDate','dueDate']).issubset(set(f_contrato.columns)):
        f_contrato = f_contrato.withColumn('prazo_meses_est',
            F.floor(F.months_between(F.col('dueDate'), F.col('contractDate'))))

# f_parcela_programada
f_parcela = None
if parcela is not None:
    f_parcela = parcela

# f_pagamento e f_lancamento
f_pagamento = pagto if pagto is not None else None
f_lancamento = lcto if lcto is not None else None

# Links auxiliares (tarifa/encargo/taxa/garantia) agregadas por contrato
def agg_by_contract(df, value_cols:List[str], prefix:str):
    sels = ['contractId'] + [c for c in value_cols if c in df.columns]
    aggs = [F.sum(F.col(c)).alias(f'{prefix}_{c}_sum') for c in value_cols if c in df.columns]
    return df.select(*sels).groupBy('contractId').agg(*aggs)

agg_list = []
if encargo is not None:
    cols = [c for c in ['valor','valorCalculado','valorOriginal','valorAtual'] if c in encargo.columns]
    agg_list.append(agg_by_contract(encargo, cols, 'encargo'))
if tarifa is not None:
    cols = [c for c in ['valor','valorCalculado','valorOriginal','valorAtual'] if c in tarifa.columns]
    agg_list.append(agg_by_contract(tarifa, cols, 'tarifa'))
if taxa is not None:
    cols = [c for c in ['taxaNominal','taxaEfetiva','taxaAnual'] if c in taxa.columns]
    agg_list.append(agg_by_contract(taxa, cols, 'taxa'))
if garantia is not None:
    g_cnt = garantia.groupBy('contractId').count().withColumnRenamed('count','qtd_garantias')
    agg_list.append(g_cnt)

# Enriquecimento do contrato
if f_contrato is not None:
    for agg in agg_list:
        f_contrato = f_contrato.join(agg, on='contractId', how='left')
    f_contrato = f_contrato.cache()
    print('f_contrato pronto:', f_contrato.count(), 'linhas')


## KPIs e Indicadores
- **Carteira (R$)**: soma de `contractAmount`.
- **CET m√©dio** por produto/subtipo/categoria.
- **Prazo m√©dio** (meses).
- **Intensidade de encargos/tarifas** por contrato.
- **Uso de garantias** (# por contrato).
- **Adimpl√™ncia/Inadimpl√™ncia** (*proxy* via pagamentos atrasados, se campos dispon√≠veis).

In [None]:
from pyspark.sql.window import Window

if f_contrato is not None:
    print('Carteira total (R$):')
    f_contrato.select(F.sum('contractAmount').alias('carteira_total')).show()

    kpi = f_contrato
    if 'productSubType' in kpi.columns:
        print('CET m√©dio por subproduto:')
        kpi.groupBy('productSubType').agg(F.avg('CET').alias('CET_medio'), F.count('*').alias('qtd')).orderBy(F.desc('qtd')).show(20, False)

    if 'prazo_meses_est' in kpi.columns:
        print('Prazo m√©dio (meses):')
        kpi.select(F.avg('prazo_meses_est')).show()

# Proxy simples de atraso se existirem datas de parcela programada e pagamento
if 'parcela' in dfs and 'pagamento' in dfs:
    parc = dfs['parcela']
    pay  = dfs['pagamento']
    # tenta chaves padr√£o
    join_cols = [c for c in ['contractId','instalmentNumber'] if c in parc.columns and c in pay.columns]
    if join_cols:
        pago = (parc.alias('p')
            .join(pay.alias('y'), on=join_cols, how='left'))
        # atraso (dias) se houver dueDate/dtPagamento
        for dcol in ['dueDate','dataVencimento','dataVenc']:
            if dcol in pago.columns:
                duec = dcol
                break
        else:
            duec = None
        for pcol in ['paymentDate','dataPagamento','dtPagamento']:
            if pcol in pago.columns:
                payc = pcol
                break
        else:
            payc = None
        if duec and payc:
            pago = pago.withColumn('dias_atraso', F.datediff(F.col(payc), F.col(duec)))
            print('Distribui√ß√£o de atraso (amostra):')
            pago.groupBy(F.when(F.col('dias_atraso')>0, 'ATRASO').otherwise('EM DIA').alias('status')).count().show()


## Visualiza√ß√µes (r√°pidas)
> Em Databricks, use `display()` nos *DataFrames*. Abaixo, exemplo com Pandas/Matplotlib para exportar gr√°ficos simples.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (10,5)
plt.rcParams['axes.titlesize'] = 12

if f_contrato is not None:
    top_prod = (f_contrato
        .groupBy('productSubType')
        .agg(F.sum('contractAmount').alias('carteira'))
        .orderBy(F.desc('carteira'))
        .limit(10))
    pdf = top_prod.toPandas()
    pdf.plot(kind='bar', x='productSubType', y='carteira', legend=False)
    plt.title('Top 10 Subprodutos por Carteira (R$)')
    plt.ylabel('Carteira (R$)')
    plt.xlabel('Subproduto')
    plt.tight_layout()
    plt.show()


## Storytelling (Template)
**Pergunta de neg√≥cio:** Como otimizar a carteira de cr√©dito pessoal (com/sem consigna√ß√£o) maximizando margem com risco controlado?

**Principais achados (preencha com a sua execu√ß√£o dos KPIs acima):**
1. Subprodutos com maior CET m√©dio e *take-up* de garantias indicam ...
2. Prazo m√©dio de X meses com concentra√ß√£o entre Y‚ÄìZ meses sugere ...
3. Segmentos de convenentes (setor p√∫blico x INSS) exibem ...
4. Proxy de atraso indica taxa de inadimpl√™ncia de ~A% em N contratos ...

**Oportunidade de neg√≥cio (exemplo):**
- Ajustar *pricing* (CET) e *mix* de prazos em subprodutos com maior propens√£o a atraso;
- Priorizar origina√ß√£o em conv√™nios com *spread* e *loss given default* mais favor√°veis;
- Melhorar *onboarding* documental e checagens de elegibilidade para reduzir custos de tarifa/encargo.

## Persist√™ncia (Delta/Parquet)
No Databricks, recomenda-se salvar em **Delta**. Abaixo, usamos Parquet como *fallback*.

In [None]:
OUTPUT_PATH = os.path.join(BASE_PATH, 'curated')
dbutils_fs_available = 'dbutils' in globals() and hasattr(dbutils, 'fs')
if dbutils_fs_available:
    try:
        dbutils.fs.mkdirs(f'file:{OUTPUT_PATH}')
    except Exception:
        pass
else:
    os.makedirs(OUTPUT_PATH, exist_ok=True)

to_save = {'f_contrato': 'f_contrato', 'dim_produto': 'dim_produto', 'dim_convenente': 'dim_convenente'}
for name, df in [('f_contrato', globals().get('f_contrato')), ('dim_produto', globals().get('dim_produto')), ('dim_convenente', globals().get('dim_convenente'))]:
    if df is None:
        continue
    dest = os.path.join(OUTPUT_PATH, name)
    print('Salvando', name, 'em', dest)
    try:
        df.write.mode('overwrite').parquet(dest)
    except Exception as e:
        print('Falha ao salvar', name, e)


## Dicion√°rio de Dados (gerado automaticamente a partir dos *DataFrames*)

In [None]:
def schema_table(df, name):
    rows = [(f.name, f.dataType.simpleString(), f.nullable) for f in df.schema.fields]
    return spark.createDataFrame(rows, ['coluna','tipo','aceita_nulo']).withColumn('tabela', F.lit(name))

tab_list = []
for name, df in dfs.items():
    try:
        tab_list.append(schema_table(df, name))
    except Exception:
        pass
if tab_list:
    dic = tab_list[0]
    for t in tab_list[1:]:
        dic = dic.unionByName(t, allowMissingColumns=True)
    display(dic.orderBy('tabela','coluna')) if 'display' in globals() else dic.orderBy('tabela','coluna').show(200, False)


## Governan√ßa de Dados & Conformidade
- Minimizar exposi√ß√£o de PII: utilizar *hashes* ou *masking* para `cpfCnpj`.
- Controle de acesso por ambiente/pasta em Databricks.
- *Versionamento* do notebook e dados curados.
- Testes de DQ em *jobs* agendados.

---
**FIM ‚Äì PSI Empr√©stimos | Sandro Kazanoski Bartz**

> Sugest√£o: publicar *insights* e tabelas *gold* em um dashboard (Power BI / Databricks SQL) com *slicer* de per√≠odo, tipo de produto e institui√ß√£o.