In [4]:
import os

In [5]:
%pwd

'd:\\Courses\\Pau_ML_course\\Taxi_demand_predictor\\notebooks'

In [6]:
os.chdir("../")

In [7]:
%pwd

'd:\\Courses\\Pau_ML_course\\Taxi_demand_predictor'

### This note book will implement the feature pipeline.

- This is run automatically every hour and is going to fetch a batch of recent data, transform it and save it to the feature store

In [16]:
import src.config as config
import pandas as pd
from datetime import datetime, timedelta

In [17]:
current_date = pd.to_datetime(datetime.utcnow()).floor("H")
print(f"current_date: {current_date}")


# we fetch the raw data for the last 28 days, to add redundancy to our data pipeline.
# redundancy coz in case pipeline failes for a certain hour, it's not gonna break things completely
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days = 28)

current_date: 2023-06-15 18:00:00


In [21]:
print(f"fetch_data_from: {fetch_data_from}")
print(f"fetch_data_to: {fetch_data_to}")

fetch_data_from: 2023-05-18 18:00:00
fetch_data_to: 2023-06-15 18:00:00


### Now, we need to fetch recent data

- We don't have access to NYC assosiation data warehouse, so we will simulate a call to data warehouse

In [24]:
from src.data_processing import load_raw_data

In [25]:
def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """Simulate production data by sampling historical data from 52 weeks ago(i.e. 1 year ago)

    Args:
        from_date (datetime): _description_
        to_date (datetime): _description_

    Returns:
        pd.DataFrame: dataframe of historical data of that time period
    """
    
    from_date_ = from_date - timedelta(days = 7 * 52)
    to_date_ = to_date - timedelta(days = 7 * 52)
    print(f'{from_date=}, {to_date_=}')
    
    # download 2 files from website
    rides = load_raw_data(year = from_date_.year, month_lst = from_date_.month)
    rides = rides[rides.pickup_datetime >= from_date_]
    rides_2 = load_raw_data(year = to_date_.year, month_lst = to_date_.month)
    rides_2 = rides_2[rides_2.pickup_datetime < to_date_]
    
    
    rides = pd.concat([rides, rides_2])
    
    # shift the data by 1 year to pretend this is recent data
    rides["pickup_datetime"] += timedelta(weeks = 7 * 52)
    
    rides.sort_values(by = ["pickup_location_id" ,"pickup_datetime"], inplace = True)
    
    
    return rides

### Download the raw data of 1 yr back

In [26]:
rides = fetch_batch_raw_data(from_date = fetch_data_from, to_date = fetch_data_to)

from_date=Timestamp('2023-05-18 18:00:00'), to_date_=Timestamp('2022-06-16 18:00:00')
File for 2022-05 already exists in local directory
File for 2022-06 already exists in local directory


### Transform the raw data to time series data

In [27]:
from src.data_processing import transform_raw_data_into_ts_data

In [28]:
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 265/265 [00:09<00:00, 27.62it/s]


### Connect to feature store and feature group where we want to store the data

In [29]:
import hopsworks

In [30]:
# connect to the project
project = hopsworks.login(
    project = config.HOPSWORKS_PROJECT_NAME,
    api_key_value = config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name = config.FEATURE_GROUP_NAME,
    version = config.FEATURE_GROUP_VERSION,
    description = "Time-series data hourly frequency",
    primary_key = ["pickup_location_id", "pickup_hour"],
    event_time = "pickup_hour"
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/59568
Connected. Call `.close()` to terminate connection gracefully.


### Now, insert the data into the feature group

In [32]:
feature_group.insert(ts_data, write_options = {"wait_for_job": False})

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/59568/fs/58464/fg/62814


Uploading Dataframe: 0.00% |          | Rows 0/178080 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/59568/jobs/named/time_series_hourly_feature_gorup_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x2468ca6df70>, None)

### In the next notebook, we will automate this execution by running this notebook  automatically every hour. We use github actions for this task