# Predicción de Captura de Derivaciones Médicas

## Contexto del Problema

Este proyecto busca predecir la probabilidad de que un paciente capture una derivación médica (es decir, que agende y realice una prestación) dentro del sistema de salud.

El modelo tiene como objetivo anticipar el comportamiento del paciente para activar acciones comerciales que aumenten la tasa de captura sin impactar negativamente los ingresos, aplicando descuentos de copago según el riesgo de no captura.


## Dataset y definición de variables

El dataset incluye información de pacientes derivados a prestaciones, con variables demográficas, clínicas y comportamentales.

Se construyeron variables numéricas y categóricas, y se utilizaron técnicas de ingeniería de variables como:
- Indexación de variables categóricas
- Escalamiento de variables numéricas
- Ensamblaje de features


## Modelo de Machine Learning

Se utilizó un modelo Gradient Boosted Trees (`GBTClassifier`) con ajuste de hiperparámetros vía `CrossValidator`.

Para abordar el desbalance entre pacientes capturados y no capturados, se incluyó una columna de pesos (`weightCol`) que penaliza más los errores en pacientes que sí capturaron.

Se evaluó el modelo con métricas:
- Precisión general
- Área bajo curva ROC
- Área bajo curva de precisión (PR)
- F1 Score, Precisión y Recall (con ajuste de umbral)


## Guardado del modelo y MLflow

Se registró el modelo con MLflow y se guardó en Databricks para futuras inferencias. También se registraron:
- Hiperparámetros
- Métricas de evaluación
- Importancia de variables


## Ajuste de umbral y normalización de probabilidad

Se identificó que el mejor umbral para balancear precisión y recall es 0.80.

A partir de este umbral, se normalizó la probabilidad para llevarla a una escala 0–1, facilitando la interpretación comercial.

Esta probabilidad normalizada se usará en un segundo notebook para aplicar reglas de negocio (como copago 0% o 50%).


## 1. Extracción de Datos
Descripción de la fuente, filtros aplicados por fecha, y definición del set de entrenamiento y test.

In [0]:
# ================================
# 📦 Librerías Generales
# ================================
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tempfile
import pickle
import gc
import sys

# ================================
# 📅 Manejo de Fechas
# ================================
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta
import dateutil.relativedelta

# ================================
# 📊 Librerías de Modelado (sklearn)
# ================================
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn import tree
from sklearn.metrics import accuracy_score

# ================================
# ⚙️ Configuración de visualización en Pandas
# ================================

# Mostrar los números decimales con 2 cifras
pd.options.display.float_format = '{:.2f}'.format

# Mostrar todas las columnas sin truncar en la salida
pd.set_option('display.max_columns', None)


In [0]:
# Función para calcular la edad a partir de la fecha de nacimiento
def calcular_edad(df, columna_fecha_nacimiento):
    # Convertir la columna de fecha de nacimiento a tipo datetime si no lo está
    df[columna_fecha_nacimiento] = pd.to_datetime(df[columna_fecha_nacimiento])

    # Obtener la fecha actual
    hoy = pd.to_datetime('today')

    # Calcular la edad fila por fila
    df['edad'] = df[columna_fecha_nacimiento].apply(
        lambda fecha: hoy.year - fecha.year - ((hoy.month, hoy.day) < (fecha.month, fecha.day))
    )

    return df

# Función para obtener la fecha actual con un ajuste de días
def FechaActual(d):
    from datetime import datetime
    import pytz
    zona_horaria = pytz.timezone("America/Santiago")  # Cambiar por la zona horaria que necesites  
    return (datetime.now(zona_horaria) + timedelta(days=d))



### 🗂️ BD: Derivación y Captura

In [0]:
#Definir variable fecha proceso
Fecha_presente = FechaActual(0) 
Fecha_presente

In [0]:
# --- SIMULACIÓN PARA PORTAFOLIO: reemplazo de extracción SQL ---
import numpy as np
import pandas as pd
from datetime import timedelta

# 1) Fechas
Fecha_presente = pd.Timestamp.today().normalize()
ffin = (Fecha_presente - timedelta(days=0)).strftime("%Y-%m-%d")      # hoy
finicio = (Fecha_presente - timedelta(days=91)).strftime("%Y-%m-%d")  # 91 días atrás

