In [1]:
%pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [25]:
from joblib import load, dump
from tqdm import tqdm
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
from sklearn.pipeline import make_pipeline

import requests
import mlflow
import os
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge

from sklearn.metrics import mean_squared_error

In [3]:

# AWS profile used to reach the S3 artifact store of the MLflow experiment.
# An AWS_PROFILE already exported in the environment takes precedence; otherwise
# fall back to the default below. More info:
# https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/setup.html#setup-credentials
os.environ.setdefault("AWS_PROFILE", "algodx-magnus-dev")

# Public DNS of the EC2 instance running the MLflow tracking server.
# Override with the MLFLOW_TRACKING_SERVER_HOST environment variable instead of editing the notebook.
TRACKING_SERVER_HOST = os.environ.get(
    "MLFLOW_TRACKING_SERVER_HOST",
    "ec2-13-53-93-155.eu-north-1.compute.amazonaws.com",
)
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:5000")
mlflow.set_experiment("nyc-taxi-experiment")

<Experiment: artifact_location='s3://mlflow-models-magnus-dev/2', creation_time=1719565541634, experiment_id='2', last_update_time=1719565541634, lifecycle_stage='active', name='nyc-taxi-experiment', tags={}>

In [4]:
# (file name, target directory) pairs to fetch from the NYC TLC trip-data CDN.
files = [
    ('green_tripdata_2021-01.parquet', './data'),
    ('green_tripdata_2021-02.parquet', './data'),
]

