# Demand Feature Pipeline Notebook

This notebook loads item-level demand data, adds an event-time column, and uploads the result to the Hopsworks feature store. It is an interactive version of the `feature_pipeline.py` script.

In [1]:
# Import necessary libraries
import pandas as pd
import hopsworks
import os
from datetime import datetime
from dotenv import load_dotenv



## Load Environment Variables

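Load the Hopsworks connection settings (`HOST`, `PORT`, `HOPSWORKS_API_KEY`, and optionally `PROJECT`) into the environment with `load_dotenv()`, and set the project and feature group parameters.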

In [9]:
# Load environment variables
load_dotenv()

# Configure parameters (these can be modified as needed)
project_name = 'many_models'
feature_group_name = 'demand_features'
version = 1
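The connection cell below reads `HOST`, `PORT`, `HOPSWORKS_API_KEY`, and `PROJECT` from the environment. `load_dotenv()` copies `KEY=value` lines from a `.env` file into `os.environ` and, by default, does not overwrite variables that are already set. A minimal fail-fast check, assuming `HOST`, `PORT`, and `HOPSWORKS_API_KEY` are the required names; `missing_env_vars` is a hypothetical helper, not part of `python-dotenv` or Hopsworks:

```python
import os

# Assumed set of variables the login cell depends on. PROJECT is left out
# because project_name is used first and PROJECT is only a fallback.
REQUIRED_VARS = ("HOST", "PORT", "HOPSWORKS_API_KEY")


def missing_env_vars(names=REQUIRED_VARS, environ=os.environ):
    """Hypothetical helper: return the names that are unset or empty."""
    return [name for name in names if not environ.get(name)]


missing = missing_env_vars()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```

Reporting the missing names up front gives a clearer message than a failed login with a `None` host or API key.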

## Connect to Hopsworks Feature Store

Establish connection to the Hopsworks Feature Store using credentials from environment variables.

In [3]:
print("Connecting to Hopsworks Feature Store")

# Connect to Hopsworks
project = hopsworks.login(
    host=os.getenv("HOST"),
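    port=int(os.getenv("PORT", "443")),  # environment values are strings
    api_key_value=os.getenv("HOPSWORKS_API_KEY"),
    # project_name takes precedence; $PROJECT is only used if project_name is empty
    project=project_name or os.getenv("PROJECT")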
)

fs = project.get_feature_store()
print(f"Connected to feature store in project: {project_name}")

Connecting to Hopsworks Feature Store
2025-05-09 10:04:46,780 INFO: Initializing external client
2025-05-09 10:04:46,781 INFO: Base URL: https://demo.hops.works:443

To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.1) by running 'pip install hopsworks==4.1.*'

2025-05-09 10:04:48,063 INFO: Python Engine initialized.

Logged in to project, explore it here https://demo.hops.works:443/p/14455
Connected to feature store in project: many_models
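In the login call, `project_name or os.getenv("PROJECT")` means the hard-coded `project_name` always wins, and the `PROJECT` variable is only a fallback when `project_name` is empty. Environment variables are also always strings, so `PORT` arrives as text. A minimal sketch that makes both rules explicit, assuming a default port of 443 (the port in the log above); `build_login_kwargs` is a hypothetical helper whose keys simply mirror the keyword arguments this notebook already passes to `hopsworks.login`:

```python
import os


def build_login_kwargs(project_name=None, environ=os.environ, default_port=443):
    """Hypothetical helper: collect the keyword arguments used for hopsworks.login."""
    port = environ.get("PORT")
    return {
        "host": environ.get("HOST"),
        # Environment values are strings; cast the port to an int.
        "port": int(port) if port else default_port,
        "api_key_value": environ.get("HOPSWORKS_API_KEY"),
        # The notebook's project_name takes precedence over $PROJECT.
        "project": project_name or environ.get("PROJECT"),
    }


kwargs = build_login_kwargs("many_models", {"HOST": "demo.hops.works", "PORT": "443"})
print(kwargs["port"], kwargs["project"])  # prints: 443 many_models
```

With the real environment, the dictionary could be unpacked as `hopsworks.login(**build_login_kwargs(project_name))`.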


## Load Source Data

Load the demand data from a CSV file and prepare it for the feature store.

In [4]:
print("Loading source data")
demand_df = pd.read_csv('../data/demand_qty_item_loc.csv')

# Display first few rows to inspect the data
display(demand_df.head())

Loading source data


| Unnamed: 0 | sp_id | loc_id | time_bucket | repetitive_demand_quantity |
|---|---|---|---|---|
| 0 | 9684698 | 3 | 202104 | 55.0 |
| 1 | 9684698 | 3 | 202105 | 117.0 |
| 2 | 9684698 | 3 | 202106 | 62.0 |
| 3 | 9684698 | 3 | 202107 | 45.0 |
| 4 | 9684698 | 3 | 202108 | 77.0 |
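pandas names a header-less leading column `Unnamed: 0`, which typically happens when a CSV was written with `DataFrame.to_csv` and its default `index=True`. If that is the case for this file (an assumption; the file itself is not shown), the column is only a row counter and can be read as the index or dropped so it does not become a feature. A sketch using rows from the preview above as inline sample data:

```python
import io

import pandas as pd

# Sample rows copied from the preview, including the unnamed index column.
csv_text = """,sp_id,loc_id,time_bucket,repetitive_demand_quantity
0,9684698,3,202104,55.0
1,9684698,3,202105,117.0
2,9684698,3,202106,62.0
"""

# Option 1: treat the first column as the DataFrame index.
df_indexed = pd.read_csv(io.StringIO(csv_text), index_col=0)

# Option 2: read as-is, then drop any auto-named "Unnamed: N" columns.
df_raw = pd.read_csv(io.StringIO(csv_text))
df_clean = df_raw.loc[:, ~df_raw.columns.str.startswith("Unnamed")]

print(list(df_clean.columns))
```

Both options leave the same four columns; `index_col=0` is the simpler choice when the first column is always the saved index.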


In [5]:

# Add the event-time column used by the feature group (the same current timestamp for every row)
demand_df['datetime'] = datetime.now()

# Display the transformed dataframe
display(demand_df.head())

| Unnamed: 0 | sp_id | loc_id | time_bucket | repetitive_demand_quantity | datetime |
|---|---|---|---|---|---|
| 0 | 9684698 | 3 | 202104 | 55.0 | 2025-05-09 10:05:45.219201 |
| 1 | 9684698 | 3 | 202105 | 117.0 | 2025-05-09 10:05:45.219201 |
| 2 | 9684698 | 3 | 202106 | 62.0 | 2025-05-09 10:05:45.219201 |
| 3 | 9684698 | 3 | 202107 | 45.0 | 2025-05-09 10:05:45.219201 |
| 4 | 9684698 | 3 | 202108 | 77.0 | 2025-05-09 10:05:45.219201 |
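Every row receives the same `datetime` value because `datetime.now()` is evaluated once and broadcast to the whole column, so the event time records when the pipeline ran rather than the period the demand belongs to. If `time_bucket` encodes a year and month as `YYYYMM` (the sample values such as `202104` suggest this, but it is an assumption), the event time could instead be derived from it. A sketch of that alternative, not the original pipeline's behavior:

```python
import pandas as pd

# Sample keys and buckets copied from the preview above.
df = pd.DataFrame({
    "sp_id": [9684698, 9684698, 9684698],
    "loc_id": [3, 3, 3],
    "time_bucket": [202104, 202105, 202106],
})

# Parse YYYYMM into the first day of that month (assumed encoding).
df["datetime"] = pd.to_datetime(df["time_bucket"].astype(str), format="%Y%m")

print(df["datetime"].tolist())
```

With this choice, re-running the pipeline produces the same event time for the same row instead of a new timestamp on every run.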


## Create Feature Group and Upload Data

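Get or create the feature group (name, version, primary key, and event-time column) and upload the prepared data. The feature schema is inferred from the DataFrame, and the feature group is actually created on the first insert, as the log below shows.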

In [10]:
print("⬆ Creating/getting feature group")
# Define the feature group
demand_fg = fs.get_or_create_feature_group(
    name=feature_group_name,
    version=version,
    description="Item demand by location and time",
    primary_key=['sp_id', 'loc_id', 'time_bucket'],
    event_time='datetime',
)

⬆ Creating/getting feature group
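The feature group uses the composite primary key `sp_id`, `loc_id`, `time_bucket`, so each combination should identify a single row. A minimal pre-insert check with pandas, sketched on hypothetical sample rows (the third row deliberately repeats the key of the first):

```python
import pandas as pd

PRIMARY_KEY = ["sp_id", "loc_id", "time_bucket"]

# Hypothetical sample: the third row repeats the key of the first.
df = pd.DataFrame({
    "sp_id": [9684698, 9684698, 9684698],
    "loc_id": [3, 3, 3],
    "time_bucket": [202104, 202105, 202104],
    "repetitive_demand_quantity": [55.0, 117.0, 60.0],
})

# keep=False flags every row that shares a key, not just the later copies.
dupes = df[df.duplicated(subset=PRIMARY_KEY, keep=False)]
if not dupes.empty:
    print(f"{len(dupes)} rows share a primary key:")
    print(dupes)
```

On the real data, the same check would run on `demand_df` before `demand_fg.insert`.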


In [11]:
print("⬆ Uploading data to the Feature Store")
# Upload data to the feature store
demand_fg.insert(demand_df, write_options={"wait_for_job": True})
print("Feature pipeline completed successfully")

⬆ Uploading data to the Feature Store
Feature Group created successfully, explore it at 
https://demo.hops.works:443/p/14455/fs/13379/fg/11279


Uploading Dataframe: 100.00% |██████████| Rows 9600/9600 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: demand_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://demo.hops.works:443/p/14455/jobs/named/demand_features_1_offline_fg_materialization/executions
2025-05-09 10:08:34,542 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2025-05-09 10:08:37,669 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-09 10:08:40,784 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-09 10:11:25,132 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-05-09 10:11:28,256 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-09 10:11:28,351 INFO: Waiting for log aggregation to finish.
2025-05-09 10:11:36,776 INFO: Execution finished successfully.
Feature pipeline completed successfully
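With `write_options={"wait_for_job": True}`, `insert` blocks until the offline materialization job finishes, and the log above shows the client re-checking the execution state every few seconds (INITIALIZING, SUBMITTED, RUNNING, SUCCEEDING, AGGREGATING_LOGS). The sketch below illustrates that generic polling pattern only; it is not the Hopsworks implementation, and `wait_for_terminal_state` and the state names in `TERMINAL` are assumptions:

```python
import time

# Assumed terminal states, for illustration only.
TERMINAL = {"FINISHED", "FAILED", "KILLED"}


def wait_for_terminal_state(get_state, interval=3.0, timeout=600.0):
    """Poll get_state() until it returns a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        print(f"Current state: {state}")
        if state in TERMINAL:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {state} after {timeout} s")
        time.sleep(interval)


# Simulated job that walks through a few states.
states = iter(["INITIALIZING", "SUBMITTED", "RUNNING", "FINISHED"])
final = wait_for_terminal_state(lambda: next(states), interval=0.0)
```

A timeout keeps the notebook from hanging indefinitely if a job never reaches a terminal state.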
