# Week 2 - Experiment Tracking and Model Registry with MLFlow

## 1. Installation

In [1]:
# Confirm the MLflow CLI is installed and print its version.
!mlflow --version

mlflow, version 1.26.0


## 2. Download and preprocess

Use green taxi trip records from 2021-01 to 2021-03

In [2]:
# URL format: https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-01.parquet
# looking for 2021-01, 02, 03
# use requests

import requests
from pathlib import Path

def download_from_url(url: str, file_dir: str):
    """Stream a file from ``url`` into ``file_dir`` unless it is already there.

    The response body is fetched in chunks, so the whole file never has to be
    held in memory. The local file name is the last path component of the URL.

    Args:
    url: string
        direct url to the file for download, e.g.
        https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-01.parquet

    file_dir: string (a ``pathlib.Path`` works as well)
        path to the download destination directory; it is created, together
        with any missing parent directories, when it does not exist yet

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    file_dir = Path(file_dir)
    # A Path object is always truthy, so `if not file_dir` never fired and the
    # directory was never created. Let mkdir handle the "already there" case.
    file_dir.mkdir(parents=True, exist_ok=True)

    # use built-in method to extract filename:
    local_path = file_dir / Path(url).name

    if local_path.exists():
        print(f'File already exists:\n{local_path}')
        return

    # The context manager closes the streamed response (and its connection)
    # even when the download fails part-way through.
    with requests.get(url, stream=True) as r:
        if not r.ok:
            # HTTP status 4xx/5xx
            print(f'Download failed with code {r.status_code}\n{r.text}')
            return

        print('Saving to ', local_path)
        # Write to a temporary name first: an interrupted download must not
        # leave a truncated file that a later call would mistake for complete.
        partial_path = local_path.with_name(local_path.name + '.part')
        with open(partial_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024*8):
                f.write(chunk)

    partial_path.replace(local_path)

In [3]:
# Green taxi trip records for 2021-01 through 2021-03, saved to the `data`
# directory two levels above the current working directory.
dest_path = Path.cwd().parents[1] / 'data'
parquet_urls = list(map(
    lambda month: f'https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-{month:02}.parquet',
    range(1, 4),
))

for url in parquet_urls:
    download_from_url(url, dest_path)

Saving to  /home/kohada/mlops-notes/data/green_tripdata_2021-01.parquet
Saving to  /home/kohada/mlops-notes/data/green_tripdata_2021-02.parquet
Saving to  /home/kohada/mlops-notes/data/green_tripdata_2021-03.parquet


### Preprocess

Run `preprocess_data.py` and examine the outputs

In [4]:
# Run the preprocessing script on the raw data directory; results go to ./output.
# NOTE(review): --raw_data_path is hardcoded relative to the home directory;
# confirm it is the same directory as `dest_path` used by the download cell.
!python preprocess_data.py --raw_data_path ~/mlops-notes/data --dest_path ./output

In [5]:
# List every file with an extension that the preprocessing step wrote to ./output.
output_path = Path.cwd().joinpath('output')
[artifact for artifact in output_path.glob('*.*')]

[PosixPath('/home/kohada/mlops-notes/notebooks/w2-mlflow/output/dv.pkl'),
 PosixPath('/home/kohada/mlops-notes/notebooks/w2-mlflow/output/valid.pkl'),
 PosixPath('/home/kohada/mlops-notes/notebooks/w2-mlflow/output/test.pkl'),
 PosixPath('/home/kohada/mlops-notes/notebooks/w2-mlflow/output/train.pkl')]

## 3. Train with autolog

Use random forest regressor via `train.py`. The script loads the outputs from previous step, trains model on `train.pkl`, and calculates RMSE on `valid.pkl`.

Modify the script so that MLflow's **autolog** is enabled, then launch the MLflow UI to confirm that the run was tracked.

In [6]:
import pandas as pd
import xgboost as xgb
import mlflow
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll import scope

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge

from sklearn.metrics import mean_squared_error

import pickle

  from pandas import MultiIndex, Int64Index


In [None]:
# train.py
import argparse
import os
import pickle
import mlflow

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def load_pickle(filename: str):
    """Read the file at ``filename`` in binary mode and return the unpickled object.

    Only use this on files you produced yourself: unpickling untrusted data
    can execute arbitrary code.
    """
    with open(filename, mode="rb") as handle:
        payload = pickle.load(handle)
    return payload

def run(data_path):
    """Train a random forest on the preprocessed data and track it with MLflow.

    Loads ``train.pkl`` and ``valid.pkl`` from ``data_path`` (each unpickles to
    a features/target pair), fits a ``RandomForestRegressor`` with sklearn
    autologging enabled, and records the validation RMSE on the same run.

    Args:
        data_path: directory containing ``train.pkl`` and ``valid.pkl``.

    Returns:
        The RMSE of the fitted model on the validation data.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_valid, y_valid = load_pickle(os.path.join(data_path, "valid.pkl"))

    ## set mlflow here
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("nyc-taxi-experiment")

    # Must be enabled before fit() so the training call is captured.
    mlflow.sklearn.autolog()

    with mlflow.start_run():

        rf = RandomForestRegressor(max_depth=10, random_state=0)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_valid)

        rmse = mean_squared_error(y_valid, y_pred, squared=False)
        # Previously the value was computed and then dropped; log it under an
        # explicit key so it shows up on the run next to the autologged data.
        mlflow.log_metric("rmse", rmse)

    return rmse


