diff --git a/docs/docs/tutorials/streaming/index.md b/docs/docs/tutorials/streaming/index.md index 8c740dacb6..85c98a5e46 100644 --- a/docs/docs/tutorials/streaming/index.md +++ b/docs/docs/tutorials/streaming/index.md @@ -98,7 +98,7 @@ print("Final output: ", program_output) ### Understand `StreamResponse` -`StreamResponse` (`dspy.streaming.StreamResponse`) is the wrapper class of streaming tokens. It comes with 3 +`StreamResponse` (`dspy.streaming.StreamResponse`) is the wrapper class of streaming tokens. It comes with 4 fields: - `predict_name`: the name of the predict that holds the `signature_field_name`. The name is the @@ -108,7 +108,50 @@ fields: - `signature_field_name`: the output field that these tokens map to. `predict_name` and `signature_field_name` together form the unique identifier of the field. We will demonstrate how to handle multiple fields streaming and duplicated field name later in this guide. -- `chunk`: the value of the stream chunk. +- `chunk`: the value of the stream chunk. This is usually a non-empty string containing one or more tokens, + but the final chunk for each field will have `chunk=""` (empty string). +- `is_last_chunk`: a boolean flag indicating whether this is the final chunk for this field. When `True`, + the streaming for this particular field has completed. Each `StreamListener` guarantees exactly one chunk + with `is_last_chunk=True` per streaming session. 
+ +#### Understanding `is_last_chunk` + +The `is_last_chunk` field is particularly important for knowing when a field's streaming has completed: + +```python +async def read_output_stream(): + output_stream = stream_predict(question="Why did a chicken cross the kitchen?") + + async for chunk in output_stream: + if isinstance(chunk, dspy.streaming.StreamResponse): + if chunk.is_last_chunk: + print(f"✓ Finished streaming {chunk.signature_field_name}") + else: + print(f"Chunk: {chunk.chunk}") + elif isinstance(chunk, dspy.Prediction): + print(f"Final output: {chunk}") +``` + +**Important behaviors to note:** + +1. **Final chunk is empty**: The last `StreamResponse` for each field always has `chunk=""` (empty string) + and `is_last_chunk=True`. This empty chunk serves as an end-of-stream marker. + +2. **Exactly one per field**: Each `StreamListener` emits exactly one chunk with `is_last_chunk=True`, making + it reliable for detecting completion. + +3. **With `allow_reuse=True`**: When a listener is reused for multiple predictions, each prediction gets its + own final chunk with `is_last_chunk=True`. + +Example output showing the final empty chunk: + +``` +StreamResponse(predict_name='self', signature_field_name='answer', chunk='To', is_last_chunk=False) +StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get to', is_last_chunk=False) +StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the other side!', is_last_chunk=False) +StreamResponse(predict_name='self', signature_field_name='answer', chunk='', is_last_chunk=True) +Prediction(answer='To get to the other side!') +``` ### Streaming with Cache @@ -461,7 +504,6 @@ Final output: Prediction( By default calling a streamified DSPy program produces an async generator. 
In order to get back a sync generator, you can set the flag `async_streaming=False`: - ```python import os diff --git a/dspy/streaming/messages.py b/dspy/streaming/messages.py index 4170a0a496..d8709665a6 100644 --- a/dspy/streaming/messages.py +++ b/dspy/streaming/messages.py @@ -11,6 +11,17 @@ @dataclass class StreamResponse: + """Response object containing a chunk of streamed output from a field. + + Attributes: + predict_name: Name of the predictor module producing this output. + signature_field_name: Name of the output field being streamed. + chunk: The content chunk (string). The final chunk for each field is always empty (""). + is_last_chunk: Boolean indicating if this is the final chunk for this field. + Each StreamListener emits exactly one chunk with is_last_chunk=True per streaming session. + The final chunk always has chunk="" (empty string) and is_last_chunk=True. + """ + predict_name: str signature_field_name: str chunk: str diff --git a/dspy/streaming/streaming_listener.py b/dspy/streaming/streaming_listener.py index 1ea93c2fd8..27458df68f 100644 --- a/dspy/streaming/streaming_listener.py +++ b/dspy/streaming/streaming_listener.py @@ -20,7 +20,25 @@ class StreamListener: - """Class that listens to the stream to capture the streeaming of a specific output field of a predictor.""" + """Listener that captures streaming output from a specific field of a predictor. + + The listener monitors the token stream and emits StreamResponse objects for the specified field. + Each streaming session is guaranteed to emit exactly one chunk with is_last_chunk=True, which + will have chunk="" (empty string) to signal the end of streaming for that field. 
+ + Example: + ```python + listener = dspy.streaming.StreamListener(signature_field_name="answer") + program = dspy.streamify(my_program, stream_listeners=[listener]) + + async for chunk in program(question="test"): + if isinstance(chunk, dspy.streaming.StreamResponse): + if chunk.is_last_chunk: + print("Streaming complete for", chunk.signature_field_name) + else: + print("Content:", chunk.chunk) + ``` + """ def __init__( self, @@ -292,7 +310,7 @@ def _default_handle_stream_chunk(self, token: str, end_identifier: str) -> Strea token = token + last_token if token else last_token token = token.rstrip() # Remove the trailing \n\n - if token: + if token or self.stream_end: return StreamResponse( self.predict_name, self.signature_field_name, diff --git a/tests/streaming/test_streaming.py b/tests/streaming/test_streaming.py index ae5ea57843..769fedf222 100644 --- a/tests/streaming/test_streaming.py +++ b/tests/streaming/test_streaming.py @@ -613,13 +613,27 @@ async def gemini_stream_2(*args, **kwargs): assert all_chunks[0].predict_name == "predict1" assert all_chunks[0].signature_field_name == "answer" assert all_chunks[0].chunk == "To get to the other side." + assert all_chunks[0].is_last_chunk is False - assert all_chunks[1].predict_name == "predict2" - assert all_chunks[1].signature_field_name == "judgement" - assert all_chunks[1].chunk == ( + # Empty final chunk from predict1 with is_last_chunk=True + assert all_chunks[1].predict_name == "predict1" + assert all_chunks[1].signature_field_name == "answer" + assert all_chunks[1].chunk == "" + assert all_chunks[1].is_last_chunk is True + + assert all_chunks[2].predict_name == "predict2" + assert all_chunks[2].signature_field_name == "judgement" + assert all_chunks[2].chunk == ( "The answer provides the standard punchline for this classic joke format, adapted to the specific location " "mentioned in the question. It is the expected and appropriate response." 
) + assert all_chunks[2].is_last_chunk is False + + # Empty final chunk from predict2 with is_last_chunk=True + assert all_chunks[3].predict_name == "predict2" + assert all_chunks[3].signature_field_name == "judgement" + assert all_chunks[3].chunk == "" + assert all_chunks[3].is_last_chunk is True @pytest.mark.anyio diff --git a/tests/streaming/test_streaming_is_last_chunk.py b/tests/streaming/test_streaming_is_last_chunk.py new file mode 100644 index 0000000000..16cd6d0323 --- /dev/null +++ b/tests/streaming/test_streaming_is_last_chunk.py @@ -0,0 +1,393 @@ +"""Comprehensive tests for is_last_chunk behavior across all adapters. + +These tests ensure that: +1. Every stream listener always yields at least one chunk with is_last_chunk=True +2. is_last_chunk=False for all non-final chunks +3. is_last_chunk=True only appears on the final chunk for each listener +4. This behavior is consistent across ChatAdapter, JSONAdapter, and XMLAdapter +5. This behavior works with both complete and incomplete completion markers +""" + +from unittest import mock +from unittest.mock import AsyncMock + +import pytest +from litellm import ModelResponseStream +from litellm.types.utils import Delta, StreamingChoices + +import dspy + + +@pytest.mark.anyio +async def test_is_last_chunk_always_present_chat_adapter(): + """Test that ChatAdapter always yields a final chunk with is_last_chunk=True.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + return self.predict(question=question, **kwargs) + + async def stream_with_completion_marker(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Hello"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content=" world"))]) + yield 
ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + with mock.patch("litellm.acompletion", new_callable=AsyncMock, side_effect=[stream_with_completion_marker()]): + program = dspy.streamify( + MyProgram(), + stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Verify we have at least one chunk + assert len(all_chunks) > 0 + + # Verify exactly one chunk has is_last_chunk=True and it's the last one + last_chunk_count = sum(1 for chunk in all_chunks if chunk.is_last_chunk) + assert last_chunk_count == 1, f"Expected exactly 1 chunk with is_last_chunk=True, got {last_chunk_count}" + assert all_chunks[-1].is_last_chunk is True, "Last chunk must have is_last_chunk=True" + + # Verify all non-last chunks have is_last_chunk=False + for chunk in all_chunks[:-1]: + assert chunk.is_last_chunk is False, f"Non-final chunk should have is_last_chunk=False: {chunk}" + + +@pytest.mark.anyio +async def test_is_last_chunk_always_present_json_adapter(): + """Test that JSONAdapter always yields a final chunk with is_last_chunk=True.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + return self.predict(question=question, **kwargs) + + async def json_stream(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content='{"answer": "'))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Hello"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content=" world"))]) + yield ModelResponseStream(model="gpt", 
choices=[StreamingChoices(delta=Delta(content='"}'))]) + + with mock.patch("litellm.acompletion", new_callable=AsyncMock, side_effect=[json_stream()]): + program = dspy.streamify( + MyProgram(), + stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.JSONAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Verify we have at least one chunk + assert len(all_chunks) > 0 + + # Verify exactly one chunk has is_last_chunk=True and it's the last one + last_chunk_count = sum(1 for chunk in all_chunks if chunk.is_last_chunk) + assert last_chunk_count == 1, f"Expected exactly 1 chunk with is_last_chunk=True, got {last_chunk_count}" + assert all_chunks[-1].is_last_chunk is True, "Last chunk must have is_last_chunk=True" + + # Verify all non-last chunks have is_last_chunk=False + for chunk in all_chunks[:-1]: + assert chunk.is_last_chunk is False, f"Non-final chunk should have is_last_chunk=False: {chunk}" + + +@pytest.mark.anyio +async def test_is_last_chunk_always_present_xml_adapter(): + """Test that XMLAdapter always yields a final chunk with is_last_chunk=True.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + return self.predict(question=question, **kwargs) + + async def xml_stream(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="<answer>"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Hello"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content=" world"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="</answer>"))]) + + with mock.patch("litellm.acompletion",
new_callable=AsyncMock, side_effect=[xml_stream()]): + program = dspy.streamify( + MyProgram(), + stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.XMLAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Verify we have at least one chunk + assert len(all_chunks) > 0 + + # Verify exactly one chunk has is_last_chunk=True and it's the last one + last_chunk_count = sum(1 for chunk in all_chunks if chunk.is_last_chunk) + assert last_chunk_count == 1, f"Expected exactly 1 chunk with is_last_chunk=True, got {last_chunk_count}" + assert all_chunks[-1].is_last_chunk is True, "Last chunk must have is_last_chunk=True" + + # Verify all non-last chunks have is_last_chunk=False + for chunk in all_chunks[:-1]: + assert chunk.is_last_chunk is False, f"Non-final chunk should have is_last_chunk=False: {chunk}" + + +@pytest.mark.anyio +async def test_is_last_chunk_multiple_listeners_all_get_final_chunk(): + """Test that with multiple listeners, each gets its own final chunk with is_last_chunk=True.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict1 = dspy.Predict("question->answer") + self.predict2 = dspy.Predict("question,answer->judgement") + + def forward(self, question, **kwargs): + answer = self.predict1(question=question, **kwargs).answer + judgement = self.predict2(question=question, answer=answer, **kwargs) + return judgement + + async def stream_1(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Answer"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + async def 
stream_2(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## judgement ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Judgement"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + with mock.patch("litellm.acompletion", new_callable=AsyncMock, side_effect=[stream_1(), stream_2()]): + program = dspy.streamify( + MyProgram(), + stream_listeners=[ + dspy.streaming.StreamListener(signature_field_name="answer"), + dspy.streaming.StreamListener(signature_field_name="judgement"), + ], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Separate chunks by listener + answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"] + judgement_chunks = [c for c in all_chunks if c.signature_field_name == "judgement"] + + # Each listener should have exactly one chunk with is_last_chunk=True + assert len(answer_chunks) > 0, "Should have answer chunks" + assert len(judgement_chunks) > 0, "Should have judgement chunks" + + answer_last_count = sum(1 for c in answer_chunks if c.is_last_chunk) + judgement_last_count = sum(1 for c in judgement_chunks if c.is_last_chunk) + + assert answer_last_count == 1, ( + f"Answer listener should have exactly 1 chunk with is_last_chunk=True, got {answer_last_count}" + ) + assert judgement_last_count == 1, ( + f"Judgement listener should have exactly 1 chunk with is_last_chunk=True, got {judgement_last_count}" + ) + + # The final chunk for each listener should be the last one + assert answer_chunks[-1].is_last_chunk is True, "Last answer chunk must have is_last_chunk=True" + assert judgement_chunks[-1].is_last_chunk is True, "Last judgement chunk must have 
is_last_chunk=True" + + +@pytest.mark.anyio +async def test_is_last_chunk_with_few_tokens(): + """Test that even responses with few tokens have is_last_chunk=True.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + return self.predict(question=question, **kwargs) + + async def few_tokens_stream(*args, **kwargs): + # Just a few tokens + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="OK"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + with mock.patch("litellm.acompletion", new_callable=AsyncMock, side_effect=[few_tokens_stream()]): + program = dspy.streamify( + MyProgram(), + stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Should have at least one chunk with is_last_chunk=True + assert len(all_chunks) > 0, "Should have at least one chunk" + assert any(c.is_last_chunk for c in all_chunks), "At least one chunk should have is_last_chunk=True" + assert all_chunks[-1].is_last_chunk is True, "Last chunk must have is_last_chunk=True" + + +@pytest.mark.anyio +async def test_is_last_chunk_order_invariant(): + """Test that is_last_chunk appears exactly once per listener regardless of chunk ordering.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + return self.predict(question=question, **kwargs) + + async def 
varied_chunk_sizes_stream(*args, **kwargs): + # Mix of different sized chunks + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="A"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content=""))]) # Empty chunk + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="BC"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="DEF"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content=""))]) # Another empty + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + with mock.patch("litellm.acompletion", new_callable=AsyncMock, side_effect=[varied_chunk_sizes_stream()]): + program = dspy.streamify( + MyProgram(), + stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Count chunks with is_last_chunk=True + last_chunk_count = sum(1 for chunk in all_chunks if chunk.is_last_chunk) + assert last_chunk_count == 1, f"Should have exactly 1 chunk with is_last_chunk=True, got {last_chunk_count}" + assert all_chunks[-1].is_last_chunk is True, "Last chunk must have is_last_chunk=True" + + +@pytest.mark.anyio +async def test_is_last_chunk_with_allow_reuse(): + """Test that is_last_chunk works correctly when allow_reuse=True and listener is used multiple times.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + # Call predict twice to test 
reusability + self.predict(question=question, **kwargs) + return self.predict(question=question, **kwargs) + + async def stream_call(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Response"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + stream_generators = [stream_call, stream_call] + + async def completion_side_effect(*args, **kwargs): + return stream_generators.pop(0)() + + # Test with allow_reuse=True + with mock.patch("litellm.acompletion", side_effect=completion_side_effect): + program = dspy.streamify( + MyProgram(), + stream_listeners=[ + dspy.streaming.StreamListener(signature_field_name="answer", allow_reuse=True), + ], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Should have chunks from both predict calls + assert len(all_chunks) > 0, "Should have chunks from reused listener" + + # Count how many times is_last_chunk=True appears + last_chunk_count = sum(1 for chunk in all_chunks if chunk.is_last_chunk) + + # With allow_reuse=True, the listener should be used for BOTH predict calls + # So we should see TWO final chunks (one for each predict call) + assert last_chunk_count == 2, ( + f"With allow_reuse=True, should have 2 final chunks (one per call), got {last_chunk_count}" + ) + + # Verify the listener captured both responses + concat_message = "".join([chunk.chunk for chunk in all_chunks if chunk.chunk]) + assert "Response" in concat_message, "Should have captured response content" + + +@pytest.mark.anyio +async def test_is_last_chunk_without_allow_reuse(): + """Test that is_last_chunk works correctly when 
allow_reuse=False (default) and listener stops after first use.""" + + class MyProgram(dspy.Module): + def __init__(self): + super().__init__() + self.predict = dspy.Predict("question->answer") + + def forward(self, question, **kwargs): + # Call predict twice - but with allow_reuse=False, only first should be captured + self.predict(question=question, **kwargs) + return self.predict(question=question, **kwargs) + + async def stream_call(*args, **kwargs): + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## answer ## ]]"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="Response"))]) + yield ModelResponseStream(model="gpt", choices=[StreamingChoices(delta=Delta(content="[[ ## completed ## ]]"))]) + + stream_generators = [stream_call, stream_call] + + async def completion_side_effect(*args, **kwargs): + return stream_generators.pop(0)() + + # Test with allow_reuse=False (default) + with mock.patch("litellm.acompletion", side_effect=completion_side_effect): + program = dspy.streamify( + MyProgram(), + stream_listeners=[ + dspy.streaming.StreamListener(signature_field_name="answer", allow_reuse=False), + ], + ) + with dspy.context(lm=dspy.LM("gpt", cache=False), adapter=dspy.ChatAdapter()): + output = program(question="test") + all_chunks = [] + async for value in output: + if isinstance(value, dspy.streaming.StreamResponse): + all_chunks.append(value) + + # Should have chunks from only the first predict call + assert len(all_chunks) > 0, "Should have chunks from first call" + + # Count how many times is_last_chunk=True appears + last_chunk_count = sum(1 for chunk in all_chunks if chunk.is_last_chunk) + + # With allow_reuse=False (default), the listener should stop after first use + # So we should see only ONE final chunk + assert last_chunk_count == 1, f"With allow_reuse=False, should have 1 final chunk, got {last_chunk_count}" + assert all_chunks[-1].is_last_chunk is True, "Last chunk 
must have is_last_chunk=True"