#  scidx_streaming URL Registration

This notebook crawls the public index at:

**https://horel.chpc.utah.edu/data/meop/data/**

It extracts file links (recursively), filters them to only include files whose **filename contains a year ≥ 2020**, and then **registers** those URLs into your **`scidx_streaming`** deployment.

> **Note:** You will need valid credentials and the correct POP/API endpoint for your `scidx_streaming` instance. The configuration cell reads them from a `.env` file (`TOKEN`, `API_URL`, `SERVER`, `KAFKA_HOST`, `KAFKA_PORT`). This notebook includes a dry-run mode so you can verify which URLs would be registered before actually registering them.

In [17]:
from typing import List
from ndp_ep import APIClient
from scidx_streaming import StreamingClient
import os, datetime
import pandas as pd
import msgpack
import blosc
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import MessageSizeTooLargeError
from dotenv import load_dotenv
import os
import time


In [18]:
# ---- Configuration ----
# Load settings from the local .env file; override=True lets values from .env
# replace variables that are already set in the kernel's environment.
load_dotenv(override=True)


# Registration settings (all read from the environment / .env file)
TOKEN = os.getenv("TOKEN")
API_URL = os.getenv("API_URL")
SERVER = os.getenv("SERVER")

# Kafka Configuration
KAFKA_HOST = os.getenv("KAFKA_HOST")
KAFKA_PORT = os.getenv("KAFKA_PORT")

# Fail fast on missing settings. Without this check an unset KAFKA_HOST or
# KAFKA_PORT silently produces the bootstrap address "None:None", which only
# surfaces much later as an obscure connection error.
_missing_settings = [
    setting_name
    for setting_name, setting_value in (
        ("API_URL", API_URL),
        ("KAFKA_HOST", KAFKA_HOST),
        ("KAFKA_PORT", KAFKA_PORT),
    )
    if not setting_value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required settings in the environment/.env file: "
        + ", ".join(_missing_settings)
    )

BOOTSTRAP = f"{KAFKA_HOST}:{KAFKA_PORT}"
CHUNK_SIZE = 25_000  # starting rows per message
SOFT_CAP_BYTES = 950_000  # stay under common 1MB broker limit

# initializing ndp_ep APIClient and the streaming client built on top of it
client = APIClient(base_url=API_URL, token=TOKEN)
streaming = StreamingClient(client)
print(f"Streaming Client initialized. User ID: {streaming.user_id}")
date_time_now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
org_name = "kafka_stream"


Streaming Client initialized. User ID: fc624925-ef09-447d-bf16-378066799275


In [3]:
# Fetch the organizations currently known to the target server.
organizations = client.list_organizations(server=SERVER)


# Create the organization only when it is missing; an existing one is left untouched.
if org_name not in organizations:
    print(f"Organization '{org_name}' does not exist. Proceeding to create it.")
    # registering organization
    org_data = dict(
        name=org_name,
        title=org_name,
        description="Sumaiya test organization for testing purposes",
    )
    try:
        client.register_organization(org_data, server=SERVER)
    except ValueError as err:
        # Registration problems are reported rather than raised.
        print(err)
    else:
        print(f"Organization '{org_name}' registered successfully.")
else:
    print(f"Organization '{org_name}' already exists.")

Organization 'kafka_stream' already exists.


In [19]:
def update_csv_resource(resource_id: str, topic: str) -> dict:
    """Patch a dataset so its resources list describes a Kafka stream.

    Args:
        resource_id: Passed as ``dataset_id`` to ``client.patch_general_dataset``.
        topic: Kafka topic name; stored in the resource and used in its
            name and description.

    Returns:
        Whatever ``client.patch_general_dataset`` returns, or
        ``{"error": <message>}`` when the call raises any exception.
    """
    kafka_resource = {
        "topic": topic,
        "status": "active",
        "format": "kafka",
        "url": BOOTSTRAP,
        "description": f"Kafka stream for topic {topic}. This is a general stream without any filters.",
        "name": f"stream_dataset {topic}",
    }
    try:
        return client.patch_general_dataset(
            dataset_id=resource_id,
            server=SERVER,
            data={"resources": [kafka_resource]},
        )
    except Exception as exc:
        # Best-effort: hand the failure back to the caller instead of raising.
        return {"error": str(exc)}

