# Scoring Batch Pipeline - Credit Risk FPD

**Story**: HD-3.2 / Fase 4.3 — Deploy
**Objetivo**: Carregar modelo do MLflow Registry e aplicar scoring sobre novas SAFRAs
**Output**: `Gold.feature_store.clientes_scores` (particionado por SAFRA)

**Uso**: Ajustar `MODEL_NAME`, `MODEL_STAGE` e `SCORING_SAFRAS` na célula de parâmetros e executar todas as células sequencialmente.

In [None]:
import sys
sys.path.insert(0, "/lakehouse/default/Files/projeto-final")

import logging
import json
from datetime import datetime

import numpy as np
import pandas as pd
import pandas.api.types as ptypes
import mlflow
from mlflow.tracking import MlflowClient
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

from config.pipeline_config import (
    PATH_FEATURE_STORE, GOLD_BASE, EXPERIMENT_NAME,
    TARGET_COLUMNS,
    SPARK_BROADCAST_THRESHOLD, SPARK_SHUFFLE_PARTITIONS, SPARK_AQE_ENABLED,
)

# Upper bound of the published credit score: scores span 0..SCORE_SCALE.
SCORE_SCALE = 1000

# Root logging configuration: INFO level, compact HH:MM:SS timestamp prefix.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Named logger used by the remaining cells of this notebook.
logger = logging.getLogger("scoring_batch")
logger.info("Imports OK")

In [None]:
# =============================================================================
# SCORING PARAMETERS — adjust before running
# =============================================================================

# Model to use (name registered in the MLflow Model Registry)
MODEL_NAME = "credit-risk-fpd-lgbm_baseline"
MODEL_STAGE = "Production"  # or "Staging" for tests

# SAFRAs to score (list of int YYYYMM)
SCORING_SAFRAS = [202502, 202503]

# Validate the SAFRA list up front so a bad parameter fails here instead of
# deep inside the Spark/model cells.
if not SCORING_SAFRAS:
    raise ValueError("SCORING_SAFRAS vazio: informe ao menos uma SAFRA (YYYYMM)")

for safra in SCORING_SAFRAS:
    # bool is a subclass of int, so it must be rejected explicitly.
    if isinstance(safra, bool) or not isinstance(safra, int):
        raise ValueError(f"SAFRA deve ser int, got {type(safra)}: {safra}")
    y, m = divmod(safra, 100)
    # A four-digit year is what makes the value a real YYYYMM (rejects e.g. 12).
    if not (1000 <= y <= 9999):
        raise ValueError(f"SAFRA invalida {safra}: ano {y} fora do formato YYYYMM")
    if not (1 <= m <= 12):
        raise ValueError(f"SAFRA invalida {safra}: mes {m} fora de 1-12")

# The scoring cell loops over this list, so a repeated SAFRA would be scored
# twice and produce duplicate (NUM_CPF, SAFRA) rows.
if len(set(SCORING_SAFRAS)) != len(SCORING_SAFRAS):
    raise ValueError(f"SCORING_SAFRAS contem SAFRAs duplicadas: {SCORING_SAFRAS}")

# Output path
SCHEMA_SCORES = "feature_store"
TABLE_SCORES = "clientes_scores"
PATH_SCORES = f"{GOLD_BASE}/Tables/{SCHEMA_SCORES}/{TABLE_SCORES}"

# Number of risk bands (quintiles)
N_FAIXAS = 5

logger.info("Modelo: %s (%s)", MODEL_NAME, MODEL_STAGE)
logger.info("SAFRAs para scoring: %s", SCORING_SAFRAS)
logger.info("Output: %s", PATH_SCORES)

In [None]:
# =============================================================================
# SPARK CONFIG
# =============================================================================
# Session-level settings taken from the pipeline config, applied in order.
spark_settings = (
    ("spark.sql.autoBroadcastJoinThreshold", str(SPARK_BROADCAST_THRESHOLD)),
    ("spark.sql.adaptive.enabled", str(SPARK_AQE_ENABLED).lower()),
    ("spark.sql.shuffle.partitions", str(SPARK_SHUFFLE_PARTITIONS)),
    ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
)
for conf_key, conf_value in spark_settings:
    spark.conf.set(conf_key, conf_value)

logger.info("Spark config OK (AQE=%s, shuffle=%d)", SPARK_AQE_ENABLED, SPARK_SHUFFLE_PARTITIONS)

