In [2]:
import src.config as config

In [3]:
from datetime import datetime, timedelta

import pandas as pd

current_date = pd.Timestamp.now(tz="UTC").floor('H')
print(f'{current_date=}')

# we fetch raw data for the last 28 days, to add redundancy to our data pipeline
fetch_data_to = current_date
fetch_data_from = max(current_date - timedelta(days=28), pd.Timestamp("2024-01-01", tz="UTC"))
print(fetch_data_from, fetch_data_to)

current_date=Timestamp('2025-03-30 20:00:00+0000', tz='UTC')
2025-03-02 20:00:00+00:00 2025-03-30 20:00:00+00:00


In [4]:
from src.data import load_raw_data
from src.data import merge_geo_and_ts_data


In [5]:
def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simula producción de datos pero ajustándose al rango disponible (desde enero 2024).
    """
    # Intentamos retroceder 52 semanas, pero respetamos el límite mínimo de enero 2024
    from_date_ = max(from_date - timedelta(days=7*52), pd.Timestamp("2024-01-01", tz="UTC"))
    to_date_ = max(to_date - timedelta(days=7*52), pd.Timestamp("2024-01-01", tz="UTC"))
    
    print(f'{from_date=}, {to_date_=}')

    # Cargar solo los datos dentro del rango disponible
    rides = load_raw_data()
    rides = merge_geo_and_ts_data(rides)

    # Solo ajustamos la fecha si no estamos en el límite inferior
    if from_date_ > pd.Timestamp("2024-01-01", tz="UTC"):
        rides['pickup_datetime'] += timedelta(days=7*52)

    rides.sort_values(by=['pickup_location', 'pickup_datetime'], inplace=True)

    return rides



In [6]:
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

from_date=Timestamp('2025-03-02 20:00:00+0000', tz='UTC'), to_date_=Timestamp('2024-03-31 20:00:00+0000', tz='UTC')


Downloading...
From (original): https://drive.google.com/uc?id=1dj6DhNhELjnxjziXIGlo4FhPneXauFV8
From (redirected): https://drive.google.com/uc?id=1dj6DhNhELjnxjziXIGlo4FhPneXauFV8&confirm=t&uuid=f2a0f751-cc18-4741-a74f-935b0a9ccac8
To: C:\Users\joral_08cedew\chicago_taxi_demand_predictor\data\raw\taxi_trips.parquet
100%|██████████| 437M/437M [00:09<00:00, 45.9MB/s] 


Taxi data downloaded: C:\Users\joral_08cedew\chicago_taxi_demand_predictor\data\raw\taxi_trips.parquet
Date range: 2024-01-01 00:00:00 to 2025-02-01 00:00:00


Downloading...
From: https://drive.google.com/uc?id=1AqIi-XKEuLosLZbMYTGTRbWEOWotz_pZ
To: C:\Users\joral_08cedew\chicago_taxi_demand_predictor\data\raw\chicago_geo_data.parquet
100%|██████████| 1.30M/1.30M [00:00<00:00, 3.70MB/s]


Geo data downloaded: C:\Users\joral_08cedew\chicago_taxi_demand_predictor\data\raw\chicago_geo_data.parquet


In [7]:
rides

Unnamed: 0,pickup_datetime,pickup_latitude,pickup_longitude,pickup_location
121951,2024-12-30 00:00:00,41.968069,-87.721559,Albany Park
418832,2024-12-30 00:30:00,41.968069,-87.721559,Albany Park
276943,2024-12-30 02:45:00,41.968069,-87.721559,Albany Park
250449,2024-12-30 03:00:00,41.968069,-87.721559,Albany Park
347035,2024-12-30 03:15:00,41.968069,-87.721559,Albany Park
...,...,...,...,...
6726001,2026-01-03 21:45:00,41.949140,-87.656804,Wrigleyville
6764720,2026-01-03 21:45:00,41.949140,-87.656804,Wrigleyville
6859776,2026-01-03 21:45:00,41.949140,-87.656804,Wrigleyville
6872223,2026-01-04 00:00:00,41.949140,-87.656804,Wrigleyville


In [8]:
from src.data import transform_raw_data_into_ts_data
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 95/95 [00:02<00:00, 37.27it/s]


In [9]:
# string to datetime
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# add column with Unix epoch milliseconds
ts_data['pickup_ts'] = ts_data['pickup_hour'].astype('int64') // 10**6

In [10]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key = ['pickup_location', 'pickup_hour'],
    event_time='pickup_hour',
)

2025-03-30 15:33:38,678 INFO: Initializing external client
2025-03-30 15:33:38,679 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-30 15:33:39,930 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1220769


In [12]:
feature_group.insert(ts_data, write_options={"wait_for_job": True})

Uploading Dataframe: 100.00% |██████████| Rows 905255/905255 | Elapsed Time: 00:53 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1220769/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions
2025-03-30 15:46:01,913 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2025-03-30 15:46:05,080 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-03-30 15:46:49,424 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-03-30 15:49:50,197 INFO: Waiting for log aggregation to finish.
2025-03-30 15:50:51,772 INFO: Execution finished successfully.


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)