Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions docs/docs/tutorials/streaming/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ print("Final output: ", program_output)

### Understand `StreamResponse`

`StreamResponse` (`dspy.streaming.StreamResponse`) is the wrapper class of streaming tokens. It comes with 3
`StreamResponse` (`dspy.streaming.StreamResponse`) is the wrapper class of streaming tokens. It comes with 4
fields:

- `predict_name`: the name of the predict that holds the `signature_field_name`. The name is the
Expand All @@ -108,7 +108,50 @@ fields:
- `signature_field_name`: the output field that these tokens map to. `predict_name` and `signature_field_name`
together form the unique identifier of the field. We will demonstrate how to handle streaming of multiple fields
and duplicated field names later in this guide.
- `chunk`: the value of the stream chunk.
- `chunk`: the value of the stream chunk. This is usually a non-empty string containing one or more tokens,
but the final chunk for each field will have `chunk=""` (empty string).
- `is_last_chunk`: a boolean flag indicating whether this is the final chunk for this field. When `True`,
the streaming for this particular field has completed. Each `StreamListener` guarantees exactly one chunk
with `is_last_chunk=True` per streaming session.

#### Understanding `is_last_chunk`

The `is_last_chunk` field is particularly important for knowing when a field's streaming has completed:

```python
async def read_output_stream():
output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

async for chunk in output_stream:
if isinstance(chunk, dspy.streaming.StreamResponse):
if chunk.is_last_chunk:
print(f"✓ Finished streaming {chunk.signature_field_name}")
else:
print(f"Chunk: {chunk.chunk}")
elif isinstance(chunk, dspy.Prediction):
print(f"Final output: {chunk}")
```

**Important behaviors to note:**

1. **Final chunk is empty**: The last `StreamResponse` for each field always has `chunk=""` (empty string)
and `is_last_chunk=True`. This empty chunk serves as an end-of-stream marker.

2. **Exactly one per field**: Each `StreamListener` emits exactly one chunk with `is_last_chunk=True` per
streaming session, making it reliable for detecting completion.

3. **With `allow_reuse=True`**: When a listener is reused for multiple predictions, each prediction gets its
own final chunk with `is_last_chunk=True`.

Example output showing the final empty chunk:

```
StreamResponse(predict_name='self', signature_field_name='answer', chunk='To', is_last_chunk=False)
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get to', is_last_chunk=False)
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the other side!', is_last_chunk=False)
StreamResponse(predict_name='self', signature_field_name='answer', chunk='', is_last_chunk=True)
Prediction(answer='To get to the other side!')
```

### Streaming with Cache

Expand Down Expand Up @@ -461,7 +504,6 @@ Final output: Prediction(
By default, calling a streamified DSPy program produces an async generator. To get back
a sync generator instead, set the flag `async_streaming=False`:


```python
import os

Expand Down
11 changes: 11 additions & 0 deletions dspy/streaming/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@

@dataclass
class StreamResponse:
"""Response object containing a chunk of streamed output from a field.

Attributes:
predict_name: Name of the predictor module producing this output.
signature_field_name: Name of the output field being streamed.
chunk: The content chunk (string). The final chunk for each field is always empty ("").
is_last_chunk: Boolean indicating if this is the final chunk for this field.
Each StreamListener emits exactly one chunk with is_last_chunk=True per streaming session.
The final chunk always has chunk="" (empty string) and is_last_chunk=True.
"""

predict_name: str
signature_field_name: str
chunk: str
Expand Down
22 changes: 20 additions & 2 deletions dspy/streaming/streaming_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,25 @@


class StreamListener:
"""Class that listens to the stream to capture the streeaming of a specific output field of a predictor."""
"""Listener that captures streaming output from a specific field of a predictor.
The listener monitors the token stream and emits StreamResponse objects for the specified field.
Each streaming session is guaranteed to emit exactly one chunk with is_last_chunk=True, which
will have chunk="" (empty string) to signal the end of streaming for that field.
Example:
```python
listener = dspy.streaming.StreamListener(signature_field_name="answer")
program = dspy.streamify(my_program, stream_listeners=[listener])
async for chunk in program(question="test"):
if isinstance(chunk, dspy.streaming.StreamResponse):
if chunk.is_last_chunk:
print("Streaming complete for", chunk.signature_field_name)
else:
print("Content:", chunk.chunk)
```
"""

def __init__(
self,
Expand Down Expand Up @@ -292,7 +310,7 @@ def _default_handle_stream_chunk(self, token: str, end_identifier: str) -> Strea
token = token + last_token if token else last_token
token = token.rstrip() # Remove the trailing \n\n

if token:
if token or self.stream_end:
return StreamResponse(
self.predict_name,
self.signature_field_name,
Expand Down
20 changes: 17 additions & 3 deletions tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,27 @@ async def gemini_stream_2(*args, **kwargs):
assert all_chunks[0].predict_name == "predict1"
assert all_chunks[0].signature_field_name == "answer"
assert all_chunks[0].chunk == "To get to the other side."
assert all_chunks[0].is_last_chunk is False

assert all_chunks[1].predict_name == "predict2"
assert all_chunks[1].signature_field_name == "judgement"
assert all_chunks[1].chunk == (
# Empty final chunk from predict1 with is_last_chunk=True
assert all_chunks[1].predict_name == "predict1"
assert all_chunks[1].signature_field_name == "answer"
assert all_chunks[1].chunk == ""
assert all_chunks[1].is_last_chunk is True

assert all_chunks[2].predict_name == "predict2"
assert all_chunks[2].signature_field_name == "judgement"
assert all_chunks[2].chunk == (
"The answer provides the standard punchline for this classic joke format, adapted to the specific location "
"mentioned in the question. It is the expected and appropriate response."
)
assert all_chunks[2].is_last_chunk is False

# Empty final chunk from predict2 with is_last_chunk=True
assert all_chunks[3].predict_name == "predict2"
assert all_chunks[3].signature_field_name == "judgement"
assert all_chunks[3].chunk == ""
assert all_chunks[3].is_last_chunk is True


@pytest.mark.anyio
Expand Down
Loading