In [52]:
import hopsworks
import json

from feldera import FelderaClient, PipelineBuilder

In [53]:
# Open a Hopsworks connection and read the Feldera API key from the Hopsworks
# secrets store (secret name "feldera"), so the key is never hard-coded here.
connection = hopsworks.connection()
secrets_api = connection.get_secrets_api()
feldera_api_key = secrets_api.get_secret("feldera").value

In [54]:
# Client for the hosted Feldera sandbox, authenticated with the API key
# fetched from the Hopsworks secrets store.
client = FelderaClient(
    "https://try.feldera.com",
    api_key=feldera_api_key,
)

In [55]:
# Log in to Hopsworks and keep the project handle used by the cells below.
# NOTE(review): the "Connection closed." output suggests login() replaces the
# connection opened by hopsworks.connection() earlier — consider using only
# one of the two; confirm.
project = hopsworks.login()

Connection closed.
2024-10-17 23:10:09,773 INFO: Python Engine initialized.

Logged in to project, explore it here https://demo.hops.works/p/123


In [56]:
# Feature store of the logged-in project, and version 1 of the feature group
# that receives the windowed aggregates computed by the Feldera pipeline.
fs = project.get_feature_store()
windowed_fg = fs.get_feature_group(name="profiles_activity_5m", version=1)

In [57]:
# Kafka client options for the feature store's Kafka connector, as a dict of
# librdkafka-style keys (see the output of the next cell).
# NOTE(review): this goes through the private `_storage_connector_api`; the
# positional `True` presumably requests the externally reachable connector —
# confirm against the hsfs version in use. The returned options point at PEM
# files under /tmp, which the PEM-inlining cell below replaces with contents.
kafka_config = fs._storage_connector_api.get_kafka_connector(fs.id, True).confluent_options()

In [58]:
# Inspect the connector options. Any inline PEM material is redacted: the cell
# below stores the CA, client certificate and *private key* contents in
# kafka_config under "ssl.*.pem", so re-running this cell after it would
# otherwise print the private key into the notebook output.
{key: ("<redacted>" if key.endswith(".pem") else value) for key, value in kafka_config.items()}

{'bootstrap.servers': '51.68.86.187:9093',
 'security.protocol': 'SSL',
 'ssl.endpoint.identification.algorithm': 'none',
 'ssl.ca.location': '/tmp/kafka_sc_123_-1_ca_chain.pem',
 'ssl.certificate.location': '/tmp/kafka_sc_123_-1_client_cert.pem',
 'ssl.key.location': '/tmp/kafka_sc_123_-1_client_key.pem'}

In [59]:
KAFKA_INPUT_TOPIC = f"{project.name}_real_time_live_transactions"


def _inline_pem_files(config: dict) -> None:
    """Replace each ``ssl.<part>.location`` file path in ``config`` with the
    file's text stored under ``ssl.<part>.pem``, in place.

    Feldera needs the PEM contents themselves rather than paths to local files.
    Parts whose ``.location`` key is absent (e.g. already converted by an
    earlier run of this cell) are left untouched, so re-running the cell does
    not raise ``KeyError``.
    """
    for part in ("ca", "certificate", "key"):
        location = config.pop(f"ssl.{part}.location", None)
        if location is None:
            continue
        # `with` guarantees the file handle is closed after reading.
        with open(location, "rt") as pem_file:
            config[f"ssl.{part}.pem"] = pem_file.read()


_inline_pem_files(kafka_config)

In [60]:
# Feldera input connector: consume raw (non-array) JSON transaction records from
# the Kafka input topic, starting from the earliest available offset.
transaction_source_config = json.dumps(
    {
        "transport": {
            "name": "kafka_input",
            "config": {
                **kafka_config,
                "topics": [KAFKA_INPUT_TOPIC],
                "auto.offset.reset": "earliest",
            },
        },
        "format": {
            "name": "json",
            "config": {"update_format": "raw", "array": False},
        },
    }
)

