In [62]:
import src.config as config

In [63]:
from datetime import datetime, timedelta

import pandas as pd

current_date = pd.to_datetime(datetime.utcnow()).floor('H')
print(f'{current_date=}')

# we fetch raw data for the last 28 days, to add redundancy to our data pipeline
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)

current_date=Timestamp('2024-03-02 03:00:00')


In [64]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago (i.e. 1 year)
    """
    from_date_ = from_date - timedelta(days=7*52)
    to_date_ = to_date - timedelta(days=7*52)
    print(f'{from_date=}, {to_date_=}')

    # download 2 files from website
    rides = load_raw_data(year=from_date_.year, months=from_date_.month)
    rides = rides[rides.pickup_datetime >= from_date_]
    rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
    rides_2 = rides_2[rides_2.pickup_datetime < to_date_]

    rides = pd.concat([rides, rides_2])

    # shift the data to pretend this is recent data
    rides['pickup_datetime'] += timedelta(days=7*52)

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

In [65]:
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

from_date=Timestamp('2024-02-03 03:00:00'), to_date_=Timestamp('2023-03-04 03:00:00')
File 2023-02 was already in local storage
File 2023-03 was already in local storage


In [66]:
len(rides.pickup_location_id.unique())

257

In [67]:
from src.data import transform_raw_data_into_ts_data
ts_data = transform_raw_data_into_ts_data(rides)

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
100%|██████████| 265/265 [00:01<00:00, 249.27it/s]


In [68]:
print(ts_data.pickup_hour.min())
print(ts_data.pickup_hour.max())

2024-02-03 03:00:00
2024-03-02 02:00:00


In [69]:
print(len(ts_data.pickup_hour.unique()))
print(len(ts_data.pickup_location_id.unique()))

672
265


In [70]:
ts_data

Unnamed: 0,pickup_hour,rides,pickup_location_id
0,2024-02-03 03:00:00,0,1
1,2024-02-03 04:00:00,0,1
2,2024-02-03 05:00:00,0,1
3,2024-02-03 06:00:00,0,1
4,2024-02-03 07:00:00,0,1
...,...,...,...
178075,2024-03-01 22:00:00,5,265
178076,2024-03-01 23:00:00,4,265
178077,2024-03-02 00:00:00,0,265
178078,2024-03-02 01:00:00,5,265


In [71]:
df = pd.DataFrame(ts_data.groupby('pickup_location_id')['pickup_hour'].count())
df

Unnamed: 0_level_0,pickup_hour
pickup_location_id,Unnamed: 1_level_1
1,672
2,672
3,672
4,672
5,672
...,...
261,672
262,672
263,672
264,672


In [72]:
# string to datetime
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# add column with Unix epoch milliseconds
# ts_data['pickup_ts'] = ts_data['pickup_hour'].apply(lambda x: x.timestamp()) // 10**6

import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key = ['pickup_location_id', 'pickup_hour'],
    event_time='pickup_hour',
)
# ts_data['pickup_ts'] = ts_data['pickup_ts'].astype('int64')
feature_group.insert(ts_data, write_options={"wait_for_job": True})


Connection closed.
Connected. Call `.close()` to terminate connection gracefully.



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/423065
Connected. Call `.close()` to terminate connection gracefully.


Uploading Dataframe: 0.00% |          | Rows 0/178080 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/423065/jobs/named/time_series_hourly_feature_group_2_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1fbe8f1d580>, None)