# SIMBAD Landing → Bronze Pipeline

**Propósito**: Convierte datos CSV de SIMBAD desde landing a formato Parquet en bronze layer

**Funcionalidades**:
- Detecta automáticamente el último directorio dt=YYYY-MM-DD
- Manejo robusto de encoding (UTF-8 fallback a ISO-8859-1)
- Esquema estable con tipos de datos correctos
- Particionado inteligente por año/mes
- Trazabilidad completa (archivo origen, dt_captura)

**Input**: `gs://bucket/lakehouse/landing/simbad/simbad_carteras_aayp_hipotecarios/dt=*/`
**Output**: `gs://bucket/lakehouse/bronze/simbad/simbad_carteras_aayp_hipotecarios/anio=*/mes=*/`

In [None]:
# ==============================================
# SIMBAD landing -> bronze (schema estable)
# ==============================================
from pyspark.sql import functions as F
from datetime import datetime

BUCKET = "dae-integrador-2025"
LANDING = f"gs://{BUCKET}/lakehouse/landing/simbad/simbad_carteras_aayp_hipotecarios"
BRONZE_BASE = f"gs://{BUCKET}/lakehouse/bronze/simbad/simbad_carteras_aayp_hipotecarios"

print(f"🔄 Iniciando SIMBAD Landing → Bronze")
print(f"📥 Source: {LANDING}")
print(f"📤 Target: {BRONZE_BASE}")

In [None]:
# --- utilidades para dt ---
def _list_subdirs(gcs_dir: str):
    """Lista subdirectorios usando Hadoop FileSystem"""
    jsc = sc._jsc
    hconf = jsc.hadoopConfiguration()
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(Path(gcs_dir).toUri(), hconf)
    return [st.getPath().getName() for st in fs.listStatus(Path(gcs_dir)) if st.isDirectory()]

def _pick_latest_dt_dir(base_dir: str):
    """Encuentra el directorio dt= más reciente"""
    subdirs = _list_subdirs(base_dir)
    dt_dirs = [d for d in subdirs if d.startswith("dt=")]
    if not dt_dirs:
        raise RuntimeError(f"No se encontraron directorios dt= en {base_dir}")
    latest = max(dt_dirs, key=lambda d: datetime.strptime(d.split("=")[1], "%Y-%m-%d"))
    return latest, latest.split("=")[1]

# --- lee CSV del mayor dt (con fallback de encoding) ---
def _read_csv(path_glob: str):
    """Lee CSV con manejo robusto de encoding"""
    try:
        df = (spark.read
              .option("header", "true")
              .option("mode", "PERMISSIVE")
              .option("multiLine", "false")
              .option("inferSchema", "false")     # todo como STRING
              .option("encoding", "UTF-8")
              .csv(path_glob))
        print("✅ Lectura exitosa con UTF-8")
    except Exception as e:
        print(f"⚠️ UTF-8 falló, probando ISO-8859-1: {str(e)}")
        df = (spark.read
              .option("header", "true")
              .option("mode", "PERMISSIVE")
              .option("multiLine", "false")
              .option("inferSchema", "false")
              .option("encoding", "ISO-8859-1")
              .csv(path_glob))
        print("✅ Lectura exitosa con ISO-8859-1")
    return df.select("*")

In [None]:
# --- columnas esperadas (en camelCase) ---
EXPECTED = [
    "periodo","tipoCredito","tipoEntidad","entidad","sectorEconomico","region","provincia",
    "moneda","tipoCartera","actividad","sector","persona","facilidad","residencia",
    "administracionYPropiedad","genero","tipoCliente","clasificacionEntidad",
    "cantidadPlasticos","cantidadCredito","deuda","tasaPorDeuda","deudaCapital",
    "deudaVencida","deudaVencidaDe31A90Dias","valorDesembolso","valorGarantia",
    "valorProvisionCapitalYRendimiento","__periodo"
]

# --- ingest ---
dt_dir, dt_str = _pick_latest_dt_dir(LANDING)
csv_glob = f"{LANDING}/{dt_dir}/*.csv"
print(f"📁 Último directorio encontrado: {dt_dir}")
print(f"📄 Leyendo: {csv_glob}")

