In [0]:
# -----------------------------------------------------------
# 1. Preparación: Lectura y procesamiento de la data original
# -----------------------------------------------------------
from pyspark.sql.functions import expr, col, min, mean, greatest,max
from mlflow.models import infer_signature

In [0]:
# Cargamos las tablas base
base_atributos = spark.table("esan_202504.clientes.base_atributos")
base_cliente = spark.table("esan_202504.clientes.base_cliente")
base_trx = spark.table("esan_202504.ventas.base_trx")

In [0]:
# Realizamos los joins para consolidar la información
join_1 = base_atributos.join(base_cliente, on=["periodo", "id_cliente"], how="left")
tabla_consolidada = join_1.join(base_trx, on=["periodo", "id_cliente"], how="left")

# Imputación de valores nulos en columnas categóricas
tabla_consolidada = tabla_consolidada.fillna({
    "tipo_producto": "Desconocido",
    "departamento": "Desconocido",
    "canal": "Desconocido"
})

# Imputación en columnas numéricas con la mediana
numeric_cols = ["monto_1m", "monto_2m", "monto_3m", "frecuencia_1m", "frecuencia_2m", "frecuencia_3m"]
for col_name in numeric_cols:
    median_value = tabla_consolidada.approxQuantile(col_name, [0.5], 0.01)[0]
    tabla_consolidada = tabla_consolidada.fillna({col_name: median_value})

# Creación de nuevas características
tabla_consolidada = tabla_consolidada.withColumn(
    "monto_total",
    expr("monto_1m + monto_2m + monto_3m + monto_4m + monto_5m + monto_6m")
)
tabla_consolidada = tabla_consolidada.withColumn(
    "tendencia_monto",
    expr("(monto_1m - monto_6m) / monto_6m")
)

# Otros ajustes en la data
tabla_consolidada = tabla_consolidada.drop('__index_level_0__')
tabla_consolidada = tabla_consolidada.filter(col("flg_churn").isNotNull())

columnas_cero = ["incidencias_a", "incidencias_b", "crossell", "ultima_compra_2m", "ultima_compra_3m"]
tabla_consolidada = tabla_consolidada.fillna({col: 0 for col in columnas_cero})

periodo_minimo = tabla_consolidada.select(min("periodo")).collect()[0][0]
tabla_consolidada = tabla_consolidada.fillna({"periodo_creacion": periodo_minimo})

columnas_menos_uno = ["segmento_pago", "segmento_cliente"]
tabla_consolidada = tabla_consolidada.fillna({col: -1 for col in columnas_menos_uno})

tasa_media = tabla_consolidada.select(mean("tasa")).collect()[0][0]
tabla_consolidada = tabla_consolidada.fillna({"tasa": tasa_media})

In [0]:
periodo_max =tabla_consolidada.agg(max("periodo")).collect()[0][0]
tabla_consolidada_mensual = tabla_consolidada.filter(tabla_consolidada.periodo == periodo_max)

In [0]:
spark.sql("DROP TABLE IF EXISTS esan.ventas.base_consolidada PURGE")


In [0]:
%sql
CREATE TABLE esan_202504.ventas.base_consolidada (
    periodo BIGINT,
    id_cliente BIGINT,
    tiempo_permanencia BIGINT,
    flg_vip DOUBLE,
    incidencias_a DOUBLE NOT NULL,
    incidencias_b DOUBLE NOT NULL,
    tipo_producto STRING NOT NULL,
    periodo_creacion BIGINT NOT NULL,
    departamento STRING NOT NULL,
    segmento_pago BIGINT NOT NULL,
    canal STRING NOT NULL,
    segmento_cliente BIGINT NOT NULL,
    crossell DOUBLE NOT NULL,
    tasa DOUBLE NOT NULL,
    monto_1m DOUBLE NOT NULL,
    monto_2m DOUBLE NOT NULL,
    monto_3m DOUBLE NOT NULL,
    monto_4m DOUBLE,
    monto_5m DOUBLE,
    monto_6m DOUBLE,
    cantidad_1m DOUBLE,
    cantidad_2m DOUBLE,
    cantidad_3m DOUBLE,
    cantidad_6m DOUBLE,
    frecuencia_1m DOUBLE NOT NULL,
    frecuencia_2m DOUBLE NOT NULL,
    frecuencia_3m DOUBLE NOT NULL,
    ultima_compra_1m DOUBLE,
    ultima_compra_2m DOUBLE NOT NULL,
    ultima_compra_3m DOUBLE NOT NULL,
    flg_churn DOUBLE,
    monto_total DOUBLE,
    tendencia_monto DOUBLE
)
USING DELTA;


In [0]:
# Guardamos la tabla consolidada en Unity Catalog para persistencia
tabla_consolidada.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("esan_202504.ventas.base_consolidada")