## APLICACIÓN DE MODELOS DE REGLOG Y KMEANS PARA PREDICCIÓN DE RIESGO Y CLUSTERING

#### NOTEBOOK: Aplicar_Modelos

![Secuencia de Modelos ML](../Imagenes/Secuencia%20de%20Modelos%20ML.png)

Flujo de ejecución automatizada de modelos de machine learning en MS Fabric

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.clustering import KMeansModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, when, count, isnan, isnull
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, DoubleType
import requests
import os
from datetime import datetime
import time

# Verificar y obtener ruta del lakehouse
lakehouse_paths = [
    "Tables",  # Ruta relativa
    "/lakehouse/default/Tables",  # Ruta absoluta
    "abfss://lakehouse@onelake.dfs.fabric.microsoft.com/Tables"  # URL ABFS
]

def test_lakehouse_access(path):
    try:
        test_path = f"{path}/test_aplicacion"
        test_df = spark.createDataFrame([(1,)], ["test"])
        test_df.write.mode("overwrite").format("delta").save(test_path)
        print(f"Escritura exitosa en {test_path}")
        return True, test_path
    except Exception as e:
        print(f"Error al escribir en {test_path}: {str(e)}")
        return False, None

lakehouse_path = None
for path in lakehouse_paths:
    success, test_path = test_lakehouse_access(path)
    if success:
        lakehouse_path = path
        break

if not lakehouse_path:
    raise Exception("No se pudo acceder a ninguna ruta del lakehouse. Verifica los permisos y la configuración.")

print(f"Usando ruta del lakehouse: {lakehouse_path}")

# Función para registrar logs
def log_reentrenamiento(step_name, status, metrics=None, error_message=None):
    log_schema = StructType([
        StructField("fecha_ejecucion", TimestampType(), False),
        StructField("paso", StringType(), False),
        StructField("estado", StringType(), False),
        StructField("error_mensaje", StringType(), True),
        StructField("metricas", StringType(), True)
    ])
    log_df = spark.createDataFrame([(
        datetime.now(),
        step_name,
        status,
        error_message,
        str(metrics) if metrics else None
    )], schema=log_schema)
    try:
        log_df.write.mode("append").format("delta").save(f"{lakehouse_path}/reentrenamiento_log")
        print(f"Log registrado para {step_name}: {status}")
    except Exception as e:
        print(f"Error al registrar log: {e}")

# Función para notificaciones Discord
discord_webhook_url = os.getenv('DISCORD_WEBHOOK_URL', "https://discord.com/api/webhooks/1362094514688360518/NZQcmWfVB3Isx1hbSC1_g2R-gDWmhUwaszrWZA21zHCpKcco2BHFHhOSf_tOzvzi11_p")

def crear_mensaje(step_name, status, metrics=None, error_message=None):
    color = 3066993 if status == "Éxito" else 15158332
    description = f"Aplicación de {step_name} completada con estado: {status}"
    if error_message:
        description += f"\nError: {error_message}"
    
    embed = {
        "title": f"{'✅' if status == 'Éxito' else '❌'} Aplicación de Modelos - {step_name}",
        "description": description,
        "color": color,
        "footer": {"text": "RiskApp - Sistema de Monitoreo"},
        "timestamp": datetime.now().isoformat()
    }
    
    if metrics:
        embed["fields"] = [
            {"name": key, "value": f"{value:.4f}", "inline": True}
            for key, value in metrics.items()
        ]
    
    return {"embeds": [embed]}

def enviar_notificacion(mensaje):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = requests.post(
                discord_webhook_url, 
                json=mensaje,
                timeout=10  # Agregar timeout
            )
            response.raise_for_status()
            print(f"Notificación enviada para {mensaje['embeds'][0]['title']}")
            return
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:  # Si es el último intento
                print(f"Error al enviar notificación después de {max_retries} intentos: {str(e)}")
            else:
                print(f"Intento {attempt + 1} fallido, reintentando...")
                time.sleep(1)  # Esperar 1 segundo antes de reintentar