In [None]:
# =============================================================================
# LOAD MODEL FROM THE MLFLOW REGISTRY
# =============================================================================
# Resolves the registered version for MODEL_STAGE, loads that exact version as
# a pyfunc model, and reads the feature list from the metadata JSON stored with
# the originating run. Raises RuntimeError if no version or no metadata exists.
client = MlflowClient()

# Look up the model version currently assigned to the requested stage
model_versions = client.get_latest_versions(MODEL_NAME, stages=[MODEL_STAGE])
if not model_versions:
    raise RuntimeError(
        f"Nenhuma versao encontrada para '{MODEL_NAME}' no stage '{MODEL_STAGE}'. "
        f"Registre o modelo primeiro via export_model.py"
    )

mv = model_versions[0]
logger.info("Modelo encontrado: %s v%s (run_id=%s)", mv.name, mv.version, mv.run_id)

# Load by explicit version rather than by stage: this guarantees the model
# object is the same version as `mv` (whose version/run_id are logged and
# written to the output), even if the stage is reassigned in the meantime.
model_uri = f"models:/{MODEL_NAME}/{mv.version}"
model = mlflow.pyfunc.load_model(model_uri)
logger.info("Modelo carregado: %s", model_uri)

# Fetch the originating run and download its artifacts
run = client.get_run(mv.run_id)
artifacts_path = client.download_artifacts(mv.run_id, "")

# Load the metadata JSON (per the error text below, produced by export_model.py)
import glob
# sorted() makes the choice deterministic when more than one file matches
metadata_files = sorted(glob.glob(f"{artifacts_path}/*metadata*.json"))
if not metadata_files:
    # There is no fallback source for the feature names: the run only stores
    # their count, which is logged here as a diagnostic before failing.
    logger.error(
        "Metadata JSON nao encontrado em %s (run param n_features=%s)",
        artifacts_path, run.data.params.get("n_features", "n/a"),
    )
    raise RuntimeError(
        "Feature names nao disponiveis. Execute export_model.py primeiro para gerar metadata."
    )
if len(metadata_files) > 1:
    logger.warning("Multiplos metadata JSON encontrados — usando %s", metadata_files[0])

with open(metadata_files[0], encoding="utf-8") as f:
    model_metadata = json.load(f)
FEATURE_NAMES = model_metadata["feature_names"]
if not FEATURE_NAMES:
    raise RuntimeError(f"Metadata sem feature_names: {metadata_files[0]}")
logger.info("Features carregadas do metadata: %d features", len(FEATURE_NAMES))

print(f"\nModelo: {MODEL_NAME} v{mv.version}")
print(f"Features: {len(FEATURE_NAMES)}")
print(f"Primeiras 10: {FEATURE_NAMES[:10]}")

In [None]:
# =============================================================================
# LOAD FEATURE STORE (C1: exclude targets, H2: use F.col, H5: schema check)
# =============================================================================
# Reads the Delta feature store, keeps only the SAFRAs being scored, removes
# target columns, and verifies that every model feature is available.
df_feature_store = spark.read.format("delta").load(PATH_FEATURE_STORE)

# Validate presence and type of the SAFRA column
safra_type = dict(df_feature_store.dtypes).get("SAFRA")
if safra_type is None:
    # Fail with a clear message instead of letting the filter below raise
    raise RuntimeError(f"Coluna SAFRA ausente no feature store: {PATH_FEATURE_STORE}")
if safra_type not in ("int", "bigint"):
    logger.warning("SAFRA type is '%s' (expected int) — cast may be needed", safra_type)

# Filter SAFRAs through the column API rather than a string expression (H2)
df_feature_store = df_feature_store.filter(F.col("SAFRA").isin(SCORING_SAFRAS))

total_records = df_feature_store.count()
logger.info("Feature store carregada: %d registros para SAFRAs %s", total_records, SCORING_SAFRAS)

if total_records == 0:
    raise RuntimeError(f"Nenhum registro encontrado para SAFRAs {SCORING_SAFRAS} no feature store")

# C1: detect target columns present in the feature store and actually drop
# them, so the DataFrame matches what the warning message announces.
leaked = [c for c in df_feature_store.columns if c in TARGET_COLUMNS]
if leaked:
    logger.warning("Target columns encontradas no feature store — excluindo: %s", leaked)
    df_feature_store = df_feature_store.drop(*leaked)
# A target listed as a model feature is leakage and must abort the run
leaked_in_features = [f for f in FEATURE_NAMES if f in TARGET_COLUMNS]
if leaked_in_features:
    raise RuntimeError(f"FEATURE_NAMES contem target columns (leakage!): {leaked_in_features}")

