# Workflow: Subscribe to DataPoints

This notebook demonstrates how to subscribe to DataPoints via the Data Fabric Client.

**Workflow Steps:**
1. Initialize the client and load connector configuration
2. Discover available DataPoints from the Knowledge Graph
3. Subscribe to a DataPoint (automatically resolves the correct connector)
4. View active subscriptions
5. Unsubscribe from a DataPoint (experimental)

## Step 1: Initialize Client

In [1]:
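import os
import sys

# Make the parent directory importable so that the `client` package resolves
sys.path.append(os.path.abspath(".."))

from client.connector_client import ConnectorClient

# Initialize the client with the Kafka broker address from the environment.
# os.environ[...] raises KeyError if KAFKA_BROKER is unset, rather than passing [None].
client = ConnectorClient(bootstrap_servers=[os.environ["KAFKA_BROKER"]])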

# Load connector configuration from Knowledge Graph
await client.load_connector_config()

# Show available connectors
print("Available connectors:")
client.return_connectors()

Available connectors:


dict_keys(['milling_station', 'assembly_station', 'injection_molding'])
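It can help to validate `KAFKA_BROKER` before constructing the client, so that a missing or empty value produces a clear message. The following is a minimal sketch: the helper name `kafka_bootstrap_servers` and the comma-separated format for several brokers are assumptions made for illustration, not part of the client.

```python
import os


def kafka_bootstrap_servers(env=os.environ, var="KAFKA_BROKER"):
    """Return the broker list from the environment, or raise if it is missing.

    Hypothetical helper: accepts a comma-separated value such as
    "host1:9092,host2:9092" (an assumption, not a documented format).
    """
    raw = env.get(var)
    if not raw or not raw.strip():
        raise RuntimeError(f"{var} is not set; export it before running this notebook")
    # Split on commas and drop empty fragments and surrounding whitespace
    return [part.strip() for part in raw.split(",") if part.strip()]
```

The result could then be passed as `ConnectorClient(bootstrap_servers=kafka_bootstrap_servers())`.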

## Step 2: Discover DataPoints

Query the Knowledge Graph to find all available DataPoints with their labels and metadata.

In [2]:
# Namespace definitions (until namespace_resolver is integrated)
DF = "http://stephantrattnig.org/data_fabric_ontology#"
RDFS = "http://www.w3.org/2000/01/rdf-schema#"

# List all DataPoints with their labels, data types, and units
datapoints_df = await client.list_instances(
    class_uri=f"{DF}DataPoint", 
    optional_props=[f"{RDFS}label", f"{DF}hasDataType", f"{DF}hasUnit", f"{DF}dataPointIdentifier"],
    pretty=True
)
datapoints_df

| | entityIri | entityLabel | entityType | label | hasDataType | hasUnit | dataPointIdentifier |
|---|---|---|---|---|---|---|---|
| 0 | d56cae74-6db3-4f2b-a817-f71625771f77 | DataPoint: AlignmentOffset | DataPoint | DataPoint: AlignmentOffset | float | millimetre | ns=2;i=57 |
| 1 | b08d7da3-057e-48a7-98f6-92842a7588f9 | DataPoint: SpindleSpeed | DataPoint | DataPoint: SpindleSpeed | float | revolutions per minute | ns=2;i=56 |
| 2 | 3dd43d1f-77e2-4466-a022-86362c0d580d | DataPoint: ClampingForce | DataPoint | DataPoint: ClampingForce | float | newton | ns=2;i=59 |
| 3 | d4eb1149-2d68-4791-94f5-643b014378ad | DataPoint: InjectionPressure | DataPoint | DataPoint: InjectionPressure | float | bar | ns=2;i=56 |
| 4 | 29d8c9a4-80cd-495a-a60b-91af8e292d1b | DataPoint: FeedRate | DataPoint | DataPoint: FeedRate | float | millimetre per minute | ns=2;i=58 |
| 5 | e358d34d-b611-42a8-b85c-556730ec8faa | DataPoint: ScrewdriverTorque | DataPoint | DataPoint: ScrewdriverTorque | float | newton metre | ns=2;i=61 |
| 6 | 308e2283-3c01-4c6b-a63a-de9600fcfd4f | DataPoint: MoldTemperature | DataPoint | DataPoint: MoldTemperature | float | degree Celsius | ns=2;i=58 |
| 7 | 7eb48039-fdae-4f89-8112-ef51378b173e | DataPoint: ToolTemperature | DataPoint | DataPoint: ToolTemperature | float | degree Celsius | ns=2;i=60 |
| 8 | cce9b4d7-01c8-4b44-b241-56a7dcd22a5c | DataPoint: CycleTime | DataPoint | DataPoint: CycleTime | float | second | ns=2;i=63 |
| 9 | e2189ca8-f21e-41ea-b1d8-4902f0729134 | DataPoint: ScrewSpeed | DataPoint | DataPoint: ScrewSpeed | float | revolutions per minute | ns=2;i=60 |


## Step 3: Subscribe to a DataPoint

Select a DataPoint and subscribe to it. The client automatically:
1. Finds the Device that owns this DataPoint
2. Finds the Connector that manages this Device
3. Sends a subscription request via Kafka
4. Registers the subscription in the Knowledge Graph
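The first two steps amount to following two links, DataPoint to Device and Device to Connector. The sketch below illustrates that chain with plain dictionaries standing in for the Knowledge Graph. It is not the client's implementation: the device names and both function names are hypothetical, and the Kafka request and the registration in the Knowledge Graph (steps 3 and 4) are not modelled.

```python
# Toy stand-in for the Knowledge Graph. Device names are hypothetical;
# the connector names are taken from the connector list shown in Step 1.
DATAPOINT_TO_DEVICE = {
    "AlignmentOffset": "device-a",
    "SpindleSpeed": "device-m",  # illustrative mapping, not confirmed by the notebook
}
DEVICE_TO_CONNECTOR = {
    "device-a": "assembly_station",
    "device-m": "milling_station",
}


def resolve_connector(datapoint):
    """Follow DataPoint -> Device -> Connector and return (device, connector)."""
    device = DATAPOINT_TO_DEVICE.get(datapoint)
    if device is None:
        raise LookupError(f"no device owns DataPoint {datapoint!r}")
    connector = DEVICE_TO_CONNECTOR.get(device)
    if connector is None:
        raise LookupError(f"no connector manages device {device!r}")
    return device, connector


def build_subscription_request(datapoint):
    """Assemble the information a subscription request would need (hypothetical shape)."""
    device, connector = resolve_connector(datapoint)
    return {"connector": connector, "device": device, "datapoint": datapoint}
```

Failing early with a `LookupError` when either link is missing keeps a broken graph entry from turning into a request addressed to no connector.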

In [3]:
# Select a DataPoint by index (change this to explore different datapoints)
DATAPOINT_INDEX = 0

# Build the full URI from the instance UUID
selected_uuid = datapoints_df.iloc[DATAPOINT_INDEX]["entityIri"]
selected_datapoint_uri = f"http://stephantrattnig.org/instances#{selected_uuid}"

print(f"Selected DataPoint: {datapoints_df.iloc[DATAPOINT_INDEX].get('label', selected_uuid)}")
print(f"URI: {selected_datapoint_uri}")

Selected DataPoint: DataPoint: AlignmentOffset
URI: http://stephantrattnig.org/instances#d56cae74-6db3-4f2b-a817-f71625771f77
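Selecting by index depends on the row order of the result, which may differ between runs. A label-based lookup is one alternative. The sketch below works on a list of row dictionaries, such as the one `datapoints_df.to_dict("records")` would produce; the function name `datapoint_uri_by_label` is hypothetical, and the `"DataPoint: "` label prefix is assumed from the table in Step 2.

```python
INSTANCES = "http://stephantrattnig.org/instances#"


def datapoint_uri_by_label(records, name):
    """Return the full instance URI of the single DataPoint labelled `name`."""
    wanted = f"DataPoint: {name}"
    matches = [row for row in records if row["label"] == wanted]
    if len(matches) != 1:
        raise LookupError(
            f"expected exactly one DataPoint labelled {name!r}, found {len(matches)}"
        )
    return INSTANCES + matches[0]["entityIri"]


# Two rows copied from the Step 2 result
sample = [
    {"entityIri": "d56cae74-6db3-4f2b-a817-f71625771f77", "label": "DataPoint: AlignmentOffset"},
    {"entityIri": "b08d7da3-057e-48a7-98f6-92842a7588f9", "label": "DataPoint: SpindleSpeed"},
]
print(datapoint_uri_by_label(sample, "SpindleSpeed"))
# prints http://stephantrattnig.org/instances#b08d7da3-057e-48a7-98f6-92842a7588f9
```

The lookup uses the label rather than `dataPointIdentifier` because the identifier is not unique across devices in the table above (for example, `ns=2;i=56` appears for both SpindleSpeed and InjectionPressure).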


In [4]:
# Subscribe to the selected DataPoint
result = await client.resolve_and_subscribe(
    datapoint_uri=selected_datapoint_uri,
)
result

({'version': '1.0.0',
  'payload': {'type': 'CommandResponse',
   'base_payload': {'type': 'SubscriptionRegisterResponse',
    'device_origin': 'assembly_station',
    'response': 'Subscription triggered'}},
  'type': 'ResponseMessage',
  'message_id': UUID('f0801352-ee44-40eb-9fb3-15678099074c'),
  'correlation_id': UUID('e033e1ad-675d-4c11-86ba-f842bf9fc66c'),
  'timestamp': datetime.datetime(2025, 12, 23, 17, 33, 20, 943109),
  'status_code': 'Valid',
  'service_id': 'assembly_station'},
 {'version': '1.0.0',
  'payload': {'type': 'CommandResponse',
   'base_payload': {'type': 'SubscriptionRegisterResponse',
    'device_origin': 'MetadataTopic',
    'response': '34afeade-1ade-4a4f-8ae8-a6a0f04caf9f'}},
  'type': 'ResponseMessage',
  'message_id': UUID('55040e4f-d6e1-41d8-a063-b7c5e22b07fa'),
  'correlation_id': UUID('e033e1ad-675d-4c11-86ba-f842bf9fc66c'),
  'timestamp': datetime.datetime(2025, 12, 23, 17, 33, 20, 970856),
  'status_code': 'valid',
  'service_id': 'assembly_station'})
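The result holds one response message per step, and the two `status_code` values differ in case (`'Valid'` and `'valid'`). A small helper can summarize such a result. This is a sketch based only on the output shown above: the function name is hypothetical, and reading the `MetadataTopic` response as the new subscription's UUID is an assumption, supported by the same UUID appearing as a subscription in Step 4.

```python
def summarize_responses(responses):
    """Return (all_valid, subscription_id) for an iterable of response messages."""
    all_valid = True
    subscription_id = None
    for message in responses:
        # Compare case-insensitively, since both 'Valid' and 'valid' occur
        if str(message.get("status_code", "")).lower() != "valid":
            all_valid = False
        base = message.get("payload", {}).get("base_payload", {})
        # Assumption: the MetadataTopic response carries the subscription UUID
        if base.get("device_origin") == "MetadataTopic":
            subscription_id = base.get("response")
    return all_valid, subscription_id


# Trimmed copies of the two messages shown above
sample_result = (
    {"status_code": "Valid",
     "payload": {"base_payload": {"device_origin": "assembly_station",
                                  "response": "Subscription triggered"}}},
    {"status_code": "valid",
     "payload": {"base_payload": {"device_origin": "MetadataTopic",
                                  "response": "34afeade-1ade-4a4f-8ae8-a6a0f04caf9f"}}},
)
print(summarize_responses(sample_result))
# prints (True, '34afeade-1ade-4a4f-8ae8-a6a0f04caf9f')
```

In the notebook this could be called as `summarize_responses(result)`.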

## Step 4: View Active Subscriptions

Query the Knowledge Graph to see all active subscriptions and their states.

In [5]:
# List all subscriptions with their states
subscriptions_df = await client.list_instances(
    class_uri=f"{DF}Subscription", 
    optional_props=[f"{RDFS}label", f"{DF}hasSubscriptionState", f"{DF}subscribesToDataPoint"],
    pretty=True
)
subscriptions_df

| | entityIri | entityLabel | entityType | label | hasSubscriptionState | subscribesToDataPoint |
|---|---|---|---|---|---|---|
| 0 | 723451ad-83ee-4eba-99a7-6b33bedc85a0 | Subscription | Subscription | Subscription | Active | d56cae74-6db3-4f2b-a817-f71625771f77 |
| 1 | e1f7d6cf-c7a3-4882-b851-01a02db0872e | Subscription | Subscription | Subscription | Active | fd09f1e1-d231-4631-b421-16c018481142 |
| 2 | eb0cb3ad-b1e1-4d29-975a-1d4fd38f2c54 | Subscription | Subscription | Subscription | Active | 97a93778-78c9-49a0-9805-8eb84cc43d24 |
| 3 | d5e7c464-3d0c-4488-889e-d9009a80ddf0 | Subscription | Subscription | Subscription | Active | edf21258-d150-4cf8-8fb8-2bca2ecc3b99 |
| 4 | bd85e04c-c3d6-4ca6-911e-853bd9df30e4 | Subscription | Subscription | Subscription | Active | edf21258-d150-4cf8-8fb8-2bca2ecc3b99 |
| 5 | 34afeade-1ade-4a4f-8ae8-a6a0f04caf9f | Subscription | Subscription | Subscription | Active | 76675a9a-015f-45ff-b3d8-9a7d741e60f9 |

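In the result above, two subscriptions (rows 3 and 4) point at the same DataPoint. Grouping the rows by `subscribesToDataPoint` makes such duplicates easy to spot. The sketch below uses plain row dictionaries, as `subscriptions_df.to_dict("records")` would produce; both function names are hypothetical.

```python
from collections import defaultdict


def subscriptions_by_datapoint(records):
    """Map each DataPoint UUID to the list of subscription UUIDs that target it."""
    grouped = defaultdict(list)
    for row in records:
        grouped[row["subscribesToDataPoint"]].append(row["entityIri"])
    return dict(grouped)


def duplicated_subscriptions(records):
    """Keep only the DataPoints that have more than one subscription."""
    return {
        datapoint: subs
        for datapoint, subs in subscriptions_by_datapoint(records).items()
        if len(subs) > 1
    }


# Rows 0, 3 and 4 from the table above
sample_rows = [
    {"entityIri": "723451ad-83ee-4eba-99a7-6b33bedc85a0",
     "subscribesToDataPoint": "d56cae74-6db3-4f2b-a817-f71625771f77"},
    {"entityIri": "d5e7c464-3d0c-4488-889e-d9009a80ddf0",
     "subscribesToDataPoint": "edf21258-d150-4cf8-8fb8-2bca2ecc3b99"},
    {"entityIri": "bd85e04c-c3d6-4ca6-911e-853bd9df30e4",
     "subscribesToDataPoint": "edf21258-d150-4cf8-8fb8-2bca2ecc3b99"},
]
print(duplicated_subscriptions(sample_rows))
```

Whether duplicate subscriptions are intended depends on the connector; the notebook does not say.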

## Step 5: Unsubscribe from a DataPoint (Experimental)

**Note:** Unsubscribing may not be fully implemented on the connector side yet. This step is included for completeness; in the run recorded below, the call fails with a `JSONDecodeError`.

Select a subscription and unsubscribe from it.

In [6]:
# Select a subscription by index (change this to unsubscribe from different subscriptions)
SUBSCRIPTION_INDEX = 0

# Build the full URI from the instance UUID
selected_subscription_uuid = subscriptions_df.iloc[SUBSCRIPTION_INDEX]["entityIri"]
selected_subscription_uri = f"http://stephantrattnig.org/instances#{selected_subscription_uuid}"

print(f"Selected Subscription: {selected_subscription_uuid}")
print(f"URI: {selected_subscription_uri}")

Selected Subscription: 723451ad-83ee-4eba-99a7-6b33bedc85a0
URI: http://stephantrattnig.org/instances#723451ad-83ee-4eba-99a7-6b33bedc85a0


In [7]:
# Unsubscribe
unsubscribe_result = await client.resolve_and_unsubscribe(
    subscription_uri=selected_subscription_uri
)
unsubscribe_result

723451ad-83ee-4eba-99a7-6b33bedc85a0


JSONDecodeError: Expecting value: line 1 column 1 (char 0)