# 2) Generador de datos sintéticos
def simular_derivacion_captura(
    inicio:str,
    fin:str,
    seed:int=42,
    media_registros_dia:int=120,  # controla volumen
):
    rng = np.random.default_rng(seed)
    fechas = pd.date_range(start=inicio, end=fin, freq="D")

    # Catálogos
    centros = ["CM Norte", "CM Sur", "CM Este", "CM Oeste", "Clínica Central"]
    zonas = ["Zona Centro", "Zona Norte", "Zona Sur", "Zona Oriente", "Zona Poniente"]
    especialidades = ["Resonancia", "Scanner"]  # <- según tu condición
    linea_negocio = ["Imagenología"]
    B8_cat = [
        "Neuro", "Abdomen", "Tórax", "Columna", "Músculo-esquelético",
        "Vascular", "Pélvico", "Otros"
    ]
    aseguradora_denom = ["Seguro_Publico"]  
    prestaciones_res = ["RM Cráneo", "RM Columna", "RM Abdomen", "RM Rodilla"]
    prestaciones_scn = ["TC Tórax", "TC Abdomen", "TC Pelvis", "TC Cerebro"]
    usuarios_ag = ["BOT_WA", "CALL_CENTER", "PORTAL", "PRESENCIAL"]

    # Función para generar un RUT
    def rut_fake():
        body = rng.integers(5_000_000, 25_000_000)
        dv = "K" if rng.random() < 0.05 else str(rng.integers(0,10))
        return f"{body}-{dv}"

    registros = []
    episodio_correlativo = 10_000_000

    for f in fechas:
        # Poisson para variabilidad diaria
        n = max(0, rng.poisson(media_registros_dia))
        if n == 0:
            continue

        # Mezclar especialidad para asignar prestaciones coherentes
        espec = rng.choice(especialidades, size=n, replace=True)
        prest = []
        for e in espec:
            if e == "Resonancia":
                prest.append(rng.choice(prestaciones_res))
            else:
                prest.append(rng.choice(prestaciones_scn))

        # Probabilidad de captura y de tener citación
        captura_flag = rng.binomial(1, 0.38, size=n)  # ~38% capturan
        tiene_cita = rng.binomial(1, 0.55, size=n)    # ~55% tienen cita (indep.)

        # Para cada registro del día
        for i in range(n):
            episodio_correlativo += 1
            precio_base = rng.lognormal(mean=np.log(180000), sigma=0.35)  # CLP aprox
            precio = float(np.clip(precio_base, 60_000, 600_000))

            # Si captura, definimos fecha_cap y centro_cap razonables
            if captura_flag[i] == 1:
                delta_cap = int(rng.integers(0, 15))  # 0–14 días después
                fecha_cap = f + pd.Timedelta(days=delta_cap)
                centro_cap = rng.choice(centros)
                episodio_cap = episodio_correlativo  # podría coincidir
                valor_liquidado = float(np.round(precio * rng.uniform(0.85, 1.1), 0))
            else:
                fecha_cap = pd.NaT
                centro_cap = np.nan
                episodio_cap = np.nan
                valor_liquidado = np.nan

            # Citación PA (puede existir aunque no capture)
            if tiene_cita[i] == 1:
                # citas entre f-2 y f+30 (simula agendamientos previos o futuros)
                delta_cita = int(rng.integers(-2, 31))
                fecha_cita_pa = f + pd.Timedelta(days=delta_cita)
                fecha_creacion_pa = fecha_cita_pa - pd.Timedelta(days=int(rng.integers(0,7)))
                # ~10% anulaciones
                if rng.random() < 0.10:
                    fecha_anulacion_pa = fecha_cita_pa - pd.Timedelta(days=int(rng.integers(0,3)))
                else:
                    fecha_anulacion_pa = pd.NaT
                id_citacion_pa = int(rng.integers(1_000_000, 9_999_999))
                usuario_ag = rng.choice(usuarios_ag)
            else:
                fecha_cita_pa = pd.NaT
                fecha_creacion_pa = pd.NaT
                fecha_anulacion_pa = pd.NaT
                id_citacion_pa = np.nan
                usuario_ag = np.nan

            # Diagnóstico “falso” coherente
            codigo_dx = rng.choice(["R10.4", "M54.5", "R51.9", "K76.9", "R93.7"])
            dx_texto = rng.choice([
                "Dolor abdominal", "Lumbalgia", "Cefalea", "Hallazgo imagenológico",
                "Control postquirúrgico", "Traumatismo reciente"
            ])

            # n° prestación derivada simulado
            num_actual_prest = int(rng.integers(1_000_000, 1_999_999))

            registros.append({
                # --- columnas equivalentes a `campos` (sin corchetes SQL) ---
                "fecha": f.normalize(),
                "episodio": episodio_correlativo,
                "rut": rut_fake(),
                "prestacion_der": prest[i],
                "centro_medico": rng.choice(centros),
                "zona": rng.choice(zonas),
                "linea_negocio": rng.choice(linea_negocio),
                "descr_especialidad": espec[i],
                "B8": rng.choice(B8_cat),
                "paciente": int(rng.integers(100_000, 999_999)),
                "aseguradora": "PUB",                     # código simulado
                "aseguradora_denom": "Seguro_Publico",            # condición cumplida
                "precio_promedio": float(np.round(precio, 0)),
                "derivacion": 1,                          # derivación presente
                "captura": int(captura_flag[i]),
                "episodio_der": episodio_correlativo,     # coherente
                "cod_uo_medica": int(rng.integers(1000, 9999)),
                "especialidad_der": espec[i],
                "cod_uo_medica_cap": int(rng.integers(1000, 9999)) if captura_flag[i] else np.nan,
                "especialidad_cap": espec[i] if captura_flag[i] else np.nan,
                "linea_negocio_cap": "Imagenología" if captura_flag[i] else np.nan,
                "prestacion_cap": prest[i] if captura_flag[i] else np.nan,
                "fecha_cap": fecha_cap,
                "episodio_cap": episodio_cap,
                "centro_medico_cap": centro_cap,
                "valor_liquidado": valor_liquidado,
                "id_citacion_pa": id_citacion_pa,
                "fecha_cita_pa": fecha_cita_pa,
                "fecha_creacion_pa": fecha_creacion_pa,
                "fecha_anulacion_pa": fecha_anulacion_pa,
                "usuario_agendamiento_pa": usuario_ag,
                "codigo_diagnostico": codigo_dx,
                "diagnostico": dx_texto,
                "numero_actual_prestacion_der": num_actual_prest,
            })

    df = pd.DataFrame(registros)

    # Asegurar tipos básicos coherentes
    fecha_cols = ["fecha", "fecha_cap", "fecha_cita_pa", "fecha_creacion_pa", "fecha_anulacion_pa"]
    for c in fecha_cols:
        df[c] = pd.to_datetime(df[c])

    # Reordenar columnas igual que tu lista `campos` (alias ya aplicados)
    orden = [
        "fecha","episodio","rut","prestacion_der","centro_medico","zona",
        "linea_negocio","descr_especialidad","B8","paciente","aseguradora",
        "aseguradora_denom","precio_promedio","derivacion","captura","episodio_der",
        "cod_uo_medica","especialidad_der","cod_uo_medica_cap",
        "especialidad_cap","linea_negocio_cap","prestacion_cap",
        "fecha_cap","episodio_cap","centro_medico_cap","valor_liquidado","id_citacion_pa",
        "fecha_cita_pa","fecha_creacion_pa","fecha_anulacion_pa","usuario_agendamiento_pa",
        "codigo_diagnostico","diagnostico","numero_actual_prestacion_der"
    ]
    return df[orden].copy()

