In [34]:
import pandas as pd
import sklearn
import pickle
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression, ElasticNet
from sklearn.feature_extraction import DictVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

from sklearn.metrics import root_mean_squared_error

import xgboost as xgb
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll.base import scope



In [23]:
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("nyc-taxi")

<Experiment: artifact_location='mlflow-artifacts:/1', creation_time=1751800556166, experiment_id='1', last_update_time=1751800556166, lifecycle_stage='active', name='nyc-taxi', tags={}>

In [21]:
def read_dataframe(filename):
    if filename.endswith('.csv'):
        df = pd.read_csv(filename)

        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    elif filename.endswith('.parquet'):
        df = pd.read_parquet(filename)

    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    # extract day of week and hour of day and put into new columns
    df['day_of_week'] = df.tpep_pickup_datetime.dt.dayofweek
    df['hour_of_day'] = df.tpep_pickup_datetime.dt.hour
    df['day_of_week'] = df['day_of_week'].astype(str)
    df['hour_of_day'] = df['hour_of_day'].astype(str)
    
    # get 'congestion_surcharge', 'fare_amount', 'tip_amount', 'total_amount' and convert to float, delete rows with null values
    for field in ['congestion_surcharge', 'fare_amount', 'tip_amount', 'total_amount']:
        # df[field] = pd.to_numeric(df[field], errors='coerce')
        df = df[df[field].notna()]
        df[field] = df[field].astype(float)
        
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    
    return df

In [6]:

dir_path = './'

train_path = dir_path + '/yellow_tripdata_2025-01_sample_train.parquet'
val_path = dir_path + '/yellow_tripdata_2025-01_sample_val.parquet'
test_path = dir_path + '/yellow_tripdata_2025-01_sample_test.parquet'


## Create dataset 

In [10]:
df = read_dataframe('yellow_tripdata_2025-01_sample.parquet')
df.head()
df_train, df_val, df_test = np.split(df.sample(frac=1, random_state=42), [int(.8*len(df)), int(.9*len(df))])

# save the datasets to parquet files

df_train.to_parquet(train_path)
df_val.to_parquet(val_path)
df_test.to_parquet(test_path)
# check the length of the datasets
len(df_train), len(df_val), len(df_test)

  return bound(*args, **kwds)


(215971, 26996, 26997)

In [8]:
df_train = pd.read_parquet(train_path)
df_val = pd.read_parquet(val_path)
df_test = pd.read_parquet(test_path)

df_train.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee,duration,day_of_week,hour_of_day,PU_DO
1455479,2,2025-02-15 15:38:29,2025-02-15 16:01:08,1.0,5.21,1.0,N,100,209,1,...,0.0,1.0,38.7,2.5,0.0,0.75,22.65,5,15,100_209
2507360,2,2025-02-26 18:53:05,2025-02-26 19:02:45,1.0,1.9,1.0,N,162,236,1,...,0.0,1.0,21.35,2.5,0.0,0.75,9.666667,2,18,162_236
1871035,1,2025-02-20 09:04:59,2025-02-20 09:14:26,1.0,1.7,1.0,N,140,233,1,...,0.0,1.0,19.4,2.5,0.0,0.75,9.45,3,9,140_233
859430,2,2025-02-09 15:29:41,2025-02-09 16:00:32,5.0,8.14,1.0,N,138,225,1,...,0.0,1.0,57.18,0.0,1.75,0.0,30.85,6,15,138_225
2637860,2,2025-02-27 20:32:46,2025-02-27 20:41:07,1.0,1.08,1.0,N,233,137,1,...,0.0,1.0,15.05,2.5,0.0,0.75,8.35,3,20,233_137


In [9]:
# save 10 percent of data to a parquet file

sample_dataset_path = 'yellow_tripdata_2025-01_sample.parquet'
df.sample(frac=0.1).to_parquet(sample_dataset_path)

In [74]:
categorical = [ 'day_of_week', 'hour_of_day'] #'PULocationID', 'DOLocationID', 'PU_DO',
numerical = ['trip_distance', 'congestion_surcharge']  # 'fare_amount', 'tip_amount', 'total_amount'


full_pipeline = DictVectorizer(sparse=False)

x_dict = df_train[categorical + numerical].to_dict(orient='records')

print(x_dict[0])

# transform the training and validation data using the full pipeline
X_train = full_pipeline.fit_transform(x_dict)


X_val = full_pipeline.transform(df_val[categorical + numerical].to_dict(orient='records'))

feature_names = list(full_pipeline.get_feature_names_out())

print(feature_names)

