In [3]:
import pandas as pd
import mlflow
import pickle
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from hyperopt.pyll import scope
import numpy as np
import os
from mlflow.models.signature import infer_signature
pd.options.display.float_format = '{:.2f}'.format  # render floats with two decimals in displayed frames


ModuleNotFoundError: No module named 'mlflow'

In [None]:
def set_experiment(tracking_uri: str = "sqlite:///mlflow_test.db",
                   experiment_name: str = "nyc_taxi_hw3_test"):
    """Configure where MLflow records runs and which experiment they belong to.

    Calling with no arguments keeps the original behaviour (local SQLite store
    ``mlflow_test.db`` and experiment ``nyc_taxi_hw3_test``).

    Parameters:
        tracking_uri (str): Tracking backend URI. The default is the same SQLite
            file the ``mlflow ui --backend-store-uri`` cell points at, so the UI
            shows these runs.
        experiment_name (str): Experiment that subsequent ``mlflow.start_run()``
            calls log into.
    """
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment_name)

In [8]:
def read_dataframe(filename):
    df = pd.read_parquet(filename)
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.apply(lambda td: td.total_seconds()/60)
    df = df[(df.duration >= 1) & (df.duration <= 60)]
    categorical = ['PULocationID','DOLocationID']
    df[categorical] = df[categorical].astype(str)
    print(f"{filename} has {len(df)} records.")
    return df

