# MLFlow practice
This notebook aims to help you get familiar with using MLflow through hands-on learning by training various models to predict the duration of a taxi ride.

**Data**
I'm taking the [NYC taxi cab data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) which is store in parquet format.

Specifically I will take the green taxi cab data from Jan 2023 (train), Feb 2023(val), and Mar 2023 (test)

The data will be stored in the following folder `/mlops-zoomcamp/cohorts/2024/02-experiment-tracking/homework/data/`

The data dictionary is available [here](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf)

**Models**
While technically not necessary for learning. I'm going to test 3 different models
1. xgboost
2. random forrest (sklearn)
3. linear regression

In [2]:
#Dependencies
import mlflow
import os
import pandas as pd
import pickle
import numpy as np

## Data Prep
### Download data

In [3]:
#Set paths
Homework_Path = os.getcwd()
Data_Path = "./data/"

In [4]:
# Download data - I did this in the terminal because I forgot to install wget. But this should work
os.chdir(Data_Path)
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-02.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-03.parquet
os.listdir()
os.chdir(Homework_Path)

zsh:1: command not found: wget
zsh:1: command not found: wget
zsh:1: command not found: wget


In [5]:
train_path = "./data/green_tripdata_2023-01.parquet"
val_path = "./data/green_tripdata_2023-02.parquet"
test_path = "./data/green_tripdata_2023-03.parquet"

df_train = pd.read_parquet(train_path)
df_val = pd.read_parquet(val_path)
df_test = pd.read_parquet(test_path)

df_train.head()
df_val.head()
df_test.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2023-03-01 00:25:10,2023-03-01 00:35:47,N,1.0,82,196,1.0,2.36,13.5,1.0,0.5,0.0,0.0,,1.0,16.0,2.0,1.0,0.0
1,2,2023-03-01 00:14:29,2023-03-01 00:25:04,N,1.0,7,7,1.0,0.78,-6.5,-1.0,-0.5,0.0,0.0,,-1.0,-9.0,3.0,1.0,0.0
2,2,2023-03-01 00:14:29,2023-03-01 00:25:04,N,1.0,7,7,1.0,0.78,6.5,1.0,0.5,0.0,0.0,,1.0,9.0,3.0,1.0,0.0
3,2,2023-02-28 22:59:46,2023-02-28 23:08:38,N,1.0,166,74,1.0,1.66,11.4,1.0,0.5,2.78,0.0,,1.0,16.68,1.0,1.0,0.0
4,2,2023-03-01 00:54:03,2023-03-01 01:03:14,N,1.0,236,229,1.0,3.14,15.6,1.0,0.5,4.17,0.0,,1.0,25.02,1.0,1.0,2.75


## Pre-process data
Ok, so I want to make a slightly more complicated data set than those used in the tutorials. Here are the steps I want to do

**Create the dataframe and features**
`create_dataframe`
1. Create the target variable (dropoff - pickup = duration)
    * Filter out any data with a duration < 1min and >60 mins
2. Convert pickup and dropoff locations into strings
3. Create additional columns for day of the week: 1-7 (Mon-Sun, ordinal)
4. Create columns for hours and minutes of pick up

**Preprocess the dataframe**
`preprocess_dataframe`
1. Merge the pickup and dropoff to a single string
2. Use the dictionary vectorizer to one-hot encode the PU_DO location

In [6]:
import datetime
from sklearn.feature_extraction import DictVectorizer

def create_dataframe(filepath):
    #Read the parquet file
    df = pd.read_parquet(train_path)

    # Create duration (mins)
    df['duration'] = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    # Filter outliers
    df = df[(df.duration >= 1) & (df.duration <= 60)]

    #Pick up and drop off locations to strings
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    #Additional features
    df['pickup_day'] = df.lpep_pickup_datetime.apply(lambda x: x.weekday())
    df['pickup_hour'] = df.lpep_pickup_datetime.apply(lambda x: x.hour)
    df['pickup_min'] = df.lpep_pickup_datetime.apply(lambda x: x.minute)

    #Remove unwanted columns
    chosen_cols = ['duration','PULocationID','DOLocationID','trip_distance','pickup_day','pickup_hour','pickup_min']
    df = df[chosen_cols]

    return df

