In [2]:
from datetime import datetime, timedelta, timezone
import pandas as pd

current_date = pd.to_datetime(datetime.now(timezone.utc).replace(tzinfo=None)).floor('h')
print(f"{current_date=}")

current_date=Timestamp('2025-02-03 18:00:00')


In [3]:
# fetching last 28 days data
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)

In [4]:
print(f"{fetch_data_to=}")
print(f"{fetch_data_from=}")

fetch_data_to=Timestamp('2025-02-03 18:00:00')
fetch_data_from=Timestamp('2025-01-06 18:00:00')


### Fetching raw batch data from the original data source i.e., NYC yellow taxi data website

In [5]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago (i.e. 1 year)
    """

    # we dont have access to recent data, so to simulate fetching data every hour, we will back date to 1 year and simulate fetching from that date

    from_date_ = from_date - timedelta(days=7*52)
    to_date_ = to_date - timedelta(days=7*52)

    # download 2 files from website
    rides = load_raw_data(year=from_date_.year, months=from_date_.month)
    rides = rides[rides.pickup_datetime >= from_date_]
    rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
    rides_2 = rides_2[rides_2.pickup_datetime < to_date_]

    rides = pd.concat([rides, rides_2])

    # shift the data to pretend this is recent data
    rides['pickup_datetime'] += timedelta(days=7*52)

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

In [6]:
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

File 2024-01 was already in local storage
File 2024-02 was already in local storage


### Transforming the fetched raw data to timeseries data

In [7]:
from src.data import transform_raw_data_into_ts_data
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 258/258 [00:00<00:00, 913.61it/s]


In [13]:
ts_data["pickup_location_id"].dtype

dtype('int32')

In [14]:
ts_data["pickup_location_id"] = ts_data["pickup_location_id"].astype("int64")
ts_data["pickup_location_id"].dtype

dtype('int64')

### Make a connection to hopsworks feature store to insert this data

In [15]:
import hopsworks
import src.config as config

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

2025-02-03 13:38:36,437 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-02-03 13:38:36,442 INFO: Initializing external client
2025-02-03 13:38:36,443 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-02-03 13:38:37,587 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1211558


In [16]:
# connect to the feature store

feature_store = project.get_feature_store()
feature_store

<hsfs.feature_store.FeatureStore at 0x16ff12750>

In [17]:
# connect to the feature group

feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key = ['pickup_location_id', 'pickup_hour'],
    event_time="pickup_hour",
)
feature_group

<hsfs.feature_group.FeatureGroup at 0x16a11ef60>

In [18]:
feature_group.get_feature("pickup_location_id").type

'bigint'

In [19]:
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 173376/173376 | Elapsed Time: 00:07 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1211558/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)