if __name__ == '__main__':
    # Command-line entry point: read the data directory and start training.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--data_path",
        help="the location where the processed NYC taxi trip data was saved.",
        default="./output",
    )
    run(cli.parse_args().data_path)


In [7]:
# Execute the training script with its default --data_path (./output).
!python train.py



MLflow autolog recorded 17 parameters:

* bootstrap
* ccp_alpha
* ...
* warm_start

## 4. Launch tracking server locally

Having a tracking server allows access to the model registry and management of the entire ML model lifecycle.

If our tracking server relies on a SQL backend (specified via `--backend-store-uri`), we must also specify a `--default-artifact-root` URI if we want to store artifacts. It defaults to `./mlruns`.

In [10]:
# !mlflow server \
#     --backend-store-uri sqlite:///mlflow.db \
#     --default-artifact-root ./mlruns

[2022-05-27 01:09:26 +0800] [9867] [INFO] Starting gunicorn 20.1.0
[2022-05-27 01:09:26 +0800] [9867] [INFO] Listening at: http://127.0.0.1:5001 (9867)
[2022-05-27 01:09:26 +0800] [9867] [INFO] Using worker: sync
[2022-05-27 01:09:26 +0800] [9868] [INFO] Booting worker with pid: 9868
[2022-05-27 01:09:26 +0800] [9869] [INFO] Booting worker with pid: 9869
[2022-05-27 01:09:26 +0800] [9870] [INFO] Booting worker with pid: 9870
[2022-05-27 01:09:26 +0800] [9871] [INFO] Booting worker with pid: 9871
^C
[2022-05-27 01:11:03 +0800] [9867] [INFO] Handling signal: int
[2022-05-27 01:11:03 +0800] [9869] [INFO] Worker exiting (pid: 9869)
[2022-05-27 01:11:03 +0800] [9868] [INFO] Worker exiting (pid: 9868)
[2022-05-27 01:11:03 +0800] [9870] [INFO] Worker exiting (pid: 9870)
[2022-05-27 01:11:03 +0800] [9871] [INFO] Worker exiting (pid: 9871)


## 5. Tune hyperparameter

Using `hyperopt` and the `hpo.py` script, tune the hyperparams of our random forest regressor.

Modify `hpo.py` so that the validation RMSE is logged to MLflow.

Track the experiment named `random-forest-hyperopt`.

Log only:

* the list of hyperparameters passed to the `objective` function
* RMSE of validation (2021-02)

What's the best validation RMSE?

Modified `objective` function below:
```python
    def objective(params):
        """Train one random forest with ``params`` and report its validation RMSE.

        Runs inside its own MLflow run, logging only the hyperparameters it was
        given and the resulting ``rmse``. ``X_train``, ``y_train``, ``X_valid``
        and ``y_valid`` come from the enclosing scope.

        Returns:
            A dict with the ``loss`` to minimise and a ``status`` of
            ``STATUS_OK`` (from ``hyperopt``), which is what ``fmin`` reads
            back from the objective.
        """
        with mlflow.start_run():
            mlflow.set_tag('model', 'random-forest-regressor')
            # log only the hyperparameters passed
            mlflow.log_params(params)

            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_valid)
            rmse = mean_squared_error(y_valid, y_pred, squared=False)

            mlflow.log_metric("rmse", rmse)

        # Without a return value the optimiser has no loss to minimise.
        return {'loss': rmse, 'status': STATUS_OK}
```
Lowest RMSE: 6.628

## 6. Promote best model to model registry