# Check that every model feature exists in the feature store
available_cols = set(df_feature_store.columns)
missing_features = [f for f in FEATURE_NAMES if f not in available_cols]
if missing_features:
    logger.error("Missing features (%d): %s", len(missing_features), missing_features)
    raise RuntimeError(f"{len(missing_features)} features ausentes no feature store")
logger.info("Todas as %d features encontradas no feature store", len(FEATURE_NAMES))

# Record counts per SAFRA
df_feature_store.groupBy("SAFRA").count().orderBy("SAFRA").show()

In [None]:
# =============================================================================
# SCORING — per SAFRA (H3: output validation, M1: ptypes, M3: qcut fallback)
# =============================================================================
# For each SAFRA in SCORING_SAFRAS: collect keys + features to pandas, predict,
# validate the scores, derive SCORE and FAIXA_RISCO, and append the result to
# all_scores. The per-SAFRA frames are then concatenated into df_all_scores.
all_scores = []

# Keys and features are collected with ONE toPandas() call per SAFRA. Two
# separate collects of the same Spark DataFrame are not guaranteed to return
# rows in the same order, which could attach a score to the wrong NUM_CPF.
key_columns = ["NUM_CPF", "SAFRA"]
# dict.fromkeys removes repeats while preserving order (a key may be a feature)
scoring_columns = list(dict.fromkeys(key_columns + list(FEATURE_NAMES)))

