In [1]:
import os, mlflow
from datetime import datetime
from dotenv import load_dotenv

load_dotenv(override=True)  # Load the variables from the .env file
# Experiment path that every run in this notebook is logged under.
EXPERIMENT_NAME = "/Users/esteban.berumen@iteso.mx/nyc-taxi-experiments"

# Point MLflow at the "databricks" tracking URI and select the experiment above.
mlflow.set_tracking_uri("databricks")
experiment = mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
# Echo the resolved configuration so the cell output documents where runs go.
print("tracking:", mlflow.get_tracking_uri())
print("experiment:", experiment)

tracking: databricks
experiment: <Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/3998396881915768', creation_time=1761273605949, experiment_id='3998396881915768', last_update_time=1761590368439, lifecycle_stage='active', name='/Users/esteban.berumen@iteso.mx/nyc-taxi-experiments', tags={'mlflow.experiment.sourceName': '/Users/esteban.berumen@iteso.mx/nyc-taxi-experiments',
 'mlflow.experimentKind': 'custom_model_development',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'esteban.berumen@iteso.mx',
 'mlflow.ownerId': '4857186035421233'}>


In [2]:
import pickle
import pandas as pd
import numpy as np
from sklearn.metrics import root_mean_squared_error
from sklearn.feature_extraction import DictVectorizer

DATA_DIR = "../data"  # adjust if your directory layout differs

