In [23]:
import pathlib
# import prefect
import pickle
import pandas as pd
# import numpy as np
# import scipy
# import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error
import mlflow
import xgboost as xgb
from prefect import task, flow

In [24]:
@task(retries=3, retry_delay_seconds=2)
def read_data(filename: str) -> pd.DataFrame:
    """Load a trip-record parquet file and prepare it for feature extraction.

    Steps performed:
      * parse the pickup/dropoff columns as datetimes,
      * add a ``duration`` column holding the trip length in minutes,
      * keep only trips lasting between 1 and 60 minutes (inclusive),
      * cast the pickup/dropoff location IDs to strings.

    Args:
        filename: Path of the parquet file to read.

    Returns:
        A new DataFrame containing only the retained trips, with the
        ``duration`` column added and ``PULocationID`` / ``DOLocationID``
        stored as strings.
    """
    df = pd.read_parquet(filename)

    df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])

    # Vectorized timedelta -> minutes conversion (no per-row Python lambda).
    trip_time = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df['duration'] = trip_time.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows: assigning columns on a
    # boolean-mask slice of the original frame triggers pandas'
    # SettingWithCopyWarning and is not guaranteed to behave consistently.
    df = df[df['duration'].between(1, 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df
    

In [25]:
@task
def add_features(df_train: pd.DataFrame, df_val: pd.DataFrame) -> tuple:
    """Build model inputs from the training and validation frames.

    A combined pickup/dropoff column ``PU_DO`` is written onto both input
    frames, then ``PU_DO`` and ``trip_distance`` are vectorized with a
    ``DictVectorizer`` that is fitted on the training rows only.

    Returns:
        ``(X_train, X_val, y_train, y_val, dv)`` where the ``X_*`` values are
        the vectorized feature matrices, the ``y_*`` values are the
        ``duration`` columns as arrays, and ``dv`` is the fitted vectorizer.
    """
    # Same derived column on both frames (added in place, train first).
    for frame in (df_train, df_val):
        frame['PU_DO'] = frame['PULocationID'] + '_' + frame['DOLocationID']

    feature_columns = ['PU_DO'] + ['trip_distance']

    records_train = df_train[feature_columns].to_dict(orient='records')
    records_val = df_val[feature_columns].to_dict(orient='records')

    # Fit on training data only; validation is transformed with that mapping.
    vectorizer = DictVectorizer()
    matrix_train = vectorizer.fit_transform(records_train)
    matrix_val = vectorizer.transform(records_val)

    target_train = df_train['duration'].values
    target_val = df_val['duration'].values

    return matrix_train, matrix_val, target_train, target_val, vectorizer
    

In [26]:
@task
def train_best_model(X_train, X_val, y_train, y_val, dv) -> None:
    """Train an XGBoost regressor with fixed hyperparameters and log it to MLflow.

    Inside a single MLflow run this task:
      * logs the hyperparameters,
      * trains a booster on the training matrix, evaluating on validation,
      * logs the validation RMSE as the ``rmse`` metric,
      * pickles ``dv`` to ``models/preprocessor.b`` and logs it as an artifact,
      * logs the trained booster under ``models_mlflow``.

    Args:
        X_train: Training feature matrix.
        X_val: Validation feature matrix.
        y_train: Training targets.
        y_val: Validation targets.
        dv: Fitted preprocessor object to persist next to the model.

    Returns:
        None. All results are recorded through MLflow and the local
        ``models`` directory.
    """
    with mlflow.start_run():

        train = xgb.DMatrix(X_train, label=y_train)
        valid = xgb.DMatrix(X_val, label=y_val)

        best_params = {
            'learning_rate': 0.6620269443945731,
            'max_depth': 75,
            'min_child_weight': 0.6681121434468128,
            # 'reg:linear' is the deprecated alias of this objective; the
            # current name avoids the deprecation warning / rejection on
            # newer XGBoost releases.
            'objective': 'reg:squarederror',
            'reg_alpha': 0.009835858863989126,
            'reg_lambda': 0.23315101108258487,
            'seed': 42,
        }

        mlflow.log_params(best_params)

        # NOTE(review): with only 10 boosting rounds, a patience of 20 rounds
        # can never stop training early; values kept as in the original run.
        booster = xgb.train(
            params=best_params,
            dtrain=train,
            num_boost_round=10,
            evals=[(valid, 'validation')],
            early_stopping_rounds=20,
        )

        y_pred = booster.predict(valid)
        # Take the square root explicitly instead of passing `squared=False`,
        # which newer scikit-learn versions deprecate and then remove.
        rmse = float(mean_squared_error(y_val, y_pred) ** 0.5)
        mlflow.log_metric("rmse", rmse)

        pathlib.Path("models").mkdir(exist_ok=True)

        with open("models/preprocessor.b", "wb") as f_out:
            pickle.dump(dv, f_out)

        mlflow.log_artifact("models/preprocessor.b", artifact_path="preprocessor")

        mlflow.xgboost.log_model(booster, artifact_path="models_mlflow")

In [27]:
@flow
def main_flow(train_path = "data/green_tripdata_2022-01.parquet",
              val_path = "data/green_tripdata_2022-02.parquet"):
    """End-to-end training flow: load data, build features, train and log.

    Points MLflow at the tracking server on ``http://127.0.0.1:5000`` and the
    ``orchestrate-workflow`` experiment, reads the training and validation
    files, vectorizes them, and trains/logs the model.
    """
    # MLflow must be configured before the training task opens its run.
    mlflow.set_tracking_uri("http://127.0.0.1:5000")
    mlflow.set_experiment("orchestrate-workflow")

    train_frame = read_data(train_path)
    val_frame = read_data(val_path)

    # add_features yields (X_train, X_val, y_train, y_val, dv), which is
    # exactly the positional argument order train_best_model expects.
    prepared = add_features(train_frame, val_frame)
    train_best_model(*prepared)
    
    

In [28]:
# Run the whole pipeline with the default train/validation parquet paths.
# The flow points MLflow at http://127.0.0.1:5000, so that tracking server
# has to be reachable before this cell is executed.
main_flow()

[0]	validation-rmse:8.44110
[1]	validation-rmse:6.46714
[2]	validation-rmse:6.07598
[3]	validation-rmse:5.98638
[4]	validation-rmse:5.95438
[5]	validation-rmse:5.93839
[6]	validation-rmse:5.93238
[7]	validation-rmse:5.92929
[8]	validation-rmse:5.92758
[9]	validation-rmse:5.92527