print("Download files:")
for file, path in files:
    url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file}"
    save_path = f"{path}/{file}"
    # open() does not create missing directories.
    os.makedirs(path, exist_ok=True)
    # `with` releases the streamed connection; raise_for_status() prevents an
    # HTTP error page from being written to disk under a .parquet name.
    with requests.get(url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        # Content-Length may be absent; tqdm accepts total=None (unknown size).
        total = int(resp.headers.get("Content-Length", 0)) or None
        with open(save_path, "wb") as handle, tqdm(
            total=total,
            unit="B",
            unit_scale=True,
            desc=f"{file}",
            postfix=f"save to {save_path}",
        ) as progress:
            # iter_content() defaults to 1-byte chunks, which is very slow;
            # read in larger chunks and advance the bar by the bytes written.
            for chunk in resp.iter_content(chunk_size=8192):
                handle.write(chunk)
                progress.update(len(chunk))

Download files:


green_tripdata_2021-01.parquet: 100%|███████████████████████████████████████████████████| 1333519/1333519 [00:04<00:00, 313524.28it/s, save to ./data/green_tripdata_2021-01.parquet]
green_tripdata_2021-02.parquet: 100%|███████████████████████████████████████████████████| 1145679/1145679 [00:03<00:00, 327766.32it/s, save to ./data/green_tripdata_2021-02.parquet]


In [4]:
# January 2021 trips are used for training, February 2021 trips for validation.
train_data_path, val_data_path = (
    'data/green_tripdata_2021-01.parquet',
    'data/green_tripdata_2021-02.parquet',
)
train_data_raw, val_data_raw = (
    pd.read_parquet(parquet_path) for parquet_path in (train_data_path, val_data_path)
)

In [5]:
def prep_data(data: pd.DataFrame) -> pd.DataFrame:
    """Add the ``duration_min`` target and drop outlier trips.

    ``duration_min`` is ``lpep_dropoff_datetime - lpep_pickup_datetime``
    expressed in minutes. Only rows with ``0 <= duration_min <= 60`` and
    ``0 < passenger_count <= 8`` are kept; rows with missing values in these
    columns fail the comparisons and are dropped too.

    Args:
        data: Trip records with ``lpep_pickup_datetime``,
            ``lpep_dropoff_datetime`` and ``passenger_count`` columns.

    Returns:
        A new, filtered frame (original index preserved). ``data`` itself is
        not modified.
    """
    duration = data.lpep_dropoff_datetime - data.lpep_pickup_datetime
    # assign() returns a new frame, so the caller's DataFrame is not mutated;
    # the .dt accessor is vectorised instead of a per-row Python lambda.
    data = data.assign(duration_min=duration.dt.total_seconds() / 60)
    keep = (
        data.duration_min.between(0, 60)  # inclusive on both ends
        & (data.passenger_count > 0)
        & (data.passenger_count <= 8)
    )
    return data[keep]

def feature_engineering(data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``data`` with a combined ``PU_DO`` location feature.

    ``PU_DO`` is the string ``"<PULocationID>_<DOLocationID>"``, so each
    (PULocationID, DOLocationID) pair becomes a single categorical value that
    the DictVectorizer can one-hot encode.

    The input frame is left unchanged, so the raw data stays reusable and
    re-running the calling cell is idempotent.
    """
    pu_do = data['PULocationID'].astype(str) + '_' + data['DOLocationID'].astype(str)
    return data.assign(PU_DO=pu_do)

In [6]:
# data labeling
# Label to predict and the feature columns handed to the DictVectorizer.
target = "duration_min"
cat_features = ["PU_DO"]
# Other numeric candidates tried earlier: passenger_count, fare_amount, total_amount.
num_features = ["trip_distance"]

In [7]:
# Add the PU_DO (PULocationID_DOLocationID) feature to both splits, train first.
train_data, val_data = (
    feature_engineering(split) for split in (train_data_raw, val_data_raw)
)

In [8]:
# Compute the duration target and drop outlier trips in both splits.
train_data, val_data = map(prep_data, (train_data, val_data))

In [10]:
# Each row becomes a {column: value} dict; DictVectorizer one-hot encodes the
# string PU_DO values and passes trip_distance through as a numeric column.
feature_columns = cat_features + num_features
train_dicts = train_data[feature_columns].to_dict(orient='records')
val_dicts = val_data[feature_columns].to_dict(orient='records')

# Fit the vocabulary on training data only; validation reuses it.
dv = DictVectorizer()
X_train = dv.fit_transform(train_dicts)
X_val = dv.transform(val_dicts)

In [16]:
# (n_training_rows, n_features) of the sparse matrix produced by dv.fit_transform.
X_train.shape

(39457, 4878)

In [17]:
# `target` is defined once, in the configuration cell earlier in the notebook;
# it is not redefined here so the two cannot drift apart.
y_train = train_data[target].values
y_val = val_data[target].values

# Sanity check: exactly one label per row of the corresponding design matrix.
assert len(y_train) == X_train.shape[0]
assert len(y_val) == X_val.shape[0]

In [21]:
# Peek at the first validation row of the sparse matrix; with one categorical
# (PU_DO) and one numeric (trip_distance) feature, at most two entries are non-zero.
X_val[0]

<1x4878 sparse matrix of type '<class 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Row format>

In [22]:
import pickle

# Baseline: plain LinearRegression on the DictVectorizer features, tracked in MLflow.
with mlflow.start_run():
    mlflow.set_tag("developer", "magnus")
    mlflow.log_param("train-data-path", train_data_path)
    mlflow.log_param("val-data-path", val_data_path)
    model = LinearRegression()
    model.fit(X_train, y_train)

    # open() does not create parent directories.
    os.makedirs('models', exist_ok=True)
    os.makedirs('artifacts', exist_ok=True)
    with open('models/lin_reg.bin', 'wb') as f_out:
        pickle.dump(model, f_out)

    # Saved and logged alongside the model so the same feature encoding can be reused.
    with open('artifacts/dict_vectorizer.bin', 'wb') as f_out:
        pickle.dump(dv, f_out)

    y_pred = model.predict(X_val)
    # `squared=False` is deprecated since scikit-learn 1.4 and removed in 1.6;
    # take the square root of the MSE explicitly instead.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5

    mlflow.log_metric("rmse", rmse)
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_artifact('artifacts/dict_vectorizer.bin')



# Let's use a pipeline instead!

Also, let's switch to a `RandomForestRegressor`, as Alexei did.

In [34]:
from sklearn.ensemble import RandomForestRegressor

# Random-forest hyper-parameters; a fixed random_state makes the fit reproducible.
params = {
    "max_depth": 20,
    "n_estimators": 100,
    "min_samples_leaf": 10,
    "random_state": 0,
}

# Step 1 turns the feature dicts into a sparse matrix, step 2 regresses on it.
# n_jobs=-1 -> use all available processors when fitting and predicting.
pipeline = make_pipeline(
    DictVectorizer(),
    RandomForestRegressor(n_jobs=-1, **params),
)

In [35]:
import pickle

# Train and track the DictVectorizer + RandomForest pipeline in MLflow.
with mlflow.start_run():
    mlflow.set_tag("developer", "magnus")
    mlflow.log_param("train-data-path", train_data_path)
    mlflow.log_param("val-data-path", val_data_path)
    # Record the forest hyper-parameters so runs can be compared in the MLflow UI.
    mlflow.log_params(params)

    pipeline.fit(train_dicts, y_train)

    # Pickle the fitted pipeline itself -- not the LinearRegression `model` left
    # over from the earlier run -- under a name that matches its contents.
    os.makedirs('pipelines', exist_ok=True)
    with open('pipelines/rf_pipeline.bin', 'wb') as f_out:
        pickle.dump(pipeline, f_out)

    y_pred = pipeline.predict(val_dicts)
    # `squared=False` is deprecated since scikit-learn 1.4 and removed in 1.6.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5

    mlflow.log_metric("rmse", rmse)
    mlflow.sklearn.log_model(pipeline, artifact_path="model")