{'day_of_week': '4', 'hour_of_day': '7', 'trip_distance': 0.57, 'congestion_surcharge': 2.5}
['congestion_surcharge', 'day_of_week=0', 'day_of_week=1', 'day_of_week=2', 'day_of_week=3', 'day_of_week=4', 'day_of_week=5', 'day_of_week=6', 'hour_of_day=0', 'hour_of_day=1', 'hour_of_day=10', 'hour_of_day=11', 'hour_of_day=12', 'hour_of_day=13', 'hour_of_day=14', 'hour_of_day=15', 'hour_of_day=16', 'hour_of_day=17', 'hour_of_day=18', 'hour_of_day=19', 'hour_of_day=2', 'hour_of_day=20', 'hour_of_day=21', 'hour_of_day=22', 'hour_of_day=23', 'hour_of_day=3', 'hour_of_day=4', 'hour_of_day=5', 'hour_of_day=6', 'hour_of_day=7', 'hour_of_day=8', 'hour_of_day=9', 'trip_distance']


In [39]:
target = 'duration'
y_train = df_train[target].values
y_val = df_val[target].values

In [77]:
train = xgb.DMatrix(X_train, label=y_train)
valid = xgb.DMatrix(X_val, label=y_val)

In [10]:
def objective(params):
    with mlflow.start_run():
        mlflow.set_tag("model", "xgboost")
        mlflow.log_params(params)
        
        mlflow.log_param('categorical_features', categorical)
        mlflow.log_param('numerical_features', numerical)
        
        mlflow.log_param('train_dataset_path', train_path)
        mlflow.log_param('val_dataset_path', val_path)
    
        booster = xgb.train(
            params=params,
            dtrain=train,
            num_boost_round=1000,
            evals=[(valid, 'validation')],
            early_stopping_rounds=50
        )
        y_pred = booster.predict(valid)
        rmse = root_mean_squared_error(y_val, y_pred)
        mlflow.log_metric("rmse", rmse)

    return {'loss': rmse, 'status': STATUS_OK}

In [34]:
search_space = {
    'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
    'learning_rate': hp.loguniform('learning_rate', -3, 0),
    'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
    'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
    'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
    'objective': 'reg:linear',
    'seed': 42
}

best_result = fmin(
    fn=objective,
    space=search_space,
    algo=tpe.suggest,
    max_evals=3,
    trials=Trials()
)

  0%|          | 0/3 [00:00<?, ?trial/s, best loss=?]

  self.starting_round = model.num_boosted_rounds()



[0]	validation-rmse:9.02459                          
[1]	validation-rmse:8.11913                          
[2]	validation-rmse:7.32049                          
[3]	validation-rmse:6.62028                          
[4]	validation-rmse:6.01117                          
[5]	validation-rmse:5.47901                          
[6]	validation-rmse:5.02124                          
[7]	validation-rmse:4.63048                          
[8]	validation-rmse:4.29830                          
[9]	validation-rmse:4.01887                          
[10]	validation-rmse:3.78582                         
[11]	validation-rmse:3.59273                         
[12]	validation-rmse:3.43407                         
[13]	validation-rmse:3.30466                         
[14]	validation-rmse:3.20145                         
[15]	validation-rmse:3.11836                         
[16]	validation-rmse:3.05365                         
[17]	validation-rmse:3.00293                         
[18]	validation-rmse:2.96411

KeyboardInterrupt: 

In [78]:
import json
import matplotlib.pyplot as plt
import seaborn as sns


def comprehensive_feature_importance_analysis(model, feature_names=None):
    """Analyze and log comprehensive feature importance."""

    importance_types = ["weight", "gain", "cover", "total_gain"]

    for imp_type in importance_types:
        # Get importance scores
        importance = model.get_score(importance_type=imp_type)

        if not importance:
            continue

        # Sort features by importance
        sorted_features = sorted(
            importance.items(), key=lambda x: x[1], reverse=True
        )

        # Create visualization
        features, scores = zip(*sorted_features[:10])

        plt.figure(figsize=(10, 8))
        sns.barplot(x=list(scores), y=list(features))
        plt.title(f"Top 10 Feature Importance ({imp_type.title()})")
        plt.xlabel("Importance Score")
        plt.tight_layout()

        # Save and log plot
        plot_filename = f"feature_importance_{imp_type}.png"
        plt.savefig(plot_filename, bbox_inches="tight")
        mlflow.log_artifact(plot_filename)
        plt.close()

        # Log importance as JSON artifact
        json_filename = f"feature_importance_{imp_type}.json"
        with open(json_filename, "w") as f:
            json.dump(importance, f, indent=2)
        mlflow.log_artifact(json_filename)

In [79]:
import mlflow.xgboost


mlflow.end_run()