# 3) “Extracción” simulada (reemplaza el bloque extraccion_paralela (placeholder))
derivacion_captura = simular_derivacion_captura(inicio=finicio, fin=ffin, seed=2025, media_registros_dia=110)

# 4) Limpieza similar a la tuya
import gc; gc.collect()

# 5) Transformación a tipo fecha (ya viene convertida, pero mantenemos tu línea)
derivacion_captura['fecha'] = pd.to_datetime(derivacion_captura['fecha'])

# Vistazo rápido
cols_safe = [c for c in derivacion_captura.columns if c not in ["rut","paciente","numero_actual_prestacion_der"]]
derivacion_captura[cols_safe].head(3)




### 🧱 Construcción de la base histórica enriquecida

In [0]:
# Intentar leer los archivos Parquet desde las rutas obtenidas
try:
    # Leer los archivos Parquet en un solo DataFrame
    final_df = spark.read.parquet(*ls_paths)
    print("Datos cargados correctamente")
except Exception as e:
    # Capturar y mostrar errores si ocurren
    print(f"Error al leer los archivos Parquet: {e}")

    
# Aplicar filtros y transformación a PySpark DataFrame
historico_derivacion_captura = final_df.filter(final_df.especialidad_cap.isin('Resonancia', 'Scanner'))

