# Modeling: Next **DIGITAL** order (v3.1, 100% PySpark + Backtesting & Ensembles)

**Goal:** predict whether a customer's **next order** will be **DIGITAL**.

**Notebook generation date:** 2025-09-17

## 1) Spark session and parameters

- Runs locally on all available cores.
- Adjust `DATA_DIR` if your dataset lives in a different path.
- `TEST_START_YM` sets the main **temporal cutoff**.
- `BACKTEST_SPLITS` lets you evaluate **several cutoffs** (optional).
- **Memory**: we raise `spark.driver.memory` and enable Kryo serialization to reduce the risk of OOM errors when training tree models.

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T, Window
from pyspark import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer, VectorIndexer, FeatureHasher
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.functions import vector_to_array

# Create/restart the session
try:
    spark.stop()
except Exception:
    pass

spark = (
    SparkSession.builder
    .appName("modelado-proximo-pedido-digital-v3.1")
    .master("local[*]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.driver.memory", "12g")              # raise if your machine allows it (8–16g)
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer", "32m")
    .config("spark.kryoserializer.buffer.max", "512m")
    .config("spark.sql.warehouse.dir", "./spark-warehouse")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.setCheckpointDir("/tmp/spark_chk")

DATA_DIR = "dataset/dataset"      # <--- Adjust if your path differs
TEST_START_YM = "2024-01"         # Train on ym < TEST_START_YM | Evaluate on ym >= TEST_START_YM
BACKTEST_SPLITS = ["2023-08", "2023-10", "2023-12", "2024-01"]  # Optional

print("Spark version:", spark.version)

## 2) Loading and minimal preparation

- We derive `month_first`, `ym`, and `is_digital`.
- We keep **simple names** and appropriate types.

In [None]:
df = spark.read.parquet(DATA_DIR)

expected = [
    "cliente_id","pais_cd","region_comercial_txt","agencia_id","ruta_id",
    "tipo_cliente_cd","madurez_digital_cd","estrellas_txt","frecuencia_visitas_cd",
    "fecha_pedido_dt","canal_pedido_cd","facturacion_usd_val",
    "materiales_distintos_val","cajas_fisicas"
]
present = [c for c in expected if c in df.columns]
print("Columns present:", present)

df = (df
    .withColumn("month_first", F.trunc("fecha_pedido_dt", "month"))
    .withColumn("ym", F.date_format("month_first", "yyyy-MM"))
    .withColumn("is_digital", F.when(F.col("canal_pedido_cd")=="DIGITAL", 1).otherwise(0))
)

df.select("cliente_id","fecha_pedido_dt","ym","canal_pedido_cd","is_digital").show(5, truncate=False)

## 3) Label per **customer-month** (deterministic, no leakage)

- Orders are sorted by `fecha_pedido_dt` (with a *tie-break* on a hash of all columns).
- For each month of a customer, we take the **last order** and set `label` to whether the customer's **next order** is DIGITAL.
- We add `recency_days_last` (days since the previous order).
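
The labeling rule above can be illustrated without Spark. The following is a minimal pure-Python sketch, not the notebook's implementation: the helper `build_labels`, the toy orders, and the channel value `"OTHER"` are hypothetical. It returns `None` when a customer has no later order, so that such rows can be dropped instead of being counted as negatives.

```python
from collections import defaultdict

def build_labels(orders):
    """orders: list of (cliente_id, 'YYYY-MM-DD', canal) tuples.

    Returns {(cliente_id, 'YYYY-MM'): label}, where label is 1 if the order
    that follows the month's last order is DIGITAL, 0 if it is not, and
    None when the customer has no later order.
    """
    by_client = defaultdict(list)
    for cid, date, canal in orders:
        by_client[cid].append((date, canal))

    labels = {}
    for cid, rows in by_client.items():
        rows.sort()  # ISO dates sort chronologically as strings
        for i, (date, _) in enumerate(rows):
            nxt = rows[i + 1][1] if i + 1 < len(rows) else None
            label = None if nxt is None else int(nxt == "DIGITAL")
            # Later orders in the same month overwrite earlier ones,
            # so the month ends up with the label of its last order.
            labels[(cid, date[:7])] = label
    return labels

toy_orders = [
    ("c1", "2023-11-03", "OTHER"),
    ("c1", "2023-11-20", "DIGITAL"),
    ("c1", "2023-12-05", "DIGITAL"),
    ("c1", "2024-01-09", "OTHER"),
]
labels = build_labels(toy_orders)
print(labels)
```

In the toy data, November's last order is followed by a DIGITAL order (label 1), December's by a non-digital one (label 0), and January has no successor yet (label `None`).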

In [None]:
all_cols = df.columns
w_client_order = Window.partitionBy("cliente_id").orderBy(F.col("fecha_pedido_dt").asc(),
                                                          F.hash(*[F.col(c) for c in all_cols]).asc())
w_client_month_desc = Window.partitionBy("cliente_id","month_first").orderBy(F.col("fecha_pedido_dt").desc(),
                                                                             F.hash(*[F.col(c) for c in all_cols]).desc())

orders = (df
    .withColumn("prev_dt", F.lag("fecha_pedido_dt").over(w_client_order))
    .withColumn("next_canal", F.lead("canal_pedido_cd").over(w_client_order))
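    # Keep the label null when there is no next order, so the final isNotNull filter drops
    # those rows instead of counting them as non-digital
    .withColumn("next_is_digital", F.when(F.col("next_canal").isNotNull(),
                                          (F.col("next_canal")=="DIGITAL").cast("int")))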
    .withColumn("recency_days", F.datediff(F.col("fecha_pedido_dt"), F.col("prev_dt")))
    .withColumn("rn_month_desc", F.row_number().over(w_client_month_desc))
)

last_in_month = (orders
    .filter(F.col("rn_month_desc")==1)
    .select("cliente_id","month_first","ym",
            F.col("recency_days").alias("recency_days_last"),
            F.col("next_is_digital").alias("label"))
)

last_in_month.show(5, truncate=False)

## 4) Feature engineering (RFM + rolling + lifecycle + segment priors)

**Signals per customer-month**:
- Volume/value: `n_orders`, `sum_fact`, `avg_fact`, `sum_cajas`, `avg_cajas`, `avg_mat_dist`.
- Behavior: the month's `digital_ratio`, `lag1_digital_ratio`, **3-month rolling** aggregates, and `growth_digital_ratio`.
- Lifecycle: `months_since_first` (months since the first order).
- Segment priors (historical, **lagged**): `region_digital_ratio_lag1`, `tipo_digital_ratio_lag1`.
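
To make the lagged and rolling signals concrete, here is a small pure-Python sketch for a single customer. The function name `rolling_features` and the input values are illustrative assumptions; only the feature names come from the notebook. Each feature for a month uses strictly earlier rows, which is what keeps the current month out of its own history.

```python
def rolling_features(ratios):
    """ratios: monthly digital_ratio values for one customer, oldest first.

    Returns one dict per month; history-based features use only earlier rows.
    """
    out = []
    for i, cur in enumerate(ratios):
        prev = ratios[max(0, i - 3):i]           # up to 3 previous rows, current excluded
        lag1 = ratios[i - 1] if i > 0 else None  # previous row, if any
        out.append({
            "lag1_digital_ratio": lag1,
            "digital_ratio_3m": sum(prev) / len(prev) if prev else None,
            "growth_digital_ratio": None if lag1 is None else cur - lag1,
        })
    return out

feats = rolling_features([0.0, 0.5, 1.0, 0.5, 1.0])
for row in feats:
    print(row)
```

Note that a row-based window counts rows rather than calendar months: for a customer who skips months, "3 previous rows" can span more than three months.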

In [None]:
# Customer-month aggregates
monthly_agg = (df.groupBy("cliente_id","month_first","ym")
    .agg(
        F.count("*").alias("n_orders"),
        F.avg("is_digital").alias("digital_ratio"),
        F.sum(F.col("facturacion_usd_val").cast("double")).alias("sum_fact"),
        F.avg(F.col("facturacion_usd_val").cast("double")).alias("avg_fact"),
        F.sum(F.col("cajas_fisicas").cast("double")).alias("sum_cajas"),
        F.avg(F.col("cajas_fisicas").cast("double")).alias("avg_cajas"),
        F.avg(F.col("materiales_distintos_val").cast("double")).alias("avg_mat_dist"),
        F.first("tipo_cliente_cd", ignorenulls=True).alias("tipo_cliente_cd"),
        F.first("madurez_digital_cd", ignorenulls=True).alias("madurez_digital_cd"),
        F.first("frecuencia_visitas_cd", ignorenulls=True).alias("frecuencia_visitas_cd"),
        F.first("pais_cd", ignorenulls=True).alias("pais_cd"),
        F.first("region_comercial_txt", ignorenulls=True).alias("region_comercial_txt")
    )
)

# Lifecycle
w_client_month = Window.partitionBy("cliente_id").orderBy(F.col("month_first").asc())
first_month = (monthly_agg
               .withColumn("first_month", F.first("month_first", ignorenulls=True).over(w_client_month))
               .select("cliente_id","first_month").distinct())
monthly_agg = (monthly_agg
               .join(first_month, on="cliente_id", how="left")
               .withColumn("months_since_first", F.floor(F.months_between("month_first", "first_month"))))

# Segment ratios + lag
region_month = (df.groupBy("region_comercial_txt","month_first")
                  .agg(F.avg("is_digital").alias("region_digital_ratio")))
region_month = region_month.withColumn("ym", F.date_format("month_first", "yyyy-MM"))
w_region = Window.partitionBy("region_comercial_txt").orderBy(F.col("month_first").asc())
region_month = (region_month
                .withColumn("region_digital_ratio_lag1", F.lag("region_digital_ratio", 1).over(w_region))
                .select("region_comercial_txt","ym","region_digital_ratio_lag1"))

tipo_month = (df.groupBy("tipo_cliente_cd","month_first")
                .agg(F.avg("is_digital").alias("tipo_digital_ratio")))
tipo_month = tipo_month.withColumn("ym", F.date_format("month_first", "yyyy-MM"))
w_tipo = Window.partitionBy("tipo_cliente_cd").orderBy(F.col("month_first").asc())
tipo_month = (tipo_month
              .withColumn("tipo_digital_ratio_lag1", F.lag("tipo_digital_ratio", 1).over(w_tipo))
              .select("tipo_cliente_cd","ym","tipo_digital_ratio_lag1"))

# Final dataset
w_roll3 = w_client_month.rowsBetween(-3, -1)  # 3 previous rows (months with orders), current month excluded
ds = (monthly_agg
    .join(last_in_month, on=["cliente_id","month_first","ym"], how="left")
    .withColumn("lag1_digital_ratio", F.lag("digital_ratio", 1).over(w_client_month))
    .withColumn("n_orders_3m", F.sum("n_orders").over(w_roll3))
    .withColumn("digital_ratio_3m", F.avg("digital_ratio").over(w_roll3))
    .withColumn("sum_fact_3m", F.sum("sum_fact").over(w_roll3))
    .withColumn("growth_digital_ratio", F.col("digital_ratio") - F.col("lag1_digital_ratio"))
    .join(region_month, on=["region_comercial_txt","ym"], how="left")
    .join(tipo_month, on=["tipo_cliente_cd","ym"], how="left")
    .filter(F.col("label").isNotNull())
)

ds.select("cliente_id","ym","n_orders","digital_ratio","lag1_digital_ratio",
          "n_orders_3m","digital_ratio_3m","growth_digital_ratio",
          "months_since_first","region_digital_ratio_lag1","tipo_digital_ratio_lag1",
          "label").show(5, truncate=False)

## 5) Temporal split and **class balance**

- Train: `ym < TEST_START_YM`.  
- Test:  `ym >= TEST_START_YM`.
- We show the **label distribution**.
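
The split relies on the fact that zero-padded `yyyy-MM` strings compare in chronological order. A minimal pure-Python sketch follows, with hypothetical helpers `temporal_split` and `positive_rate` and made-up rows:

```python
def temporal_split(rows, test_start_ym):
    """Split dict rows on their 'ym' key.

    Zero-padded 'YYYY-MM' strings sort chronologically, so plain string
    comparison is enough to separate past from future.
    """
    train = [r for r in rows if r["ym"] < test_start_ym]
    test = [r for r in rows if r["ym"] >= test_start_ym]
    return train, test

def positive_rate(rows):
    """Share of rows with label == 1, or None for an empty split."""
    return sum(r["label"] for r in rows) / len(rows) if rows else None

rows = [
    {"ym": "2023-11", "label": 0},
    {"ym": "2023-12", "label": 1},
    {"ym": "2024-01", "label": 1},
    {"ym": "2024-02", "label": 0},
    {"ym": "2024-02", "label": 1},
]
train_rows, test_rows = temporal_split(rows, "2024-01")
print(len(train_rows), len(test_rows))
print(positive_rate(train_rows), positive_rate(test_rows))
```

Comparing the positive rate of both splits is a quick way to spot a shift in class balance between the training period and the evaluation period.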

In [None]:
train = ds.filter(F.col("ym") < F.lit(TEST_START_YM))
test  = ds.filter(F.col("ym") >= F.lit(TEST_START_YM))

print("Train rows:", train.count(), " | Test rows:", test.count())
print("\nLabel distribution:")
for name, d in [("train", train), ("test", test)]:
    print(f"--- {name} ---")
    d.groupBy("label").count().orderBy("label").show()

# Cache to speed up tuning
train = train.repartition(200).persist(StorageLevel.MEMORY_AND_DISK)
_ = train.count()

## 6) Modeling pipelines (LR vs. trees)

- **LR**: imputation + *one-hot* encoding for **low-cardinality** columns only.
- **RF/GBT**: imputation + **indexing only** (no OHE) + `VectorIndexer` (fewer bins); we use **priors** to represent `region_comercial_txt`.
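
The difference between the two encodings can be sketched in plain Python. The helpers `string_index` and `one_hot` and the category values below are hypothetical and only approximate what the Spark stages do: Spark's `OneHotEncoder` drops the last category by default, and `handleInvalid="keep"` adds an extra bucket, neither of which is reproduced here.

```python
from collections import Counter

def string_index(values):
    """Map each category to an integer, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(idx, size):
    """Expand an index into a 0/1 vector of length `size`."""
    return [1 if i == idx else 0 for i in range(size)]

levels = ["LOW", "HIGH", "LOW", "MID", "LOW", "HIGH"]  # toy values, not from the dataset
index = string_index(levels)

tree_feature = [index[v] for v in levels]                      # one column for trees
lr_features = [one_hot(index[v], len(index)) for v in levels]  # one column per category for LR

print(index)
print(tree_feature)
print(lr_features[3])
```

With indexing, a categorical stays a single column whatever its cardinality; with one-hot encoding, the width grows with the number of categories, which is why the notebook restricts it to low-cardinality columns.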

In [None]:
# Base numeric columns
num_cols = ["n_orders","digital_ratio","lag1_digital_ratio","sum_fact","avg_fact",
            "sum_cajas","avg_cajas","avg_mat_dist","recency_days_last",
            "n_orders_3m","digital_ratio_3m","sum_fact_3m","growth_digital_ratio",
            "months_since_first","region_digital_ratio_lag1","tipo_digital_ratio_lag1"]

# Low-cardinality categoricals (adjust to your actual data)
cat_low  = ["madurez_digital_cd","frecuencia_visitas_cd","pais_cd"]
# `tipo_cliente_cd` is usually manageable; if its cardinality is very high, drop it from LR as well
cat_lr   = cat_low + ["tipo_cliente_cd"]

# For trees we avoid OHE
cat_tree = cat_low + ["tipo_cliente_cd"]

# Shared imputation
imputer = Imputer(inputCols=num_cols, outputCols=[c + "_imp" for c in num_cols])

# --- Base pipeline for LR ---
idx_lr = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in cat_lr]
ohe_lr = OneHotEncoder(inputCols=[c + "_idx" for c in cat_lr],
                       outputCols=[c + "_oh"  for c in cat_lr])
feats_lr = [c + "_imp" for c in num_cols] + [c + "_oh" for c in cat_lr]
asm_lr   = VectorAssembler(inputCols=feats_lr, outputCol="features_lr")
pipe_lr_base = Pipeline(stages=[imputer] + idx_lr + [ohe_lr, asm_lr])

# --- Base pipeline for trees (no OHE) ---
idx_tree = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in cat_tree]
feats_tree = [c + "_imp" for c in num_cols] + [c + "_idx" for c in cat_tree]
asm_tree   = VectorAssembler(inputCols=feats_tree, outputCol="features_raw")
# Note: VectorIndexer treats any feature with <= maxCategories distinct values as categorical, numeric ones included
vindex     = VectorIndexer(inputCol="features_raw", outputCol="features", maxCategories=64, handleInvalid="keep")
pipe_tree_base = Pipeline(stages=[imputer] + idx_tree + [asm_tree, vindex])

## 7) Builders + compact grids and **balancing**

We use `TrainValidationSplit` with moderately sized grids to reduce the risk of OOM errors.
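
The balancing used in this section gives each positive row a weight equal to the negative-to-positive ratio and leaves negatives at 1.0. A pure-Python sketch of that arithmetic follows; the helper `class_weights` and the toy labels are assumptions for illustration.

```python
def class_weights(labels):
    """Return one weight per row: neg/pos for positives, 1.0 for negatives.

    Falls back to 1.0 when there are no positives, so the ratio is always defined.
    """
    pos = sum(1 for y in labels if y == 1)
    neg = sum(1 for y in labels if y == 0)
    ratio = neg / pos if pos else 1.0
    return [ratio if y == 1 else 1.0 for y in labels]

toy_labels = [1, 0, 0, 0, 1, 0, 0, 0]   # 2 positives, 6 negatives
weights = class_weights(toy_labels)

pos_mass = sum(w for w, y in zip(weights, toy_labels) if y == 1)
neg_mass = sum(w for w, y in zip(weights, toy_labels) if y == 0)
print(weights)
print(pos_mass, neg_mass)
```

After weighting, both classes carry the same total weight, so the loss no longer favors the majority class simply because it has more rows.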

In [None]:
from typing import Tuple

e_pr = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

# Models
lr = LogisticRegression(featuresCol="features_lr", labelCol="label",
                        weightCol="weight", maxIter=80, regParam=0.01, elasticNetParam=0.0)
rf = RandomForestClassifier(featuresCol="features", labelCol="label", weightCol="weight",
                            numTrees=100, maxDepth=8, maxBins=64,
                            subsamplingRate=0.7, featureSubsetStrategy="auto",
                            cacheNodeIds=True, seed=42)
gbt = GBTClassifier(featuresCol="features", labelCol="label",
                    maxIter=80, maxDepth=6, maxBins=64,
                    stepSize=0.05, subsamplingRate=0.7, seed=42)

# Full pipelines
pipe_lr  = Pipeline(stages=pipe_lr_base.getStages() + [lr])
pipe_rf  = Pipeline(stages=pipe_tree_base.getStages() + [rf])
pipe_gbt = Pipeline(stages=pipe_tree_base.getStages() + [gbt])

# Grids
grid_lr  = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.0, 0.01, 0.1])
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
            .build())
grid_rf  = (ParamGridBuilder()
            .addGrid(rf.numTrees, [80, 120])
            .addGrid(rf.maxDepth, [6, 8])
            .build())
grid_gbt = (ParamGridBuilder()
            .addGrid(gbt.maxIter, [60, 80])
            .addGrid(gbt.maxDepth, [5, 6])
            .build())

tvs_lr  = TrainValidationSplit(estimator=pipe_lr,  estimatorParamMaps=grid_lr,  evaluator=e_pr, trainRatio=0.8, parallelism=1)
tvs_rf  = TrainValidationSplit(estimator=pipe_rf,  estimatorParamMaps=grid_rf,  evaluator=e_pr, trainRatio=0.8, parallelism=1)
tvs_gbt = TrainValidationSplit(estimator=pipe_gbt, estimatorParamMaps=grid_gbt, evaluator=e_pr, trainRatio=0.8, parallelism=1)

def fit_model(name: str, train_df):
    """Fit the tuned pipeline for `name` and return the fitted TrainValidationSplitModel."""
    # Simple balancing: positives get weight neg/pos (> 1 when positives are the minority)
    pos = train_df.filter(F.col("label")==1).count()
    neg = train_df.filter(F.col("label")==0).count()
    ratio = (neg / float(max(pos,1))) if pos else 1.0
    train_w = train_df.withColumn("weight", F.when(F.col("label")==1, F.lit(ratio)).otherwise(F.lit(1.0)))

    if name == "lr":
        return tvs_lr.fit(train_w)
    elif name == "rf":
        return tvs_rf.fit(train_w)
    elif name == "gbt":
        # Note: GBTClassifier does not support weightCol in some Spark versions, so we fit on the unweighted dataset
        return tvs_gbt.fit(train_df)
    else:
        raise ValueError("Unsupported model: " + name)

## 8) Evaluation: ROC AUC / PR AUC + confusion matrix and derived metrics
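
The derived metrics follow directly from the four confusion-matrix counts. As a quick worked example, here is a pure-Python sketch with a hypothetical helper `binary_metrics` and made-up counts; zero denominators return 0.0 rather than raising.

```python
def binary_metrics(tp, tn, fp, fn):
    """Accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

metrics = binary_metrics(tp=30, tn=50, fp=10, fn=10)
print(metrics)
```

With these counts, accuracy is 80/100 and both precision and recall are 30/40, so F1 equals them as well.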

In [None]:
def evaluate_model(model, test_df, model_name="model"):
    pred = model.transform(test_df).withColumn("p_digital", vector_to_array("probability")[1]).cache()
    e_auc  = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    e_aupr = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
    auc = e_auc.evaluate(pred)
    aupr = e_aupr.evaluate(pred)

    cm = (pred.groupBy("label","prediction").count().toPandas())
    tp = int(cm[(cm["label"]==1) & (cm["prediction"]==1)]["count"].sum())
    tn = int(cm[(cm["label"]==0) & (cm["prediction"]==0)]["count"].sum())
    fp = int(cm[(cm["label"]==0) & (cm["prediction"]==1)]["count"].sum())
    fn = int(cm[(cm["label"]==1) & (cm["prediction"]==0)]["count"].sum())

    accuracy  = (tp + tn) / max(tp + tn + fp + fn, 1)
    precision = tp / max(tp + fp, 1)
    recall    = tp / max(tp + fn, 1)
    f1        = (2 * precision * recall) / max(precision + recall, 1e-9)

    print(f"[{model_name}]  AUC ROC: {auc:.4f} | AUC PR: {aupr:.4f} | Acc: {accuracy:.4f} | Prec: {precision:.4f} | Rec: {recall:.4f} | F1: {f1:.4f}")
    return pred, {"auc_roc": auc, "auc_pr": aupr, "accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

## 9) Training and comparison (LR, RF, GBT)
We select the **best** model by **AUC PR**, which focuses on the positive class.
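
To see why a precision-recall summary emphasizes positives, the sketch below computes average precision, a common step-wise estimate of the area under the PR curve. This is an illustration only: the helper `average_precision` is hypothetical, it ignores tied scores, and Spark's `areaUnderPR` is computed differently, so the numbers will not match the evaluator exactly.

```python
def average_precision(labels, scores):
    """Mean of the precision values measured at the rank of each positive.

    Rows are ranked by descending score; true negatives never enter the
    formula, only how many negatives are ranked above each positive.
    """
    ranked = sorted(zip(scores, labels), key=lambda t: -t[0])
    n_pos = sum(labels)
    if n_pos == 0:
        return 0.0
    hits, total = 0, 0.0
    for rank, (_, y) in enumerate(ranked, start=1):
        if y == 1:
            hits += 1
            total += hits / rank   # precision at this positive's rank
    return total / n_pos

ap_mixed = average_precision([1, 0, 1, 0], [0.9, 0.8, 0.7, 0.1])
ap_perfect = average_precision([1, 1, 0, 0], [0.9, 0.8, 0.7, 0.1])
print(ap_mixed, ap_perfect)
```

In the first case one negative outranks a positive, which lowers the score to (1/1 + 2/3) / 2; in the second every positive is ranked first and the score is 1.0.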

In [None]:
results = {}
models  = {}
for name in ["lr","rf","gbt"]:
    print("Training model:", name)
    m = fit_model(name, train)
    pred, mtx = evaluate_model(m, test, model_name=name)
    results[name] = mtx
    models[name]  = m

print("\nSummary (AUC PR):", {k: round(v["auc_pr"],4) for k,v in results.items()})
best_name = max(results.items(), key=lambda kv: kv[1]["auc_pr"])[0]
print("Best by AUC PR:", best_name)

## 10) Lightweight ensemble (probability averaging)

Tip: combining `LR` + `GBT` often improves AUC PR.
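
The averaging itself is simple arithmetic. Here is a minimal pure-Python sketch, assuming both models score the same rows in the same order; the helper `average_ensemble` and the probabilities are made up for illustration.

```python
def average_ensemble(p_a, p_b, threshold=0.5):
    """Average two aligned lists of positive-class probabilities.

    Returns the averaged probabilities and the 0/1 predictions obtained
    by thresholding them.
    """
    if len(p_a) != len(p_b):
        raise ValueError("both models must score the same rows")
    p_avg = [(a + b) / 2 for a, b in zip(p_a, p_b)]
    preds = [1 if p >= threshold else 0 for p in p_avg]
    return p_avg, preds

p_lr = [0.75, 0.5, 0.25]
p_gbt = [0.875, 0.5, 0.125]
p_avg, preds = average_ensemble(p_lr, p_gbt)
print(p_avg, preds)
```

An unweighted mean treats both models as equally reliable; whether that is the best combination for this dataset is an open question.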

In [None]:
if "lr" in models and "gbt" in models:
    # `probability` is an ML Vector, so convert it with vector_to_array before indexing
    pred_lr  = models["lr"].transform(test).select("cliente_id","ym","label", vector_to_array("probability")[1].alias("p_lr"))
    pred_gbt = models["gbt"].transform(test).select("cliente_id","ym", vector_to_array("probability")[1].alias("p_gbt"))
    pred_ens = (pred_lr.join(pred_gbt, ["cliente_id","ym"])
                .withColumn("p_digital", (F.col("p_lr")+F.col("p_gbt"))/2))
    # Reference threshold of 0.5; the evaluator accepts a double rawPrediction (score for label 1)
    pred_ens = (pred_ens
                .withColumn("prediction", F.when(F.col("p_digital")>=0.5, F.lit(1.0)).otherwise(F.lit(0.0)))
                .withColumn("rawPrediction", F.col("p_digital")))

    e_auc  = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    e_aupr = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
    auc = e_auc.evaluate(pred_ens)
    aupr = e_aupr.evaluate(pred_ens)

    cm = (pred_ens.groupBy("label","prediction").count().toPandas())
    tp = int(cm[(cm["label"]==1) & (cm["prediction"]==1)]["count"].sum())
    tn = int(cm[(cm["label"]==0) & (cm["prediction"]==0)]["count"].sum())
    fp = int(cm[(cm["label"]==0) & (cm["prediction"]==1)]["count"].sum())
    fn = int(cm[(cm["label"]==1) & (cm["prediction"]==0)]["count"].sum())

    accuracy  = (tp + tn) / max(tp + tn + fp + fn, 1)
    precision = tp / max(tp + fp, 1)
    recall    = tp / max(tp + fn, 1)
    f1        = (2 * precision * recall) / max(precision + recall, 1e-9)

    print(f"[Ensemble LR+GBT]  AUC ROC: {auc:.4f} | AUC PR: {aupr:.4f} | Acc: {accuracy:.4f} | Prec: {precision:.4f} | Rec: {recall:.4f} | F1: {f1:.4f}")

## 11) (Optional) Backtesting across temporal cutoffs

Repeats training (with the compact grids) for each `ym` in `BACKTEST_SPLITS` and reports AUC PR.
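
The structure of these backtests is an expanding training window: each cutoff trains on everything before it and tests on everything from it onward. A pure-Python sketch of how the months fall into each split follows; the helper `expanding_splits` and the month range are illustrative assumptions, while the cutoffs mirror `BACKTEST_SPLITS`.

```python
def expanding_splits(months, cuts):
    """For each cutoff, train on months < cut and test on months >= cut."""
    return {
        cut: ([m for m in months if m < cut], [m for m in months if m >= cut])
        for cut in cuts
    }

# Hypothetical data range: 2023-06 through 2024-02
months = [f"2023-{m:02d}" for m in range(6, 13)] + ["2024-01", "2024-02"]
cuts = ["2023-08", "2023-10", "2023-12", "2024-01"]

splits = expanding_splits(months, cuts)
for cut, (train_months, test_months) in splits.items():
    print(cut, "train:", len(train_months), "test:", len(test_months))
```

Because every test set runs to the end of the data, the test periods of different cutoffs overlap, so the per-cutoff metrics are not independent estimates.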

In [None]:
bt_rows = []
for cut in BACKTEST_SPLITS:
    train_bt = ds.filter(F.col("ym") < F.lit(cut)).repartition(200).persist(StorageLevel.MEMORY_AND_DISK)
    test_bt  = ds.filter(F.col("ym") >= F.lit(cut))
    print(f"\n>>> Backtest cutoff {cut} | train={train_bt.count()} test={test_bt.count()}")

    metrics_cut = {}
    for name in ["lr","gbt"]:  # rf is optional if memory/time allow
        print("  Training:", name)
        m = fit_model(name, train_bt)
        _, mtx = evaluate_model(m, test_bt, model_name=name)
        metrics_cut[name] = mtx["auc_pr"]
    bt_rows.append((cut, metrics_cut.get("lr"), metrics_cut.get("gbt")))
    train_bt.unpersist()  # release the cached split before moving to the next cutoff

print("\nAUC PR by cutoff (LR, GBT):")
for cut, lr_auc, gbt_auc in bt_rows:
    print(cut, " | LR:", round(lr_auc or 0, 4), " | GBT:", round(gbt_auc or 0, 4))

## 12) (Optional) Saving artifacts

Uncomment to save the best model and the test-set predictions.

In [None]:
# from pathlib import Path
# base_path = Path("artifacts")
# base_path.mkdir(parents=True, exist_ok=True)

# # Save the best model by AUC PR
# best_model = models[best_name]
# best_model.bestModel.write().overwrite().save(str(base_path / f"best_pipeline_{best_name}"))

# # Save the best model's predictions
# pred_best = best_model.transform(test).withColumn("p_digital", vector_to_array("probability")[1])
# pred_best.select("cliente_id","ym","label","p_digital","prediction").write.mode("overwrite").parquet(str(base_path / "predicciones_best"))

---

### Notes
- We reduce the risk of **OutOfMemory** errors in tree models by **not using one-hot encoding** and by limiting `maxDepth`, `numTrees`, and `maxBins`.
- We use **lagged priors** per segment to keep the signal from `region_comercial_txt` without inflating dimensionality.
- For more speed, you can shrink the grid further or set `trainRatio=0.9` on `TrainValidationSplit`.