def preprocess_dataframe(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    #Merge the Pickup and dropoff locations to a single column
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']

    numerical = ['trip_distance','pickup_day','pickup_hour','pickup_min']
    
    dicts = df[categorical + numerical].to_dict(orient='records')
    if fit_dv:
        X = dv.fit_transform(dicts)
    else:
        X = dv.transform(dicts)
    return X, dv

def dump_pickle(obj, filename: str):
    with open(filename, "wb") as f_out:
        return pickle.dump(obj, f_out)

In [7]:
dv = DictVectorizer()

df_train = create_dataframe(train_path)
df_val = create_dataframe(val_path)
df_test = create_dataframe(test_path)

X_train, dv = preprocess_dataframe(df_train,dv, fit_dv=True)
X_val, _ = preprocess_dataframe(df_val,dv, fit_dv=False)
X_test, _ = preprocess_dataframe(df_test,dv, fit_dv=False)

print(f"X_train contains: {np.shape(X_train)[0]} entries, and {np.shape(X_train)[1]} columns")
print(f"X_val contains: {np.shape(X_val)[0]} entries, and {np.shape(X_val)[1]} columns")
print(f"X_test contains: {np.shape(X_test)[0]} entries, and {np.shape(X_test)[1]} columns")

X_train contains: 65946 entries, and 5705 columns
X_val contains: 65946 entries, and 5705 columns
X_test contains: 65946 entries, and 5705 columns


In [8]:
#Store target variables
y_train = df_train['duration']
y_val = df_val['duration']
y_test = df_test['duration']

# Save DictVectorizer and datasets
dump_pickle(dv, os.path.join(Data_Path, "dv.pkl"))
dump_pickle((X_train, y_train), os.path.join(Data_Path, "train.pkl"))
dump_pickle((X_val, y_val), os.path.join(Data_Path, "val.pkl"))
dump_pickle((X_test, y_test), os.path.join(Data_Path, "test.pkl"))

## Training models
So in this experiment (`nyc-taxi-ride-prediction`) the aim is to investigate which model can best predict the duration of a taxi ride. 

I'm planning three demonstrations
1. Record a single run (more for practice)
2. Perform a thorough experiment, with hyperparameter optimisation
3. Evaluate multiple models, and then select the best model.

### Record a single run

In [9]:
# Configure MLflow
database_uri = "sqlite:///mlflow.db"
experiment_name = "nyc-taxi-ride-prediction-single"

mlflow.set_tracking_uri(database_uri) #The name of the database to use
mlflow.set_experiment(experiment_name) #If already exists mlflow will append to existing data. Else it will make a new experiment.

<Experiment: artifact_location='/Users/marcusleiwe/Documents/GitHubRepos/mlops-zoomcamp/cohorts/2024/02-experiment-tracking/homework/mlruns/1', creation_time=1716887585182, experiment_id='1', last_update_time=1716887585182, lifecycle_stage='active', name='nyc-taxi-ride-prediction-single', tags={}>

In [10]:
#Libraries
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

#Train the model
with mlflow.start_run():
    #MLflow tags
    mlflow.set_tag("developer","Marcus")
    mlflow.log_param("train-data-path",train_path)
    mlflow.log_param("val-data-path",val_path)

    #Model init
    params = {
        'max_depth': 15,
        'n_estimators': 50,
        'min_samples_split': 2,
        'min_samples_leaf': 4,
        'random_state': 42
    }
    #Store Random Forest Parameters
    mlflow.log_params(params)

    #Actually train the model
    rf = RandomForestRegressor(**params)
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_val)

    #Evaluation
    rmse = mean_squared_error(y_val, y_pred, squared=False)
    mlflow.log_metric("rmse",rmse)

    #Save model
    with open('./models/rf_reg.bin','wb') as f_out:
        pickle.dump((dv,rf), f_out)



### Experiment Tracking with Hyperparameter Tuning
In this case I will optimise the random forest model. 

In [11]:
# Configure MLflow
database_uri = "sqlite:///mlflow.db"
experiment_name = "nyc-taxi-ride-prediction-optimisation"

mlflow.set_tracking_uri(database_uri) #The name of the database to use
mlflow.set_experiment(experiment_name) #If already exists mlflow will append to existing data. Else it will make a new experiment.

2024/05/28 18:17:33 INFO mlflow.tracking.fluent: Experiment with name 'nyc-taxi-ride-prediction-optimisation' does not exist. Creating a new experiment.


<Experiment: artifact_location='/Users/marcusleiwe/Documents/GitHubRepos/mlops-zoomcamp/cohorts/2024/02-experiment-tracking/homework/mlruns/2', creation_time=1716891453739, experiment_id='2', last_update_time=1716891453739, lifecycle_stage='active', name='nyc-taxi-ride-prediction-optimisation', tags={}>

In [45]:
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials    
from hyperopt.pyll import scope

#Define the search space
search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 2, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 5, 100, 5)),
        #'min_samples_split': scope.int(hp.quniform('min_samples_split', 1, 10, 1)),
        #'criterion' : hp.choice('criterion', ['squared_error', 'poisson']),
        #'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 10, 1)),
        'random_state': 42
    }

def objective(params):
    with mlflow.start_run():
        #Create tags
        mlflow.set_tag("model","RandomForestRegressor")
        mlflow.set_tag("developer","Marcus")
        mlflow.log_param("train-data-path",train_path)
        mlflow.log_param("val-data-path",val_path)
        mlflow.log_params(params)
        
        #Train model
        rf = RandomForestRegressor(**params)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_val)
        
        #Evaluation
        rmse = mean_squared_error(y_val, y_pred, squared=False)
        mlflow.log_metric("rmse",rmse)

In [46]:
# Use fmin to optimise
rstate = np.random.default_rng(42)

best_result = fmin(
    fn = objective,
    space = search_space, #the search space
    algo = tpe.suggest, #uses the TPE algo to estimate the next parameters
    max_evals = 10,
    trials = Trials(), #Stores the information for each trial
    rstate=rstate
)

  0%|          | 0/250 [00:00<?, ?trial/s, best loss=?]


job exception: 'NoneType' object is not iterable



  0%|          | 0/250 [00:15<?, ?trial/s, best loss=?]


TypeError: 'NoneType' object is not iterable

In [23]:
print(scope.int(hp.loguniform('max_depth', 2, 10)))

0 int
1   float
2     hyperopt_param
3       Literal{max_depth}
4       loguniform
5         Literal{2}
6         Literal{10}
