In [1]:
import pandas as pd
import numpy as np
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import os
from pathlib import Path

from sklearn.feature_extraction import DictVectorizer

from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from sklearn.svm import LinearSVR

from sklearn.metrics import mean_squared_error

import mlflow
# Autolog every scikit-learn fit(): MLflow stores the fitted estimator under the run's
# `model` artifact path and adds tags such as `estimator_name`; both are relied on further down.
mlflow.sklearn.autolog()

In [2]:
# Work from the 02-training folder so the relative paths used below
# (sqlite:///mlflow-run.sqlite.db, models/preprocessor.b, ./mlruns/) resolve there.
# Set the TRAINING_DIR environment variable to run this notebook from another checkout.
TRAINING_DIR = os.environ.get(
    "TRAINING_DIR",
    "/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/02-training",
)
os.chdir(TRAINING_DIR)

In [3]:
# Point the fluent MLflow API at the local SQLite tracking store, then select the
# experiment used by the first training loop (MLflow creates it if it does not exist yet).
tracking_uri = "sqlite:///mlflow-run.sqlite.db"
mlflow.set_tracking_uri(uri=tracking_uri)

mlflow.set_experiment(experiment_name="nyc-taxi-duration-experiment-0529")

2023/05/29 03:20:45 INFO mlflow.tracking.fluent: Experiment with name 'nyc-taxi-duration-experiment-0529' does not exist. Creating a new experiment.


<Experiment: artifact_location='/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/02-training/mlruns/2', creation_time=1685310645412, experiment_id='2', last_update_time=1685310645412, lifecycle_stage='active', name='nyc-taxi-duration-experiment-0529', tags={}>

