forked from openai/openai-agents-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_events.py
58 lines (40 loc) · 1.51 KB
/
stream_events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Literal, Union
from typing_extensions import TypeAlias
from .agent import Agent
from .items import RunItem, TResponseStreamEvent
@dataclass
class RawResponsesStreamEvent:
"""Streaming event from the LLM. These are 'raw' events, i.e. they are directly passed through
from the LLM.
"""
data: TResponseStreamEvent
"""The raw responses streaming event from the LLM."""
type: Literal["raw_response_event"] = "raw_response_event"
"""The type of the event."""
@dataclass
class RunItemStreamEvent:
"""Streaming events that wrap a `RunItem`. As the agent processes the LLM response, it will
generate these events for new messages, tool calls, tool outputs, handoffs, etc.
"""
name: Literal[
"message_output_created",
"handoff_requested",
"handoff_occured",
"tool_called",
"tool_output",
"reasoning_item_created",
]
"""The name of the event."""
item: RunItem
"""The item that was created."""
type: Literal["run_item_stream_event"] = "run_item_stream_event"
@dataclass
class AgentUpdatedStreamEvent:
"""Event that notifies that there is a new agent running."""
new_agent: Agent[Any]
"""The new agent."""
type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"
StreamEvent: TypeAlias = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]
"""A streaming event from an agent."""