In [2]:
# Print the version of the `python` executable that the shell finds on PATH
# (this is a shell command, so it is not necessarily the kernel's interpreter).
!python -V

Python 3.10.14


# **Q1. Install MLflow**

In [16]:
# Install the notebook's dependencies. `%pip` installs into the environment of the
# running kernel, whereas `!pip` uses whichever pip the shell happens to find.
# MLflow is pinned to the release reported by `mlflow --version` in this notebook.
%pip install mlflow==2.13.0 scikit-learn pandas fastparquet hyperopt matplotlib



In [3]:
# Print the version of the installed MLflow command-line tool.
!mlflow --version

mlflow, version 2.13.0


# **Q2. Download and preprocess the data**

## Write the code for `preprocess_data.py`

In [4]:
%%writefile preprocess_data.py
import os
import pickle
import click
import pandas as pd

from sklearn.feature_extraction import DictVectorizer


def dump_pickle(obj, filename: str):
    """Pickle ``obj`` into the file at ``filename``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten. Nothing meaningful is returned (``None``).
    """
    with open(filename, "wb") as sink:
        pickle.dump(obj, sink)


def read_dataframe(filename: str):
    """Load one month of trip data and prepare it for feature extraction.

    Args:
        filename: Path to a parquet file that contains the columns
            ``lpep_pickup_datetime``, ``lpep_dropoff_datetime``,
            ``PULocationID`` and ``DOLocationID``.

    Returns:
        A new DataFrame with an added ``duration`` column (trip length in
        minutes), restricted to trips lasting between 1 and 60 minutes
        inclusive, and with the two location-ID columns converted to strings.
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta -> minutes conversion (no per-row Python lambda).
    trip_time = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df['duration'] = trip_time.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows: the assignments below (and
    # the column added later by `preprocess`) then write to an independent
    # frame instead of a slice, which avoids pandas' SettingWithCopyWarning.
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df


def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Turn a trip DataFrame into a feature matrix using ``dv``.

    A combined pickup/drop-off column ``PU_DO`` is added to ``df`` in place
    (``PULocationID`` + ``'_'`` + ``DOLocationID``). The ``PU_DO`` and
    ``trip_distance`` columns are then converted to a list of per-row dicts
    and passed to the vectorizer.

    Args:
        df: Frame holding ``PULocationID``, ``DOLocationID`` and ``trip_distance``.
        dv: Vectorizer exposing ``fit_transform`` and ``transform``.
        fit_dv: When true, ``dv.fit_transform`` is used; otherwise ``dv.transform``.

    Returns:
        A tuple ``(X, dv)`` of the vectorizer output and the vectorizer itself.
    """
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    feature_columns = ['PU_DO', 'trip_distance']
    records = df[feature_columns].to_dict(orient='records')

    # Pick the vectorizer method once, then apply it.
    vectorize = dv.fit_transform if fit_dv else dv.transform
    return vectorize(records), dv


