# Silver — Limpieza y Enriquecimiento

Propósito: Limpiar duplicados e inconsistencias, imputar nulos, marcar outliers y generar las tablas Silver (limpios y rechazados).

Rol: transformar y curar los datos desde Bronze para producir datasets confiables y documentados que alimenten la capa Gold.

In [0]:
# Importar librerias
import traceback

from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col,
    trim,
    upper,
    when,
    lit,
    concat_ws,
    md5,
    dense_rank,
    monotonically_increasing_id,
    percentile_approx,
    mean as spark_mean,
    row_number,
    current_timestamp,
)
from pyspark.sql.types import StringType
from pyspark.sql.window import Window


In [0]:
# Cargar Bronze
df_bronze = spark.table("workspace.credit_risk.bronze_credit_risk")

In [0]:
df_silver = (
    df_bronze
    .select(
        F.col("edad").cast("int"),
        F.col("ingreso_anual").cast("double"),
        F.col("tipo_vivienda").alias("tipo_vivienda"),
        F.col("anios_empleo").cast("double"),
        F.col("proposito"),
        F.col("calificacion"),
        F.col("monto").cast("double"),
        F.col("tasa_interes").cast("double"),
        F.col("estado_pago").cast("int"),
        F.col("pct_ingreso").cast("double"),
        F.col("historial_default"),
        F.col("anios_hist_credito").cast("double"),
        F.col("fecha_ingesta")
    )
)


In [0]:
# 1. IDENTIFICAR Y SEPARAR DUPLICADOS

# Marcar duplicados (mantenemos el primero)
window_spec = Window.partitionBy(
    "edad", "ingreso_anual", "tipo_vivienda", "anios_empleo", 
    "proposito", "calificacion", "monto", "tasa_interes", 
    "estado_pago", "pct_ingreso", "historial_default", "anios_hist_credito"
).orderBy("fecha_ingesta")

df_con_marca = df_silver.withColumn("fila_num", row_number().over(window_spec))

# Separar duplicados
df_duplicados = df_con_marca.filter(col("fila_num") > 1).withColumn("razon_rechazo", lit("Duplicado"))
df_sin_duplicados = df_con_marca.filter(col("fila_num") == 1).drop("fila_num")

print(f"Duplicados identificados: {df_duplicados.count()}")

In [0]:
# 2. IDENTIFICAR Y SEPARAR REGISTROS INCONSISTENTES

df_inconsistentes = df_sin_duplicados.filter(
    (col("edad") < 18) | (col("edad") > 100) |
    (col("anios_empleo") < 0) | (col("anios_empleo") > 50)
).withColumn("razon_rechazo", 
    when((col("edad") < 18) | (col("edad") > 100), "Edad fuera de rango")
    .when((col("anios_empleo") < 0) | (col("anios_empleo") > 50), "Anios empleo inválidos")
)

df_consistentes = df_sin_duplicados.filter(
    (col("edad") >= 18) & (col("edad") <= 100) &
    (col("anios_empleo") >= 0) & (col("anios_empleo") <= 50)
)

print(f"Registros inconsistentes identificados: {df_inconsistentes.count()}")

In [0]:
# Tabla de rechazados (duplicados + inconsistentes)
df_duplicados_clean = df_duplicados.drop("fila_num")
df_inconsistentes_clean = df_inconsistentes  # Ya no tiene fila_num
df_rechazados = df_duplicados_clean.union(df_inconsistentes_clean)\
    .withColumn("fecha_rechazo", current_timestamp())

In [0]:
# 3. LIMPIAR COLUMNAS STRING

df_strings_limpios = (df_consistentes
    .withColumn("tipo_vivienda", upper(trim(col("tipo_vivienda"))))
    .withColumn("proposito", upper(trim(col("proposito"))))
    .withColumn("calificacion", upper(trim(col("calificacion"))))
    .withColumn("historial_default", upper(trim(col("historial_default"))))
)

# 4. IMPUTAR NULOS

# Para años_empleo: imputar con mediana
mediana_empleo = df_strings_limpios.approxQuantile("anios_empleo", [0.5], 0.01)[0]