#Filtra Isapre
historico_derivacion_captura = historico_derivacion_captura.filter(historico_derivacion_captura.aseguradora_denom.isin('Seguro_Publico'))

# Convertir a Pandas
historico_derivacion_captura = historico_derivacion_captura.toPandas()

# Eliminar el DataFrame original de PySpark
del(final_df)

In [0]:
#Concatenar historico + 3 meses
derivacion_captura = pd.concat([historico_derivacion_captura, derivacion_captura], axis=0)

# Analisis de fecha
# Formato fecha
derivacion_captura['fecha'] = pd.to_datetime(derivacion_captura['fecha'])

# Rango de fechas
print(derivacion_captura.fecha.min())
print(derivacion_captura.fecha.max())

# Eliminar duplicados
derivacion_captura = derivacion_captura.drop_duplicates()


## 2. Transformación y Limpieza

- **Variables categóricas**: transformadas mediante `StringIndexer`, permitiendo su uso en modelos como GBT.
- **Variables numéricas**: ensambladas con `VectorAssembler` y estandarizadas con `StandardScaler` para mejorar el entrenamiento.
- Se construye un `Pipeline` de transformación reutilizable, aplicado tanto al set de entrenamiento como de prueba.


### 🧍 Información del Paciente

### 🔁 Comportamiento Histórico del Paciente

In [0]:
# Transformaciones
pacientes = calcular_edad(pacientes, 'fecha_nacimiento')

# Mapear género
mapa_genero = {
    '1': 'Hombre',
    '2': 'Mujer',
    '3': 'No indica'
}
pacientes['Genero'] = pacientes['sexo'].map(mapa_genero).fillna('No indica')

# Merge de pacientes con derivacion_captura
derivacion_captura = derivacion_captura.merge(
    pacientes[['paciente', 'rut', 'sexo', 'edad', 'Genero']],
    on='paciente',
    how='left'
)

# Calcular la anticipación en días
derivacion_captura['antelacion'] = (
    pd.to_datetime(derivacion_captura['fecha_cap']) - 
    pd.to_datetime(derivacion_captura['fecha'])
).dt.days

# Extraer mes y año
derivacion_captura['mes'] = pd.to_datetime(derivacion_captura['fecha']).dt.month
derivacion_captura['año'] = pd.to_datetime(derivacion_captura['fecha']).dt.year

# Crear columna 'año-mes'
derivacion_captura['año-mes'] = (
    derivacion_captura['año'].astype(str) + '-' + 
    derivacion_captura['mes'].apply(lambda x: f'{x:02}')
)

# Calcular revenue
derivacion_captura['revenue'] = np.where(
    (derivacion_captura['antelacion'] >= 10) & 
    (derivacion_captura['antelacion'] <= 30) & 
    (pd.to_datetime(derivacion_captura['fecha']) >= '2024-01-01'), 
    1, 
    0
)


In [0]:
# Limpieza de datos
df = derivacion_captura.drop_duplicates()
df['fecha'] = pd.to_datetime(df['fecha'])

# Agrupación por episodio
group_ep = df.groupby(['paciente', 'linea_negocio_cap', 'fecha', 'episodio']).agg({
    'prestacion_der': 'nunique',
    'derivacion': 'last',
    'captura': 'sum',
    'antelacion': 'mean'
}).reset_index()

# Ordenar por paciente y fecha (más reciente primero)
group_ep = group_ep.sort_values(['paciente', 'fecha'], ascending=[True, False])
group_ep['rank'] = group_ep.groupby(['paciente', 'linea_negocio_cap']).cumcount() + 1

# Filtrar últimas 3 visitas (sin contar la más reciente)
last_three = group_ep[(group_ep['rank'] > 1) & (group_ep['rank'] <= 4)].copy()

