**Each step below can be turned into a standalone script:**

- `preprocess_data.py` — preprocess the data
- `hpo.py` — train models and tune hyperparameters
- `register_model.py` — evaluate the best models on the test set and register the best one

### 1. Preprocessing data

In [2]:
import os
import pandas as pd
import mlflow
import pickle
from sklearn.metrics import mean_squared_error
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor

In [15]:
# Directory containing the raw monthly parquet files consumed by run() below.
data_path = "./data/raw/"

In [16]:
def dump_pickle(obj, filename):
    """Pickle ``obj`` into ``filename``, replacing the file if it already exists."""
    handle = open(filename, "wb")
    try:
        pickle.dump(obj, handle)
    finally:
        # Always release the file handle, even if pickling fails midway.
        handle.close()

In [28]:
def read_dataframe(filename, min_duration=1, max_duration=60):
    """Load one month of taxi trips and derive the ``duration`` target.

    Expects the ``lpep_pickup_datetime`` / ``lpep_dropoff_datetime`` columns
    used by the green-taxi files.

    Parameters
    ----------
    filename : str or path-like
        Parquet file to read with ``pd.read_parquet``.
    min_duration, max_duration : float, optional
        Inclusive bounds, in minutes, for the trips to keep. The defaults
        reproduce the original 1-60 minute filter.

    Returns
    -------
    pd.DataFrame
        The filtered trips with a float ``duration`` column (minutes) and
        ``PULocationID`` / ``DOLocationID`` cast to ``str`` so they are used
        as categorical features downstream.
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta -> minutes: same values as applying
    # ``td.total_seconds() / 60`` row by row, without a Python-level loop.
    duration = (df.lpep_dropoff_datetime - df.lpep_pickup_datetime).dt.total_seconds() / 60
    df = df.assign(duration=duration)

    # .copy() detaches the filtered rows from the unfiltered frame so that the
    # column assignment below does not raise SettingWithCopyWarning.
    df = df[(df.duration >= min_duration) & (df.duration <= max_duration)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

In [29]:
# Build the model features (PU_DO pair + trip distance) and vectorise them with the given DictVectorizer
def preprocess(df: pd.DataFrame, dv: "DictVectorizer", fit_dv: bool = False):
    """Turn trip records into a feature matrix using ``dv``.

    Features are ``PU_DO`` (pickup and drop-off location IDs joined by
    ``'_'``) and ``trip_distance``.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain ``PULocationID`` and ``DOLocationID`` as strings (as
        produced by ``read_dataframe``) plus ``trip_distance``. It is not
        modified.
    dv : DictVectorizer
        Vectorizer to apply. It is fitted when ``fit_dv`` is true.
    fit_dv : bool, optional
        ``True`` for the training split (``fit_transform``). ``False`` for
        splits that must reuse the training vocabulary (``transform``).

    Returns
    -------
    tuple
        ``(X, dv)``: the transformed features and the same ``dv`` object.
    """
    # Build the features in a new frame instead of adding a PU_DO column to
    # ``df``. Mutating the caller's frame is a hidden side effect, and on a
    # filtered slice it triggers SettingWithCopyWarning.
    features = pd.DataFrame({
        'PU_DO': df['PULocationID'] + '_' + df['DOLocationID'],
        'trip_distance': df['trip_distance'],
    })
    dicts = features.to_dict(orient='records')
    X = dv.fit_transform(dicts) if fit_dv else dv.transform(dicts)
    return X, dv

In [32]:
def run(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Preprocess three months of trip data and pickle the results.

    January, February and March 2021 become the train, valid and test
    splits. A fresh DictVectorizer is fitted on the training split only and
    reused for the other two.

    Written under ``dest_path`` (created if missing):

    - ``dv.pkl``: the fitted vectorizer.
    - ``train.pkl`` / ``valid.pkl`` / ``test.pkl``: one ``(X, y)`` tuple per split.
    """
    target = 'duration'
    # split name -> month of its source parquet file (dict order: train, valid, test)
    months = {"train": "2021-01", "valid": "2021-02", "test": "2021-03"}

    frames = {
        split: read_dataframe(
            os.path.join(raw_data_path, f"{dataset}_tripdata_{month}.parquet")
        )
        for split, month in months.items()
    }
    targets = {split: frame[target].values for split, frame in frames.items()}

    dv = DictVectorizer()
    matrices = {}
    for split, frame in frames.items():
        # Only the training split (first in dict order) fits the vectorizer.
        matrices[split], dv = preprocess(frame, dv, fit_dv=(split == "train"))

    os.makedirs(dest_path, exist_ok=True)

    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    for split in months:
        dump_pickle((matrices[split], targets[split]), os.path.join(dest_path, f"{split}.pkl"))