In [61]:
# Feldera sink config
def create_sink_config(kafka_config: dict, fg, project_id):
    """Build the Kafka options for writing into a feature group's online topic.

    Returns a new dict: a copy of ``kafka_config`` extended with the feature
    group's online topic name and three message headers (``projectId``,
    ``featureGroupId``, ``subjectId``) whose values are stringified.
    ``kafka_config`` itself is not modified.
    """
    topic = fg._online_topic_name
    header_pairs = (
        ("projectId", project_id),
        ("featureGroupId", fg.id),
        ("subjectId", fg.subject["id"]),
    )
    # NOTE(review): "auto.offset.reset" is a Kafka consumer property; for an
    # output (producer) connector it is presumably ignored — confirm, then drop.
    return {
        **kafka_config,
        "topic": topic,
        "auto.offset.reset": "earliest",
        "headers": [{"key": name, "value": str(value)} for name, value in header_pairs],
    }

In [62]:
# Feldera output connector: write the WINDOWED view to the feature group's online
# topic, Avro-encoded with the feature group's own Avro schema.
windowed_sink_config = json.dumps(dict(
    transport=dict(
        name="kafka_output",
        config=create_sink_config(kafka_config, windowed_fg, project.id),
    ),
    format=dict(
        name="avro",
        config=dict(
            schema=windowed_fg.avro_schema,
            skip_schema_id=True,
        ),
    ),
))

In [63]:
# Create the SQL program, parameterized by the source and sink connector configurations.
def build_sql(transaction_source_config: str, windowed_sink_config: str) -> str:
    """Build the Feldera SQL program computing per-account windowed transaction stats.

    Args:
        transaction_source_config: JSON-encoded Feldera connector definition
            attached to the ``TRANSACTIONS`` input table.
        windowed_sink_config: JSON-encoded Feldera connector definition
            attached to the ``WINDOWED`` output view.

    Returns:
        The SQL program text. Each connector definition is embedded inside a
        single-quoted SQL string literal, with embedded single quotes doubled.
    """
    # The JSON is placed inside '...' SQL literals; a stray single quote (e.g. in
    # an Avro schema "doc" field) would terminate the literal early, so escape it
    # the standard SQL way by doubling it.
    source_connectors = transaction_source_config.replace("'", "''")
    sink_connectors = windowed_sink_config.replace("'", "''")
    return f"""
    -- Source transactions table, fed by the Kafka input connector.
    CREATE TABLE TRANSACTIONS(
        tid STRING,
        date_time TIMESTAMP,
        account_id STRING,
        amount DOUBLE
    ) WITH (
        'connectors' = '[{source_connectors}]'
    );

    -- 5-minute hopping windows over the transactions, advancing every minute.
    -- HOP's arguments are (table, DESCRIPTOR(timecol), slide, size).
    CREATE LOCAL VIEW HOPPING_WINDOWS AS
        SELECT *
        FROM TABLE(HOP(TABLE TRANSACTIONS, DESCRIPTOR(date_time), INTERVAL 1 MINUTES, INTERVAL 5 MINUTES));

    -- Per-account aggregates for each window, emitted through the Kafka output connector.
    CREATE VIEW WINDOWED
    WITH (
        'connectors' = '[{sink_connectors}]'
    ) AS
        SELECT
            COUNT(account_id) AS count_trans,
            MIN(amount) AS min_amount,
            MAX(amount) AS max_amount,
            AVG(amount) AS avg_amount,
            window_end AS date_time,
            account_id
        FROM HOPPING_WINDOWS
        GROUP BY account_id, window_end;
    """
    

In [64]:
# Render the SQL program with both connector configs and (re)create the
# Feldera pipeline named "hopsworks_kafka" that runs it.
sql = build_sql(
    transaction_source_config=transaction_source_config,
    windowed_sink_config=windowed_sink_config,
)
pipeline = PipelineBuilder(
    client,
    name="hopsworks_kafka",
    sql=sql,
).create_or_replace()

In [65]:
# Start the Feldera pipeline: it consumes the Kafka input topic into TRANSACTIONS
# and writes the WINDOWED aggregates to the feature group's online topic.
pipeline.start()