# Calcular nuevas features que capturen comportamiento errático
last_three['derivacion_no_nula'] = last_three['derivacion'].replace(0, np.nan)
last_three['conversion_rate'] = last_three['captura'] / last_three['derivacion_no_nula']
last_three['conversion_rate'] = last_three['conversion_rate'].fillna(0)
last_three['captura_binaria'] = (last_three['captura'] > 0).astype(int)

# Agregaciones con estadísticas robustas
result2 = last_three.groupby(['paciente', 'linea_negocio_cap']).agg({
    'derivacion': ['sum', lambda x: x.std(ddof=0)],
    'captura': ['sum', lambda x: x.std(ddof=0)],
    'prestacion_der': 'sum',
    'episodio': 'nunique',
    'antelacion': ['median', lambda x: x.std(ddof=0)],
    'conversion_rate': ['median', lambda x: x.std(ddof=0)],
    'captura_binaria': 'mean'
})

# Renombrar columnas
result2.columns = [
    'Q*Derivaciones', 'Derivacion_std',
    'Q*Captura', 'Captura_std',
    'Q*prestacion',
    'Q*episodio',
    'antelacion_prom', 'antelacion_std',
    'conversion_rate_median', 'conversion_rate_std',
    'frecuencia_captura'
]

result2 = result2.reset_index()

# Imputar valores nulos con sentinel (-1) y crear flag de historial
campos_historial = [
    'Q*Derivaciones', 'Derivacion_std', 'Q*Captura', 'Captura_std',
    'Q*prestacion', 'Q*episodio', 'antelacion_prom', 'antelacion_std',
    'conversion_rate_median', 'conversion_rate_std', 'frecuencia_captura'
]

for col in campos_historial:
    result2[col] = result2[col].fillna(-1)

# Flag binario: ¿tiene historial suficiente?
result2['has_historial'] = (result2['Q*Captura'] != -1).astype(int)

# Ordenar
result2 = result2.sort_values(by='Q*prestacion')



In [0]:
# Modelo de datos
base_modelo = (
    derivacion_captura[['fecha', 'episodio', 'zona', 'linea_negocio_cap', 'descr_especialidad',
                        'especialidad_cap', 'paciente', 'sexo', 'edad', 'Genero',
                        'aseguradora_denom', 'año', 'año-mes', 'mes', 'derivacion',
                        'captura', 'antelacion', 'revenue', 'precio_promedio']]
    .merge(result2, on=['paciente', 'linea_negocio_cap'], how='left')
)

## 3. Selección de Variables

Se definieron dos grupos de variables:

- **Categóricas**:
  - sexo, zona_t, descr_especialidad_t, aseguradora_t, especialidad_cap_t
- **Numéricas**:
  - edad, volumen histórico de derivaciones, capturas, prestaciones, episodios
  - estadísticas de antelación, tasas de conversión, historial, revenue

Estas variables reflejan comportamiento pasado del paciente y la naturaleza del servicio.

### 🔤 Codificación de Variables Categóricas

In [0]:
# Dataset
df = base_modelo.copy() 

# Transformación de variables
def t_encoder(var):
    label_encoder = LabelEncoder()
    return label_encoder.fit_transform(var)

# Nuevas variables transformadas
df['zona_t'] = t_encoder(df['zona'])
df['descr_especialidad_t'] = t_encoder(df['descr_especialidad'])
df['aseguradora_t'] = t_encoder(df['aseguradora_denom'])
df['especialidad_cap_t'] = t_encoder(df['especialidad_cap'])

### 📌 Selección de Variables Relevantes

In [0]:
# Seleccionar variables
features = [
    'paciente', 'fecha', 'sexo', 'edad', 'captura', 'antelacion', 'revenue',
    'Q*Derivaciones',
       'Derivacion_std', 'Q*Captura', 'Captura_std', 'Q*prestacion',
       'Q*episodio', 'antelacion_prom', 'antelacion_std',
       'conversion_rate_median', 'conversion_rate_std', 'frecuencia_captura',
       'has_historial', 'zona_t', 'descr_especialidad_t', 'aseguradora_t',
       'especialidad_cap_t'
]

df = df[features]

### ⚠️ Manejo de Valores Nulos