In [4]:
def read_dataframe(filename):
    """Load one file of green-taxi trips and derive the ride-duration target.

    Parameters
    ----------
    filename : str or path-like
        Path to a ``.csv`` or ``.parquet`` file with ``lpep_pickup_datetime``,
        ``lpep_dropoff_datetime``, ``PULocationID`` and ``DOLocationID`` columns.

    Returns
    -------
    pandas.DataFrame
        Trips whose ``duration`` (minutes) lies in [1, 60], with the location-ID
        columns cast to ``str`` so they can be used as categorical features.

    Raises
    ------
    ValueError
        If ``filename`` has neither a ``.csv`` nor a ``.parquet`` extension.
    """
    filename = str(filename)
    if filename.endswith('.csv'):
        df = pd.read_csv(filename)
        # CSV carries no datetime dtype, so parse the timestamps explicitly.
        df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
        df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    elif filename.endswith('.parquet'):
        # Assumes the parquet file already stores these columns as timestamps.
        df = pd.read_parquet(filename)
    else:
        # Without this branch `df` would be unbound and fail with UnboundLocalError.
        raise ValueError(f"Unsupported file type (expected .csv or .parquet): {filename}")

    # Trip duration in minutes, computed vectorised instead of a per-row apply.
    df['duration'] = (
        df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    ).dt.total_seconds() / 60

    # Keep only plausible rides. .copy() makes an independent frame so the column
    # assignment below does not raise SettingWithCopyWarning.
    df = df[(df['duration'] >= 1) & (df['duration'] <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df


def preprocess(df, dv):
    """Turn a cleaned trip frame into a feature matrix and a target vector.

    Parameters
    ----------
    df : pandas.DataFrame
        Output of ``read_dataframe``. It needs ``PULocationID`` and ``DOLocationID``
        as strings, plus ``trip_distance`` and ``duration`` columns. It is not modified.
    dv : DictVectorizer
        An already-fitted vectorizer. Only ``transform`` is called, so the feature
        space is the one learned at training time.

    Returns
    -------
    tuple
        ``(X, y)``, where ``X = dv.transform(records)`` and ``y`` is the
        ``duration`` column as a NumPy array.
    """
    # Target vector.
    y = df["duration"].values

    # Build the combined pickup/dropoff feature on a new frame rather than writing a
    # PU_DO column into the caller's DataFrame. The old `df.drop(...)` call discarded
    # its result and had no effect, so it is gone.
    features = df[['trip_distance']].assign(
        PU_DO=df['PULocationID'] + '_' + df['DOLocationID']
    )
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    records = features[categorical + numerical].to_dict(orient='records')
    return dv.transform(records), y




In [5]:
train_data_path = "/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/01-intro/data/train/green_tripdata_2021-01.parquet"
val_data_path = "/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/01-intro/data/val/green_tripdata_2021-02.parquet"

df_train = read_dataframe(train_data_path)
df_val = read_dataframe(val_data_path)

# Combine pickup and dropoff zones into one categorical feature.
df_train['PU_DO'] = df_train['PULocationID'] + '_' + df_train['DOLocationID']
df_val['PU_DO'] = df_val['PULocationID'] + '_' + df_val['DOLocationID']

categorical = ['PU_DO']
numerical = ['trip_distance']

# Fit the vectorizer on training data only; validation reuses the same vocabulary.
dv = DictVectorizer()

train_dicts = df_train[categorical + numerical].to_dict(orient='records')
X_train = dv.fit_transform(train_dicts)

val_dicts = df_val[categorical + numerical].to_dict(orient='records')
X_val = dv.transform(val_dicts)

target = 'duration'
y_train = df_train[target].values
y_val = df_val[target].values

# Persist the fitted vectorizer. The training cells log models/preprocessor.b as each
# run's "preprocessor" artifact, and the prediction section reloads it. It must
# therefore be this `dv`, not whatever file happened to be on disk.
preprocessor_path = Path("models") / "preprocessor.b"
preprocessor_path.parent.mkdir(parents=True, exist_ok=True)
with open(preprocessor_path, "wb") as f_out:
    pickle.dump(dv, f_out)

In [6]:
# Compare tree ensembles and a linear SVR in the active experiment set above.
# sklearn autologging records each fitted model alongside the manual logging here.
for model_class in (ExtraTreesRegressor, RandomForestRegressor,
                    GradientBoostingRegressor, LinearSVR):

    with mlflow.start_run():
        mlflow.log_params({"train_data": train_data_path,
                           "validation_data": val_data_path})
        # Tag with the readable class name rather than str(class), which is "<class '...'>".
        mlflow.set_tags({"model": model_class.__name__, "developer": "Saheli"})
        mlflow.log_artifact(local_path="models/preprocessor.b",
                            artifact_path="preprocessor")

        # Fixed seed so reruns of these stochastic estimators are reproducible.
        model = model_class(random_state=42)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_val)
        # sqrt(MSE) instead of squared=False, which newer scikit-learn deprecates/removes.
        rmse = np.sqrt(mean_squared_error(y_val, y_pred))
        mlflow.log_metric("rmse", rmse)




## Create experiment and train models using MLflowClient

In [7]:
from mlflow import MlflowClient

# Low-level client bound to the same local SQLite store as `tracking_uri` in the setup cell.
MLFLOW_TRACKING_URI = "sqlite:///mlflow-run.sqlite.db"
client = MlflowClient(MLFLOW_TRACKING_URI)

In [8]:
## Create the experiment with the MLflow client, or reuse it if an earlier run of this
## cell already created it (create_experiment fails when the name is taken).
experiment_name = "nyc-taxi-duration-experiment-0528"
existing_experiment = client.get_experiment_by_name(experiment_name)
if existing_experiment is None:
    experiment_id = client.create_experiment(
        experiment_name,
        # NOTE(review): this resolves to the mlruns/ root itself, so this experiment's
        # run folders sit next to other experiments' id folders. Consider a subfolder.
        artifact_location="./mlruns/",
        tags={"version": "v1", "priority": "P1"},
    )
else:
    experiment_id = existing_experiment.experiment_id

client.set_experiment_tag(experiment_id, "regression.framework", "Duration prediction")

# Fetch experiment metadata information
experiment = client.get_experiment(experiment_id)
print(f"Name: {experiment.name}")
print(f"Experiment_id: {experiment.experiment_id}")
print(f"Artifact Location: {experiment.artifact_location}")
print(f"Tags: {experiment.tags}")
print(f"Lifecycle_stage: {experiment.lifecycle_stage}")

Name: nyc-taxi-duration-experiment-0528
Experiment_id: 3
Artifact Location: /Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/02-training/mlruns
Tags: {'version': 'v1', 'priority': 'P1', 'regression.framework': 'Duration prediction'}
Lifecycle_stage: active


In [9]:

# Train two simple baselines in the client-created experiment (`experiment_id` above).
for model_class in (LinearRegression, DecisionTreeRegressor):

    with mlflow.start_run(experiment_id=experiment_id):

        mlflow.log_params({"train_data": train_data_path,
                           "validation_data": val_data_path})
        # Tag with the readable class name rather than str(class), which is "<class '...'>".
        mlflow.set_tags({"model": model_class.__name__, "developer": "Saheli"})
        mlflow.log_artifact(local_path="models/preprocessor.b",
                            artifact_path="preprocessor")

        model = model_class()
        model.fit(X_train, y_train)

        y_pred = model.predict(X_val)
        # sqrt(MSE) instead of squared=False, which newer scikit-learn deprecates/removes.
        rmse = np.sqrt(mean_squared_error(y_val, y_pred))
        # Same metric name as the first training loop, so `metric.rmse` filters work in both experiments.
        mlflow.log_metric("rmse", rmse)


In [10]:
from mlflow.entities import ViewType

# Resolve the experiment by name instead of hard-coding its id ('2' in one local DB),
# so the query also works against a fresh tracking database.
search_experiment = client.get_experiment_by_name("nyc-taxi-duration-experiment-0529")
if search_experiment is None:
    raise ValueError("Experiment 'nyc-taxi-duration-experiment-0529' not found in the tracking store.")

runs = client.search_runs(
    experiment_ids=[search_experiment.experiment_id],
    filter_string="metric.rmse < 7 and tags.developer = 'Saheli'",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metric.rmse ASC"],
)

for run in runs:
    # `estimator_name` comes from sklearn autologging; don't crash on a run without it.
    model_tag = run.data.tags.get('estimator_name', 'unknown')
    print(f"run_id : {run.info.run_id}, model : {model_tag}, rmse : {run.data.metrics['rmse']:.4f}")



run_id : 8823b247ecbf46b787cca32be1f1665f, model : GradientBoostingRegressor, rmse : 6.7423
run_id : 4709074be5fb4cb68e25c5c3a7635dfd, model : RandomForestRegressor, rmse : 6.9116


## Register models

In [11]:
# Register the best run from the search above (runs are ordered by ascending RMSE)
# instead of a hard-coded run id that only exists in one local tracking database.
if not runs:
    raise RuntimeError("No runs matched the search; nothing to register.")
best_run = runs[0]
run_id = best_run.info.run_id
model_uri = f'runs:/{run_id}/model'
version_name = best_run.data.tags.get('estimator_name', 'unknown')

mlflow.register_model(model_uri,
                      name="NYC-Ride-Duration-Regressor",
                      tags={"model": version_name})

Registered model 'NYC-Ride-Duration-Regressor' already exists. Creating a new version of this model...
2023/05/29 03:38:41 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: NYC-Ride-Duration-Regressor, version 2
Created version '2' of model 'NYC-Ride-Duration-Regressor'.


<ModelVersion: aliases=[], creation_timestamp=1685311721801, current_stage='None', description=None, last_updated_timestamp=1685311721801, name='NYC-Ride-Duration-Regressor', run_id='8823b247ecbf46b787cca32be1f1665f', run_link=None, source='/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/02-training/mlruns/2/8823b247ecbf46b787cca32be1f1665f/artifacts/model', status='READY', status_message=None, tags={'model': 'GradientBoostingRegressor'}, user_id=None, version=2>

### Register model using MLflowClient

In [12]:
# Create the registered model only if it does not exist yet. create_registered_model
# fails on an existing name, which would break re-running this cell.
v2_model_name = "NYC-Ride-Duration-Regressor-v2"
if not client.search_registered_models(filter_string=f"name = '{v2_model_name}'"):
    client.create_registered_model(v2_model_name)

result = client.create_model_version(
    name=v2_model_name,
    source=model_uri,
    run_id=run_id,
    description="GradientBoosting regression model",
)



2023/05/29 03:39:48 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: NYC-Ride-Duration-Regressor-v2, version 1


In [14]:
## Clean up: remove the demo "-v2" registered model created in the previous section.
client.delete_registered_model("NYC-Ride-Duration-Regressor-v2")

## Stage models in registry

In [15]:
model_name = "NYC-Ride-Duration-Regressor"

# Latest registered versions of the model, printed with their run and stage.
latest_versions = client.get_latest_versions(model_name)

for mv in latest_versions:
    print(f"version: {mv.version}, run_id: {mv.run_id}, stage: {mv.current_stage}")

version: 1, run_id: a6839b8f0da24a8a9c71f651b83b8e1b, stage: Staging
version: 2, run_id: 8823b247ecbf46b787cca32be1f1665f, stage: None


In [19]:
from datetime import date

model_version = 3
new_stage = "Staging"

# Promote without archiving, so versions already in `new_stage` keep their stage.
client.transition_model_version_stage(name=model_name, version=model_version,
                                      stage=new_stage, archive_existing_versions=False)

# Record the transition (target stage and date) in the version's description.
client.update_model_version(
    name=model_name,
    version=model_version,
    description=f"This model version {model_version} was transitioned to {new_stage} on {date.today()}",
)

<ModelVersion: aliases=[], creation_timestamp=1685312044679, current_stage='Staging', description='This model version 3 was trasitioned to Staging on 2023-05-29', last_updated_timestamp=1685312097391, name='NYC-Ride-Duration-Regressor', run_id='4709074be5fb4cb68e25c5c3a7635dfd', run_link='', source='/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/02-training/mlruns/2/4709074be5fb4cb68e25c5c3a7635dfd/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

## Prediction using registered model

In [20]:

latest_versions = client.get_latest_versions(model_name)

# Run id, version and artifact directory of the Production version.
# Empty strings mean no version is in Production.
prod_run_id, prod_version, prod_artifacts_path = '', '', ''

for mv in latest_versions:
    print(f"version: {mv.version}, stage: {mv.current_stage}")
    if mv.current_stage != "Production":
        continue
    prod_run_id = mv.run_id
    prod_version = mv.version
    # Drop the last path segment of `source` (".../artifacts/model") to get the run's artifact root.
    prod_artifacts_path = mv.source.rsplit("/", 1)[0]


version: 3, stage: Staging
version: 2, stage: Production


In [21]:
def test_model(X_test, y_test, name, stage):
    """Score a registered model stage on held-out data.

    Loads ``models:/<name>/<stage>`` through the MLflow pyfunc interface,
    predicts on ``X_test`` and compares the predictions with ``y_test``.

    Returns
    -------
    dict
        ``{"rmse": <root mean squared error as a float>}``.
    """
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(X_test)
    # sqrt(MSE) rather than mean_squared_error(..., squared=False): the `squared`
    # argument is deprecated in scikit-learn 1.4 and removed in 1.6.
    rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
    return {"rmse": rmse}

In [24]:
import mlflow.artifacts

if not prod_run_id:
    raise RuntimeError(f"No version of {model_name!r} is in the Production stage.")

# Fetch the vectorizer logged with the Production run through MLflow. String-slicing
# `version.source` into a filesystem path only works for a local file artifact store.
prod_preprocess_uri = mlflow.artifacts.download_artifacts(
    run_id=prod_run_id, artifact_path="preprocessor/preprocessor.b"
)

# NOTE: pickle.load can execute code from the file; only load artifacts from a trusted store.
with open(prod_preprocess_uri, 'rb') as f:
    dv = pickle.load(f)

test_data_path = "/Users/sahelimukherjee/Documents/Personal/Learning/MLOps/projects/NYC_Ride_Duration_Prediction/Predicting-Ride-Duration/01-intro/data/test/green_tripdata_2021-03.parquet"

df = read_dataframe(test_data_path)
X_test, y_test = preprocess(df, dv)

In [25]:
# Time and score the model currently in the Production stage on the March 2021 test file.
%time test_model(X_test, y_test, name = model_name, stage = "Production")

 - mlflow (current: 2.3.2, required: mlflow==2.3)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


CPU times: user 134 ms, sys: 7.01 ms, total: 141 ms
Wall time: 183 ms


{'rmse': 6.659623830022514}

In [26]:
# Same evaluation for the model currently in the Staging stage, for comparison.
%time test_model(X_test, y_test, name = model_name, stage="Staging")

 - mlflow (current: 2.3.2, required: mlflow==2.3)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


CPU times: user 6.6 s, sys: 544 ms, total: 7.14 s
Wall time: 7.33 s


{'rmse': 6.888288175453307}

In [28]:
# Remove the old version 1 of the registered model.
client.delete_model_version(model_name, '1')

In [29]:
from datetime import date

# transition_model_version_stage() accepts no `description` argument (passing one raises
# TypeError). Promote first, then record the note with update_model_version().
# archive_existing_versions=True archives whichever version was previously in Production.
promoted_version = 3
client.transition_model_version_stage(name=model_name,
                                      version=promoted_version,
                                      stage="Production",
                                      archive_existing_versions=True)
client.update_model_version(name=model_name,
                            version=promoted_version,
                            description=f"Move to Production on {date.today()}")

# Re-evaluate the Production stage after the promotion above.
%time test_model(X_test, y_test, name = model_name, stage = "Production")

 - mlflow (current: 2.3.2, required: mlflow==2.3)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


CPU times: user 6.52 s, sys: 469 ms, total: 6.99 s
Wall time: 7.05 s


{'rmse': 6.888288175453307}