Import Statements

In [1]:
import sys
import os
from datetime import datetime, timedelta, timezone
from typing import Union
import pandas as pd

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from src.data_utils import load_and_process_taxi_data
from src.data_utils import transform_raw_data_into_ts_data
import src.config as config

Setting current date, and setting timelines for data fetching

In [2]:
current_date = pd.to_datetime(datetime.utcnow())
print(f"{current_date}")
type(current_date)
current_date = pd.to_datetime(datetime.now(timezone.utc)).floor("h")
current_date.to_datetime64()
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=29)
fetch_data_to
fetch_data_from

2025-03-05 10:38:26.955242


  current_date = pd.to_datetime(datetime.utcnow())


Timestamp('2025-02-04 10:00:00+0000', tz='UTC')

Function definition for fetching data in a batch for specified timeline

In [3]:
def fetch_batch_raw_data(from_date: Union[datetime, str], to_date: Union[datetime, str]) -> pd.DataFrame:
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    # Shift dates back by 52 weeks and remove timezone info
    historical_from_date = (from_date - timedelta(weeks=52)).replace(tzinfo=None)
    historical_to_date = (to_date - timedelta(weeks=52)).replace(tzinfo=None)

    rides_from = load_and_process_taxi_data(year=historical_from_date.year, months=[historical_from_date.month])
    rides_from['pickup_datetime'] = rides_from['pickup_datetime'].dt.tz_localize(None)
    rides_from = rides_from[rides_from.pickup_datetime >= historical_from_date]

    if historical_to_date.month != historical_from_date.month:
        rides_to = load_and_process_taxi_data(year=historical_to_date.year, months=[historical_to_date.month])
        rides_to['pickup_datetime'] = rides_to['pickup_datetime'].dt.tz_localize(None)
        rides_to = rides_to[rides_to.pickup_datetime < historical_to_date]
        rides = pd.concat([rides_from, rides_to], ignore_index=True)
    else:
        rides = rides_from

    rides['pickup_datetime'] += timedelta(weeks=52)
    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)
    
    return rides


rides = fetch_batch_raw_data(fetch_data_from, fetch_data_to)
rides.head(5)
ts_data = transform_raw_data_into_ts_data(rides)
ts_data.head(5)
ts_data.info()

File already exists for 2024-02.
Loading data for 2024-02...
Total records: 3,007,526
Valid records: 2,954,709
Records dropped: 52,817 (1.76%)
Successfully processed data for 2024-02.
Combining all monthly data...
Data loading and processing complete!
File already exists for 2024-03.
Loading data for 2024-03...
Total records: 3,582,628
Valid records: 3,518,066
Records dropped: 64,562 (1.80%)
Successfully processed data for 2024-03.
Combining all monthly data...
Data loading and processing complete!
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 175392 entries, 0 to 175391
Data columns (total 3 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   pickup_hour         175392 non-null  datetime64[ns]
 1   pickup_location_id  175392 non-null  int16         
 2   rides               175392 non-null  int16         
dtypes: datetime64[ns](1), int16(2)
memory usage: 2.0 MB


Hopsworks Login and loading feature store

In [4]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time series data at hourly freaquency",
    primary_key=["pickup_location_id", "pickup hour"],
    event_time="pickup_hour"
)

  from .autonotebook import tqdm as notebook_tqdm


2025-03-05 05:38:36,191 INFO: Initializing external client
2025-03-05 05:38:36,195 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-05 05:38:37,411 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214689


In [5]:
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 175392/175392 | Elapsed Time: 00:11 | Remaining Time: 00:00
Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/CDA500P1/Resources/jobs/time_series_hourly_feature_group_1_offline_fg_materialization/config_1741118123424) to trigger the materialization job again.


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)