From 9aedefdb8c9b54a7bf2acb6e191e768898976104 Mon Sep 17 00:00:00 2001 From: Lucas Alencar Xisto Date: Sun, 31 Aug 2025 21:33:09 -0300 Subject: [PATCH 1/5] tests(images): add coverage for optional content_filter_results on Image --- tests/test_images_missing_fields.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 tests/test_images_missing_fields.py diff --git a/tests/test_images_missing_fields.py b/tests/test_images_missing_fields.py new file mode 100644 index 0000000000..3baef8ce21 --- /dev/null +++ b/tests/test_images_missing_fields.py @@ -0,0 +1,50 @@ +import httpx +import pytest +from openai import AsyncOpenAI, DefaultAsyncHttpxClient + +@pytest.mark.anyio +async def test_images_generate_includes_content_filter_results_async(): + """ + Ensure the Image model exposes optional fields returned by the API, + specifically `content_filter_results` (keeping `revised_prompt` coverage). + """ + mock_json = { + "created": 1711111111, + "data": [ + { + "url": "https://example.test/cat.png", + "revised_prompt": "a cute cat wearing sunglasses", + "content_filter_results": { + "sexual_minors": {"filtered": False}, + "violence": {"filtered": False}, + }, + } + ], + } + + # Async handler because we'll use AsyncOpenAI (httpx.AsyncClient under the hood) + async def ahandler(request: httpx.Request) -> httpx.Response: + assert "images" in str(request.url).lower() + return httpx.Response(200, json=mock_json) + + atransport = httpx.MockTransport(ahandler) + + client = AsyncOpenAI( + api_key="test", + http_client=DefaultAsyncHttpxClient(transport=atransport), + timeout=10.0, + ) + + resp = await client.images.generate(model="gpt-image-1", prompt="cat with glasses") # type: ignore + + assert hasattr(resp, "data") and isinstance(resp.data, list) and resp.data + item = resp.data[0] + + # existing field + assert item.revised_prompt == "a cute cat wearing sunglasses" + + # new optional field + cfr = item.content_filter_results + assert isinstance(cfr, dict), f"content_filter_results should be dict, got {type(cfr)}" + assert cfr.get("violence", {}).get("filtered") is False + assert cfr.get("sexual_minors", {}).get("filtered") is False From 6ae5e2b1afefb7f743ba16b8a1752501212d1283 Mon Sep 17 00:00:00 2001 From: Lucas Alencar Xisto Date: Sat, 6 Sep 2025 21:27:14 -0300 Subject: [PATCH 2/5] test: dedupe retries/timeouts, add conftest with fake OPENAI_API_KEY --- tests/conftest.py | 8 +++ tests/retries/__init__.py | 2 + tests/retries/test_retry_after.py | 42 ++++++++++++ tests/test_images_missing_fields.py | 100 ++++++++++++++-------------- tests/timeouts/__init__.py | 2 + tests/timeouts/_util.py | 19 ++++++ tests/timeouts/test_overrides.py | 28 ++++++++ 7 files changed, 151 insertions(+), 50 deletions(-) create mode 100644 tests/retries/__init__.py create mode 100644 tests/retries/test_retry_after.py create mode 100644 tests/timeouts/__init__.py create mode 100644 tests/timeouts/_util.py create mode 100644 tests/timeouts/test_overrides.py diff --git a/tests/conftest.py b/tests/conftest.py index 408bcf76c0..4cd6109426 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,14 @@ logging.getLogger("openai").setLevel(logging.DEBUG) +# Autouse fixture to ensure an API key is always set for tests +@pytest.fixture(autouse=True) +def _fake_openai_key(monkeypatch: pytest.MonkeyPatch) -> None: + # evita dependência real de credencial + monkeypatch.setenv("OPENAI_API_KEY", "test") + yield + + # automatically add `pytest.mark.asyncio()` to all of our async tests # so we don't 
have to add that boilerplate everywhere def pytest_collection_modifyitems(items: list[pytest.Function]) -> None: diff --git a/tests/retries/__init__.py b/tests/retries/__init__.py new file mode 100644 index 0000000000..5c8018e448 --- /dev/null +++ b/tests/retries/__init__.py @@ -0,0 +1,2 @@ +"""Tests related to retry behavior.""" + diff --git a/tests/retries/test_retry_after.py b/tests/retries/test_retry_after.py new file mode 100644 index 0000000000..d6b03602fd --- /dev/null +++ b/tests/retries/test_retry_after.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import os +from unittest import mock + +import httpx +import pytest +from respx import MockRouter + +from openai import OpenAI + + +base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") + + +def _low_retry_timeout(*_args, **_kwargs) -> float: + return 0.01 + + +@mock.patch("openai._base_client.BaseClient._calculate_retry_timeout", _low_retry_timeout) +@pytest.mark.respx(base_url=base_url) +def test_retry_after_header_is_respected(respx_mock: MockRouter, client: OpenAI) -> None: + attempts = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + attempts["n"] += 1 + if attempts["n"] == 1: + return httpx.Response(429, headers={"Retry-After": "2"}, json={"err": "rate"}) + return httpx.Response(200, json={"ok": True}) + + respx_mock.post("/chat/completions").mock(side_effect=handler) + + client = client.with_options(max_retries=3) + + response = client.chat.completions.with_raw_response.create( + messages=[{"content": "hi", "role": "user"}], + model="gpt-4o", + ) + + assert response.retries_taken == 1 + assert int(response.http_request.headers.get("x-stainless-retry-count")) == 1 + diff --git a/tests/test_images_missing_fields.py b/tests/test_images_missing_fields.py index 3baef8ce21..3dfe1bff04 100644 --- a/tests/test_images_missing_fields.py +++ b/tests/test_images_missing_fields.py @@ -1,50 +1,50 @@ -import httpx -import pytest -from openai import AsyncOpenAI, DefaultAsyncHttpxClient - -@pytest.mark.anyio -async def test_images_generate_includes_content_filter_results_async(): - """ - Ensure the Image model exposes optional fields returned by the API, - specifically `content_filter_results` (keeping `revised_prompt` coverage). 
- """ - mock_json = { - "created": 1711111111, - "data": [ - { - "url": "https://example.test/cat.png", - "revised_prompt": "a cute cat wearing sunglasses", - "content_filter_results": { - "sexual_minors": {"filtered": False}, - "violence": {"filtered": False}, - }, - } - ], - } - - # Async handler because we'll use AsyncOpenAI (httpx.AsyncClient under the hood) - async def ahandler(request: httpx.Request) -> httpx.Response: - assert "images" in str(request.url).lower() - return httpx.Response(200, json=mock_json) - - atransport = httpx.MockTransport(ahandler) - - client = AsyncOpenAI( - api_key="test", - http_client=DefaultAsyncHttpxClient(transport=atransport), - timeout=10.0, - ) - - resp = await client.images.generate(model="gpt-image-1", prompt="cat with glasses") # type: ignore - - assert hasattr(resp, "data") and isinstance(resp.data, list) and resp.data - item = resp.data[0] - - # existing field - assert item.revised_prompt == "a cute cat wearing sunglasses" - - # new optional field - cfr = item.content_filter_results - assert isinstance(cfr, dict), f"content_filter_results should be dict, got {type(cfr)}" - assert cfr.get("violence", {}).get("filtered") is False - assert cfr.get("sexual_minors", {}).get("filtered") is False +import httpx +import pytest +from openai import AsyncOpenAI, DefaultAsyncHttpxClient + +@pytest.mark.anyio +async def test_images_generate_includes_content_filter_results_async(): + """ + Ensure the Image model exposes optional fields returned by the API, + specifically `content_filter_results` (keeping `revised_prompt` coverage). + """ + mock_json = { + "created": 1711111111, + "data": [ + { + "url": "https://example.test/cat.png", + "revised_prompt": "a cute cat wearing sunglasses", + "content_filter_results": { + "sexual_minors": {"filtered": False}, + "violence": {"filtered": False}, + }, + } + ], + } + + # Async handler because we'll use AsyncOpenAI (httpx.AsyncClient under the hood) + async def ahandler(request: httpx.Request) -> httpx.Response: + assert "images" in str(request.url).lower() + return httpx.Response(200, json=mock_json) + + atransport = httpx.MockTransport(ahandler) + + client = AsyncOpenAI( + api_key="test", + http_client=DefaultAsyncHttpxClient(transport=atransport), + timeout=10.0, + ) + + resp = await client.images.generate(model="gpt-image-1", prompt="cat with glasses") # type: ignore + + assert hasattr(resp, "data") and isinstance(resp.data, list) and resp.data + item = resp.data[0] + + # existing field + assert item.revised_prompt == "a cute cat wearing sunglasses" + + # new optional field + cfr = item.content_filter_results + assert isinstance(cfr, dict), f"content_filter_results should be dict, got {type(cfr)}" + assert cfr.get("violence", {}).get("filtered") is False + assert cfr.get("sexual_minors", {}).get("filtered") is False diff --git a/tests/timeouts/__init__.py b/tests/timeouts/__init__.py new file mode 100644 index 0000000000..dec9aed6b3 --- /dev/null +++ b/tests/timeouts/__init__.py @@ -0,0 +1,2 @@ +"""Tests related to timeout behavior.""" + diff --git a/tests/timeouts/_util.py b/tests/timeouts/_util.py new file mode 100644 index 0000000000..f37fc027cb --- /dev/null +++ b/tests/timeouts/_util.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +def assert_timeout_eq(value, expected: float) -> None: + """Assert that a timeout-like value equals the expected seconds. + + Supports plain numeric timeouts or httpx.Timeout instances. 
+ """ + from httpx import Timeout + + if isinstance(value, (int, float)): + assert float(value) == expected + elif isinstance(value, Timeout): + assert any( + getattr(value, f, None) in (None, expected) + for f in ("read", "connect", "write") + ), f"Timeout fields do not match {expected}: {value!r}" + else: + raise AssertionError(f"Unexpected timeout type: {type(value)}") + diff --git a/tests/timeouts/test_overrides.py b/tests/timeouts/test_overrides.py new file mode 100644 index 0000000000..649a2df36f --- /dev/null +++ b/tests/timeouts/test_overrides.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import os + +import httpx +import pytest + +from openai import OpenAI +from openai._models import FinalRequestOptions +from openai._base_client import DEFAULT_TIMEOUT + + +base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") + + +def test_per_request_timeout_overrides_default(client: OpenAI) -> None: + # default timeout applied when none provided per-request + request = client._build_request(FinalRequestOptions(method="get", url="/foo")) + timeout = httpx.Timeout(**request.extensions["timeout"]) # type: ignore[arg-type] + assert timeout == DEFAULT_TIMEOUT + + # per-request timeout overrides the default + request = client._build_request( + FinalRequestOptions(method="get", url="/foo", timeout=httpx.Timeout(100.0)) + ) + timeout = httpx.Timeout(**request.extensions["timeout"]) # type: ignore[arg-type] + assert timeout == httpx.Timeout(100.0) + From 4a8456a936531d92d6cd0c927062f01a65512d09 Mon Sep 17 00:00:00 2001 From: Lucas Alencar Xisto Date: Sun, 7 Sep 2025 02:33:29 -0300 Subject: [PATCH 3/5] chore: snapshot before archive --- README.md | 31 ++++++++++++++++++++++++++++++- src/openai/types/image.py | 9 ++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d4b8d8d170..96fd309859 100644 --- a/README.md +++ b/README.md @@ -376,7 +376,7 @@ from openai import OpenAI client = OpenAI() -response = client.chat.responses.create( +response = client.responses.create( input=[ { "role": "user", @@ -622,6 +622,35 @@ client.with_options(timeout=5.0).chat.completions.create( model="gpt-4o", ) ``` +- from openai import OpenAI ++ import httpx ++ from openai import OpenAI + +import httpx +from openai import OpenAI + +# Configure the default for all requests: +client = OpenAI( + # 20 seconds (default is 10 minutes) + timeout=20.0, +) + +# More granular control: +client = OpenAI( + timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0), +) + +# Override per-request: +client.with_options(timeout=5.0).chat.completions.create( + messages=[ + { + "role": "user", + "content": "How can I list all files in a directory using Python?", + } + ], + model="gpt-4o", +) + On timeout, an `APITimeoutError` is thrown. diff --git a/src/openai/types/image.py b/src/openai/types/image.py index ecaef3fd58..c3334365d2 100644 --- a/src/openai/types/image.py +++ b/src/openai/types/image.py @@ -1,6 +1,6 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. -from typing import Optional +from typing import Optional, Dict, Any from .._models import BaseModel @@ -24,3 +24,10 @@ class Image(BaseModel): `response_format` is set to `url` (default value). Unsupported for `gpt-image-1`. """ + + content_filter_results: Optional[Dict[str, Any]] = None + """Optional content filter metadata returned by the API. + + Includes safety-related categories (e.g. sexual_minors, violence, etc.) 
+ indicating whether the image was flagged or filtered. + """ From 57e5a0eafac042f1cbcbb0545fa0701e639cd1eb Mon Sep 17 00:00:00 2001 From: Lucas Alencar Xisto Date: Thu, 18 Sep 2025 20:49:51 -0300 Subject: [PATCH 4/5] docs(examples): add async streaming demos for Responses and Chat Completions --- examples/async_chat_stream.py | 22 ++++++++++++++++++++++ examples/async_responses_stream.py | 21 +++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 examples/async_chat_stream.py create mode 100644 examples/async_responses_stream.py diff --git a/examples/async_chat_stream.py b/examples/async_chat_stream.py new file mode 100644 index 0000000000..22a0c748d4 --- /dev/null +++ b/examples/async_chat_stream.py @@ -0,0 +1,22 @@ + +#!/usr/bin/env -S rye run python +import asyncio +from openai import AsyncOpenAI + +client = AsyncOpenAI() + +async def main() -> None: + # Chat Completions is still supported indefinitely, but it's no longer the primary API. + # This example remains useful for users who still rely on chat.completions. + stream = await client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": "Say this is a test (streaming)."}], + stream=True, + ) + async for chunk in stream: + # Some users prefer accessing delta objects; for demo purposes, printing the chunk is enough. + print(chunk) + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/examples/async_responses_stream.py b/examples/async_responses_stream.py new file mode 100644 index 0000000000..880ee89d2f --- /dev/null +++ b/examples/async_responses_stream.py @@ -0,0 +1,21 @@ + +#!/usr/bin/env -S rye run python +import asyncio +from openai import AsyncOpenAI + +client = AsyncOpenAI() + +async def main() -> None: + # Async streaming with the Responses API (the recommended primary API). + stream = await client.responses.create( + model="gpt-4o-mini", + input="Write a one-sentence bedtime story about a unicorn.", + stream=True, + ) + async for event in stream: + # Each event may contain deltas and final results; printing directly is sufficient for demo purposes. 
+ print(event) + +if __name__ == "__main__": + asyncio.run(main()) + From 8e3227346b8e3cc5c76adcd60a5c535d9f2b3e5a Mon Sep 17 00:00:00 2001 From: Lucas Alencar Xisto Date: Sun, 21 Sep 2025 12:25:53 -0300 Subject: [PATCH 5/5] tests: solidify timeout harness (MockTransport+TCP) + local SSE; drop client-timeout wrapper test for now --- .python-version | 2 +- README.md | 2 +- examples/async_responses_stream.py | 2 - examples/async_stream_unified.py | 25 ++++ pyproject.toml | 3 +- requirements-dev.lock | 17 +-- requirements.lock | 9 +- .../{_streaming.py => _streaming/__init__.py} | 24 +++- src/openai/_streaming/adapters.py | 47 +++++++ src/openai/_streaming/unified.py | 37 ++++++ src/openai/_streaming/wrap.py | 25 ++++ .../resources/chat/completions/completions.py | 17 ++- src/openai/resources/responses/responses.py | 32 +++-- src/openai/streaming/adapters.py | 37 ++++++ src/openai/streaming/unified.py | 24 ++++ src/openai/types/image.py | 2 +- tests/retries/__init__.py | 1 - tests/retries/test_retry_after.py | 2 - tests/test_images_missing_fields.py | 2 + tests/test_streaming_state_harness.py | 70 ++++++++++ tests/test_streaming_unified_basic.py | 38 ++++++ tests/test_streaming_unified_integration.py | 88 +++++++++++++ tests/test_streaming_with_local_sse.py | 120 ++++++++++++++++++ tests/test_timeouts.py | 54 ++++++++ tests/test_timeouts_client.py | 27 ++++ tests/tests/test_timeouts_client.py | 0 tests/timeouts/__init__.py | 1 - tests/timeouts/_util.py | 9 +- tests/timeouts/test_overrides.py | 7 +- 29 files changed, 666 insertions(+), 58 deletions(-) create mode 100644 examples/async_stream_unified.py rename src/openai/{_streaming.py => _streaming/__init__.py} (96%) create mode 100644 src/openai/_streaming/adapters.py create mode 100644 src/openai/_streaming/unified.py create mode 100644 src/openai/_streaming/wrap.py create mode 100644 src/openai/streaming/adapters.py create mode 100644 src/openai/streaming/unified.py create mode 100644 tests/test_streaming_state_harness.py create mode 100644 tests/test_streaming_unified_basic.py create mode 100644 tests/test_streaming_unified_integration.py create mode 100644 tests/test_streaming_with_local_sse.py create mode 100644 tests/test_timeouts.py create mode 100644 tests/test_timeouts_client.py create mode 100644 tests/tests/test_timeouts_client.py diff --git a/.python-version b/.python-version index 43077b2460..f3fe474aee 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.9.18 +3.12.9 diff --git a/README.md b/README.md index 96fd309859..ba603a716e 100644 --- a/README.md +++ b/README.md @@ -376,7 +376,7 @@ from openai import OpenAI client = OpenAI() -response = client.responses.create( +response = client.responses.create( input=[ { "role": "user", diff --git a/examples/async_responses_stream.py b/examples/async_responses_stream.py index 880ee89d2f..d2a3ffa9e4 100644 --- a/examples/async_responses_stream.py +++ b/examples/async_responses_stream.py @@ -1,4 +1,3 @@ - #!/usr/bin/env -S rye run python import asyncio from openai import AsyncOpenAI @@ -18,4 +17,3 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) - diff --git a/examples/async_stream_unified.py b/examples/async_stream_unified.py new file mode 100644 index 0000000000..5b8ee2ccc9 --- /dev/null +++ b/examples/async_stream_unified.py @@ -0,0 +1,25 @@ +#!/usr/bin/env -S rye run python +import asyncio +from openai import AsyncOpenAI +from openai._streaming.unified import extract_text, StreamEvent +from openai._streaming.adapters import ResponsesEventAdapter 
# or ChatCompletionsEventAdapter + + +async def main(): + client = AsyncOpenAI() + # Example with Responses stream (manual adapter for now) + async with client.responses.stream( + model="gpt-4.1-mini", + input="Write a single haiku about async streams.", + ) as stream: + async for raw_evt in stream: + # Convert raw event into a unified StreamEvent + ev: StreamEvent = ResponsesEventAdapter.adapt(raw_evt) + txt = extract_text(ev) + if txt: + print(txt, end="") + print() # newline at the end + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 6736c1ad9e..1be0e62865 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "sniffio", "tqdm > 4", "jiter>=0.4.0, <1", + "starlette>=0.48.0", ] requires-python = ">= 3.8" classifiers = [ @@ -56,7 +57,7 @@ dev-dependencies = [ "pyright==1.1.399", "mypy", "respx", - "pytest", + "pytest>=8.4.2", "pytest-asyncio", "ruff", "time-machine", diff --git a/requirements-dev.lock b/requirements-dev.lock index e8bea53014..8eb0deab87 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -22,12 +22,11 @@ annotated-types==0.6.0 anyio==4.1.0 # via httpx # via openai + # via starlette argcomplete==3.1.2 # via nox asttokens==2.4.1 # via inline-snapshot -async-timeout==5.0.1 - # via aiohttp attrs==24.2.0 # via aiohttp # via outcome @@ -57,10 +56,6 @@ distlib==0.3.7 # via virtualenv distro==1.8.0 # via openai -exceptiongroup==1.2.2 - # via anyio - # via pytest - # via trio execnet==2.1.1 # via pytest-xdist executing==2.2.0 @@ -146,7 +141,7 @@ pygments==2.18.0 pyjwt==2.8.0 # via msal pyright==1.1.399 -pytest==8.4.1 +pytest==8.4.2 # via inline-snapshot # via pytest-asyncio # via pytest-xdist @@ -179,11 +174,9 @@ sortedcontainers==2.4.0 # via trio sounddevice==0.5.1 # via openai +starlette==0.48.0 + # via openai time-machine==2.9.0 -tomli==2.0.2 - # via inline-snapshot - # via mypy - # via pytest tqdm==4.66.5 # via openai trio==0.27.0 @@ -194,12 +187,12 @@ types-tqdm==4.66.0.20240417 typing-extensions==4.12.2 # via azure-core # via azure-identity - # via multidict # via mypy # via openai # via pydantic # via pydantic-core # via pyright + # via starlette tzdata==2024.1 # via pandas urllib3==2.2.1 diff --git a/requirements.lock b/requirements.lock index 3b6ece87e2..7d1b27c358 100644 --- a/requirements.lock +++ b/requirements.lock @@ -22,8 +22,7 @@ annotated-types==0.6.0 anyio==4.1.0 # via httpx # via openai -async-timeout==5.0.1 - # via aiohttp + # via starlette attrs==25.3.0 # via aiohttp certifi==2023.7.22 @@ -33,8 +32,6 @@ cffi==1.17.1 # via sounddevice distro==1.8.0 # via openai -exceptiongroup==1.2.2 - # via anyio frozenlist==1.7.0 # via aiohttp # via aiosignal @@ -84,15 +81,17 @@ sniffio==1.3.0 # via openai sounddevice==0.5.1 # via openai +starlette==0.48.0 + # via openai tqdm==4.66.5 # via openai types-pytz==2024.2.0.20241003 # via pandas-stubs typing-extensions==4.12.2 - # via multidict # via openai # via pydantic # via pydantic-core + # via starlette tzdata==2024.1 # via pandas websockets==15.0.1 diff --git a/src/openai/_streaming.py b/src/openai/_streaming/__init__.py similarity index 96% rename from src/openai/_streaming.py rename to src/openai/_streaming/__init__.py index f586de74ff..40d21b8976 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming/__init__.py @@ -9,11 +9,11 @@ import httpx -from ._utils import is_mapping, extract_type_var_from_base -from ._exceptions import APIError +from .._utils import is_mapping, extract_type_var_from_base +from .._exceptions 
import APIError if TYPE_CHECKING: - from ._client import OpenAI, AsyncOpenAI + from .._client import OpenAI, AsyncOpenAI _T = TypeVar("_T") @@ -400,7 +400,7 @@ class MyStream(Stream[bytes]): extract_stream_chunk_type(MyStream) -> bytes ``` """ - from ._base_client import Stream, AsyncStream + from .._base_client import Stream, AsyncStream return extract_type_var_from_base( stream_cls, @@ -408,3 +408,19 @@ class MyStream(Stream[bytes]): generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)), failure_message=failure_message, ) + + +from .unified import StreamEvent, extract_text +from .adapters import ResponsesEventAdapter, ChatCompletionsEventAdapter +from .wrap import _wrap_unified + + +__all__ = [ + "Stream", + "AsyncStream", + "StreamEvent", + "extract_text", + "ResponsesEventAdapter", + "ChatCompletionsEventAdapter", + "_wrap_unified", +] diff --git a/src/openai/_streaming/adapters.py b/src/openai/_streaming/adapters.py new file mode 100644 index 0000000000..ec68490e75 --- /dev/null +++ b/src/openai/_streaming/adapters.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from typing import Any + +from .unified import StreamEvent + + +class ResponsesEventAdapter: + """Map Responses API streaming events into the unified stream shape.""" + + @staticmethod + def adapt(event: Any) -> StreamEvent: + event_type = getattr(event, "type", None) + + if event_type == "response.output_text.delta": + return StreamEvent( + type="output_text.delta", + delta=getattr(event, "delta", None), + raw=event, + ) + + if event_type == "response.completed": + return StreamEvent(type="response.completed", raw=event) + + return StreamEvent(type="response.error", raw=event) + + +class ChatCompletionsEventAdapter: + """Map Chat Completions streaming chunks into the unified stream shape.""" + + @staticmethod + def adapt(chunk: Any) -> StreamEvent: + try: + choice = chunk.choices[0] + delta = getattr(getattr(choice, "delta", None), "content", None) + if delta: + return StreamEvent(type="output_text.delta", delta=delta, raw=chunk) + except Exception: # pragma: no cover - defensive adapter guard + pass + + return StreamEvent(type="response.completed", raw=chunk) + + +__all__ = [ + "ChatCompletionsEventAdapter", + "ResponsesEventAdapter", +] diff --git a/src/openai/_streaming/unified.py b/src/openai/_streaming/unified.py new file mode 100644 index 0000000000..4c98ed853e --- /dev/null +++ b/src/openai/_streaming/unified.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Literal, Optional + +EventType = Literal[ + "message_start", + "output_text.delta", + "output_text.done", + "tool_call.delta", + "tool_call.done", + "response.completed", + "response.error", +] + + +@dataclass +class StreamEvent: + type: EventType + model: Optional[str] = None + delta: Optional[str] = None + full_text: Optional[str] = None + tool_call_id: Optional[str] = None + raw: Optional[Any] = None + + +def extract_text(event: StreamEvent) -> str: + """Return the text delta from a stream event.""" + + return event.delta or "" + + +__all__ = [ + "EventType", + "StreamEvent", + "extract_text", +] diff --git a/src/openai/_streaming/wrap.py b/src/openai/_streaming/wrap.py new file mode 100644 index 0000000000..8b88aae74b --- /dev/null +++ b/src/openai/_streaming/wrap.py @@ -0,0 +1,25 @@ +from contextlib import asynccontextmanager +from typing import Any, AsyncIterator, Callable +from .unified import StreamEvent + +async def _map_async_iter( + source: AsyncIterator[Any], + fn: 
Callable[[Any], StreamEvent], +) -> AsyncIterator[StreamEvent]: + async for item in source: + yield fn(item) + +@asynccontextmanager +async def _wrap_unified(cm, adapter_fn: Callable[[Any], StreamEvent]): + """ + Wrap an existing async context manager (cm) that yields an async iterator of raw events, + and expose a context manager that yields an async iterator of adapted StreamEvent. + """ + async with cm as underlying: + async def _mapped(): + async for raw in underlying: + yield adapter_fn(raw) + yield _mapped() + +# Optional alias if something imported the non-underscored name before +map_async_iter = _map_async_iter diff --git a/src/openai/resources/chat/completions/completions.py b/src/openai/resources/chat/completions/completions.py index 7e209ff0ee..38ec3a0a2a 100644 --- a/src/openai/resources/chat/completions/completions.py +++ b/src/openai/resources/chat/completions/completions.py @@ -3,8 +3,9 @@ from __future__ import annotations import inspect -from typing import Dict, List, Type, Union, Iterable, Optional, cast from functools import partial +from typing import Any, Dict, List, Type, Union, Iterable, Optional, AsyncIterator, AsyncContextManager, cast + from typing_extensions import Literal, overload import httpx @@ -2847,13 +2848,14 @@ def stream( user: str | NotGiven = NOT_GIVEN, verbosity: Optional[Literal["low", "medium", "high"]] | NotGiven = NOT_GIVEN, web_search_options: completion_create_params.WebSearchOptions | NotGiven = NOT_GIVEN, + unified: bool = False, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncChatCompletionStreamManager[ResponseFormatT]: + ) -> AsyncChatCompletionStreamManager[ResponseFormatT] | AsyncContextManager[AsyncIterator[Any]]: """Wrapper over the `client.chat.completions.create(stream=True)` method that provides a more granular event API and automatic accumulation of each delta. 
@@ -2923,12 +2925,19 @@ def stream( extra_body=extra_body, timeout=timeout, ) - return AsyncChatCompletionStreamManager( + manager: AsyncChatCompletionStreamManager[ResponseFormatT] = AsyncChatCompletionStreamManager( api_request, response_format=response_format, input_tools=tools, ) + if not unified: + return manager + + from openai._streaming import _wrap_unified, ChatCompletionsEventAdapter + + return _wrap_unified(manager, ChatCompletionsEventAdapter.adapt) + class CompletionsWithRawResponse: def __init__(self, completions: Completions) -> None: @@ -3040,8 +3049,6 @@ def __init__(self, completions: AsyncCompletions) -> None: @cached_property def messages(self) -> AsyncMessagesWithStreamingResponse: return AsyncMessagesWithStreamingResponse(self._completions.messages) - - def validate_response_format(response_format: object) -> None: if inspect.isclass(response_format) and issubclass(response_format, pydantic.BaseModel): raise TypeError( diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index 062fd491f2..fd7ecb4fa0 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Any, List, Type, Union, Iterable, Optional, cast +from typing import Any, List, Type, Union, Iterable, Optional, AsyncIterator, AsyncContextManager, cast from functools import partial from typing_extensions import Literal, overload @@ -895,7 +895,7 @@ def stream( store: Optional[bool] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, top_p: Optional[float] | NotGiven = NOT_GIVEN, truncation: Optional[Literal["auto", "disabled"]] | NotGiven = NOT_GIVEN, @@ -1057,7 +1057,7 @@ def parse( stream: Optional[Literal[False]] | Literal[True] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, tools: Iterable[ParseableToolParam] | NotGiven = NOT_GIVEN, top_logprobs: Optional[int] | NotGiven = NOT_GIVEN, @@ -2263,12 +2263,13 @@ def stream( text_format: type[TextFormatT] | NotGiven = NOT_GIVEN, starting_after: int | NotGiven = NOT_GIVEN, tools: Iterable[ParseableToolParam] | NotGiven = NOT_GIVEN, + unified: bool = False, # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncResponseStreamManager[TextFormatT]: ... + ) -> AsyncResponseStreamManager[TextFormatT] | AsyncContextManager[AsyncIterator[Any]]: ... 
@overload def stream( @@ -2289,18 +2290,19 @@ def stream( store: Optional[bool] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, top_p: Optional[float] | NotGiven = NOT_GIVEN, truncation: Optional[Literal["auto", "disabled"]] | NotGiven = NOT_GIVEN, user: str | NotGiven = NOT_GIVEN, + unified: bool = False, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncResponseStreamManager[TextFormatT]: ... + ) -> AsyncResponseStreamManager[TextFormatT] | AsyncContextManager[AsyncIterator[Any]]: ... def stream( self, @@ -2321,19 +2323,20 @@ def stream( store: Optional[bool] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, top_p: Optional[float] | NotGiven = NOT_GIVEN, truncation: Optional[Literal["auto", "disabled"]] | NotGiven = NOT_GIVEN, user: str | NotGiven = NOT_GIVEN, starting_after: int | NotGiven = NOT_GIVEN, + unified: bool = False, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. 
extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncResponseStreamManager[TextFormatT]: + ) -> AsyncResponseStreamManager[TextFormatT] | AsyncContextManager[AsyncIterator[Any]]: new_response_args = { "input": input, "model": model, @@ -2405,7 +2408,7 @@ def stream( timeout=timeout, ) - return AsyncResponseStreamManager( + manager: AsyncResponseStreamManager[TextFormatT] = AsyncResponseStreamManager( api_request, text_format=text_format, input_tools=tools, @@ -2424,13 +2427,20 @@ def stream( extra_body=extra_body, timeout=timeout, ) - return AsyncResponseStreamManager( + manager = AsyncResponseStreamManager( api_request, text_format=text_format, input_tools=tools, starting_after=starting_after if is_given(starting_after) else None, ) + if not unified: + return manager + + from openai._streaming import _wrap_unified, ResponsesEventAdapter + + return _wrap_unified(manager, ResponsesEventAdapter.adapt) + async def parse( self, *, @@ -2455,7 +2465,7 @@ async def parse( stream: Optional[Literal[False]] | Literal[True] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, tools: Iterable[ParseableToolParam] | NotGiven = NOT_GIVEN, top_logprobs: Optional[int] | NotGiven = NOT_GIVEN, diff --git a/src/openai/streaming/adapters.py b/src/openai/streaming/adapters.py new file mode 100644 index 0000000000..e21af62d4d --- /dev/null +++ b/src/openai/streaming/adapters.py @@ -0,0 +1,37 @@ +from typing import Any +from .unified import StreamEvent + + +class ResponsesEventAdapter: + @staticmethod + def adapt(evt: Any) -> StreamEvent: + t = getattr(evt, "type", None) + if t == "response.output_text.delta": + return StreamEvent( + type="output_text.delta", + delta=getattr(evt, "delta", None), + raw=evt, + ) + if t == "response.completed": + return StreamEvent(type="response.completed", raw=evt) + # TODO: map additional event types (tool_call.*, errors, etc.) 
+ return StreamEvent(type="response.error", raw=evt) + + +class ChatCompletionsEventAdapter: + @staticmethod + def adapt(chunk: Any) -> StreamEvent: + # Try to extract delta.content from the first choice + try: + choice0 = chunk.choices[0] + delta = getattr(getattr(choice0, "delta", None), "content", None) + if delta: + return StreamEvent( + type="output_text.delta", + delta=delta, + raw=chunk, + ) + except Exception: + pass + # TODO: add heuristics for completed and error events + return StreamEvent(type="response.completed", raw=chunk) diff --git a/src/openai/streaming/unified.py b/src/openai/streaming/unified.py new file mode 100644 index 0000000000..a93dbc1ea5 --- /dev/null +++ b/src/openai/streaming/unified.py @@ -0,0 +1,24 @@ +from dataclasses import dataclass +from typing import Optional, Literal, Any + +EventType = Literal[ + "message_start", + "output_text.delta", + "output_text.done", + "tool_call.delta", + "tool_call.done", + "response.completed", + "response.error", +] + +@dataclass +class StreamEvent: + type: EventType + model: Optional[str] = None + delta: Optional[str] = None + full_text: Optional[str] = None + tool_call_id: Optional[str] = None + raw: Optional[Any] = None + +def extract_text(ev: "StreamEvent") -> str: + return ev.delta or "" diff --git a/src/openai/types/image.py b/src/openai/types/image.py index c3334365d2..d024bf8f3f 100644 --- a/src/openai/types/image.py +++ b/src/openai/types/image.py @@ -1,6 +1,6 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. -from typing import Optional, Dict, Any +from typing import Any, Dict, Optional from .._models import BaseModel diff --git a/tests/retries/__init__.py b/tests/retries/__init__.py index 5c8018e448..e7938fcabb 100644 --- a/tests/retries/__init__.py +++ b/tests/retries/__init__.py @@ -1,2 +1 @@ """Tests related to retry behavior.""" - diff --git a/tests/retries/test_retry_after.py b/tests/retries/test_retry_after.py index d6b03602fd..d394e4cb0c 100644 --- a/tests/retries/test_retry_after.py +++ b/tests/retries/test_retry_after.py @@ -9,7 +9,6 @@ from openai import OpenAI - base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") @@ -39,4 +38,3 @@ def handler(request: httpx.Request) -> httpx.Response: assert response.retries_taken == 1 assert int(response.http_request.headers.get("x-stainless-retry-count")) == 1 - diff --git a/tests/test_images_missing_fields.py b/tests/test_images_missing_fields.py index 3dfe1bff04..c054c842f4 100644 --- a/tests/test_images_missing_fields.py +++ b/tests/test_images_missing_fields.py @@ -1,7 +1,9 @@ import httpx import pytest + from openai import AsyncOpenAI, DefaultAsyncHttpxClient + @pytest.mark.anyio async def test_images_generate_includes_content_filter_results_async(): """ diff --git a/tests/test_streaming_state_harness.py b/tests/test_streaming_state_harness.py new file mode 100644 index 0000000000..8c68108a02 --- /dev/null +++ b/tests/test_streaming_state_harness.py @@ -0,0 +1,70 @@ +import asyncio +import pytest +from dataclasses import dataclass + +pytestmark = pytest.mark.asyncio + +def test_pytest_collection_sanity(): + assert True + +@dataclass +class _SSE: + event: str + data: dict + +class _FakeStream: + def __init__(self, events): + self._events = events + + def __aiter__(self): + self._it = iter(self._events) + return self + + async def __anext__(self): + try: + await asyncio.sleep(0) + return next(self._it) + except StopIteration: + raise StopAsyncIteration + +def _mk_events_ok(): + return [ + 
_SSE("response.created", {"id": "resp_1", "type": "response", "model": "gpt-4o-mini"}), + _SSE("response.output_text.created", {"index": 0, "id": "txt_1"}), + _SSE("response.output_text.delta", {"index": 0, "value": "Hello, "}), + _SSE("response.output_text.delta", {"index": 0, "value": "world!"}), + _SSE("response.completed", {"id": "resp_1", "status": "ok"}), + ] + +async def _consume(stream): + seen = [] + buffers = {} + async for ev in stream: + seen.append(ev.event) + if ev.event == "response.output_text.delta": + idx = ev.data.get("index", 0) + buffers[idx] = buffers.get(idx, "") + ev.data.get("value", "") + return seen, buffers.get(0, "") + +async def test_unified_happy_path_harness(): + seen, text = await _consume(_FakeStream(_mk_events_ok())) + assert seen == [ + "response.created", + "response.output_text.created", + "response.output_text.delta", + "response.output_text.delta", + "response.completed", + ] + assert text == "Hello, world!" + +@pytest.mark.parametrize("missing", [ + "response.created", + "response.output_text.created", + "response.completed", +]) +async def test_unified_resilience_harness(missing): + base = _mk_events_ok() + filtered = [e for e in base if e.event != missing] + seen, text = await _consume(_FakeStream(filtered)) + assert missing not in seen + assert isinstance(text, str) diff --git a/tests/test_streaming_unified_basic.py b/tests/test_streaming_unified_basic.py new file mode 100644 index 0000000000..f98f1fa336 --- /dev/null +++ b/tests/test_streaming_unified_basic.py @@ -0,0 +1,38 @@ +import pytest +from openai._streaming import ( + StreamEvent, + extract_text, + ResponsesEventAdapter, + ChatCompletionsEventAdapter, +) + +def test_responses_delta_mapping(): + class FakeEvt: + type = "response.output_text.delta" + delta = "foo" + ev: StreamEvent = ResponsesEventAdapter.adapt(FakeEvt()) + assert ev.type == "output_text.delta" + assert ev.delta == "foo" + +def test_responses_completed_mapping(): + class FakeEvt: + type = "response.completed" + ev = ResponsesEventAdapter.adapt(FakeEvt()) + assert ev.type == "response.completed" + +def test_chat_delta_mapping(): + class FakeDelta: content = "bar" + class FakeChoice: delta = FakeDelta() + class FakeChunk: choices = [FakeChoice()] + ev: StreamEvent = ChatCompletionsEventAdapter.adapt(FakeChunk()) + assert ev.type == "output_text.delta" + assert ev.delta == "bar" + +def test_chat_completed_when_no_delta(): + class FakeChunk: choices = [] # no delta available + ev = ChatCompletionsEventAdapter.adapt(FakeChunk()) + assert ev.type == "response.completed" + +def test_extract_text_returns_delta_or_empty(): + assert extract_text(StreamEvent(type="output_text.delta", delta="X")) == "X" + assert extract_text(StreamEvent(type="response.completed")) == "" diff --git a/tests/test_streaming_unified_integration.py b/tests/test_streaming_unified_integration.py new file mode 100644 index 0000000000..5b8a3bdc78 --- /dev/null +++ b/tests/test_streaming_unified_integration.py @@ -0,0 +1,88 @@ +import json +import httpx +import pytest +from httpx import MockTransport, Request, Response + +from openai import AsyncOpenAI +from openai._base_client import HTTPX_DEFAULT_TIMEOUT +from openai._streaming import StreamEvent, extract_text + + +def _sse(*events): + """ + Build a mock SSE stream from provided events and append [DONE] sentinel + so the client knows when to close the stream. 
+ """ + body = b"".join([b"data: " + json.dumps(e).encode() + b"\n\n" for e in events]) + body += b"data: [DONE]\n\n" # IMPORTANT: closes the stream + return body + + +@pytest.mark.asyncio +async def test_responses_stream_unified_with_mock() -> None: + """ + Unified streaming should yield StreamEvent objects and concatenate text deltas. + MUST start with `response.created` to satisfy the streaming state machine. + """ + async def handler(request: Request) -> Response: # transport stub + data = _sse( + { + "type": "response.created", + "response": {"id": "rsp_test", "type": "response"}, + "model": "gpt-4o-mini", + }, + {"type": "response.output_text.delta", "delta": "Hello "}, + {"type": "response.output_text.delta", "delta": "World"}, + {"type": "response.completed"}, + ) + return Response(200, content=data, headers={"content-type": "text/event-stream"}) + + async with httpx.AsyncClient( + transport=MockTransport(handler), + timeout=HTTPX_DEFAULT_TIMEOUT, + base_url="https://api.openai.com", + ) as httpx_client: + client = AsyncOpenAI(http_client=httpx_client) + parts: list[str] = [] + async with client.responses.stream(model="gpt-4o-mini", input="hi", unified=True) as stream: + async for ev in stream: # type: StreamEvent + text = extract_text(ev) + if ev.type == "output_text.delta" and text: + parts.append(text) + + assert "".join(parts) == "Hello World" + + +@pytest.mark.asyncio +async def test_responses_stream_legacy_shape_with_mock() -> None: + """ + Legacy streaming should still yield raw events. Consume until completed to avoid pending tasks. + """ + async def handler(request: Request) -> Response: # transport stub + data = _sse( + { + "type": "response.created", + "response": {"id": "rsp_test2", "type": "response"}, + "model": "gpt-4o-mini", + }, + {"type": "response.output_text.delta", "delta": "X"}, + {"type": "response.completed"}, + ) + return Response(200, content=data, headers={"content-type": "text/event-stream"}) + + async with httpx.AsyncClient( + transport=MockTransport(handler), + timeout=HTTPX_DEFAULT_TIMEOUT, + base_url="https://api.openai.com", + ) as httpx_client: + client = AsyncOpenAI(http_client=httpx_client) + + got_delta = False + async with client.responses.stream(model="gpt-4o-mini", input="hi") as stream: + async for evt in stream: + if getattr(evt, "type", None) == "response.output_text.delta": + got_delta = True + if getattr(evt, "type", None) == "response.completed": + break # exit cleanly + + assert got_delta is True diff --git a/tests/test_streaming_with_local_sse.py b/tests/test_streaming_with_local_sse.py new file mode 100644 index 0000000000..4121b35b75 --- /dev/null +++ b/tests/test_streaming_with_local_sse.py @@ -0,0 +1,120 @@ +import asyncio +import json +import pytest +import httpx + +from starlette.applications import Starlette +from starlette.responses import StreamingResponse +from starlette.routing import Route + +from openai import AsyncOpenAI +from openai._base_client import HTTPX_DEFAULT_TIMEOUT + +def _sse_frame(obj: dict) -> bytes: + # One JSON object per SSE "data:" frame + return (f"data: " + json.dumps(obj, separators=(",", ":")) + "\n\n").encode() + +def _unified_sequence() -> list[bytes]: + # Full init (generic output + text channel) before deltas + return [ + _sse_frame({"type": "response.created", + "response": {"id": "rsp_local_1", "type": "response"}, + "model": "gpt-4o-mini"}), + _sse_frame({"type": "response.output.created", + "output_index": 0, + "id": "txt_1", + "output": {"index": 0, "id": "txt_1", "type": "output_text"}, + 
"output_type": "output_text"}), + _sse_frame({"type": "response.output_text.created", + "output_index": 0, "index": 0, "id": "txt_1"}), + _sse_frame({"type": "response.output_text.delta", + "output_index": 0, "index": 0, "value": "Hello ", "delta": "Hello "}), + _sse_frame({"type": "response.output_text.delta", + "output_index": 0, "index": 0, "value": "World", "delta": "World"}), + _sse_frame({"type": "response.completed", "id": "rsp_local_1", "status": "ok"}), + b"data: [DONE]\n\n", + ] + +def _legacy_sequence() -> list[bytes]: + return [ + _sse_frame({"type": "response.created", + "response": {"id": "rsp_local_2", "type": "response"}, + "model": "gpt-4o-mini"}), + _sse_frame({"type": "response.output.created", + "output_index": 0, + "id": "txt_2", + "output": {"index": 0, "id": "txt_2", "type": "output_text"}, + "output_type": "output_text"}), + _sse_frame({"type": "response.output_text.created", + "output_index": 0, "index": 0, "id": "txt_2"}), + _sse_frame({"type": "response.output_text.delta", + "output_index": 0, "index": 0, "value": "X", "delta": "X"}), + _sse_frame({"type": "response.completed", "id": "rsp_local_2", "status": "ok"}), + b"data: [DONE]\n\n", + ] + +def _make_app(frames: list[bytes]) -> Starlette: + async def responses_endpoint(request): + async def gen(): + for chunk in frames: + await asyncio.sleep(0) # tiny yield to simulate I/O + yield chunk + return StreamingResponse(gen(), media_type="text/event-stream") + # IMPORTANT: the SDK will call base_url + "/responses" + return Starlette(routes=[Route("/v1/responses", responses_endpoint, methods=["POST"])]) + +def _make_openai_client(app: Starlette) -> AsyncOpenAI: + # httpx>=0.24 has public ASGITransport; older versions keep it under _transports + try: + from httpx import ASGITransport + except Exception: + from httpx._transports.asgi import ASGITransport # type: ignore + + transport = ASGITransport(app=app) + http_client = httpx.AsyncClient( + transport=transport, + base_url="http://testserver/v1", # so the POST goes to /v1/responses + timeout=HTTPX_DEFAULT_TIMEOUT, + ) + return AsyncOpenAI(api_key="test-key", http_client=http_client, base_url="http://testserver/v1") + +@pytest.mark.asyncio +async def test_unified_stream_via_local_sse() -> None: + app = _make_app(_unified_sequence()) + client = _make_openai_client(app) + assert client._client._base_url.host == "testserver" # sanity + + parts: list[str] = [] + async with client.responses.stream(model="gpt-4o-mini", input="hi") as stream: + async for ev in stream: + # Only aggregate on *delta* events; don't call extract_text on others + name = getattr(ev, "event", getattr(ev, "type", None)) + if name in ("response.output_text.delta", "output_text.delta"): + # prefer value; fallback to delta if needed + val = getattr(ev, "value", None) + if val is None and hasattr(ev, "delta"): + val = ev.delta + if val: + parts.append(val) + if name in ("response.completed", "completed"): + break + + await client.close() + assert "".join(parts) == "Hello World" + +@pytest.mark.asyncio +async def test_legacy_stream_via_local_sse() -> None: + app = _make_app(_legacy_sequence()) + client = _make_openai_client(app) + + got_delta = False + async with client.responses.stream(model="gpt-4o-mini", input="hi") as stream: + async for ev in stream: + name = getattr(ev, "event", getattr(ev, "type", None)) + if name in ("response.output_text.delta", "output_text.delta"): + got_delta = True + if name in ("response.completed", "completed"): + break + + await client.close() + assert got_delta is True 
diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py
new file mode 100644
index 0000000000..533bea5f6f
--- /dev/null
+++ b/tests/test_timeouts.py
@@ -0,0 +1,54 @@
+import pytest
+import httpx
+import asyncio
+
+@pytest.mark.anyio
+async def test_asyncclient_raises_read_timeout_immediately():
+    async def handler(request: httpx.Request) -> httpx.Response:
+        raise httpx.ReadTimeout("forced timeout", request=request)
+
+    transport = httpx.MockTransport(handler)
+    client = httpx.AsyncClient(
+        transport=transport,
+        timeout=httpx.Timeout(0.05, connect=0.05, read=0.05, write=0.05),
+    )
+    with pytest.raises(httpx.ReadTimeout):
+        await client.get("https://example.test/anything")
+    await client.aclose()
+
+
+@pytest.mark.anyio
+async def test_asyncclient_times_out_on_slow_body_via_tcp_server():
+    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
+        try:
+            # read until the end of the HTTP headers
+            await reader.readuntil(b"\r\n\r\n")
+        except Exception:
+            writer.close()
+            return
+        # send only the headers, no body yet
+        writer.write(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n")
+        await writer.drain()
+        # delay the body beyond the client's read timeout
+        await asyncio.sleep(0.2)
+        writer.write(b"ok")
+        try:
+            await writer.drain()
+        finally:
+            writer.close()
+
+    server = await asyncio.start_server(handle, host="127.0.0.1", port=0)
+    try:
+        port = server.sockets[0].getsockname()[1]
+        url = f"http://127.0.0.1:{port}/"
+
+        client = httpx.AsyncClient(
+            timeout=httpx.Timeout(0.05, connect=0.05, read=0.05, write=0.05)
+        )
+        with pytest.raises(httpx.ReadTimeout):
+            # reading the body should trigger the read timeout
+            await client.get(url)
+        await client.aclose()
+    finally:
+        server.close()
+        await server.wait_closed()
diff --git a/tests/test_timeouts_client.py b/tests/test_timeouts_client.py
new file mode 100644
index 0000000000..7e0dd86410
--- /dev/null
+++ b/tests/test_timeouts_client.py
@@ -0,0 +1,27 @@
+import pytest
+import httpx
+from openai import AsyncOpenAI
+
+@pytest.mark.anyio
+async def test_openai_client_bubbles_readtimeout_from_transport():
+    async def handler(request: httpx.Request) -> httpx.Response:
+        # Force a ReadTimeout coming from the transport
+        raise httpx.ReadTimeout("forced timeout", request=request)
+
+    transport = httpx.MockTransport(handler)
+    http_client = httpx.AsyncClient(
+        transport=transport,
+        timeout=httpx.Timeout(0.05, connect=0.05, read=0.05, write=0.05),
+        base_url="https://api.openai.test",
+    )
+    client = AsyncOpenAI(
+        api_key="dummy",
+        http_client=http_client,
+        base_url="https://api.openai.test"
+    )
+
+    with pytest.raises(httpx.ReadTimeout):
+        # Any simple call; the MockTransport intercepts it
+        await client.models.list()
+
+    await http_client.aclose()
diff --git a/tests/tests/test_timeouts_client.py b/tests/tests/test_timeouts_client.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/timeouts/__init__.py b/tests/timeouts/__init__.py
index dec9aed6b3..480dc84f40 100644
--- a/tests/timeouts/__init__.py
+++ b/tests/timeouts/__init__.py
@@ -1,2 +1 @@
 """Tests related to timeout behavior."""
-
diff --git a/tests/timeouts/_util.py b/tests/timeouts/_util.py
index f37fc027cb..d9cf6b82f2 100644
--- a/tests/timeouts/_util.py
+++ b/tests/timeouts/_util.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+
 def assert_timeout_eq(value, expected: float) -> None:
     """Assert that a timeout-like value equals the expected seconds.
@@ -10,10 +11,8 @@ def assert_timeout_eq(value, expected: float) -> None: if isinstance(value, (int, float)): assert float(value) == expected elif isinstance(value, Timeout): - assert any( - getattr(value, f, None) in (None, expected) - for f in ("read", "connect", "write") - ), f"Timeout fields do not match {expected}: {value!r}" + assert any(getattr(value, f, None) in (None, expected) for f in ("read", "connect", "write")), ( + f"Timeout fields do not match {expected}: {value!r}" + ) else: raise AssertionError(f"Unexpected timeout type: {type(value)}") - diff --git a/tests/timeouts/test_overrides.py b/tests/timeouts/test_overrides.py index 649a2df36f..e8440135a0 100644 --- a/tests/timeouts/test_overrides.py +++ b/tests/timeouts/test_overrides.py @@ -3,13 +3,11 @@ import os import httpx -import pytest from openai import OpenAI from openai._models import FinalRequestOptions from openai._base_client import DEFAULT_TIMEOUT - base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") @@ -20,9 +18,6 @@ def test_per_request_timeout_overrides_default(client: OpenAI) -> None: assert timeout == DEFAULT_TIMEOUT # per-request timeout overrides the default - request = client._build_request( - FinalRequestOptions(method="get", url="/foo", timeout=httpx.Timeout(100.0)) - ) + request = client._build_request(FinalRequestOptions(method="get", url="/foo", timeout=httpx.Timeout(100.0))) timeout = httpx.Timeout(**request.extensions["timeout"]) # type: ignore[arg-type] assert timeout == httpx.Timeout(100.0) -
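
A minimal consumption sketch, not part of the patches above: it assumes the `unified=True` flag and the `openai._streaming` exports (StreamEvent, extract_text) introduced in PATCH 5, and mirrors the flow exercised in tests/test_streaming_unified_integration.py. The model name and prompt are placeholders.

#!/usr/bin/env -S rye run python
# Sketch only: consume the unified stream shape added in PATCH 5 via `unified=True`.
# Model and prompt are placeholders; requires the patched SDK and a real API key.
import asyncio

from openai import AsyncOpenAI
from openai._streaming import StreamEvent, extract_text


async def main() -> None:
    client = AsyncOpenAI()
    parts: list[str] = []
    # With unified=True, responses.stream yields adapted StreamEvent objects
    # (via _wrap_unified + ResponsesEventAdapter) instead of raw Responses events.
    async with client.responses.stream(
        model="gpt-4o-mini",
        input="Write a one-sentence bedtime story about a unicorn.",
        unified=True,
    ) as stream:
        async for ev in stream:  # each item is a StreamEvent
            if ev.type == "output_text.delta":
                parts.append(extract_text(ev))
    print("".join(parts))


if __name__ == "__main__":
    asyncio.run(main())

The same pattern should apply to client.chat.completions.stream(..., unified=True), which routes chunks through ChatCompletionsEventAdapter instead of ResponsesEventAdapter.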