In [None]:
from pathlib import Path

import joblib
import mlflow
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression, Lasso, Ridge
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

# Running MLflow Locally

For this demo, we'll implement an MLflow scenario in which a SQLite database serves as the backend store and the local `mlruns` directory serves as the artifact store.

The objective is to predict the trip duration (in minutes) from the trip distance and the pickup/drop-off location pair.

## Configure MLflow Tracking URI

In [None]:
# Name of the MLflow experiment under which every run in this notebook is grouped.
EXPERIMENT_NAME: str = "duration-prediction"

In [None]:
# Point the MLflow client at a SQLite file (relative to the working directory) for run metadata.
mlflow.set_tracking_uri(uri="sqlite:///mlflow.db")

In [None]:
# Confirm which tracking URI the client is now using (shown as the cell output).
current_tracking_uri = mlflow.get_tracking_uri()
current_tracking_uri

In [None]:
# Make EXPERIMENT_NAME the active experiment for the runs started below;
# the returned Experiment object is displayed as the cell output.
active_experiment = mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
active_experiment

After we set the experiment, an `mlruns` directory and a SQLite database file `mlflow.db` are created automatically. The database can be inspected with `sqlite3` if needed.

## Load Dataset

In [None]:
DATA_DIR = Path("../data")
DATA_RAW_DIR = DATA_DIR / "raw"

# One CSV per month: January is used for training, February for validation,
# and March as the held-out test set (see the loading cell below).
GREEN_TRIP_JAN = DATA_RAW_DIR / "green_tripdata_2022-01.csv"
GREEN_TRIP_FEB = DATA_RAW_DIR / "green_tripdata_2022-02.csv"
GREEN_TRIP_MAR = DATA_RAW_DIR / "green_tripdata_2022-03.csv"

# Ensure the raw-data directory exists so the expected CSVs can be placed there.
# exist_ok=True makes the cell safe to re-run without an existence check.
DATA_RAW_DIR.mkdir(parents=True, exist_ok=True)

After prototyping the loading process, we wrap it in a function so the same cleaning steps can be reused for every month of data.

In [None]:
def read_data(filename: Path) -> pd.DataFrame:
    """Load one month of green-taxi trips and derive the modelling columns.

    Parameters
    ----------
    filename : Path
        CSV file containing at least the ``lpep_pickup_datetime``,
        ``lpep_dropoff_datetime``, ``PULocationID`` and ``DOLocationID`` columns.

    Returns
    -------
    pd.DataFrame
        Only the trips whose duration is between 1 and 60 minutes (inclusive),
        with:
        - ``duration``: trip length in minutes (float);
        - ``PULocationID`` / ``DOLocationID`` cast to ``str``;
        - ``PU_DO``: combined ``"<PULocationID>_<DOLocationID>"`` key.
    """
    df = pd.read_csv(filename)

    df["lpep_pickup_datetime"] = pd.to_datetime(df["lpep_pickup_datetime"])
    df["lpep_dropoff_datetime"] = pd.to_datetime(df["lpep_dropoff_datetime"])

    # Vectorised timedelta -> minutes (no Python-level lambda per row).
    df["duration"] = (
        df["lpep_dropoff_datetime"] - df["lpep_pickup_datetime"]
    ).dt.total_seconds() / 60

    # Keep trips of 1..60 minutes (both ends inclusive). `.copy()` yields an
    # independent frame, so the column assignments below don't write into a
    # filtered view (which raises pandas' SettingWithCopyWarning).
    df = df[df["duration"].between(1, 60)].copy()

    df["PULocationID"] = df["PULocationID"].astype(str)
    df["DOLocationID"] = df["DOLocationID"].astype(str)
    df["PU_DO"] = df["PULocationID"] + "_" + df["DOLocationID"]

    return df

In [None]:
df_train = read_data(filename=GREEN_TRIP_JAN)  # training month
df_val = read_data(filename=GREEN_TRIP_FEB)    # validation month
df_test = read_data(filename=GREEN_TRIP_MAR)   # held-out test month

In [None]:
# Peek at the first rows of the cleaned training split.
df_train.head(n=5)

In [None]:
# Peek at the first rows of the cleaned validation split.
df_val.head(n=5)

In [None]:
# Peek at the first rows of the cleaned test split.
df_test.head(n=5)

## Start Experiments

In [None]:
OUTPUT_DIR = Path("../outputs")
MODEL_DIR = OUTPUT_DIR / "models"

# Serialized models are written here before being logged to MLflow.
# exist_ok=True keeps the cell idempotent on re-runs.
MODEL_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
# Regression target (minutes, see read_data) and the two groups of input columns:
# numeric trip distance and the categorical pickup/drop-off pair.
target, numerical_features, categorical_features = "duration", ["trip_distance"], ["PU_DO"]

In [None]:
# Use the same column selection for every split so the three design matrices line up.
feature_columns = numerical_features + categorical_features

X_train, y_train = df_train[feature_columns], df_train[target]
X_val, y_val = df_val[feature_columns], df_val[target]
X_test, y_test = df_test[feature_columns], df_test[target]

### Without MLflow

In [None]:
# Baseline: ordinary least squares on one-hot PU_DO pairs plus the raw trip_distance.
model = Pipeline(
    [
        ("featurizer", ColumnTransformer([
            ("encoder", OneHotEncoder(drop="first", handle_unknown="ignore"), categorical_features),
            # ColumnTransformer drops every column it is not told about (remainder="drop"),
            # so trip_distance must be passed through explicitly or the model never sees it.
            ("numeric", "passthrough", numerical_features),
        ])),
        ("estimator", LinearRegression()),
    ],
    verbose=True,
)

model.fit(X_train, y_train)

# These metrics are RMSEs, not MSEs. sqrt(MSE) is used instead of `squared=False`,
# which scikit-learn deprecated in 1.4 and removed in 1.6.
train_pred = model.predict(X_train)
train_rmse = np.sqrt(mean_squared_error(y_train, train_pred))

val_pred = model.predict(X_val)
val_rmse = np.sqrt(mean_squared_error(y_val, val_pred))

test_pred = model.predict(X_test)
test_rmse = np.sqrt(mean_squared_error(y_test, test_pred))

print(f"Train RMSE: {train_rmse:.3f}.. Val RMSE: {val_rmse:.3f}.. Test RMSE: {test_rmse:.3f}")

In [None]:
# Same featurization as the linear baseline, but with an L1-regularised (Lasso) estimator.
model = Pipeline(
    [
        ("featurizer", ColumnTransformer([
            ("encoder", OneHotEncoder(drop="first", handle_unknown="ignore"), categorical_features),
            # ColumnTransformer drops every column it is not told about (remainder="drop"),
            # so trip_distance must be passed through explicitly or the model never sees it.
            ("numeric", "passthrough", numerical_features),
        ])),
        ("estimator", Lasso()),
    ],
    verbose=True,
)

model.fit(X_train, y_train)

# RMSE via sqrt(MSE); `squared=False` was deprecated in scikit-learn 1.4 and removed in 1.6.
train_pred = model.predict(X_train)
train_rmse = np.sqrt(mean_squared_error(y_train, train_pred))

val_pred = model.predict(X_val)
val_rmse = np.sqrt(mean_squared_error(y_val, val_pred))

test_pred = model.predict(X_test)
test_rmse = np.sqrt(mean_squared_error(y_test, test_pred))

print(f"Train RMSE: {train_rmse:.3f}.. Val RMSE: {val_rmse:.3f}.. Test RMSE: {test_rmse:.3f}")

### With MLflow

In [None]:
# Train the Lasso pipeline inside an MLflow run, logging data paths, the
# hyper-parameter, per-split RMSE and the serialized model as an artifact.
with mlflow.start_run() as run:
    mlflow.set_tag("developer", "dani")

    mlflow.log_param("train_data_path", str(GREEN_TRIP_JAN))
    mlflow.log_param("val_data_path", str(GREEN_TRIP_FEB))
    mlflow.log_param("test_data_path", str(GREEN_TRIP_MAR))

    alpha = 1.
    mlflow.log_param("alpha", alpha)

    model = Pipeline(
        [
            ("featurizer", ColumnTransformer([
                ("encoder", OneHotEncoder(drop="first", handle_unknown="ignore"), categorical_features),
                # Without this passthrough, ColumnTransformer's default remainder="drop"
                # silently discards trip_distance.
                ("numeric", "passthrough", numerical_features),
            ])),
            ("estimator", Lasso(alpha=alpha)),
        ],
        verbose=True,
    )
    mlflow.set_tag("estimator", type(model.named_steps["estimator"]).__name__)
    model.fit(X_train, y_train)

    # RMSE via sqrt(MSE); `squared=False` was deprecated in scikit-learn 1.4 and removed in 1.6.
    train_pred = model.predict(X_train)
    train_rmse = np.sqrt(mean_squared_error(y_train, train_pred))
    mlflow.log_metric("train_rmse", train_rmse)

    val_pred = model.predict(X_val)
    val_rmse = np.sqrt(mean_squared_error(y_val, val_pred))
    mlflow.log_metric("val_rmse", val_rmse)

    test_pred = model.predict(X_test)
    test_rmse = np.sqrt(mean_squared_error(y_test, test_pred))
    mlflow.log_metric("test_rmse", test_rmse)

    # Persist locally first, then copy into the run's artifact store under "artifacts/".
    model_path = MODEL_DIR / "model.joblib"
    joblib.dump(model, model_path)
    mlflow.log_artifact(str(model_path), artifact_path="artifacts")

## MLflow Tracking Server

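Since the experiment metadata now lives in a SQLite database, a plain `mlflow ui` (which reads from its default backend store, not `mlflow.db`) won't show these runs. Instead, we run a tracking server pointed at the database with `mlflow server --backend-store-uri sqlite:///mlflow.db` (`mlflow ui --backend-store-uri sqlite:///mlflow.db` works as well).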

> `--backend-store-uri URI` tells the server which store to read all run metadata from: experiments, runs, parameters, metrics, tags, and so on.

In [None]:
# Fetch the model logged by the run above back from the artifact store.
# Addressing it by run_id + artifact_path avoids hand-building the URI, and
# downloading into OUTPUT_DIR keeps outputs in one place (a bare "outputs"
# would create a stray folder next to the notebook, unlike OUTPUT_DIR).
model_artifact_path = mlflow.artifacts.download_artifacts(
    run_id=run.info.run_id,
    artifact_path="artifacts/model.joblib",
    dst_path=str(OUTPUT_DIR),
)

In [None]:
# Deserialize the downloaded pipeline. joblib.load unpickles its input, so only
# load artifacts produced by a trusted run such as the one above.
model_artifact = joblib.load(filename=model_artifact_path)

In [None]:
# Sanity check: the reloaded pipeline should reproduce the test RMSE from the run.
# sqrt(MSE) replaces `squared=False`, which scikit-learn deprecated in 1.4 and removed in 1.6.
preds = model_artifact.predict(X_test)
reloaded_test_rmse = np.sqrt(mean_squared_error(y_test, preds))
assert np.isclose(reloaded_test_rmse, test_rmse), "reloaded model disagrees with the logged run"
reloaded_test_rmse