def register_in_scidx(resource_name: str, resource_title: str, topic: str) -> dict:
    """Register a Kafka topic as a dataset owned by ``org_name``.

    Args:
        resource_name: Source resource name; the dataset is named
            ``kafka_<resource_name>``.
        resource_title: Source title; the dataset title is ``Kafka <resource_title>``.
        topic: Kafka topic the new dataset points at.

    Returns:
        The response of ``client.register_kafka_topic``, or
        ``{"error": <message>}`` when the client raises ``ValueError``.
    """
    # Metadata sent to the registration endpoint; broker host/port come from
    # the notebook configuration.
    kafka_stream_metadata = dict(
        dataset_name=f"kafka_{resource_name}",
        dataset_title=f"Kafka {resource_title}",
        owner_org=org_name,
        kafka_topic=topic,
        kafka_host=KAFKA_HOST,
        kafka_port=KAFKA_PORT,
    )

    try:
        return client.register_kafka_topic(kafka_stream_metadata, server=SERVER)
    except ValueError as err:
        print("Failed to register Kafka topic:", err)
        return {"error": str(err)}


In [4]:
# Keywords used to look up the dataset that will be streamed.
# A broader search is also possible, e.g. ['bus13', 'data is not modified'].
keywords = ['bus13_2025_01-csv']

In [20]:
# Sanity check: list the names of the datasets matching `keywords` on SERVER.
search_result = client.search_datasets(keywords,server=SERVER)
for result in search_result:
    print(f"Found dataset: {result['name']}")

Found dataset: bus13_2025_01-csv


In [21]:
def compress_data(data: dict) -> bytes:
    """Serialize ``data`` with msgpack and compress the result with blosc.

    Uses the zstd codec at compression level 5 with byte shuffle.
    """
    return blosc.compress(
        msgpack.packb(data, use_bin_type=True),
        cname="zstd",
        clevel=5,
        shuffle=blosc.SHUFFLE,
    )

