In [1]:
import src.config as config

In [2]:
from datetime import datetime, timedelta
import pandas as pd

# Anchor this pipeline run at the start of the current UTC hour, as a *naive* timestamp
# (same value the old `datetime.utcnow()` produced; `utcnow()` is deprecated since
# Python 3.12, and the 'H' alias is deprecated in pandas 2.2 in favour of 'h').
current_date = pd.Timestamp.now(tz='UTC').floor('h').tz_localize(None)
print(f'{current_date=}')

# We re-fetch raw data for the last N_DAYS_TO_FETCH days on every run (not just the
# latest hour) to add redundancy to our data pipeline.
N_DAYS_TO_FETCH = 28
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=N_DAYS_TO_FETCH)

current_date=Timestamp('2023-11-02 17:00:00')


In [3]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Fetch a batch of historical raw taxi data, simulating production data by adjusting the dates.

    The function first shifts the requested window [from_date, to_date) back by 52 weeks
    (364 days, which keeps weekdays aligned with the requested dates). It then loads the
    historical rides in that shifted window. Finally it shifts their 'pickup_datetime'
    forward by the same 52 weeks, so the rides appear to come from the requested recent
    window.

    Parameters:
    - from_date (datetime): Start of the window to fetch (inclusive).
    - to_date (datetime): End of the window to fetch (exclusive).

    Returns:
    - pd.DataFrame: The rides whose shifted 'pickup_datetime' lies in [from_date, to_date),
      sorted by 'pickup_location_id' and then 'pickup_datetime'. The frame is empty when
      from_date == to_date.

    Raises:
    - ValueError: If to_date is earlier than from_date.

    Note:
    - This function is designed to simulate a real-time data fetching scenario where
      historical data is used to represent recent data for testing purposes.
    - Every calendar month overlapping the historical window is loaded. This means windows
      of any length work, including windows that cross a year boundary or span more than
      two months.
    """
    if to_date < from_date:
        raise ValueError(
            f'to_date ({to_date}) must not be earlier than from_date ({from_date})'
        )

    # 52 weeks rather than 365 days, so each simulated day falls on the same weekday
    # as the historical day it was copied from.
    shift = timedelta(weeks=52)
    from_date_ = from_date - shift
    to_date_ = to_date - shift

    # Load each month overlapping the historical window and keep only the rides in
    # [from_date_, to_date_). Both bounds are applied to every month, so a window that
    # starts and ends in the same month does not pick up rides after to_date_.
    monthly_rides = []
    for period in pd.period_range(start=from_date_, end=to_date_, freq='M'):
        month_rides = load_raw_data(year=period.year, months=[period.month])
        in_window = (
            (month_rides.pickup_datetime >= from_date_)
            & (month_rides.pickup_datetime < to_date_)
        )
        monthly_rides.append(month_rides[in_window])

    # Build a new frame with the dates moved to the present. Assigning into a filtered
    # slice instead would trigger pandas' SettingWithCopyWarning.
    rides = pd.concat(monthly_rides)
    rides = rides.assign(pickup_datetime=rides['pickup_datetime'] + shift)

    # Sort the data by pickup location ID and date-time for consistency.
    return rides.sort_values(by=['pickup_location_id', 'pickup_datetime'])

In [4]:
# Simulated "production" batch of raw rides for the window between
# fetch_data_from and fetch_data_to computed above.
rides = fetch_batch_raw_data(
    from_date=fetch_data_from,
    to_date=fetch_data_to,
)

File 2022-10 was already in local storage
File 2022-11 was already in local storage


In [5]:
from src.data import transform_raw_data_into_time_series_data
# Convert the raw rides into the time-series table that is inserted into the feature
# group below. NOTE(review): presumably hourly ride counts per pickup location (the
# feature group is keyed on pickup_location_id + pickup_hour) — confirm in src.data.
ts_data = transform_raw_data_into_time_series_data(rides)

100%|██████████| 260/260 [00:00<00:00, 407.52it/s]


In [6]:
import hopsworks

# connect to the project
# Authenticate against Hopsworks with the project name and API key from src.config.
project = hopsworks.login(
    api_key_value=config.HOPSWORKS_API_KEY,
    project=config.HOPSWORKS_PROJECT_NAME,
)

# Handle to the project's feature store.
feature_store = project.get_feature_store()

# Metadata for the hourly time-series feature group. Rows are keyed by
# (pickup_location_id, pickup_hour), and pickup_hour is the event-time column.
feature_group_metadata = dict(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description='Time-series at hourly frequency',
    primary_key=['pickup_location_id', 'pickup_hour'],
    event_time='pickup_hour',
)

# Get the feature group with this name/version, creating it if it does not exist yet.
feature_group = feature_store.get_or_create_feature_group(**feature_group_metadata)


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/143509
Connected. Call `.close()` to terminate connection gracefully.


In [9]:
# Write the time series to the feature group. The insert launches an offline
# materialization job; 'wait_for_job' asks Hopsworks to wait for that job.
insert_write_options = {'wait_for_job': True}
feature_group.insert(
    ts_data,
    write_options=insert_write_options,
)

Uploading Dataframe: 0.00% |          | Rows 0/174720 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/143509/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1e0c7b49f30>, None)