In [0]:
# Variables demográficas
df['sexo'] = df['sexo'].fillna(3)
df['edad'] = df['edad'].fillna(df['edad'].median())

# Variables derivadas de historial (usar sentinel)
campos_historial = [
    'Q*Derivaciones', 'Derivacion_std', 'Q*Captura', 'Captura_std',
    'Q*prestacion', 'Q*episodio', 'antelacion_prom', 'antelacion_std',
    'conversion_rate_median', 'conversion_rate_std', 'frecuencia_captura'
]

for col in campos_historial:
    df[col] = df[col].fillna(-1)

# Flag binario: ¿tiene historial?
df['has_historial'] = (df['Q*Captura'] != -1).astype(int)

# Para `antelacion`: si solo aplica cuando hay captura, rellena con -1
df['antelacion'] = df['antelacion'].fillna(-1)


## 4. Entrenamiento del Modelo

Se utilizó un modelo **Gradient-Boosted Trees (GBTClassifier)** por su capacidad de modelar relaciones no lineales y su buen rendimiento en problemas de clasificación binaria con variables mixtas.

- Se aplicó `CrossValidator` con 3 folds y búsqueda en grilla (`maxDepth` y `maxIter`).
- Se incorporó `weightCol` para **aumentar el peso de la clase positiva**, dada la desbalanceada entre pacientes capturados y no capturados.
- Se entrenó y registró el mejor modelo usando **MLflow**.

In [0]:
#Modelo para revenue
df_model = df

#Eliminar duplicados
df_model = df_model.drop_duplicates()

from pyspark.sql import functions as F

# Convert Pandas DataFrame to Spark DataFrame if necessary
if isinstance(df_model, pd.DataFrame):
    df_model = spark.createDataFrame(df_model)

# Ensure the correct data type for 'fecha'
df_model = df_model.withColumn("fecha", F.to_date(df_model["fecha"], "yyyy-MM-dd"))

### ML: GBTClassifier

In [0]:
import mlflow.spark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from io import StringIO
import matplotlib.pyplot as plt

# Crear sesión Spark
spark = SparkSession.builder.appName("GBT_Modelo").getOrCreate()

# ---------------------------------------------
# 1. Fechas y partición de datos
# ---------------------------------------------
fecha_corte_f = '2025-04-01'
fecha_corte_i = '2022-01-01'

train = df_model.filter((df_model['fecha'] >= fecha_corte_i) & (df_model['fecha'] < fecha_corte_f))
test = df_model.filter(df_model['fecha'] >= fecha_corte_f)

# ---------------------------------------------
# 2. Definición de variables
# ---------------------------------------------
cat_features = [
    'sexo', 'zona_t', 'descr_especialidad_t', 'aseguradora_t', 'especialidad_cap_t'
]

num_features = [
    'edad', 'Q*Derivaciones', 'Derivacion_std', 'Q*Captura', 'Captura_std',
    'Q*prestacion', 'Q*episodio', 'antelacion_prom', 'antelacion_std',
    'conversion_rate_median', 'conversion_rate_std', 'frecuencia_captura',
    'has_historial', 'revenue'
]

# ---------------------------------------------
# 3. Preprocesamiento
# ---------------------------------------------
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid='keep') for col in cat_features]
assembler_num = VectorAssembler(inputCols=num_features, outputCol="num_features_vec")
scaler = StandardScaler(inputCol="num_features_vec", outputCol="scaled_num_features")
final_assembler = VectorAssembler(
    inputCols=[f"{col}_index" for col in cat_features] + ["scaled_num_features"],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + [assembler_num, scaler, final_assembler])
pipeline_model = pipeline.fit(train)

train_data = pipeline_model.transform(train).cache()
test_data = pipeline_model.transform(test).cache()

# ---------------------------------------------
# 4. Agregar pesos a clases (weightCol)
# ---------------------------------------------
train_data = train_data.withColumn("weight", when(col("captura") == 1, 3.0).otherwise(1.0))

train_count = train_data.count()
test_count = test_data.count()

