In [1]:
import os, mlflow
from dotenv import load_dotenv

load_dotenv(override=True)  
EXPERIMENT_NAME = "/Users/esteban.berumen@iteso.mx/nyc-taxi-experiments"

mlflow.set_tracking_uri("databricks")
experiment = mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
print("tracking:", mlflow.get_tracking_uri())
print("experiment:", experiment)

tracking: databricks
experiment: <Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/3998396881915768', creation_time=1761273605949, experiment_id='3998396881915768', last_update_time=1761611791442, lifecycle_stage='active', name='/Users/esteban.berumen@iteso.mx/nyc-taxi-experiments', tags={'mlflow.experiment.sourceName': '/Users/esteban.berumen@iteso.mx/nyc-taxi-experiments',
 'mlflow.experimentKind': 'custom_model_development',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'esteban.berumen@iteso.mx',
 'mlflow.ownerId': '4857186035421233'}>


In [2]:
import pickle
import pandas as pd
import numpy as np
from sklearn.metrics import root_mean_squared_error
from sklearn.feature_extraction import DictVectorizer

DATA_DIR = "../data"  

In [3]:
def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

df_train = read_dataframe(os.path.join(DATA_DIR, 'green_tripdata_2025-01.parquet'))
df_val   = read_dataframe(os.path.join(DATA_DIR, 'green_tripdata_2025-02.parquet'))
print(df_train.shape, df_val.shape)

(46307, 22) (44218, 22)


In [4]:
def preprocess(df, dv):
    df = df.copy()
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    records = df[categorical + numerical].to_dict(orient='records')
    return dv.transform(records)

df_train = df_train.copy()
df_train['PU_DO'] = df_train['PULocationID'] + '_' + df_train['DOLocationID']
categorical = ['PU_DO']
numerical = ['trip_distance']
dv = DictVectorizer()

train_dicts = df_train[categorical + numerical].to_dict(orient='records')
X_train = dv.fit_transform(train_dicts)
X_val   = preprocess(df_val, dv)

target = 'duration'
y_train = df_train[target].values
y_val   = df_val[target].values
print(X_train.shape, X_val.shape)

(46307, 4159) (44218, 4159)


In [5]:
import math
import optuna
from optuna.samplers import TPESampler
from mlflow.models.signature import infer_signature
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor

mlflow.sklearn.autolog(log_models=False)

def make_input_example(X_sparse, dv, n=5):
    arr = X_sparse[:n].toarray() if hasattr(X_sparse, "toarray") else np.asarray(X_sparse[:n])
    cols = dv.get_feature_names_out()
    return pd.DataFrame(arr, columns=cols)

def log_preprocessor(dv, artifact_dir="preprocessor"):
    os.makedirs(artifact_dir, exist_ok=True)
    with open(os.path.join(artifact_dir, "preprocessor.b"), "wb") as f_out:
        pickle.dump(dv, f_out)
    mlflow.log_artifact(os.path.join(artifact_dir, "preprocessor.b"), artifact_path="preprocessor")

def rmse_for(model, X, y):
    y_hat = model.predict(X)
    return float(root_mean_squared_error(y, y_hat))

In [6]:
import numpy as np
import pandas as pd
from time import perf_counter
import optuna

_cols = dv.get_feature_names_out()
X_train_df = pd.DataFrame(X_train.toarray(), columns=_cols)
X_val_df   = pd.DataFrame(X_val.toarray(),   columns=_cols)

# Submuestra para HPO (ajusta N a tu máquina; 50k–150k suele ir bien)
N = 100_000
rng = np.random.default_rng(42)
if len(X_train_df) > N:
    _idx = rng.choice(len(X_train_df), size=N, replace=False)
    Xtr_small = X_train_df.iloc[_idx]
    ytr_small = y_train[_idx]
else:
    Xtr_small = X_train_df
    ytr_small = y_train

Xva_small = X_val_df  

optuna.logging.set_verbosity(optuna.logging.INFO)
_t0 = perf_counter()

def trial_report_callback(study: optuna.Study, trial: optuna.trial.FrozenTrial):
    elapsed = perf_counter() - _t0
    best = study.best_value if study.best_value is not None else float('nan')
    print(f"[{elapsed:7.1f}s] Trial {trial.number:03d} finished -> value={trial.value:.5f} | best={best:.5f}")


In [7]:
from sklearn.ensemble import GradientBoostingRegressor
from mlflow.models.signature import infer_signature
import mlflow

