LCORE-177: Implemented streaming_query endpoint #126
* Added streaming_query endpoint so that it conforms to the road-core streaming_query endpoint.
* Added unit tests for the streaming_query endpoint; they are very closely based on test_query.
* test_streaming_query was generated using vibe coding.
|
@TamiTakamiya Looks like @jrobertboos has beaten you to it... but we need to be sure this covers our requirement. Could you please take a look?
tisnik
left a comment
Please re-use the existing functions from query.py (or we can move these functions to a new module if that makes more sense). The approach you have chosen seems to be correct. Thank you!
|
Also, to nitpick: the coverage report found uncovered lines (error case handling).
* Still not passing the `black` test. That's because streaming_query is based on query, so they have similar code.
|
@tisnik @jrobertboos @manstis Though I am aware that same codes are found in the non-streaming |
|
Totally agree @TamiTakamiya I've already raised a GitHub Issue about not creating the client for every incoming request and, IMO, streaming should definitely use the same I've not looked at |
| if query_request.attachments: | ||
| validate_attachments_metadata(query_request.attachments) | ||
|
|
||
| agent = Agent( |
Shouldn't this be AsyncAgent instead of Agent?
| input_shields=available_shields if available_shields else [], | ||
| tools=[], | ||
| ) | ||
| session_id = agent.create_session("chat_session") |
If Agent is replaced with AsyncAgent, class methods like create_session, create_turn, etc. are called asynchronously, so we probably need to add await to each call.
@jrobertboos I attempted to replace Agent with AsyncAgent on top of this PR. Would you take a look at this commit?
@jrobertboos I have added a small change to support AsyncLlamaStackAsLibraryClient.
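To illustrate the point about await raised in this thread, here is a minimal sketch. `StubAsyncAgent` is a hypothetical stand-in written for this example, not the llama-stack class; only the method name create_session and the "chat_session" argument come from the diff above.

```python
import asyncio


class StubAsyncAgent:
    """Hypothetical stand-in for an async agent; NOT the llama-stack class."""

    async def create_session(self, name: str) -> str:
        # Simulate a non-blocking round trip to a backend.
        await asyncio.sleep(0)
        return f"session-for-{name}"


async def open_session(agent: StubAsyncAgent) -> str:
    # Without `await`, this call would return a coroutine object
    # instead of the session id.
    session_id = await agent.create_session("chat_session")
    return session_id


session_id = asyncio.run(open_session(StubAsyncAgent()))
```

The same pattern would apply to any other coroutine method on the agent: the endpoint handler is already `async def`, so each call can simply be awaited.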
| conversation_id = retrieve_conversation_id(query_request) | ||
| response = retrieve_response(client, model_id, query_request) | ||
|
|
||
| def response_generator(turn_response: Any) -> Iterator[str]: |
The RAG agent sample in the llama-stack documentation uses AgentEventLogger, which is actually EventLogger, which in turn uses TurnStreamEventPrinter. TurnStreamEventPrinter can process different event types, including errors. I think we'd better reuse the code found in TurnStreamEventPrinter to implement this response_generator method so that we can print various outputs from a llama-stack agent.
I chose to make the response_generator in the way that it was implemented in the llama-stack playground UI component. I haven't used the TurnStreamEventPrinter but I will experiment with it tomorrow :).
Please look at the video recording attached to ansible/ansible-ai-connect-service#1655
Since it may take longer to process a user query that involves tool calls, we want to show some intermediate outputs from the LLM's "thinking" process before showing the final output.
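A rough sketch of what emitting intermediate outputs could look like is given below. The step dictionaries, the event names "tool_execution" and "token", and the trailing blank line after each event are assumptions made for this example; only the `data: ` prefix and the "event"/"data"/"token" keys appear in this PR's diff.

```python
import json
from typing import Any, Iterable, Iterator


def format_event(event: str, data: dict[str, Any]) -> str:
    """Serialize one event as a `data: <json>` line plus a blank line."""
    return f"data: {json.dumps({'event': event, 'data': data})}\n\n"


def response_generator(steps: Iterable[dict[str, Any]]) -> Iterator[str]:
    """Yield one stream event per step, including tool activity.

    `steps` is a hypothetical, simplified stand-in for agent turn events:
    each item has a "kind" ("tool" or "token") and a "text" payload.
    """
    for step in steps:
        if step["kind"] == "tool":
            # Intermediate output: surfaced before the final answer arrives.
            yield format_event("tool_execution", {"token": step["text"]})
        else:
            yield format_event("token", {"token": step["text"]})


events = list(response_generator([
    {"kind": "tool", "text": "searching knowledge base"},
    {"kind": "token", "text": "Hello"},
]))
```

Because each step is yielded as soon as it is seen, a client can render tool activity while the final answer is still being generated.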
|
@TamiTakamiya It is clear to me that you have in-depth knowledge about streaming support in Perhaps this PR, that I believe was put together rather hastily, should be closed and your PR updated instead? I worry that this PR may also lack support for "Referenced documents". |
| @router.post("/streaming_query") | ||
| async def streaming_query_endpoint_handler( | ||
| _request: Request, | ||
| query_request: QueryRequest, |
One parameter that was left off when implementing /query was "media_type" [0]. I think we should create a StreamQueryRequest that inherits from QueryRequest and add that parameter.
By default, /streaming_query returns "text/plain", but it can also output JSON when media_type is set to "application/json".
There's a TODO for this already
lightspeed-stack/src/models/requests.py
Line 58 in a26c46b
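The suggestion in this thread could be sketched roughly as follows. Dataclasses are used here only to keep the example dependency-free; the real request models in the repository may use a different base class, the `QueryRequest` below is a one-field stand-in, and the validation rule is an assumption. The default uses the standard MIME spelling "text/plain".

```python
from dataclasses import dataclass


@dataclass
class QueryRequest:
    """Simplified stand-in; the real model has more fields."""

    query: str


@dataclass
class StreamQueryRequest(QueryRequest):
    """Hypothetical request model that adds the response media type."""

    media_type: str = "text/plain"

    def __post_init__(self) -> None:
        # Assumed rule: only the two media types mentioned above are accepted.
        allowed = {"text/plain", "application/json"}
        if self.media_type not in allowed:
            raise ValueError(f"unsupported media_type: {self.media_type}")


default_request = StreamQueryRequest(query="What is streaming?")
json_request = StreamQueryRequest(
    query="What is streaming?", media_type="application/json"
)
```

Inheriting from the existing request model keeps /query and /streaming_query in step: any field added to the base model is picked up by the streaming variant automatically.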
| ) | ||
|
|
||
| logger = logging.getLogger("app.endpoints.handlers") | ||
| router = APIRouter(tags=["query"]) |
What do you mean by this ^?
https://phoenixnap.com/kb/sed-replace
Replace query with streaming_query.
Yes, replace the tags=["query"] with tags=["streaming_query"]
The tags should match the endpoint
| { | ||
| "event": "end", | ||
| "data": { | ||
| "referenced_documents": None, # TODO(jboos): implement referenced documents |
By default, this should be an empty list.
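One way to follow this suggestion is sketched below. The helper name `stream_end_event` and the blank line after the event are assumptions for illustration; the "end" event shape and the "referenced_documents" key are taken from the diff above.

```python
import json
from typing import Optional


def stream_end_event(referenced_documents: Optional[list[dict]] = None) -> str:
    """Build the final event; a missing document list becomes []."""
    return "data: " + json.dumps({
        "event": "end",
        # Fall back to an empty list so clients can always iterate the field.
        "data": {"referenced_documents": referenced_documents or []},
    }) + "\n\n"


payload = json.loads(stream_end_event().removeprefix("data: "))
```

With an empty list as the default, consumers do not need a separate null check before looping over the referenced documents.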
* Switched to EventLogger in order to capture more output.
My PR is outdated and requires some rework. Since @jrobertboos is actively working on this PR, I will wait until this is merged and add referenced documents support on top of it. For reusing LlamaStackClient, I think we can have a separate PR, but I wish we can include the |
|
@jrobertboos As Ansible Lightspeed chatbot will use streaming mode only, this streaming API support is critical. Could you give us the ETA for this? |
|
@TamiTakamiya Sorry, I've been really under the weather these last 2 days, so I haven't been able to get a lot done. I'm planning on finishing this up over the weekend and will hopefully have all the requested changes implemented by Monday or Tuesday. Sorry for the delay.
* Removed use of EventLogger due to new use of AsyncAgent
* Added tool execution events to output stream.
* Separated response generator out even more.
* I need to find a way to automate this!!!
|
I'm pretty sure this PR is complete from a content standpoint. I'm not sure what I should do about the 2 checks that are failing.
P.S. Sorry for the string of commits 😞
| complete_response += json.loads(event.replace("data: ", ""))["data"][ | ||
| "token" | ||
| ] | ||
| yield event |
I think we need to increment chunk_id by inserting chunk_id += 1 or similar code here.
Yes, I missed that, thanks! I'll wait to change that until you are done reviewing :).
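The chunk id fix discussed here might look something like the sketch below. The generator `stream_tokens`, the "id" field, and the "token" event name are assumptions for this example; the consuming loop mirrors the `json.loads(event.replace("data: ", ""))` pattern from the diff.

```python
import json
from typing import Iterable, Iterator


def stream_tokens(tokens: Iterable[str]) -> Iterator[str]:
    """Yield one event per token, numbering chunks from 0."""
    chunk_id = 0
    for token in tokens:
        yield "data: " + json.dumps(
            {"event": "token", "data": {"id": chunk_id, "token": token}}
        ) + "\n\n"
        chunk_id += 1  # advance only after the event has been emitted


complete_response = ""
chunk_ids = []
for event in stream_tokens(["Hel", "lo"]):
    data = json.loads(event.replace("data: ", ""))["data"]
    complete_response += data["token"]
    chunk_ids.append(data["id"])
```

Without the increment every chunk would carry the same id, so a client could not detect missing or reordered chunks.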
TamiTakamiya
left a comment
I want the issues found by Pyright and the Python linter to be fixed, and I also added one minor comment on the stream chunk id. Other than these points, the changes look good. Thanks!
Description
Added streaming_query endpoint as well as unit tests. The streaming_query endpoint is closely based off of the query endpoint. The unit tests for streaming_query were generated using vibe coding.
Type of change
Related Tickets & Documents
Checklist before requesting a review
Testing