In [10]:
from datetime import datetime,timedelta
import hopsworks

from src.data import *
from src.paths import *
from src.config import *

In [2]:
# Fetch window: ends at the start of the current hour (local, naive time)
# and reaches back exactly 28 days from that point.

fetch_data_to = datetime.now().replace(minute=0, second=0, microsecond=0)
fetch_data_from = fetch_data_to - timedelta(days=28)
print("Fetch data for the past 28 days :", fetch_data_from, "to", fetch_data_to)

Fetch data for the past 28 days : 2024-01-24 13:00:00 to 2024-02-21 13:00:00


#### 1. Fetch (a.k.a Simulate a fetch) data every hour from warehouse

- We don't have access to the NYC Association data warehouse (DW)
- Instead, we simulate a call to the DW
    - Fetch the data from three years earlier (2021 at the time of this run) instead

In [3]:
def fetch_raw_data(fetch_data_from,fetch_data_to):
    """Simulate a data-warehouse fetch by downloading historical monthly files.

    Both bounds are moved back by 3 * 365 days, and the raw monthly file of
    every calendar month touched by the shifted window is downloaded exactly
    once (a window that lies inside a single month triggers one download).

    Args:
        fetch_data_from: datetime marking the start of the requested window.
        fetch_data_to: datetime marking the end of the requested window.

    Returns:
        None. The work is done by download_files_raw_monthly().
    """
    # The same fixed offset is applied to both bounds so the window length is kept.
    lookback = timedelta(days=365*3)
    fetch_data_from = fetch_data_from - lookback
    fetch_data_to = fetch_data_to - lookback
    print("Simulating call to DW (Fetch 2021 data) :",fetch_data_from,"to",fetch_data_to)

    # Sort the (year, month) endpoints so a reversed window still downloads its months.
    first_month, last_month = sorted([
        (fetch_data_from.year, fetch_data_from.month),
        (fetch_data_to.year, fetch_data_to.month),
    ])

    # Walk month by month (rolling over December -> January) so that no month is
    # requested twice and no month inside a longer window is skipped.
    year, month = first_month
    while (year, month) <= last_month:
        download_files_raw_monthly(year, month)
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

In [4]:
# Download the raw monthly files for the back-shifted fetch window defined above.
fetch_raw_data(fetch_data_from,fetch_data_to)

Simulating call to DW (Fetch 2021 data) : 2021-01-24 13:00:00 to 2021-02-21 13:00:00
Downloaded files :  ['rides_2021-01.parquet']
Downloaded files :  ['rides_2021-02.parquet']


#### 2. Validate raw data

In [5]:
# Run the project's validation helper; as the last expression, its return value is displayed.
# NOTE(review): the helper comes from one of the star imports and takes no arguments here,
# so which files it validates and which rules it applies cannot be seen from this notebook — confirm.
validate_data_files()

Unnamed: 0,pickup_time,pickup_location
0,2021-01-01 00:30:10,142
1,2021-01-01 00:51:20,238
2,2021-01-01 00:43:30,132
3,2021-01-01 00:15:48,138
4,2021-01-01 00:31:49,68
...,...,...
2741418,2021-02-28 23:02:08,265
2741419,2021-02-28 23:27:00,68
2741420,2021-02-28 23:18:05,68
2741421,2021-02-28 23:41:07,113


#### 3. Transform the validated data into Time Series data

- Pretend that the time series data is the latest available
    - Shift the dates forward by the same three-year offset that was subtracted when fetching

In [6]:
# Build the hourly time series and give its three columns the names used downstream.
taxi_data_ts = transform_to_timeseries()
taxi_data_ts.columns = ['pickup_hour', 'pickup_location_id', 'rides']

# Move the timestamps forward by 3 * 365 days so the historical data lines up
# with the current fetch window.
taxi_data_ts['pickup_hour'] += timedelta(days=365*3)

# Keep only rows inside the fetch window (both bounds inclusive) and renumber them from 0.
taxi_data_ts = taxi_data_ts.loc[
    taxi_data_ts['pickup_hour'].between(fetch_data_from, fetch_data_to)
].reset_index(drop=True)
taxi_data_ts

Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2024-01-24 13:00:00,120,0
1,2024-01-24 13:00:00,70,8
2,2024-01-24 13:00:00,175,0
3,2024-01-24 13:00:00,121,0
4,2024-01-24 13:00:00,28,1
...,...,...,...
174975,2024-02-21 13:00:00,8,0
174976,2024-02-21 13:00:00,99,0
174977,2024-02-21 13:00:00,111,0
174978,2024-02-21 13:00:00,176,0


#### 4. Hopsworks feature store

In [11]:
### Connecting to Hopsworks Feature Store
# Log in with the project name and API key constants (they come from the star imports
# at the top — presumably src.config; confirm), then get the project's feature store handle.


hw_project = hopsworks.login(project=HOPSWORKS_PROJECT,api_key_value=HOPSWORKS_API_KEY)
fs = hw_project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/467093
Connected. Call `.close()` to terminate connection gracefully.


In [13]:
### Connect to the Feature Group
# Look up the feature group by name and version (get-or-create call on the feature store).
# Rows are keyed by (pickup_hour, pickup_location_id); pickup_hour is also the event time.

fg = fs.get_or_create_feature_group(name=FEATURE_GROUP_NAME,
                             description='Hourly data of taxi demand in NYC in TimeSeries format',
                             version=FEATURE_GROUP_VERSION,
                             primary_key=['pickup_hour','pickup_location_id'],
                             event_time='pickup_hour')

In [14]:
%%time

### Insert data into Feature Group and wait for it to finish
# Upload the windowed time series frame to the feature group.
# NOTE(review): the recorded output only shows the materialization job being launched
# ("Job started successfully"); confirm whether insert() really blocks until it completes.

fg.insert(taxi_data_ts)

Uploading Dataframe: 0.00% |          | Rows 0/174980 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: taxi_time_series_hourly_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/467093/jobs/named/taxi_time_series_hourly_1_offline_fg_materialization/executions
CPU times: user 7.17 s, sys: 477 ms, total: 7.65 s
Wall time: 15.5 s


(<hsfs.core.job.Job at 0x12cb45df0>, None)