diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py
index f586de74ff..649bfbabfc 100644
--- a/src/openai/_streaming.py
+++ b/src/openai/_streaming.py
@@ -59,6 +59,11 @@ def __stream__(self) -> Iterator[_T]:
             if sse.data.startswith("[DONE]"):
                 break
 
+            # Skip events with no data (e.g., standalone retry/id directives)
+            # Per SSE spec, these are valid meta-only events that shouldn't be parsed as JSON
+            if not sse.data or sse.data.strip() == "":
+                continue
+
             # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
             if sse.event and sse.event.startswith("thread."):
                 data = sse.json()
@@ -161,6 +166,11 @@ async def __stream__(self) -> AsyncIterator[_T]:
             if sse.data.startswith("[DONE]"):
                 break
 
+            # Skip events with no data (e.g., standalone retry/id directives)
+            # Per SSE spec, these are valid meta-only events that shouldn't be parsed as JSON
+            if not sse.data or sse.data.strip() == "":
+                continue
+
             # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
             if sse.event and sse.event.startswith("thread."):
                 data = sse.json()
diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index 04f8e51abd..382c2097bc 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -216,6 +216,76 @@ def body() -> Iterator[bytes]:
     assert sse.json() == {"content": "известни"}
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
+async def test_retry_directive_without_data(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
+    """Test that standalone retry directives (with no data field) are skipped.
+
+    This is a common pattern in SSE streams, especially from providers like Anthropic,
+    where a retry directive is sent at the beginning of the stream:
+
+        retry: 3000
+
+        data: {"actual":"content"}
+
+    The retry directive is a valid SSE meta-field but should not be parsed as JSON data.
+    """
+    def body() -> Iterator[bytes]:
+        # Standalone retry directive (no data field)
+        yield b"retry: 3000\n"
+        yield b"\n"
+        # Actual data event
+        yield b'data: {"foo":true}\n'
+        yield b"\n"
+
+    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
+
+    # First SSE event has retry but no data - should be yielded by decoder
+    sse = await iter_next(iterator)
+    assert sse.retry == 3000
+    assert sse.data == ""
+
+    # Second SSE event has actual data
+    sse = await iter_next(iterator)
+    assert sse.json() == {"foo": True}
+
+    await assert_empty_iter(iterator)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
+async def test_retry_with_id_directive_without_data(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
+    """Test that SSE events with only meta-fields (retry, id, event) and no data are skipped.
+
+    Per the SSE spec, these are valid meta-only events that configure the event stream
+    but don't contain actual data to be processed.
+    """
+    def body() -> Iterator[bytes]:
+        # Meta-only event with retry and id
+        yield b"id: msg_123\n"
+        yield b"retry: 5000\n"
+        yield b"\n"
+        # Actual data event
+        yield b"id: msg_124\n"
+        yield b'data: {"bar":false}\n'
+        yield b"\n"
+
+    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
+
+    # First SSE event is meta-only
+    sse = await iter_next(iterator)
+    assert sse.id == "msg_123"
+    assert sse.retry == 5000
+    assert sse.data == ""
+
+    # Second SSE event has actual data
+    sse = await iter_next(iterator)
+    assert sse.id == "msg_124"
+    assert sse.json() == {"bar": False}
+
+    await assert_empty_iter(iterator)
+
+
 async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]:
     for chunk in iter:
         yield chunk