Skip to content

Commit df3a46b

Browse files
authored
Merge 19fd31c into b0b33e3
2 parents b0b33e3 + 19fd31c commit df3a46b

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

src/workflows/handler.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from typing import Any, AsyncGenerator
88

99
from .context import Context
10-
from .errors import WorkflowDone
10+
from .errors import WorkflowDone, WorkflowRuntimeError
1111
from .events import Event, StopEvent
1212
from .types import RunResultT
1313
from .utils import BUSY_WAIT_DELAY
@@ -24,6 +24,7 @@ def __init__(
2424
super().__init__(*args, **kwargs)
2525
self.run_id = run_id
2626
self._ctx = ctx
27+
self._all_events_consumed = False
2728

2829
@property
2930
def ctx(self) -> Context | None:
@@ -36,15 +37,60 @@ def is_done(self) -> bool:
3637
return self.done()
3738

3839
async def stream_events(self) -> AsyncGenerator[Event, None]:
40+
"""
41+
Stream events from the workflow execution as they occur.
42+
43+
This method provides real-time access to events generated during workflow
44+
execution, allowing for monitoring and processing of intermediate results.
45+
Events are yielded in the order they are generated by the workflow.
46+
47+
The stream includes all events written to the context's streaming queue,
48+
and terminates when a StopEvent is encountered, indicating the workflow
49+
has completed.
50+
51+
Returns:
52+
AsyncGenerator[Event, None]: An async generator that yields Event objects
53+
as they are produced by the workflow.
54+
55+
Raises:
56+
ValueError: If the context is not set on the handler.
57+
WorkflowRuntimeError: If all events have already been consumed by a
58+
previous call to stream_events() on the same handler instance.
59+
60+
Examples:
61+
```python
62+
handler = workflow.run()
63+
64+
# Stream and process events in real-time
65+
async for event in handler.stream_events():
66+
if isinstance(event, StopEvent):
67+
print(f"Workflow completed with result: {event.result}")
68+
else:
69+
print(f"Received event: {event}")
70+
71+
# Get final result
72+
result = await handler
73+
```
74+
75+
Note:
76+
Events can only be streamed once per handler instance. Subsequent
77+
calls to stream_events() will raise a WorkflowRuntimeError.
78+
"""
3979
if self.ctx is None:
4080
raise ValueError("Context is not set!")
4181

82+
# Check if we already consumed all the streamed events
83+
if self._all_events_consumed:
84+
msg = "All the streamed events have already been consumed."
85+
raise WorkflowRuntimeError(msg)
86+
4287
while True:
4388
ev = await self.ctx.streaming_queue.get()
4489

4590
yield ev
4691

4792
if isinstance(ev, StopEvent):
93+
self._all_events_consumed = True
4894
break
4995

5096
async def run_step(self) -> list[Event] | None:

tests/test_handler.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pytest
77

88
from workflows.context import Context
9+
from workflows.errors import WorkflowRuntimeError
910
from workflows.handler import WorkflowHandler
1011

1112

@@ -42,3 +43,18 @@ async def test_run_step_no_stepwise() -> None:
4243
match="Workflow must be created passing stepwise=True to call this method.",
4344
):
4445
await h.run_step()
46+
47+
48+
@pytest.mark.asyncio
49+
async def test_stream_events_consume_only_once() -> None:
50+
ctx = mock.MagicMock(spec=Context)
51+
52+
h = WorkflowHandler(ctx=ctx)
53+
h._all_events_consumed = True
54+
55+
with pytest.raises(
56+
WorkflowRuntimeError,
57+
match="All the streamed events have already been consumed.",
58+
):
59+
async for _ in h.stream_events():
60+
pass

tests/test_streaming.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@ async def test_multiple_sequential_streams() -> None:
116116
await r
117117

118118

119+
@pytest.mark.asyncio
120+
async def test_consume_only_once() -> None:
121+
wf = StreamingWorkflow()
122+
handler = wf.run()
123+
124+
async for _ in handler.stream_events():
125+
pass
126+
127+
with pytest.raises(
128+
WorkflowRuntimeError,
129+
match="All the streamed events have already been consumed.",
130+
):
131+
async for _ in handler.stream_events():
132+
pass
133+
134+
await handler
135+
136+
119137
@pytest.mark.asyncio
120138
async def test_multiple_ongoing_streams() -> None:
121139
wf = StreamingWorkflow()

0 commit comments

Comments
 (0)