In [2]:
import polars as pl
import pandas as pd
from datetime import date

### Definición de Rutas

In [3]:
cierre = "202506"
cierre_l = "Jun25"
cierre_d = date(2025,6,30)


path = "E:/Users/jhernandezr/DAR/garantias/reporte/fotos/"
wd_data_raw = path + "data/raw/"
wd_data_external = path + "data/external/"

wd_data_processed_dwh = "E:/Users/jhernandezr/DAR/garantias/data_pipeline_garantias/data/processed/dwh/"
wd_data_validations = "E:/Users/jhernandezr/DAR/garantias/data_pipeline_garantias/data/validations/"
wd_data_processed_curvarecup = "E:/Users/jhernandezr/DAR/garantias/data_pipeline_garantias/data/processed/curva_recup/"
wd_data_processed_fotos = "E:/Users/jhernandezr/DAR/garantias/data_pipeline_garantias/data/processed/fotos/"

# Inputs
fl_pagadas_detalle_vf = wd_data_processed_curvarecup + f"PAGADAS_DETALLE_VF_{cierre}.parquet"
fl_recupera_con_pagos_flujos = wd_data_processed_dwh + f"parquet/Recupera_con_Pagos_Flujos_{cierre}.parquet"
fl_db_dwh_r = wd_data_processed_fotos + f"parquet/DB_DWH_R_{cierre}.parquet"
fl_db_dwh_nr = wd_data_processed_fotos + f"parquet/DB_DWH_NR_{cierre}.parquet"

fl_acumulado_saldos = ""


In [4]:
pagadas_detalle_vf = pl.read_parquet(fl_pagadas_detalle_vf)
recuperadas_global_vf = pl.read_parquet(fl_recupera_con_pagos_flujos)
db_dwh_r = pl.read_parquet(fl_db_dwh_r)
db_dwh_nr = pl.read_parquet(fl_db_dwh_nr)

In [5]:
def une_pagadas(db_dwh, pagadas_detalle_vf):
    df_joined = db_dwh.join(
        pagadas_detalle_vf,
        left_on=["INTERMEDIARIO_ID", "NUMERO_CREDITO"],
        right_on=["Intermediario_Id", "Numero_Credito"],
        how="left"
    )

    monto_expr = (
        pl.coalesce([pl.col("Monto_Desembolso_Mn"), pl.lit(0)]) +
        pl.coalesce([pl.col("Interes_Desembolso_Mn"), pl.lit(0)]) +
        pl.coalesce([pl.col("Interes_Moratorios_Mn"), pl.lit(0)])
    )

    # Añadir columnas calculadas
    df_final = df_joined.with_columns([
        (monto_expr / 1_000_000).alias("MPAGADO (MDP)"),
        (monto_expr > 0).cast(pl.Int8).alias("PAGADAS"),
        (monto_expr > 0).cast(pl.Int8).alias("INCUMPLIDO"),
        pl.coalesce([
            pl.col("Fecha_Garantia_Honrada"),
            pl.date(1899,12,30)
        ]).alias("FECHA_PAGO")
    ])

    return df_final

db_dwh_r_p = une_pagadas(db_dwh_r, pagadas_detalle_vf)
# db_dwh_nr_p = une_pagadas(db_dwh_nr, pagadas_detalle_vf)

In [6]:
def z3_recup(recuperadas_global_vf, db_dwh):
    # === Paso 1: Crear columna Concatenado2 en A ===
    recuperadas_global_vf = recuperadas_global_vf.with_columns(
        (pl.col("Numero_Credito").cast(pl.Utf8) + pl.col("Intermediario_Id").cast(pl.Utf8)).alias("Concatenado")
    )

    # === Paso 2: Crear columna Concatenado2 en B ===
    db_dwh = db_dwh.with_columns(
        (pl.col("NUMERO_CREDITO").cast(pl.Utf8) + pl.col("INTERMEDIARIO_ID").cast(pl.Utf8)).alias("Concatenado2"),
        pl.coalesce([
            pl.col("FECHA_PAGO"),
            pl.date(1899, 12, 30)  # Default fallback
        ]).alias("FECHA_PAGO")
    )

    # === Paso 3: JOIN A y B usando Concatenado ===
    result = recuperadas_global_vf.join(
        db_dwh.select(["Concatenado2", "FECHA_PAGO"]),
        left_on="Concatenado",
        right_on="Concatenado2",
        how="left"
    )

    # === Paso 4: Calcular montos ===
    monto_total_expr = (
        pl.coalesce([pl.col("Monto_Mn"), pl.lit(0)]) +
        pl.coalesce([pl.col("Interes_Mn"), pl.lit(0)]) +
        pl.coalesce([pl.col("Moratorios_Mn"), pl.lit(0)]) +
        pl.coalesce([pl.col("Excedente_Mn"), pl.lit(0)]) -
        pl.coalesce([pl.col("Gastos_Juicio_Mn"), pl.lit(0)])
    ) / 1_000_000

    # === Paso 5: Estatus lógicos ===
    estatus_recup = ["D", "E", "RI", "CR", "RAR", "RAC"]
    estatus_rescat = ["CJ", "CS", "R", "RJ", "RS"]

    # === Paso 6: Agregar columnas finales ===
    result = result.with_columns([
        (pl.col("Fecha") > pl.col("FECHA_PAGO")).cast(pl.Int8).alias("ENTRA_RECUP"),
        pl.when(
            (pl.col("Fecha") > pl.col("FECHA_PAGO")) &
            (pl.col("Estatus").is_in(estatus_recup + estatus_rescat))
        ).then(monto_total_expr).otherwise(0).alias("MONTOTOTAL (MDP)"),
        pl.when(
            (pl.col("Fecha") > pl.col("FECHA_PAGO")) &
            (pl.col("Estatus").is_in(estatus_recup))
        ).then(monto_total_expr).otherwise(0).alias("RECUPERADOS (MDP)"),
        pl.when(
            (pl.col("Fecha") > pl.col("FECHA_PAGO")) &
            (pl.col("Estatus").is_in(estatus_rescat))
        ).then(monto_total_expr).otherwise(0).alias("RESCATADOS (MDP)")
    ])

    return result