@click.command()
@click.option(
    "--raw_data_path",
    required=True,
    help="Location where the raw NYC taxi trip data was saved"
)
@click.option(
    "--dest_path",
    required=True,
    help="Location where the resulting files will be saved"
)
@click.option(
    "--dataset",
    default="green",
    show_default=True,
    help="Prefix of the trip data files, as in <dataset>_tripdata_2023-01.parquet"
)
def run_data_prep(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Build the train/validation/test pickles from the raw parquet files.

    Reads ``<dataset>_tripdata_2023-01/02/03.parquet`` from ``raw_data_path``
    (January = train, February = validation, March = test), fits a
    DictVectorizer on the training split, and writes ``dv.pkl``,
    ``train.pkl``, ``val.pkl`` and ``test.pkl`` to ``dest_path``.

    Both path options are required: without them the paths would be ``None``
    and the script would fail with an unhelpful TypeError in ``os.path.join``.
    """
    # Split name -> month suffix of the source file. "train" comes first so
    # that the vectorizer is fitted before the other splits are transformed.
    months = {"train": "2023-01", "val": "2023-02", "test": "2023-03"}

    # Load parquet files
    frames = {
        split: read_dataframe(
            os.path.join(raw_data_path, f"{dataset}_tripdata_{month}.parquet")
        )
        for split, month in months.items()
    }

    # Extract the target
    target = 'duration'
    targets = {split: frame[target].values for split, frame in frames.items()}

    # Fit the DictVectorizer (on the training split only) and preprocess data
    dv = DictVectorizer()
    features = {}
    for split, frame in frames.items():
        features[split], dv = preprocess(frame, dv, fit_dv=(split == "train"))

    # Create dest_path folder unless it already exists
    os.makedirs(dest_path, exist_ok=True)

    # Save DictVectorizer and datasets
    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    for split in months:
        dump_pickle(
            (features[split], targets[split]),
            os.path.join(dest_path, f"{split}.pkl"),
        )


if __name__ == '__main__':
    run_data_prep()

Writing preprocess_data.py


In [5]:
# Run the preprocessing script: read the raw parquet files from `hw2_data`
# and write the DictVectorizer and dataset pickles to `output`.
!python preprocess_data.py --raw_data_path hw2_data --dest_path output

In [6]:
# Count the files that preprocess_data.py saved to the `output` folder
# (it writes dv.pkl, train.pkl, val.pkl and test.pkl).
!ls output | wc -l

       4


# **Q3. Train a model with autolog**

## Write the code for `train.py`

In [14]:
%%writefile train.py
import os
import pickle
import click
import mlflow

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error


def load_pickle(filename: str):
    """Return the object stored in the pickle file at ``filename``.

    NOTE: unpickling can execute arbitrary code, so only load files that
    were produced by a trusted source (here: preprocess_data.py).
    """
    with open(filename, "rb") as source:
        payload = pickle.load(source)
    return payload


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
def run_train(data_path: str):
    """Train a RandomForestRegressor and track the run with MLflow.

    Loads ``train.pkl`` and ``val.pkl`` from ``data_path``, fits a random
    forest (max_depth=10, random_state=0) with scikit-learn autologging
    enabled, and logs the validation RMSE as the ``rmse`` metric. Runs are
    stored in the local ``mlflow.db`` SQLite database under the
    ``homework-2`` experiment.
    """
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("homework-2")
    mlflow.sklearn.autolog()
    with mlflow.start_run():

        mlflow.set_tag("Developer", "Burak")
        # NOTE(review): these raw-data paths are recorded as run metadata only;
        # they are hard-coded and not derived from --data_path.
        mlflow.log_param("train-data-path", "./hw2_data/green_tripdata_2023-01.parquet")
        mlflow.log_param("val-data-path", "./hw2_data/green_tripdata_2023-02.parquet")
        mlflow.log_param("test-data-path", "./hw2_data/green_tripdata_2023-03.parquet")

        X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
        X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

        max_depth, random_state = 10, 0
        rf = RandomForestRegressor(max_depth=max_depth, random_state=random_state)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_val)

        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("random_state", random_state)

        # RMSE computed as sqrt(MSE). The `squared=False` argument is deprecated
        # since scikit-learn 1.4 and removed in 1.6, so it is avoided here to
        # keep the script working with whichever version pip installs.
        rmse = mean_squared_error(y_val, y_pred) ** 0.5

        mlflow.log_metric("rmse", rmse)


if __name__ == '__main__':
    run_train()


Overwriting train.py


In [4]:
# Train the baseline model using the default --data_path (./output);
# the run is recorded in the local mlflow.db SQLite database.
!python train.py



# **Q4. Launch the tracking server locally**

### Launched the tracking server locally from a macOS terminal using the command below.

`mlflow ui --backend-store-uri sqlite:///mlflow.db --default-artifact-root artifacts`

# **Q5. Tune model hyperparameters**

## Write the code for `hpo.py`

In [8]:
%%writefile hpo.py
import os
import pickle
import click
import mlflow
import numpy as np
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from hyperopt.pyll import scope
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# Module-level MLflow setup, executed as soon as the script starts.
# Runs are sent to the tracking server at http://127.0.0.1:5000, which must
# already be running; the trailing comment keeps the direct-SQLite URI as the
# alternative backend.
mlflow.set_tracking_uri("http://127.0.0.1:5000") #"sqlite:///mlflow.db"
# Select the experiment that collects one run per hyperopt trial.
mlflow.set_experiment("random-forest-hyperopt")


def load_pickle(filename: str):
    """Open ``filename`` for binary reading and unpickle its content.

    NOTE: pickle files can run arbitrary code while loading; use this only
    on files created by a trusted source.
    """
    with open(filename, "rb") as stream:
        loaded = pickle.load(stream)
    return loaded


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
@click.option(
    "--num_trials",
    default=15,
    help="The number of parameter evaluations for the optimizer to explore"
)
def run_optimization(data_path: str, num_trials: int):
    """Tune RandomForestRegressor hyperparameters with hyperopt.

    Loads ``train.pkl`` and ``val.pkl`` from ``data_path`` and runs
    ``num_trials`` TPE evaluations. Each evaluation is logged as its own
    MLflow run containing the sampled parameters and the validation ``rmse``.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    def objective(params):
        """Train one forest with ``params`` and return its validation RMSE as the loss."""
        with mlflow.start_run():
            mlflow.set_tag("model", "RandomForestRegressor")
            mlflow.log_params(params)

            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_val)
            # RMSE computed as sqrt(MSE): `squared=False` is deprecated since
            # scikit-learn 1.4 and removed in 1.6.
            rmse = mean_squared_error(y_val, y_pred) ** 0.5

            mlflow.log_metric("rmse", rmse)

        return {'loss': rmse, 'status': STATUS_OK}

    # quniform samples floats; scope.int casts them to the integers that
    # RandomForestRegressor expects.
    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        'random_state': 42
    }

    rstate = np.random.default_rng(42)  # for reproducible results
    # The return value (best parameters) is not needed here: every trial is
    # already recorded in MLflow by `objective`.
    fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        rstate=rstate
    )


