<div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 20px;">
    <div style="flex: 0 0 auto; margin-left: 0; margin-bottom: 0;">
        <img src="https://naise.northwestern.edu/images/sage-logo-410x410-1-360x200.jpg" alt="SAGE Logo"/>
    </div>
    <div style="flex: 0 0 auto; margin-left: auto; margin-bottom: 0;">
        <img src="https://nairrpilot.org/app/site/media/ndp.jpg" alt="NDP Logo" width="200"/>
    </div>
</div>

# SciDX Streaming Capabilities Demonstration 

This demonstration showcases the **SciDX Streaming capabilities**, leveraging both the **SciDX POP Library** for managing data objects and the **Streaming Library** for real-time data streaming and processing. 

## Objectives 

We will:
1. **Register SAGE sensor data streams** with the SciDX POP Library (*Data Provider*).
2. **Discover and apply filters** to customize data streams for specific use cases (*Data Consumer*).
3. **Consume and visualize real-time data streams**.

## Workflow

Below is a diagram illustrating the interaction between the data provider and consumer in the streaming workflow:

![Data Stream Library](data_stream_library.png) 

### Key Components: 
- **POP Library:** Registers and discovers data objects through the POP API (Data Provider role).
- **Streaming Library:** Creates, manages, and consumes real-time data streams, including applying filters to them (Data Consumer role).

## Step 1: Setting Up the POP (Data Provider) and Streaming (Data Consumer) Clients

In this step, we will:
1. **Import necessary modules** for handling data streams.
2. **Initialize the Point of Presence (POP) client** to manage data registration and discovery.
3. **Initialize the Streaming client** to manage and consume real-time data streams.

In [1]:
# Import the sciDX client and demo-specific modules
from scidx_streaming import StreamingClient
from pointofpresence import APIClient
from sage_demo import sensor_data, filters, API_URL, USERNAME, PASSWORD

Here, we:
1. Initialize the `APIClient` to handle data registration and discovery.
2. Initialize the `StreamingClient` to handle real-time data streams.

In [2]:
# Initialize the POP client for data registration and discovery
client = APIClient(base_url=API_URL, username=USERNAME, password=PASSWORD)

# Initialize the Streaming client for real-time data streaming
streaming = StreamingClient(client)
print(f"Streaming Client initialized. User ID: {streaming.user_id}")

Streaming Client initialized. User ID: d4402055-669b-4ea9-b98b-053877a61ea1


## Step 2: Registering Sensor Data from Multiple SAGE Nodes (Data Provider)

In this step, we use the **POP client**, together with the metadata needed to access the **BME280 sensor data**, to register each data stream in our POP. Each sensor’s data is registered as a unique resource with its own URL.

### Data Streams
- **Temperature**
- **Pressure**
- **Humidity**

In [3]:
# Register each sensor data stream from the `sensor_data` list
for sensor in sensor_data:
    client.register_url(sensor)

## Step 3: Search for the Registered BME280 Sensor Data (Data Provider)

Now that we have registered the BME280 sensor data, we will:
1. Use the **POP client** to search for datasets using the `search_datasets` method.
2. Verify that the **BME280 sensors** are correctly registered by searching for them.

This ensures the datasets are discoverable for use by the Data Consumers.

In [4]:
# Search for all registered BME280 sensor datasets
search_results = client.search_datasets("sage_demo_bme280")
print(f"Number of datasets found: {len(search_results)}")

Number of datasets found: 3


## Step 4: Create a Stream with Filtered Data from the Registered SAGE BME280 Sensors (Data Consumer)

In this step, we’ll create a Kafka data stream on sciDX with filters to select specific data and apply custom alerts. Here’s how the data is filtered and transformed:

- **State Selection**: Filters to only include data from California, Montana, Oregon, North Dakota, Michigan, and Illinois.
- **Data Mapping**: Maps sensor readings to temperature, pressure, and humidity fields.
- **State Assignment**: Associates each sensor with its corresponding state name.
- **Conditional Alerts**:
  - **Heatwave Alert**: Activates if temperature > 35°C or humidity < 25%.
  - **State-Specific Alerts**: Certain states have unique temperature thresholds that will trigger alerts:
    - Montana: Temperature > 40°C
    - Oregon: Temperature > 30°C
  - **Pressure Alert**: Activates if pressure exceeds 101,000 Pa.
- **Pressure Adjustment**: For certain temperature alerts, reduces pressure readings by 5%.

These filters allow us to isolate meaningful subsets of the data, trigger alerts dynamically, and transform the data stream for more actionable insights.
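
To make the alert rules concrete, here is a minimal plain-Python sketch of the thresholds listed above. It is an illustration only: it does not reproduce the demo's `filters` object, the Streaming Library's filter syntax, or the order in which the library evaluates rules. The function name `evaluate_alerts` and every alert label other than `Heatwave` are hypothetical. The pressure adjustment is left out because the description does not say which temperature alerts trigger it.

```python
from typing import List, Optional

# Thresholds copied from the filter description.
HEATWAVE_TEMP_C = 35.0
HEATWAVE_HUMIDITY_PCT = 25.0
PRESSURE_ALERT_PA = 101_000.0
STATE_TEMP_THRESHOLDS_C = {"Montana": 40.0, "Oregon": 30.0}
SELECTED_STATES = {
    "California", "Montana", "Oregon", "North Dakota", "Michigan", "Illinois",
}


def evaluate_alerts(
    state: str,
    temperature: Optional[float] = None,
    humidity: Optional[float] = None,
    pressure: Optional[float] = None,
) -> Optional[List[str]]:
    """Return alert labels for one reading, or None if the state is filtered out."""
    if state not in SELECTED_STATES:
        return None  # State Selection: the reading is dropped entirely.

    alerts: List[str] = []
    too_hot = temperature is not None and temperature > HEATWAVE_TEMP_C
    too_dry = humidity is not None and humidity < HEATWAVE_HUMIDITY_PCT
    if too_hot or too_dry:
        alerts.append("Heatwave")

    state_limit = STATE_TEMP_THRESHOLDS_C.get(state)
    if state_limit is not None and temperature is not None and temperature > state_limit:
        alerts.append(f"{state} temperature")  # hypothetical label

    if pressure is not None and pressure > PRESSURE_ALERT_PA:
        alerts.append("Pressure")  # hypothetical label
    return alerts
```

For example, `evaluate_alerts("California", humidity=7.702)` yields `["Heatwave"]`, because the humidity is below 25%.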

In [5]:
# Create the Kafka stream with the Streaming client, applying the specified filters to the SAGE data
stream = await streaming.create_kafka_stream(
    keywords=["sage_demo"],
    match_all=True,
    filter_semantics=filters
)

# Retrieve the stream's topic name
topic = stream.data_stream_id
print(f"Stream created: {topic}")

Stream created: data_stream_d4402055-669b-4ea9-b98b-053877a61ea1_2


## Step 5: Consuming the Filtered Stream Data 

With the Kafka stream created, we now: 
1. Initialize a **data consumer** using the `consume_kafka_messages` method.
2. Start **real-time consumption** of filtered data.

The consumer continuously listens for incoming messages and populates a dynamic DataFrame. 

**Note**: It may take a few seconds for data to populate due to real-time processing.

In [6]:
# Start consuming Kafka messages from the created topic
consumer = streaming.consume_kafka_messages(topic)
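
Because the DataFrame fills in asynchronously, it can help to wait until at least one row has arrived before inspecting it. The sketch below assumes only that `consumer.dataframe` supports `len()`, as a pandas DataFrame does. The helper `wait_for_rows` and its parameters are hypothetical and are not part of the Streaming Library.

```python
import asyncio
import time
from typing import Any


async def wait_for_rows(
    consumer: Any,
    min_rows: int = 1,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> bool:
    """Poll consumer.dataframe until it holds min_rows rows or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if len(consumer.dataframe) >= min_rows:
            return True
        # asyncio.sleep yields control, so other tasks on the event loop
        # (possibly including the consumer) keep running while we wait.
        await asyncio.sleep(poll_interval)
    # One last check in case rows arrived during the final sleep.
    return len(consumer.dataframe) >= min_rows
```

In a notebook cell this could be called as `await wait_for_rows(consumer, min_rows=1)`. A `False` result means no data arrived within the timeout.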

In [8]:
# Display the first 10 rows of the consumed data
consumer.dataframe.head(10)

| | name | timestamp | value | meta.host | meta.job | meta.node | meta.plugin | meta.sensor | meta.task | meta.vsn | meta.zone | pressure | state | alert | temperature | humidity |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | [env.pressure, env.pressure] | [2025-01-07T17:51:27.767251921Z, 2025-01-07T17... | [100554.671875, 96221.5] | [000048b02d35a9ce.ws-nxcore, 000048b02d35a97c.... | [Pluginctl, Pluginctl] | [000048b02d35a9ce, 000048b02d35a97c] | [waggle/plugin-iio:0.6.0, waggle/plugin-iio:0.... | [bme280, bme280] | [wes-iio-bme280, wes-iio-bme280] | [W085, W08C] | [core, core] | [100554.671875, 96221.5] | [North Dakota, Michigan] | [None, None] | | |
| 1 | [env.temperature, env.temperature] | [2025-01-07T17:51:27.761292574Z, 2025-01-07T17... | [7.48, 15.71] | [000048b02d35a9ce.ws-nxcore, 000048b02d35a97c.... | [Pluginctl, Pluginctl] | [000048b02d35a9ce, 000048b02d35a97c] | [waggle/plugin-iio:0.6.0, waggle/plugin-iio:0.... | [bme280, bme280] | [wes-iio-bme280, wes-iio-bme280] | [W085, W08C] | [core, core] | | [North Dakota, Michigan] | [None, None] | [7.48, 15.71] | |
| 2 | [env.relative_humidity, env.relative_humidity] | [2025-01-07T17:51:27.771586719Z, 2025-01-07T17... | [15.865, 19.495] | [000048b02d35a9ce.ws-nxcore, 000048b02d35a97c.... | [Pluginctl, Pluginctl] | [000048b02d35a9ce, 000048b02d35a97c] | [waggle/plugin-iio:0.6.0, waggle/plugin-iio:0.... | [bme280, bme280] | [wes-iio-bme280, wes-iio-bme280] | [W085, W08C] | [core, core] | | [North Dakota, Michigan] | [Heatwave, Heatwave] | | [15.865, 19.495] |
| 3 | [env.temperature, env.temperature] | [2025-01-07T17:51:38.033769632Z, 2025-01-07T17... | [33.9, 15.13] | [000048b02d3ae300.ws-nxcore, 000048b02d3af45d.... | [Pluginctl, Pluginctl] | [000048b02d3ae300, 000048b02d3af45d] | [waggle/plugin-iio:0.6.0, waggle/plugin-iio:0.... | [bme280, bme280] | [wes-iio-bme280, wes-iio-bme280] | [W070, W068] | [core, core] | | [California, Oregon] | [None, None] | [33.9, 15.13] | |
| 4 | [env.relative_humidity, env.relative_humidity] | [2025-01-07T17:51:38.036863814Z, 2025-01-07T17... | [7.702, 55.496] | [000048b02d3ae300.ws-nxcore, 000048b02d3af45d.... | [Pluginctl, Pluginctl] | [000048b02d3ae300, 000048b02d3af45d] | [waggle/plugin-iio:0.6.0, waggle/plugin-iio:0.... | [bme280, bme280] | [wes-iio-bme280, wes-iio-bme280] | [W070, W068] | [core, core] | | [California, Oregon] | [Heatwave, None] | | [7.702, 55.496] |
| 5 | [env.pressure, env.pressure] | [2025-01-07T17:51:38.040962168Z, 2025-01-07T17... | [82732.82421800001, 101542.78125] | [000048b02d3ae300.ws-nxcore, 000048b02d3af45d.... | [Pluginctl, Pluginctl] | [000048b02d3ae300, 000048b02d3af45d] | [waggle/plugin-iio:0.6.0, waggle/plugin-iio:0.... | [bme280, bme280] | [wes-iio-bme280, wes-iio-bme280] | [W070, W068] | [core, core] | [82732.82421800001, 101542.78125] | [California, Oregon] | [None, None] | | |
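
Each row in the output above is a batch: every populated cell holds a list with one entry per sensor node. For plotting or per-node analysis it is often easier to work with one record per reading. The sketch below shows one way to split a batch in plain Python, assuming the cells are Python lists of equal length within a row. The helper `flatten_batch` is hypothetical, and the sample row copies a few columns from row 1 of the output.

```python
from typing import Any, Dict, List


def flatten_batch(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Split one batched row (list-valued cells) into one dict per reading."""
    list_cols = {k: v for k, v in row.items() if isinstance(v, list)}
    if not list_cols:
        return [dict(row)]  # nothing to split

    lengths = {len(v) for v in list_cols.values()}
    if len(lengths) != 1:
        raise ValueError(f"list-valued cells have differing lengths: {sorted(lengths)}")
    n = lengths.pop()

    # Non-list cells (for example, empty ones) are copied to every reading.
    scalars = {k: v for k, v in row.items() if k not in list_cols}
    return [
        {**scalars, **{k: v[i] for k, v in list_cols.items()}}
        for i in range(n)
    ]


row = {
    "name": ["env.temperature", "env.temperature"],
    "value": [7.48, 15.71],
    "meta.vsn": ["W085", "W08C"],
    "state": ["North Dakota", "Michigan"],
    "pressure": None,  # empty cell in the batched row
}
readings = flatten_batch(row)
```

Here `readings` contains two dictionaries, one for node `W085` in North Dakota and one for node `W08C` in Michigan. If the DataFrame cells really are lists, the same helper could be applied to each row with `flatten_batch(row.to_dict())` while iterating over `consumer.dataframe.iterrows()`.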


## Step 6: Stopping Data Consumption and Cleaning Up 

To wrap up, we will: 
1. Stop the data consumer to halt data processing.
2. Delete the created Kafka stream using the Streaming client.
3. Remove the registered datasets using the POP client.

This ensures all resources and background tasks are properly released.

In [9]:
# Stop the Kafka consumer
consumer.stop()

# Delete the Kafka stream
await streaming.delete_stream(stream)

# Delete all registered datasets from the POP system
for result in search_results:
    client.delete_resource_by_id(result["id"])
    print(f"Deleted dataset with ID: {result['id']}")

print("Cleanup completed: All registered datasets deleted.")

Cleanup completed: Stream and registered dataset deleted.
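
If one cleanup step fails, the steps after it never run and resources can be left behind. The sketch below runs every step and collects the errors instead of stopping at the first one. It uses only the calls shown in this step (`consumer.stop()`, `streaming.delete_stream(stream)`, and `client.delete_resource_by_id(...)`). The wrapper name `cleanup` is hypothetical, and the exception types raised by the real clients are unknown, so it catches broadly.

```python
from typing import Any, Dict, Iterable, List


async def cleanup(
    consumer: Any,
    streaming: Any,
    stream: Any,
    client: Any,
    datasets: Iterable[Dict[str, Any]],
) -> List[str]:
    """Run every cleanup step and return a list of error descriptions."""
    errors: List[str] = []

    try:
        consumer.stop()
    except Exception as exc:
        errors.append(f"consumer.stop: {exc}")

    try:
        await streaming.delete_stream(stream)
    except Exception as exc:
        errors.append(f"delete_stream: {exc}")

    for result in datasets:
        try:
            client.delete_resource_by_id(result["id"])
        except Exception as exc:
            errors.append(f"delete_resource_by_id({result['id']}): {exc}")

    return errors
```

In the notebook this could be called as `errors = await cleanup(consumer, streaming, stream, client, search_results)`. An empty list means every step succeeded.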