# Para tasa_interes: imputar con mediana por calificación
medianas_tasa = df_strings_limpios.groupBy("calificacion").agg(
    spark_mean("tasa_interes").alias("tasa_media")
)

df_con_imputacion = (df_strings_limpios
    .withColumn("anios_empleo", 
        when(col("anios_empleo").isNull(), mediana_empleo).otherwise(col("anios_empleo"))
    )
    .join(medianas_tasa, "calificacion", "left")
    .withColumn("tasa_interes",
        when(col("tasa_interes").isNull(), col("tasa_media")).otherwise(col("tasa_interes"))
    )
    .drop("tasa_media")
)

print(f"Nulos imputados - Años empleo: mediana={mediana_empleo}, Tasa interés: por calificación")

In [0]:
# 4.5. GENERAR ID ÚNICO

# ID ÚNICO DE REGISTRO de cliente: Cada fila es única
df_con_ids = df_con_imputacion.withColumn(
    "id_cliente",
    monotonically_increasing_id()
)

print(f" ID generado:")
print(f"   - id_cliente: basado en registros únicos")

# Verificar cuántos clientes únicos hay
total_registros = df_con_ids.count()
print(f"\n Total de registros: {total_registros:,}")

In [0]:
from pyspark.sql.functions import percentile_approx

columnas_outliers = [
    "edad",
    "ingreso_anual",
    "anios_empleo",
    "monto",
    "pct_ingreso",
    "anios_hist_credito",
]

# Calcular Q1, Q3 para cada columna
limites = {}
for col_name in columnas_outliers:
    stats = df_con_ids.select(
        percentile_approx(col_name, 0.25).alias("q1"),
        percentile_approx(col_name, 0.75).alias("q3"),
    ).collect()[0]

    q1 = stats["q1"]
    q3 = stats["q3"]
    iqr = q3 - q1
    limites[col_name] = {
        "lower": q1 - 1.7 * iqr,
        "upper": q3 + 1.7 * iqr,
    }
    print(f"{col_name}: Q1={q1:.2f}, Q3={q3:.2f}, IQR={iqr:.2f}")


In [0]:
# 6. IDENTIFICAR OUTLIERS CON FLAG

# Crear condición para marcar outliers
condicion_outlier = None
for col_name, bounds in limites.items():
    condicion = (col(col_name) < bounds["lower"]) | (col(col_name) > bounds["upper"])
    condicion_outlier = condicion if condicion_outlier is None else (condicion_outlier | condicion)

# Agregar columna flag de outlier
df_con_flag = df_con_ids.withColumn(
    "es_outlier", 
    when(condicion_outlier, 1).otherwise(0)
)

count_outliers = df_con_flag.filter(col("es_outlier") == 1).count()
count_normales = df_con_flag.filter(col("es_outlier") == 0).count()

print(f"\n Outliers identificados: {count_outliers:,} ({round(count_outliers/df_con_flag.count()*100, 2)}%)")
print(f" Registros normales: {count_normales:,} ({round(count_normales/df_con_flag.count()*100, 2)}%)")

In [0]:
try:
    # 4. GUARDAR EN TABLAS SILVER

    # Tabla de rechazados (duplicados + inconsistentes)
    df_rechazados.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("workspace.credit_risk.silver_registros_rechazados")


    # Tabla principal: datos limpios con flag de outliers
    df_con_flag.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("workspace.credit_risk.silver_credit_risk_limpio")

except Exception as e:
    # Tipo de error
    error_type = type(e).__name__
    # Descripcion de error
    error_summary = str(e)
    # Trazar el error
    error_trace = traceback.format_exc()

    # Error completo
    error_msg_full = "f{error_type}: {error_sumamary}/n{error_trace}"

    if len(error_msg_full) > 500:
        error_msg = error_msg_full[:500] + "\n[...]Error Truncado[...]"
    else:
        error_msg = error_msg_full

    dbutils.jobs.taskValues.set(key="error", value=error_msg)
    raise e
