In [1]:
import sys
import os

# Make the parent of the current working directory importable so `src.*` resolves.
# The membership check keeps the cell idempotent: re-running it does not keep
# appending the same entry to sys.path.
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if project_root not in sys.path:
    sys.path.append(project_root)
import src.config as config

In [2]:
from datetime import datetime, timedelta

import pandas as pd

# Current time as a timezone-naive UTC Timestamp.
# datetime.utcnow() is deprecated since Python 3.12, so take an aware UTC "now"
# and drop the tz info; the value stays naive, exactly as before.
current_date = pd.Timestamp.now(tz="UTC").tz_localize(None)
print(f"{current_date}")

2025-05-07 22:17:19.804643


In [3]:
#type(current_date)

In [4]:
# from datetime import datetime, timedelta, timezone
# current_date = pd.to_datetime(datetime.now(timezone.utc)).floor("h")

In [5]:
# current_date.to_datetime64()

In [6]:
# Batch window: the 29 days that end at the current timestamp.
fetch_data_from, fetch_data_to = current_date - timedelta(days=29), current_date

In [7]:
fetch_data_to

Timestamp('2025-05-07 22:17:19.804643')

In [8]:
fetch_data_from

Timestamp('2025-04-08 22:17:19.804643')

In [9]:
from datetime import datetime, timedelta
from typing import Union
import pandas as pd
from src.data_utils import load_and_process_citibike_data

def fetch_batch_citibike_data(from_date: Union[datetime, str], to_date: Union[datetime, str]) -> pd.DataFrame:
    """
    Simulate a production batch by replaying historical rides from 52 weeks earlier.

    The requested window ``[from_date, to_date)`` is shifted back by 52 weeks
    (364 days, i.e. a whole number of weeks rather than a calendar year). Every
    calendar month touched by the shifted window is loaded, rows outside the
    shifted window are dropped, and the surviving timestamps are moved forward
    by 52 weeks again so they land inside the requested window.

    Args:
        from_date (datetime or str): Inclusive start of the batch window.
            Strings are parsed with ``datetime.fromisoformat``.
        to_date (datetime or str): Exclusive end of the batch window.
            Strings are parsed with ``datetime.fromisoformat``.

    Returns:
        pd.DataFrame: The loaded rows whose ``pickup_datetime`` falls inside the
        window, with ``pickup_datetime`` shifted forward by 52 weeks, sorted by
        ``pickup_location_id`` and then ``pickup_datetime``.

    Raises:
        ValueError: If either bound is not a datetime (or ISO string), or if
            ``from_date`` is not strictly earlier than ``to_date``.
    """
    # Convert string inputs to datetime if necessary
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    # Validate input dates
    if not isinstance(from_date, datetime) or not isinstance(to_date, datetime):
        raise ValueError("Both 'from_date' and 'to_date' must be datetime objects or valid ISO format strings.")
    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    # Shift the window back by 52 weeks
    shift = timedelta(weeks=52)
    historical_from_date = from_date - shift
    historical_to_date = to_date - shift

    # Enumerate every (year, month) the historical window touches, so that
    # windows spanning three or more months do not lose the months in between.
    month_keys = []
    year, month = historical_from_date.year, historical_from_date.month
    last_key = (historical_to_date.year, historical_to_date.month)
    while (year, month) <= last_key:
        month_keys.append((year, month))
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

    # Load each month and keep only rows inside [historical_from, historical_to).
    # Both bounds are applied to every month, so a window that starts and ends
    # in the same month is also cut off at its upper bound.
    monthly_frames = []
    for year, month in month_keys:
        month_rides = load_and_process_citibike_data(year=year, months=[month])
        in_window = (month_rides.pickup_datetime >= historical_from_date) & (
            month_rides.pickup_datetime < historical_to_date
        )
        monthly_frames.append(month_rides[in_window])

    # concat returns a fresh frame, so the in-place shift below never writes
    # into a slice of the loader's DataFrame.
    rides = pd.concat(monthly_frames, ignore_index=True)

    # Shift the data forward by 52 weeks to simulate recent data
    rides['pickup_datetime'] += shift

    # Sort the data for consistency
    return rides.sort_values(by=['pickup_location_id', 'pickup_datetime'])

In [10]:

# Pull the simulated batch for the window computed above in this notebook.
rides = fetch_batch_citibike_data(from_date=fetch_data_from, to_date=fetch_data_to)

File already exists for 2024-04.
Loading Citi Bike data for 2024-04...
Total records: 79,116
Valid records: 78,948
Records dropped: 168 (0.21%)
Successfully processed data for 2024-04.
Combining all monthly Citi Bike data...
Citi Bike data loading and processing complete!
File already exists for 2024-05.
Loading Citi Bike data for 2024-05...
Total records: 97,479
Valid records: 97,225
Records dropped: 254 (0.26%)
Successfully processed data for 2024-05.
Combining all monthly Citi Bike data...
Citi Bike data loading and processing complete!


In [11]:
rides

Unnamed: 0,pickup_datetime,pickup_location_id
6158,2025-04-15 14:40:42,5187.03
5543,2025-04-15 14:27:56,5282.02
53116,2025-04-16 09:23:25,5746.14
5361,2025-04-19 18:15:55,6098.12
2731,2025-04-29 07:53:14,6322.01
...,...,...
69512,2025-05-07 19:23:19,JC116
70916,2025-05-07 19:51:23,JC116
76583,2025-05-07 19:53:07,JC116
70323,2025-05-07 19:54:03,JC116


In [12]:
from src.data_utils import transform_raw_data_into_ts_data
# Convert the ride-level rows into the time-series frame
# (pickup_hour, pickup_location_id, rides) that is later inserted into the feature group.
ts_data = transform_raw_data_into_ts_data(rides)

In [13]:
ts_data

Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2025-04-08 22:00:00,5187.03,0
1,2025-04-08 23:00:00,5187.03,0
2,2025-04-09 00:00:00,5187.03,0
3,2025-04-09 01:00:00,5187.03,0
4,2025-04-09 02:00:00,5187.03,0
...,...,...,...
66210,2025-05-07 18:00:00,JC116,6
66211,2025-05-07 19:00:00,JC116,8
66212,2025-05-07 20:00:00,JC116,0
66213,2025-05-07 21:00:00,JC116,0


In [14]:
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 66215 entries, 0 to 66214
Data columns (total 3 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   pickup_hour         66215 non-null  datetime64[ns]
 1   pickup_location_id  66215 non-null  object        
 2   rides               66215 non-null  int32         
dtypes: datetime64[ns](1), int32(1), object(1)
memory usage: 1.3+ MB


In [15]:
import hopsworks

# Log in to Hopsworks with the project name and API key read from src.config.
project = hopsworks.login(project=config.HOPSWORKS_PROJECT_NAME, api_key_value=config.HOPSWORKS_API_KEY)

# Handle to the project's feature store.
feature_store = project.get_feature_store()

# Fetch the feature group identified by the configured name and version.
feature_group = feature_store.get_feature_group(name=config.FEATURE_GROUP_NAME, version=config.FEATURE_GROUP_VERSION)



2025-05-07 18:17:23,949 INFO: Initializing external client
2025-05-07 18:17:23,951 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-07 18:17:27,508 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1213633


In [16]:
#feature_store = project.get_feature_store()

In [17]:
# FEATURE_GROUP_NAME = "citi_bike_hourly_features"
# FEATURE_GROUP_VERSION = 1

In [18]:
# feature_group = feature_store.get_or_create_feature_group(
#     name=FEATURE_GROUP_NAME,
#     version=FEATURE_GROUP_VERSION,
#     description="Citi Bike time-series features (hourly)",
#     primary_key=["pickup_location_id", "pickup_hour"],
#     event_time="pickup_hour"
# )

In [19]:
# Upload the hourly time series to the feature group.
# NOTE(review): wait_for_job=False presumably returns without waiting for the
# materialization job to finish — confirm against the Hopsworks docs.
feature_group.insert(
    ts_data,
    write_options={"wait_for_job": False},
)

Uploading Dataframe: 100.00% |█████████████████████████| Rows 66215/66215 | Elapsed Time: 00:11 | Remaining Time: 00:00
Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/sp25_taxi/Resources/jobs/citi_bike_hourly_features_1_offline_fg_materialization/config_1746580978139) to trigger the materialization job again.


(Job('citi_bike_hourly_features_1_offline_fg_materialization', 'SPARK'), None)