In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

from sklearn.model_selection import (
    KFold,
    StratifiedKFold,
    cross_validate,
    cross_val_score
)

from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt
import sys

# Carga del script que contiene las rutinas comúnes para el modelado.
sys.path.append('../scripts')
import modelos_common as mcom

In [None]:
# Se carga el dataset ya procesado y filtrado.
main_df = mcom.get_dataframe()

### Regresión - Validación Cruzada

In [None]:
kf = KFold(n_splits=5, shuffle=True, random_state=42)
regresion_scores = pd.DataFrame()

# Utiliza el iterador para volver a obtener (cadena, X, y).
for cadena_name, X, y, _ in mcom.iter_regresion_Xy(main_df):
    puntajes = []
    
	# Se realizará una validación cruzada de los modelos de regresión que se seleccionaron.
    for reg_nombre, reg in mcom.ALL_REGRESION.items():
        modelo = Pipeline([
            ('scaler', StandardScaler()),
            ('regresion', reg),
        ])
        
        # Cálculo de los puntajes de validación cruzada. Se utiliza la métrica de MAE para el cálculo.
        scores = cross_val_score(modelo, X, y, cv=kf, scoring='neg_mean_absolute_error')
        
        puntajes.append({
            'cadena': cadena_name,
            'regresion': reg_nombre,
            'MAE score': -np.mean(scores)
        })
    
	# Se utiliza la métrica MAE para evaluar el mejor modelo.
	# El mejor puntaje es aquel cuya MAE sea menor.
    puntajes = pd.DataFrame(puntajes)
    puntajes['mejor'] = puntajes['MAE score'] == puntajes['MAE score'].min()
    regresion_scores = pd.concat([regresion_scores, puntajes])

# Los puntajes de la evaluación cruzada se almacenan en un dataframe para su posterior uso.
display(regresion_scores)

In [None]:
# Graficar los mejores modelos de regresión para cada cadena de supermercados. Solo se utilizan los modelos catalogados como 'mejores'
# del dataframe 'regresion_scores'.

mejores_modelos = regresion_scores[regresion_scores.mejor == True][['cadena', 'regresion']]
reg_columns = ['supermercado_id', 'cadena_id', 'producto_id', 'anio', 'mes']

for cadena, regresion in mejores_modelos.values:
    # Se carga el modelo previamente guardado en disco.
    modelo = mcom.cargar_modelo('regresion/{} - {}.joblib'.format(cadena, regresion))
    
    if modelo:
        # se genera predicción de los datos reales. Esto es la regresión.
        df = main_df[main_df.cadena == cadena]
        y_pred0 = modelo.predict(df[reg_columns])

        # Se genera un dataset sintético utilizando los valores del último mes, para luego incrementarles la fecha en un mes.
        # Este dataset sintético es utilizado para calcular la predicción del siguiente mes.
        ficticio_df = mcom.get_ficticio_siguiente_mes(df)
        y_pred1 = modelo.predict(ficticio_df[reg_columns])
        
        # Generación de gráficos de los tres casos: valores reales, regresión total, y predicción del siguiente mes.
        fig, ax = plt.subplots(1, 1, figsize=(10, 5))
        ax.set_title('Estimaciones de costos para cadena de supermercados: %s' % cadena, fontsize=10)
        ax.tick_params(axis='x', labelsize=6)
        ax.tick_params(axis='y', labelsize=6)
        ax.grid(True, color='#dedede')
        
        for x,y,label in [(df.fecha, df.costo, 'real'), (df.fecha, y_pred0, 'regresión'), (ficticio_df.fecha, y_pred1, 'predicción')]:
            df = pd.DataFrame({'fecha':x, 'costo':y}).groupby(['fecha'])['costo'].mean().reset_index()
            ax.plot(df.fecha, df.costo, label=label, marker='o')

    plt.legend()
    plt.show()

### Clasificación - Validación Cruzada

In [None]:
# Validación cruzada para los modelos de clasificación.

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
clasificacion_scores = pd.DataFrame()

# Los puntajes estarán basados en las métricas F1 obligatorio, ROC-AUC y PR-AUC
scoring = {
    'f1_macro': 'f1_macro',
    'roc_auc': 'roc_auc',
    'pr_auc': 'average_precision'  # PR-AUC
}

for cadena_name, X, y, _ in mcom.iter_clasificacion_Xy(main_df):
    puntajes = []
    
    for cls_nombre, cls in mcom.ALL_CLASIFICACION.items():
        scores = cross_validate(cls, X, y, cv=cv, scoring=scoring, return_train_score=False)
        
        puntajes.append({
            'cadena': cadena_name,
            'clasificador': cls_nombre,
            'F1 score': np.mean(scores['test_f1_macro']),
            'ROC-AUC score': np.mean(scores['test_roc_auc']),
            'PR-AUC score': np.mean(scores['test_pr_auc'])
        })
    
	# Se utiliza F1 como métrica para determinar el mejor modelo. Esto debido a que el objetivo de la clasificación
    # es determinar si el costo subirá o no de valor al siguiente mes. Para dicho análisis, F1 es la mejor opción.
    # Mientras más alto el valor F1, mejor.
    puntajes = pd.DataFrame(puntajes)
    puntajes['mejor'] = puntajes['F1 score'] == puntajes['F1 score'].max()
    clasificacion_scores = pd.concat([clasificacion_scores, puntajes])

