In [86]:
import pickle

import polars as pl 
import numpy as np

from sklearn.feature_extraction import DictVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import root_mean_squared_error

from sklearn.pipeline import make_pipeline

import os
from dotenv import load_dotenv

# Read settings from a local .env file; presumably this is where the AWS
# credentials looked up via os.getenv in the MLflow setup cell come from.
load_dotenv()

True

In [87]:
import mlflow

# Point the MLflow client at the local tracking server and select the experiment.
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("green-taxi-experiment")

# Copying os.getenv(NAME) back into os.environ[NAME] changes nothing when the
# variable is set, and fails with an unhelpful "str expected, not NoneType"
# TypeError when it is not. Check for the credentials explicitly instead.
missing_aws_vars = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not os.getenv(name)
]
if missing_aws_vars:
    raise RuntimeError(
        "Missing AWS credentials in the environment: "
        + ", ".join(missing_aws_vars)
        + ". Define them in .env or export them before starting the kernel."
    )

# NOTE(review): artifact_location is not referenced by any cell visible here;
# presumably it documents the tracking server's S3 artifact root. Confirm or remove.
artifact_location = "s3://mlops-bucket-orchestration/"

In [88]:
def read_dataframe(filename: str) -> pl.DataFrame:
    """
    Load trip records from a parquet file and derive the modelling columns.

    Columns added: "duration" (drop-off minus pick-up, in seconds),
    "duration_minutes" ("duration" / 60) and "PU_DO" (pick-up and drop-off
    location IDs joined with "_"). Only trips lasting more than 1 minute and
    at most 60 minutes are kept, and the two location ID columns are cast to
    strings.
    """
    location_cols = ["PULocationID", "DOLocationID"]

    # Expressions are built up front so the frame pipeline below reads as a list of steps.
    trip_seconds = (
        pl.col("lpep_dropoff_datetime") - pl.col("lpep_pickup_datetime")
    ).dt.total_seconds()
    trip_minutes = pl.col("duration") / 60
    route = pl.concat_str(pl.col("PULocationID"), pl.lit("_"), pl.col("DOLocationID"))
    within_bounds = (pl.col("duration_minutes") > 1) & (pl.col("duration_minutes") <= 60)

    trips = pl.read_parquet(filename)
    # "duration" has to exist before "duration_minutes" can be derived from it,
    # hence two separate with_columns calls.
    trips = trips.with_columns(trip_seconds.alias("duration"))
    trips = trips.with_columns(
        trip_minutes.alias("duration_minutes"),
        route.alias("PU_DO"),
    )
    trips = trips.filter(within_bounds)

    return trips.with_columns(pl.col(location_cols).cast(pl.Utf8))

def prepare_dictionaries(df: pl.DataFrame) -> list[dict]:
    """
    Turn the model's feature columns into one dictionary per row.

    Only the "PU_DO" and "trip_distance" columns are kept. Nothing is
    vectorized or fitted here: the result is a plain list of dicts, meant to
    be passed to a DictVectorizer (for example as the first pipeline step).

    Returns:
        A list with one {"PU_DO": ..., "trip_distance": ...} dict per row of df.
    """
    return df.select(["PU_DO", "trip_distance"]).to_dicts()

In [89]:
target = "duration_minutes"

# First URL is the training month (2021-01), second the validation month (2021-02).
df_train, df_val = (
    read_dataframe(url)
    for url in (
        "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-01.parquet",
        "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet",
    )
)

# Target column (trip duration in minutes) for each split.
y_train, y_val = df_train[target], df_val[target]

# Per-row feature dicts for each split.
dict_train, dict_val = (prepare_dictionaries(frame) for frame in (df_train, df_val))


In [90]:
# Train the model, score it on the validation split, and record parameters,
# metric and the fitted model in a single MLflow run.
with mlflow.start_run() as run:
    params = {
        "max_depth": 20,
        "n_estimators": 100,
        "min_samples_leaf": 10,
        "random_state": 0
    }

    mlflow.log_params(params)

    # The DictVectorizer and the RandomForestRegressor live in one pipeline, so
    # the logged model already contains the feature encoding and the vectorizer
    # does not need to be stored as a separate artifact.
    pipeline = make_pipeline(
        DictVectorizer(),
        # n_jobs is passed outside of params, so it is not logged to MLflow.
        RandomForestRegressor(**params, n_jobs=-1)
    )

    pipeline.fit(dict_train, y_train)
    y_pred = pipeline.predict(dict_val)

    rmse = root_mean_squared_error(y_val, y_pred)
    print(f"RMSE: {rmse}")

    mlflow.log_metric("rmse", rmse)

    mlflow.sklearn.log_model(pipeline, artifact_path="model")

    # Use the run object bound by the context manager rather than looking up
    # the active run again.
    print(f"Model saved in run {run.info.run_id}")
    

RMSE: 6.754913985309662




Model saved in run 3d2f54b35a4743b4af6ee34e3415c787
🏃 View run melodic-squirrel-753 at: http://127.0.0.1:5000/#/experiments/992595661440936711/runs/3d2f54b35a4743b4af6ee34e3415c787
🧪 View experiment at: http://127.0.0.1:5000/#/experiments/992595661440936711


In [91]:
# Sanity check that the fitted pipeline accepts raw feature dicts. Only the
# first validation record is scored, instead of re-predicting the whole
# validation set just to display a single value.
pipeline.predict(dict_val[:1])[0]

18.078530539860687