z3_recup_cohort = z3_recup(recuperadas_global_vf, db_dwh_r_p)
# z3_recup(recuperadas_global_vf, db_dwh_nr_p)

In [7]:
def paso_2(z3_recup_cohort):
    result = z3_recup_cohort.group_by([
        "Numero_Credito",
        "Intermediario_Id",
        "NR_R",
        "Producto"
    ]).agg(
        pl.sum("MONTOTOTAL (MDP)"),
        pl.sum("RECUPERADOS (MDP)"),
        pl.sum("RESCATADOS (MDP)")
    ).sort([
        "Numero_Credito",
        "Intermediario_Id",
        "NR_R",
        "Producto"
    ])

    return result

vf_recuperadas = paso_2(z3_recup_cohort)

In [17]:
def fecha_or_default(col):
    return pl.when(pl.col(col).is_null()) \
             .then(date(1899, 12, 30)) \
             .otherwise(pl.col(col)) \
             .alias(col + "_LIMPIA")

def foto_saldo(db_dwh):
    # === Aplicar transformaciones ===
    result = db_dwh.with_columns([
        # Campos calculados
        (pl.col("Monto _Credito_Mn") * pl.col("CAMBIO")).alias("MCrédito_MM_UDIS"),
        (pl.col("Monto_Garantizado_Mn") / 1_000_000).alias("MGI (MDP)"),
        (pl.col("Monto _Credito_Mn") / 1_000_000).alias("MONTO CREDITO (MDP)"),
        (pl.col("Saldo_Contingente_Mn") / 1_000_000).alias("SALDO (MDP)"),
        (pl.col("Monto_Garantizado_Mn_Original") / 1_000_000).alias("MGI (MDP) Original"),
        
        # Manejo de NULLs
        pl.col("Plazo Días").fill_null(0).alias("PLAZO_DIAS"),
        pl.col("NUMERO_CREDITO").is_not_null().cast(pl.Int8).alias("NUM_GAR"),

        # Cambios de nombre
        pl.col("Fecha de Apertura").alias("FECHA_VALOR"),
        pl.col("INTERMEDIARIO_ID").alias("INTER_CLAVE"),
        pl.col("Razón Social (Intermediario)").alias("BANCO"),
        pl.col("Nombre_v1").alias("NOMBRE"),
        pl.col("RFC Empresa / Acreditado").alias("RFC"),
        pl.col("NUMERO_CREDITO").alias("CLAVE_CREDITO"),
        pl.col("Fecha Registro Alta").alias("FECHA_REGISTRO_GARANTIA"),
        pl.col("Producto ID").alias("CLAVE_TAXO"),
        pl.col("Producto").alias("TAXONOMIA"),
        pl.col("Porcentaje de Comisión Garantia").alias("Porcentaje_Comision_Garantia"),
        pl.col("VALOR_TASA_INTERES").alias("Tasa_Interes"),
        
        # Clasificación de plazo
        pl.when(pl.col("PLAZO") <= 12).then(1)
        .when(pl.col("PLAZO") <= 24).then(2)
        .when(pl.col("PLAZO") <= 36).then(3)
        .otherwise(4)
        .alias("PLAZO_BUCKET"),

        # Manejo de fechas: si es 0 se vuelve NULL, si no se redondea a primer día del mes
        pl.when(pl.col("Fecha de Apertura") == 0).then(None)
        .otherwise(pl.col("Fecha de Apertura").dt.replace(day=1))
        .alias("FECHA_VALOR1"),
        
        pl.when(pl.col("Fecha Registro Alta") == 0).then(None)
        .otherwise(pl.col("Fecha Registro Alta").dt.replace(day=1))
        .alias("FECHA_REGISTRO1"),
        
        # Fecha de primer incumplimiento con valor default si NULL
        fecha_or_default("FECHA_PRIMER_INCUMPLIMIENTO").alias("FECHA_PRIMER_INCUM"),

        # Fecha de pago también reemplazada si NULL
        fecha_or_default("FECHA_PAGO").alias("FECHA_PAGO_LIMPIA")
    ])

    # === Selección final de columnas ===
    result = result.select([
    "BUCKET", "CAMBIO", "MCrédito_MM_UDIS", "MM_UDIS",
    "INTER_CLAVE", "NOMBRE", "RFC", "TIPO_PERSONA", "CLAVE_CREDITO",
    "FECHA_VALOR", "PLAZO_DIAS", "PLAZO", 
    "test", # FVTO_Riesgosd
    "FECHA_REGISTRO_GARANTIA",
    "MGI (MDP)", "Porcentaje Garantizado", "BANCO", "FECHA_PRIMER_INCUM",
    "MONTO CREDITO (MDP)", "SALDO (MDP)", "TPRO_CLAVE", "CLAVE_TAXO", "TAXONOMIA", "NR_R",
    "FECHA_VALOR1", "FECHA_REGISTRO1", "NUM_GAR", "CSG", "PLAZO_BUCKET", "MPAGADO (MDP)",
    "PAGADAS", "INCUMPLIDO", "FECHA_PAGO_LIMPIA",
    "Programa_Original", "Programa_Id", "ESTRATO_ID", "SECTOR_ID", "ESTADO_ID",
    "Tipo_Credito_Id", "Porcentaje_Comision_Garantia", "TASA_ID", "Tasa_Interes",
    "MGI (MDP) Original", "AGRUPAMIENTO_ID", "ESQUEMA", "SUBESQUEMA", "AGRUPAMIENTO",
    "FONDOS_CONTRAGARANTIA", "CONREC_CLAVE", "Describe_Desrec"
    ])

    return result

