diff --git a/docs/openapi.json b/docs/openapi.json
index 3fb04467..37314307 100644
--- a/docs/openapi.json
+++ b/docs/openapi.json
@@ -1500,7 +1500,7 @@
           "streaming_query"
         ],
         "summary": "Streaming Query Endpoint Handler",
-        "description": "Handle request to the /streaming_query endpoint using Agent API.\n\nThis is a wrapper around streaming_query_endpoint_handler_base that provides\nthe Agent API specific retrieve_response and response generator functions.\n\nReturns:\n    StreamingResponse: An HTTP streaming response yielding\n    SSE-formatted events for the query lifecycle.\n\nRaises:\n    HTTPException: Returns HTTP 500 if unable to connect to the\n    Llama Stack server.",
+        "description": "Handle request to the /streaming_query endpoint using Agent API.\n\nReturns a streaming response using Server-Sent Events (SSE) format with\ncontent type text/event-stream.\n\nReturns:\n    StreamingResponse: An HTTP streaming response yielding\n    SSE-formatted events for the query lifecycle with content type\n    text/event-stream.\n\nRaises:\n    HTTPException:\n    - 401: Unauthorized - Missing or invalid credentials\n    - 403: Forbidden - Insufficient permissions or model override not allowed\n    - 404: Not Found - Conversation, model, or provider not found\n    - 422: Unprocessable Entity - Request validation failed\n    - 429: Too Many Requests - Quota limit exceeded\n    - 500: Internal Server Error - Configuration not loaded or other server errors\n    - 503: Service Unavailable - Unable to connect to Llama Stack backend",
         "operationId": "streaming_query_endpoint_handler_v1_streaming_query_post",
         "requestBody": {
           "content": {
@@ -1514,16 +1514,14 @@
         },
         "responses": {
           "200": {
-            "description": "Streaming response (Server-Sent Events)",
+            "description": "Successful response",
             "content": {
-              "application/json": {
-                "schema": {}
-              },
               "text/event-stream": {
                 "schema": {
-                  "type": "string"
+                  "type": "string",
+                  "format": "text/event-stream"
                 },
-                "example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 0, \"output_tokens\": 0}, \"available_quotas\": {}}\n\n"
+                "example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"rag_chunks\": [], \"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19, \"available_quotas\": {}}}\n\n"
               }
             }
           },
@@ -3719,7 +3717,7 @@
           "streaming_query_v2"
         ],
        "summary": "Streaming Query Endpoint Handler V2",
-        "description": "Handle request to the /streaming_query endpoint using Responses API.\n\nThis is a wrapper around streaming_query_endpoint_handler_base that provides\nthe Responses API specific retrieve_response and response generator functions.\n\nReturns:\n    StreamingResponse: An HTTP streaming response yielding\n    SSE-formatted events for the query lifecycle.\n\nRaises:\n    HTTPException: Returns HTTP 500 if unable to connect to the\n    Llama Stack server.",
+        "description": "Handle request to the /streaming_query endpoint using Responses API.\n\nReturns a streaming response using Server-Sent Events (SSE) format with\ncontent type text/event-stream.\n\nReturns:\n    StreamingResponse: An HTTP streaming response yielding\n    SSE-formatted events for the query lifecycle with content type\n    text/event-stream.\n\nRaises:\n    HTTPException:\n    - 401: Unauthorized - Missing or invalid credentials\n    - 403: Forbidden - Insufficient permissions or model override not allowed\n    - 404: Not Found - Conversation, model, or provider not found\n    - 422: Unprocessable Entity - Request validation failed\n    - 429: Too Many Requests - Quota limit exceeded\n    - 500: Internal Server Error - Configuration not loaded or other server errors\n    - 503: Service Unavailable - Unable to connect to Llama Stack backend",
         "operationId": "streaming_query_endpoint_handler_v2_v2_streaming_query_post",
         "requestBody": {
           "content": {
@@ -3733,19 +3731,14 @@
         },
         "responses": {
           "200": {
-            "description": "Streaming response with Server-Sent Events",
+            "description": "Successful response",
             "content": {
-              "application/json": {
-                "schema": {
-                  "type": "string",
-                  "example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 0, \"output_tokens\": 0}, \"available_quotas\": {}}\n\n"
-                }
-              },
-              "text/plain": {
+              "text/event-stream": {
                 "schema": {
                   "type": "string",
-                  "example": "Hello world!\n\n---\n\nReference: https://example.com/doc"
-                }
+                  "format": "text/event-stream"
+                },
+                "example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"rag_chunks\": [], \"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19, \"available_quotas\": {}}}\n\n"
               }
             }
           },
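For orientation, the stream documented above can be exercised by any SSE-aware client. A minimal sketch, assuming the service listens on http://localhost:8080 and accepts a `query` field (the URL and payload are illustrative; `httpx` is not part of this change):

```python
import json

import httpx

# Hypothetical deployment URL and request body; adjust to your setup.
URL = "http://localhost:8080/v1/streaming_query"
payload = {"query": "Hello"}

with httpx.stream("POST", URL, json=payload, timeout=None) as response:
    response.raise_for_status()
    for line in response.iter_lines():
        # Each SSE frame documented above is a "data: <json>" line
        # followed by a blank line; skip everything else.
        if not line.startswith("data: "):
            continue
        event = json.loads(line[len("data: "):])
        if event["event"] == "token":
            print(event["data"]["token"], end="", flush=True)
        elif event["event"] == "end":
            print()  # stream complete; token counts and quotas are in event["data"]
```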
diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py
index 1b440a33..98ca5e7a 100644
--- a/src/app/endpoints/streaming_query.py
+++ b/src/app/endpoints/streaming_query.py
@@ -55,6 +55,7 @@
     NotFoundResponse,
     QuotaExceededResponse,
     ServiceUnavailableResponse,
+    StreamingQueryResponse,
     UnauthorizedResponse,
     UnprocessableEntityResponse,
 )
@@ -76,22 +77,7 @@
 streaming_query_responses: dict[int | str, dict[str, Any]] = {
-    200: {
-        "description": "Streaming response (Server-Sent Events)",
-        "content": {
-            "text/event-stream": {
-                "schema": {"type": "string"},
-                "example": (
-                    'data: {"event": "start", '
-                    '"data": {"conversation_id": "123e4567-e89b-12d3-a456-426614174000"}}\n\n'
-                    'data: {"event": "token", "data": {"id": 0, "token": "Hello"}}\n\n'
-                    'data: {"event": "end", "data": {"referenced_documents": [], '
-                    '"truncated": null, "input_tokens": 0, "output_tokens": 0}, '
-                    '"available_quotas": {}}\n\n'
-                ),
-            }
-        },
-    },
+    200: StreamingQueryResponse.openapi_response(),
     401: UnauthorizedResponse.openapi_response(
         examples=["missing header", "missing token"]
     ),
@@ -937,7 +923,11 @@ async def error_generator() -> AsyncGenerator[str, None]:
     return StreamingResponse(error_generator(), media_type=content_type)
 
 
-@router.post("/streaming_query", responses=streaming_query_responses)
+@router.post(
+    "/streaming_query",
+    response_class=StreamingResponse,
+    responses=streaming_query_responses,
+)
 @authorize(Action.STREAMING_QUERY)
 async def streaming_query_endpoint_handler(  # pylint: disable=too-many-locals,too-many-statements
     request: Request,
@@ -948,16 +938,23 @@ async def streaming_query_endpoint_handler(  # pylint: disable=too-many-locals,t
     """
     Handle request to the /streaming_query endpoint using Agent API.
 
-    This is a wrapper around streaming_query_endpoint_handler_base that provides
-    the Agent API specific retrieve_response and response generator functions.
+    Returns a streaming response using Server-Sent Events (SSE) format with
+    content type text/event-stream.
 
     Returns:
         StreamingResponse: An HTTP streaming response yielding
-        SSE-formatted events for the query lifecycle.
+        SSE-formatted events for the query lifecycle with content type
+        text/event-stream.
 
     Raises:
-        HTTPException: Returns HTTP 500 if unable to connect to the
-        Llama Stack server.
+        HTTPException:
+        - 401: Unauthorized - Missing or invalid credentials
+        - 403: Forbidden - Insufficient permissions or model override not allowed
+        - 404: Not Found - Conversation, model, or provider not found
+        - 422: Unprocessable Entity - Request validation failed
+        - 429: Too Many Requests - Quota limit exceeded
+        - 500: Internal Server Error - Configuration not loaded or other server errors
+        - 503: Service Unavailable - Unable to connect to Llama Stack backend
     """
     return await streaming_query_endpoint_handler_base(
         request=request,
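In the hunk above, `response_class=StreamingResponse` only changes the generated OpenAPI document (it suppresses FastAPI's default `application/json` 200 response); the handler still builds the `StreamingResponse` itself. A self-contained sketch of the same registration pattern, with an illustrative route and event generator rather than the project's actual code:

```python
import json
from collections.abc import AsyncGenerator
from typing import Any

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# Illustrative stand-in for StreamingQueryResponse.openapi_response().
sse_responses: dict[int | str, dict[str, Any]] = {
    200: {
        "description": "Successful response",
        "content": {
            "text/event-stream": {
                "schema": {"type": "string", "format": "text/event-stream"}
            }
        },
    },
}


@app.post("/streaming_query", response_class=StreamingResponse, responses=sse_responses)
async def streaming_query() -> StreamingResponse:
    """Stream a fixed sequence of SSE frames."""

    async def events() -> AsyncGenerator[str, None]:
        # An SSE frame is a "data:" line terminated by a blank line.
        yield f'data: {json.dumps({"event": "start", "data": {}})}\n\n'
        yield f'data: {json.dumps({"event": "token", "data": {"id": 0, "token": "Hello"}})}\n\n'
        yield f'data: {json.dumps({"event": "end", "data": {}})}\n\n'

    return StreamingResponse(events(), media_type="text/event-stream")
```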
diff --git a/src/app/endpoints/streaming_query_v2.py b/src/app/endpoints/streaming_query_v2.py
index f5e8f026..b6f37947 100644
--- a/src/app/endpoints/streaming_query_v2.py
+++ b/src/app/endpoints/streaming_query_v2.py
@@ -40,6 +40,7 @@
     NotFoundResponse,
     QuotaExceededResponse,
     ServiceUnavailableResponse,
+    StreamingQueryResponse,
     UnauthorizedResponse,
     UnprocessableEntityResponse,
 )
@@ -58,30 +59,7 @@
 auth_dependency = get_auth_dependency()
 
 streaming_query_v2_responses: dict[int | str, dict[str, Any]] = {
-    200: {
-        "description": "Streaming response with Server-Sent Events",
-        "content": {
-            "application/json": {
-                "schema": {
-                    "type": "string",
-                    "example": (
-                        'data: {"event": "start", '
-                        '"data": {"conversation_id": "123e4567-e89b-12d3-a456-426614174000"}}\n\n'
-                        'data: {"event": "token", "data": {"id": 0, "token": "Hello"}}\n\n'
-                        'data: {"event": "end", "data": {"referenced_documents": [], '
-                        '"truncated": null, "input_tokens": 0, "output_tokens": 0}, '
-                        '"available_quotas": {}}\n\n'
-                    ),
-                }
-            },
-            "text/plain": {
-                "schema": {
-                    "type": "string",
-                    "example": "Hello world!\n\n---\n\nReference: https://example.com/doc",
-                }
-            },
-        },
-    },
+    200: StreamingQueryResponse.openapi_response(),
     401: UnauthorizedResponse.openapi_response(
         examples=["missing header", "missing token"]
     ),
@@ -313,7 +291,11 @@ async def response_generator(  # pylint: disable=too-many-branches,too-many-stat
     return response_generator
 
 
-@router.post("/streaming_query", responses=streaming_query_v2_responses)
+@router.post(
+    "/streaming_query",
+    response_class=StreamingResponse,
+    responses=streaming_query_v2_responses,
+)
 @authorize(Action.STREAMING_QUERY)
 async def streaming_query_endpoint_handler_v2(  # pylint: disable=too-many-locals
     request: Request,
@@ -324,16 +306,23 @@ async def streaming_query_endpoint_handler_v2(  # pylint: disable=too-many-local
     """
     Handle request to the /streaming_query endpoint using Responses API.
 
-    This is a wrapper around streaming_query_endpoint_handler_base that provides
-    the Responses API specific retrieve_response and response generator functions.
+    Returns a streaming response using Server-Sent Events (SSE) format with
+    content type text/event-stream.
 
     Returns:
         StreamingResponse: An HTTP streaming response yielding
-        SSE-formatted events for the query lifecycle.
+        SSE-formatted events for the query lifecycle with content type
+        text/event-stream.
 
     Raises:
-        HTTPException: Returns HTTP 500 if unable to connect to the
-        Llama Stack server.
+        HTTPException:
+        - 401: Unauthorized - Missing or invalid credentials
+        - 403: Forbidden - Insufficient permissions or model override not allowed
+        - 404: Not Found - Conversation, model, or provider not found
+        - 422: Unprocessable Entity - Request validation failed
+        - 429: Too Many Requests - Quota limit exceeded
+        - 500: Internal Server Error - Configuration not loaded or other server errors
+        - 503: Service Unavailable - Unable to connect to Llama Stack backend
     """
     return await streaming_query_endpoint_handler_base(
         request=request,
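The responses.py hunk below relies on pydantic v2 merging `json_schema_extra` into the output of `model_json_schema()`, which is where `openapi_response()` reads the example from. A tiny sketch of that mechanism (the model name and example string here are hypothetical):

```python
from pydantic import BaseModel


class SSEDoc(BaseModel):
    """Hypothetical documentation-only model."""

    model_config = {
        "json_schema_extra": {"examples": ['data: {"event": "start"}\n\n']}
    }


# json_schema_extra keys are merged into the generated schema.
schema = SSEDoc.model_json_schema()
assert schema["examples"] == ['data: {"event": "start"}\n\n']
```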
diff --git a/src/models/responses.py b/src/models/responses.py
index e893d4e7..6ce8fbb1 100644
--- a/src/models/responses.py
+++ b/src/models/responses.py
@@ -11,6 +11,7 @@
 from quota.quota_exceed_error import QuotaExceedError
 from models.config import Action, Configuration
 
+SUCCESSFUL_RESPONSE_DESCRIPTION = "Successful response"
 BAD_REQUEST_DESCRIPTION = "Invalid request format"
 UNAUTHORIZED_DESCRIPTION = "Unauthorized"
 FORBIDDEN_DESCRIPTION = "Permission denied"
@@ -52,7 +53,7 @@ def openapi_response(cls) -> dict[str, Any]:
             content = {"application/json": {"example": example_value}}
 
         return {
-            "description": "Successful response",
+            "description": SUCCESSFUL_RESPONSE_DESCRIPTION,
             "model": cls,
             "content": content,
         }
@@ -449,6 +450,74 @@ class QueryResponse(AbstractSuccessfulResponse):
 }
 
 
+class StreamingQueryResponse(AbstractSuccessfulResponse):
+    """Documentation-only model for streaming query responses using Server-Sent Events (SSE)."""
+
+    @classmethod
+    def openapi_response(cls) -> dict[str, Any]:
+        """Generate FastAPI response dict for SSE streaming with examples.
+
+        Note: This is used for OpenAPI documentation only. The actual endpoint
+        returns a StreamingResponse object, not this Pydantic model.
+        """
+        schema = cls.model_json_schema()
+        model_examples = schema.get("examples")
+        if not model_examples:
+            raise SchemaError(f"Examples not found in {cls.__name__}")
+        example_value = model_examples[0]
+        content = {
+            "text/event-stream": {
+                "schema": {"type": "string", "format": "text/event-stream"},
+                "example": example_value,
+            }
+        }
+
+        return {
+            "description": SUCCESSFUL_RESPONSE_DESCRIPTION,
+            "content": content,
+            # Note: No "model" key since we're not actually serializing this model
+        }
+
+    model_config = {
+        "json_schema_extra": {
+            "examples": [
+                (
+                    'data: {"event": "start", "data": {'
+                    '"conversation_id": "123e4567-e89b-12d3-a456-426614174000"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 0, "token": "No Violation"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 1, "token": ""}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 2, "token": "Hello"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 3, "token": "!"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 4, "token": " How"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 5, "token": " can"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 6, "token": " I"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 7, "token": " assist"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 8, "token": " you"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 9, "token": " today"}}\n\n'
+                    'data: {"event": "token", "data": {'
+                    '"id": 10, "token": "?"}}\n\n'
+                    'data: {"event": "turn_complete", "data": {'
+                    '"token": "Hello! How can I assist you today?"}}\n\n'
+                    'data: {"event": "end", "data": {'
+                    '"rag_chunks": [], "referenced_documents": [], '
+                    '"truncated": null, "input_tokens": 11, "output_tokens": 19, '
+                    '"available_quotas": {}}}\n\n'
+                ),
+            ]
+        }
+    }
+
+
 class InfoResponse(AbstractSuccessfulResponse):
     """Model representing a response to an info request.
 
@@ -806,7 +875,7 @@ def openapi_response(cls) -> dict[str, Any]:
             content = {"application/json": {"examples": named_examples or None}}
 
         return {
-            "description": "Successful response",
+            "description": SUCCESSFUL_RESPONSE_DESCRIPTION,
             "model": cls,
             "content": content,
         }
diff --git a/tests/unit/models/responses/test_successful_responses.py b/tests/unit/models/responses/test_successful_responses.py
index 470c5d57..80fcba41 100644
--- a/tests/unit/models/responses/test_successful_responses.py
+++ b/tests/unit/models/responses/test_successful_responses.py
@@ -1,4 +1,4 @@
-# pylint: disable=unsupported-membership-test,unsubscriptable-object
+# pylint: disable=unsupported-membership-test,unsubscriptable-object, too-many-lines
 
 """Unit tests for all successful response models."""
 
@@ -39,6 +39,7 @@
     ReferencedDocument,
     ShieldsResponse,
     StatusResponse,
+    StreamingQueryResponse,
     ToolCall,
     ToolsResponse,
 )
@@ -956,6 +957,35 @@
         assert expected_count == 1
 
 
+class TestStreamingQueryResponse:
+    """Test cases for StreamingQueryResponse."""
+
+    def test_openapi_response_structure(self) -> None:
+        """Test that openapi_response() returns correct structure."""
+        result = StreamingQueryResponse.openapi_response()
+
+        assert "description" in result
+        assert "content" in result
+        assert result["description"] == "Successful response"
+        assert "model" not in result
+
+        assert "text/event-stream" in result["content"]
+        content = result["content"]["text/event-stream"]
+        assert "schema" in content
+        assert "example" in content
+
+        schema = content["schema"]
+        assert schema["type"] == "string"
+        assert schema["format"] == "text/event-stream"
+
+    def test_model_json_schema_has_examples(self) -> None:
+        """Test that model_json_schema() includes examples."""
+        schema = StreamingQueryResponse.model_json_schema()
+        assert "examples" in schema
+        assert len(schema["examples"]) == 1
+        assert isinstance(schema["examples"][0], str)
+
+
 class TestAbstractSuccessfulResponseOpenAPI:
     """Test cases for AbstractSuccessfulResponse.openapi_response() edge cases."""
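As an end-to-end check of what these changes publish, one can regenerate the spec and assert that each streaming endpoint now advertises only `text/event-stream` for the 200 response. A sketch, assuming the FastAPI app is importable as shown and mounted at the paths implied by the operation IDs (both are assumptions):

```python
from fastapi.testclient import TestClient

from app.main import app  # hypothetical import path for the application

client = TestClient(app)
spec = client.get("/openapi.json").json()

for path in ("/v1/streaming_query", "/v2/streaming_query"):
    content = spec["paths"][path]["post"]["responses"]["200"]["content"]
    # The old application/json and text/plain variants should be gone.
    assert list(content) == ["text/event-stream"], content
    assert content["text/event-stream"]["schema"]["format"] == "text/event-stream"
```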