This notebook is executed hourly by GitHub Actions.
Further details are in [.github/workflows/feature_pipeline.yaml](../.github/workflows/feature_pipeline.yaml).

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import src.config as config

In [3]:
from datetime import datetime, timedelta
import pandas as pd

# Length of the look-back window, in days. Each run re-fetches the whole window
# (not just the newest hour) to add redundancy to the data pipeline.
FETCH_WINDOW_DAYS = 70

# Current UTC time as a tz-naive Timestamp truncated to the hour.
# `datetime.utcnow()` is deprecated since Python 3.12 and the 'H' frequency alias
# since pandas 2.2, hence `Timestamp.now(tz='UTC')` and the lowercase 'h'.
# The timezone is dropped so the value stays tz-naive, as before.
current_date = pd.Timestamp.now(tz='UTC').tz_localize(None).floor('h')

# we fetch raw data for the last FETCH_WINDOW_DAYS days
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=FETCH_WINDOW_DAYS)

print(f"{fetch_data_from=}")
print(f"{fetch_data_to=}")

fetch_data_from=Timestamp('2023-03-13 23:00:00')
fetch_data_to=Timestamp('2023-05-22 23:00:00')


In [None]:
# from src.data import load_raw_data

# def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
#     """
#     Simulate production data by sampling historical data from 52 weeks ago (i.e. 1 year)
#     """
#     from_date_ = from_date - timedelta(days=7*52)
#     to_date_ = to_date - timedelta(days=7*52)
#     print(f'{from_date=}, {to_date_=}')

#     # download 2 files from website
#     rides = load_raw_data(year=from_date_.year, months=from_date_.month)
#     rides = rides[rides.pickup_datetime >= from_date_]
#     rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
#     rides_2 = rides_2[rides_2.pickup_datetime < to_date_]

#     rides = pd.concat([rides, rides_2])

#     # shift the data to pretend this is recent data
#     rides['pickup_datetime'] += timedelta(days=7*52)

#     rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

#     return rides

In [None]:
# rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

In [4]:
from src.data import load_raw_data
from dateutil.relativedelta import relativedelta


def fetch_batch_raw_data2(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate production data by replaying historical rides from 52 weeks earlier.

    Modified version of fetch_batch_raw_data: it loads every monthly file that
    overlaps the historical window instead of only the first and last month.

    Args:
        from_date: inclusive lower bound of the simulated (shifted) pickup times.
        to_date: inclusive upper bound of the simulated (shifted) pickup times.

    Returns:
        The rides whose shifted `pickup_datetime` lies in [from_date, to_date],
        sorted by `pickup_location_id` and then `pickup_datetime`.

    Raises:
        ValueError: if `from_date` is later than `to_date`.
    """
    if from_date > to_date:
        raise ValueError(
            f'from_date ({from_date}) must not be later than to_date ({to_date})'
        )

    # 52 whole weeks (364 days) rather than a calendar year: a multiple of 7
    # keeps every ride on the same day of the week after the shift.
    shift = timedelta(days=7*52)

    # Derive the months to load from the *historical* window, i.e. the requested
    # window moved back by exactly the same shift that is applied to the data
    # below. This uses the function's own arguments (not notebook globals) and
    # cannot miss a month when the 364-day shift crosses a month boundary.
    months_to_load = pd.period_range(
        start=from_date - shift,
        end=to_date - shift,
        freq='M',
    )

    # Load each monthly file once and concatenate in a single call.
    monthly_rides = [
        load_raw_data(year=month.year, months=month.month)
        for month in months_to_load
    ]
    rides = pd.concat(monthly_rides)

    # shift the data to pretend this is recent data
    rides['pickup_datetime'] = rides['pickup_datetime'] + shift

    # NOTE(review): both bounds are inclusive, so rides stamped exactly at
    # `to_date` are kept. The commented-out fetch_batch_raw_data earlier in this
    # notebook used a strict `<` on the upper bound; confirm which is intended.
    in_window = (
        (rides['pickup_datetime'] >= from_date)
        & (rides['pickup_datetime'] <= to_date)
    )

    return rides[in_window].sort_values(by=['pickup_location_id', 'pickup_datetime'])

In [5]:
# Fetch the simulated "recent" rides for the window [fetch_data_from, fetch_data_to]
rides = fetch_batch_raw_data2(from_date=fetch_data_from, to_date=fetch_data_to)

File 2022-03 was already in local storage
File 2022-04 was already in local storage
File 2022-05 was already in local storage


In [6]:
# Bare expression: displays the fetched rides frame for a visual check
rides

Unnamed: 0,pickup_datetime,pickup_location_id
1568225,2023-03-14 03:59:37,1
1568437,2023-03-14 04:56:30,1
1572601,2023-03-14 07:51:53,1
1588516,2023-03-14 10:49:47,1
1602055,2023-03-14 12:11:46,1
...,...,...
3562095,2023-05-22 21:05:49,265
2611550,2023-05-22 21:13:48,265
3562290,2023-05-22 22:05:42,265
2615764,2023-05-22 22:11:07,265


In [7]:
# Sanity check: earliest and latest pickup_datetime should fall inside the requested fetch window
rides.agg({'pickup_datetime': ['min', 'max']})

Unnamed: 0,pickup_datetime
min,2023-03-13 23:00:00
max,2023-05-22 23:00:00


In [8]:
from src.data import transform_raw_data_into_ts_data
# Convert the raw rides into time-series form; the frame displayed further down has
# the columns pickup_hour, rides and pickup_location_id
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 265/265 [00:00<00:00, 411.19it/s]


In [9]:
# Display ts_data ordered by location, then hour. sort_values returns a sorted copy,
# so ts_data itself is left in its original order.
ts_data.sort_values(by=['pickup_location_id', 'pickup_hour'])

Unnamed: 0,pickup_hour,rides,pickup_location_id
0,2023-03-13 23:00:00,0,1
1,2023-03-14 00:00:00,0,1
2,2023-03-14 01:00:00,0,1
3,2023-03-14 02:00:00,0,1
4,2023-03-14 03:00:00,1,1
...,...,...,...
445460,2023-05-22 19:00:00,2,265
445461,2023-05-22 20:00:00,6,265
445462,2023-05-22 21:00:00,2,265
445463,2023-05-22 22:00:00,3,265


In [10]:
# Sanity check: first and last pickup_hour present in the time-series data
ts_data.agg({'pickup_hour': ['min', 'max']})

Unnamed: 0,pickup_hour
min,2023-03-13 23:00:00
max,2023-05-22 23:00:00


In [None]:
# from src.plot import plot_ts
# plot_ts(ts_data, locations=[43])

In [12]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group (get-or-create: fetched if it already exists,
# otherwise defined with the metadata below)
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key=['pickup_location_id', 'pickup_hour'],
    event_time='pickup_hour',
)  # this closing parenthesis was commented out, which made the cell a SyntaxError

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/47255
Connected. Call `.close()` to terminate connection gracefully.


In [12]:
# Write ts_data to the feature group.
# NOTE(review): "wait_for_job": False presumably makes the call return without waiting
# for the backfill job it launches — confirm against the Hopsworks documentation.
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 0.00% |          | Rows 0/445465 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/47255/jobs/named/time_series_hourly_feature_group_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7fe928367370>, None)