In [3]:
def read_dataframe(filename):
    """Load a trip parquet file and prepare it for modelling.

    Adds a ``duration`` column (minutes between ``lpep_pickup_datetime`` and
    ``lpep_dropoff_datetime``), keeps only trips lasting between 1 and 60
    minutes inclusive, and casts ``PULocationID`` / ``DOLocationID`` to ``str``.

    Args:
        filename: Path (or any source accepted by ``pd.read_parquet``) of the
            parquet file to read.

    Returns:
        A new, independent ``DataFrame`` with the filtered rows, the extra
        ``duration`` column and string-typed location IDs.
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta -> minutes conversion instead of a per-row lambda.
    trip_time = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df['duration'] = trip_time.dt.total_seconds() / 60

    # .copy() detaches the filtered rows from the frame they were selected
    # from, so the column assignment below is not a write onto a slice
    # (the source of pandas' SettingWithCopyWarning).
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

# Training data comes from the 2025-01 file and validation data from the 2025-02 file.
df_train = read_dataframe(os.path.join(DATA_DIR, 'green_tripdata_2025-01.parquet'))
df_val   = read_dataframe(os.path.join(DATA_DIR, 'green_tripdata_2025-02.parquet'))
# Quick sanity check on row/column counts of both frames.
print(df_train.shape, df_val.shape)

(46307, 22) (44218, 22)


In [4]:
def preprocess(df, dv):
    """Turn a trips frame into the feature matrix expected by the models.

    Builds the ``PU_DO`` feature (pickup and dropoff IDs joined with ``_``),
    keeps it together with ``trip_distance``, and passes the resulting list of
    row dicts to ``dv.transform``. The input frame is left unmodified.

    Args:
        df: Frame with ``PULocationID``, ``DOLocationID`` and ``trip_distance``.
        dv: Object exposing ``transform(records)``.

    Returns:
        Whatever ``dv.transform`` returns for the row dicts.
    """
    # assign() works on a copy, so the caller's frame keeps its columns.
    enriched = df.assign(PU_DO=df['PULocationID'] + '_' + df['DOLocationID'])
    feature_rows = enriched[['PU_DO', 'trip_distance']].to_dict(orient='records')
    return dv.transform(feature_rows)

# Add the pickup/dropoff interaction feature; assign() returns a new frame, so
# the object produced by the loading cell is not modified in place.
df_train = df_train.assign(PU_DO=df_train['PULocationID'] + '_' + df_train['DOLocationID'])

categorical, numerical = ['PU_DO'], ['trip_distance']
target = 'duration'

# The vectorizer is fitted on the training records only; the validation
# matrix is produced by preprocess() with that same fitted vectorizer.
train_dicts = df_train[categorical + numerical].to_dict(orient='records')
dv = DictVectorizer()
X_train = dv.fit_transform(train_dicts)
X_val = preprocess(df_val, dv)

y_train, y_val = df_train[target].values, df_val[target].values
print(X_train.shape, X_val.shape)

(46307, 4159) (44218, 4159)


In [5]:
import math
import optuna
from optuna.samplers import TPESampler
from mlflow.models.signature import infer_signature
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor

# Enable MLflow autologging for scikit-learn with model logging turned off
# (log_models=False); the objective functions in this notebook call
# mlflow.sklearn.log_model themselves.
mlflow.sklearn.autolog(log_models=False)

# Shared utilities
def make_input_example(X_sparse, dv, n=5):
    """Build a DataFrame from the first ``n`` rows of a feature matrix.

    Args:
        X_sparse: Sliceable feature matrix. If it exposes ``toarray`` the
            slice is densified with it; otherwise ``np.asarray`` is used.
        dv: Object exposing ``get_feature_names_out()``; its result becomes
            the column labels.
        n: Number of leading rows to keep (default 5).

    Returns:
        ``pd.DataFrame`` holding the first ``n`` rows with named columns.
    """
    head = X_sparse[:n]
    if hasattr(X_sparse, "toarray"):
        dense_head = head.toarray()
    else:
        dense_head = np.asarray(head)
    return pd.DataFrame(dense_head, columns=dv.get_feature_names_out())

def log_preprocessor(dv, artifact_dir="preprocessor"):
    """Pickle ``dv`` locally and log the file as an MLflow artifact.

    The object is written to ``<artifact_dir>/preprocessor.b`` (the directory
    is created if missing) and then logged with ``mlflow.log_artifact`` under
    the artifact path ``preprocessor``.

    Args:
        dv: Object to serialize with ``pickle``.
        artifact_dir: Local directory that receives the pickle file.
    """
    os.makedirs(artifact_dir, exist_ok=True)
    local_path = os.path.join(artifact_dir, "preprocessor.b")
    with open(local_path, "wb") as f_out:
        pickle.dump(dv, f_out)
    mlflow.log_artifact(local_path, artifact_path="preprocessor")

def rmse_for(model, X, y):
    """Return the RMSE of ``model.predict(X)`` against ``y`` as a built-in float."""
    return float(root_mean_squared_error(y, model.predict(X)))

## Parent 1: Gradient Boosting (Optuna)

In [None]:
# ========= Gradient Boosting (Optuna) CORREGIDO =========
def objective_gbr(trial: optuna.trial.Trial):
    """Optuna objective for ``GradientBoostingRegressor``.

    Samples hyperparameters from ``trial``, trains on the notebook-level
    ``X_train`` / ``y_train``, and inside a nested MLflow run logs the
    parameters, the validation ``rmse`` metric, the pickled vectorizer and
    the fitted model (with input example and signature).

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        Validation RMSE computed on ``X_val`` / ``y_val``.
    """
    # Keep the suggest_* calls in this order; reordering them may change what
    # a seeded sampler draws.
    params = dict(
        n_estimators=trial.suggest_int("n_estimators", 100, 120),
        learning_rate=trial.suggest_float("learning_rate", 1e-3, 0.5, log=True),
        max_depth=trial.suggest_int("max_depth", 2, 4),
        subsample=trial.suggest_float("subsample", 0.7, 1.0),
        random_state=42,
    )

    with mlflow.start_run(nested=True):
        mlflow.set_tag("model_family", "gradient_boosting")
        mlflow.log_params(params)

        # Densify and keep the column names (avoids the feature-name warning).
        feature_names = dv.get_feature_names_out()
        train_frame = pd.DataFrame(X_train.toarray(), columns=feature_names)
        val_frame = pd.DataFrame(X_val.toarray(), columns=feature_names)

        model = GradientBoostingRegressor(**params)
        model.fit(train_frame, y_train)

        rmse = rmse_for(model, val_frame, y_val)
        mlflow.log_metric("rmse", rmse)

        # Input example and signature share the same named-column layout.
        sample_rows = val_frame.head(5)
        signature = infer_signature(sample_rows, model.predict(sample_rows))

        log_preprocessor(dv)
        mlflow.sklearn.log_model(
            sk_model=model,
            name="model",  # used instead of artifact_path
            input_example=sample_rows,
            signature=signature,
        )
    return rmse

# Seeded TPE sampler for the gradient-boosting study; the study minimizes the
# value returned by the objective.
sampler = TPESampler(seed=42)
study_gbr = optuna.create_study(direction="minimize", sampler=sampler)
# Parent run: every objective call opens its own nested run while this one is active.
with mlflow.start_run(run_name="Parent: GradientBoosting (Optuna)"):
    study_gbr.optimize(objective_gbr, n_trials=15)  # optionally add timeout=
    # Summarize the study on the parent run: "best_"-prefixed params plus the best RMSE.
    mlflow.log_params({f"best_{k}": v for k, v in study_gbr.best_params.items()})
    mlflow.log_metric("best_rmse", study_gbr.best_value)
print("GBR best RMSE:", study_gbr.best_value)


[I 2025-10-27 13:17:41,811] A new study created in memory with name: no-name-44335e74-2d2f-4049-8971-c76e3d081638


## Parent 2: Random Forest (Optuna)

In [None]:
# ========= Random Forest (Optuna) CORREGIDO =========
def objective_rf(trial: optuna.trial.Trial):
    """Optuna objective for ``RandomForestRegressor``.

    Samples hyperparameters from ``trial``, trains on the notebook-level
    ``X_train`` / ``y_train``, and inside a nested MLflow run logs the
    parameters, the validation ``rmse`` metric, the pickled vectorizer and
    the fitted model (with input example and signature).

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        Validation RMSE computed on ``X_val`` / ``y_val``.
    """
    # Keep the suggest_* calls in this order; reordering them may change what
    # a seeded sampler draws.
    n_estimators = trial.suggest_int("n_estimators", 200, 250)
    max_depth = trial.suggest_int("max_depth", 5, 40)
    min_samples_split = trial.suggest_int("min_samples_split", 2, 20)
    min_samples_leaf = trial.suggest_int("min_samples_leaf", 1, 10)
    max_features = trial.suggest_categorical("max_features", ["sqrt", "log2", None])
    params = {
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "min_samples_split": min_samples_split,
        "min_samples_leaf": min_samples_leaf,
        "max_features": max_features,
        "random_state": 42,
        "n_jobs": -1,
    }

    with mlflow.start_run(nested=True):
        mlflow.set_tag("model_family", "random_forest")
        mlflow.log_params(params)

        # Same named-column DataFrame layout as the gradient-boosting objective.
        feature_names = dv.get_feature_names_out()
        train_frame = pd.DataFrame(X_train.toarray(), columns=feature_names)
        val_frame = pd.DataFrame(X_val.toarray(), columns=feature_names)

        model = RandomForestRegressor(**params)
        model.fit(train_frame, y_train)

        rmse = rmse_for(model, val_frame, y_val)
        mlflow.log_metric("rmse", rmse)

        sample_rows = val_frame.head(5)
        signature = infer_signature(sample_rows, model.predict(sample_rows))

        log_preprocessor(dv)
        mlflow.sklearn.log_model(
            sk_model=model,
            name="model",  # used instead of artifact_path
            input_example=sample_rows,
            signature=signature,
        )
    return rmse

# Separate seeded TPE sampler for the random-forest study; the study minimizes
# the value returned by the objective.
sampler2 = TPESampler(seed=42)
study_rf = optuna.create_study(direction="minimize", sampler=sampler2)
# Parent run: every objective call opens its own nested run while this one is active.
with mlflow.start_run(run_name="Parent: RandomForest (Optuna)"):
    study_rf.optimize(objective_rf, n_trials=15)  # optionally add timeout=
    # Summarize the study on the parent run: "best_"-prefixed params plus the best RMSE.
    mlflow.log_params({f"best_{k}": v for k, v in study_rf.best_params.items()})
    mlflow.log_metric("best_rmse", study_rf.best_value)
print("RF best RMSE:", study_rf.best_value)


## Selección de Challenger (mejor RMSE global) y registro en Model Registry

In [None]:
from mlflow import MlflowClient

model_registry_name = "time_series.default.nyc-taxi-model"  # same registered model used across the project

# Pick the best run in the experiment by validation RMSE.
# The filter keeps only finished runs that actually logged an "rmse" metric:
# the parent runs only log "best_rmse" and never log a model, and a trial that
# did not finish may not have its "model" logged, so neither can be registered.
# max_results=1 because only the top-ranked run is used.
runs = mlflow.search_runs(
    experiment_names=[EXPERIMENT_NAME],
    filter_string="metrics.rmse > 0 and attributes.status = 'FINISHED'",
    order_by=["metrics.rmse ASC"],
    max_results=1,
    output_format="list",
)
assert len(runs) > 0, "No se encontraron runs con métrica RMSE"
best_run = runs[0]
print("\n🏆 Challenger candidate:")
print("Run ID:", best_run.info.run_id)
print("RMSE:", best_run.data.metrics.get("rmse"))
print("Tags:", best_run.data.tags)

# Register the best run's model as a new version of the registered model.
result = mlflow.register_model(
    model_uri=f"runs:/{best_run.info.run_id}/model",
    name=model_registry_name
)
print("Registered as version:", result.version)

client = MlflowClient()

# Point the 'challenger' alias at the version that was just registered.
client.set_registered_model_alias(
    name=model_registry_name,
    alias="challenger",
    version=result.version
)

# Record provenance (source run, RMSE, timestamp) in the version description.
client.update_model_version(
    name=model_registry_name,
    version=result.version,
    description=f"Challenger registrado desde run {best_run.info.run_id} con RMSE={best_run.data.metrics.get('rmse')} el {datetime.today()}"
)
print("Alias 'challenger' actualizado a la versión", result.version)

## Evaluación en Marzo 2025: Champion vs Challenger

In [None]:
# Asegúrate de colocar el archivo en ../data/green_tripdata_2025-03.parquet
df_mar = read_dataframe(os.path.join(DATA_DIR, 'green_tripdata_2025-03.parquet'))
X_mar  = preprocess(df_mar, dv)
y_mar  = df_mar[target].values
input_example_mar = make_input_example(X_mar, dv)
print(df_mar.shape, X_mar.shape)

In [None]:
import mlflow.pyfunc

# Alias names are spelled exactly as they are written elsewhere in this
# notebook: "challenger" when the candidate is registered, "Champion" when a
# version is promoted.
# NOTE(review): alias lookup may be case-sensitive depending on the registry
# backend, so "@Challenger" could fail to resolve — confirm.
champion_uri   = f"models:/{model_registry_name}@Champion"
challenger_uri = f"models:/{model_registry_name}@challenger"

print("Loading Champion:", champion_uri)
print("Loading Challenger:", challenger_uri)

champion = mlflow.pyfunc.load_model(champion_uri)
challenger = mlflow.pyfunc.load_model(challenger_uri)

# Dense copy of the March features (kept under its original name).
X_mar_dense = X_mar.toarray() if hasattr(X_mar, "toarray") else X_mar

# Smoke test on the small named-column sample.
yhat_champion   = champion.predict(input_example_mar)
yhat_challenger = challenger.predict(input_example_mar)

# For the real metric use the whole March set, passed as a DataFrame with the
# same feature-name columns as the sample above. The models logged in this
# notebook carry a signature inferred from a named-column DataFrame, so an
# unnamed ndarray is presumably rejected by input-schema validation
# (NOTE(review): confirm against the Champion's signature).
X_mar_frame = pd.DataFrame(X_mar_dense, columns=dv.get_feature_names_out())
yhat_c_full = champion.predict(X_mar_frame)
yhat_n_full = challenger.predict(X_mar_frame)

rmse_champion   = float(root_mean_squared_error(y_mar, yhat_c_full))
rmse_challenger = float(root_mean_squared_error(y_mar, yhat_n_full))

print({
    "rmse_champion": rmse_champion,
    "rmse_challenger": rmse_challenger
})

## Decisión de promoción automática (opcional)

In [None]:
PROMOTION_DELTA = 0.05  # minimum absolute RMSE improvement (minutes)

# Promote only when the challenger beats the champion by at least PROMOTION_DELTA.
promote = (rmse_challenger <= rmse_champion - PROMOTION_DELTA)
print("Candidato a promover a Champion?", promote)

if promote:
    # Resolve the alias directly instead of scanning every version and relying
    # on each search result carrying its aliases. The call raises if no version
    # holds the 'challenger' alias.
    challenger_version = client.get_model_version_by_alias(
        name=model_registry_name,
        alias="challenger",
    ).version

    # Move the 'Champion' alias to the challenger version.
    client.set_registered_model_alias(
        name=model_registry_name,
        alias="Champion",
        version=challenger_version
    )

    # Record when and why the version was promoted.
    client.update_model_version(
        name=model_registry_name,
        version=challenger_version,
        description=(
            f"Promovido a Champion el {datetime.today()} tras evaluación en marzo 2025. "
            f"RMSE Champion previo: {rmse_champion:.4f}, RMSE Challenger: {rmse_challenger:.4f}."
        )
    )
    print(f"✔ Alias 'Champion' ahora apunta a la versión {challenger_version}")
else:
    print("❌ No se promueve: ganancia insuficiente según criterio actual.")

## Evidencia para el PR
- Prints de `run_id`, `rmse`, versión registrada y aliases.
- Diccionario con RMSE de Champion vs Challenger en marzo.
- Decisión (y, si aplica, confirmación de movimiento de alias).