def objective_gbr(trial: optuna.trial.Trial):
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 100, 120),
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.5, log=True),
        "max_depth": trial.suggest_int("max_depth", 2, 4),
        "subsample": trial.suggest_float("subsample", 0.7, 1.0),
        "random_state": 42,
        "verbose": 1,  
    }

    print(f"[GBR] Trial {trial.number} → {params}")  

    with mlflow.start_run(nested=True, run_name=f"gbr-{trial.number:03d}"):
        mlflow.set_tag("model_family", "gradient_boosting")
        mlflow.log_params(params)

        model = GradientBoostingRegressor(**params)
        model.fit(Xtr_small, ytr_small)  
        rmse = rmse_for(model, Xva_small, y_val)
        mlflow.log_metric("rmse", rmse)

        input_example = Xva_small.head(5)
        signature = infer_signature(input_example, model.predict(input_example))

        log_preprocessor(dv)
        mlflow.sklearn.log_model(
            sk_model=model,
            name="model",
            input_example=input_example,
            signature=signature
        )
    return rmse

from optuna.samplers import TPESampler
sampler = TPESampler(seed=42)
study_gbr = optuna.create_study(direction="minimize", sampler=sampler)
with mlflow.start_run(run_name="Parent: GradientBoosting (Optuna)"):
    # ⏱️ límite duro y callback de progreso
    study_gbr.optimize(objective_gbr, n_trials=15, timeout=900, callbacks=[trial_report_callback])
    mlflow.log_params({f"best_{k}": v for k, v in study_gbr.best_params.items()})
    mlflow.log_metric("best_rmse", study_gbr.best_value)
print("GBR best RMSE:", study_gbr.best_value)

[I 2025-10-28 19:09:27,041] A new study created in memory with name: no-name-831b25fb-995d-4225-b47f-5145c121bcbc


[GBR] Trial 0 → {'n_estimators': 107, 'learning_rate': 0.36808608148776095, 'max_depth': 4, 'subsample': 0.8795975452591109, 'random_state': 42, 'verbose': 1}
      Iter       Train Loss      OOB Improve   Remaining Time 
         1          46.8029          27.7628           38.90m
         2          35.7744          14.1128           41.24m
         3          30.9456           4.4122           40.36m
         4          28.6129           0.4772           39.74m
         5          27.5149           0.2560           38.12m
         6          26.9268          -0.3016           37.21m
         7          26.4978          -0.7494           37.63m
         8          26.5262           1.7761           38.25m
         9          26.4284           0.3995           39.59m
        10          26.5189           1.3849           41.34m
        20          25.3560           0.3493           41.84m
        30          24.6279          -1.8712           36.55m
🏃 View run gbr-000 at: https://adb

[W 2025-10-28 19:38:51,562] Trial 0 failed with parameters: {'n_estimators': 107, 'learning_rate': 0.36808608148776095, 'max_depth': 4, 'subsample': 0.8795975452591109} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "c:\Users\esteb\apps\PC-20205\nyc-taxi-predictions-2025\.venv\Lib\site-packages\optuna\study\_optimize.py", line 201, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "C:\Users\esteb\AppData\Local\Temp\ipykernel_8980\1682021020.py", line 22, in objective_gbr
    model.fit(Xtr_small, ytr_small)
  File "c:\Users\esteb\apps\PC-20205\nyc-taxi-predictions-2025\.venv\Lib\site-packages\mlflow\utils\autologging_utils\safety.py", line 483, in safe_patch_function
    patch_function(call_original, *args, **kwargs)
  File "c:\Users\esteb\apps\PC-20205\nyc-taxi-predictions-2025\.venv\Lib\site-packages\mlflow\utils\autologging_utils\safety.py", line 181, in patch_with_managed_run
    result = patch_fu

🏃 View run Parent: GradientBoosting (Optuna) at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768/runs/c3a2d0506d05408ca48a190824fe35bf
🧪 View experiment at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768


KeyboardInterrupt: 

In [None]:
from sklearn.ensemble import RandomForestRegressor