display(clasificacion_scores)

In [None]:
# Graficar los mejores modelos de clasificación para cada cadena de supermercados. Solo se utilizan los modelos catalogados como 'mejores'
# del dataframe 'clasificacion_scores'.

cls_columns = ['supermercado_id', 'cadena_id', 'producto_id', 'anio', 'mes']

# Variable para establecer el nivel de variación de costo. Default = 5%
variacion = 0.05

# Se recorre el iterador de cadenas, con el umbral ya establecido.
for cadena_name, _, _, sub_df in mcom.iter_clasificacion_Xy(main_df, umbral_pct=variacion):
    best_score = clasificacion_scores[(clasificacion_scores.mejor == True) & (clasificacion_scores.cadena == cadena_name)]
    clasificador = best_score['clasificador'].values[0]
    modelo = mcom.cargar_modelo('clasificacion/{} - {}.joblib'.format(cadena_name, clasificador))

    if modelo:
        # Generación del siguiente mes ficticio para estimar si se incrementará el costo o no.
        ficticio_df = mcom.get_ficticio_siguiente_mes(sub_df)[[*cls_columns, 'costo', 'fecha']]
        ficticio_df['target'] = modelo.predict(ficticio_df[cls_columns])
        
        fig, ax = plt.subplots(1, 1, figsize=(10, 5))
        ax.set_title('Alza: %s' % cadena_name, fontsize=10)
        ax.tick_params(axis='x', labelsize=6)
        ax.tick_params(axis='y', labelsize=6)
        ax.grid(True, color='#dedede')
        
        for i, (dff, label, color_alert) in enumerate([(sub_df, 'real', 'red'), (ficticio_df, 'predicción', 'green')]):
            df = pd.DataFrame({'fecha':dff.fecha, 'costo':dff.costo, 'target':dff.target}).groupby(['fecha']).agg({
                'costo':'mean',
                'target': lambda x: 1 if 1 in pd.Series.mode(x).tolist() else 0
            }).reset_index()
            
            # Los markers triangulares indican la alerta de incremento de costo.
            incrementos = df[df["target"] == 1]
            ax.plot(df.fecha, df.costo, label='%s costo' % label, marker='o' if i else None, zorder=0)
            ax.scatter(incrementos.fecha, incrementos.costo, label='%s alerta' % label, marker='v', color=color_alert, zorder=10)

    plt.legend()
    plt.show()


### Se muestran las imagenes (ROC Curve) guardadas de los modelos con mejor desempeño. 

In [None]:

# Variable para establecer el nivel de variación de costo. Default = 5%
variacion = 0.05

fig, axs = plt.subplots(4, 2, figsize=(10,15))
axs = axs.reshape(-1)

# Se recorre el iterador de cadenas, con el umbral ya establecido.
for idx, (cadena_name, X, y, _) in enumerate(mcom.iter_clasificacion_Xy(main_df, umbral_pct=variacion)):
    ax = axs[idx]
    
    best_score = clasificacion_scores[(clasificacion_scores.mejor == True) & (clasificacion_scores.cadena == cadena_name)]
    clasificador, roc_auc = best_score[['clasificador', 'ROC-AUC score']].values[0]
    modelo = mcom.cargar_modelo('clasificacion/{} - {}.joblib'.format(cadena_name, clasificador))

    if modelo:
        X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2, random_state=42)
        modelo.fit(X_train, y_train)
        
        if hasattr(modelo.named_steps["clasificador"], "predict_proba"):
            y_proba = modelo.predict_proba(X_test)[:, 1]
        elif hasattr(modelo.named_steps["clasificador"], "decision_function"):
            y_proba = modelo.decision_function(X_test)
        else:
            y_proba = None
        
        name = f"{cadena_name}_{clasificador}" 
        fpr, tpr, thr = roc_curve(y_test, y_proba)
        
        ax.plot(fpr, tpr, label=f"(AUC={roc_auc:.3f})")
        ax.plot([0, 1], [0, 1], linestyle="--")
        ax.set_title(f"ROC — {name}", fontsize=9)
        ax.tick_params(axis='x', labelsize=7)
        ax.tick_params(axis='y', labelsize=7)
        ax.grid(True, color='#dedede')
        ax.set_xlabel("False Positive Rate")
        ax.set_ylabel("True Positive Rate (Recall)")
        ax.legend(loc="lower right", fontsize=9)

plt.tight_layout()
plt.show()	