for safra in SCORING_SAFRAS:
    logger.info("Scoring SAFRA %d...", safra)

    # Filter the SAFRA through the column API
    df_safra = df_feature_store.filter(F.col("SAFRA") == safra)

    # Single collect, then split into keys and features (rows stay aligned)
    pdf_safra = df_safra.select(*scoring_columns).toPandas()
    if pdf_safra.empty:
        # Fail with a clear message instead of an obscure downstream error
        raise RuntimeError(f"Nenhum registro para SAFRA {safra} no feature store")

    # H-NEW-2: .copy() avoids SettingWithCopyWarning when fillna modifies df_X
    df_keys = pdf_safra[key_columns].copy()
    df_X = pdf_safra[list(FEATURE_NAMES)].copy()
    del pdf_safra  # release the combined frame before prediction

    # M1: fill missing values, choosing the filler by pandas dtype
    for col in df_X.columns:
        if ptypes.is_numeric_dtype(df_X[col]):
            df_X[col] = df_X[col].fillna(0)
        else:
            df_X[col] = df_X[col].fillna("MISSING")

    # H3: predict, preferring predict_proba of the wrapped implementation
    try:
        if hasattr(model, '_model_impl'):
            inner = model._model_impl
            if hasattr(inner, 'predict_proba'):
                raw_scores = inner.predict_proba(df_X)
                if raw_scores.ndim == 2 and raw_scores.shape[1] == 2:
                    # Column 1 is taken as the score (second predict_proba column)
                    scores = raw_scores[:, 1]
                else:
                    raise RuntimeError(f"predict_proba shape inesperado: {raw_scores.shape}")
            else:
                scores = inner.predict(df_X)
        else:
            scores = model.predict(df_X)
    except Exception as e:
        logger.error("Predicao falhou para SAFRA %d: %s", safra, e)
        logger.error("X shape: %s", df_X.shape)
        raise

    # Normalise to a 1-D float vector before validating
    scores = np.asarray(scores, dtype=float)
    if scores.ndim == 2 and scores.shape[1] == 1:
        scores = scores.ravel()
    if scores.ndim != 1:
        raise RuntimeError(f"Formato de scores inesperado: {scores.shape}")
    if len(scores) != len(df_X):
        raise RuntimeError(f"Score count mismatch: {len(scores)} != {len(df_X)}")
    if not np.all(np.isfinite(scores)):
        n_invalid = (~np.isfinite(scores)).sum()
        logger.warning("  %d scores nao-finitos — clipping to [0, 1]", n_invalid)
        # NaN becomes 0.5; +/-inf become large finite values that clip to 1/0
        scores = np.clip(np.nan_to_num(scores, nan=0.5), 0, 1)
    if np.any((scores < 0) | (scores > 1)):
        logger.warning("  Scores fora de [0, 1] — clipping")
        scores = np.clip(scores, 0, 1)

    logger.info("  Scores range: [%.4f, %.4f]", scores.min(), scores.max())

    # Build the output DataFrame
    df_result = df_keys.copy()
    df_result["SCORE_PROB"] = scores

    # Inverted score: SCORE = SCORE_SCALE * (1 - SCORE_PROB), so a HIGHER
    # SCORE corresponds to a LOWER predicted probability.
    df_result["SCORE"] = (SCORE_SCALE * (1 - df_result["SCORE_PROB"])).round(0).astype(int)

    # M3: risk band by quantile of SCORE_PROB (1 = lowest probabilities),
    # with a rank-based fallback when qcut cannot build N_FAIXAS bins.
    try:
        df_result["FAIXA_RISCO"] = pd.qcut(
            df_result["SCORE_PROB"],
            q=N_FAIXAS,
            labels=list(range(1, N_FAIXAS + 1)),
            duplicates="drop"
        ).astype(int)
        n_bins = df_result["FAIXA_RISCO"].nunique()
        if n_bins < N_FAIXAS:
            logger.warning("  SAFRA %d: qcut produced %d bins (expected %d)", safra, n_bins, N_FAIXAS)
    except ValueError:
        logger.warning("  SAFRA %d: qcut failed — using rank-based binning", safra)
        df_result["FAIXA_RISCO"] = pd.cut(
            df_result["SCORE_PROB"].rank(pct=True),
            bins=N_FAIXAS,
            labels=list(range(1, N_FAIXAS + 1))
        ).astype(int)

    # Provenance columns
    df_result["MODEL_NAME"] = MODEL_NAME
    df_result["MODEL_VERSION"] = str(mv.version)
    df_result["DT_SCORING"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    all_scores.append(df_result)
    logger.info("  SAFRA %d: %d registros scored (score medio=%.0f)",
                safra, len(df_result), df_result["SCORE"].mean())

# Consolidate all SAFRAs
df_all_scores = pd.concat(all_scores, ignore_index=True)
logger.info("Total scored: %d registros", len(df_all_scores))

logger.info("Distribuicao de scores:")
logger.info("\n%s", df_all_scores["SCORE"].describe())
logger.info("Distribuicao por faixa de risco:")
logger.info("\n%s", df_all_scores.groupby(["SAFRA", "FAIXA_RISCO"]).size().unstack(fill_value=0))

In [None]:
# =============================================================================
# SAVE SCORES (H4: single dynamic write, M5: enhanced validation)
# =============================================================================
# Converts df_all_scores to a Spark DataFrame with an explicit schema, writes
# it to PATH_SCORES partitioned by SAFRA, then re-reads the table and validates
# what was written. Raises RuntimeError on missing SAFRAs, duplicates, or a
# row-count mismatch.
schema = StructType([
    StructField("NUM_CPF", StringType(), True),
    StructField("SAFRA", IntegerType(), True),
    StructField("SCORE_PROB", DoubleType(), True),
    StructField("SCORE", IntegerType(), True),
    StructField("FAIXA_RISCO", IntegerType(), True),
    StructField("MODEL_NAME", StringType(), True),
    StructField("MODEL_VERSION", StringType(), True),
    StructField("DT_SCORING", StringType(), True),
])

df_spark_scores = spark.createDataFrame(df_all_scores, schema=schema)

# H4: one write with dynamic partition overwrite — intended to replace only
# the SAFRA partitions present in df_spark_scores, so re-runs are idempotent.
df_spark_scores.write.format("delta") \
    .mode("overwrite") \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("SAFRA") \
    .save(PATH_SCORES)
logger.info("Scores escritos em %s", PATH_SCORES)

# M5: post-write validation
df_check = spark.read.format("delta").load(PATH_SCORES)
# Rows belonging to this batch; reused by checks 3-5 below
df_check_scored = df_check.filter(F.col("SAFRA").isin(SCORING_SAFRAS))

# Check 1: count per SAFRA (whole table, for visual inspection)
logger.info("Validacao pos-escrita:")
df_check.groupBy("SAFRA").count().orderBy("SAFRA").show()

# Check 2: every requested SAFRA is present
written_safras = {r[0] for r in df_check.select("SAFRA").distinct().collect()}
expected_safras = set(SCORING_SAFRAS)
missing_safras = expected_safras - written_safras
if missing_safras:
    raise RuntimeError(f"SAFRAs nao escritas: {missing_safras}")

# Check 3: no duplicate (NUM_CPF, SAFRA) keys
dup_count = df_check_scored \
    .groupBy("NUM_CPF", "SAFRA").count().filter("count > 1").count()
if dup_count > 0:
    raise RuntimeError(f"Encontradas {dup_count} duplicatas (NUM_CPF, SAFRA)")

# Check 4: score ranges. Nulls are tested explicitly because a comparison
# against NULL evaluates to NULL and would not be counted by the filter.
invalid_scores = df_check_scored.filter(
    F.col("SCORE").isNull() | F.col("SCORE_PROB").isNull() |
    (F.col("SCORE") < 0) | (F.col("SCORE") > SCORE_SCALE) |
    (F.col("SCORE_PROB") < 0) | (F.col("SCORE_PROB") > 1)
).count()
if invalid_scores > 0:
    logger.warning("Encontrados %d registros com scores invalidos", invalid_scores)

# Check 5: the table holds exactly the rows that were scored in this batch
total = df_check_scored.count()
if total != len(df_all_scores):
    raise RuntimeError(
        f"Contagem divergente: {total} registros na tabela vs {len(df_all_scores)} scored"
    )

# Only report success when no check produced a warning
if invalid_scores > 0:
    logger.warning(
        "Validacao concluida com ressalvas: %d registros, %d colunas, %d scores invalidos",
        total, len(df_check.columns), invalid_scores,
    )
else:
    logger.info("Validacao OK: %d registros, %d colunas, 0 duplicatas", total, len(df_check.columns))

In [None]:
# =============================================================================
# LOG THE SCORING RUN IN MLFLOW
# =============================================================================
# Records the batch parameters, aggregate and per-SAFRA score metrics, and a
# CSV with the score distribution per (SAFRA, FAIXA_RISCO) as a run artifact.
import os
import tempfile

mlflow.set_experiment(EXPERIMENT_NAME)

with mlflow.start_run(run_name=f"scoring_batch_{datetime.now().strftime('%Y%m%d_%H%M')}"):
    mlflow.log_param("model_name", MODEL_NAME)
    mlflow.log_param("model_version", mv.version)
    mlflow.log_param("model_stage", MODEL_STAGE)
    mlflow.log_param("scoring_safras", str(SCORING_SAFRAS))
    mlflow.log_param("n_features", len(FEATURE_NAMES))
    mlflow.log_param("output_path", PATH_SCORES)

    mlflow.log_metric("total_records_scored", len(df_all_scores))
    mlflow.log_metric("score_mean", float(df_all_scores["SCORE"].mean()))
    mlflow.log_metric("score_std", float(df_all_scores["SCORE"].std()))
    mlflow.log_metric("score_prob_mean", float(df_all_scores["SCORE_PROB"].mean()))

    # Per-SAFRA record count and mean score
    for safra in SCORING_SAFRAS:
        mask = df_all_scores["SAFRA"] == safra
        mlflow.log_metric(f"records_safra_{safra}", int(mask.sum()))
        mlflow.log_metric(f"score_mean_safra_{safra}", float(df_all_scores.loc[mask, "SCORE"].mean()))

    # Save the distribution as a CSV artifact. A private temporary directory
    # replaces the fixed /tmp path so concurrent runs cannot overwrite each
    # other's file, and the file is removed once it has been logged. The
    # artifact keeps the same file name as before.
    with tempfile.TemporaryDirectory() as tmp_dir:
        dist_path = os.path.join(tmp_dir, "scoring_distribution.csv")
        df_all_scores.groupby(["SAFRA", "FAIXA_RISCO"]).agg(
            count=("NUM_CPF", "count"),
            score_mean=("SCORE", "mean"),
            score_prob_mean=("SCORE_PROB", "mean"),
        ).reset_index().to_csv(dist_path, index=False)
        mlflow.log_artifact(dist_path)

    run_id = mlflow.active_run().info.run_id
    logger.info("MLflow scoring run: %s", run_id)

print("\nScoring batch concluido com sucesso!")
print(f"MLflow Run ID: {run_id}")
print(f"Output: {PATH_SCORES}")

## Resumo

| Item | Valor |
|------|-------|
| Modelo | `credit-risk-fpd-lgbm_baseline` |
| Stage | Production |
| Output | `Gold.feature_store.clientes_scores` |
| Particionado por | SAFRA |
| Colunas output | NUM_CPF, SAFRA, SCORE_PROB, SCORE, FAIXA_RISCO, MODEL_NAME, MODEL_VERSION, DT_SCORING |

**Próximos passos**: Executar `validacao_deploy.py` para confirmar que as métricas do scoring coincidem com as métricas da avaliação.