In [77]:
import sys
import os

# Make the project root (parent of the notebook's working directory) importable
# so that `src.*` modules resolve. The membership check keeps this cell
# idempotent: re-running it does not keep appending the same entry to sys.path.
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
import src.config as config

In [78]:
from datetime import datetime, timedelta

import pandas as pd

# "Now" on the New York clock, as a naive Timestamp. The hourly pickup times are
# localized as US/Eastern further down, so the fetch window must be anchored to
# NYC wall-clock time rather than the kernel machine's local timezone (which
# differs when this runs on, e.g., a UTC server).
current_date = pd.Timestamp.now(tz="US/Eastern").tz_localize(None)
current_date

2025-02-17 15:07:09.328675


In [79]:
# Sanity check: current_date should be a pandas Timestamp (so `.floor()` below is available).
type(current_date)

pandas._libs.tslibs.timestamps.Timestamp

In [80]:
from datetime import datetime, timedelta, timezone
# Truncate to the start of the current hour so the fetch window lines up with
# the hourly buckets of the time-series data.
current_date = current_date.floor(freq="h")

In [81]:
# Display the floored timestamp as a numpy datetime64 (display only; nothing is assigned).
current_date.to_datetime64()

numpy.datetime64('2025-02-17T15:00:00.000000000')

In [82]:
# Fetch window: the 29 days that end at the current (floored) hour.
fetch_data_from, fetch_data_to = current_date - timedelta(days=29), current_date

In [83]:
# Inspect the end of the fetch window (the floored current hour).
fetch_data_to

Timestamp('2025-02-17 15:00:00')

In [84]:
# Inspect the start of the fetch window (29 days before its end).
fetch_data_from

Timestamp('2025-01-19 15:00:00')

In [85]:
from datetime import datetime, timedelta
from typing import Union
import pandas as pd
from src.data_utils import load_and_process_taxi_data

def fetch_batch_raw_data(
    from_date: Union[datetime, str],
    to_date: Union[datetime, str],
    *,
    loader=None,
) -> pd.DataFrame:
    """
    Simulate production data by replaying historical rides from 52 weeks earlier.

    The requested window ``[from_date, to_date)`` is shifted back by 52 weeks
    (364 days: this keeps weekdays aligned, but is not exactly one calendar
    year). Every calendar month touched by the shifted window is loaded, rides
    are filtered to the shifted window, and ``pickup_datetime`` is moved
    forward by 52 weeks so the rides look like recent data.

    Args:
        from_date (datetime or str): Inclusive start of the batch
            (ISO-format strings are accepted).
        to_date (datetime or str): Exclusive end of the batch
            (ISO-format strings are accepted).
        loader (callable, optional): Called as ``loader(year=..., months=[...])``
            once per month; must return a DataFrame with at least
            ``pickup_datetime`` and ``pickup_location_id`` columns. Defaults
            to ``load_and_process_taxi_data``; mainly useful for testing.

    Returns:
        pd.DataFrame: The simulated rides in ``[from_date, to_date)``, sorted
        by ``pickup_location_id`` then ``pickup_datetime``.

    Raises:
        ValueError: If a date is neither a datetime nor a valid ISO string, or
            if ``from_date`` is not earlier than ``to_date``.
    """
    if loader is None:
        loader = load_and_process_taxi_data

    # Convert string inputs to datetime if necessary
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    # Validate input dates
    if not isinstance(from_date, datetime) or not isinstance(to_date, datetime):
        raise ValueError("Both 'from_date' and 'to_date' must be datetime objects or valid ISO format strings.")
    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    shift = timedelta(weeks=52)
    historical_from_date = from_date - shift
    historical_to_date = to_date - shift

    # Load every calendar month the shifted window touches (first, last and
    # anything in between), keyed by (year, month) so multi-year spans work.
    months = pd.period_range(start=historical_from_date, end=historical_to_date, freq="M")
    monthly_rides = [loader(year=period.year, months=[period.month]) for period in months]
    rides = pd.concat(monthly_rides, ignore_index=True)

    # Apply BOTH bounds regardless of how many months were loaded, so a window
    # inside a single month does not keep rides at or after to_date.
    in_window = (rides["pickup_datetime"] >= historical_from_date) & (
        rides["pickup_datetime"] < historical_to_date
    )
    # .copy() so the shift below writes to an independent frame rather than a
    # view of the filtered data (avoids SettingWithCopyWarning).
    rides = rides.loc[in_window].copy()

    # Shift the data forward by 52 weeks to simulate recent data
    rides["pickup_datetime"] += shift

    # Sort the data for consistency
    return rides.sort_values(by=["pickup_location_id", "pickup_datetime"])

In [86]:
# Pull the simulated raw rides for the fetch window (bounds passed by name for clarity).
rides = fetch_batch_raw_data(
    from_date=fetch_data_from,
    to_date=fetch_data_to,
)

File already exists for 2024-01.
Loading data for 2024-01...
Total records: 2,964,624
Valid records: 2,911,483
Records dropped: 53,141 (1.79%)
Successfully processed data for 2024-01.
Combining all monthly data...
Data loading and processing complete!
File already exists for 2024-02.
Loading data for 2024-02...
Total records: 3,007,526
Valid records: 2,954,709
Records dropped: 52,817 (1.76%)
Successfully processed data for 2024-02.
Combining all monthly data...
Data loading and processing complete!


In [87]:
# Preview the simulated raw rides (pandas truncates the long frame in the display).
rides

Unnamed: 0,pickup_datetime,pickup_location_id
2302300,2025-02-12 16:25:44,2
2609001,2025-02-15 16:56:40,2
976417,2025-01-20 07:53:24,3
54835,2025-01-20 10:33:31,3
59165,2025-01-20 11:40:12,3
...,...,...
2765677,2025-02-17 14:57:01,263
2763848,2025-02-17 14:57:08,263
2765237,2025-02-17 14:57:53,263
2762972,2025-02-17 14:57:59,263


