In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt

spark = SparkSession.builder \
    .appName("HackathonForecast") \
    .master("local[*]") \
    .config("spark.driver.memory", "16g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

print("SparkSession creada exitosamente!")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/16 11:12:34 WARN Utils: Your hostname, QUIN-DAT-A0012, resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp2s0)
25/09/16 11:12:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/16 11:12:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession creada exitosamente!


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, avg, stddev, col, sum, weekofyear, month, floor

# --- 1. CARGAR DATOS LIMPIOS DE LA CAPA SILVER ---
base_path = "/home/quind/GIT/Desafio-Tecnico-Hackathon-Forecast-Big-Data-2025/"
silver_path = f"{base_path}silver/datos_limpios"

print("Cargando datos limpios desde la capa Silver...")
df_final = spark.read.parquet(silver_path)

# --- 2. CREAR TABLAS DE DIMENSIONES (DESCRIPTIVAS) ---
# Seleccionamos las características únicas de cada producto
features_produto = df_final.select("produto", "categoria", "label", "subcategoria", "marca").distinct()

# Seleccionamos las características únicas de cada PDV
# CAMBIO: Se ha eliminado "zipcode" de la selección de características
features_pdv = df_final.select("pdv", "premise", "categoria_pdv").distinct()

# --- 3. AGREGACIÓN SEMANAL ---
print("Agregando transacciones a nivel semanal...")
df_semanal = df_final.groupBy("pdv", "produto", weekofyear("transaction_date").alias("semana")) \
    .agg(sum("quantity").alias("cantidad_total_semanal"))

# --- 4. ENRIQUECER DATOS SEMANALES CON LAS DIMENSIONES ---
print("Reincorporando las features descriptivas...")
df_enriquecido = df_semanal.join(features_produto, "produto", "left") \
                           .join(features_pdv, "pdv", "left")

# --- 5. CREACIÓN DE FEATURES DE LAG Y VENTANA MÓVIL ---
print("Creando features de Lag y Ventana Móvil...")
# Se recomienda reparticionar antes de la ventana para optimizar el rendimiento
windowSpec = Window.partitionBy("pdv", "produto").orderBy("semana")

df_con_features = df_enriquecido \
    .withColumn("lag_1", lag("cantidad_total_semanal", 1, 0).over(windowSpec)) \
    .withColumn("lag_2", lag("cantidad_total_semanal", 2, 0).over(windowSpec)) \
    .withColumn("lag_4", lag("cantidad_total_semanal", 4, 0).over(windowSpec)) \
    .withColumn("media_movil_4_semanas", avg("cantidad_total_semanal").over(windowSpec.rowsBetween(-3, 0))) \
    .withColumn("stddev_movil_4_semanas", stddev("cantidad_total_semanal").over(windowSpec.rowsBetween(-3, 0)))

# --- 6. CREACIÓN DE FEATURES DE CALENDARIO ---
print("Creando features de calendario...")
df_con_features = df_con_features.withColumn("mes", floor((col("semana") - 1) / 4.34) + 1)

df_listo_para_modelo = df_con_features.fillna(0)

print("\n¡Ingeniería de Features completada! (Ahora con todas las columnas)")
print("Muestra del DataFrame final:")
df_listo_para_modelo.show()

print("\nEsquema final del DataFrame:")
df_listo_para_modelo.printSchema()

Cargando datos limpios desde la capa Silver...


                                                                                

Agregando transacciones a nivel semanal...
Reincorporando las features descriptivas...
Creando features de Lag y Ventana Móvil...
Creando features de calendario...

¡Ingeniería de Features completada! (Ahora con todas las columnas)
Muestra del DataFrame final:


[Stage 2:>                (0 + 16) / 16][Stage 5:>                  (0 + 0) / 6]

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, FeatureHasher
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# --- 1. IDENTIFICAR TIPOS DE FEATURES ---
# La variable objetivo que queremos predecir
TARGET_COL = "cantidad_total_semanal"

# Columnas categóricas que necesitamos codificar
CATEGORICAL_COLS = [
    "pdv", "produto", "categoria", "label", "subcategoria", 
    "marca", "premise", "categoria_pdv"
]

# Columnas numéricas que ya están listas para usar
# CAMBIO: Se ha eliminado "zipcode" de la lista
NUMERICAL_COLS = [
    "semana", "lag_1", "lag_2", "lag_4", 
    "media_movil_4_semanas", "stddev_movil_4_semanas", "mes"
]