In [9]:
def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Vectorise trip rows into a feature matrix.

    Each row's ``PULocationID``, ``DOLocationID`` and ``trip_distance`` values
    become one dict, and the list of dicts is passed to ``dv``.

    Parameters:
        df: Trips frame containing the three feature columns.
        dv: The vectorizer to apply.
        fit_dv: When True, learn the vocabulary from these rows
            (``fit_transform``). Otherwise reuse the fitted one (``transform``).

    Returns:
        tuple: ``(X, dv)``, the transformed matrix and the same ``dv`` object.
    """
    feature_columns = ['PULocationID', 'DOLocationID', 'trip_distance']
    records = df[feature_columns].to_dict(orient='records')
    vectorize = dv.fit_transform if fit_dv else dv.transform
    return vectorize(records), dv

In [10]:
def dump_pickle(obj, filename: str):
    """Serialise ``obj`` into ``filename`` with pickle, overwriting any existing file.

    Returns None (the result of ``pickle.dump``).
    """
    with open(filename, mode="wb") as handle:
        pickle.dump(obj, handle)

In [11]:
def run_data_prep(raw_data_path: str, dest_path: str, dataset: str = "green",
                  train_month: str = "2023-03", val_month: str = "2023-04"):
    """Build train/validation feature matrices and pickle them to ``dest_path``.

    Reads ``{dataset}_tripdata_{train_month}.parquet`` and
    ``{dataset}_tripdata_{val_month}.parquet`` from ``raw_data_path`` and fits a
    DictVectorizer on the training month only. It then writes ``dv.pkl``,
    ``train.pkl`` and ``val.pkl``, with each split stored as an ``(X, y)`` tuple
    whose target is ``duration``.

    Parameters:
        raw_data_path (str): Directory containing the raw parquet files.
        dest_path (str): Output directory; created if it does not exist.
        dataset (str): File prefix, e.g. "yellow" or "green". NOTE(review): the
            default "green" only works if ``read_dataframe`` understands the
            green (``lpep_*``) column names.
        train_month (str): ``YYYY-MM`` of the training file (default "2023-03").
        val_month (str): ``YYYY-MM`` of the validation file (default "2023-04").
    """
    def _raw_file(month):
        # Path of one month's raw parquet file for the chosen dataset.
        return os.path.join(raw_data_path, f"{dataset}_tripdata_{month}.parquet")

    df_train = read_dataframe(_raw_file(train_month))
    df_val = read_dataframe(_raw_file(val_month))

    target = 'duration'
    y_train = df_train[target].values
    y_val = df_val[target].values

    # Fit the vocabulary on train only; validation reuses it so no val info leaks in.
    dv = DictVectorizer()
    X_train, dv = preprocess(df_train, dv, fit_dv=True)
    X_val, _ = preprocess(df_val, dv, fit_dv=False)

    # Without this, open(..., "wb") inside dump_pickle raises FileNotFoundError
    # when dest_path does not exist yet.
    os.makedirs(dest_path, exist_ok=True)

    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    dump_pickle((X_train, y_train), os.path.join(dest_path, "train.pkl"))
    dump_pickle((X_val, y_val), os.path.join(dest_path, "val.pkl"))

In [41]:
def load_pickle(filename: str):
    """Print ``filename`` and return the object unpickled from it.

    Security: ``pickle.load`` can execute arbitrary code. Only use it on files
    this pipeline wrote itself (e.g. via ``dump_pickle``), never on untrusted input.
    """
    print(filename)
    with open(filename, mode="rb") as handle:
        obj = pickle.load(handle)
    return obj
def train_model(data_path):
    """Fit a LinearRegression on the pickled train split and log it to MLflow.

    Inside a new MLflow run this function:
      * loads ``train.pkl`` and ``val.pkl`` (``(X, y)`` tuples) from ``data_path``;
      * fits the model and logs its intercept as the param ``intercept_``;
      * logs the validation RMSE as the metric ``rmse``;
      * logs the model under artifact path ``model``, with the first five
        training rows as input example and as the sample for signature inference.

    Parameters:
        data_path: Directory containing ``train.pkl`` and ``val.pkl``.

    Returns:
        str: ID of the MLflow run, e.g. for ``register_model_to_mlflow``.
    """
    with mlflow.start_run() as active_run:
        X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
        X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

        regressor = LinearRegression()
        regressor.fit(X_train, y_train)
        mlflow.log_param("intercept_", regressor.intercept_)

        val_predictions = regressor.predict(X_val)

        # Sample rows used for the signature inference and stored as the input example.
        input_example = X_train[:5]
        signature = infer_signature(input_example, regressor.predict(input_example))

        mlflow.log_metric("rmse", root_mean_squared_error(y_val, val_predictions))
        mlflow.sklearn.log_model(
            sk_model=regressor,
            artifact_path="model",
            input_example=input_example,
            signature=signature,
        )
        run_id = active_run.info.run_id

    return run_id
        


In [24]:
def register_model_to_mlflow(run_id: str, model_name: str, artifact_path: str = "model") -> str:
    """Register the model logged by a finished MLflow run in the Model Registry.

    Parameters:
        run_id (str): ID of the run that logged the model.
        model_name (str): Registry name. Registering under an existing name adds
            a new version of that model.
        artifact_path (str): Artifact path the model was logged under. Default "model".

    Returns:
        str: The ``runs:/<run_id>/<artifact_path>`` URI that was registered.
    """
    uri = f"runs:/{run_id}/{artifact_path}"
    mlflow.register_model(model_uri=uri, name=model_name)
    print(f"✅ Model registered: {model_name} from run_id={run_id}")
    return uri

In [14]:
# Point MLflow at the tracking store and experiment before any run is started.
set_experiment()

In [10]:
# Load a single month on its own to inspect the schema and the filtered row count.
df = read_dataframe("./airflow_db/data/yellow_tripdata_2023-03.parquet")

./airflow_db/data/yellow_tripdata_2023-03.parquet has 3316216 records.


In [11]:
# Column dtypes after read_dataframe (the location ID columns have been cast to str).
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 3316216 entries, 0 to 3403765
Data columns (total 20 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           object        
 8   DOLocationID           object        
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  Airport_fee            floa

In [12]:
# Write dv.pkl, train.pkl and val.pkl for the yellow dataset into ./airflow_db/data_pickled.
run_data_prep("./airflow_db/data","./airflow_db/data_pickled","yellow")

./airflow_db/data/yellow_tripdata_2023-03.parquet has 3316216 records.
./airflow_db/data/yellow_tripdata_2023-04.parquet has 3199715 records.


In [None]:
# Launch the MLflow UI as a background process. A bare shell command is a
# SyntaxError in a Python cell, and `!mlflow ui` would block the kernel.
# The backend store matches the tracking URI used by set_experiment().
import shutil
import subprocess

MLFLOW_UI_CMD = [
    "mlflow", "ui",
    "--port", "5001",
    "--backend-store-uri", "sqlite:///mlflow_test.db",
    "--default-artifact-root", "./mlruns",
]
if shutil.which("mlflow") is None:
    print("mlflow CLI not found on PATH; run this in a terminal instead:")
    print(" ".join(MLFLOW_UI_CMD))
else:
    # Stop the server later with mlflow_ui_proc.terminate().
    mlflow_ui_proc = subprocess.Popen(MLFLOW_UI_CMD)


In [42]:
# Train and log the model; keep the run ID so the model can be registered next.
run_id = train_model('./airflow_db/data_pickled')



./airflow_db/data_pickled/train.pkl
./airflow_db/data_pickled/val.pkl




In [43]:
# Register the logged model; re-running adds a new version under the same name.
register_model_to_mlflow(run_id, 'linearregression')

✅ Model registered: linearregression from run_id=9262b79673d1459d84d2c1fde7690c8a


Registered model 'linearregression' already exists. Creating a new version of this model...
Created version '4' of model 'linearregression'.


'runs:/9262b79673d1459d84d2c1fde7690c8a/model'