In [10]:
!sudo apt-get update
!sudo apt-get install gcsfuse

Get:1 https://nvidia.github.io/libnvidia-container/stable/deb/amd64  InRelease [1477 B]
Hit:2 https://deb.debian.org/debian bullseye InRelease                         
Hit:3 https://download.docker.com/linux/debian bullseye InRelease   
Hit:4 https://deb.debian.org/debian-security bullseye-security InRelease
Get:5 https://deb.debian.org/debian bullseye-updates InRelease [44.0 kB]
Get:6 https://deb.debian.org/debian bullseye-backports InRelease [48.9 kB]
Hit:7 https://packages.cloud.google.com/apt gcsfuse-bullseye InRelease
Hit:8 https://packages.cloud.google.com/apt google-compute-engine-bullseye-stable InRelease
Hit:9 https://packages.cloud.google.com/apt cloud-sdk-bullseye InRelease
Hit:10 https://packages.cloud.google.com/apt google-fast-socket InRelease
Fetched 94.4 kB in 1s (115 kB/s)
Reading package lists... Done
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
gcsfuse is already the newest version (3.1.0).
0 upgraded, 0 newly insta

In [2]:
!mkdir -p /home/jupyter/franco_maestria/prophet_temporal_features_batch

In [12]:
!fusermount -u /home/jupyter/franco_maestria/prophet_temporal_features_batch

In [13]:
!gcsfuse forecasting_customer_product /home/jupyter/franco_maestria/prophet_temporal_features_batch

