In [8]:
import os, sys

sys.path.append('c:\\Projects\\taxi_predicts\\src\\')

In [9]:
import config as config

In [10]:
from datetime import datetime, timedelta

import pandas as pd

current_date = pd.to_datetime(datetime.utcnow()).floor('H')
print(f'{current_date=}')

#we fetch raw data for the last 28days to add redundancy to our data pipeline
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)

print(fetch_data_from)

current_date=Timestamp('2024-03-21 08:00:00')
2024-02-22 08:00:00


In [11]:
#we cannot import the current data from data warehouse
# so we simulate the data

from data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    #we simulate production data by sampling historical data from 52weeks ago 

    from_date_ = from_date - timedelta(days=7*52)
    to_date_ = to_date - timedelta(days=7*52)

    print(from_date_)
    print(to_date_)

    rides = load_raw_data(year=from_date_.year, months=from_date_.month)
    rides = rides[rides.pickup_datetime >= from_date_]
    rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
    rides_2 = rides_2[rides_2.pickup_datetime < to_date_]
    
    rides = pd.concat([rides, rides_2])
    rides['pickup_datetime'] += timedelta(days=7*52)
    #shift the data to pretend this is recent data

    print(rides.head(10))
    print(rides_2.tail(10))

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

In [12]:
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

2023-02-23 08:00:00
2023-03-23 08:00:00
File 2023-02 was already in local storage
File 2023-03 was already in local storage
            pickup_datetime  pickup_location_id
2226208 2024-02-22 13:27:20                 264
2227795 2024-02-22 16:05:21                 132
2228201 2024-02-22 17:04:42                 114
2228202 2024-02-22 17:29:24                 170
2228203 2024-02-22 17:47:02                 234
2228204 2024-02-22 17:58:25                 186
2229316 2024-02-22 18:31:35                  13
2229317 2024-02-22 18:50:06                 100
2230480 2024-02-22 08:00:40                 262
2230596 2024-02-22 08:00:37                 234
            pickup_datetime  pickup_location_id
3378594 2023-03-23 07:37:23                  48
3378595 2023-03-23 07:31:42                 229
3378596 2023-03-23 07:55:26                 231
3378597 2023-03-23 07:18:31                 161
3378598 2023-03-23 07:55:42                 262
3378599 2023-03-23 07:36:00                 239
3378600 2023

In [13]:
from data import transform_raw_data_into_ts_data

ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 265/265 [00:00<00:00, 304.35it/s]


In [14]:
print(f'{ts_data.tail(10)}')

               pickup_hour  rides  pickup_location_id
182310 2024-03-20 22:00:00      1                 265
182311 2024-03-20 23:00:00      3                 265
182312 2024-03-21 00:00:00      4                 265
182313 2024-03-21 01:00:00      3                 265
182314 2024-03-21 02:00:00      1                 265
182315 2024-03-21 03:00:00      3                 265
182316 2024-03-21 04:00:00      1                 265
182317 2024-03-21 05:00:00      3                 265
182318 2024-03-21 06:00:00      6                 265
182319 2024-03-21 07:00:00      0                 265


In [15]:
import hopsworks
#now we connect to the feture_store and feature group to store the data

#connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

#connect to the feature store
feature_store = project.get_feature_store()

#connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description='Time-series data at hourly frequency',
    primary_key=['pickup_location_id', 'pickup_hour'],
    event_time='pickup_hour'
)



Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/549012
Connected. Call `.close()` to terminate connection gracefully.


In [16]:
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 182320/182320 | Elapsed Time: 00:17 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/549012/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1f60e68cac0>, None)

In [None]:
#now we need to automate this so it runs every hour, we do that in github actions
#this need to work every hour, if not the inference pipeline might brake and it wont get the previous hours predicts and it might fail
