# NYC Taxi ML Pipeline


Creating a pipeline using sklearn

In [1]:
import mlflow
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import make_scorer
from sklearn.metrics import mean_squared_error
from itertools import product

from preprocessing_utils import *

## Preprocessing Pipeline

In [2]:
taxi_train_df = pd.read_csv("data/train.csv")

In [3]:
def rmsle(y_true, y_pred):
    return np.sqrt(mean_squared_error(np.log1p(y_true), np.log1p(y_pred)))

In [4]:
preprocessing_pipeline = Pipeline(
    [
        ("fetch_datetime_columns", FetchDatetimeColumns()),
        ("fetch_categorical_columns", FetchCategoricalColumns()),
        (
            "fetch_distances",
            FetchDistances(),
        ),
        ("scale_distances", ScaleDistances()),
    ]
)

Datetime column formatting initialized
Categorical column formatting initialized
Distance Calculation initialized
Distance Scaling initialized


We're using cached distances for OSRM for faster distance calculations.

In [5]:
X, y = preprocessing_pipeline.fit_transform(taxi_train_df)

Request failed on attempt 1/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.04054260253906,40.71708679199219;-74.04055023193358,40.717090606689446?overview=false
Request failed on attempt 2/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.04054260253906,40.71708679199219;-74.04055023193358,40.717090606689446?overview=false
Request failed on attempt 3/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.04054260253906,40.71708679199219;-74.04055023193358,40.717090606689446?overview=false
Request failed on attempt 4/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.04054260253906,40.71708679199219;-74.04055023193358,40.717090606689446?overview=false
Maximum number of retries reached. Unable to complete the request.
Request failed on attempt 1/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.00809478759764,40.725883

Request failed on attempt 3/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-73.993896484375,40.75139617919922;-73.99386596679686,40.75139617919922?overview=false
Request failed on attempt 4/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-73.993896484375,40.75139617919922;-73.99386596679686,40.75139617919922?overview=false
Maximum number of retries reached. Unable to complete the request.
Request failed on attempt 1/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.0278091430664,40.753456115722656;-74.02771759033203,40.75362014770508?overview=false
Request failed on attempt 2/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.0278091430664,40.753456115722656;-74.02771759033203,40.75362014770508?overview=false
Request failed on attempt 3/4: 400 Client Error: Bad Request for url: http://127.0.0.1:5000/route/v1/driving/-74.0278091430664,40.753456115722656

In [6]:
pd.concat([X.head(5), y.head(5)], axis=1).T

Unnamed: 0,0,1,2,3,4
vendor_id_1,0.0,1.0,0.0,0.0,0.0
vendor_id_2,1.0,0.0,1.0,1.0,1.0
store_and_fwd_flag_N,1.0,1.0,1.0,1.0,1.0
store_and_fwd_flag_Y,0.0,0.0,0.0,0.0,0.0
pickup_dayofweek_0,1.0,0.0,0.0,0.0,0.0
pickup_dayofweek_1,0.0,0.0,1.0,0.0,0.0
pickup_dayofweek_2,0.0,0.0,0.0,1.0,0.0
pickup_dayofweek_3,0.0,0.0,0.0,0.0,0.0
pickup_dayofweek_4,0.0,0.0,0.0,0.0,0.0
pickup_dayofweek_5,0.0,0.0,0.0,0.0,1.0


## Grid Search for Hyperparams

We're running a grid search with cross validation and logging each set of parameters with their average RMSLE accross folds.

In [8]:
rf_param_grid = {
    "n_estimators": [100],
    "max_depth": [10, 20],
    "min_samples_split": [10],
    "min_samples_leaf": [2, 4],
    "bootstrap": [True],
}
param_combinations = list(product(*rf_param_grid.values()))

In [9]:
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
kfold = KFold(n_splits=10, shuffle=True, random_state=42)

In [10]:
custom_scoring_rmsle = make_scorer(rmsle, greater_is_better=False)

In [12]:
run_ids = []