# Cargar y preparar datos
try:
    df = spark.read.format("delta").load(f"{lakehouse_path}/solicitudes_processed")
    total_records = df.count()
    
    # Verificar valores nulos por tipo de dato
    null_counts = {}
    for col_name, dtype in df.dtypes:
        if dtype in ("double", "float"):
            null_count = df.filter(col(col_name).isNull() | isnan(col(col_name))).count()
        else:
            null_count = df.filter(col(col_name).isNull()).count()
        null_counts[col_name] = null_count
    
    print(f"Datos cargados. Total de registros: {total_records}")
    print("\nAnálisis de valores nulos:")
    for col_name, null_count in null_counts.items():
        null_percentage = (null_count / total_records) * 100
        print(f"{col_name}: {null_count} nulos ({null_percentage:.2f}%)")
    
    # Imputar valores nulos con la mediana
    numeric_cols = ["edad", "ingresos_anuales", "puntaje_crediticio", "deuda_actual", "antiguedad_laboral"]
    for col_name in numeric_cols:
        median_value = df.approxQuantile(col_name, [0.5], 0.01)[0]
        df = df.fillna(median_value, subset=[col_name])
    
    # Rellenar valores nulos en columnas codificadas
    encoded_cols = ["historial_pagos_encoded", "estado_civil_encoded", "tipo_empleo_encoded"]
    df = df.fillna(0, subset=encoded_cols)
    
    log_reentrenamiento("Carga_datos", "Éxito", metrics={"total_records": total_records})
    mensaje = crear_mensaje("Carga_datos", "Éxito", metrics={"total_records": float(total_records)})
    enviar_notificacion(mensaje)
except Exception as e:
    error_msg = str(e)
    log_reentrenamiento("Carga_datos", "Error", error_message=error_msg)
    mensaje = crear_mensaje("Carga_datos", "Error", error_message=error_msg)
    enviar_notificacion(mensaje)
    raise Exception(f"Error al cargar datos: {error_msg}")

# Aplicar clustering
try:
    print("Cargando modelo K-means...")
    kmeans_model = KMeansModel.load(f"{lakehouse_path}/Models/KMeansClustering")
    
    # Preparar características para clustering
    feature_cols = ["edad", "ingresos_anuales", "puntaje_crediticio", "deuda_actual",
                   "antiguedad_laboral", "historial_pagos_encoded", "estado_civil_encoded",
                   "tipo_empleo_encoded", "numero_dependientes"]
    
    # Configurar preprocesamiento
    assembler = VectorAssembler(
        inputCols=feature_cols,
        outputCol="features",
        handleInvalid="skip"
    )
    
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaled_features",
        withStd=True,
        withMean=True
    )
    
    print("Aplicando transformación de clustering...")
    # Aplicar preprocesamiento y clustering
    assembled_df = assembler.transform(df)
    scaled_df = scaler.fit(assembled_df).transform(assembled_df)
    clustered_df = kmeans_model.transform(scaled_df)
    
    # Analizar resultados
    cluster_distribution = clustered_df.groupBy("prediction").count().toPandas()
    print("\nDistribución de clusters:")
    print(cluster_distribution)
    
    # Guardar resultados - Corregido para usar id_cliente
    result_df = clustered_df.select(
        "id_cliente",  # Cambiado de id_solicitud a id_cliente
        "estado_solicitud",
        col("prediction").alias("cluster")
    )
    
    result_df.write.mode("overwrite").format("delta").save(f"{lakehouse_path}/solicitudes_clustered")
    
    metrics = {
        "total_clustered": float(result_df.count()),
        "num_clusters": float(cluster_distribution.shape[0])
    }
    
    # Mejorar manejo de notificaciones
    try:
        log_reentrenamiento("Clustering", "Éxito", metrics=metrics)
        mensaje = crear_mensaje("Clustering", "Éxito", metrics=metrics)
        enviar_notificacion(mensaje)
    except Exception as notif_error:
        print(f"Error al enviar notificación: {str(notif_error)}")
        # Continuar con el proceso aunque falle la notificación
        