# --- 2. DEFINIR LAS ETAPAS DEL PIPELINE ---

# Etapa 1: StringIndexer - Convierte texto a índices numéricos
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in CATEGORICAL_COLS
]
# Guardamos los nombres de las columnas de salida del indexer
indexed_cols = [f"{c}_idx" for c in CATEGORICAL_COLS]

# CAMBIO: Etapa 2: FeatureHasher en lugar de OneHotEncoder
# Convierte múltiples columnas categóricas indexadas en un solo vector de características.
# Es mucho más eficiente en memoria. Puedes ajustar 'numFeatures' según sea necesario.
hasher = FeatureHasher(
    inputCols=indexed_cols,
    outputCol="hashed_features",
    numFeatures=1024  # Potencia de 2, ajusta este valor si es necesario
)

# Etapa 3: VectorAssembler - Une todas las features en un solo vector
# CAMBIO: Ahora usamos la salida del hasher y las columnas numéricas
feature_sources = ["hashed_features"] + NUMERICAL_COLS
assembler = VectorAssembler(
    inputCols=feature_sources,
    outputCol="features"
)

# Etapa 4: Modelo - Usaremos Gradient Boosted Trees Regressor
gbt = GBTRegressor(featuresCol="features", labelCol=TARGET_COL)

# CAMBIO: Unimos las nuevas etapas en el Pipeline
pipeline = Pipeline(stages=indexers + [hasher, assembler, gbt])

# --- 3. DIVIDIR LOS DATOS (ENTRENAMIENTO Y PRUEBA) ---
# Para series temporales, la división DEBE ser cronológica.
print("Dividiendo los datos en entrenamiento y prueba...")
train_data = df_listo_para_modelo.filter(col("semana") <= 40)
test_data = df_listo_para_modelo.filter(col("semana") > 40)

# (Opcional) Cachear los dataframes de entrenamiento y prueba si tienes memoria suficiente
# train_data.cache()
# test_data.cache()

print(f"Filas de entrenamiento: {train_data.count()}")
print(f"Filas de prueba: {test_data.count()}")

# --- 4. ENTRENAR EL MODELO ---
print("\nEntrenando el pipeline del modelo ML...")
model = pipeline.fit(train_data)
print("¡Entrenamiento completado!")

# --- 5. REALIZAR PREDICCIONES Y EVALUAR ---
print("\nRealizando predicciones en el conjunto de prueba...")
predictions = model.transform(test_data)

# Mostrar algunas predicciones
predictions.select("pdv", "produto", "semana", TARGET_COL, "prediction").show(10)

# Evaluar el rendimiento del modelo usando RMSE (Error Cuadrático Medio Raíz)
evaluator = RegressionEvaluator(
    labelCol=TARGET_COL,
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"\nRoot Mean Squared Error (RMSE) en los datos de prueba = {rmse}")

# (Opcional) Limpiar el cache
# train_data.unpersist()
# test_data.unpersist()

Dividiendo los datos en entrenamiento y prueba...


                                                                                

Filas de entrenamiento: 4296393


                                                                                

Filas de prueba: 1424401

Entrenando el pipeline del modelo ML...


25/09/15 20:15:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/09/15 20:15:55 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/09/15 20:15:56 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/09/15 20:16:07 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/09/15 20:16:15 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
25/09/15 20:16:22 WARN MemoryStore: Not enough space to cache rdd_426_6 in memory! (computed 413.6 MiB so far)
25/09/15 20:16:22 WARN BlockManager: Persisting block rdd_426_6 to disk instead.
25/09/15 20:16:23 WARN MemoryStore: Not enough space to cache rdd_426_15 in memory! (computed 413.6 MiB so far)
25/09/15 20:16:23 WARN BlockManager: Persisting block rdd_426_15 to disk instead.
25/09/15 20:16:23 WARN MemoryStore: Not enough space to cache rdd_426_13 in memor

In [None]:
from pyspark.sql.functions import lit, round

# --- 1. RE-ENTRENAMIENTO DEL MODELO FINAL CON TODOS LOS DATOS DE 2022 ---
print("Re-entrenando el pipeline con todos los datos de 2022...")
# 'pipeline' es el pipeline que definiste en el paso anterior
# 'df_listo_para_modelo' es tu DataFrame con todas las features de 2022
final_model = pipeline.fit(df_listo_para_modelo)
print("Modelo final entrenado.")

