Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions src/app/endpoints/streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,14 @@ def stream_end_event(metadata_map: dict) -> str:
def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> Iterator[str]:
"""Build a streaming event from a chunk response.

This function processes chunks from the Llama Stack streaming response and formats
them into Server-Sent Events (SSE) format for the client. It handles two main
event types:
This function processes chunks from the Llama Stack streaming response and
formats them into Server-Sent Events (SSE) format for the client. It
dispatches on (event_type, step_type):

1. step_progress: Contains text deltas from the model inference process
2. step_complete: Contains information about completed tool execution steps
1. turn_start, turn_awaiting_input -> start token
2. turn_complete -> final output message
3. any other event whose step_type is "shield_call", "inference" or
"tool_execution" -> the matching step handler
4. anything else -> heartbeat (the unhandled combination is logged at debug level)

Args:
chunk: The streaming chunk from Llama Stack containing event data
Expand All @@ -154,18 +156,24 @@ def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> Iterato
event_type = chunk.event.payload.event_type
step_type = getattr(chunk.event.payload, "step_type", None)

if event_type in {"turn_start", "turn_awaiting_input"}:
yield from _handle_turn_start_event(chunk_id)
elif event_type == "turn_complete":
yield from _handle_turn_complete_event(chunk, chunk_id)
elif step_type == "shield_call":
yield from _handle_shield_event(chunk, chunk_id)
elif step_type == "inference":
yield from _handle_inference_event(chunk, chunk_id)
elif step_type == "tool_execution":
yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map)
else:
yield from _handle_heartbeat_event(chunk_id)
match (event_type, step_type):
case (("turn_start" | "turn_awaiting_input"), _):
yield from _handle_turn_start_event(chunk_id)
case ("turn_complete", _):
yield from _handle_turn_complete_event(chunk, chunk_id)
case (_, "shield_call"):
yield from _handle_shield_event(chunk, chunk_id)
case (_, "inference"):
yield from _handle_inference_event(chunk, chunk_id)
case (_, "tool_execution"):
yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map)
case _:
logger.debug(
"Unhandled event combo: event_type=%s, step_type=%s",
event_type,
step_type,
)
yield from _handle_heartbeat_event(chunk_id)


# -----------------------------------
Expand Down