# 🚀 MLflow + DagsHub Training Pipeline

This notebook implements a complete pipeline for training Machine Learning models, with the following features:

## 📋 Objectives
- **Hopsworks connection**: retrieve data from the Feature Store
- **Data processing**: transform features and build the training/test sets
- **MLflow setup**: experiment tracking with a DagsHub backend
- **Model training**: multiple algorithms (Linear, RandomForest, XGBoost, etc.)
- **Evaluation and comparison**: find the best model by MAE
- **Model loading**: a robust way to load the best trained model

## 🛠️ Technologies
- **Hopsworks**: Feature Store for data management
- **MLflow**: experiment tracking and model management
- **DagsHub**: remote backend for MLflow
- **Scikit-learn**: ML models
- **XGBoost**: Gradient boosting

## 🔄 Pipeline Flow
1. Setup and imports
2. Connection to the Hopsworks Feature Store
3. Data retrieval and processing
4. MLflow + DagsHub setup
5. Training multiple models
6. Evaluation and selection of the best model
7. Robust loading of the winning model

## 1. 🔧 Setup and Imports

Initial environment setup: required imports and logging configuration.

In [3]:
import logging
import sys
from datetime import datetime
from pathlib import Path

# Make the project modules importable; this must run before any `from src import ...`
# (assumes the notebook sits one level below the project root)
PROJECT_ROOT = Path().resolve().parent
sys.path.append(str(PROJECT_ROOT))          # enables `from src import config`
sys.path.append(str(PROJECT_ROOT / 'src'))  # also adds src itself to the path

import hopsworks
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import pandas as pd
from dagshub import dagshub_logger, init
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from xgboost import XGBRegressor

from src import config

In [4]:
%reload_ext autoreload
%autoreload 2

In [5]:
# Basic logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('feature_view_creation')

## 2. 🏪 Connecting to the Hopsworks Feature Store

Connect to the Hopsworks Feature Store and retrieve the training data.

In [6]:
# Connect to Hopsworks and to the Feature Store
try:
    # Log in and open the project
    project = hopsworks.login(
        api_key_value=config.HOPSWORKS_API_KEY,
        project=config.HOPSWORKS_PROJECT_NAME)
    
    # Connect to the feature store
    feature_store = project.get_feature_store()
    
    # Connect to the feature group
    feature_group = feature_store.get_feature_group(
        name=config.FEATURE_GROUP_NAME,
        version=config.FEATURE_GROUP_VERSION
    )
    
    logger.info(f"Connected to Feature Group: {feature_group.name} (v{feature_group.version})")
    
except Exception as e:
    logger.error(f"Connection error: {e}")
    raise

2025-09-02 17:54:08,561 INFO: Initializing external client
2025-09-02 17:54:08,562 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'
2025-09-02 17:54:09,945 INFO: Python Engine initialized.
Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1242272
2025-09-02 17:54:11,203 INFO: Connected to Feature Group: times_series_bolleria_feature_group (v1)
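The connection cell reads its settings from `src.config`, whose contents are not shown in this notebook. A minimal sketch of how such settings could be loaded, assuming they come from environment variables: `load_settings` and the environment-variable names are hypothetical, and the defaults are simply the feature group and feature view names that appear in the logs.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical loader: read pipeline settings from environment variables."""
    # Fail early if the credentials are missing instead of failing inside hopsworks.login()
    missing = [k for k in ("HOPSWORKS_API_KEY", "HOPSWORKS_PROJECT_NAME") if not env.get(k)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return {
        "HOPSWORKS_API_KEY": env["HOPSWORKS_API_KEY"],
        "HOPSWORKS_PROJECT_NAME": env["HOPSWORKS_PROJECT_NAME"],
        # Defaults taken from the names seen in the notebook's logs
        "FEATURE_GROUP_NAME": env.get("FEATURE_GROUP_NAME", "times_series_bolleria_feature_group"),
        "FEATURE_GROUP_VERSION": int(env.get("FEATURE_GROUP_VERSION", "1")),
        "FEATURE_VIEW_NAME": env.get("FEATURE_VIEW_NAME", "times_series_bolleria_feature_view"),
    }


# Usage with an explicit mapping (values are placeholders)
settings = load_settings({"HOPSWORKS_API_KEY": "dummy-key", "HOPSWORKS_PROJECT_NAME": "demo"})
print(settings["FEATURE_GROUP_VERSION"])  # 1
```

Passing the mapping explicitly keeps the function testable; in the real module it would default to `os.environ`, so the API key never has to be written into the notebook.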

In [7]:
# Create or retrieve the feature view with the selected features
try:
    # Features to include
    selected_features = ['familia', 'base_imponible', 'is_summer_peak', 'is_easter', 'week_start']
    feature_view_name = config.FEATURE_VIEW_NAME
    feature_view_version = 1
    
    # Try to retrieve the existing feature view first
    try:
        feature_view = feature_store.get_feature_view(
            name=feature_view_name,
            version=feature_view_version
        )
        logger.info(f"Existing feature view retrieved: {feature_view.name} (v{feature_view.version})")
    
    except Exception:
        # Assume any error here means the view does not exist yet, and create it
        # Feature objects for the selected features
        selected_feature_objects = [f for f in feature_group.features if f.name in selected_features]
        
        # Query restricted to the selected features
        specific_query = feature_group.select(selected_feature_objects)
        
        # Create the feature view
        feature_view = feature_store.create_feature_view(
            name=feature_view_name,
            version=feature_view_version,
            query=specific_query,
            description=f"Feature view with features: {', '.join(selected_features)}"
        )
        logger.info(f"New feature view created: {feature_view.name} (v{feature_view.version})")
    
except Exception as e:
    logger.error(f"Error creating/retrieving feature view: {e}")
    raise

2025-09-02 17:54:12,077 INFO: Existing feature view retrieved: times_series_bolleria_feature_view (v1)


In [8]:
# Read the data from the feature view
try:
    # Fetch the data as a regular batch
    df_ts = feature_view.get_batch_data()
    
    # Summarise the retrieved data
    logger.info(f"Data retrieved: {df_ts.shape[0]} rows, {df_ts.shape[1]} columns")
    logger.info(f"Available columns: {list(df_ts.columns)}")
    print("Data sample:")
    print(df_ts.head(3))
    
except Exception as e:
    logger.error(f"Error fetching data: {e}")
    raise

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.71s) 
2025-09-02 17:54:15,565 INFO: Data retrieved: 133 rows, 5 columns
2025-09-02 17:54:15,566 INFO: Available columns: ['familia', 'base_imponible', 'is_summer_peak', 'is_easter', 'week_start']
Data sample:
    familia  base_imponible  is_summer_peak  is_easter  \
0  BOLLERIA          641.56               0          0   
1  BOLLERIA          725.72               0          0   
2  BOLLERIA          950.70               0          0   

                 week_start  
0 2023-02-06 00:00:00+00:00  
1 2025-02-24 00:00:00+00:00  
2 2023-09-18 00:00:00+00:00  

In [9]:
# Get the training data from the feature view
try:
    # training_data() returns a tuple (X, y) rather than a single DataFrame
    df_ts = feature_view.training_data()

except Exception as e:
    logger.error(f"Error fetching training data: {e}")
    raise

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.71s)

## 3. 🔄 Data Processing

Feature transformation, creation of lag variables, and splitting into training and test sets.

In [10]:
# Process the data for training
from src.data_utils import transformar_features_target

try:
    # Build lag features and the target; the helper also accepts the (X, y) tuple returned by training_data()
    features_and_target = transformar_features_target(
        df_ts,
        lags_list=[1, 2, 3, 52],  # lag 52 = same week one year earlier (weekly data)
        columna_target='base_imponible',
        cols_exogenas=['is_easter', 'is_summer_peak'],
        periodos_adelante=1,
        eliminar_nulos=True,
        return_format='dataframe'  # a single DataFrame containing both features and target
    )
    
    # Summarise the processed data
    logger.info(f"Processed data: {features_and_target.shape[0]} rows, {features_and_target.shape[1]} columns")
    logger.info(f"Available variables: {list(features_and_target.columns)}")
    print("\nProcessed data sample:")
    print(features_and_target.head(3))
    
except Exception as e:
    logger.error(f"Error processing data: {e}")
    raise

2025-09-02 17:54:19,314 INFO: Detected tuple input with 2 elements
2025-09-02 17:54:19,314 INFO: Using the first tuple element as the DataFrame: (133, 5)
2025-09-02 17:54:19,322 INFO: Returning combined DataFrame: (80, 8)
2025-09-02 17:54:19,322 INFO: Processed data: 80 rows, 8 columns
2025-09-02 17:54:19,323 INFO: Available variables: ['base_imponible_lag1', 'base_imponible_lag2', 'base_imponible_lag3', 'base_imponible_lag52', 'is_easter', 'is_summer_peak', 'week_start', 'target']

Processed data sample:
     base_imponible_lag1  base_imponible_lag2  base_imponible_lag3  \
41                572.51               534.79               563.18   
72                597.65               572.51               534.79   
114               680.30               597.65               572.51   

     base_imponible_lag52  is_easter  is_summer_peak  \
41                 825.11          0               0   
72                 658.40          0               0   
114         
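The drop from 133 to 80 rows in the log above is consistent with discarding every row that lacks a full 52-week history or a next-week target: 133 - 52 - 1 = 80. The helper's own indexing is not shown, so the hypothetical `make_lag_rows` below uses one convention that reproduces that count (`lag_k = series[t - k]`, `target = series[t + horizon]`). It assumes the series is already sorted by `week_start`; the raw sample printed by `get_batch_data()` is not in date order, so a sort has to happen somewhere before lags are built.

```python
def make_lag_rows(series, lags, horizon=1):
    """Turn a chronologically ordered series into supervised-learning rows.

    Assumed convention: lag_k = series[t - k] and target = series[t + horizon].
    Rows whose lags or target would fall outside the series are skipped,
    which is the equivalent of dropping rows with nulls.
    """
    max_lag = max(lags)
    rows = []
    for t in range(max_lag, len(series) - horizon):
        row = {f"lag{k}": series[t - k] for k in lags}
        row["target"] = series[t + horizon]
        rows.append(row)
    return rows


weekly_values = list(range(133))  # placeholder for 133 weekly values, oldest first
rows = make_lag_rows(weekly_values, lags=[1, 2, 3, 52], horizon=1)
print(len(rows))  # 80 = 133 - 52 - 1
print(rows[0])    # {'lag1': 51, 'lag2': 50, 'lag3': 49, 'lag52': 0, 'target': 53}
```

With index values as placeholders it is easy to check which past weeks feed each row; the largest lag, not the number of lags, is what determines how much history is lost.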

In [11]:
# Temporal split (the helper applies a roughly 80/20 split by default)
from src.data_split import train_test_split

try:
    X_train, y_train, X_test, y_test = train_test_split(
        features_and_target,
        target='target'  # target column created by transformar_features_target
        # split_ratio=0.8  # change the proportion here if needed
    )
    print(f"Train: {X_train.shape}, Test: {X_test.shape}")
except Exception as e:
    logger.error(f"Error en el split temporal: {e}")
    raise


Train: (65, 7), Test: (15, 7)
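`src.data_split.train_test_split` is not shown. What matters for a temporal split is that rows are ordered by date and never shuffled, so every test week is later than every training week. A minimal stdlib sketch, assuming rows are dicts with a `week_start` date and a `target` value (the function and row layout are hypothetical, not the project's helper). With `int(80 * 0.8)` it yields 64/16, whereas the notebook's helper produced 65/15, so its rounding or cut-off rule evidently differs.

```python
from datetime import date, timedelta


def _split_xy(rows, date_key, target_key):
    # Separate the features from the target and drop the date column
    X = [{k: v for k, v in r.items() if k not in (date_key, target_key)} for r in rows]
    y = [r[target_key] for r in rows]
    return X, y


def temporal_train_test_split(rows, date_key="week_start", target_key="target", train_ratio=0.8):
    """Chronological split: the oldest rows go to train, the most recent rows to test."""
    ordered = sorted(rows, key=lambda r: r[date_key])
    cut = int(len(ordered) * train_ratio)
    X_tr, y_tr = _split_xy(ordered[:cut], date_key, target_key)
    X_te, y_te = _split_xy(ordered[cut:], date_key, target_key)
    return X_tr, y_tr, X_te, y_te


# 80 weekly rows, deliberately stored newest first
start = date(2023, 1, 2)
weekly_rows = [{"week_start": start + timedelta(weeks=i), "lag1": i - 1, "target": i} for i in range(80)]
weekly_rows.reverse()

X_tr, y_tr, X_te, y_te = temporal_train_test_split(weekly_rows)
print(len(y_tr), len(y_te))  # 64 16
print(y_tr[-1] < y_te[0])    # True
```

A shuffled split on this kind of data would let the model train on weeks that come after the ones it is tested on, which usually makes the test MAE look better than it will be in production.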


In [12]:
# Drop datetime columns before training (the estimators expect numeric features)
X_train = X_train.drop(columns=['week_start'], errors='ignore')
X_test = X_test.drop(columns=['week_start'], errors='ignore')

## 4. 📊 MLflow + DagsHub Setup

Experiment tracking with MLflow, using DagsHub as the remote backend.

In [13]:
import dagshub

# Point MLflow at the DagsHub-hosted tracking server for this repository
dagshub.init(repo_owner='joquifer2', repo_name='mlops_fleca_project', mlflow=True)

# Select the experiment that runs are logged to (it is created if it does not exist)
mlflow.set_experiment("fleca_bolleria_experiments_v2")

2025-09-02 17:54:19,917 INFO: HTTP Request: GET https://dagshub.com/api/v1/user "HTTP/1.1 200 OK"
2025-09-02 17:54:19,927 INFO: Accessing as joquifer2
2025-09-02 17:54:20,199 INFO: HTTP Request: GET https://dagshub.com/api/v1/repos/joquifer2/mlops_fleca_project "HTTP/1.1 200 OK"
2025-09-02 17:54:20,383 INFO: HTTP Request: GET https://dagshub.com/api/v1/user "HTTP/1.1 200 OK"
2025-09-02 17:54:20,386 INFO: Initialized MLflow to track repo "joquifer2/mlops_fleca_project"
2025-09-02 17:54:20,387 INFO: Repository joquifer2/mlops_fleca_project initialized!

<Experiment: artifact_location='mlflow-artifacts:/5024cd21a4ba41be8e29da0a760140c9', creation_time=1756810934638, experiment_id='3', last_update_time=1756810934638, lifecycle_stage='active', name='fleca_bolleria_experiments_v2', tags={}>
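As far as MLflow itself is concerned, pointing it at DagsHub comes down to a tracking URI plus basic-auth credentials, and MLflow reads all three from standard environment variables. The sketch below is a manual alternative that is roughly equivalent to what `dagshub.init(..., mlflow=True)` sets up, assuming token authentication; the helper name and the `DAGSHUB_TOKEN` variable are hypothetical, while the `.mlflow` URI form matches the run links printed by this notebook.

```python
import os


def dagshub_mlflow_env(repo_owner, repo_name, username, token):
    """Hypothetical helper: environment variables MLflow reads to reach a DagsHub tracking server."""
    return {
        "MLFLOW_TRACKING_URI": f"https://dagshub.com/{repo_owner}/{repo_name}.mlflow",
        "MLFLOW_TRACKING_USERNAME": username,
        "MLFLOW_TRACKING_PASSWORD": token,
    }


env = dagshub_mlflow_env(
    "joquifer2", "mlops_fleca_project",
    username="joquifer2",
    token=os.environ.get("DAGSHUB_TOKEN", "<your-dagshub-token>"),  # hypothetical variable name
)
os.environ.update(env)  # set these before calling mlflow.set_experiment(...)
print(env["MLFLOW_TRACKING_URI"])  # https://dagshub.com/joquifer2/mlops_fleca_project.mlflow
```

Configuring it through environment variables is handy for scripts and CI jobs, where an interactive `dagshub.init` login is not available.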

## 5. 🤖 Model Training

Training several ML algorithms and logging their metrics to MLflow.

In [14]:
# Models to train
models = {
    "LinearRegresion": LinearRegression(),
    "Ridge": Ridge(alpha=1.0),
    "Lasso": Lasso(alpha=0.1),
    "RandomForest10": RandomForestRegressor(n_estimators=100),  # despite the name, 100 trees
}

In [15]:
# Train and evaluate several models in one loop, logging each one as an MLflow run
import os
import pickle
import tempfile

for model_name, model in models.items():
    with mlflow.start_run(run_name=model_name) as run:
        # Fit the model on the training split
        model.fit(X_train, y_train)
        # Predict on the held-out test split
        y_pred = model.predict(X_test)

        # Compute the evaluation metrics
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        # Log the metrics to MLflow
        mlflow.log_metrics({"mae": mae, "mse": mse, "r2": r2})

        # Log the hyperparameters (None when the estimator does not have them)
        mlflow.log_params({
            "model": model_name,
            "n_estimators": getattr(model, "n_estimators", None),
            "max_depth": getattr(model, "max_depth", None),
            "learning_rate": getattr(model, "learning_rate", None),
        })

        # Workaround: pickle the model locally and upload it as a plain run artifact
        try:
            # The temporary directory is removed automatically, even if an error occurs
            with tempfile.TemporaryDirectory() as model_dir:
                model_path = os.path.join(model_dir, "model.pkl")
                with open(model_path, "wb") as f:
                    pickle.dump(model, f)
                # Uploaded as <run artifacts>/model/model.pkl
                mlflow.log_artifacts(model_dir, "model")

            print(f"✅ {model_name} saved as artifact - MAE: {mae:.2f} - Run ID: {run.info.run_id}")

        except Exception as e:
            print(f"❌ Error saving {model_name}: {e}")

        logger.info(f"Model: {model_name} - MAE: {mae}, MSE: {mse}, R2: {r2}")

✅ LinearRegresion saved as artifact - MAE: 205.51 - Run ID: ac79f950067d4b7d86979647d2b8ad07
2025-09-02 17:54:22,818 INFO: Model: LinearRegresion - MAE: 205.51486658938077, MSE: 63302.69911598183, R2: 0.30649562972125177
🏃 View run LinearRegresion at: https://dagshub.com/joquifer2/mlops_fleca_project.mlflow/#/experiments/3/runs/ac79f950067d4b7d86979647d2b8ad07
🧪 View experiment at: https://dagshub.com/joquifer2/mlops_fleca_project.mlflow/#/experiments/3
✅ Ridge saved as artifact - MAE: 181.89 - Run ID: 6f205d03c8cb46938d9c771d844bdcac
2025-09-02 17:54:26,856 INFO: Model: Ridge - MAE: 181.89402786424205, MSE: 53159.90578118782, R2: 0.41761366422442714
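For reference, the three metrics logged per run can be computed by hand. A minimal stdlib sketch (the helper name is hypothetical; in the notebook, scikit-learn's `mean_absolute_error`, `mean_squared_error` and `r2_score` do this work). MAE is in the same units as `base_imponible`, which makes it easy to read, MSE penalises large misses more heavily, and R² compares the model against always predicting the mean of the test targets, so it can become negative.

```python
def regression_metrics(y_true, y_pred):
    """MAE, MSE and R^2 computed from scratch (same definitions as scikit-learn)."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n      # mean absolute error
    mse = sum(e * e for e in errors) / n       # mean squared error
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)        # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return {"mae": mae, "mse": mse, "r2": r2}


m = regression_metrics([100, 200, 300], [110, 190, 330])
print({k: round(v, 3) for k, v in m.items()})  # {'mae': 16.667, 'mse': 366.667, 'r2': 0.945}
```

The errors here are 10, -10 and 30, so MAE = 50/3 while MSE = 1100/3; the single 30-unit miss dominates MSE, which is one reason MAE is the friendlier ranking metric for this notebook.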

## 6. 🏆 Evaluation and Best-Model Selection

Ranking the runs logged in the experiment by MAE and registering the selected model.

In [16]:
# Rank every run in the experiment by MAE (lower is better)
experiment = mlflow.get_experiment_by_name("fleca_bolleria_experiments_v2")
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
runs_sorted = runs.sort_values('metrics.mae')

print("Run IDs ordered by MAE (best first):")
print(runs_sorted['run_id'])

# The first row has the lowest MAE
best_run = runs_sorted.iloc[0]
print("Best run ID:", best_run['run_id'])

Run IDs ordered by MAE (best first):
0     36e51ff774c8440e85f092c35be63fd2
4     06e656762a774e97b8435a9a605ddc55
2     6f205d03c8cb46938d9c771d844bdcac
6     cf13b2ca47c64f468d34ebb4fb294886
10    2d326df0ebea4db8a5ec5072d28fcf12
12    4746935cfcb74069ac1316a8eb5db44b
8     81c963339bcf48f393aef0bee4e35787
1     ac3eca7aa0f84f78a9ebda2472bf854e
5     4d5da9206f4f4eb29d3cfe546163f16c
9     2754a0bb85b84a60a2e2bda64becc8ee
13    433cfb25b4b444d88e9c3cbd6b87055b
3     ac79f950067d4b7d86979647d2b8ad07
7     0dc390a7e2a244ee986bc831670ed4c1
11    90033ecef3404b739909348ef57da139
14    cf4d0fe8934049ec995627d6bd19f5ba
Name: run_id, dtype: object
Best run ID: 36e51ff774c8440e85f092c35be63fd2
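The ranking above mixes runs from earlier sessions (15 runs for a notebook that trains 4 models), and the run registered next is picked by hand. A small, hypothetical helper shows how that choice could be made programmatically, optionally restricted to one model family. It works on plain dicts so it runs without MLflow; on the `search_runs` DataFrame the equivalent filter would use the `tags.mlflow.runName` and `metrics.mae` columns. The two MAE values are the ones logged above for LinearRegresion and Ridge; the NaN run is made up to show that runs without the metric are skipped.

```python
import math


def select_best_run(runs, metric="mae", run_name=None):
    """Return the run with the lowest `metric`, optionally restricted to one run name."""
    candidates = [
        r for r in runs
        if (run_name is None or r.get("run_name") == run_name)
        and isinstance(r.get(metric), (int, float))
        and not math.isnan(r[metric])  # skip runs where the metric is missing (NaN)
    ]
    if not candidates:
        raise ValueError(f"no run with a valid '{metric}' value")
    return min(candidates, key=lambda r: r[metric])


example_runs = [
    {"run_id": "ac79f950067d4b7d86979647d2b8ad07", "run_name": "LinearRegresion", "mae": 205.51},
    {"run_id": "6f205d03c8cb46938d9c771d844bdcac", "run_name": "Ridge", "mae": 181.89},
    {"run_id": "hypothetical-run-without-mae", "run_name": "Lasso", "mae": float("nan")},
]

print(select_best_run(example_runs)["run_id"])                              # 6f205d03c8cb46938d9c771d844bdcac
print(select_best_run(example_runs, run_name="LinearRegresion")["run_id"])  # ac79f950067d4b7d86979647d2b8ad07
```

Selecting by run name plus metric avoids hard-coding a run ID that changes every time the notebook is re-run.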


In [17]:
# Register the selected model in the MLflow Model Registry
# Run ID of the RandomForest run (chosen manually; it ranks second by MAE in the list above)
run_id = "06e656762a774e97b8435a9a605ddc55"
model_uri = f"runs:/{run_id}/model"

mlflow.register_model(
    model_uri=model_uri,
    name="RandomForest_10"
)

Registered model 'RandomForest_10' already exists. Creating a new version of this model...

RestException: INTERNAL_ERROR: Response: {'error': 'unsupported endpoint, please contact support@dagshub.com'}

## 7. 📦 Loading the Winning Model

Loading the selected model directly from its run artifacts and evaluating it on the test set.

In [18]:
# Load the model straight from the run artifacts instead of the Model Registry,
# since the registration call above was rejected by the DagsHub server
import os
import pickle

# Run ID of the chosen model (hard-coded; it could also be taken from the MAE ranking)
run_id = "06e656762a774e97b8435a9a605ddc55"
# Download the run's "model" artifact folder and get its local path
artifact_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model")
model_file = os.path.join(artifact_path, "model.pkl")

with open(model_file, "rb") as f:
    model = pickle.load(f)

# The model is ready to use: model.predict(X_test)
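The save/load round trip used by the training loop and by this cell can be checked locally without MLflow. A minimal sketch with hypothetical helper names, where a plain dict stands in for the trained estimator: the model is pickled to `<root>/model/model.pkl` (the layout `log_artifacts(model_dir, "model")` uploads) and read back from the `model` folder, which is what `download_artifacts(f"runs:/{run_id}/model")` returns a local path to. Only unpickle files you trust, and load them with the same scikit-learn version used for training.

```python
import os
import pickle
import tempfile


def save_model_artifact(model, root_dir):
    """Pickle `model` to <root_dir>/model/model.pkl."""
    model_dir = os.path.join(root_dir, "model")
    os.makedirs(model_dir, exist_ok=True)
    path = os.path.join(model_dir, "model.pkl")
    with open(path, "wb") as f:
        pickle.dump(model, f)
    return path


def load_model_artifact(artifact_dir):
    """Load model.pkl from a downloaded "model" artifact folder."""
    with open(os.path.join(artifact_dir, "model.pkl"), "rb") as f:
        return pickle.load(f)


# A plain dict stands in for the trained estimator in this sketch
fake_model = {"kind": "mean_regressor", "mean": 20.0}

with tempfile.TemporaryDirectory() as tmp:
    save_model_artifact(fake_model, tmp)
    restored = load_model_artifact(os.path.join(tmp, "model"))

print(restored == fake_model)  # True
```

Building the path with `os.path.join` rather than string concatenation keeps the loading code portable across operating systems.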

In [19]:
# Make a prediction with the loaded model
sample_data = X_test
predictions = model.predict(sample_data)
print(predictions)

[ 768.5264  713.5075  768.7331  753.5327  740.1118  734.7466  694.5313
  747.4214  860.2902 1409.664  1500.501  1540.3167 1562.5022 1577.8163
 1654.0649]


In [20]:
# Evaluate the loaded model
y_pred = model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
print(f"MAE: {mae}")

MAE: 178.9245266666668
