In [1]:
%%bash
tree ..

[01;34m..[0m
├── [00mLICENSE[0m
├── [00mMakefile[0m
├── [00mREADME.md[0m
├── [01;34martifacts[0m
│   └── [00mmodel.pkl[0m
├── [00mconfig.yaml[0m
├── [01;34mdata[0m
│   └── [01;34mtaxi_zones[0m
│       ├── [00mtaxi_zones.dbf[0m
│       ├── [00mtaxi_zones.prj[0m
│       ├── [00mtaxi_zones.sbn[0m
│       ├── [00mtaxi_zones.sbx[0m
│       ├── [00mtaxi_zones.shp[0m
│       ├── [00mtaxi_zones.shp.xml[0m
│       └── [00mtaxi_zones.shx[0m
├── [01;34mnotebooks[0m
│   └── [00mtaxi-demand-forecasting.ipynb[0m
├── [00mpoetry.lock[0m
├── [00mpyproject.toml[0m
└── [01;34msrc[0m
    ├── [00m__init__.py[0m
    ├── [00mapp.py[0m
    ├── [00mconfig.py[0m
    ├── [00mfeature_store_api.py[0m
    ├── [00minference.py[0m
    ├── [00mingest.py[0m
    ├── [00mlogger.py[0m
    ├── [01;34mpipelines[0m
    │   ├── [00mfeature_pipeline.py[0m
    │   └── [00mtraining_pipeline.py[0m
    ├── [00mtrain.py[0m
    ├── [00mtransform.py[0m
    └── [00muti

#### **`Dependencies`**

In [2]:
%load_ext autoreload
%autoreload 2

In [None]:
import warnings

import pandas as pd
import plotly.express as px

from dotenv import load_dotenv
from plotly.graph_objects import Figure

# taxi-demand-forecasting modules
from src.config import Paths
from src.feature_store_api import get_feature_group
from src.inference import generate_forecast
from src.transform import fetch_and_transform
from src.utils import plot_record

warnings.filterwarnings("ignore")
load_dotenv(Paths.ENV)

True

In [4]:
# set the Pandas DataFrame and Series display options
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", None)

#### **`Data ingestion`**

In [5]:
%%bash
# push the latest validated and pre-processed data to Hopsworks
cd ..
make runner_features

poetry install
Installing dependencies from lock file

No dependencies to install or update

Installing the current project: src (0.1.0)
poetry run flake8 src
poetry run python src/pipelines/feature_pipeline.py
see the appropriate new directories, set the environment variable
`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.
The use of platformdirs will be the default in `jupyter_core` v6



2024-11-24 08:27:10.452 | INFO     | src.ingest:download_data:95 - Downloading, validating, and pre-processing https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-10.parquet.
100%|██████████| 256/256 [00:05<00:00, 43.20it/s]
2024-11-24 08:27:30.548 | INFO     | src.ingest:download_data:95 - Downloading, validating, and pre-processing https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet.
100%|██████████| 258/258 [00:05<00:00, 45.57it/s]
2024-11-24 08:27:43.484 | INFO     | __main__:upload_data:43 - Uploading the latest batch of NYC taxi demand data to Hopsworks, Project Name: 'taxi_demand_forecasting', Feature Group: 'hourly_taxi_rides'


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.


Uploading Dataframe: 100.00% |██████████| Rows 82616/82616 | Elapsed Time: 00:08 | Remaining Time: 00:00


Launching job: hourly_taxi_rides_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/903316/jobs/named/hourly_taxi_rides_1_offline_fg_materialization/executions
rm -rf `find . -type d -name __pycache__`


In [6]:
# fetch the latest validated and pre-processed data from Hopsworks
df: pd.DataFrame = get_feature_group().read()
df = (
    df
    .assign(unix_time_ms=pd.to_datetime(df["unix_time_ms"], unit="ms"))
    .rename({"unix_time_ms": "pickup_time"}, axis=1)
    .sort_values(by=["location_id", "pickup_time"])
    .reset_index(drop=True)
)

# confirm that the 'df' pd.DataFrame is free of null values and duplicates
assert df.isna().sum().sum() == 0
assert df.duplicated(subset=["location_id", "pickup_time"]).sum() == 0

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (3.69s) 


In [7]:
# a list of select location IDs
location_ids: list[int] = [43, 90, 107]

# plot the hourly taxi rides for each location ID in the 'location_ids' list
fig: Figure = px.line(
    df.query(f"location_id.isin({location_ids})"),
    x="pickup_time",
    y="n_rides",
    color="location_id",
    labels={
        "pickup_time": "Datetime",
        "n_rides": "Number of taxi rides",
        "location_id": "Location ID"
    },
    title="NYC Hourly Taxi Rides",
    template="plotly_dark"
)
fig.show()

#### **`Data transformation`**

In [None]:
# fetch the latest validated and pre-processed data from Hopsworks, and ...
# transform it into machine learning-ready features and labels
fetch_and_transform()

#### **`Model training and evaluation`**

In [8]:
%%bash
# evaluate the current model on the latest data and replace/update it if necessary
cd ..
make runner_train

poetry install
Installing dependencies from lock file

No dependencies to install or update

Installing the current project: src (0.1.0)
poetry run flake8 src
poetry run python src/pipelines/training_pipeline.py
see the appropriate new directories, set the environment variable
`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.
The use of platformdirs will be the default in `jupyter_core` v6

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.63s) 


2024-11-24 09:51:42.904 | INFO     | src.transform:tabularize_data:24 - Transforming the NYC taxi demand data into features and labels.
100%|██████████| 260/260 [00:11<00:00, 22.88it/s]


Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.


100%|██████████| 254/254 [00:00<00:00, 346.25it/s]
2024-11-24 09:51:59.725 | INFO     | __main__:evaluate_model:61 - The current forecasting model is unsatisfactory and will be replaced.


Connection closed.artifact (0 dirs, 3 files)... DONE
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.
Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.98s) 


2024-11-24 09:52:05.894 | INFO     | src.transform:tabularize_data:24 - Transforming the NYC taxi demand data into features and labels.
100%|██████████| 260/260 [00:12<00:00, 21.01it/s]
  0%|          | 0/3 [00:00<?, ?it/s]2024-11-24 09:52:18.312 | INFO     | src.train:train_model:129 - Training initiated for the CatBoostRegressor.
 33%|███▎      | 1/3 [00:30<01:00, 30.32s/it]2024-11-24 09:52:48.634 | INFO     | src.train:train_model:129 - Training initiated for the LGBMRegressor.
 67%|██████▋   | 2/3 [00:36<00:16, 16.00s/it]2024-11-24 09:52:54.616 | INFO     | src.train:train_model:129 - Training initiated for the XGBRegressor.
100%|██████████| 3/3 [00:44<00:00, 14.91s/it]
2024-11-24 09:53:03.045 | INFO     | src.train:train_model:179 - Training complete, the CatBoostRegressor produced the lowest average validation set RMSE.


Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.


2024-11-24 09:53:04.651 | INFO     | src.train:upload_model:221 - Uploading the 'CatBoostRegressor' to the 'taxi_demand_forecasting' project's Model Registry, under the name, 'one_step_forecaster'.
Uploading model files (0 dirs, 0 files):  17%|█▋        | 1/6 [00:00<00:01,  3.02it/s]
Uploading: 0.000%|          | 0/4674258 elapsed<00:00 remaining<?[A
Uploading: 100.000%|██████████| 4674258/4674258 elapsed<00:03 remaining<00:00[A
Uploading input_example and model_schema:  33%|███▎      | 2/6 [00:03<00:08,  2.15s/it]
Uploading: 0.000%|          | 0/182 elapsed<00:00 remaining<?[A
Uploading: 100.000%|██████████| 182/182 elapsed<00:01 remaining<00:00[A

Uploading: 0.000%|          | 0/2406 elapsed<00:00 remaining<?[A
Uploading: 100.000%|██████████| 2406/2406 elapsed<00:01 remaining<00:00[A
Model export complete: 100%|██████████| 6/6 [00:12<00:00,  2.03s/it]                   


Model created, explore it at https://c.app.hopsworks.ai:443/p/903316/models/one_step_forecaster/1
rm -rf `find . -type d -name __pycache__`


#### **`Inference`**

In [9]:
# generate each location's one-step forecast, i.e., its predicted taxi demand for the upcoming hour
df: pd.DataFrame = fetch_and_transform().pipe(generate_forecast)

# extract the top 10 busiest locations, based on forecasted taxi demand
df = df.sort_values(by="forecast", ascending=False).reset_index(drop=True).head(10)

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.
Reading data from Hopsworks, using Hopsworks Feature Query Service..    

[32m2024-11-24 09:55:45.051[0m | [1mINFO    [0m | [36msrc.transform[0m:[36mtabularize_data[0m:[36m24[0m - [1mTransforming the NYC taxi demand data into features and labels.[0m


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.69s) 


100%|██████████| 260/260 [00:13<00:00, 19.50it/s]

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.






Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/903316
Connected. Call `.close()` to terminate connection gracefully.
Downloading model artifact (0 dirs, 3 files)... DONE

100%|██████████| 254/254 [00:00<00:00, 319.67it/s]


In [10]:
# line plots of the 10 busiest locations, based on forecasted taxi demand
_ = [plot_record(df, location_id).show() for location_id in df["location_id"]]