In [1]:
import sys
import os

# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
import src.config as config

In [2]:
from datetime import datetime, timedelta

import pandas as pd

# Current UTC time truncated to the start of the hour.
# `datetime.utcnow()` is deprecated since Python 3.12, so take a tz-aware UTC
# timestamp and drop the tz: the result stays naive (UTC wall-clock), like the
# naive datetime64 pickup timestamps it is later compared against.
current_date = pd.Timestamp.now(tz="UTC").tz_localize(None).floor("h")
print(f"{current_date}")

2025-02-06 12:00:00


In [3]:
# Fetch window: the 28 days that end at `current_date`
# (fetch_batch_raw_data treats `from` as inclusive and `to` as exclusive).
fetch_data_from, fetch_data_to = current_date - timedelta(days=28), current_date

In [4]:
# Upper bound of the fetch window (exclusive in fetch_batch_raw_data's filter)
fetch_data_to

Timestamp('2025-02-06 12:00:00')

In [5]:
# Lower bound of the fetch window (inclusive), 28 days before fetch_data_to
fetch_data_from

Timestamp('2025-01-09 12:00:00')

In [6]:
from datetime import datetime, timedelta
from typing import Union
import pandas as pd
from src.data_utils import load_and_process_taxi_data

def _months_by_year(start: datetime, end: datetime) -> dict:
    """
    Group the calendar months touched by the half-open interval [start, end) by year.

    Args:
        start (datetime): Inclusive start of the interval.
        end (datetime): Exclusive end of the interval; expected to be later than ``start``.

    Returns:
        dict: Mapping ``year -> [month, ...]`` in chronological order, e.g.
        ``{2023: [12], 2024: [1]}`` for 2023-12-22 .. 2024-01-12.
    """
    # A window that ends exactly at midnight on the 1st does not include any of
    # that month, so use the last instant strictly before `end`.
    last_instant = max(start, end - timedelta(microseconds=1))
    months_by_year = {}
    for period in pd.period_range(start=start, end=last_instant, freq="M"):
        months_by_year.setdefault(period.year, []).append(period.month)
    return months_by_year


