In [1]:
from mlflow.tracking import MlflowClient


MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"

client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)

2022/06/02 19:06:59 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2022/06/02 19:06:59 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 451aebb31d03, add metric step
INFO  [alembic.runtime.migration] Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
INFO  [alembic.runtime.migration] Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values
INFO  [alembic.runtime.migration] Running upgrade 181f10493468 -> df50e92ffc5e, Add Experiment Tags Table
INFO  [alembic.runtime.migration] Running upgrade df50e92ffc5e -> 7ac759974ad8, Update run tags with larger limit
INFO  [alembic.runtime.migration] Running upgrade 7ac759974ad8 -> 89d4b8295536, create latest metrics table
INFO  [89d4b8295536_create_latest_metrics_table_py] Migration complete!
INFO  

### Experiments

In [2]:
# list_experiments
client.list_experiments()

[<Experiment: artifact_location='./mlruns/0', experiment_id='0', lifecycle_stage='active', name='Default', tags={}>]

In [4]:
client.list_experiments()[0].experiment_id

'0'

In [3]:
# create_experiment
client.create_experiment(name="my-experiment")

'2'

### Search Runs

In [19]:
from mlflow.entities import ViewType

# search specifics run
runs = client.search_runs(
    experiment_ids='1',
    filter_string="metrics.rmse < 6.3 ",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.rmse ASC"]
)

In [20]:
for run in runs:
    print(f"run id: {run.info.run_id}, rmse: {run.data.metrics['rmse']:.3f}")

run id: e4a8f1020b9547bc9c059b1819303722, rmse: 6.281
run id: 2f0148a17fc44f43a4835f61187edb25, rmse: 6.290
run id: bc5d062a5c2a477b9f41680558e3ac48, rmse: 6.295
run id: 74b51dc508b0485abea1703151692621, rmse: 6.298


### Model

In [22]:
import mlflow

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

In [71]:
# register a model
run_id = "74b51dc508b0485abea1703151692621"
model_uri = f"runs/{run_id}/model" 

mlflow.register_model(model_uri=model_uri, name="nyc-taxi-regressor")

Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
2022/06/02 16:50:42 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: nyc-taxi-regressor, version 4
Created version '4' of model 'nyc-taxi-regressor'.


<ModelVersion: creation_timestamp=1654188642081, current_stage='None', description=None, last_updated_timestamp=1654188642081, name='nyc-taxi-regressor', run_id=None, run_link=None, source='runs/74b51dc508b0485abea1703151692621/model', status='READY', status_message=None, tags={}, user_id=None, version=4>

In [27]:
client.list_registered_models()

[<RegisteredModel: creation_timestamp=1654182606007, description='NYC Taxi Predictor for Trip Duration', last_updated_timestamp=1654184901649, latest_versions=[<ModelVersion: creation_timestamp=1654182606943, current_stage='Staging', description='', last_updated_timestamp=1654183029683, name='nyc-taxi-regressor', run_id='ec83b5fdf5ef4ff5be337123791ffe4f', run_link='', source='./mlruns/1/ec83b5fdf5ef4ff5be337123791ffe4f/artifacts/models_mlflow', status='READY', status_message=None, tags={}, user_id=None, version=1>,
  <ModelVersion: creation_timestamp=1654184901649, current_stage='None', description='ADD testing API', last_updated_timestamp=1654184957060, name='nyc-taxi-regressor', run_id=None, run_link=None, source='runs/74b51dc508b0485abea1703151692621/model', status='READY', status_message=None, tags={'model': 'xgboost'}, user_id=None, version=2>], name='nyc-taxi-regressor', tags={'model': 'xgboost'}>]

In [32]:
# list version of a model
model_name = "nyc-taxi-regressor"
latest_versions = client.get_latest_versions(name=model_name)

for version in latest_versions:
    print(f"version: {version.version}, stage: {version.current_stage}")

version: 1, stage: Staging
version: 2, stage: None


In [39]:
# change stage of a model
model_version = 2
stage = "Staging"
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage=stage,
    archive_existing_versions=False
)

<ModelVersion: creation_timestamp=1654184901649, current_stage='Staging', description='Test in API - update description', last_updated_timestamp=1654186289667, name='nyc-taxi-regressor', run_id=None, run_link=None, source='runs/74b51dc508b0485abea1703151692621/model', status='READY', status_message=None, tags={'model': 'xgboost'}, user_id=None, version=2>

In [42]:
from datetime import datetime

date= datetime.today().date()

# change description of a model
client.update_model_version(
    name=model_name,
    version=model_version,
    description= f"Test in API - update description - model transitioned to {stage} on {date}" 
)

<ModelVersion: creation_timestamp=1654184901649, current_stage='Staging', description='Test in API - update description - model transitioned to Staging on 2022-06-02', last_updated_timestamp=1654186490662, name='nyc-taxi-regressor', run_id=None, run_link=None, source='runs/74b51dc508b0485abea1703151692621/model', status='READY', status_message=None, tags={'model': 'xgboost'}, user_id=None, version=2>

### Testing Model

In [44]:
from sklearn.metrics import mean_squared_error
import pandas as pd


def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    return df


def preprocess(df, dv):
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    train_dicts = df[categorical + numerical].to_dict(orient='records')
    return dv.transform(train_dicts)


def test_model(name, stage, X_test, y_test):
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(X_test)
    return {"rmse": mean_squared_error(y_test, y_pred, squared=False)}

In [62]:
# read data
df = read_dataframe('datasets/green_tripdata_2021-03.parquet')

In [63]:
import pickle

# get the preprocessor
run_id_pre = "ec83b5fdf5ef4ff5be337123791ffe4f"
client.download_artifacts(run_id=run_id_pre, path="preprocessor", dst_path=".")

with open("preprocessor/preprocessor.b", "rb") as f_in:
    dv = pickle.load(f_in)

In [67]:
# pre process
X_test = preprocess(df, dv)
X_test

<80372x13221 sparse matrix of type '<class 'numpy.float64'>'
	with 153356 stored elements in Compressed Sparse Row format>

In [68]:
# target
target='duration'
y_test = df[target].values

In [75]:
%time test_model(name=model_name, stage="Production", X_test=X_test, y_test=y_test)

CPU times: user 117 ms, sys: 25 µs, total: 117 ms
Wall time: 116 ms


{'rmse': 6.659623831152371}

In [74]:
%time test_model(name=model_name, stage="Staging", X_test=X_test, y_test=y_test)

CPU times: user 15.1 s, sys: 5.13 ms, total: 15.1 s
Wall time: 4.01 s


{'rmse': 6.282646828305195}

In [None]:
# if i want to trade 

# client.transition_model_version_stage(
#     name=model_name,
#     version=,
#     stage='Production',
#     archive_existing_versions=True
# )