if __name__ == '__main__':
    run_optimization()

Writing hpo.py


In [9]:
# Run the hyperparameter search with the default options (./output, 15 trials).
# hpo.py logs to http://127.0.0.1:5000, so the tracking server must be running.
!python hpo.py

2024/05/31 23:04:36 INFO mlflow.tracking.fluent: Experiment with name 'random-forest-hyperopt' does not exist. Creating a new experiment.















100%|██████████| 15/15 [00:32<00:00,  2.14s/trial, best loss: 5.335419588556921]


# **Q6. Promote the best model to the model registry**

## Write the code for `register_model.py`

In [15]:
%%writefile register_model.py
import os
import pickle
import click
import mlflow

from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# Experiment that holds the hyperopt trial runs from which candidates are picked.
HPO_EXPERIMENT_NAME = "random-forest-hyperopt"
# Experiment that receives the retrained candidate models.
EXPERIMENT_NAME = "random-forest-best-models"
# Names of the logged run parameters that are passed on to RandomForestRegressor
# (each value is cast to int before use).
RF_PARAMS = ['max_depth', 'n_estimators', 'min_samples_split', 'min_samples_leaf', 'random_state']

# Module-level MLflow setup, executed as soon as the script starts: point at the
# local tracking server, select the target experiment and enable scikit-learn
# autologging.
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.sklearn.autolog()


def load_pickle(filename):
    """Unpickle and return the content of the file at ``filename``.

    NOTE: loading a pickle can execute arbitrary code; only use this on
    trusted files.
    """
    with open(filename, "rb") as pickled:
        restored = pickle.load(pickled)
    return restored