`register_model.py` is a script which checks previous step's results and selects top 5 runs to test using the 2021-03 dataset. Results will be saved to a new experiment - `random-forest-best-models`.

Modify `register_model.py` so that it selects the model with the lowest test RMSE.

* use `search_runs` from `MlflowClient`
* `mlflow.register_model` to register
    * pass model_uri: `"runs:/<RUN_ID>/model"` and name of model.

What is the test RMSE of the best model?

In [2]:
import mlflow

from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

# Point the MLflow client at the local tracking server.
# NOTE(review): the server log captured earlier in this notebook shows gunicorn
# listening on port 5001, not 5000 — confirm the port matches the running server.
mlflow.set_tracking_uri("http://127.0.0.1:5000")

In [3]:
def print_run_info(runs):
    """Print the id, lifecycle stage, metrics and user tags of each run.

    Adapted from https://www.mlflow.org/docs/latest/python_api/mlflow.tracking.html
    """
    for run_record in runs:
        print("run_id: {}".format(run_record.info.run_id))
        print("lifecycle_stage: {}".format(run_record.info.lifecycle_stage))
        print("metrics: {}".format(run_record.data.metrics))

        # Keep only user-defined tags; MLflow's own tags start with "mlflow."
        user_tags = {}
        for tag_name, tag_value in run_record.data.tags.items():
            if tag_name.startswith("mlflow."):
                continue
            user_tags[tag_name] = tag_value
        print("tags: {}".format(user_tags))

In [4]:
# Experiment that holds the hyperparameter-search runs.
HPO_EXPERIMENT_NAME = "random-forest-hyperopt"

client = MlflowClient()
exp = client.get_experiment_by_name(HPO_EXPERIMENT_NAME)

# The three active runs with the lowest logged `rmse`, best first.
runs = client.search_runs(
    order_by=["metrics.rmse ASC"],
    max_results=3,
    run_view_type=ViewType.ACTIVE_ONLY,
    experiment_ids=exp.experiment_id,
)
print_run_info(runs)

run_id: a7b143508b21408b992792adee0de367
lifecycle_stage: active
metrics: {'rmse': 6.6284257482044735}
tags: {'model': 'random-forest-regressor'}
run_id: ec5a85d045af48d6a842d3a603bbeb21
lifecycle_stage: active
metrics: {'rmse': 6.629728007710133}
tags: {'model': 'random-forest-regressor'}
run_id: 21f7637a401c41a3b8c06efb58fd9ee2
lifecycle_stage: active
metrics: {'rmse': 6.629851022038263}
tags: {'model': 'random-forest-regressor'}


In [5]:
# `runs` was requested sorted by ascending `rmse`, so the first entry is the
# run with the lowest value; show its run ID.
runs[0].info.run_id

'a7b143508b21408b992792adee0de367'

Modified `run()` inside `register_model.py`:

```python
def run(data_path, log_top):
    """Re-train the best hyperopt runs, then register the best resulting model.

    Args:
        data_path: passed through to ``train_and_log_model`` as ``data_path``.
        log_top: how many of the lowest-``rmse`` runs from the
            ``HPO_EXPERIMENT_NAME`` experiment to re-train and log.
    """
    client = MlflowClient()

    # retrieve the top_n model runs and log the models to MLflow
    experiment = client.get_experiment_by_name(HPO_EXPERIMENT_NAME)
    runs = client.search_runs(
        experiment_ids=experiment.experiment_id,
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=log_top,
        order_by=["metrics.rmse ASC"]
    )
    # Named `hpo_run` so the loop variable does not shadow this function's name.
    for hpo_run in runs:
        train_and_log_model(data_path=data_path, params=hpo_run.data.params)

    # select the model with the lowest test RMSE
    experiment = client.get_experiment_by_name(EXPERIMENT_NAME)

    ## SEARCH FOR TOP RUN ##
    # returns PagedList of Runs
    # NOTE(review): this sorts on `metrics.rmse`; confirm that
    # train_and_log_model logs the test RMSE under exactly that key.
    best_run = client.search_runs(
        experiment_ids=experiment.experiment_id,
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.rmse ASC"]
    )

    ## register the best model ##
    # The part after the run ID is NOT free-form: it is the artifact path the
    # model was logged under within that run. The task text specifies
    # "runs:/<RUN_ID>/model"; verify against train_and_log_model if the model
    # is logged under a different artifact path.
    model_uri = f'runs:/{best_run[0].info.run_id}/model'
    mlflow.register_model(
        model_uri,
        # the registered model name is chosen by the user
        'sklearn-random-forest-reg'
    )
```