feat(streaming): initial implementation of streaming agentservice execution #554
Conversation
(force-pushed 12b0933 to 832eb1f)
from steamship.invocable import PackageService, post

class StreamingResponse(BaseModel):
@douglas-reid if you extend BaseModel instead of CamelModel, then the serialization is snake_cased rather than the camelCase that we use for engine comms. It sneaks through in this case since it's internal to an object's response, but it breaks the parsing of the task in the TypeScript client. Would it be possible to add a test that, e.g., response.task.requestId is serialized as requestId instead of request_id, just to make sure that the clients demarshalling it won't explode?
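To illustrate the distinction, here is a small framework-free sketch of the snake_case-to-camelCase serialization the suggested test would assert; `to_camel` and `serialize_camel` are hypothetical helpers standing in for what CamelModel does, not the library's actual code:

```python
def to_camel(name: str) -> str:
    """Convert a snake_case field name to camelCase (e.g. request_id -> requestId)."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)

def serialize_camel(data: dict) -> dict:
    """Serialize a dict of snake_case keys into the camelCase form the engine expects."""
    return {to_camel(key): value for key, value in data.items()}

# The suggested test would assert the camelCase key survives serialization:
payload = serialize_camel({"request_id": "abc-123"})
assert "requestId" in payload and "request_id" not in payload
```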
sure
(force-pushed 832eb1f to f9c4b17)
(force-pushed 925d586 to 0b65f6c)
A few non-blocking thoughts in there, but this looks fantastic. Excited for it to be merged!!
assert num_blocks > 0, "Blocks should have been streamed during execution"
assert llm_prompt_event_count == 2, (
    "At least 2 llm prompts should have happened (first for tool selection, "
These error messages are awesome -- thank you for adding them
# if you can't find a consistent context_id, then something has gone wrong, preventing streaming
if not ctx_id:
    # TODO(dougreid): this points to a slight flaw in the context_keys vs. context_id
Is this the situation where in practice we've just been using context_id, whereas in theory we're hashing the whole context_keys object?
yeah. we have a mismatch between our exposed args in /prompt and our actual capability. not sure if that matters practically yet.
    self, prompt: Optional[str] = None, context_id: Optional[str] = None, **kwargs
) -> StreamingResponse:
    ctx_id, history_file = self._streaming_context_id_and_file(context_id=context_id, **kwargs)
    task = self.invoke_later(
I think in a V2 we'll want to follow up and see if we can have later-invoked work automatically block this task from completing (thinking about the comment string about the semantics of the Task below; we could maybe try to make conformance to that automatic).
This PR presents a series of changes that should support a way to stream response information back to a client via an AgentService. In order to achieve the streaming result, a new method on the AgentService is exposed: `async_prompt`. This new method returns a new `StreamingResponse` that has two fields: `task` and `file`. These fields provide access to (a) the async task that will be streaming results and (b) a file (here the `ChatHistory` file) to which all status messages and assistant interactions will be saved. This PR relies on a full deployment of steamship-plugins/gpt4#10 to the target environment for testing / validation.
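Based on that description, the shape of the response might be sketched like this; the dataclass and `Any` fields are illustrative stand-ins, since the real fields hold Steamship Task and File objects:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class StreamingResponse:
    """Sketch of the two-field response described above (stand-in types)."""
    task: Any  # the async task that will be streaming results
    file: Any  # the ChatHistory file receiving status messages and assistant interactions
```

A client would hold on to `task` to poll for completion and read from `file` to follow the stream.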
(force-pushed 0421c4c to a66d6c7)
(force-pushed a66d6c7 to ad653c6)
This PR establishes an initial implementation of streaming utilities for AgentService endpoints, based on FunctionsBasedAgent runs. This will allow clients to invoke an agent run through an async endpoint and stream, via SSE, block creation events related to the agent's execution. These events can include status messages (from Agents and Tools) as well as generated blocks (from LLMs and Tools). With this code, new async endpoints can be exposed with code like the following:
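A minimal, framework-free sketch of what such an endpoint might look like; here `post` and the service class are simplified stand-ins for the Steamship decorator and AgentService base class, and the task/file placeholders are hypothetical, not the actual API:

```python
# Hedged sketch: `post` and MyAgentService stand in for the Steamship
# decorator and AgentService base class; the bodies are placeholders.

def post(path):
    """Stand-in for steamship.invocable.post: tags a method as a POST endpoint."""
    def wrap(fn):
        fn.__endpoint__ = ("POST", path)
        return fn
    return wrap

class StreamingResponse:
    def __init__(self, task, file):
        self.task = task    # async task handle for the agent run
        self.file = file    # ChatHistory file that streamed blocks land in

class MyAgentService:
    @post("async_prompt")
    def async_prompt(self, prompt=None, context_id=None, **kwargs):
        # In the real service this would schedule the agent run asynchronously
        # (e.g. via invoke_later) and return the task plus the ChatHistory file.
        task = {"state": "running"}                          # placeholder task
        history_file = {"handle": f"history-{context_id}"}   # placeholder file
        return StreamingResponse(task=task, file=history_file)
```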
The PR ensures all Blocks created in the service of a run_agent call are tagged with the proper request-id, and that all calls to the LLM are streaming-compatible. A test is provided that demonstrates an approach to consuming a stream of events, based on the sseclient-py library. A Generator is constructed that emits Blocks until a terminal block for a request is found in the stream.
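A framework-free sketch of that consumption pattern: here `events` is a plain iterable of JSON strings standing in for the data payloads an sseclient-py client would yield, and the `terminal` field is a hypothetical marker for the end-of-request block:

```python
import json

def stream_blocks(events, request_id):
    """Yield decoded blocks for `request_id` until a terminal block appears."""
    for data in events:
        block = json.loads(data)
        if block.get("request_id") != request_id:
            continue  # ignore blocks belonging to other requests
        yield block
        if block.get("terminal"):  # hypothetical terminal-block marker
            return

# Example: two content blocks, then a terminal block ends the generator.
events = [
    json.dumps({"request_id": "r1", "text": "Hello"}),
    json.dumps({"request_id": "r1", "text": " world"}),
    json.dumps({"request_id": "r1", "terminal": True}),
]
blocks = list(stream_blocks(events, "r1"))
```

The generator returning (rather than raising) on the terminal block is what lets a caller simply iterate until the stream is done.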