From 45c10c14f3a7f0b615da75f9e7a74a1c7cde783b Mon Sep 17 00:00:00 2001 From: Pavel Tisnovsky Date: Mon, 1 Sep 2025 10:23:08 +0200 Subject: [PATCH 1/2] Refactoring: table based branching --- src/app/endpoints/streaming_query.py | 30 +++++++++++++++++----------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 5613dbfd..9f6af297 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -154,18 +154,24 @@ def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> Iterato event_type = chunk.event.payload.event_type step_type = getattr(chunk.event.payload, "step_type", None) - if event_type in {"turn_start", "turn_awaiting_input"}: - yield from _handle_turn_start_event(chunk_id) - elif event_type == "turn_complete": - yield from _handle_turn_complete_event(chunk, chunk_id) - elif step_type == "shield_call": - yield from _handle_shield_event(chunk, chunk_id) - elif step_type == "inference": - yield from _handle_inference_event(chunk, chunk_id) - elif step_type == "tool_execution": - yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map) - else: - yield from _handle_heartbeat_event(chunk_id) + match (event_type, step_type): + case (("turn_start" | "turn_awaiting_input"), _): + yield from _handle_turn_start_event(chunk_id) + case ("turn_complete", _): + yield from _handle_turn_complete_event(chunk, chunk_id) + case (_, "shield_call"): + yield from _handle_shield_event(chunk, chunk_id) + case (_, "inference"): + yield from _handle_inference_event(chunk, chunk_id) + case (_, "tool_execution"): + yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map) + case _: + logger.debug( + "Unhandled event combo: event_type=%s, step_type=%s", + event_type, + step_type, + ) + yield from _handle_heartbeat_event(chunk_id) # ----------------------------------- From facf446e2d31db1d3f4357640557f92ccb9a4c5c Mon Sep 17 00:00:00 2001 From: Pavel Tisnovsky Date: Mon, 1 Sep 2025 10:39:02 +0200 Subject: [PATCH 2/2] Updated docstring --- src/app/endpoints/streaming_query.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 9f6af297..4aafc6f9 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -134,12 +134,14 @@ def stream_end_event(metadata_map: dict) -> str: def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> Iterator[str]: """Build a streaming event from a chunk response. - This function processes chunks from the Llama Stack streaming response and formats - them into Server-Sent Events (SSE) format for the client. It handles two main - event types: - - 1. step_progress: Contains text deltas from the model inference process - 2. step_complete: Contains information about completed tool execution steps + This function processes chunks from the Llama Stack streaming response and + formats them into Server-Sent Events (SSE) format for the client. It + dispatches on (event_type, step_type): + + 1. turn_start, turn_awaiting_input -> start token + 2. turn_complete -> final output message + 3. step_* with step_type in {"shield_call", "inference", "tool_execution"} -> delegated handlers + 4. anything else -> heartbeat Args: chunk: The streaming chunk from Llama Stack containing event data