def fetch_batch_raw_data(from_date: Union[datetime, str], to_date: Union[datetime, str]) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago.

    The window [from_date, to_date) is shifted back by 52 weeks (364 days, which
    keeps weekdays aligned). Every calendar month that window touches is loaded,
    the rides inside the window are kept, and their pickup times are shifted
    forward again by 52 weeks.

    Args:
        from_date (datetime or str): Inclusive start of the batch (datetime or ISO string).
        to_date (datetime or str): Exclusive end of the batch (datetime or ISO string).

    Returns:
        pd.DataFrame: Rides with ``pickup_datetime`` in [from_date, to_date),
        sorted by ``pickup_location_id`` then ``pickup_datetime``.

    Raises:
        ValueError: If the inputs are not datetimes / valid ISO strings, or if
            ``from_date`` is not earlier than ``to_date``.
    """
    # Convert string inputs to datetime if necessary
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    # Validate input dates
    if not isinstance(from_date, datetime) or not isinstance(to_date, datetime):
        raise ValueError("Both 'from_date' and 'to_date' must be datetime objects or valid ISO format strings.")
    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    # Shift the window back by 52 weeks
    historical_from_date = from_date - timedelta(weeks=52)
    historical_to_date = to_date - timedelta(weeks=52)

    # Load each touched month exactly once (one loader call per year). Loading only
    # the months of the two endpoints would load a single month twice, and skip
    # the middle months of a window spanning three or more months.
    monthly_frames = [
        load_and_process_taxi_data(year=year, months=months)
        for year, months in _months_by_year(historical_from_date, historical_to_date).items()
    ]
    rides = pd.concat(monthly_frames, ignore_index=True)

    # Apply both bounds at once: keep [historical_from_date, historical_to_date)
    in_window = (rides.pickup_datetime >= historical_from_date) & (
        rides.pickup_datetime < historical_to_date
    )
    rides = rides.loc[in_window].reset_index(drop=True)

    # Shift forward by 52 weeks to simulate recent data; `assign` builds a new frame
    # instead of mutating a filtered slice in place.
    rides = rides.assign(pickup_datetime=rides.pickup_datetime + timedelta(weeks=52))

    # Sort for consistency
    return rides.sort_values(by=["pickup_location_id", "pickup_datetime"])

In [7]:
# Pull the simulated production batch covering the fetch window.
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

File already exists for 2024-01.
Loading data for 2024-01...


Total records: 2,964,624
Valid records: 2,911,483
Records dropped: 53,141 (1.79%)


Successfully processed data for 2024-01.
Combining all monthly data...
Data loading and processing complete!
File already exists for 2024-02.
Loading data for 2024-02...


Total records: 3,007,526
Valid records: 2,954,709
Records dropped: 52,817 (1.76%)


Successfully processed data for 2024-02.
Combining all monthly data...
Data loading and processing complete!


In [8]:
# Raw rides after the 52-week shift, sorted by pickup_location_id then pickup_datetime
# (the pandas repr truncates the middle rows).
rides

Unnamed: 0,pickup_datetime,pickup_location_id
9893,2025-01-09 13:09:55,3
76883,2025-01-10 04:38:43,3
77498,2025-01-10 05:34:12,3
78620,2025-01-10 06:37:19,3
84804,2025-01-10 08:40:24,3
...,...,...
2715468,2025-02-06 11:55:11,263
2716734,2025-02-06 11:55:32,263
2715162,2025-02-06 11:56:00,263
2716223,2025-02-06 11:56:58,263


In [9]:
from src.data_utils import transform_raw_data_into_ts_data
ts_data = transform_raw_data_into_ts_data(rides)

# Sanity-check the invariants the feature group relies on before inserting:
# (pickup_location_id, pickup_hour) is its primary key, so it must be non-null and unique.
assert not ts_data.empty, "time-series transform produced no rows"
assert ts_data[["pickup_location_id", "pickup_hour", "rides"]].notna().all().all(), "unexpected nulls in ts_data"
assert not ts_data.duplicated(subset=["pickup_location_id", "pickup_hour"]).any(), "duplicate (pickup_location_id, pickup_hour) keys in ts_data"

In [10]:
# Hourly time series with columns pickup_hour, pickup_location_id and rides (ride count);
# the output shows hours without rides as explicit rows with 0.
ts_data

Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2025-01-09 12:00:00,3,0
1,2025-01-09 13:00:00,3,1
2,2025-01-09 14:00:00,3,0
3,2025-01-09 15:00:00,3,0
4,2025-01-09 16:00:00,3,0
...,...,...,...
170683,2025-02-06 07:00:00,263,119
170684,2025-02-06 08:00:00,263,153
170685,2025-02-06 09:00:00,263,131
170686,2025-02-06 10:00:00,263,125


In [11]:
# Confirm the schema (dtypes, non-null counts) before writing to the feature store.
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 170688 entries, 0 to 170687
Data columns (total 3 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   pickup_hour         170688 non-null  datetime64[ns]
 1   pickup_location_id  170688 non-null  int32         
 2   rides               170688 non-null  int32         
dtypes: datetime64[ns](1), int32(2)
memory usage: 2.6 MB


In [12]:
import hopsworks

# Authenticate with the project whose name and API key come from src.config
# (credentials are read from config rather than written into the notebook).
project = hopsworks.login(
    api_key_value=config.HOPSWORKS_API_KEY,
    project=config.HOPSWORKS_PROJECT_NAME,
)
feature_store = project.get_feature_store()

# Get (or create) the hourly time-series feature group.
# Rows are keyed by (pickup_location_id, pickup_hour); pickup_hour is the event time.
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour",
    description="Time-series data at hourly frequency",
)



2025-02-06 07:54:11,178 INFO: Initializing external client


2025-02-06 07:54:11,194 INFO: Base URL: https://c.app.hopsworks.ai:443


2025-02-06 07:54:13,466 INFO: Python Engine initialized.



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1211551


In [13]:
# Upload the hourly features without blocking on the offline materialization job.
feature_group.insert(ts_data, write_options=dict(wait_for_job=False))

Uploading Dataframe: 0.00% |                                  | Rows 0/170688 | Elapsed Time: 00:00 | Remaining Time: ?

Uploading Dataframe: 6.57% |█▋                        | Rows 11214/170688 | Elapsed Time: 00:01 | Remaining Time: 00:14

Uploading Dataframe: 13.12% |███▎                     | Rows 22396/170688 | Elapsed Time: 00:02 | Remaining Time: 00:13

Uploading Dataframe: 19.65% |████▉                    | Rows 33539/170688 | Elapsed Time: 00:03 | Remaining Time: 00:13

Uploading Dataframe: 25.70% |██████▍                  | Rows 43870/170688 | Elapsed Time: 00:04 | Remaining Time: 00:12

Uploading Dataframe: 31.92% |███████▉                 | Rows 54481/170688 | Elapsed Time: 00:05 | Remaining Time: 00:11

Uploading Dataframe: 38.19% |█████████▌               | Rows 65189/170688 | Elapsed Time: 00:06 | Remaining Time: 00:10

Uploading Dataframe: 44.68% |███████████▏             | Rows 76255/170688 | Elapsed Time: 00:07 | Remaining Time: 00:08

Uploading Dataframe: 51.37% |████████████▊            | Rows 87688/170688 | Elapsed Time: 00:08 | Remaining Time: 00:07

Uploading Dataframe: 57.98% |██████████████▍          | Rows 98957/170688 | Elapsed Time: 00:09 | Remaining Time: 00:06

Uploading Dataframe: 64.40% |███████████████▍        | Rows 109919/170688 | Elapsed Time: 00:10 | Remaining Time: 00:05

Uploading Dataframe: 70.78% |████████████████▉       | Rows 120819/170688 | Elapsed Time: 00:11 | Remaining Time: 00:04

Uploading Dataframe: 77.35% |██████████████████▌     | Rows 132029/170688 | Elapsed Time: 00:12 | Remaining Time: 00:03

Uploading Dataframe: 83.75% |████████████████████    | Rows 142959/170688 | Elapsed Time: 00:13 | Remaining Time: 00:02

Uploading Dataframe: 90.13% |█████████████████████▋  | Rows 153849/170688 | Elapsed Time: 00:14 | Remaining Time: 00:01

Uploading Dataframe: 96.43% |███████████████████████▏| Rows 164595/170688 | Elapsed Time: 00:15 | Remaining Time: 00:00

Uploading Dataframe: 100.00% |███████████████████████| Rows 170688/170688 | Elapsed Time: 00:16 | Remaining Time: 00:00




Launching job: time_series_hourly_feature_group_1_offline_fg_materialization


Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1211551/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)