In [88]:
from src.data_utils import transform_raw_data_into_ts_data
# Aggregate raw rides into an hourly series per pickup location
# (columns pickup_hour, pickup_location_id, rides; hours without pickups show rides == 0).
ts_data = transform_raw_data_into_ts_data(rides)

In [89]:
# Preview the hourly time series.
ts_data

Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2025-01-19 15:00:00,2,0
1,2025-01-19 16:00:00,2,0
2,2025-01-19 17:00:00,2,0
3,2025-01-19 18:00:00,2,0
4,2025-01-19 19:00:00,2,0
...,...,...,...
176083,2025-02-17 10:00:00,263,76
176084,2025-02-17 11:00:00,263,61
176085,2025-02-17 12:00:00,263,89
176086,2025-02-17 13:00:00,263,88


In [90]:
# Inspect dtypes before timezone localization (pickup_hour is still naive datetime64[ns]).
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 176088 entries, 0 to 176087
Data columns (total 3 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   pickup_hour         176088 non-null  datetime64[ns]
 1   pickup_location_id  176088 non-null  int16         
 2   rides               176088 non-null  int16         
dtypes: datetime64[ns](1), int16(2)
memory usage: 2.0 MB


In [91]:
# Check the last rows of the series before localization.
ts_data.tail()

Unnamed: 0,pickup_hour,pickup_location_id,rides
176083,2025-02-17 10:00:00,263,76
176084,2025-02-17 11:00:00,263,61
176085,2025-02-17 12:00:00,263,89
176086,2025-02-17 13:00:00,263,88
176087,2025-02-17 14:00:00,263,88


In [92]:
# Interpret the naive hourly timestamps as New York wall-clock time.
# - ambiguous=True: the repeated 01:00 hour on the November fall-back day is
#   treated as the daylight-saving (EDT) occurrence.
# - nonexistent="NaT" + dropna: the 02:00 hour skipped on the March
#   spring-forward day never occurred. If the hourly grid contains it (the
#   zero-filled rows suggest a complete grid; confirm), 'shift_forward' would
#   move it onto 03:00 and create a second row for the same
#   (pickup_location_id, pickup_hour) key. Dropping it keeps one row per key.
# The tz check keeps the cell re-runnable: tz_localize raises on a column that
# is already tz-aware.
if ts_data["pickup_hour"].dt.tz is None:
    localized_hours = ts_data["pickup_hour"].dt.tz_localize(
        tz="US/Eastern", ambiguous=True, nonexistent="NaT"
    )
    ts_data = (
        ts_data.assign(pickup_hour=localized_hours)
        .dropna(subset=["pickup_hour"])
        .reset_index(drop=True)
    )

In [93]:
# Confirm pickup_hour is now tz-aware (dtype datetime64[ns, US/Eastern]).
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 176088 entries, 0 to 176087
Data columns (total 3 columns):
 #   Column              Non-Null Count   Dtype                     
---  ------              --------------   -----                     
 0   pickup_hour         176088 non-null  datetime64[ns, US/Eastern]
 1   pickup_location_id  176088 non-null  int16                     
 2   rides               176088 non-null  int16                     
dtypes: datetime64[ns, US/Eastern](1), int16(2)
memory usage: 2.0 MB


In [94]:
# Spot-check the localized timestamps (the UTC offset is now part of each value).
ts_data.tail()

Unnamed: 0,pickup_hour,pickup_location_id,rides
176083,2025-02-17 10:00:00-05:00,263,76
176084,2025-02-17 11:00:00-05:00,263,61
176085,2025-02-17 12:00:00-05:00,263,89
176086,2025-02-17 13:00:00-05:00,263,88
176087,2025-02-17 14:00:00-05:00,263,88


In [95]:
import hopsworks

api_key = os.getenv('HOPSWORKS_API_KEY')
project_name = os.getenv('HOPSWORKS_PROJECT_NAME')

# Fail fast with an explicit message when the credential is missing, rather
# than calling hopsworks.login with api_key_value=None (which, in a scheduled
# or non-interactive run, gives a far less obvious failure mode).
if not api_key:
    raise RuntimeError(
        "HOPSWORKS_API_KEY is not set; export it before running this notebook."
    )

# Initialize connection to Hopsworks
project = hopsworks.login(
    api_key_value=api_key,
    project=project_name
)
print(f"Successfully connected to Hopsworks project: {project_name}")

2025-02-17 15:07:20,572 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-02-17 15:07:20,591 INFO: Initializing external client
2025-02-17 15:07:20,591 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-02-17 15:07:21,780 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1212635
Successfully connected to Hopsworks project: nolanphi


In [96]:
# Feature store handle for the logged-in project; used below to look up the target feature group.
feature_store = project.get_feature_store()

In [100]:
# Feature-group identifiers, aliased from src.config (the values the lookup
# below actually passes to get_feature_group) instead of re-declaring literals
# that could silently drift out of sync with the config.
FEATURE_GROUP_NAME = config.FEATURE_GROUP_NAME
FEATURE_GROUP_VERSION = config.FEATURE_GROUP_VERSION

In [101]:
# Look up the feature group (name and version from src.config) that the hourly
# time series is inserted into below.
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
)

In [102]:
# Upload the localized hourly time series. With wait_for_job=False the call
# returns the offline materialization Job (see output), presumably without
# blocking until that Spark job finishes — confirm against the hsfs docs.
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 176088/176088 | Elapsed Time: 00:20 | Remaining Time: 00:00


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)