vf_pagadas_r = foto_saldo(db_dwh_r_p)

In [18]:
def une_pagos_recuperaciones(vf_pagadas, vf_recuperadas):
    result = vf_pagadas.join(
        vf_recuperadas.select([
            "Intermediario_Id",
            "Numero_Credito",
            "MONTOTOTAL (MDP)", 
            "RECUPERADOS (MDP)", 
            "RESCATADOS (MDP)"
        ]),
        left_on=["INTER_CLAVE", "CLAVE_CREDITO"],
        right_on=["Intermediario_Id", "Numero_Credito"],
        how="left"
    )

    return result

vf_foto_r = une_pagos_recuperaciones(vf_pagadas_r, vf_recuperadas)

In [22]:
def genera_saldos(vf_foto):
    result = (vf_foto
              .filter(pl.col("SALDO (MDP)") > 0)
              .with_columns(
                  pl.col("SALDO (MDP)").alias("SALDO_MDP"),
                 (pl.col("CLAVE_CREDITO") + pl.col("INTER_CLAVE")).alias("CONCATENAR_SALDOS")
              ).select([
                  "BUCKET",
                  "INTER_CLAVE",
                  "CLAVE_CREDITO",
                  "BANCO",
                  "SALDO_MDP",
                  "CONCATENAR_SALDOS"
              ])
              )
    return result

genera_saldos(vf_foto_r)

BUCKET,INTER_CLAVE,CLAVE_CREDITO,BANCO,SALDO_MDP,CONCATENAR_SALDOS
i16,str,str,str,f64,str
2,"""10000144""","""4485750000083730""","""BANCO SANTANDER""",0.0000005,"""448575000008373010000144"""
2,"""10000144""","""3924750000069800""","""BANCO SANTANDER""",0.0000005,"""392475000006980010000144"""
4,"""10000144""","""23750000085670""","""BANCO SANTANDER""",0.0000005,"""2375000008567010000144"""
3,"""10000144""","""7750000075630""","""BANCO SANTANDER""",0.0000005,"""775000007563010000144"""
4,"""10000144""","""5457750000116500""","""BANCO SANTANDER""",0.0000005,"""545775000011650010000144"""
…,…,…,…,…,…
,"""10000420""","""7025216-52/GLOBA""","""BANCA MIFEL""",12.329918,"""7025216-52/GLOBA10000420"""
,"""10000420""","""7026955-50/GLOBA""","""BANCA MIFEL""",4.768597,"""7026955-50/GLOBA10000420"""
,"""10000420""","""7030857-50/GLOBA""","""BANCA MIFEL""",12.956762,"""7030857-50/GLOBA10000420"""
,"""10000420""","""7030855-50/GLOBA""","""BANCA MIFEL""",13.744299,"""7030855-50/GLOBA10000420"""


In [None]:
db_dwh_r_p = une_pagadas(db_dwh_r, pagadas_detalle_vf)
# db_dwh_nr_p = une_pagadas(db_dwh_nr, pagadas_detalle_vf)

z3_recup_cohort = z3_recup(recuperadas_global_vf, db_dwh_r_p)
# z3_recup(recuperadas_global_vf, db_dwh_nr_p)

vf_recuperadas = paso_2(z3_recup_cohort)

vf_pagadas_r = foto_saldo(db_dwh_r_p)

vf_foto_r = une_pagos_recuperaciones(vf_pagadas_r, vf_recuperadas)

genera_saldos(vf_foto_r)