def train_and_log_model(data_path, params):
    """Retrain a random forest with ``params`` and log its validation/test RMSE.

    Args:
        data_path: Folder containing ``train.pkl``, ``val.pkl`` and ``test.pkl``.
        params: Mapping that holds a value for every name in ``RF_PARAMS``;
            each value is converted with ``int`` before being passed to
            ``RandomForestRegressor``.

    A new MLflow run is started for the training; ``val_rmse`` and
    ``test_rmse`` are logged to it as metrics.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))
    X_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))

    with mlflow.start_run():
        new_params = {param: int(params[param]) for param in RF_PARAMS}

        rf = RandomForestRegressor(**new_params)
        rf.fit(X_train, y_train)

        # Evaluate model on the validation and test sets.
        # RMSE is computed as sqrt(MSE): `squared=False` is deprecated since
        # scikit-learn 1.4 and removed in 1.6.
        val_rmse = mean_squared_error(y_val, rf.predict(X_val)) ** 0.5
        mlflow.log_metric("val_rmse", val_rmse)
        test_rmse = mean_squared_error(y_test, rf.predict(X_test)) ** 0.5
        mlflow.log_metric("test_rmse", test_rmse)


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
@click.option(
    "--top_n",
    default=5,
    type=int,
    help="Number of top models that need to be evaluated to decide which one to promote"
)
def run_register_model(data_path: str, top_n: int):
    """Retrain the best HPO candidates and register the one with the lowest test RMSE.

    The ``top_n`` runs with the lowest ``rmse`` in the HPO experiment are
    retrained via ``train_and_log_model``. The run with the lowest
    ``test_rmse`` in the best-models experiment is then registered as
    ``nyc-taxi-best-regressor-model``.
    """
    client = MlflowClient()

    # Retrieve the top_n model runs and log the models
    hpo_experiment = client.get_experiment_by_name(HPO_EXPERIMENT_NAME)
    if hpo_experiment is None:
        raise click.ClickException(
            f"Experiment '{HPO_EXPERIMENT_NAME}' was not found; run hpo.py first."
        )
    runs = client.search_runs(
        experiment_ids=[hpo_experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=top_n,
        order_by=["metrics.rmse ASC"]
    )
    for run in runs:
        train_and_log_model(data_path=data_path, params=run.data.params)

    # Select the model with the lowest test RMSE. Only active runs are
    # considered so that a deleted run can never be promoted, and only the
    # single best run is fetched.
    experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
    best_runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.test_rmse ASC"]
    )
    if not best_runs:
        raise click.ClickException(
            f"No runs found in experiment '{EXPERIMENT_NAME}'; nothing to register."
        )
    best_run = best_runs[0]

    print(f"Best run test RMSE is: {best_run.data.metrics['test_rmse']:.4f}")
    model_uri = f"runs:/{best_run.info.run_id}/model"
    # Register the best model
    mlflow.register_model(model_uri=model_uri, name="nyc-taxi-best-regressor-model")


if __name__ == '__main__':
    run_register_model()

Overwriting register_model.py


In [16]:
# Retrain the top HPO runs and register the one with the lowest test RMSE.
# register_model.py talks to http://127.0.0.1:5000, so the tracking server must be running.
!python register_model.py

Best run test RMSE is: 5.5674
Registered model 'nyc-taxi-best-regressor-model' already exists. Creating a new version of this model...
2024/06/01 00:29:21 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: nyc-taxi-best-regressor-model, version 2
Created version '2' of model 'nyc-taxi-best-regressor-model'.


# **References**

[1] [https://mlflow.org/docs/latest/tracking.html#can-i-directly-access-remote-storage-without-running-the-tracking-server](https://mlflow.org/docs/latest/tracking.html#can-i-directly-access-remote-storage-without-running-the-tracking-server)

[2] [https://mlflow.org/docs/latest/tracking.html#how-runs-and-artifacts-are-recorded](https://mlflow.org/docs/latest/tracking.html#how-runs-and-artifacts-are-recorded)

[3] [https://mlflow.org/docs/latest/tracking/autolog.html#scikit-learn](https://mlflow.org/docs/latest/tracking/autolog.html#scikit-learn)

[4] [https://mlflow.org/docs/latest/python_api/mlflow.client.html#mlflow.client.MlflowClient.search_runs](https://mlflow.org/docs/latest/python_api/mlflow.client.html#mlflow.client.MlflowClient.search_runs)