In [1]:
import hopsworks
import pandas as pd
import json

## Creating feature groups from a feature group with JSON data

#### Log in to Hopsworks and fetch the feature group

In [2]:
# Login to Hopsworks.
project = hopsworks.login()

# Fetch the feature store.
fs = project.get_feature_store()

# Fetch the feature group
fg_json = fs.get_feature_group(name="fg_raw_event_data", version=1)

2025-03-13 09:46:41,093 INFO: Initializing external client
2025-03-13 09:46:41,093 INFO: Base URL: https://10.87.41.143:28181
2025-03-13 09:46:41,776 INFO: Python Engine initialized.

Logged in to project, explore it here https://10.87.41.143:28181/p/119


#### Read data from the feature group and perform the required feature engineering

In [3]:
# Read from feature group
df = fg_json.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.25s) 


In [7]:
# Extract the JSON data and convert it into a dataframe with the required columns
unnested_dataframe = pd.json_normalize(df["data"].apply(lambda x : json.loads(x)))
unnested_dataframe.head(2)

|   | event_time | event_id | user_id | click_count | time_spent | scroll_depth | purchase_completed | checkout_time | ad_interaction |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-06-22 13:00:00 | 1 | 14 | 0 | 3.26 | 23.27 | 0 | 2024-06-22 17:00:00 |  |
| 1 | 2029-10-02 08:00:00 | 17 | 3 | 12 | 18.08 | 46.8 | 1 | 2029-10-02 12:00:00 |  |
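To see what `pd.json_normalize` does with the parsed JSON, here is a small standalone sketch on hypothetical records. The `device` object and its `os` key are invented for illustration and do not come from `fg_raw_event_data`. Nested objects become dot-separated columns, and keys missing from a record become `NaN`, which is consistent with the empty `ad_interaction` cells above.

```python
import json

import pandas as pd

# Hypothetical raw rows: each holds one JSON document as a string,
# mirroring the "data" column of fg_raw_event_data.
raw = pd.DataFrame({
    "data": [
        '{"event_id": 1, "user_id": 14, "click_count": 0, "device": {"os": "ios"}}',
        '{"event_id": 17, "user_id": 3, "click_count": 12, "ad_interaction": 1.0}',
    ]
})

# Parse each string into a dict; json.loads can be passed to apply directly.
records = raw["data"].apply(json.loads)

# json_normalize flattens nested objects into dot-separated column names
# ("device.os") and fills keys that are missing from a record with NaN.
flat = pd.json_normalize(records.tolist())
print(flat)
```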


In [11]:
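# Perform the required feature engineering: keep only the needed columns.
# .copy() makes an independent frame, so the datetime conversions below
# do not trigger pandas' SettingWithCopyWarning.
unnested_dataframe = unnested_dataframe[["event_time", "event_id", "user_id", "click_count", "time_spent", "scroll_depth", "purchase_completed", "checkout_time"]].copy()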

# Convert string datetime to datetime object
unnested_dataframe["event_time"] = pd.to_datetime(unnested_dataframe["event_time"])
unnested_dataframe["checkout_time"] = pd.to_datetime(unnested_dataframe["checkout_time"])

user_event_df = unnested_dataframe[["event_time", "event_id", "user_id", "purchase_completed"]]

events_df = unnested_dataframe[["event_time", "event_id", "click_count", "time_spent", "scroll_depth", "checkout_time"]]
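Both frames keep `event_id`, which is what makes the split reversible. A minimal pandas sketch with hypothetical values shows that an inner merge on `event_id` recovers the original rows. It selects the subsets with `.copy()` so that each is an independent frame rather than a view of the source.

```python
import pandas as pd

# Hypothetical flattened events standing in for unnested_dataframe.
df = pd.DataFrame({
    "event_time": pd.to_datetime(["2024-06-22 13:00", "2029-10-02 08:00"]),
    "event_id": [1, 17],
    "user_id": [14, 3],
    "click_count": [0, 12],
    "purchase_completed": [0, 1],
})

# Split into a user-event frame and an events frame; both keep event_id.
user_event_df = df[["event_time", "event_id", "user_id", "purchase_completed"]].copy()
events_df = df[["event_time", "event_id", "click_count"]].copy()

# Joining on event_id reconstructs every original row
# (event_time is dropped on one side to avoid a duplicated column).
rejoined = user_event_df.merge(events_df.drop(columns="event_time"), on="event_id")
print(rejoined)
```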

#### Create feature groups

Two feature groups are created:
1. **A user-events feature group**: This feature group stores all events for a user. Its primary key is `user_id`, so the online feature store will contain only the latest event for each user, while the offline feature store will contain every event triggered by the user.
2. **An events feature group**: This feature group contains all information about each event. Its primary key is `event_id`, so it can be joined with the user-events feature group to reconstruct the full dataframe.
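The difference between the online and offline stores for the user-events feature group can be modelled in pandas. This sketch assumes rows are written in event-time order, so that the last row written per `user_id` (what a key-value online store keeps) is also the latest by `event_time`; the data is hypothetical.

```python
import pandas as pd

# Hypothetical user-event rows; user 14 has two events.
# Offline store model: every row is kept.
offline = pd.DataFrame({
    "event_time": pd.to_datetime(["2024-06-22 13:00", "2024-06-23 09:00", "2024-06-22 10:00"]),
    "event_id": [1, 5, 17],
    "user_id": [14, 14, 3],
    "purchase_completed": [0, 1, 1],
})

# Online store model (assumption): one row per primary key (user_id),
# keeping the row with the latest event_time.
online = (
    offline.sort_values("event_time")
    .drop_duplicates(subset="user_id", keep="last")
    .set_index("user_id")
)
print(online)
```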

Splitting the data this way also allows creating a separate *users feature group* containing user-specific details, which can likewise be joined to create a feature view.

##### Creating an on-demand transformation function

Transformation functions can be created and attached to a feature group to define on-demand transformations.

Once defined, transformation functions can be used across multiple feature groups and can also be [saved in the feature store](https://docs.hopsworks.ai/latest/user_guides/fs/transformation_functions/#saving-to-the-feature-store) so that they can be [retrieved and used from the feature store](https://docs.hopsworks.ai/latest/user_guides/fs/transformation_functions/#retrieval-from-the-feature-store).

In [33]:
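# On-demand transformations can be used to calculate relative timestamp features.
# Returns the number of hours from event_time to `feature`. Note that .dt.seconds
# is only the seconds component of each timedelta (whole days are dropped);
# .dt.total_seconds() covers the full duration.
@hopsworks.udf(float, mode="pandas")
def time_delta_event_time(event_time, feature):
    return (feature - event_time).dt.seconds/(60*60)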
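The body of `time_delta_event_time` is ordinary pandas arithmetic, so it can be checked outside Hopsworks on plain Series. This sketch with hypothetical timestamps contrasts `.dt.seconds`, as used in the UDF, with `.dt.total_seconds()`: the former is only the seconds component of each timedelta, so a gap of one day and four hours comes out as 4 hours.

```python
import pandas as pd

# Hypothetical timestamps: the second checkout is one day and four hours later.
event_time = pd.Series(pd.to_datetime(["2024-06-22 13:00", "2024-06-22 13:00"]))
checkout_time = pd.Series(pd.to_datetime(["2024-06-22 17:00", "2024-06-23 17:00"]))

delta = checkout_time - event_time  # Series of Timedelta values

# .dt.seconds is the seconds *component* (0-86399); whole days are dropped.
hours_component = delta.dt.seconds / (60 * 60)
# .dt.total_seconds() measures the whole duration, days included.
hours_total = delta.dt.total_seconds() / (60 * 60)

print(hours_component.tolist())
print(hours_total.tolist())
```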




##### Creating a feature group

In [34]:
# Create the user-events feature group
fg_user_events = fs.get_or_create_feature_group(name="fg_user_events",
                                                version=1,
                                                primary_key=["user_id"],
                                                event_time="event_time",
                                                online_enabled=True)

# Create the events feature group with the on-demand feature time_till_checkout
fg_events = fs.get_or_create_feature_group(name="fg_events",
                                           version=1,
                                           primary_key=["event_id"],
                                           event_time="event_time",
                                           online_enabled=True,
                                           transformation_functions=[time_delta_event_time("event_time", "checkout_time").alias("time_till_checkout")])


# Inserting data into the feature groups
fg_user_events.insert(user_event_df)
fg_events.insert(events_df)





Uploading Dataframe: 100.00% |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| Rows 40/40 | Elapsed Time: 00:00 | Remaining Time: 00:00


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/musubi/Resources/jobs/fg_user_events_1_offline_fg_materialization/config_1741857223429) to trigger the materialization job again.

Feature Group created successfully, explore it at 
https://10.87.41.143:28181/p/119/fs/67/fg/39



Uploading Dataframe: 100.00% |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| Rows 40/40 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: fg_events_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://10.87.41.143:28181/p/119/jobs/named/fg_events_1_offline_fg_materialization/executions


(Job('fg_events_1_offline_fg_materialization', 'SPARK'), None)

## Creating a feature view and generating train-test data

#### Define a query to join feature groups

The joins performed by Hopsworks are always point-in-time correct based on event time. Hence you can easily join the user-events and events feature groups to create a new feature view that has point-in-time correct data.
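Point-in-time correctness means that each row only sees feature values whose event time is not later than the row's own event time. The sketch below illustrates that rule with pandas' `merge_asof` on hypothetical data keyed by `user_id`; it is an analogy for the semantics, not how Hopsworks executes the join.

```python
import pandas as pd

# Hypothetical label rows (left side of the join).
labels = pd.DataFrame({
    "event_time": pd.to_datetime(["2024-01-02", "2024-01-05"]),
    "user_id": [14, 14],
    "purchase_completed": [0, 1],
})

# Hypothetical feature rows for the same key, each valid from its event_time.
features = pd.DataFrame({
    "event_time": pd.to_datetime(["2024-01-01", "2024-01-04", "2024-01-06"]),
    "user_id": [14, 14, 14],
    "click_count": [3, 7, 20],
})

# For each label row, take the latest feature row (per user_id) whose
# event_time is <= the label's event_time; the 2024-01-06 value is never
# leaked into earlier rows.
pit = pd.merge_asof(
    labels.sort_values("event_time"),
    features.sort_values("event_time"),
    on="event_time",
    by="user_id",
    direction="backward",
)
print(pit)
```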

In [37]:
query = fg_user_events.select("purchase_completed").join(fg_events.select_features(), prefix="event_data_", on="event_id")
query.show(5)

2025-03-13 10:16:41,761 INFO: Using ['click_count', 'time_spent', 'scroll_depth', 'checkout_time', 'time_till_checkout'] as features for the query.To include primary key and event time use `select_all`.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.56s) 


|   | purchase_completed | event_data_click_count | event_data_time_spent | event_data_scroll_depth | event_data_checkout_time | event_data_time_till_checkout |
|---|---|---|---|---|---|---|
| 0 | 0 | 7 | 7.08 | 73.65 | 2028-02-11 04:00:00+00:00 | 4.0 |
| 1 | 0 | 17 | 6.98 | 32.43 | 2033-07-27 07:00:00+00:00 | 1.0 |
| 2 | 1 | 3 | 5.01 | 16.0 | 2024-09-14 11:00:00+00:00 | 4.0 |
| 3 | 0 | 11 | 13.42 | 51.76 | 2033-09-19 05:00:00+00:00 | 1.0 |
| 4 | 1 | 7 | 29.92 | 13.83 | 2029-05-30 23:00:00+00:00 | 0.0 |


#### Creating feature view

In [38]:
# Import any required model-dependent transformation functions
from hopsworks.hsfs.builtin_transformations import min_max_scaler

# Create feature view
fv = fs.get_or_create_feature_view(name="fv_events", 
                                   version = 1, 
                                   query = query, 
                                   transformation_functions=[
                                       min_max_scaler("event_data_click_count"), 
                                       min_max_scaler("event_data_time_spent"), 
                                       min_max_scaler("event_data_scroll_depth")
                                   ],
                                   labels = ["purchase_completed"])


# Create training data
X_train, X_test, y_train, y_test = fv.train_test_split(test_size = 0.2)

Feature view created successfully, explore it at 
https://10.87.41.143:28181/p/119/fs/67/fv/fv_events/version/1
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.40s) 
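`min_max_scaler` is a model-dependent transformation. The sketch below shows plain min-max scaling in pandas, assuming (as is standard for this technique) that the minimum and maximum are taken from the training split and reused for the test split. The values are hypothetical and `min_max` is a helper defined only for this example.

```python
import pandas as pd

# Hypothetical train/test values for one feature, e.g. event_data_click_count.
train = pd.Series([0.0, 5.0, 10.0])
test = pd.Series([2.5, 12.0])

# Fit: statistics come from the training split only.
train_min, train_max = train.min(), train.max()


def min_max(x: pd.Series) -> pd.Series:
    # Map the training range [train_min, train_max] onto [0, 1];
    # test values outside that range land outside [0, 1].
    return (x - train_min) / (train_max - train_min)


print(min_max(train).tolist())
print(min_max(test).tolist())
```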



## Testing a new feature

You can test a new feature without disturbing existing feature groups by creating a separate feature group that contains only the new feature under test. This feature group can then be joined with the existing feature groups to create a new feature view. Once testing is done, the new feature can either be appended to an existing feature group or used to create and backfill a new version of the feature group.
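Keeping the candidate feature in its own table and attaching it with a left join leaves every existing row in place. This pandas sketch uses hypothetical frames: events without a value for the candidate get `NaN`, and the share of non-null values gives a quick check of the feature's coverage.

```python
import pandas as pd

# Hypothetical existing features, keyed by event_id.
events = pd.DataFrame({"event_id": [1, 17, 23], "click_count": [0, 12, 7]})

# Hypothetical candidate feature in its own frame; not every event has a value.
candidate = pd.DataFrame({"event_id": [17], "ad_interaction": [1.0]})

# A left join keeps every existing row; events without the candidate get NaN,
# so the existing data is untouched while the new column is evaluated.
trial = events.merge(candidate, on="event_id", how="left")
coverage = trial["ad_interaction"].notna().mean()
print(trial)
print("coverage:", coverage)
```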

In [39]:
# Fetch the feature group with the raw json
fg_json = fs.get_feature_group(name="fg_raw_event_data", version=1)

# Read the data from the raw feature group.
df = fg_json.read()
unnested_dataframe = pd.json_normalize(df["data"].apply(lambda x : json.loads(x)))
unnested_dataframe["event_time"] = pd.to_datetime(unnested_dataframe["event_time"])
unnested_dataframe["checkout_time"] = pd.to_datetime(unnested_dataframe["checkout_time"])

# Extract the new feature that is required to be tested.
test_df = unnested_dataframe[["event_time", "event_id", "ad_interaction"]]

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.33s) 


In [40]:
# Create the new test feature_group
fg_test = fs.get_or_create_feature_group(name="fg_test",
                                         version=1,
                                         primary_key=["event_id"],
                                         event_time="event_time",
                                         online_enabled=True)

fg_test.insert(test_df)


Feature Group created successfully, explore it at 
https://10.87.41.143:28181/p/119/fs/67/fg/40



Uploading Dataframe: 100.00% |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| Rows 40/40 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: fg_test_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://10.87.41.143:28181/p/119/jobs/named/fg_test_1_offline_fg_materialization/executions


(Job('fg_test_1_offline_fg_materialization', 'SPARK'), None)

In [43]:
# Create a query that joins the feature groups fg_user_events, fg_events and fg_test
query = (
    fg_user_events.select("purchase_completed")
    .join(fg_events.select_features(), prefix="event_data_", on="event_id")
    .join(fg_test.select_features(), prefix="testing_", on="event_id")
)
query.show(5)

2025-03-13 10:18:54,012 INFO: Using ['click_count', 'time_spent', 'scroll_depth', 'checkout_time', 'time_till_checkout'] as features for the query.To include primary key and event time use `select_all`.
2025-03-13 10:18:54,014 INFO: Using ['ad_interaction'] as features for the query.To include primary key and event time use `select_all`.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.62s) 


|   | purchase_completed | event_data_click_count | event_data_time_spent | event_data_scroll_depth | event_data_checkout_time | event_data_time_till_checkout | testing_ad_interaction |
|---|---|---|---|---|---|---|---|
| 0 | 0 | 7 | 7.08 | 73.65 | 2028-02-11 04:00:00+00:00 | 4.0 |  |
| 1 | 0 | 17 | 6.98 | 32.43 | 2033-07-27 07:00:00+00:00 | 1.0 |  |
| 2 | 1 | 3 | 5.01 | 16.0 | 2024-09-14 11:00:00+00:00 | 4.0 |  |
| 3 | 0 | 11 | 13.42 | 51.76 | 2033-09-19 05:00:00+00:00 | 1.0 | 1.0 |
| 4 | 1 | 7 | 29.92 | 13.83 | 2029-05-30 23:00:00+00:00 | 0.0 |  |


In [44]:
# Create testing feature view
fv_test = fs.get_or_create_feature_view(name="fv_test", 
                                   version = 1, 
                                   query = query, 
                                   transformation_functions=[
                                       min_max_scaler("event_data_click_count"), 
                                       min_max_scaler("event_data_time_spent"), 
                                       min_max_scaler("event_data_scroll_depth"),
                                       min_max_scaler("testing_ad_interaction")
                                   ],
                                   labels = ["purchase_completed"])

X_train, X_test, y_train, y_test = fv_test.train_test_split(test_size = 0.2)

Feature view created successfully, explore it at 
https://10.87.41.143:28181/p/119/fs/67/fv/fv_test/version/1
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.57s) 



### Appending new features to the feature group
Once a feature has been tested and should be included in a feature group, there are two ways to do it:
1. Appending the feature to an existing feature group and backfilling the data.
2. Creating a new version of the feature group that contains the new feature and backfilling the data.
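The effect of Option 1 on stored rows can be sketched in pandas. This models appending a column as adding an empty (`NaN`) column to historical rows, and the backfill as overwriting rows by primary key (`event_id`). Treating the backfill insert as an overwrite by key is an assumption of the sketch, and the data is hypothetical.

```python
import pandas as pd

# Hypothetical rows already stored before ad_interaction existed,
# indexed by the primary key event_id.
stored = pd.DataFrame({"event_id": [1, 17], "click_count": [0, 12]}).set_index("event_id")

# Step 1 (append): the new column exists, but historical rows are empty.
stored["ad_interaction"] = float("nan")

# Step 2 (backfill, modelled as an overwrite by primary key): write full rows
# again; DataFrame.update aligns on the index and copies non-null values.
backfill = pd.DataFrame(
    {"event_id": [1, 17], "click_count": [0, 12], "ad_interaction": [0.0, 1.0]}
).set_index("event_id")
stored.update(backfill)
print(stored)
```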

#### Option 1: Appending the feature to the feature group

##### Appending the feature

In [45]:
# Create the feature to be appended
from hsfs.feature import Feature
features = [Feature(name="ad_interaction",type="double",online_type="double")]
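The feature is declared as `double`. One plausible reason, assuming `ad_interaction` is absent from many raw JSON records, is that pandas fills the missing values with `NaN` and therefore infers a floating-point column even when the present values are integers, as this sketch on hypothetical records shows.

```python
import json

import pandas as pd

# Hypothetical raw JSON strings; ad_interaction appears in only one record.
rows = [
    '{"event_id": 1}',
    '{"event_id": 17}',
    '{"event_id": 23, "ad_interaction": 1}',
]
flat = pd.json_normalize([json.loads(r) for r in rows])

# Missing keys become NaN, which forces a floating-point dtype (float64)
# even though the only present value is the integer 1.
print(flat.dtypes["ad_interaction"])
```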

In [46]:
# Appending a new feature to the feature group
fg_events.append_features(features)

2025-03-13 10:19:20,618 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-03-13 10:19:23,717 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-03-13 10:20:25,437 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-03-13 10:20:28,529 INFO: Waiting for execution to finish. Current state: FINISHED. Final status: SUCCEEDED
2025-03-13 10:20:28,716 INFO: Waiting for log aggregation to finish.
2025-03-13 10:20:28,717 INFO: Execution finished successfully.


<hsfs.feature_group.FeatureGroup at 0x10609a5c0>

In [51]:
# Check that the feature at index 6 of fg_events is flagged as an on-demand feature
fg_events.features[6].on_demand

True

##### Backfilling data to the feature group

In [62]:
# Preparing data to be backfilled
df = fg_json.read()
unnested_dataframe = pd.json_normalize(df["data"].apply(lambda x : json.loads(x)))
unnested_dataframe["event_time"] = pd.to_datetime(unnested_dataframe["event_time"])
unnested_dataframe["checkout_time"] = pd.to_datetime(unnested_dataframe["checkout_time"])

events_df = unnested_dataframe[["event_time", "event_id", "click_count", "time_spent", "scroll_depth", "checkout_time", "ad_interaction"]] 

# Insert the data into the feature group
fg_events = fs.get_feature_group(name="fg_events", version=1)
fg_events.insert(events_df)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.38s) 



Uploading Dataframe: 100.00% |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| Rows 40/40 | Elapsed Time: 00:00 | Remaining Time: 00:00


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/musubi/Resources/jobs/fg_events_1_offline_fg_materialization/config_1741857296206) to trigger the materialization job again.



(Job('fg_events_1_offline_fg_materialization', 'SPARK'), None)

#### Option 2: Creating the new feature group version

##### Creating a new version of the feature group

In [56]:
# Create version 2 of the events feature group
fg_events_v2 = fs.get_or_create_feature_group(name="fg_events",
                                              version=2,
                                              primary_key=["event_id"],
                                              event_time="event_time",
                                              online_enabled=True,
                                              transformation_functions=[time_delta_event_time("event_time", "checkout_time").alias("time_till_checkout")])





##### Backfilling data to the feature group

In [59]:
# Preparing data to be backfilled
df = fg_json.read()
unnested_dataframe = pd.json_normalize(df["data"].apply(lambda x : json.loads(x)))
unnested_dataframe["event_time"] = pd.to_datetime(unnested_dataframe["event_time"])
unnested_dataframe["checkout_time"] = pd.to_datetime(unnested_dataframe["checkout_time"])

events_df = unnested_dataframe[["event_time", "event_id", "click_count", "time_spent", "scroll_depth", "checkout_time", "ad_interaction"]]

# Insert the data into the feature group
fg_events_v2.insert(events_df)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.39s) 
Feature Group created successfully, explore it at 
https://10.87.41.143:28181/p/119/fs/67/fg/41



Uploading Dataframe: 100.00% |███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| Rows 40/40 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: fg_events_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://10.87.41.143:28181/p/119/jobs/named/fg_events_2_offline_fg_materialization/executions


(Job('fg_events_2_offline_fg_materialization', 'SPARK'), None)