# --- 2. GENERACIÓN ITERATIVA DE PREDICCIONES PARA ENERO 2023 ---

# Preparar los datos históricos (todo 2022) para calcular los primeros lags
datos_historicos = df_listo_para_modelo

# Obtener la lista única de PDV/Producto para los que necesitamos predecir
pdv_produto_unicos = df_listo_para_modelo.select("pdv", "produto").distinct()

# Lista para guardar los DataFrames de predicción de cada semana
predicciones_finales = []

print("\nIniciando bucle de predicción para las 5 semanas de Enero/2023...")
for i in range(1, 6):
    semana_a_predecir = 52 + i # Las semanas 53, 54, 55, 56, 57
    print(f"--- Prediciendo para la semana {i}/5 (semana continua {semana_a_predecir}) ---")

    # A. Crear el DataFrame para la semana que queremos predecir
    df_pred_semana_actual = pdv_produto_unicos.withColumn("semana", lit(semana_a_predecir))

    # B. Enriquecer con features descriptivas (label, categoria, etc.)
    # (Reutilizamos el código del notebook anterior para esto)
    features_produto = df_final.select("produto", "categoria", "label", "subcategoria", "marca").distinct()
    features_pdv = df_final.select("pdv", "premise", "categoria_pdv", "zipcode").distinct()
    
    df_pred_semana_actual = df_pred_semana_actual.join(features_produto, "produto", "left") \
                                                 .join(features_pdv, "pdv", "left")

    # C. Crear features de lag y ventana móvil usando los datos históricos
    windowSpec = Window.partitionBy("pdv", "produto").orderBy("semana")
    df_para_predecir_features = df_pred_semana_actual.unionByName(datos_historicos.select(df_pred_semana_actual.columns), allowMissingColumns=True) \
        .withColumn("lag_1", lag("cantidad_total_semanal", 1, 0).over(windowSpec)) \
        .withColumn("lag_2", lag("cantidad_total_semanal", 2, 0).over(windowSpec)) \
        .withColumn("lag_4", lag("cantidad_total_semanal", 4, 0).over(windowSpec)) \
        .withColumn("media_movil_4_semanas", avg("cantidad_total_semanal").over(windowSpec.rowsBetween(-3, 0))) \
        .withColumn("stddev_movil_4_semanas", stddev("cantidad_total_semanal").over(windowSpec.rowsBetween(-3, 0))) \
        .withColumn("mes", floor((col("semana") - 1) / 4.34) + 1) \
        .filter(col("semana") == semana_a_predecir) \
        .fillna(0)

    # D. Realizar la predicción
    prediccion_actual = final_model.transform(df_para_predecir_features)
    
    # E. Guardar la predicción de esta semana
    prediccion_formateada = prediccion_actual.select(
        col("semana"),
        col("pdv"),
        col("produto"),
        col("prediction").alias("cantidad_total_semanal") # Usamos la predicción como si fuera la venta real
    )
    predicciones_finales.append(prediccion_formateada)

    # F. Actualizar los datos históricos con nuestra nueva predicción para el siguiente ciclo
    datos_historicos = datos_historicos.unionByName(prediccion_formateada)

# --- 3. FORMATEO Y GUARDADO DEL ARCHIVO DE SUBMISIÓN ---
print("\nBucle finalizado. Uniendo y formateando el archivo de submisión...")

# Unir las 5 semanas de predicciones en un solo DataFrame
from functools import reduce
df_submision = reduce(lambda df1, df2: df1.union(df2), predicciones_finales)

# Formatear según las especificaciones del hackathon
df_submision_final = df_submision \
    .withColumn("semana_real", col("semana") - 52) \
    .select(
        col("semana_real").alias("semana"),
        col("pdv"),
        col("produto"),
        # Asegurarse de que la cantidad no sea negativa, redondear y convertir a entero
        when(col("cantidad_total_semanal") < 0, 0)
            .otherwise(round(col("cantidad_total_semanal")))
            .cast("integer").alias("quantidade")
    )

# Mostrar una muestra del archivo final
df_submision_final.show(10)

# Guardar el archivo en formato CSV con ';' como separador y sin header
submission_path = f"{base_path}submission"
print(f"Guardando archivo de submisión en: {submission_path}")

df_submision_final.repartition(1).write.mode("overwrite") \
    .option("header", "false") \
    .option("sep", ";") \
    .csv(submission_path)

print("¡Archivo de submisión generado exitosamente!")