# <span style="font-weight:bold; font-size: 3rem; color:#1EB182;"><img src="images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 02: Streaming Feature Pipeline</span>

## 🗒️ This notebook is divided into the following sections:
1. Connect to the Hopsworks AI Lakehouse.
2. Retrieve Feature Groups for departures.
3. Consume departure events from the Kafka topic "departures".
4. Process the events and compute windowed aggregations.
5. Write the resulting aggregated features to the corresponding Feature Groups.

**NOTE:** Before running this notebook, execute the following Python scripts:
- *setup/feature_group.py*: creates the Feature Groups for departures and for the aggregated departure features.
- *reply_site_departures.py*: fetches real-time departure information from SL's Stockholm public transportation services and replays the events to the Kafka topic "departures".


![tutorial-flow](images/01_featuregroups.png)

In [1]:
# connect to Hopsworks

import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()
kafka_api = project.get_kafka_api()

2024-09-20 08:20:56,097 INFO: Python Engine initialized.

Logged in to project, explore it here https://hopsworks0.logicalclocks.com/p/119


In [2]:
# other imports

import json
import pandas as pd
from datetime import datetime, timedelta

In [3]:
# Get departures feature groups
departures_fg = fs.get_feature_group("departures", 2)
departures_agg_30m_fg = fs.get_feature_group("departures_agg_30m", 1)
departures_agg_1h_fg = fs.get_feature_group("departures_agg_1h", 1)
departures_agg_6h_fg = fs.get_feature_group("departures_agg_6h", 1)

### Streaming feature pipeline

We use **QuixStreams**, a stream processing library, to consume the departure events and compute the aggregated features.

In [4]:
# Get default Kafka configuration

kafka_config = kafka_api.get_default_config()

def get_consumer_config():
    # copy the defaults so the shared kafka_config dict is not mutated
    consumer_config = dict(kafka_config)
    consumer_config['default.topic.config'] = {'auto.offset.reset': 'latest'}
    consumer_config['partition.assignment.strategy'] = "cooperative-sticky"
    return consumer_config

def get_producer_config():
    from hsfs.core import kafka_engine
    producer_config = kafka_engine.get_kafka_config(
        departures_fg.feature_store_id, {}
    )
    return producer_config

##### Create QuixStreams Application

In [5]:
from quixstreams import Application, State




In [6]:
# callbacks

def on_consumer_error(*args, **kwargs):
    print("ON CONSUMER ERROR")
    # args is always a tuple and kwargs always a dict, so test for emptiness
    if args:
        print(args)
    if kwargs:
        print(kwargs)

def on_processing_error(*args, **kwargs):
    print("ON PROCESSING ERROR")
    if args:
        print(args)
    if kwargs:
        print(kwargs)

def on_producer_error(*args, **kwargs):
    print("ON PRODUCER ERROR")
    if args:
        print(args)
    if kwargs:
        print(kwargs)
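
To our understanding of the QuixStreams documentation, the return value of these callbacks acts as a flag: returning `True` asks the application to ignore the error and continue, while a falsy value (including the implicit `None` returned by the callbacks above) lets the exception propagate. Below is a minimal sketch of a callback written with that in mind. The name `log_and_propagate`, the logger name, and the `(exc, message, quix_logger)` parameter names are assumptions for illustration, not part of this tutorial.

```python
import logging

logger = logging.getLogger("streaming_pipeline")  # hypothetical logger name


def log_and_propagate(exc, message=None, quix_logger=None) -> bool:
    """Log the error and return False so the exception is not swallowed."""
    logger.error("Kafka client error: %r (message=%r)", exc, message)
    return False


# Local check with a fake exception; no Kafka connection is needed
result = log_and_propagate(ValueError("bad payload"))
print(result)
```

Returning `False` keeps failures visible while the pipeline is being developed; switching to `True` would trade that visibility for uninterrupted processing.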

In [7]:
# Create QuixStreams Application

app = Application(
    broker_address=kafka_config["bootstrap.servers"],
    auto_create_topics=False,
    #loglevel = "DEBUG",
    
    # consumer
    consumer_extra_config=get_consumer_config(),
    consumer_group="my-group-id",
    on_consumer_error=on_consumer_error,
    auto_offset_reset="earliest",
    use_changelog_topics=False,
    
    # producer
    producer_extra_config=get_producer_config(),
    on_producer_error=on_producer_error,
    
    # processing
    on_processing_error=on_processing_error,
)

try:
    # start from a clean local state; ignore failures (e.g. no state yet)
    app.clear_state()
except Exception:
    pass

In [8]:
# Add consumer topic

input_topic = app.topic(name="departures", value_serializer="json")
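
For orientation, here is a sketch of what a single event on the "departures" topic might look like. The field names are the ones the processing code below reads; every value is made up for illustration and does not come from the SL API.

```python
import json

# Hypothetical example event: field names follow the processing code,
# values are invented for illustration only.
sample_event = {
    "departure_id": "9001-2024-09-20T08:30:00",
    "site_id": "9001",
    "scheduled": "2024-09-20T08:30:00",
    "expected": "2024-09-20T08:32:00",
    "state": "EXPECTED",
    "journey_state": "NORMALPROGRESS",
    "journey_prediction_state": "NORMAL",
    "deviations_count": 0,
    "deviations_importance_max": 0,
    "late": True,
}

payload = json.dumps(sample_event).encode("utf-8")  # bytes as stored on the topic
decoded = json.loads(payload)                       # dict handed to the pipeline
print(decoded["site_id"], decoded["scheduled"])
```

Note that `site_id` and `scheduled` are strings here, which is what the `+ "-" +` concatenation in the initializer functions requires.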

**Processing functions and multi_part_insert**

In [9]:
def multi_part_insert(event: dict, feature_group):
    event_df = to_pandas(event)
    with feature_group.multi_part_insert() as writer:
        writer.insert(event_df)
    print(">> Event delivered successfully to " + feature_group._online_topic_name + "_" + feature_group.name)
    
def to_pandas(event: dict):
    df = pd.DataFrame(event, index=[0])
    
    # parse to float32
    cols = list(df.columns)
    for col_name in ["departure_id", "departure_agg_id", "site_id", "scheduled", "first_scheduled", "expected", "state", "journey_state", "journey_prediction_state", "late"]:
        if col_name in cols:
            cols.remove(col_name)
    df[cols] = df[cols].astype('float32')
    return df

**Reduce functions for windowed aggregations**

In [10]:
# Initializer and Reducer for first aggregation

def num_state_issues(state):
    return 1 if state in ["CANCELLED", "INHIBITED", "MISSED", "REPLACED"] else 0

def num_journey_state_issues(journey_state):
    return 1 if journey_state in ["SLOWPROGRESS", "NOPROGRESS", "OFFROUTE", "ABORTED", "CANCELLED"] else 0

def num_journey_prediction_state_issues(journey_prediction_state):
    return 1 if journey_prediction_state in ["LOSTCONTACT", "UNRELIABLE"] else 0

def get_earlier_datetime_str(t1, t2):
    min_t = min(datetime.fromisoformat(t1), datetime.fromisoformat(t2))
    return min_t.isoformat()

def initializer_agg(event: dict) -> dict:
    value = {
        "departure_agg_id": event["site_id"] + "-" + event["scheduled"],
        "site_id": event["site_id"],
        "first_scheduled": event["scheduled"],
        "state_issue_count": num_state_issues(event["state"]),
        "journey_state_issue_count": num_journey_state_issues(event["journey_state"]),
        "journey_prediction_state_issue_count": num_journey_prediction_state_issues(event["journey_prediction_state"]),
        "deviations_count_min": event["deviations_count"],
        "deviations_count_max": event["deviations_count"],
        "deviations_count_sum": event["deviations_count"],
        "deviations_count_count": 1,
        "deviations_count_mean": event["deviations_count"],
        "deviations_importance_min": event["deviations_importance_max"],
        "deviations_importance_max": event["deviations_importance_max"],
        "deviations_importance_sum": event["deviations_importance_max"],
        "deviations_importance_count": 1,
        "deviations_importance_mean": event["deviations_importance_max"],
        "late_count": 1 if event["late"] else 0,
    }
    return json.loads(json.dumps(value))

def reducer_agg(aggregated: dict, event: dict) -> dict:
    return {
        "departure_agg_id": aggregated["site_id"] + "-" + aggregated["first_scheduled"],
        "site_id": aggregated["site_id"],
        "first_scheduled": get_earlier_datetime_str(aggregated["first_scheduled"], event["scheduled"]),
        "state_issue_count": aggregated["state_issue_count"] + num_state_issues(event["state"]),
        "journey_state_issue_count": aggregated["journey_state_issue_count"] + num_journey_state_issues(event["journey_state"]),
        "journey_prediction_state_issue_count": aggregated["journey_prediction_state_issue_count"] + num_journey_prediction_state_issues(event["journey_prediction_state"]),
        "deviations_count_min": min(aggregated["deviations_count_min"], event["deviations_count"]),
        "deviations_count_max": max(aggregated["deviations_count_max"], event["deviations_count"]),
        "deviations_count_sum": aggregated["deviations_count_sum"] + event["deviations_count"],
        "deviations_count_count": aggregated["deviations_count_count"] + 1,
        "deviations_count_mean": float(aggregated["deviations_count_sum"] + event["deviations_count"]) / (aggregated["deviations_count_count"] + 1),
        "deviations_importance_min": min(aggregated["deviations_importance_min"], event["deviations_importance_max"]),
        "deviations_importance_max": max(aggregated["deviations_importance_max"], event["deviations_importance_max"]),
        "deviations_importance_sum": aggregated["deviations_importance_sum"] + event["deviations_importance_max"],
        "deviations_importance_count": aggregated["deviations_importance_count"] + 1,
        "deviations_importance_mean": float(aggregated["deviations_importance_sum"] + event["deviations_importance_max"]) / (aggregated["deviations_importance_count"] + 1),
        # accumulate late departures instead of overwriting with the last event
        "late_count": aggregated["late_count"] + (1 if event["late"] else 0),
    }


# Initializer and Reducer for Accumulative Aggregations

def initializer_acc_agg(event: dict) -> dict:
    return {
        "departure_agg_id": event["site_id"] + "-" + event["first_scheduled"],
        "site_id": event["site_id"],
        "first_scheduled": event["first_scheduled"],
        "state_issue_count": event["state_issue_count"],
        "journey_state_issue_count": event["journey_state_issue_count"],
        "journey_prediction_state_issue_count": event["journey_prediction_state_issue_count"],
        "deviations_count_min": event["deviations_count_min"],
        "deviations_count_max": event["deviations_count_max"],
        "deviations_count_sum": event["deviations_count_sum"],
        # carry over the number of raw events behind the incoming aggregate
        "deviations_count_count": event["deviations_count_count"],
        "deviations_count_mean": event["deviations_count_mean"],
        "deviations_importance_min": event["deviations_importance_min"],
        "deviations_importance_max": event["deviations_importance_max"],
        "deviations_importance_sum": event["deviations_importance_sum"],
        "deviations_importance_count": event["deviations_importance_count"],
        "deviations_importance_mean": event["deviations_importance_mean"],
        "late_count": event["late_count"],
    }

def reducer_acc_agg(aggregated: dict, event: dict) -> dict:
    # merge two partial aggregates: counts add up, means are rebuilt from sums and counts
    return {
        "departure_agg_id": aggregated["site_id"] + "-" + aggregated["first_scheduled"],
        "site_id": event["site_id"],
        "first_scheduled": get_earlier_datetime_str(aggregated["first_scheduled"], event["first_scheduled"]),
        "state_issue_count": aggregated["state_issue_count"] + event["state_issue_count"],
        "journey_state_issue_count": aggregated["journey_state_issue_count"] + event["journey_state_issue_count"],
        "journey_prediction_state_issue_count": aggregated["journey_prediction_state_issue_count"] + event["journey_prediction_state_issue_count"],
        "deviations_count_min": min(aggregated["deviations_count_min"], event["deviations_count_min"]),
        "deviations_count_max": max(aggregated["deviations_count_max"], event["deviations_count_max"]),
        "deviations_count_sum": aggregated["deviations_count_sum"] + event["deviations_count_sum"],
        "deviations_count_count": aggregated["deviations_count_count"] + event["deviations_count_count"],
        "deviations_count_mean": float(aggregated["deviations_count_sum"] + event["deviations_count_sum"]) / (aggregated["deviations_count_count"] + event["deviations_count_count"]),
        "deviations_importance_min": min(aggregated["deviations_importance_min"], event["deviations_importance_min"]),
        "deviations_importance_max": max(aggregated["deviations_importance_max"], event["deviations_importance_max"]),
        "deviations_importance_sum": aggregated["deviations_importance_sum"] + event["deviations_importance_sum"],
        "deviations_importance_count": aggregated["deviations_importance_count"] + event["deviations_importance_count"],
        "deviations_importance_mean": float(aggregated["deviations_importance_sum"] + event["deviations_importance_sum"]) / (aggregated["deviations_importance_count"] + event["deviations_importance_count"]),
        "late_count": aggregated["late_count"] + event["late_count"],
    }
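
When already-aggregated windows are aggregated a second time, the mean over the larger window cannot be obtained by averaging the smaller windows' means; it has to be rebuilt from the sums and counts each window carries. The following self-contained sketch uses made-up numbers to show the difference. `merge` is a hypothetical helper for illustration and is not used by the pipeline.

```python
def merge(a: dict, b: dict) -> dict:
    """Combine two partial aggregates that each carry a sum and a count."""
    total = a["sum"] + b["sum"]
    count = a["count"] + b["count"]
    return {"sum": total, "count": count, "mean": total / count}


# Two partial windows over the raw values [1, 2, 3] and [10]
w1 = {"sum": 6, "count": 3, "mean": 2.0}
w2 = {"sum": 10, "count": 1, "mean": 10.0}

merged = merge(w1, w2)
naive = (w1["mean"] + w2["mean"]) / 2  # weights both windows equally

print(merged["mean"])  # 4.0, the mean of [1, 2, 3, 10]
print(naive)           # 6.0
```

Carrying `sum` and `count` alongside `mean` is what makes the aggregates mergeable across window sizes.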

**Create a Streaming DataFrame and define the aggregations**

In [11]:
# Create Streaming DataFrame

sdf = app.dataframe(input_topic)

In [12]:
# cast data types
sdf["scheduled"] = sdf["scheduled"].apply(datetime.fromisoformat)
sdf["expected"] = sdf["expected"].apply(datetime.fromisoformat)

# insert all events to departures feature group
sdf = sdf.update(lambda event: multi_part_insert(event, departures_fg))

# drop expected field since it's not needed in the agg fgs
sdf.drop("expected")

<quixstreams.dataframe.dataframe.StreamingDataFrame at 0x7f92e8968100>

In [13]:
#
#  1st Aggregation - 30 Minutes
#

# convert datetime to str - for serialization of the state
sdf["scheduled"] = sdf["scheduled"].apply(lambda t: t.isoformat())

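# log each incoming event; update() leaves the value unchanged
sdf = sdf.update(lambda value: print('Event: ', value))

# perform windowed aggregations for the 30-minute feature group
sdf = (
    # Define a tumbling window
    sdf.tumbling_window(timedelta(seconds=15))  # shortened to 15 seconds for demo and debugging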

    # Create a "reduce" aggregation with "reducer" and "initializer" functions
    .reduce(reducer=reducer_agg, initializer=initializer_agg)

    # Emit results only for closed windows
    .final()
    
    # extract value
    .apply(lambda result: result["value"])
)

# revert timestamp string to datetime
sdf["first_scheduled"] = sdf["first_scheduled"].apply(datetime.fromisoformat)

# insert to departures_agg feature group
sdf = sdf.update(lambda event: multi_part_insert(event, departures_agg_30m_fg))
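
To make the `tumbling_window(...).reduce(...).final()` chain more concrete, here is a plain-Python sketch of the idea. It is not QuixStreams' actual implementation: events are bucketed into fixed, non-overlapping windows by timestamp, the initializer runs on the first event of a window, the reducer on every later one, and a window is emitted only once a newer event shows that it has closed. The function name `tumbling_reduce` and the millisecond timestamps are assumptions for illustration.

```python
from typing import Callable, Iterable, Iterator, Tuple


def tumbling_reduce(
    events: Iterable[Tuple[int, int]],
    window_ms: int,
    initializer: Callable[[int], int],
    reducer: Callable[[int, int], int],
) -> Iterator[dict]:
    """Yield one result per closed window; events are ordered (timestamp_ms, value) pairs."""
    current_start = None
    state = None
    for ts, value in events:
        start = ts - ts % window_ms  # align the timestamp to its window boundary
        if current_start is None:
            current_start, state = start, initializer(value)
        elif start == current_start:
            state = reducer(state, value)
        else:
            # a later window has started, so the previous one is closed
            yield {"start": current_start, "end": current_start + window_ms, "value": state}
            current_start, state = start, initializer(value)
    # the last window stays open: nothing is emitted until a newer event arrives


events = [(0, 1), (4_000, 2), (14_999, 3), (15_000, 10), (31_000, 5)]
results = list(tumbling_reduce(events, 15_000, lambda v: v, lambda acc, v: acc + v))
print(results)
```

Each emitted result keeps the aggregate under a `"value"` key, which mirrors the `result["value"]` extraction step in the cell above.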

In [14]:
#
#  2nd Aggregation - 1 Hour
#

# convert datetime to str - for serialization of the state
sdf["first_scheduled"] = sdf["first_scheduled"].apply(lambda t: t.isoformat())

# perform windowed aggregations for the 1-hour feature group
sdf = (
    # Define a tumbling window
    sdf.tumbling_window(timedelta(seconds=30))  # shortened to 30 seconds for demo and debugging

    # Create a "reduce" aggregation with "reducer" and "initializer" functions
    .reduce(reducer=reducer_acc_agg, initializer=initializer_acc_agg)

    # Emit results only for closed windows
    .final()
    
    # extract value
    .apply(lambda result: result["value"])
)

# revert timestamp string to datetime
sdf["first_scheduled"] = sdf["first_scheduled"].apply(datetime.fromisoformat)

# insert to departures_agg FG topic
sdf = sdf.update(lambda event: multi_part_insert(event, departures_agg_1h_fg))
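
The convert-then-revert pattern around each window exists because the window state has to be serialized, and `datetime` objects are not JSON-serializable out of the box. Assuming JSON-style serialization of the state, the round trip can be sketched with the standard library alone (the timestamp is a made-up example):

```python
import json
from datetime import datetime

scheduled = datetime(2024, 9, 20, 8, 30)

try:
    json.dumps({"scheduled": scheduled})
    serializable = True
except TypeError:
    serializable = False  # the json module rejects datetime objects

as_text = scheduled.isoformat()                  # ISO 8601 string
state_blob = json.dumps({"scheduled": as_text})  # now safe to store as state
restored = datetime.fromisoformat(json.loads(state_blob)["scheduled"])

print(serializable, as_text, restored == scheduled)
```

ISO 8601 strings also sort chronologically when they share the same format and offset, although the pipeline parses them back to `datetime` before comparing.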

In [15]:
#
#  3rd Aggregation - 6 Hours
#

# convert datetime to str - for serialization of the state
sdf["first_scheduled"] = sdf["first_scheduled"].apply(lambda t: t.isoformat())

# perform windowed aggregations for the 6-hour feature group
sdf = (
    # Define a tumbling window
    sdf.tumbling_window(timedelta(seconds=60))  # shortened to 60 seconds for demo and debugging

    # Create a "reduce" aggregation with "reducer" and "initializer" functions
    .reduce(reducer=reducer_acc_agg, initializer=initializer_acc_agg)

    # Emit results only for closed windows
    .final()
    
    # extract value
    .apply(lambda result: result["value"])
)

# revert timestamp string to datetime
sdf["first_scheduled"] = sdf["first_scheduled"].apply(datetime.fromisoformat)

# insert to departures_agg FG topic
sdf = sdf.update(lambda event: multi_part_insert(event, departures_agg_6h_fg))

In [16]:
# start streaming feature pipeline

app.run(sdf)

[2024-09-20 08:20:59,801] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'kafka-cluster-kafka-0.kafka-cluster-kafka-brokers.hopsworks.svc:9092'}" consumer_group="my-group-id" auto_offset_reset="earliest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-09-20 08:20:59,802] [INFO] [quixstreams] : Topics required for this application: "departures"
[2024-09-20 08:20:59,803] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-09-20 08:21:00,101] [INFO] [quixstreams] : Kafka topics validation complete
[2024-09-20 08:21:00,102] [INFO] [quixstreams] : Initializing state directory at "/hopsfs/Jupyter/commute/state/my-group-id"
[2024-09-20 08:21:00,156] [INFO] [quixstreams] : Waiting for incoming messages


KeyboardInterrupt: 

#### Materialize departures and departures aggregated features into the Feature Groups

In [None]:
# start materialization job
departures_fg.materialization_job.run(await_termination=True)
departures_agg_30m_fg.materialization_job.run(await_termination=True)
departures_agg_1h_fg.materialization_job.run(await_termination=True)
departures_agg_6h_fg.materialization_job.run(await_termination=True)

In [None]:
# departures_agg_30m_fg.materialization_job.run(await_termination=False)
# departures_agg_1h_fg.materialization_job.run(await_termination=False)
# departures_agg_6h_fg.materialization_job.run(await_termination=False)