# ---------------------------------------------
# 5. MLflow + Entrenamiento
# ---------------------------------------------
with mlflow.start_run(nested=True):

    mlflow.log_params({
        "fecha_corte_inicio": fecha_corte_i,
        "fecha_corte_fin": fecha_corte_f,
        "num_train_samples": train_count,
        "num_test_samples": test_count,
        "scaler": "StandardScaler",
        "model_type": "GBTClassifier",
        "weightCol": "yes",
        "positive_class_weight": 3.0
    })

    gbt = GBTClassifier(
        labelCol="captura",
        featuresCol="features",
        seed=42,
        maxIter=50,
        maxBins=100,
        weightCol="weight"
    )

    paramGrid = ParamGridBuilder() \
        .addGrid(gbt.maxDepth, [4, 6, 8]) \
        .addGrid(gbt.maxIter, [30, 50]) \
        .build()

    evaluator_pr = BinaryClassificationEvaluator(labelCol="captura", metricName="areaUnderPR")

    crossval = CrossValidator(
        estimator=gbt,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator_pr,
        numFolds=3
    )

    cvModel = crossval.fit(train_data)
    predictions = cvModel.transform(test_data)

    pr_auc = evaluator_pr.evaluate(predictions)
    roc_auc = BinaryClassificationEvaluator(labelCol="captura", metricName="areaUnderROC").evaluate(predictions)

    mlflow.log_metric("areaUnderPR", pr_auc)
    mlflow.log_metric("areaUnderROC", roc_auc)

    print(f"Área bajo curva PR: {pr_auc}")
    print(f"Área bajo curva ROC: {roc_auc}")

from pyspark.ml.functions import vector_to_array
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Convertir columna probability a vector accesible
predictions = predictions.withColumn("prob_array", vector_to_array("probability"))

# ---------------------------------------------
# 6. Buscar mejor umbral por F1
# ---------------------------------------------
def buscar_mejor_umbral(predictions, label_col="captura", prob_col="prob_array", metric="f1", plot=True):
    thresholds = [x / 100 for x in range(10, 90, 5)]
    scores = []
    evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction_temp", metricName=metric)

    for t in thresholds:
        temp_pred = predictions.withColumn(
            "prediction_temp", when(col(prob_col)[1] >= t, 1.0).otherwise(0.0)
        )
        score = evaluator.evaluate(temp_pred)
        scores.append(score)

    best_idx = scores.index(max(scores))
    best_threshold = thresholds[best_idx]
    best_score = scores[best_idx]

    if plot:
        plt.figure(figsize=(8, 4))
        plt.plot(thresholds, scores, marker="o")
        plt.title(f"{metric.upper()} vs Threshold")
        plt.xlabel("Threshold")
        plt.ylabel(metric.upper())
        plt.grid(True)
        plt.show()

    return {
        "mejor_umbral": best_threshold,
        f"mejor_{metric}": best_score,
        "todos_los_resultados": list(zip(thresholds, scores))
    }

# Buscar umbral óptimo por F1
resultado = buscar_mejor_umbral(predictions, metric="f1", plot=True)
mejor_umbral = resultado["mejor_umbral"]

mlflow.log_metric("mejor_umbral_f1", mejor_umbral)
mlflow.log_metric("mejor_f1", resultado["mejor_f1"])

print(f"✅ Mejor umbral F1: {mejor_umbral:.2f} - F1: {resultado['mejor_f1']:.4f}")

# Aplicar predicción con umbral óptimo
predictions = predictions.withColumn(
    "prediction_custom",
    when(col("prob_array")[1] >= mejor_umbral, 1.0).otherwise(0.0)
)

# ---------------------------------------------
# 7. Evaluar con Recall, Precision y F1
# ---------------------------------------------
print("📊 Métricas de evaluación con umbral ajustado:")

for metric in ["f1", "precisionByLabel", "recallByLabel"]:
    evaluator = MulticlassClassificationEvaluator(
        labelCol="captura",
        predictionCol="prediction_custom",
        metricName=metric
    )
    value = evaluator.evaluate(predictions)
    print(f"{metric}: {value:.4f}")
    mlflow.log_metric(f"{metric}_custom_threshold", value)

# ---------------------------------------------
# 8. Matriz de confusión
# ---------------------------------------------
print(f"📊 Matriz de confusión con umbral ajustado = {mejor_umbral}")
predictions.groupBy("captura", "prediction_custom").count().orderBy("captura", "prediction_custom").show()

