<div>
    <div>
        <img src="https://scidx.sci.utah.edu/wp-content/uploads/2024/12/logo-sm.png" alt="scidx Logo"/>
        <img src="https://nationaldataplatform.org/National_Data_Platform_horiz_stacked.svg" alt="NDP Logo" width="400" style="padding-left:100px"/>
    </div>
</div>

# SciDX Streaming Capabilities Demonstration: Kafka Stream 

This demonstration showcases the **SciDX Streaming capabilities**, leveraging both the **SciDX NDP Endpoint Library** for managing data objects and the **Streaming Library** for real-time data streaming and processing of a Kafka Stream.

**`SciDX NDP Endpoint Library`:** Registers, manages, and discovers data objects through the NDP endpoint API (acts as the Data Provider). <br>
**`Streaming Library`:** Creates, manages, and consumes real-time data streams, including applying filters to incoming messages (acts as the Data Consumer).

<br>

## **Setup**: Client preparation

1. **Import necessary modules** for handling data streams:

   a. Import `StreamingClient` from `scidx_streaming` module for handling streaming functionality <br>
   b. Import `APIClient` from `pointofpresence` module for API interactions


In [None]:
from scidx_streaming import StreamingClient
from pointofpresence import APIClient

2. **API Authentication Token Setup**

    1. Navigate to https://token.ndp.utah.edu
    
    2. If not already authenticated:
       - Select the `CILogon` button
       - Choose your institution from the Identity Provider list
       - Complete the institutional login process
    
    3. Upon successful authentication, you will be redirected to token.ndp.utah.edu
    
    4. Locate the `Access Token` field and copy the token value
    
    5. Replace `<your_token>` in the configuration below with your copied access token

In [None]:
TOKEN="<your_token>"
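
Hard-coding a token in a notebook makes it easy to leak when the notebook is shared. A minimal sketch of one alternative, assuming the token has been exported in an environment variable beforehand; the variable name `NDP_TOKEN` and the helper `load_token` are hypothetical and not part of either library:

```python
import os


def load_token(env_var: str = "NDP_TOKEN") -> str:
    """Return the access token stored in `env_var` (a hypothetical variable name)."""
    token = os.environ.get(env_var, "").strip()
    # Reject a missing value and the unedited placeholder from the cell above.
    if not token or token == "<your_token>":
        raise ValueError(f"Set the {env_var} environment variable to your access token.")
    return token
```

With the variable set, `TOKEN = load_token()` could replace the literal assignment above.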

3. **Initialize the API and Streaming clients** to register, discover, and stream data.

    a. `APIClient`: handle data registration and discovery. <br>
    b. `StreamingClient`: handle real-time data streams.

In [None]:
# The URL of the API Client to connect to the NDP endpoint service
API_URL="155.101.6.191:8003"

# Initialize the NDP endpoint client for data registration and discovery
client = APIClient(base_url=API_URL, token=TOKEN)

# Initialize the Streaming client for real-time data streaming
streaming = StreamingClient(client)
print(f"Streaming Client initialized. User ID: {streaming.user_id}")

<br><br>

## **Basic Usage**

**Data Provider**: Register data stream

1. Register the data source metadata.
2. Verify its discoverability.

> In addition to using `client.register_url(metadata)` for API-based and static file-based streams, SciDX NDP Endpoint also supports real-time data ingestion from Kafka topics. To register a Kafka-based data stream, use **`client.register_kafka_topic(metadata)`**.

> **Note:**  
> Each registration requires defining `metadata`, which includes:  
> - **resource_name**: Unique identifier for the resource.  
> - **resource_title**: A descriptive name for the resource.  
> - **owner_org**: The ID of the CKAN organization registering the stream.  
> - **resource_url**: The API endpoint, static file URL, or Kafka connection details.  
> - **file_type**: Type of file (CSV, TXT, JSON, NetCDF, or `stream` for real-time APIs).  
> - **notes**: Optional field for additional information.  
> - **mapping**: Defines how fields from the stream or file should be extracted and renamed.  
> - **processing**: Indicates whether the data requires transformations before ingestion.  
> - **extras**: Additional metadata such as authentication parameters or custom settings.

> **Additional `extras` Configuration for Kafka Streams**  
> The following optional parameters can be included in the `extras` dictionary when registering a Kafka-based data stream using `client.register_kafka_topic(metadata)`:
> 
> - **sasl_mechanism**:  
>   Defines the authentication mechanism for Kafka security.  
>   - Example: `"SCRAM-SHA-512"`  
>   - Required only if connecting to secured Kafka brokers.
> 
> - **security_protocol**:  
>   Specifies the communication protocol used for the Kafka connection.  
>   - Example: `"SASL_SSL"` (for encrypted and authenticated connections)  
>   - Required only for secured Kafka clusters.
> 
> - **auto_offset_reset**:  
>   Controls where the Kafka consumer starts reading when no previous offset is available.  
>   - `"latest"`: Read from the most recent message.  
>   - `"earliest"`: Read from the beginning of the topic.
> 
> - **time_window**:  
>   Maximum time (in seconds) to wait for new messages before considering the stream inactive.  
>   - If no messages arrive within this window, the consumer automatically stops.
> 
> - **batch_interval**:  
>   Time interval (in seconds) for buffering and processing messages in batches.  
>   - Helps optimize performance for high-throughput Kafka topics.
> 
> - **batch_mode**:  
>   Determines how messages containing multiple records are handled.  
>   - `"False"` (default): Each Kafka message is treated as a single record.  
>   - `"True"`: Messages containing arrays of values will be expanded into multiple structured records.  
>     Example:  
>     Input message:
>     ```json
>     {"id": [1, 2, 3], "value": [10, 20, 30]}
>     ```
>     Expanded records:
>     ```json
>     [
>       {"id": 1, "value": 10},
>       {"id": 2, "value": 20},
>       {"id": 3, "value": 30}
>     ]
>     ```
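
The `batch_mode` expansion described above can be sketched in plain Python. This is only an illustration of the transformation, not the library's implementation; the helper name `expand_batch` is hypothetical, and repeating scalar fields across every expanded record is an assumption:

```python
from typing import Any


def expand_batch(message: dict[str, Any]) -> list[dict[str, Any]]:
    """Expand a message whose values are equal-length lists into one record per index."""
    lengths = {len(value) for value in message.values() if isinstance(value, list)}
    if not lengths:
        # No list-valued fields: treat the message as a single record.
        return [dict(message)]
    if len(lengths) > 1:
        raise ValueError("List-valued fields must all have the same length.")
    (size,) = lengths
    # Assumption: scalar fields are copied unchanged into every expanded record.
    return [
        {key: (value[i] if isinstance(value, list) else value) for key, value in message.items()}
        for i in range(size)
    ]


records = expand_batch({"id": [1, 2, 3], "value": [10, 20, 30]})
print(records)
```

Raising on mismatched list lengths keeps a malformed message from silently producing partial records.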

**Data Consumer**:

1. Discover and apply optional filters to registered data sources to create a custom data stream.
2. Subscribe to and consume the custom data stream in real-time.
3. Process and visualize the incoming data dynamically.

<br>

#### **1**. Register a **`Kafka Stream`**

In this step, we will use the **NDP endpoint client** and the metadata below to register an online **Kafka Stream** with our NDP endpoint.

> #### **Important:**
> The example Kafka topic used here is hosted on the SciDX team’s Kafka server. The topic is named **`timestamp-topic-1`**, and the SciDX team continuously produces timestamped data to this topic every 5 seconds. Each message contains a UTC timestamp and a random measurement value, for example:
>```sh
>[17:29:13] Sent: {'timestamp': '2025-06-02T23:29:13.798342Z', 'measurement': 0.06916792150409812}
>[17:29:18] Sent: {'timestamp': '2025-06-02T23:29:18.922484Z', 'measurement': 0.8232872454560856}
>[17:29:23] Sent: {'timestamp': '2025-06-02T23:29:23.930366Z', 'measurement': 0.28348114450447137}
>[17:29:28] Sent: {'timestamp': '2025-06-02T23:29:28.937124Z', 'measurement': 0.4795100980289084}
>```
> Please note, this is for demonstration purposes. A real-world example could involve a scientific group hosting a publicly accessible Kafka topic. Using the SciDX NDP Endpoint and Streaming libraries, we can streamline data consumption and apply filtered semantics.
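
Each message in the example above carries an ISO 8601 UTC timestamp and a float. A minimal sketch of decoding one such message, assuming the payload on the topic is JSON-encoded (the log lines above show a Python dict representation, so the wire format is an assumption); `parse_message` is a hypothetical helper:

```python
import json
from datetime import datetime


def parse_message(payload: str) -> tuple[datetime, float]:
    """Decode one JSON payload into a timezone-aware timestamp and a float measurement."""
    record = json.loads(payload)
    # Replace the trailing "Z" so datetime.fromisoformat also works on Python < 3.11.
    timestamp = datetime.fromisoformat(record["timestamp"].replace("Z", "+00:00"))
    return timestamp, float(record["measurement"])


sample = '{"timestamp": "2025-06-02T23:29:13.798342Z", "measurement": 0.06916792150409812}'
timestamp, measurement = parse_message(sample)
print(timestamp.isoformat(), measurement)
```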

In [None]:
# Define the payload data for the Kafka topic registration
kafka_stream_metadata = {
  "dataset_name": "timestamp-example",
  "dataset_title": "timestamp-example",
  "owner_org": "saleem_test",
  "kafka_topic": "timestamp-topic-1",
  "kafka_host": "155.101.6.191",
  "kafka_port": "9092"
}

# Call the register_kafka_topic method to add the Kafka topic
try:
    response = client.register_kafka_topic(kafka_stream_metadata)
    print("Kafka topic registered successfully with ID:", response["id"])
except ValueError as e:
    print("Failed to register Kafka topic.")
    print(f"{e}.")

#### **2**. Search for the registered entry

Now that we have registered a data source, we will:
1. Use the **NDP endpoint client** to search for datasets using the `search_datasets` method.
2. Verify that the **registered data object** is correctly stored and available for discovery.
3. Confirm metadata accuracy before data consumption.

This ensures the dataset is discoverable for use by the Data Consumers.

In [None]:
# Search for the registered Kafka data stream
import json

search_results = client.search_datasets("timestamp-example", server="local")
print(f"Number of datasets found: {len(search_results)}")
print("Search result:\n" + json.dumps(search_results, indent=4))
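
A keyword search can return more than one dataset, so it can help to pick the intended entry explicitly rather than rely on position. A minimal sketch, assuming each search result is a dictionary with `"id"` and `"name"` keys; `"name"` is an assumption to check against the output printed above, and `find_dataset` is a hypothetical helper:

```python
from typing import Any, Optional


def find_dataset(results: list[dict[str, Any]], name: str) -> Optional[dict[str, Any]]:
    """Return the first search result whose "name" equals `name`, or None."""
    for dataset in results:
        # "name" is an assumed key; adjust it to match the structure printed above.
        if dataset.get("name") == name:
            return dataset
    return None


# Stand-in data shaped like the assumed search output, for illustration only.
example_results = [
    {"id": "a1", "name": "other-dataset"},
    {"id": "b2", "name": "timestamp-example"},
]
match = find_dataset(example_results, "timestamp-example")
print(match["id"] if match else "not found")
```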

#### **3**. Create a Data Stream from the registered entry

Now we will use the `StreamingClient` to create a data stream from the metadata registered through the `APIClient`.

The `create_kafka_stream` function searches for datasets matching the provided keywords, applies filtering semantics, and creates a real-time Kafka stream for consumption.

Function parameters:
- **`keywords`**: List of keywords to filter relevant datasets.
- **`filter_semantics`**: Optional. Defines filtering criteria for datasets.
- **`match_all`**: Optional. If `True`, only data sources matching all the keywords are selected.
- **`username`**: Optional. Username for authentication in protected data sources.
- **`password`**: Optional. Password for authentication in protected data sources.
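
The difference between matching any keyword and matching all of them can be illustrated with a small standalone function. This only demonstrates the any/all distinction; how the library actually matches keywords (which fields it searches, case sensitivity, the default value of `match_all`) is not specified here, so the substring matching and the default below are assumptions, and `matches_keywords` is a hypothetical helper:

```python
def matches_keywords(text: str, keywords: list[str], match_all: bool = False) -> bool:
    """Return True if `text` contains all keywords (match_all) or at least one of them."""
    lowered = text.lower()
    # Assumption: case-insensitive substring matching, for illustration only.
    hits = [keyword.lower() in lowered for keyword in keywords]
    return all(hits) if match_all else any(hits)


description = "timestamp-example kafka stream"
print(matches_keywords(description, ["timestamp-example", "csv"], match_all=False))
print(matches_keywords(description, ["timestamp-example", "csv"], match_all=True))
```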

In [None]:
# Create a Kafka data stream
stream = await streaming.create_kafka_stream(
    keywords=["timestamp-example"],
    match_all=True
)

# Retrieve the stream's topic name
topic = stream.data_stream_id
print(f"Stream created: {topic}")

#### **4**. Consume the Streamed Data 

Now that we have successfully created a Kafka data stream, we transition to real-time data consumption. This step involves:

1. Initializing a Kafka consumer: Passing the data stream topic to the `consume_kafka_messages` function.
2. Listening for incoming messages: Continuously receiving new filtered data in real-time.
3. Processing and updating the data dynamically: Messages are appended to a DataFrame for analysis or visualization.

In [None]:
# Start consuming the filtered Kafka stream
consumer = streaming.consume_kafka_messages(topic)

**Note**: It may take a few seconds for data to populate due to real-time processing.

In [None]:
# After a few seconds, inspect the consumed messages as a DataFrame
consumer.dataframe
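
Instead of waiting a fixed amount of time, the DataFrame can be polled until it holds enough rows. A minimal sketch, assuming `consumer.dataframe` can be read repeatedly while the consumer runs in the background; `wait_for_rows` is a hypothetical helper, written as a coroutine so that it does not block the notebook's event loop:

```python
import asyncio
import time
from typing import Callable, Optional, Sized


async def wait_for_rows(
    get_frame: Callable[[], Optional[Sized]],
    min_rows: int = 1,
    timeout: float = 30.0,
    poll_interval: float = 1.0,
):
    """Poll `get_frame` until it returns at least `min_rows` rows or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        frame = get_frame()
        if frame is not None and len(frame) >= min_rows:
            return frame
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Fewer than {min_rows} rows after {timeout} seconds.")
        # Yield to the event loop so background tasks can keep running.
        await asyncio.sleep(poll_interval)
```

In the notebook, this could be used as `df = await wait_for_rows(lambda: consumer.dataframe, min_rows=3)`. With the example topic producing one message every 5 seconds, three rows should take roughly 15 seconds to arrive.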

#### **5**. Stop Data Consumption and Clean up 

To wrap up, we will: 
1. Stop the data consumer to halt data processing.
2. Delete the created stream from the Kafka topic using the Streaming client.
3. Remove the registered dataset using the NDP endpoint client.

This ensures all resources and background tasks are properly released.

In [None]:
# Stop the Kafka consumer
consumer.stop()

# Delete the Kafka stream - this will cause error
await streaming.delete_stream(stream)

# Delete the registered dataset from the NDP endpoint system
if search_results:
    client.delete_resource_by_id(search_results[0]["id"])
    print("Cleanup completed: Stream and registered dataset deleted.")
else:
    print("No registered dataset found to delete.")
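
If one cleanup step raises, the steps after it never run and resources can be left behind. A minimal sketch of running each step independently and collecting failures; `run_cleanup` is a hypothetical helper, not part of either library, and it accepts both plain functions and coroutine-returning callables:

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_cleanup(steps: list[tuple[str, Callable[[], Any]]]) -> list[tuple[str, Exception]]:
    """Run every cleanup step in order and collect failures instead of stopping early."""
    failures = []
    for label, step in steps:
        try:
            result = step()
            # Await coroutine-based steps; plain functions have already run.
            if inspect.isawaitable(result):
                await result
        except Exception as exc:
            failures.append((label, exc))
    return failures


# Possible notebook usage, with the objects created in the cells above:
# failures = await run_cleanup([
#     ("consumer", consumer.stop),
#     ("stream", lambda: streaming.delete_stream(stream)),
#     ("dataset", lambda: client.delete_resource_by_id(search_results[0]["id"])),
# ])
```

An empty `failures` list means every step completed; otherwise each entry names the step that failed and the exception it raised.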