df_raw = _read_csv(csv_glob)
print(f"📊 Filas raw: {df_raw.count():,}")
print(f"📋 Columnas raw: {len(df_raw.columns)}")

In [None]:
# Asegura presencia/orden de columnas
sel = [ (F.col(c) if c in df_raw.columns else F.lit(None).alias(c)) for c in EXPECTED ]
df = df_raw.select(*sel)

print("✅ Esquema normalizado a columnas esperadas")
df.printSchema()

In [None]:
# Normaliza y TIPOS FIJOS (un solo select)
df_cast = df.select(
    # Campos string (con trim)
    F.trim(F.col("periodo")).alias("periodo"),
    F.trim(F.col("tipoCredito")).alias("tipoCredito"),
    F.trim(F.col("tipoEntidad")).alias("tipoEntidad"),
    F.trim(F.col("entidad")).alias("entidad"),
    F.trim(F.col("sectorEconomico")).alias("sectorEconomico"),
    F.trim(F.col("region")).alias("region"),
    F.trim(F.col("provincia")).alias("provincia"),
    F.trim(F.col("moneda")).alias("moneda"),
    F.trim(F.col("tipoCartera")).alias("tipoCartera"),
    F.trim(F.col("actividad")).alias("actividad"),
    F.trim(F.col("sector")).alias("sector"),
    F.trim(F.col("persona")).alias("persona"),
    F.trim(F.col("facilidad")).alias("facilidad"),
    F.trim(F.col("residencia")).alias("residencia"),
    F.trim(F.col("administracionYPropiedad")).alias("administracionYPropiedad"),
    F.trim(F.col("genero")).alias("genero"),
    F.trim(F.col("tipoCliente")).alias("tipoCliente"),
    F.trim(F.col("clasificacionEntidad")).alias("clasificacionEntidad"),

    # Campos numéricos
    F.col("cantidadPlasticos").cast("int").alias("cantidadPlasticos"),
    F.col("cantidadCredito").cast("int").alias("cantidadCredito"),
    F.col("deuda").cast("double").alias("deuda"),
    F.col("tasaPorDeuda").cast("double").alias("tasaPorDeuda"),
    F.col("deudaCapital").cast("double").alias("deudaCapital"),
    F.col("deudaVencida").cast("double").alias("deudaVencida"),
    F.col("deudaVencidaDe31A90Dias").cast("double").alias("deudaVencidaDe31A90Dias"),
    F.col("valorDesembolso").cast("double").alias("valorDesembolso"),
    F.col("valorGarantia").cast("double").alias("valorGarantia"),
    F.col("valorProvisionCapitalYRendimiento").cast("double").alias("valorProvisionCapitalYRendimiento"),

    F.col("__periodo").alias("__periodo"),

    # derivados y trazabilidad
    F.to_date(F.col("periodo"), "yyyy-MM").alias("periodo_date"),
    F.year(F.to_date(F.col("periodo"), "yyyy-MM")).alias("anio"),
    F.month(F.to_date(F.col("periodo"), "yyyy-MM")).alias("mes"),
    F.lit(dt_str).alias("dt_captura"),
    F.input_file_name().alias("archivo_origen")
)

print("✅ Tipos de datos aplicados y columnas derivadas creadas")
print(f"📊 Filas procesadas: {df_cast.count():,}")

In [None]:
# Muestra sample de datos
print("📋 Sample de datos procesados:")
df_cast.select("periodo", "tipoEntidad", "entidad", "deuda", "anio", "mes", "dt_captura").show(5)

In [None]:
# --- escritura por año (carpeta) y partición por mes ---
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

anios = [r.anio for r in df_cast.select("anio").distinct().collect()]
anios_sorted = sorted(anios)

print(f"📅 Años a procesar: {anios_sorted}")

for y in anios_sorted:
    out = f"{BRONZE_BASE}/anio={y}"
    filas_anio = df_cast.filter(F.col("anio")==y).count()
    
    print(f"⏳ Escribiendo año {y} ({filas_anio:,} filas) → {out}")
    
    (df_cast.filter(F.col("anio")==y)
            .write.mode("overwrite")
            .partitionBy("mes")
            .parquet(out))
    
    print(f"✅ {y} completado")

print("🎉 ¡SIMBAD Bronze ingestion completada exitosamente!")