with mlflow.start_run():
    for params in param_combinations:
        with mlflow.start_run(nested=True) as nested_run:
            rf_model = RandomForestRegressor(
                **dict(zip(rf_param_grid.keys(), params)),
            )
            scores = cross_val_score(
                rf_model, X, y, cv=kfold, scoring=custom_scoring_rmsle, n_jobs=-1, verbose=2
            )
            mlflow.log_params(dict(zip(rf_param_grid.keys(), params)))
            mlflow.log_metric("mean_RMSLE", np.mean(scores))
            run_ids.append(nested_run.info.run_id)

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of  10 | elapsed:  5.9min remaining: 13.7min
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:  5.9min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of  10 | elapsed:  5.8min remaining: 13.6min
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:  5.9min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of  10 | elapsed:  8.8min remaining: 20.6min


[CV] END .................................................... total time= 5.8min
[CV] END .................................................... total time= 5.8min
[CV] END .................................................... total time= 8.8min


[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:  8.8min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of  10 | elapsed:  8.9min remaining: 20.7min


[CV] END .................................................... total time= 5.9min
[CV] END .................................................... total time= 5.8min
[CV] END .................................................... total time= 8.8min
[CV] END .................................................... total time= 8.8min


[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:  8.9min finished


Now we're reading from the logs and fetching the params with the best metrics.

In [13]:
run_metrics = {}

for run_id in run_ids:
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    metrics = run.data.metrics
    run_metrics[run_id] = metrics

for run_id, metrics in run_metrics.items():
    print(f"Run ID: {run_id}")
    for metric, value in metrics.items():
        print(f"\t{metric}: {value}")

Run ID: d86e85e762ae4e8db97591bfabd269ec
	mean_RMSLE: -0.568379426883871
Run ID: 6641dd43a21e40d593643c4a312bcbc2
	mean_RMSLE: -0.5689313157978331
Run ID: ff8f34568acc4d06ad8e4e8d9294762f
	mean_RMSLE: -0.5682192042756247
Run ID: 1ba1f6303d4e43d0863cdfd01143f6fd
	mean_RMSLE: -0.5734012862185802


In [14]:
best_run_id = min(run_metrics, key=lambda x: run_metrics[x]['mean_RMSLE'])
best_params = client.get_run(best_run_id).data.params

print(f"\nBest Run ID: {best_run_id}")
print("Best Parameters:")
for key, value in best_params.items():
    print(f"\t{key}: {value}")


Best Run ID: 1ba1f6303d4e43d0863cdfd01143f6fd
Best Parameters:
	bootstrap: True
	max_depth: 20
	min_samples_leaf: 4
	n_estimators: 100
	min_samples_split: 10


## Inference Pipeline

Now we'll create a small inference pipeline that leverages the best parameters and the preprocessing pipeline we created earlier.

In [15]:
best_params = {
    "bootstrap": True,
    "max_depth": 20,
    "min_samples_leaf": 4,
    "n_estimators": 100,
    "min_samples_split": 10,
}

In [16]:
taxi_test_df = pd.read_csv("data/test.csv")

In [17]:
# the following is so that the pipeline doesn't fail
taxi_test_df["dropoff_datetime"] = None
taxi_test_df["trip_duration"] = None

In [18]:
best_rf_model = RandomForestRegressor(**best_params, n_jobs=-1)
best_rf_model.fit(X_train, y_train)

In [19]:
X_test, _ = preprocessing_pipeline.transform(taxi_test_df.sample(50000))

Request failed on attempt 1/4: HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /route/v1/driving/-73.95964813232422,40.77071762084961;-73.94488525390625,40.791141510009766?overview=false (Caused by NewConnectionError("<urllib3.connection.HTTPConnection object at 0x2c0b9ed20>: Failed to establish a new connection: [Errno 49] Can't assign requested address"))
Request failed on attempt 2/4: HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /route/v1/driving/-73.95964813232422,40.77071762084961;-73.94488525390625,40.791141510009766?overview=false (Caused by NewConnectionError("<urllib3.connection.HTTPConnection object at 0x2c0b9d310>: Failed to establish a new connection: [Errno 49] Can't assign requested address"))
Request failed on attempt 3/4: HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /route/v1/driving/-73.95964813232422,40.77071762084961;-73.94488525390625,40.791141510009766?overview=fal

In [24]:
X_test.head()

Unnamed: 0,vendor_id_1,vendor_id_2,store_and_fwd_flag_N,store_and_fwd_flag_Y,pickup_dayofweek_0,pickup_dayofweek_1,pickup_dayofweek_2,pickup_dayofweek_3,pickup_dayofweek_4,pickup_dayofweek_5,...,pickup_timeofday_9-12,pickup_timeofday_12-15,pickup_timeofday_15-18,pickup_timeofday_18-21,pickup_timeofday_21-24,passenger_count_bucket_0,passenger_count_bucket_1-3,passenger_count_bucket_4-6,passenger_count_bucket_7-9,scaled_distance_osrm
0,1,0,1,0,0,0,1,0,0,0,...,1,0,0,0,0,0,1,0,0,1.860362
1,1,0,1,0,0,0,0,0,1,0,...,0,0,0,0,0,0,1,0,0,-0.615039
2,1,0,1,0,0,0,0,0,0,1,...,0,0,0,0,1,0,1,0,0,1.483559
3,0,1,1,0,0,0,0,0,0,1,...,0,1,0,0,0,0,0,1,0,-0.564627
4,0,1,1,0,0,0,0,0,1,0,...,0,0,0,0,0,0,1,0,0,0.193816


In [20]:
y_test_pred = best_rf_model.predict(X_test)

In [22]:
y_test_pred

array([2310.1957403 ,  241.52656825, 1669.16040869, ...,  321.41050228,
        879.395432  , 1153.55583245])

## Visualizing a path

Now, let's visualize what one of the routes would look like.

In [49]:
import requests
import folium
import polyline
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from shapely.geometry import LineString

In [44]:
def osrm_route(lat1, lon1, lat2, lon2, max_retries=3):
    url = (
        f"http://127.0.0.1:5000/route/v1/driving/{lon1},{lat1};{lon2},{lat2}?steps=true"
    )

    # Configure retries
    retries = Retry(
        total=max_retries, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]
    )

    # Create a session with retry settings
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    for attempt in range(max_retries + 1):
        try:
            response = session.get(url)
            response.raise_for_status()

            if response.status_code == 200:
                route_info = response.json()["routes"][0]
                return route_info
            else:
                print(f"Error: {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            print(f"Request failed on attempt {attempt + 1}/{max_retries + 1}: {e}")

    print(f"Maximum number of retries reached. Unable to complete the request.")
    return None

In [40]:
osrm_sample_point = (
    taxi_test_df[taxi_test_df["id"] == "id1447914"].reset_index(drop=True).loc[0]
)
pickup_lat = osrm_sample_point["pickup_latitude"]
pickup_lon = osrm_sample_point["pickup_longitude"]
dropoff_lat = osrm_sample_point["dropoff_latitude"]
dropoff_lon = osrm_sample_point["dropoff_longitude"]

In [57]:
route_info = osrm_route(
    pickup_lat,
    pickup_lon,
    dropoff_lat,
    dropoff_lon,
)
route_geometry = route_info.get("geometry")
osrm_distance = route_info.get("distance") / 1000
map_center = [(pickup_lat + dropoff_lat) / 2, (pickup_lon + dropoff_lon) / 2]
decoded_route = polyline.decode(route_geometry)
shapely_route = LineString(decoded_route)
route_coords = list(shapely_route.coords)

In [58]:
m = folium.Map(
    location=map_center,
    zoom_start=15,
    tiles="https://mt1.google.com/vt/lyrs=m&x={x}&y={y}&z={z}",
    attr="Google",
)
folium.Marker(
    [pickup_lat, pickup_lon], popup="Pickup", icon=folium.Icon(color="red")
).add_to(m)
folium.Marker(
    [dropoff_lat, dropoff_lon], popup="Dropoff", icon=folium.Icon(color="green")
).add_to(m)
folium.PolyLine(
    route_coords,
    color="orange",
    weight=4,
    opacity=1,
    dash_array="5, 5",
    popup=f"OSRM Distance: {osrm_distance:.2f} km",
).add_to(m)
m

The image looks as follows if it doesn't render for you:

<img src="images/osrm_inference.png" alt="Euclidean vs. Haversine" width="650">