In [33]:
run(raw_data_path=data_path, dest_path='./output')

### 2. Train (skipped: the next two steps cover the same material)

### 3. HPO — hyperparameter optimization

In [58]:
import mlflow
import numpy as np
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from hyperopt.pyll import scope
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

In [68]:
EXPERIMENT_NAME = "experiment-to-train"

# Tracking server URI. The MLFLOW_TRACKING_URI environment variable overrides
# the local default, so the notebook can target another server without edits.
mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:5000"))
# Select the experiment; if it does not exist yet, MLflow creates it.
mlflow.set_experiment(EXPERIMENT_NAME)

INFO: 'experiment-to-train' does not exist. Creating a new experiment


In [69]:
def load_pickle(filename):
    """Return the object stored in the pickle file ``filename``.

    Only use this on files written by this pipeline: unpickling untrusted
    data can execute arbitrary code.
    """
    with open(filename, mode="rb") as stream:
        obj = pickle.load(stream)
    return obj

In [70]:
def run(data_path, num_trials, seed=42):
    """Search random-forest hyperparameters with hyperopt, logging each trial to MLflow.

    Every trial is its own MLflow run in the active experiment. It logs the
    sampled params and the validation RMSE under the metric ``rmse``.

    Parameters
    ----------
    data_path : str
        Directory containing ``train.pkl`` and ``valid.pkl``, the ``(X, y)``
        tuples written by the preprocessing step.
    num_trials : int
        Number of hyperopt evaluations (``max_evals``).
    seed : int, optional
        Seed for hyperopt's sampler, so repeated runs propose the same
        hyperparameters. Defaults to 42.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_valid, y_valid = load_pickle(os.path.join(data_path, "valid.pkl"))

    def objective(params):
        """Train one forest with ``params``; return its validation RMSE as the hyperopt loss."""
        with mlflow.start_run():
            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_valid)
            # sqrt(MSE) instead of squared=False: that argument was deprecated
            # in scikit-learn 1.4 and removed in 1.6.
            rmse = float(np.sqrt(mean_squared_error(y_valid, y_pred)))

            mlflow.log_params(params)
            mlflow.log_metric('rmse', rmse)
        return {'loss': rmse, 'status': STATUS_OK}

    # Search space; scope.int turns hyperopt's float samples into ints for sklearn.
    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        'random_state': 42
    }

    # np.random.seed() returns None, so the old `rstate = np.random.seed(42)`
    # passed rstate=None and the search was not reproducible. hyperopt
    # (>= 0.2.7) expects a NumPy Generator here.
    rstate = np.random.default_rng(seed)

    fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        rstate=rstate
    )

In [71]:
run(data_path='output', num_trials=25)

100%|█████████████████████████████████████████████████| 25/25 [06:30<00:00, 15.61s/trial, best loss: 6.626843308791357]


### 4. Re-train the top N models from the previous step, evaluate them on the test set, and register the best one

In [105]:
from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from hyperopt import hp, space_eval
from hyperopt.pyll import scope

In [72]:
# Experiment that receives the re-trained top-N models (kept separate from the HPO experiment).
NEW_EXPERIMENT_NAME = 'experiment-to-train-register-model'

In [74]:
# Tracking server URI. MLFLOW_TRACKING_URI overrides the local default.
mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:5000"))
# Select the experiment for the re-trained models; MLflow creates it if missing.
mlflow.set_experiment(NEW_EXPERIMENT_NAME)

# Automatically log parameters, metrics and the fitted model of every sklearn fit.
mlflow.sklearn.autolog()

In [104]:
# Same hyperparameter space as the HPO step. train_and_log_model() resolves a
# past run's logged params against it with space_eval(), so the labels here
# must match the ones used during the search.
SPACE = dict(
    max_depth=scope.int(hp.quniform('max_depth', 1, 20, 1)),
    n_estimators=scope.int(hp.quniform('n_estimators', 10, 50, 1)),
    min_samples_split=scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
    min_samples_leaf=scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
    random_state=42,
)

In [77]:
def load_pickle(filename):
    """Unpickle and return the contents of ``filename``.

    Redefined here so this section can run as a standalone script. Only
    load trusted files: unpickling can execute arbitrary code.
    """
    fh = open(filename, "rb")
    try:
        return pickle.load(fh)
    finally:
        fh.close()

In [106]:
def train_and_log_model(data_path, params):
    """Re-train a random forest with a past run's params and log validation/test RMSE.

    Parameters
    ----------
    data_path : str
        Directory with ``train.pkl``, ``valid.pkl`` and ``test.pkl``
        (``(X, y)`` tuples).
    params : dict
        Params of a previous HPO run (``run.data.params``). They are resolved
        against ``SPACE`` with ``space_eval`` so that the ``scope.int``
        wrappers cast them to ints before they reach ``RandomForestRegressor``.

    Logs ``valid_rmse`` and ``test_rmse`` in a new MLflow run. The model itself
    is not logged explicitly here; that relies on ``mlflow.sklearn.autolog()``
    being enabled.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_valid, y_valid = load_pickle(os.path.join(data_path, "valid.pkl"))
    X_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))

    with mlflow.start_run():
        params = space_eval(SPACE, params)
        rf = RandomForestRegressor(**params)
        rf.fit(X_train, y_train)

        # Evaluate on the validation and test sets. sqrt(MSE) replaces
        # squared=False, which was removed from scikit-learn in 1.6.
        valid_rmse = float(np.sqrt(mean_squared_error(y_valid, rf.predict(X_valid))))
        mlflow.log_metric("valid_rmse", valid_rmse)
        test_rmse = float(np.sqrt(mean_squared_error(y_test, rf.predict(X_test))))
        mlflow.log_metric("test_rmse", test_rmse)

