## Create Feature Pipeline

The first step is to load the configuration file we created, **config.py**, so that we can access the Hopsworks settings: the project name, feature group name, feature group version, and API key. Importing them from one place means we do not have to redefine them in every notebook.

In [22]:
import src.config as config
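
For readers who have not seen **config.py**, here is a minimal sketch of what such a file might contain. The four attribute names match the ones used later in this notebook. Reading them from environment variables, the placeholder project name, and the defaults are assumptions for illustration, not the project's actual file. The feature group name and version defaults are inferred from the job name printed when the data is inserted.

```python
import os

# Hypothetical src/config.py: values come from environment variables so that
# the API key never has to be committed to the repository.
HOPSWORKS_PROJECT_NAME = os.environ.get("HOPSWORKS_PROJECT_NAME", "my_project")  # placeholder
HOPSWORKS_API_KEY = os.environ.get("HOPSWORKS_API_KEY")  # None if the variable is not set

# Defaults inferred from the materialization job name (an assumption).
FEATURE_GROUP_NAME = os.environ.get("FEATURE_GROUP_NAME", "time_series_hourly_feature_group")
FEATURE_GROUP_VERSION = int(os.environ.get("FEATURE_GROUP_VERSION", "2"))
```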

Since this notebook fetches data every hour, we need to keep track of the current time and the date range we want to fetch. Each hourly run fetches the last 28x3 = 84 days. We use 28x3 days because the most recent data available is over two months old, so the window must be long enough to cover that gap.

In [23]:
from datetime import datetime, timedelta, timezone

import pandas as pd

current_date = pd.to_datetime(datetime.now(timezone.utc)).floor('H')
current_date = current_date.replace(tzinfo=None)
print(f'{current_date=}')


# fetch data up to the current time (floored to the hour), starting 28*3 = 84 days earlier
fetch_data_to = current_date
# the source has no data from July until today (09/26), so the window must cover that gap
fetch_data_from = current_date - timedelta(days = 28*3)
print(f'{fetch_data_from=}')

current_date=Timestamp('2024-09-28 19:00:00')
fetch_data_from=Timestamp('2024-07-06 19:00:00')


In [24]:
fetch_data_from

Timestamp('2024-07-06 19:00:00')
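
The window arithmetic can be checked in isolation. The sketch below uses only the standard library and a fixed timestamp instead of the current time, so the result is reproducible. The helper name `fetch_window` is hypothetical and not part of the project.

```python
from datetime import datetime, timedelta, timezone


def fetch_window(now: datetime, days: int = 28 * 3):
    """Return (start, end) as naive UTC datetimes, with end floored to the hour."""
    now_utc = now.astimezone(timezone.utc)
    # Drop minutes, seconds and microseconds, then drop the timezone info.
    end = now_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
    start = end - timedelta(days=days)
    return start, end


# A fixed "now" that falls inside the hour shown in the output above.
start, end = fetch_window(datetime(2024, 9, 28, 19, 42, tzinfo=timezone.utc))
print(f"{start=}, {end=}")
```

With this input, `end` is 2024-09-28 19:00 and `start` is 2024-07-06 19:00, the same values printed by the cell above.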

For this model, we do not have access to the NYC Taxi Data Warehouse, so we cannot get the new rides every hour. Instead, we simulate the call to show how it would work. Since we have the rides from a year ago, we pretend they are the most recent demand we can fetch. The function below takes the requested 28x3-day window, moves it back by exactly 52 weeks (364 days), loads the historical rides for that period, and shifts them forward by 52 weeks. Shifting by a whole number of weeks keeps every ride on the same day of the week. This shifted data is our simulated new data. We pretend that this code runs every hour to get the newest 28x3 days of data; because our source does not update every hour, we use this synthetic data.

In [25]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago (i.e. 1 year)
    """
    # shift the requested window back by 52 weeks (about 1 year)
    from_date_ = from_date - timedelta(days=7*52)
    to_date_ = to_date - timedelta(days=7*52)
    print(f'{from_date_=}, {to_date_=}')

    # NOTE: the three loads below assume the window spans exactly three
    # calendar months within the same year (July to September in this run)

    # first month: keep only rides from the window start onwards
    rides = load_raw_data(year=from_date_.year, months=from_date_.month)
    rides = rides[rides['pickup_datetime'] >= from_date_]

    # middle month: kept in full
    rides1 = load_raw_data(year=from_date_.year, months=from_date_.month + 1)

    # last month: keep only rides before the window end
    rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
    rides_2 = rides_2[rides_2['pickup_datetime'] < to_date_]

    rides = pd.concat([rides, rides1, rides_2])

    # shift the data forward by 52 weeks so it looks like recent data
    rides['pickup_datetime'] += timedelta(days=7*52)

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides
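
The function above loads three hard-coded months, which works for this run but would break if the window touched four calendar months or crossed a year boundary (for example, `from_date_.month + 1` is 13 in December). One possible way to generalize it is to list every (year, month) pair the window touches and load each one in turn. The helper `months_between` below is a hypothetical sketch, not part of **src.data**.

```python
from datetime import datetime
from typing import List, Tuple


def months_between(start: datetime, end: datetime) -> List[Tuple[int, int]]:
    """List every (year, month) touched by the window from start to end."""
    pairs = []
    year, month = start.year, start.month
    # Tuples compare element by element, so this also handles year changes.
    while (year, month) <= (end.year, end.month):
        pairs.append((year, month))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return pairs


# The window used in this notebook touches three months.
print(months_between(datetime(2023, 7, 8, 19), datetime(2023, 9, 30, 19)))
# A window that crosses the year boundary touches four.
print(months_between(datetime(2023, 11, 15), datetime(2024, 2, 7)))
```

Each pair could then be passed to `load_raw_data`, with the results concatenated and filtered to the window, assuming the loader accepts one year and one month per call as it does above.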

In [26]:
# get rides
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

from_date_=Timestamp('2023-07-08 19:00:00'), to_date_=Timestamp('2023-09-30 19:00:00')
File 2023-07 was already in local storage
File 2023-08 was already in local storage
File 2023-09 was already in local storage
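
The printed dates are 2023-07-08 and 2023-09-30 rather than 2023-07-06 and 2023-09-28 because the shift is 52 weeks (364 days), not a calendar year. A short check with the standard library shows the effect and confirms that the day of the week is preserved. The variable names are illustrative only.

```python
from datetime import datetime, timedelta

SHIFT = timedelta(days=7 * 52)  # 364 days, a whole number of weeks

requested_from = datetime(2024, 7, 6, 19)
historical_from = requested_from - SHIFT

print(historical_from)
# Both dates fall on the same weekday because 364 is a multiple of 7.
print(requested_from.strftime("%A"), historical_from.strftime("%A"))
```

Keeping the weekday aligned matters for taxi demand, since weekday and weekend patterns differ.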


In [27]:
rides

Unnamed: 0,pickup_datetime,pickup_location_id
601094,2024-07-06 20:42:15,1
605756,2024-07-06 21:35:54,1
608614,2024-07-06 22:00:08,1
610829,2024-07-06 22:50:16,1
620252,2024-07-07 00:24:27,1
...,...,...
2660919,2024-09-28 17:06:13,265
2662010,2024-09-28 17:20:02,265
2664171,2024-09-28 17:25:20,265
2672577,2024-09-28 18:00:33,265


In [28]:
rides[rides['pickup_datetime'] > '2024-08-02']

Unnamed: 0,pickup_datetime,pickup_location_id
299775,2024-08-02 02:13:48,1
347577,2024-08-02 17:23:08,1
369006,2024-08-02 20:11:08,1
393314,2024-08-03 02:34:51,1
399841,2024-08-03 07:43:51,1
...,...,...
2660919,2024-09-28 17:06:13,265
2662010,2024-09-28 17:20:02,265
2664171,2024-09-28 17:25:20,265
2672577,2024-09-28 18:00:33,265


Here is what happened. We called the data warehouse to get the last 28x3 = 84 days of data. Since we didn't have that data, we took last year's data and shifted it forward so that it covers the 84 days up to today. This is why the **rides** dataframe starts on 2024-07-06 (84 days ago) and ends today. All we did was retrieve 84 days of data, but since we didn't have it, we pretended to have it by creating it. Don't overthink this step; it's just getting 84 days of *new* data. In practice, the real data would be available to us.

At the time of writing this code (09.25.2024), the newest data available is from July.

##### Transform the Raw Data into Time Series Data

We now have the raw data and want to transform it into time series data just as we did before. This will be done with the **transform_raw_data_into_ts_data** function from **src.data**.

The older data that we had is already transformed into time series data and loaded into our feature group. This is the new data that we now have to transform.

In [29]:
from src.data import transform_raw_data_into_ts_data

ts_data = transform_raw_data_into_ts_data(rides)

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)



100%|██████████| 265/265 [00:00<00:00, 314.61it/s]
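
The implementation of `transform_raw_data_into_ts_data` lives in **src.data** and is not shown here. As a rough sketch of the idea, the hypothetical helper below floors each pickup time to the hour, counts rides per location and hour, and fills hours with no rides with zero. The zero filling is an assumption, though it is consistent with the zero-ride rows in the outputs further down.

```python
import pandas as pd


def to_hourly_counts(rides: pd.DataFrame) -> pd.DataFrame:
    """Count rides per (pickup_location_id, pickup_hour), filling gaps with 0."""
    rides = rides.copy()
    rides["pickup_hour"] = rides["pickup_datetime"].dt.floor("h")

    # Number of rides observed in each location and hour.
    counts = rides.groupby(["pickup_location_id", "pickup_hour"]).size().rename("rides")

    # Full grid of every location and every hour in the observed range.
    hours = pd.date_range(rides["pickup_hour"].min(), rides["pickup_hour"].max(), freq="h")
    locations = sorted(rides["pickup_location_id"].unique())
    grid = pd.MultiIndex.from_product(
        [locations, hours], names=["pickup_location_id", "pickup_hour"]
    )

    # Hours with no rides are missing from counts; reindexing adds them as 0.
    return counts.reindex(grid, fill_value=0).reset_index()


sample = pd.DataFrame({
    "pickup_datetime": pd.to_datetime(
        ["2024-07-06 20:42:15", "2024-07-06 21:35:54", "2024-07-06 22:00:08"]
    ),
    "pickup_location_id": [1, 1, 2],
})
print(to_hourly_counts(sample))
```

The sample yields six rows (two locations by three hours), three of which have zero rides.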


In [30]:
# string to datetime
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# add column with Unix epoch milliseconds
ts_data['pickup_ts'] = ts_data['pickup_hour'].apply(lambda x: x.timestamp() * 1000)
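
The row-by-row `apply` above produces a float column. If an integer column is preferred, one alternative is to subtract the Unix epoch and divide by one millisecond, which pandas evaluates on the whole column at once. This is a sketch on a small sample series, not a required change; whether to switch depends on the column type the feature group already expects.

```python
import pandas as pd

# Two hourly timestamps taken from the outputs in this notebook.
hours = pd.Series(pd.to_datetime(["2024-08-01 01:00:00", "2024-09-28 17:00:00"], utc=True))

# Elapsed time since the Unix epoch, expressed in whole milliseconds (int64).
epoch = pd.Timestamp("1970-01-01", tz="UTC")
pickup_ts = (hours - epoch) // pd.Timedelta(milliseconds=1)

print(pickup_ts)
```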

In [31]:
cp = ts_data[ts_data['pickup_location_id'] == 43]

In [32]:
cp[cp['pickup_hour'] > '2024-08-01']

Unnamed: 0,pickup_hour,rides,pickup_location_id,pickup_ts
85278,2024-08-01 01:00:00+00:00,3,43,1.722474e+12
85279,2024-08-01 02:00:00+00:00,5,43,1.722478e+12
85280,2024-08-01 03:00:00+00:00,1,43,1.722481e+12
85281,2024-08-01 04:00:00+00:00,0,43,1.722485e+12
85282,2024-08-01 05:00:00+00:00,5,43,1.722488e+12
...,...,...,...,...
86683,2024-09-28 14:00:00+00:00,155,43,1.727532e+12
86684,2024-09-28 15:00:00+00:00,171,43,1.727536e+12
86685,2024-09-28 16:00:00+00:00,217,43,1.727539e+12
86686,2024-09-28 17:00:00+00:00,173,43,1.727543e+12


In [33]:
ts_data[ts_data['pickup_hour'] > '2024-08-03']

Unnamed: 0,pickup_hour,rides,pickup_location_id,pickup_ts
654,2024-08-03 01:00:00+00:00,0,1,1.722647e+12
655,2024-08-03 02:00:00+00:00,1,1,1.722650e+12
656,2024-08-03 03:00:00+00:00,0,1,1.722654e+12
657,2024-08-03 04:00:00+00:00,0,1,1.722658e+12
658,2024-08-03 05:00:00+00:00,0,1,1.722661e+12
...,...,...,...,...
534235,2024-09-28 14:00:00+00:00,4,265,1.727532e+12
534236,2024-09-28 15:00:00+00:00,2,265,1.727536e+12
534237,2024-09-28 16:00:00+00:00,3,265,1.727539e+12
534238,2024-09-28 17:00:00+00:00,3,265,1.727543e+12


In [34]:
ts_data['pickup_hour'].dtype

datetime64[ns, UTC]

In [35]:
ts_data

Unnamed: 0,pickup_hour,rides,pickup_location_id,pickup_ts
0,2024-07-06 19:00:00+00:00,0,1,1.720292e+12
1,2024-07-06 20:00:00+00:00,1,1,1.720296e+12
2,2024-07-06 21:00:00+00:00,1,1,1.720300e+12
3,2024-07-06 22:00:00+00:00,2,1,1.720303e+12
4,2024-07-06 23:00:00+00:00,0,1,1.720307e+12
...,...,...,...,...
534235,2024-09-28 14:00:00+00:00,4,265,1.727532e+12
534236,2024-09-28 15:00:00+00:00,2,265,1.727536e+12
534237,2024-09-28 16:00:00+00:00,3,265,1.727539e+12
534238,2024-09-28 17:00:00+00:00,3,265,1.727543e+12


##### Connect to Feature Store/Feature Group

Here we connect to the feature store and to our feature group, so that the new data can be inserted into it.

In [36]:
import hopsworks

# connect to project 
project = hopsworks.login(
    project = config.HOPSWORKS_PROJECT_NAME,
    api_key_value = config.HOPSWORKS_API_KEY
)

# get feature store
feature_store = project.get_feature_store()

# connect to feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description='Hourly time series data',
    primary_key=['pickup_location_id', 'pickup_ts'],
    event_time='pickup_ts'
)


Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1049751
Connected. Call `.close()` to terminate connection gracefully.


In [37]:
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 534240 entries, 0 to 534239
Data columns (total 4 columns):
 #   Column              Non-Null Count   Dtype              
---  ------              --------------   -----              
 0   pickup_hour         534240 non-null  datetime64[ns, UTC]
 1   rides               534240 non-null  int64              
 2   pickup_location_id  534240 non-null  int64              
 3   pickup_ts           534240 non-null  float64            
dtypes: datetime64[ns, UTC](1), float64(1), int64(2)
memory usage: 16.3 MB
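
Before inserting, it can be worth checking that the dataframe has no missing values and that the primary key columns (`pickup_location_id` and `pickup_ts`) identify each row uniquely. The helper `check_before_insert` below is a hypothetical sketch that uses only pandas; it is not part of the Hopsworks API.

```python
import pandas as pd


def check_before_insert(df: pd.DataFrame, key=("pickup_location_id", "pickup_ts")) -> None:
    """Raise ValueError if df has missing values or repeated primary-key rows."""
    if df.isna().any().any():
        raise ValueError("found missing values")
    if df.duplicated(subset=list(key)).any():
        raise ValueError("found duplicate primary keys")


sample = pd.DataFrame({
    "pickup_location_id": [1, 1, 2],
    "pickup_ts": [1.720292e12, 1.720296e12, 1.720292e12],
    "rides": [0, 1, 3],
})
check_before_insert(sample)  # passes silently for valid data
```

In this notebook it would be called as `check_before_insert(ts_data)` just before the insert.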


In [38]:
# insert this data to the feature group
feature_group.insert(ts_data, write_options={'wait_for_job':False})

Uploading Dataframe: 0.00% |          | Rows 0/534240 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1049751/jobs/named/time_series_hourly_feature_group_2_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1c8a5a9ac50>, None)

#### Automate Running Every Hour

We want this notebook to run automatically every hour. We will do this using GitHub Actions. This is done by creating a **.github/workflows** folder in the parent directory (the root of the repository, which is where GitHub looks for workflows). In this folder we will create a YAML file called **feature_workflows.yaml**. View the file to see the syntax. Help with the syntax can be found at the following links:

+ https://docs.github.com/en/actions/use-cases-and-examples/creating-an-example-workflow

+ https://spacelift.io/blog/github-actions-tutorial

+ https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions

+ https://docs.github.com/en/actions/writing-workflows/quickstart
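
For orientation, an hourly workflow could look roughly like the sketch below. This is not the project's actual **feature_workflows.yaml**: the workflow and job names, the Python version, the **requirements.txt** file, the notebook path, and the secret name are all assumptions for illustration.

```yaml
name: feature-pipeline

on:
  schedule:
    - cron: '0 * * * *'    # minute 0 of every hour (UTC)
  workflow_dispatch:        # also allow manual runs from the Actions tab

jobs:
  run-feature-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'          # assumed version

      - name: Install dependencies
        run: pip install -r requirements.txt   # assumed dependency file

      - name: Execute the feature pipeline notebook
        env:
          # assumed secret name, defined in the repository settings
          HOPSWORKS_API_KEY: ${{ secrets.HOPSWORKS_API_KEY }}
        run: jupyter nbconvert --to notebook --execute notebooks/feature_pipeline.ipynb
```

Scheduled workflows can start later than the cron time when GitHub is under load, so the pipeline should not rely on running at exactly the top of the hour.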