def objective_rf(trial: optuna.trial.Trial):
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 200, 220),
        "max_depth": trial.suggest_int("max_depth", 5, 10),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 10),
        "max_features": trial.suggest_categorical("max_features", ["sqrt", "log2", None]),
        "random_state": 42,
        "n_jobs": -1,
        "verbose": 1,  
    }

    print(f"[RF ] Trial {trial.number} → {params}")  

    with mlflow.start_run(nested=True, run_name=f"rf-{trial.number:03d}"):
        mlflow.set_tag("model_family", "random_forest")
        mlflow.log_params(params)

        model = RandomForestRegressor(**params)
        model.fit(Xtr_small, ytr_small)  
        rmse = rmse_for(model, Xva_small, y_val)
        mlflow.log_metric("rmse", rmse)

        input_example = Xva_small.head(5)
        signature = infer_signature(input_example, model.predict(input_example))

        log_preprocessor(dv)
        mlflow.sklearn.log_model(
            sk_model=model,
            name="model",
            input_example=input_example,
            signature=signature
        )
    return rmse

sampler2 = TPESampler(seed=42)
study_rf = optuna.create_study(direction="minimize", sampler=sampler2)
with mlflow.start_run(run_name="Parent: RandomForest (Optuna)"):
    # ⏱️ límite duro y callback de progreso
    study_rf.optimize(objective_rf, n_trials=15, timeout=900, callbacks=[trial_report_callback])
    mlflow.log_params({f"best_{k}": v for k, v in study_rf.best_params.items()})
    mlflow.log_metric("best_rmse", study_rf.best_value)
print("RF best RMSE:", study_rf.best_value)

[I 2025-10-27 18:23:28,407] A new study created in memory with name: no-name-c091a8c6-f816-44f2-a822-82367b2f896e


[RF ] Trial 0 → {'n_estimators': 219, 'max_depth': 39, 'min_samples_split': 15, 'min_samples_leaf': 6, 'max_features': 'sqrt', 'random_state': 42, 'n_jobs': -1, 'verbose': 1}


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:   20.2s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:  1.9min
[Parallel(n_jobs=-1)]: Done 219 out of 219 | elapsed:  2.2min finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.1s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    1.0s
[Parallel(n_jobs=8)]: Done 219 out of 219 | elapsed:    1.1s finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.1s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    0.7s
[Parallel(n_jobs=8)]: Done 219 out of 219 | elapsed:    0.8s finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.4s
[Parallel(n_jobs=8)]: Done 184 tasks   

Downloading artifacts:   0%|          | 0/7 [00:00<?, ?it/s]

[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=8)]: Done 219 out of 219 | elapsed:    0.0s finished
2025/10/27 18:36:12 INFO mlflow.models.model: Found the following environment variables used during model inference: [DATABRICKS_HOST, DATABRICKS_TOKEN]. Please check if you need to set them when deploying the model. To disable this message, set environment variable `MLFLOW_RECORD_ENV_VARS_IN_MODEL_LOGGING` to `false`.


🏃 View run rf-000 at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768/runs/df399841868e467891ef0139b0a22880
🧪 View experiment at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768


[I 2025-10-27 18:36:31,433] Trial 0 finished with value: 7.1339753625389895 and parameters: {'n_estimators': 219, 'max_depth': 39, 'min_samples_split': 15, 'min_samples_leaf': 6, 'max_features': 'sqrt'}. Best is trial 0 with value: 7.1339753625389895.


[ 2581.5s] Trial 000 finished -> value=7.13398 | best=7.13398
[RF ] Trial 1 → {'n_estimators': 244, 'max_depth': 26, 'min_samples_split': 15, 'min_samples_leaf': 1, 'max_features': 'sqrt', 'random_state': 42, 'n_jobs': -1, 'verbose': 1}


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:   14.5s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:  1.3min
[Parallel(n_jobs=-1)]: Done 244 out of 244 | elapsed:  1.7min finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    0.2s
[Parallel(n_jobs=8)]: Done 244 out of 244 | elapsed:    0.3s finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    0.2s
[Parallel(n_jobs=8)]: Done 244 out of 244 | elapsed:    0.3s finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.3s
[Parallel(n_jobs=8)]: Done 184 tasks   

Downloading artifacts:   0%|          | 0/7 [00:00<?, ?it/s]

[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=8)]: Done 244 out of 244 | elapsed:    0.0s finished
2025/10/27 18:50:10 INFO mlflow.models.model: Found the following environment variables used during model inference: [DATABRICKS_HOST, DATABRICKS_TOKEN]. Please check if you need to set them when deploying the model. To disable this message, set environment variable `MLFLOW_RECORD_ENV_VARS_IN_MODEL_LOGGING` to `false`.
[I 2025-10-27 18:50:24,261] Trial 1 finished with value: 7.617126751954723 and parameters: {'n_estimators': 244, 'max_depth': 26, 'min_samples_split': 15, 'min_samples_leaf': 1, 'max_features': 'sqrt'}. Best is trial 0 with value: 7.1339753625389895.