with mlflow.start_run():
    
    best_params = {
        'learning_rate': 0.06836426267409443,
        'max_depth': 10,
        'min_child_weight': 14.354365207007865,
        'objective': 'reg:linear',
        'reg_alpha': 0.2042301820266,
        'reg_lambda': 0.11861308163,
        'seed': 42,
    }
    
    mlflow.log_params(best_params)

    booster = xgb.train(best_params, dtrain=train,
                num_boost_round=1000,
                evals=[(valid, 'validation')],
                early_stopping_rounds=50
            )

    y_pred = booster.predict(valid)
    rmse = root_mean_squared_error(y_val, y_pred)
    mlflow.log_metric("rmse", rmse)
    
    mlflow.log_param('categorical_features', categorical)
    mlflow.log_param('numerical_features', numerical)
    
    comprehensive_feature_importance_analysis(booster, feature_names=feature_names)
    
    mlflow.log_param('model', 'XGBoost')
    
    with open('models/preprocessor.b', 'wb') as f_out:
        pickle.dump(full_pipeline, f_out)
    
    mlflow.log_artifact('models/preprocessor.b', artifact_path="preprocessor")
    mlflow.xgboost.log_model(booster, artifact_path="model")
    
    

  self.starting_round = model.num_boosted_rounds()


[0]	validation-rmse:9.62179
[1]	validation-rmse:9.14769
[2]	validation-rmse:8.71483
[3]	validation-rmse:8.31956
[4]	validation-rmse:7.95896
[5]	validation-rmse:7.63162
[6]	validation-rmse:7.33396
[7]	validation-rmse:7.06379
[8]	validation-rmse:6.82052
[9]	validation-rmse:6.59958
[10]	validation-rmse:6.40193
[11]	validation-rmse:6.22303
[12]	validation-rmse:6.06145
[13]	validation-rmse:5.91692
[14]	validation-rmse:5.78847
[15]	validation-rmse:5.67198
[16]	validation-rmse:5.56883
[17]	validation-rmse:5.47548
[18]	validation-rmse:5.39435
[19]	validation-rmse:5.32176
[20]	validation-rmse:5.25559
[21]	validation-rmse:5.19715
[22]	validation-rmse:5.14494
[23]	validation-rmse:5.09885
[24]	validation-rmse:5.05733
[25]	validation-rmse:5.01947
[26]	validation-rmse:4.98646
[27]	validation-rmse:4.95666
[28]	validation-rmse:4.93027
[29]	validation-rmse:4.90587
[30]	validation-rmse:4.88523
[31]	validation-rmse:4.86648
[32]	validation-rmse:4.85009
[33]	validation-rmse:4.83384
[34]	validation-rmse:4.8

  xgb_model.save_model(model_data_path)


🏃 View run unleashed-seal-70 at: http://localhost:5000/#/experiments/1/runs/cc64c892e826442baaa3b7ff0f916e17
🧪 View experiment at: http://localhost:5000/#/experiments/1


In [80]:

def load_model():
    """
    Load model once and cache it
    """
    model_name = "xgboost_model"
    model_version = "latest"

    model_uri = f"models:/{model_name}/{model_version}"
    model = mlflow.xgboost.load_model(model_uri)

    return model

In [81]:
import os 

RUN_ID = 'cc64c892e826442baaa3b7ff0f916e17'
def load_preprocessor():
    """
    Load preprocessor once and cache it
    """

    cur_dir = './'
    preprocessor_path = f'{cur_dir}/preprocessor/preprocessor.b'

    # Download only if not exists
    mlflow.artifacts.download_artifacts(
        artifact_uri=f'mlflow-artifacts:/1/{RUN_ID}/artifacts/preprocessor',
        dst_path=cur_dir
    )

    with open(preprocessor_path, 'rb') as f:
        preprocessor = pickle.load(f)


    return preprocessor

In [82]:
preprocessor= load_preprocessor()
 
booster = load_model()



Downloading artifacts: 100%|██████████| 1/1 [00:00<00:00, 151.95it/s]


Downloading artifacts: 100%|██████████| 5/5 [00:00<00:00, 51.19it/s] 


In [83]:
import time 

ride_dict = {'day_of_week': '1', 'hour_of_day': '12', 'trip_distance': 3.5, 'congestion_surcharge': 2.5}


# Transform data, calc time spent to transform data
start_time = time.time()
x_transformed = preprocessor.transform(ride_dict)
end_time = time.time()
print(f"Data transformation time: {end_time - start_time} seconds")

# Create DMatrix with cached feature names
start_time = time.time()
x_dmatrix = xgb.DMatrix(x_transformed)
end_time = time.time()
print(f"DMatrix creation time: {end_time - start_time} seconds")

# Predict
start_time = time.time()
prediction = booster.predict(x_dmatrix)
end_time = time.time()
print(f"Prediction time: {end_time - start_time} seconds")

Data transformation time: 0.0005190372467041016 seconds
DMatrix creation time: 0.022701740264892578 seconds
Prediction time: 0.034089088439941406 seconds