{"timestamp":{"seconds":1752836581,"nanos":862825615},"severity":"INFO","message":"Start gcsfuse/3.1.0 (Go version go1.24.0) for app \"\" using mount point: /home/jupyter/franco_maestria/prophet_temporal_features_batch\n"}
{"timestamp":{"seconds":1752836581,"nanos":862868082},"severity":"INFO","message":"GCSFuse config","config":{"AppName":"","CacheDir":"","Debug":{"ExitOnInvariantViolation":false,"Fuse":false,"Gcs":false,"LogMutex":false},"DisableAutoconfig":false,"EnableAtomicRenameObject":true,"EnableGoogleLibAuth":false,"EnableHns":true,"EnableNewReader":false,"FileCache":{"CacheFileForRangeRead":false,"DownloadChunkSizeMb":200,"EnableCrc":false,"EnableODirect":false,"EnableParallelDownloads":false,"ExperimentalExcludeRegex":"","ExperimentalParallelDownloadsDefaultOn":true,"MaxParallelDownloads":96,"MaxSizeMb":-1,"ParallelDownloadsPerFile":16,"WriteBufferSize":4194304},"FileSystem":{"DirMode":"755","DisableParallelDirops":false,"ExperimentalEnableDentryCache":false,"ExperimentalEna

In [14]:
# --------------------------------------------
# 0️⃣ Librerías necesarias
# --------------------------------------------
import pandas as pd
from prophet import Prophet
from tqdm import tqdm
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import glob
import os
from joblib import Parallel, delayed

In [15]:
# --------------------------------------------
# 2️⃣ Cargar dataset parquet
# --------------------------------------------

# Ruta local (ajusta si hace falta)
file_path = "panel_cliente_producto_fe.parquet"

df = pd.read_parquet(file_path)

df['fecha'] = pd.to_datetime(df['fecha'])

# Filtrar clientes a partir de 10614 (inclusive)
df = df[df['customer_id'] > 10614]

print(f"✅ Dataset cargado con forma: {df.shape}")
df.head()



✅ Dataset cargado con forma: (55399, 194)


Unnamed: 0,customer_id,product_id,periodo,tn,IPC,inflacion,cambio_dolar,dias_feriados,cat1,cat2,...,cluster_dtw_factorized,cluster_x_month,periodo_factorized,cat1_factorized,cat2_factorized,cat3_factorized,brand_factorized,descripcion_factorized,product_target_enc,customer_target_enc
12082787,10615,20001,201701,0.0,102,2.0,16.08,1,HC,ROPA LAVADO,...,0,0,0,0,0,0,0,0,2.882842,0.000264
12082788,10615,20001,201702,0.0,104,1.960784,15.8,2,HC,ROPA LAVADO,...,0,0,1,0,0,0,0,0,2.882842,0.000264
12082789,10615,20001,201703,0.0,107,2.884615,15.645,1,HC,ROPA LAVADO,...,0,0,2,0,0,0,0,0,2.882842,0.000264
12082790,10615,20001,201704,0.0,109,1.869159,15.49,1,HC,ROPA LAVADO,...,0,0,3,0,0,0,0,0,2.882842,0.000264
12082791,10615,20001,201705,0.0,111,1.834862,16.185,2,HC,ROPA LAVADO,...,0,0,4,0,0,0,0,0,2.882842,0.000264


In [16]:
# --------------------------------------------
# 3️⃣ Verificar columnas clave
# --------------------------------------------
print(df.columns)

# Nos aseguramos de tener: product_id, fecha, tn_total
df[['customer_id','product_id', 'fecha', 'tn']].head()

Index(['customer_id', 'product_id', 'periodo', 'tn', 'IPC', 'inflacion',
       'cambio_dolar', 'dias_feriados', 'cat1', 'cat2',
       ...
       'cluster_dtw_factorized', 'cluster_x_month', 'periodo_factorized',
       'cat1_factorized', 'cat2_factorized', 'cat3_factorized',
       'brand_factorized', 'descripcion_factorized', 'product_target_enc',
       'customer_target_enc'],
      dtype='object', length=194)


Unnamed: 0,customer_id,product_id,fecha,tn
12082787,10615,20001,2017-01-01,0.0
12082788,10615,20001,2017-02-01,0.0
12082789,10615,20001,2017-03-01,0.0
12082790,10615,20001,2017-04-01,0.0
12082791,10615,20001,2017-05-01,0.0


In [20]:
# --------------------------------------------
# 1️⃣ Configuración inicial
# --------------------------------------------
output_dir = "prophet_temporal_features_batch"
os.makedirs(output_dir, exist_ok=True)
BATCH_SIZE = 500
CORES = 40

In [21]:
# --------------------------------------------
# 2️⃣ Función de procesamiento por serie
# --------------------------------------------
def procesar_serie(cid, pid, group):
    try:
        group = group.sort_values('fecha').set_index('fecha').asfreq('MS')
        group['customer_id'] = cid
        group['product_id'] = pid

        if group['tn'].count() < 12 or group['tn'].nunique() < 2:
            return None

        df_prophet = pd.DataFrame({
            'ds': group.index,
            'y': group['tn']
        })

        m = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=False,
            daily_seasonality=False
        )
        m.fit(df_prophet)

        forecast = m.predict(df_prophet[['ds']])
        forecast = forecast.set_index('ds')

        forecast['tn'] = df_prophet.set_index('ds')['y']
        forecast['customer_id'] = cid
        forecast['product_id'] = pid
        forecast['fecha'] = forecast.index

        for col in ['seasonal', 'additive_terms', 'multiplicative_terms']:
            if col not in forecast.columns:
                forecast[col] = 0

        forecast['residual'] = forecast['tn'] - forecast['yhat']
        forecast['slope_trend_3'] = forecast['trend'].diff().rolling(3).mean()

        keep_cols = [
            'customer_id', 'product_id', 'fecha',
            'tn', 'trend', 'seasonal',
            'additive_terms', 'residual', 'slope_trend_3'
        ]
        return forecast[keep_cols].reset_index(drop=True)

    except Exception as e:
        print(f"❌ Error en cid={cid}, pid={pid}: {e}")
        return None


In [22]:
# --------------------------------------------
# 3️⃣ Ejecutar en paralelo por bloques
# --------------------------------------------
from math import ceil

series_list = list(df.groupby(['customer_id', 'product_id']))
num_blocks = ceil(len(series_list) / BATCH_SIZE)

for i in range(num_blocks):
    block = series_list[i * BATCH_SIZE: (i + 1) * BATCH_SIZE]
    print(f"🌀 Procesando bloque {i+1}/{num_blocks} con {len(block)} series")

    resultados = Parallel(n_jobs=CORES)(
        delayed(procesar_serie)(cid, pid, g) for (cid, pid), g in block
    )

    # Filtrar nulos y unir resultados
    df_result = pd.concat([r for r in resultados if r is not None], ignore_index=True)

    # Guardar CSV del batch
    fecha_actual = datetime.today().strftime('%Y%m%d')
    output_path = os.path.join(output_dir, f"prophet_batch_{i+1:03d}_{fecha_actual}.csv")
    df_result.to_csv(output_path, index=False)

    print(f"✅ Guardado: {output_path} | {df_result.shape[0]} filas")


🌀 Procesando bloque 1/15 con 500 series


ValueError: No objects to concatenate

In [23]:
# --------------------------------------------
# 1️⃣ Consolidar todos los archivos Prophet en un único DataFrame
# --------------------------------------------

# Directorio donde guardamos los batches
input_dir = "prophet_temporal_features_batch"

# Buscar todos los .csv de output
csv_files = sorted(glob.glob(os.path.join(input_dir, "prophet_batch_*.csv")))

print(f"🔍 Encontrados {len(csv_files)} archivos a consolidar.")

# Leer todos y concatenar
df_prophet_final = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)


# Verificación rápida
print(f"✅ Consolidación completa. Total filas: {df_prophet_final.shape[0]}")
print(df_prophet_final.head())


🔍 Encontrados 89 archivos a consolidar.
✅ Consolidación completa. Total filas: 7249573
   customer_id  product_id       fecha         tn       trend  seasonal  \
0        10001       20001  2017-01-01   99.43861  110.691026         0   
1        10001       20001  2017-02-01  198.84365  114.229187         0   
2        10001       20001  2017-03-01   92.46537  117.424945         0   
3        10001       20001  2017-04-01   13.29728  120.963106         0   
4        10001       20001  2017-05-01  101.00563  124.387133         0   

   additive_terms    residual  slope_trend_3  
0      -46.415291   35.162876            NaN  
1       92.249763   -7.635300            NaN  
2       85.674158 -110.633733            NaN  
3      -63.645006  -44.020820       3.424027  
4      -23.848351    0.466848       3.385982  


In [24]:
df_prophet_final.to_csv('prophet_features_customer_product.csv', index=False)