🏃 View run rf-001 at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768/runs/9dd974e7226548ac9e8627024b6181a1
🧪 View experiment at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768
[ 3414.4s] Trial 001 finished -> value=7.61713 | best=7.13398
🏃 View run Parent: RandomForest (Optuna) at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768/runs/827bc88b4efe4703bd3bebea33dabbaa
🧪 View experiment at: https://adb-3598518169245956.16.azuredatabricks.net/ml/experiments/3998396881915768
RF best RMSE: 7.1339753625389895


In [None]:
from mlflow import MlflowClient
from datetime import datetime

model_registry_name = "time_series.default.nyc-taxi-model"  

runs = mlflow.search_runs(
    experiment_names=[EXPERIMENT_NAME],
    filter_string="",
    order_by=["metrics.rmse ASC"],
    output_format="list"
)
assert len(runs) > 0, "No se encontraron runs con métrica RMSE"
best_run = runs[0]
print("\n🏆 Challenger candidate:")
print("Run ID:", best_run.info.run_id)
print("RMSE:", best_run.data.metrics.get("rmse"))
print("Tags:", best_run.data.tags)

result = mlflow.register_model(
    model_uri=f"runs:/{best_run.info.run_id}/model",
    name=model_registry_name
)
print("Registered as version:", result.version)

client = MlflowClient()

client.set_registered_model_alias(
    name=model_registry_name,
    alias="challenger",
    version=result.version
)

client.update_model_version(
    name=model_registry_name,
    version=result.version,
    description=f"Challenger registrado desde run {best_run.info.run_id} con RMSE={best_run.data.metrics.get('rmse')} el {datetime.today()}"
)
print("Alias 'challenger' actualizado a la versión", result.version)


🏆 Challenger candidate:
Run ID: 391a4d75c8944b6aaf705c0df3fbfd62
RMSE: 5.410463874732254
Tags: {'mlflow.parentRunId': 'cdd16d4123a34a59be4e2ab1f2fa30b9', 'mlflow.rootRunId': 'cdd16d4123a34a59be4e2ab1f2fa30b9', 'mlflow.runColor': '#229487', 'mlflow.runName': 'bustling-vole-670', 'mlflow.source.name': 'c:\\Users\\esteb\\apps\\PC-20205\\nyc-taxi-predictions-2025\\.venv\\Lib\\site-packages\\ipykernel_launcher.py', 'mlflow.source.type': 'LOCAL', 'mlflow.user': 'esteban.berumen@iteso.mx', 'model_family': 'xgboost'}


Registered model 'time_series.default.nyc-taxi-model' already exists. Creating a new version of this model...


Downloading artifacts:   0%|          | 0/7 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/8 [00:00<?, ?it/s]

Created version '7' of model 'time_series.default.nyc-taxi-model'.


Registered as version: 7
Alias 'challenger' actualizado a la versión 7


In [None]:
# df_mar = read_dataframe(os.path.join(DATA_DIR, 'green_tripdata_2025-03.parquet'))
# X_mar  = preprocess(df_mar, dv)
# y_mar  = df_mar[target].values
# input_example_mar = make_input_example(X_mar, dv)
# print(df_mar.shape, X_mar.shape)

FileNotFoundError: [Errno 2] No such file or directory: '../data\\green_tripdata_2025-03.parquet'

In [None]:
import mlflow.pyfunc

champion_uri   = f"models:/{model_registry_name}@Champion"
challenger_uri = f"models:/{model_registry_name}@Challenger"

print("Loading Champion:", champion_uri)
print("Loading Challenger:", challenger_uri)

champion = mlflow.pyfunc.load_model(champion_uri)
challenger = mlflow.pyfunc.load_model(challenger_uri)

# Algunos modelos registrados con sklearn pueden requerir arrays densos
X_mar_dense = X_mar.toarray() if hasattr(X_mar, "toarray") else X_mar

yhat_champion   = champion.predict(input_example_mar)
yhat_challenger = challenger.predict(input_example_mar)

# Para la métrica real usamos todo el set de marzo
yhat_c_full = champion.predict(X_mar_dense)
yhat_n_full = challenger.predict(X_mar_dense)

rmse_champion   = float(root_mean_squared_error(y_mar, yhat_c_full))
rmse_challenger = float(root_mean_squared_error(y_mar, yhat_n_full))

print({
    "rmse_champion": rmse_champion,
    "rmse_challenger": rmse_challenger
})