In [112]:
def run(data_path, log_top, PAST_EXPERIMENT, NEW_EXPERIMENT_NAME, register_name, artifact_path="model"):
    """Re-train the best HPO runs, register the best one on the test set, and move it to Production.

    Parameters
    ----------
    data_path : str
        Directory with ``train.pkl``, ``valid.pkl`` and ``test.pkl``.
    log_top : int
        How many of the best runs (lowest ``rmse``) of ``PAST_EXPERIMENT`` to re-train.
    PAST_EXPERIMENT : str
        Name of the HPO experiment whose runs logged the metric ``rmse``.
    NEW_EXPERIMENT_NAME : str
        Experiment the re-trained runs are logged to and then ranked in.
    register_name : str
        Registered-model name; each call creates a new version.
    artifact_path : str, optional
        Run-relative path of the logged model. Defaults to ``"model"``, where
        ``mlflow.sklearn.autolog()`` stores it. The old ``"models_pickle"``
        path was never logged.

    Raises
    ------
    ValueError
        If ``PAST_EXPERIMENT`` does not exist or the new experiment has no runs.
    """
    client = MlflowClient()

    # Retrieve the top ``log_top`` runs of the HPO experiment by validation RMSE.
    past_experiment = client.get_experiment_by_name(PAST_EXPERIMENT)
    if past_experiment is None:
        raise ValueError(f"MLflow experiment {PAST_EXPERIMENT!r} does not exist")

    top_runs = client.search_runs(
        experiment_ids=[past_experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=log_top,
        order_by=["metrics.rmse ASC"]
    )

    # train_and_log_model logs into the *active* experiment. Activate the one
    # searched below so the re-trained runs and the ranking agree.
    mlflow.set_experiment(NEW_EXPERIMENT_NAME)
    for past_run in top_runs:
        train_and_log_model(data_path=data_path, params=past_run.data.params)

    # Select the model with the lowest test RMSE. train_and_log_model logs
    # ``test_rmse``; ranking by ``metrics.rmse`` (never logged in this
    # experiment) gave an arbitrary order.
    new_experiment = client.get_experiment_by_name(NEW_EXPERIMENT_NAME)
    best_runs = client.search_runs(
        experiment_ids=[new_experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.test_rmse ASC"]
    )
    if not best_runs:
        raise ValueError(f"No runs found in MLflow experiment {NEW_EXPERIMENT_NAME!r}")

    model_uri = f"runs:/{best_runs[0].info.run_id}/{artifact_path}"

    # Register the best model and promote exactly that version. The returned
    # ModelVersion is used because list_registered_models()[-1] may refer to
    # a different model, and that API was removed in MLflow 2.0.
    model_version = mlflow.register_model(model_uri=model_uri, name=register_name)
    client.transition_model_version_stage(
        name=model_version.name,
        version=model_version.version,
        stage="Production"
    )

In [113]:
run(
    data_path='output',
    log_top=5,
    PAST_EXPERIMENT=EXPERIMENT_NAME,
    NEW_EXPERIMENT_NAME=NEW_EXPERIMENT_NAME,
    register_name="nyc-taxi-regressor",
)

Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
2022/05/27 18:19:01 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: nyc-taxi-regressor, version 3
Created version '3' of model 'nyc-taxi-regressor'.