except Exception as e:
    error_msg = str(e)
    try:
        log_reentrenamiento("Clustering", "Error", error_message=error_msg)
        mensaje = crear_mensaje("Clustering", "Error", error_message=error_msg)
        enviar_notificacion(mensaje)
    except Exception as notif_error:
        print(f"Error al enviar notificación de error: {str(notif_error)}")
    raise Exception(f"Error al aplicar clustering: {error_msg}")

# Aplicar clasificación
try:
    print("Cargando modelo de clasificación...")
    clf_model = PipelineModel.load(f"{lakehouse_path}/Models/ClasificadorCredito")
    print("Filtrando solicitudes pendientes...")
    pending_df = df.filter(col("solicitud_credito").isNull())
    pending_count = pending_df.count()
    
    if pending_count > 0:
        print(f"Procesando {pending_count} solicitudes pendientes...")
        predictions = clf_model.transform(pending_df)
        
        # Analizar predicciones
        pred_distribution = predictions.groupBy("prediction").count().toPandas()
        print("\nDistribución de predicciones:")
        print(pred_distribution)
        
        # Preparar resultados finales
        from pyspark.sql.functions import udf, avg
        from pyspark.sql.types import DoubleType
        
        # UDF para extraer el score de aprobación
        def get_probability_class_1(prob_vector):
            return float(prob_vector.values[1]) if len(prob_vector.values) > 1 else 0.0
        
        get_prob_udf = udf(get_probability_class_1, DoubleType())
        
        final_predictions = predictions.select(
            "id_cliente",
            "estado_solicitud",
            col("prediction").cast("double"),
            "probability",
            when(col("prediction") == 1.0, "Aprobado")
            .otherwise("Rechazado").alias("prediccion_aprobado"),
            get_prob_udf("probability").alias("score_aprobacion")
        )
        
        final_predictions.write.mode("overwrite").format("delta").save(f"{lakehouse_path}/predicciones_pendientes")
        
        # Calcular métricas usando funciones de agregación de PySpark
        avg_score = final_predictions.agg(avg("score_aprobacion").alias("avg_score")).collect()[0]["avg_score"]
        
        metrics = {
            "total_pendientes": float(pending_count),
            "aprobados_predichos": float(pred_distribution[pred_distribution["prediction"] == 1.0]["count"].iloc[0]),
            "score_promedio": float(avg_score)
        }
        
        log_reentrenamiento("Clasificación", "Éxito", metrics=metrics)
        mensaje = crear_mensaje("Clasificación", "Éxito", metrics=metrics)
    else:
        mensaje = crear_mensaje("Clasificación", "Éxito", error_message="No hay solicitudes pendientes")
        log_reentrenamiento("Clasificación", "Éxito", error_message="No hay solicitudes pendientes")
    
    enviar_notificacion(mensaje)
except Exception as e:
    error_msg = str(e)
    log_reentrenamiento("Clasificación", "Error", error_message=error_msg)
    mensaje = crear_mensaje("Clasificación", "Error", error_message=error_msg)
    enviar_notificacion(mensaje)
    raise Exception(f"Error al aplicar clasificación: {error_msg}")

# Notificación final
try:
    mensaje = crear_mensaje(
        "Proceso Completo",
        "Éxito",
        metrics={
            "total_registros": float(total_records),
            "solicitudes_pendientes": float(pending_count if 'pending_count' in locals() else 0)
        }
    )
    enviar_notificacion(mensaje)
except Exception as e:
    print(f"Error al enviar notificación final: {str(e)}")

## Respuesta de aplicación automatizada de modelos en MS Fabric

![Respuesta de Aplicacion de Modelos](../Imagenes/Respuesta%20de%20Aplicacion%20de%20Modelos.png)