# AIOS Streaming Facility Tutorial

This notebook demonstrates how to use the AIOS (Artificial Intelligence Operating System) streaming functionality. Streaming delivers an LLM's output to the client incrementally, as it is generated. This enables interactive chat experiences and real-time display of responses.

## Overview

The AIOS streaming facility provides:
- Real-time WebSocket-based communication with LLMs
- Session-based streaming for maintaining conversation context
- Support for both programmatic access and UI-based interaction
- Asynchronous message handling for responsive user experiences

## Prerequisites

- AIOS cluster running and accessible
- Block(s) deployed and running (e.g., gemma3-27b-block2)
- Network access to AIOS management API and streaming endpoints
- Required Python packages for WebSocket communication
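Before Step 1, it can save time to confirm that the endpoints are reachable from the notebook. The sketch below is not part of AIOS. It only opens a plain TCP connection to the host and port in each URL. `endpoint_of` and `is_reachable` are hypothetical helper names.

```python
import socket
from urllib.parse import urlsplit


def endpoint_of(url):
    """Return (host, port) for an http(s):// or ws(s):// URL."""
    parts = urlsplit(url)
    default_ports = {"http": 80, "https": 443, "ws": 80, "wss": 443}
    port = parts.port or default_ports.get(parts.scheme)
    if parts.hostname is None or port is None:
        raise ValueError(f"Cannot determine host/port from {url!r}")
    return parts.hostname, port  # note: urlsplit lower-cases the hostname


def is_reachable(url, timeout=3.0):
    """True if a TCP connection to the URL's host:port succeeds within `timeout` seconds."""
    host, port = endpoint_of(url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, DNS failures and timeouts
        return False


# Example (uses the management API address from the Configuration cell):
# print(is_reachable("http://MANAGEMENTMASTER:30501/api/executeMgmtCommand"))
```

A successful TCP connection only shows that something is listening on that port. It does not confirm that the AIOS API behind it is healthy.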

In [1]:
# Install required packages
!pip install requests websockets nest_asyncio streamlit pyngrok > /dev/null

import requests
import json
import time
import asyncio
import websockets
import uuid
import nest_asyncio

# Allow nested event loops (required for Jupyter notebooks)
nest_asyncio.apply()

print("✅ Required packages installed and configured")

✅ Required packages installed and configured


## Configuration

Set up the basic configuration parameters for your AIOS environment:

In [10]:
# AIOS Configuration
AIOS_MGMT_API = "http://MANAGEMENTMASTER:30501/api/executeMgmtCommand"
INFERENCE_API = "http://CLUSTER1MASTER:31504/v1/infer"

# Block Configuration - set BLOCK_ID to the block deployed on your cluster
# BLOCK_ID = "magistral-small-2506-llama-cpp-block"  # Alternative example
BLOCK_ID = "llama4-scout-17b-block"  # Block used in this run

# Session Configuration
SESSION_ID = f"session-{uuid.uuid4()}"  # Generate unique session ID
print(f"Using Block ID: {BLOCK_ID}")
print(f"Generated Session ID: {SESSION_ID}")

Using Block ID: llama4-scout-17b-block
Generated Session ID: session-730377a0-a94c-48f6-92a2-51b0f9418e51


# Step 1: Enable Streaming

The first step is to enable streaming on the target block. This configures the block to accept WebSocket connections for real-time communication.

## Parameters Explained:
- **blockId**: The identifier of the deployed block you want to enable streaming for
- **service**: Always "executor" for streaming operations
- **mgmtCommand**: "enable_streaming" to activate streaming capability
- **mgmtData**: Empty object for this command
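These four parameters sit inside a fixed request envelope. The `header.templateUri` field is `"Parser/V1"`, and the values go under `body.spec.values`. The hypothetical helper below builds that envelope so that the same code can serve both management commands used in this notebook. It mirrors the payload in the next cell and is not an official AIOS client function.

```python
import json


def build_mgmt_payload(block_id, mgmt_command, mgmt_data=None):
    """Build the management-API request envelope used in this tutorial (hypothetical helper)."""
    return {
        "header": {"templateUri": "Parser/V1", "parameters": {}},
        "body": {
            "spec": {
                "values": {
                    "blockId": block_id,
                    "service": "executor",         # always "executor" for streaming
                    "mgmtCommand": mgmt_command,    # e.g. "enable_streaming"
                    "mgmtData": mgmt_data or {},    # empty object for enable_streaming
                }
            }
        },
    }


payload = build_mgmt_payload("llama4-scout-17b-block", "enable_streaming")
print(json.dumps(payload, indent=2))
```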

In [11]:
def enable_streaming(block_id):
    """
    Enable streaming functionality for a specific block.
    
    Args:
        block_id (str): The ID of the block to enable streaming for
        
    Returns:
        dict: Response from the API call
    """
    payload = {
        "header": {
            "templateUri": "Parser/V1",
            "parameters": {}
        },
        "body": {
            "spec": {
                "values": {
                    "blockId": block_id,
                    "service": "executor",
                    "mgmtCommand": "enable_streaming",
                    "mgmtData": {}
                }
            }
        }
    }
    
    try:
        print(f"🔄 Enabling streaming for block: {block_id}")
        response = requests.post(
            AIOS_MGMT_API, 
            headers={"Content-Type": "application/json"}, 
            json=payload,
            timeout=30
        )
        
        response.raise_for_status()
        result = response.json()
        print(f"✅ Streaming enabled successfully!")
        print(f"📄 Response: {json.dumps(result, indent=2)}")
        return result
        
    except requests.exceptions.RequestException as e:
        print(f"❌ Error enabling streaming: {e}")
        if hasattr(e.response, 'text'):
            print(f"📄 Error details: {e.response.text}")
        return None

# Enable streaming for the configured block
enable_result = enable_streaming(BLOCK_ID)

🔄 Enabling streaming for block: llama4-scout-17b-block
✅ Streaming enabled successfully!
📄 Response: {
  "result": {
    "message": "streaming enabled",
    "success": true
  },
  "success": true,
  "task_id": ""
}
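`enable_streaming` reports success whenever the HTTP call itself succeeds. However, the response body carries its own flags: a top-level `success` and a nested `result.success`. The following sketch shows a stricter check. It assumes the response shape printed above, and `streaming_enabled` is a hypothetical helper.

```python
def streaming_enabled(response):
    """Return True only if both success flags in the management response are true.

    Assumes the shape shown above:
    {"result": {"message": ..., "success": bool}, "success": bool, "task_id": ...}
    """
    if not isinstance(response, dict):
        return False  # e.g. enable_streaming() returned None after a request error
    outer_ok = response.get("success") is True
    result = response.get("result")
    inner_ok = isinstance(result, dict) and result.get("success") is True
    return outer_ok and inner_ok


sample = {"result": {"message": "streaming enabled", "success": True},
          "success": True, "task_id": ""}
print(streaming_enabled(sample))
```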


# Step 2: Get Streaming URL

After enabling streaming, we need to obtain the WebSocket URL for the specific session. This URL will be used to establish the WebSocket connection for real-time communication.

## Parameters Explained:
- **blockId**: The same block ID used in Step 1
- **service**: Always "executor" for streaming operations  
- **mgmtCommand**: "get_streaming_url" to retrieve the WebSocket endpoint
- **mgmtData**: Contains the session_id for this streaming session
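The `get_streaming_url` command returns the endpoint under `result.url`. That value is passed directly to `websockets.connect` in Step 3, so it can be worth rejecting anything that is not a `ws://` or `wss://` URL. The following minimal sketch assumes the response shape shown in the output of the next cell. `parse_streaming_url` is a hypothetical helper.

```python
from urllib.parse import urlsplit


def parse_streaming_url(response):
    """Return the WebSocket URL from a get_streaming_url response, or None.

    Assumes the shape seen in this notebook:
    {"result": {"success": True, "url": "ws://host:port"}, "success": True, ...}
    """
    result = response.get("result") if isinstance(response, dict) else None
    url = result.get("url") if isinstance(result, dict) else None
    if not isinstance(url, str):
        return None
    parts = urlsplit(url)
    # Accept only WebSocket schemes that name a host
    if parts.scheme not in ("ws", "wss") or not parts.hostname:
        return None
    return url
```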

In [12]:
def get_streaming_url(block_id, session_id):
    """
    Get the WebSocket URL for streaming communication.
    
    Args:
        block_id (str): The ID of the block
        session_id (str): Unique session identifier
        
    Returns:
        str: WebSocket URL if successful, None otherwise
    """
    payload = {
        "header": {
            "templateUri": "Parser/V1",
            "parameters": {}
        },
        "body": {
            "spec": {
                "values": {
                    "blockId": block_id,
                    "service": "executor", 
                    "mgmtCommand": "get_streaming_url",
                    "mgmtData": {
                        "session_id": session_id
                    }
                }
            }
        }
    }
    
    try:
        print(f"🔄 Getting streaming URL for session: {session_id}")
        response = requests.post(
            AIOS_MGMT_API,
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=30
        )
        
        response.raise_for_status()
        result = response.json()
        print(result)
        # Extract WebSocket URL from response
        websocket_url = None
        if 'result' in result and 'url' in result['result']:
            websocket_url = result['result']['url']
        
        if websocket_url:
            print(f"✅ WebSocket URL obtained: {websocket_url}")
            return websocket_url
        else:
            print(f"⚠️  WebSocket URL not found in response")
            print(f"📄 Full response: {json.dumps(result, indent=2)}")
            return None
            
    except requests.exceptions.RequestException as e:
        print(f"❌ Error getting streaming URL: {e}")
        if hasattr(e.response, 'text'):
            print(f"📄 Error details: {e.response.text}")
        return None

# Get the streaming URL
websocket_url = get_streaming_url(BLOCK_ID, SESSION_ID)

🔄 Getting streaming URL for session: session-730377a0-a94c-48f6-92a2-51b0f9418e51
{'result': {'success': True, 'url': 'ws://CLUSTER1MASTER:30233'}, 'success': True, 'task_id': ''}
✅ WebSocket URL obtained: ws://CLUSTER1MASTER:30233


# Step 3: WebSocket Connection Test (Async)

This step establishes a WebSocket connection to test the streaming functionality. This is an asynchronous operation that connects to the WebSocket endpoint and listens for messages.

## WebSocket Protocol:
- **Connection**: Establish WebSocket connection using the URL from Step 2
- **Session registration**: Send a connection message containing the session_id, which associates this connection with the streaming session
- **Listening**: Continuously listen for streaming responses
- **Error Handling**: Handle connection errors and timeouts gracefully

## Note
Open AIOS_Streaming_Inference.ipynb, which is in the same directory as this notebook, in another tab. That notebook sends the inference request whose output streams here.
Run the cell below in this notebook, then copy the session_id and use it in AIOS_Streaming_Inference.ipynb.

In [None]:
async def test_websocket_connection(websocket_url, session_id, duration=60):
    """
    Test WebSocket connection and listen for messages.
    
    Args:
        websocket_url (str): WebSocket URL from Step 2
        session_id (str): Session identifier
        duration (int): How long to listen for messages (seconds)
    """
    if not websocket_url:
        print("❌ No WebSocket URL available. Please complete Step 2 first.")
        return
    
    try:
        print(f"🔌 Connecting to WebSocket: {websocket_url}")
        
        async with websockets.connect(
            websocket_url,
            ping_interval=60,
            ping_timeout=120,
            close_timeout=120
        ) as websocket:
            print("✅ Connected to WebSocket server!")
            
            # Send connection message
            connection_message = {
                "session_id": session_id,
                "connect": True
            }
            
            await websocket.send(json.dumps(connection_message))
            print(f"📤 Sent connection message: {connection_message}")
            
            # Listen for messages for specified duration
            print(f"👂 Listening for messages for {duration} seconds... (Run the inference notebook now)")
            
            start_time = time.time()
            message_count = 0
            
            try:
                while time.time() - start_time < duration:
                    try:
                        # Wait for message with timeout
                        message = await asyncio.wait_for(websocket.recv(), timeout=2.0)
                        message_count += 1
                        print(f"📥 Message {message_count}: {message}")
                        
                    except asyncio.TimeoutError:
                        # No message received within timeout, continue listening
                        continue
                        
            except Exception as e:
                print(f"⚠️  Error during message listening: {e}")
            
            print(f"✅ WebSocket test completed. Received {message_count} messages.")
            
    except Exception as e:
        print(f"❌ WebSocket connection error: {e}")

# Test the WebSocket connection
if websocket_url:
    await test_websocket_connection(websocket_url, SESSION_ID, duration=60)
else:
    print("⚠️  Skipping WebSocket test - no URL available")

🔌 Connecting to WebSocket: ws://CLUSTER1MASTER:30233
✅ Connected to WebSocket server!
📤 Sent connection message: {'session_id': 'session-730377a0-a94c-48f6-92a2-51b0f9418e51', 'connect': True}
👂 Listening for messages for 60 seconds... (Run the inference notebook now)
📥 Message 1: {"delta": "**"}
📥 Message 2: {"delta": "Machine"}
📥 Message 3: {"delta": " Learning"}
📥 Message 4: {"delta": ":"}
📥 Message 5: {"delta": " A"}
📥 Message 6: {"delta": " Simple"}
📥 Message 7: {"delta": " Explanation"}
📥 Message 8: {"delta": "**\n"}
📥 Message 10: {"delta": "===="}
📥 Message 11: {"delta": "=\n\n"}
📥 Message 12: {"delta": "Machine"}
📥 Message 13: {"delta": " learning"}
📥 Message 14: {"delta": " is"}
📥 Message 15: {"delta": " a"}
📥 Message 16: {"delta": " type"}
📥 Message 17: {"delta": " of"}
📥 Message 18: {"delta": " artificial"}
📥 Message 19: {"delta": " intelligence"}
📥 Message 20: {"delta": " that"}
📥 Message 21: {"delta": " enables"}
📥 Message 22: {"delta": " computers"}
📥 Message 23: {"delta"

## Note
Use AIOS_Streaming_Inference.ipynb (open in another tab, in the same directory as this notebook) to send the inference request.
Run its cells after starting the cell above, while this notebook is still listening.
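Each message printed by the listener is a JSON object whose `delta` field holds the next fragment of the model's reply. The full text is therefore the concatenation of the fragments in arrival order. The following sketch assumes that every message follows this `{"delta": ...}` shape. `assemble_deltas` is a hypothetical helper.

```python
import json


def assemble_deltas(raw_messages):
    """Concatenate the "delta" fields of streamed JSON messages into one string.

    Messages that are not valid JSON, or that lack a string "delta", are skipped.
    """
    parts = []
    for raw in raw_messages:
        try:
            obj = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(obj, dict) and isinstance(obj.get("delta"), str):
            parts.append(obj["delta"])
    return "".join(parts)


msgs = ['{"delta": "**"}', '{"delta": "Machine"}', '{"delta": " Learning"}', '{"delta": "**\\n"}']
print(assemble_deltas(msgs))
```

To render the reply live instead of collecting it, the listener could print each delta with `print(delta, end="", flush=True)` as it arrives.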

# Inferencing with UI

This section provides the code to launch a Streamlit application for a user-friendly, interactive chat experience with the streaming model.

In [9]:
!pip install streamlit pyngrok nest_asyncio websockets > /dev/null

import sys
import os

# Add the tutorial's helper directories (the parent of 'utils' and utils/streamlit_app) to the import path
sys.path.append(os.path.abspath('../..'))
sys.path.append(os.path.abspath('../utils/streamlit_app'))

from utils import run_streamlit_direct

# Block and gRPC server used by the Streamlit app (this reassigns the BLOCK_ID set in Configuration)
BLOCK_ID = "magistral-small-2506-llama-cpp-block"
GRPC_SERVER_ADDRESS = "CLUSTER1MASTER:31500"
streamlit_url = run_streamlit_direct(BLOCK_ID, GRPC_SERVER_ADDRESS)
print(f"Streamlit App URL: {streamlit_url}")

🚀 Starting Streamlit with command:
streamlit run /home/srikanth_g_cognitif_ai/files_from_local/documentation/video_tutorial_series/utils/streamlit_app/app_single_model_orig.py -- --block_id magistral-small-2506-llama-cpp-block --grpc_server_address CLUSTER1MASTER:31500
✅ Streamlit started successfully!
🌐 Public URL: http://CLUSTER2NODE1:8501
🔗 Local URL: http://localhost:8501
📊 Block ID: magistral-small-2506-llama-cpp-block
🔌 gRPC Server: CLUSTER1MASTER:31500
Streamlit App URL: http://CLUSTER2NODE1:8501
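`run_streamlit_direct` comes from the tutorial's `utils` package, and its internals are not shown here. Based only on the command it prints, a rough equivalent builds a `streamlit run` invocation in which everything after the bare `--` is forwarded to the app script. Both helpers below are hypothetical. This includes `parse_app_args`, which only guesses at how an app might read its arguments. In the real run, the app path is the full path printed above.

```python
import argparse
import shlex


def build_streamlit_command(app_path, block_id, grpc_server_address):
    """Build a `streamlit run` command shaped like the one printed above.

    Everything after the bare "--" is passed to the app script rather than
    being interpreted by Streamlit itself.
    """
    return [
        "streamlit", "run", app_path,
        "--",
        "--block_id", block_id,
        "--grpc_server_address", grpc_server_address,
    ]


def parse_app_args(argv=None):
    """How an app script might read the forwarded arguments (hypothetical)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--block_id", required=True)
    parser.add_argument("--grpc_server_address", required=True)
    return parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]


cmd = build_streamlit_command(
    "app_single_model_orig.py",
    "magistral-small-2506-llama-cpp-block",
    "CLUSTER1MASTER:31500",
)
print(shlex.join(cmd))
# To launch it in the background you could use subprocess.Popen(cmd).
```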


# How Streaming is Enabled in a Block

This section provides a brief overview of how streaming is enabled within the AIOS block architecture, using the `main.py` from the `sample_codes_for_block_integration` as a reference. Understanding this will help you integrate streaming into your own custom blocks.

### Key Components in `aios_instance/main.py`

The core of the streaming functionality is managed by the `Block` class in `aios_instance/main.py`. Here are the key components involved:

1.  **`WebsocketStreamingManager`**: This class is responsible for managing the WebSocket server and handling incoming connections. It is initialized in the `Block` class's `__init__` method:

    ```python
    # In aios_instance/main.py Block.__init__
    self.ws_server = WebsocketStreamingManager(self.listen_for_jobs_now)
    ```

2.  **`context.write_ws`**: The `Context` object, which is passed to your block's implementation, is equipped with a `write_ws` method. In `Block.__init__`, this attribute is bound to the WebSocket manager's `write_data` method, so calling `context.write_ws(...)` from your block sends data to the connected client.

    ```python
    # In aios_instance/main.py Block.__init__
    self.context.write_ws = self.ws_server.write_data
    ```

3.  **`is_ws` Flag**: When a job is received, the `listen_for_jobs_now` method checks if it is a WebSocket request and passes an `is_ws=True` flag to your block's `on_data` method. This allows you to differentiate between regular and streaming requests.

    ```python
    # In aios_instance/main.py Block.listen_for_jobs_now
    ret, on_data_result = self.block_module.on_data(entry, is_ws=is_ws)
    ```
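You can exercise the three pieces above end to end without a network. In the sketch below, `WebsocketStreamingManager` is replaced by a hypothetical stub that records data instead of sending it. The sketch reproduces the `context.write_ws = self.ws_server.write_data` binding and simplifies `listen_for_jobs_now` to a function that handles one job passed in directly. Only the binding and the `on_data(entry, is_ws=is_ws)` call come from `main.py`. Everything else is illustrative.

```python
class WebsocketStreamingManagerStub:
    """Stand-in for WebsocketStreamingManager: collects data instead of sending it."""

    def __init__(self, job_callback):
        self.job_callback = job_callback  # main.py passes Block.listen_for_jobs_now
        self.outbox = []

    def write_data(self, data):
        self.outbox.append(data)  # the real manager would send this to the WebSocket client


class Context:
    """Minimal context object; write_ws is attached by the block."""
    write_ws = None


class UppercaseModule:
    """Toy block module: streams one chunk per word when is_ws is True."""

    def __init__(self, context):
        self.context = context

    def on_data(self, entry, is_ws=False):
        if is_ws:
            for word in entry.split():
                self.context.write_ws(word.upper())
            return True, None
        return True, entry.upper()


class BlockSketch:
    def __init__(self):
        self.ws_server = WebsocketStreamingManagerStub(self.listen_for_jobs_now)
        self.context = Context()
        self.context.write_ws = self.ws_server.write_data  # same binding as in main.py
        self.block_module = UppercaseModule(self.context)

    def listen_for_jobs_now(self, entry, is_ws=False):
        # Simplified: one job is passed in directly instead of being received from a queue
        ret, on_data_result = self.block_module.on_data(entry, is_ws=is_ws)
        return ret, on_data_result


block = BlockSketch()
print(block.listen_for_jobs_now("hello streaming world", is_ws=True))
print(block.ws_server.outbox)
```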

### Your Block's Responsibility

Your block's `on_data` method should check for the `is_ws` flag. If it is `True`, you should:

1.  Call your model's streaming inference method.
2.  Pass the `context` object to your inference method so it can call `context.write_ws`.
3.  Ensure that your inference code iterates through the model's response chunks and calls `context.write_ws` for each chunk.
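The following sketch shows a block module that follows these three steps. Several details are assumptions for illustration: the model interface (`stream()` yields text chunks and `generate()` returns the whole reply), the `entry["prompt"]` field, and the `(success, result)` return value. Adapt them to your model wrapper and to what `main.py` expects. `EchoModel` and `RecordingContext` are hypothetical stand-ins used only to run the example.

```python
class StreamingBlock:
    """Hypothetical block module illustrating the is_ws branch of on_data."""

    def __init__(self, context, model):
        self.context = context  # exposes write_ws, bound by the AIOS Block
        self.model = model      # assumed: stream(prompt) yields chunks, generate(prompt) returns text

    def on_data(self, entry, is_ws=False):
        prompt = entry["prompt"]  # assumed input field
        if not is_ws:
            return True, self.model.generate(prompt)  # regular, non-streaming request
        chunks = []
        for chunk in self.model.stream(prompt):
            self.context.write_ws(chunk)  # forward each chunk as soon as it is produced
            chunks.append(chunk)
        # Assumed (success, result) pair, matching `ret, on_data_result = ...` in main.py
        return True, "".join(chunks)


class EchoModel:
    def stream(self, prompt):
        yield from ["Machine", " learning"]

    def generate(self, prompt):
        return "Machine learning"


class RecordingContext:
    def __init__(self):
        self.sent = []

    def write_ws(self, data):
        self.sent.append(data)


ctx = RecordingContext()
blk = StreamingBlock(ctx, EchoModel())
print(blk.on_data({"prompt": "What is ML?"}, is_ws=True), ctx.sent)
```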

With these components in place, adding real-time streaming to a custom AIOS block mostly comes down to handling the `is_ws` branch in your block's `on_data` method.