# -----------------------------------------------
# 9. Guardar Modelo
# -----------------------------------------------
import time, json
from pyspark.ml import PipelineModel
from mlflow.exceptions import MlflowException

# A) Ensamblar un modelo final end-to-end: preproceso + bestModel
final_model = PipelineModel(stages=pipeline_model.stages + [cvModel.bestModel])

# B) Paths de guardado en DBFS (uno fijo y otro versionado por timestamp)
BASE_PATH = "dbfs:/Modelos/model_gbt_fonasa"
VERSION_PATH = f"{BASE_PATH}/v={int(time.time())}"

 # Guardar modelo
best_mlp_model = cvModel.bestModel
mlflow.spark.log_model(best_mlp_model, "dbfs:/Modelos/model_gbt_fonasa/best_model")
best_mlp_model.write().overwrite().save("dbfs:/Modelos/model_gbt_fonas")

# Guardar en DBFS
final_model.write().overwrite().save(BASE_PATH)
final_model.write().overwrite().save(VERSION_PATH)

# C) Asegurar una run activa para loguear a MLflow (por si el 'with ...' anterior ya terminó)
active_run = mlflow.active_run()
if active_run is None:
    mlflow.start_run(nested=True)

# Loguear metadatos útiles
mlflow.log_param("decision_threshold", float(mejor_umbral))
mlflow.log_dict(
    {
        "fecha_corte_inicio": fecha_corte_i,
        "fecha_corte_fin": fecha_corte_f,
        "cat_features": cat_features,
        "num_features": num_features,
        "weightCol": "weight",
        "positive_class_weight": 3.0,
        "dbfs_save_path_latest": BASE_PATH,
        "dbfs_save_path_versioned": VERSION_PATH
    },
    "model_metadata.json"
)

# (Opcional) imprimir confirmación
print("✅ Modelo guardado en:")
print(f"   - Última versión: {BASE_PATH}")
print(f"   - Versionado:     {VERSION_PATH}")
print("✅ Modelo logueado en MLflow (artefacto 'model').")



## 5. Evaluación del Modelo

El modelo fue evaluado con las siguientes métricas:

- **Área bajo curva PR (Precision-Recall)**: 0.712
- **Área bajo curva ROC**: 0.769
- Se ajustó el umbral de decisión usando F1-score, resultando en:
  - **Umbral óptimo**: 0.80
  - **F1-score**: 0.761
  - **Precisión clase positiva**: 0.78
  - **Recall clase positiva**: 0.94

In [0]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, when

# 1. Convertir probabilidad a array
predictions = predictions.withColumn("prob_array", vector_to_array("probability"))

# 2. Definir umbral óptimo (puedes cambiarlo si encontraste otro mejor)
umbral_optimo = 0.80

# 3. Crear predicción customizada (opcional, para métricas)
predictions = predictions.withColumn(
    "prediction_custom",
    when(col("prob_array")[1] >= umbral_optimo, 1.0).otherwise(0.0)
)

# 4. Normalizar la probabilidad de captura
predictions = predictions.withColumn(
    "prob_normalizada",
    (col("prob_array")[1] / umbral_optimo).cast("double")
)


## 6. Aplicación y Lógica de Negocio

Las predicciones del modelo se utilizan como insumo para diseñar acciones diferenciadas según el nivel de probabilidad estimada.

Para los casos con **baja probabilidad de conversión**, se aplican **incentivos más atractivos** con el fin de aumentar la motivación.

Para los casos con **probabilidad intermedia**, se ofrece un **estímulo moderado**, buscando equilibrar costo y beneficio.

En los casos con **alta probabilidad**, se **evita otorgar beneficios** innecesarios, ya que es probable que el cliente concrete la acción de todas formas.

De esta manera, se logra **maximizar la efectividad de las acciones comerciales**, concentrando los recursos donde son más necesarios y reduciendo el costo de incentivos superfluos.


## 7. Conclusiones

El modelo entrega una predicción robusta de la probabilidad de que un paciente se atienda, lo cual permite implementar estrategias de pricing inteligente con descuentos personalizados.

Su rendimiento permite impactar directamente la **rentabilidad de la operación**, enfocando recursos comerciales en los pacientes de menor probabilidad de captura sin sobreincentivar al resto.