def _publish_dataframe(producer, topic: str, key: bytes, df: "pd.DataFrame", source_url: str) -> int:
    """Send ``df`` to ``topic`` in adaptively sized, compressed chunks.

    The chunk size starts at ``CHUNK_SIZE`` rows and only ever shrinks: it is
    reduced when a compressed chunk exceeds ``SOFT_CAP_BYTES`` or when the send
    fails with ``MessageSizeTooLargeError``, and the same offset is retried.

    Returns:
        The number of chunks that were sent.

    Raises:
        MessageSizeTooLargeError: when even a single-row chunk is rejected.
    """
    total_rows = len(df)
    chunk_size = CHUNK_SIZE
    min_rows = 1
    start = 0
    # Sequential counter. Deriving the index from ``start // chunk_size`` skips
    # numbers as soon as chunk_size shrinks mid-stream.
    chunk_index = 0

    while start < total_rows:
        end = min(start + chunk_size, total_rows)
        chunk = df.iloc[start:end]

        payload = {
            "values": chunk.to_dict(orient="list"),
            "stream_info": {
                "source_url": source_url,
                "rows": int(len(chunk)),
                "cols": list(chunk.columns),
                "chunk_index": chunk_index,
                "start_row": int(start),
                "end_row": int(end - 1),
                "encoding": "msgpack+blosc(zstd5,shuffle)",
            },
        }
        blob = compress_data(payload)

        # pre-shrink if we're near/over a conservative cap
        if len(blob) > SOFT_CAP_BYTES and len(chunk) > min_rows:
            ratio = (SOFT_CAP_BYTES * 0.85) / len(blob)
            new_size = max(min_rows, int(len(chunk) * max(0.10, min(0.80, ratio))))
            print(f"Chunk ~{len(blob)} bytes > cap; reducing rows {len(chunk)} → {new_size} and retrying.")
            chunk_size = new_size
            continue  # retry same offset

        try:
            # Block on the send result so size errors surface here.
            producer.send(topic, key=key, value=blob).get(timeout=30)
        except MessageSizeTooLargeError:
            if len(chunk) <= min_rows:
                # a single row is too large even after compression → cannot proceed
                raise
            # halve and retry same offset
            new_size = max(min_rows, len(chunk) // 2)
            print(f"Broker rejected message (too large). Reducing rows {len(chunk)} → {new_size} and retrying.")
            chunk_size = new_size
            continue

        print(f"Sent chunk {chunk_index} with {len(chunk)} rows (compressed: {len(blob)} bytes)")
        start = end  # advance
        chunk_index += 1

    return chunk_index


def stream_register(list_of_keywords: list):
    """Stream the first matching CSV dataset into Kafka and register the topic.

    Searches datasets by ``list_of_keywords``; for the first result, loads the
    CSV behind its first resource, publishes it to a Kafka topic named after
    that resource, then registers the topic via ``register_in_scidx``.

    Args:
        list_of_keywords: Keywords passed to ``client.search_datasets``.

    Returns:
        List of Kafka topic names that were streamed.
    """
    search_result = client.search_datasets(list_of_keywords, server=SERVER)
    print(f"Search result count: {len(search_result)}")
    topics = []

    # Only the first search hit is streamed.
    for dataset in search_result[:1]:
        print(dataset)
        resources = dataset.get("resources") or []
        if not resources:
            # Nothing to download for this dataset; skip instead of raising IndexError.
            print(f"Dataset '{dataset.get('name')}' has no resources; skipping.")
            continue

        resource_url = resources[0]["url"]
        resource_name = resources[0]["name"]
        resource_title = dataset["title"]
        print(f"Found dataset: {resource_name} - {resource_url}")

        df = pd.read_csv(resource_url, low_memory=False)
        print(f"Loaded CSV with {len(df)} rows and {len(df.columns)} columns")

        topic = resource_name
        topics.append(topic)

        # Kafka Producer
        producer = KafkaProducer(
            bootstrap_servers=BOOTSTRAP,
            acks="all",
            linger_ms=0,
            max_request_size=5 * 1024 * 1024,  # client cap; broker may be lower
        )
        try:
            _publish_dataframe(producer, topic, resource_url.encode("utf-8"), df, resource_url)
            producer.flush()
        finally:
            # Always release the producer's connections, even when publishing fails.
            producer.close()

        response = register_in_scidx(resource_name, resource_title, topic)

        # if you want to update the resources field of the ckan registry
        # response = update_csv_resource(dataset["id"], topic)

        print(response)

    return topics

# Example call: stream the datasets matching `keywords` and keep the returned topic names.
topics = stream_register(keywords)


Search result count: 1
{'id': 'f7d4999e-8998-42c6-afa3-1428a369ef7d', 'name': 'bus13_2025_01-csv', 'title': 'Sensor Data – BUS13 2025 01', 'owner_org': 'ebus_data', 'notes': "This dataset is available at https://horel.chpc.utah.edu/data/meop/data/BUS13_2025_01.csv. Vehicle: Bus 13 Data period: 2025-01 File type: CSV File marked 'meop' (Mobile Environment Observation Platform) where sensors are attached to UTA. Data processing level: data is not modified.", 'resources': [{'id': '6531df3a-656f-41af-9dc9-8a062fa8490e', 'url': 'https://horel.chpc.utah.edu/data/meop/data/BUS13_2025_01.csv', 'name': 'bus13_2025_01-csv', 'description': 'Resource pointing to https://horel.chpc.utah.edu/data/meop/data/BUS13_2025_01.csv', 'format': 'url'}], 'extras': {'file_type': 'CSV'}}
Found dataset: bus13_2025_01-csv - https://horel.chpc.utah.edu/data/meop/data/BUS13_2025_01.csv
Loaded CSV with 288066 rows and 27 columns
Sent chunk 0 with 25000 rows (compressed: 409715 bytes)
Sent chunk 1 with 25000 rows (co

In [None]:
def try_decompress(blob: bytes):
    """Decode a blosc-compressed msgpack payload.

    Returns the unpacked object, or ``None`` when decompression or unpacking
    raises any exception.
    """
    try:
        return msgpack.unpackb(blosc.decompress(blob), raw=False)
    except Exception:
        # Not a compressed binary message
        return None

def _consume_topic(topic: str, silence_timeout: int) -> None:
    """Read one topic from the earliest offset and print a summary per message.

    Stops when no message has arrived for ``silence_timeout`` seconds or when a
    chunk index is seen a second time. The consumer is closed on every exit
    path, including exceptions and notebook interrupts.
    """
    print(f"Listening to Kafka topic {topic}")
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=f"{BOOTSTRAP}",
        auto_offset_reset='earliest',
        group_id=None,
        value_deserializer=lambda x: x  # Raw bytes; decode manually
    )

    seen_chunks = set()
    last_msg_time = time.time()

    try:
        while True:
            msg_pack = consumer.poll(timeout_ms=1000)  # Poll every second
            if not msg_pack:
                if time.time() - last_msg_time > silence_timeout:
                    print(f"No new messages for {silence_timeout} seconds. Stopping consumption.")
                    return
                continue

            for messages in msg_pack.values():
                for message in messages:
                    raw = message.value
                    last_msg_time = time.time()

                    data = try_decompress(raw)
                    # Only non-empty dict payloads are treated as binary chunks;
                    # anything else falls through to the text branch.
                    if isinstance(data, dict) and data:
                        info = data.get("stream_info", {})
                        print(info)
                        print(f"Source: {info.get('source_url', 'N/A')}")
                        print(f"Binary chunk received: {info.get('rows', '?')} rows")
                        print(f"Columns: {info.get('cols', '?')}")
                        chunk_index = info.get('chunk_index', 'N/A')
                        print(f"Chunk index: {chunk_index}")
                        if chunk_index in seen_chunks:
                            print("Duplicate chunk index detected; stopping consumption.")
                            return
                        seen_chunks.add(chunk_index)

                        # Preview the first three rows. Each column is sliced
                        # before zipping so the full chunk is never transposed.
                        columns = data.get("values") or {}
                        for row in zip(*(column[:3] for column in columns.values())):
                            print("→", row)
                    else:
                        try:
                            text = raw.decode("utf-8")
                            print(f"Text message: {text}")
                        except UnicodeDecodeError as e:
                            print(f"Unrecognized message format: {e}")
    finally:
        consumer.close()


def stream_consumption(topics: List[str], silence_timeout: int = 30):
    """Consume each topic in ``topics`` in turn and print what is received.

    Args:
        topics: Kafka topic names to read, one after another.
        silence_timeout: Seconds without messages after which a topic is
            considered drained.

    A duplicate chunk index ends consumption of the current topic only; the
    remaining topics are still processed, as they are after a silence timeout.
    """
    for topic in topics:
        _consume_topic(topic, silence_timeout)

# Read back the 'bus13_2025_01-csv' topic (name hard-coded here rather than taken from `topics`).
stream_consumption(["bus13_2025_01-csv"])


Listening to Kafka topic bus13_2025_01-csv
{'source_url': 'https://horel.chpc.utah.edu/data/meop/data/BUS13_2025_01.csv', 'rows': 25000, 'cols': ['Timestamp', 'Latitude', 'Longitude', 'Elevation', 'GPS_Speed', 'GPS_Direction', 'GPS_RMC_Valid', 'Battery_Voltage', 'Bus_Box_Temperature', 'Bus_Top_Temperature', 'Bus_Top_Relative_Humidity', 'ES405_PM1_Concentration', 'ES405_PM2.5_Concentration', 'ES405_PM4_Concentration', 'ES405_PM10_Concentration', 'ES405_Air_Flow_Rate', 'ES405_Internal_Air_Temperature', 'ES405_Internal_Relative_Humidity', 'ES405_Internal_Air_Pressure', 'ES405_Error_Code', '2B_Ozone_Concentration', '2B_Air_Flow_Rate', '2B_Internal_Air_Temperature', '2B_Internal_Air_Pressure', 'PM2.5_Data_Flagged', 'Ozone_Data_Flagged', 'GPS_Data_Flagged'], 'chunk_index': 0, 'start_row': 0, 'end_row': 24999, 'encoding': 'msgpack+blosc(zstd5,shuffle)'}
Source: https://horel.chpc.utah.edu/data/meop/data/BUS13_2025_01.csv
Binary chunk received: 25000 rows
Columns: ['Timestamp', 'Latitude', 'Lo

In [22]:
# Show the topic names returned by stream_register.
print(topics)

['bus13_2025_01-csv']


In [None]:
# Create a stream through the streaming client for the 'bus13_2025_01-csv' keyword.
# NOTE(review): top-level `await` needs a running event loop (as provided by the notebook kernel).
stream = await streaming.create_kafka_stream(
    keywords=['bus13_2025_01-csv'],
    match_all=True,
    filter_semantics=[]
)

# Keep the stream's data_stream_id as the topic name.
topic = stream.data_stream_id
print(f"Stream created: {topic}")

In [25]:
# Start consuming the 'bus13_2025_01-csv' topic through the streaming client.
# NOTE(review): the topic name is hard-coded rather than taken from `topic` — confirm which one is intended.
consumer = streaming.consume_kafka_messages("bus13_2025_01-csv")

In [26]:
# Take the consumer's collected data and expand its first row into a frame.
# NOTE(review): presumably each row holds one received message — confirm.
first_message = pd.DataFrame(consumer.dataframe).iloc[0]
df = pd.DataFrame(first_message.to_dict()).reset_index(drop=True)

print(df)

                 Timestamp   Latitude    Longitude Elevation GPS_Speed  \
0                      UTC       ddeg         ddeg         m       m/s   
1      2025-01-01T00:00:00  40.649887  -111.865868   1297.30      0.00   
2      2025-01-01T00:00:05  40.649887  -111.865868   1297.30      0.00   
3      2025-01-01T00:00:10  40.649696  -111.865883   1296.70      6.69   
4      2025-01-01T00:00:15  40.649242  -111.865891   1296.00     11.06   
...                    ...        ...          ...       ...       ...   
24995  2025-01-03T16:59:45  40.692993  -111.967369   1303.40      0.00   
24996  2025-01-03T16:59:50  40.692982  -111.967369   1301.50      0.00   
24997  2025-01-03T16:59:55  40.692982  -111.967369   1301.50      0.00   
24998  2025-01-03T17:00:00  40.692982  -111.967369   1301.50      0.00   
24999  2025-01-03T17:00:05  40.692982  -111.967369   1301.50      0.00   

      GPS_Direction GPS_RMC_Valid Battery_Voltage Bus_Box_Temperature  \
0               deg        binary     

In [27]:
# Define the payload data for the Kafka topic registration.
# The name and title follow the pattern register_in_scidx uses
# ("kafka_<resource name>" / "Kafka <dataset title>").
kafka_stream_metadata = {
  "dataset_name": "kafka_bus13_2025_01-csv",
  "dataset_title": "Kafka Sensor Data – BUS13 2025 01",
  "owner_org": org_name,
  "kafka_topic": "bus13_2025_01-csv",
  "kafka_host": KAFKA_HOST,
  "kafka_port": KAFKA_PORT
}

# Register the topic on the same server every other call in this notebook targets.
# A ValueError (e.g. the dataset already exists) is reported, not raised.
try:
    response = client.register_kafka_topic(kafka_stream_metadata, server=SERVER)
    print("Kafka topic registered successfully with ID:", response["id"])
except ValueError as e:
    print("Failed to register Kafka topic.")
    print(f"{e}.")

Failed to register Kafka topic.
Error creating Kafka dataset: {'error': 'Duplicate Dataset', 'detail': 'A dataset with the given name or URL already exists.'}.


In [None]:
# Create a Kafka stream data
stream = await streaming.create_kafka_stream(
    keywords=["timestamp-example"],
    match_all=True
)

# Retrieve the stream's topic name
topic = stream.data_stream_id
print(f"Stream created: {topic}")

In [None]:
from confluent_kafka.admin import AdminClient

# DESTRUCTIVE cleanup: deletes every user topic on the configured broker.
# The broker address comes from the notebook configuration, not a literal IP.
admin = AdminClient({'bootstrap.servers': BOOTSTRAP})

metadata = admin.list_topics(timeout=10)
# Leave Kafka-internal topics (names starting with "__", e.g. __consumer_offsets) alone.
all_topics = [name for name in metadata.topics if not name.startswith("__")]
if all_topics:
    print(f"Found {len(all_topics)} topics: {all_topics}")
    fs = admin.delete_topics(all_topics, operation_timeout=30)

    # delete_topics only schedules the deletions; wait on each future so that
    # failures are reported instead of passing unnoticed.
    for topic_name, future in fs.items():
        try:
            future.result()  # raises exception if failed
            print(f"Topic '{topic_name}' deleted successfully.")
        except Exception as e:
            print(f"Failed to delete topic '{topic_name}': {e}")


In [16]:
# Cleanup: delete every dataset found by searching for the organization name
# whose owner_org is exactly `org_name`.
result = client.search_datasets([org_name], server=SERVER)
for dataset in result:
    if dataset["owner_org"] != org_name:
        # The search can return hits owned by other organizations; leave those alone.
        continue
    client.delete_resource_by_id(dataset["id"], server=SERVER)