diff --git a/.python-version b/.python-version index 43077b2460..f3fe474aee 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.9.18 +3.12.9 diff --git a/README.md b/README.md index d4b8d8d170..ba603a716e 100644 --- a/README.md +++ b/README.md @@ -376,7 +376,7 @@ from openai import OpenAI client = OpenAI() -response = client.chat.responses.create( +response = client.responses.create( input=[ { "role": "user", diff --git a/examples/async_chat_stream.py b/examples/async_chat_stream.py new file mode 100644 index 0000000000..22a0c748d4 --- /dev/null +++ b/examples/async_chat_stream.py @@ -0,0 +1,21 @@ +#!/usr/bin/env -S rye run python +import asyncio +from openai import AsyncOpenAI + +client = AsyncOpenAI() + +async def main() -> None: + # Chat Completions is still supported indefinitely, but it's no longer the primary API. + # This example remains useful for users who still rely on chat.completions. + stream = await client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": "Say this is a test (streaming)."}], + stream=True, + ) + async for chunk in stream: + # Some users prefer accessing delta objects; for demo purposes, printing the chunk is enough. + print(chunk) + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/examples/async_responses_stream.py b/examples/async_responses_stream.py new file mode 100644 index 0000000000..d2a3ffa9e4 --- /dev/null +++ b/examples/async_responses_stream.py @@ -0,0 +1,19 @@ +#!/usr/bin/env -S rye run python +import asyncio +from openai import AsyncOpenAI + +client = AsyncOpenAI() + +async def main() -> None: + # Async streaming with the Responses API (the recommended primary API). + stream = await client.responses.create( + model="gpt-4o-mini", + input="Write a one-sentence bedtime story about a unicorn.", + stream=True, + ) + async for event in stream: + # Each event may contain deltas and final results; printing directly is sufficient for demo purposes.
+ print(event) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/async_stream_unified.py b/examples/async_stream_unified.py new file mode 100644 index 0000000000..5b8ee2ccc9 --- /dev/null +++ b/examples/async_stream_unified.py @@ -0,0 +1,25 @@ +#!/usr/bin/env -S rye run python +import asyncio +from openai import AsyncOpenAI +from openai._streaming.unified import extract_text, StreamEvent +from openai._streaming.adapters import ResponsesEventAdapter # or ChatCompletionsEventAdapter + + +async def main(): + client = AsyncOpenAI() + # Example with Responses stream (manual adapter for now) + async with client.responses.stream( + model="gpt-4.1-mini", + input="Write a single haiku about async streams.", + ) as stream: + async for raw_evt in stream: + # Convert raw event into a unified StreamEvent + ev: StreamEvent = ResponsesEventAdapter.adapt(raw_evt) + txt = extract_text(ev) + if txt: + print(txt, end="") + print() # newline at the end + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 6736c1ad9e..1be0e62865 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "sniffio", "tqdm > 4", "jiter>=0.4.0, <1", + "starlette>=0.48.0", ] requires-python = ">= 3.8" classifiers = [ @@ -56,7 +57,7 @@ dev-dependencies = [ "pyright==1.1.399", "mypy", "respx", - "pytest", + "pytest>=8.4.2", "pytest-asyncio", "ruff", "time-machine", diff --git a/requirements-dev.lock b/requirements-dev.lock index e8bea53014..8eb0deab87 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -22,12 +22,11 @@ annotated-types==0.6.0 anyio==4.1.0 # via httpx # via openai + # via starlette argcomplete==3.1.2 # via nox asttokens==2.4.1 # via inline-snapshot -async-timeout==5.0.1 - # via aiohttp attrs==24.2.0 # via aiohttp # via outcome @@ -57,10 +56,6 @@ distlib==0.3.7 # via virtualenv distro==1.8.0 # via openai -exceptiongroup==1.2.2 - # via anyio - # via pytest - # via trio execnet==2.1.1 # via pytest-xdist executing==2.2.0 @@ -146,7 +141,7 @@ pygments==2.18.0 pyjwt==2.8.0 # via msal pyright==1.1.399 -pytest==8.4.1 +pytest==8.4.2 # via inline-snapshot # via pytest-asyncio # via pytest-xdist @@ -179,11 +174,9 @@ sortedcontainers==2.4.0 # via trio sounddevice==0.5.1 # via openai +starlette==0.48.0 + # via openai time-machine==2.9.0 -tomli==2.0.2 - # via inline-snapshot - # via mypy - # via pytest tqdm==4.66.5 # via openai trio==0.27.0 @@ -194,12 +187,12 @@ types-tqdm==4.66.0.20240417 typing-extensions==4.12.2 # via azure-core # via azure-identity - # via multidict # via mypy # via openai # via pydantic # via pydantic-core # via pyright + # via starlette tzdata==2024.1 # via pandas urllib3==2.2.1 diff --git a/requirements.lock b/requirements.lock index 3b6ece87e2..7d1b27c358 100644 --- a/requirements.lock +++ b/requirements.lock @@ -22,8 +22,7 @@ annotated-types==0.6.0 anyio==4.1.0 # via httpx # via openai -async-timeout==5.0.1 - # via aiohttp + # via starlette attrs==25.3.0 # via aiohttp certifi==2023.7.22 @@ -33,8 +32,6 @@ cffi==1.17.1 # via sounddevice distro==1.8.0 # via openai -exceptiongroup==1.2.2 - # via anyio frozenlist==1.7.0 # via aiohttp # via aiosignal @@ -84,15 +81,17 @@ sniffio==1.3.0 # via openai sounddevice==0.5.1 # via openai +starlette==0.48.0 + # via openai tqdm==4.66.5 # via openai types-pytz==2024.2.0.20241003 # via pandas-stubs typing-extensions==4.12.2 - # via multidict # via openai # via pydantic # via pydantic-core + # via starlette tzdata==2024.1 # via pandas websockets==15.0.1 diff --git 
a/src/openai/_streaming.py b/src/openai/_streaming/__init__.py similarity index 96% rename from src/openai/_streaming.py rename to src/openai/_streaming/__init__.py index f586de74ff..40d21b8976 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming/__init__.py @@ -9,11 +9,11 @@ import httpx -from ._utils import is_mapping, extract_type_var_from_base -from ._exceptions import APIError +from .._utils import is_mapping, extract_type_var_from_base +from .._exceptions import APIError if TYPE_CHECKING: - from ._client import OpenAI, AsyncOpenAI + from .._client import OpenAI, AsyncOpenAI _T = TypeVar("_T") @@ -400,7 +400,7 @@ class MyStream(Stream[bytes]): extract_stream_chunk_type(MyStream) -> bytes ``` """ - from ._base_client import Stream, AsyncStream + from .._base_client import Stream, AsyncStream return extract_type_var_from_base( stream_cls, @@ -408,3 +408,19 @@ class MyStream(Stream[bytes]): generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)), failure_message=failure_message, ) + + +from .unified import StreamEvent, extract_text +from .adapters import ResponsesEventAdapter, ChatCompletionsEventAdapter +from .wrap import _wrap_unified + + +__all__ = [ + "Stream", + "AsyncStream", + "StreamEvent", + "extract_text", + "ResponsesEventAdapter", + "ChatCompletionsEventAdapter", + "_wrap_unified", +] diff --git a/src/openai/_streaming/adapters.py b/src/openai/_streaming/adapters.py new file mode 100644 index 0000000000..ec68490e75 --- /dev/null +++ b/src/openai/_streaming/adapters.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from typing import Any + +from .unified import StreamEvent + + +class ResponsesEventAdapter: + """Map Responses API streaming events into the unified stream shape.""" + + @staticmethod + def adapt(event: Any) -> StreamEvent: + event_type = getattr(event, "type", None) + + if event_type == "response.output_text.delta": + return StreamEvent( + type="output_text.delta", + delta=getattr(event, "delta", None), + raw=event, + ) + + if event_type == "response.completed": + return StreamEvent(type="response.completed", raw=event) + + return StreamEvent(type="response.error", raw=event) + + +class ChatCompletionsEventAdapter: + """Map Chat Completions streaming chunks into the unified stream shape.""" + + @staticmethod + def adapt(chunk: Any) -> StreamEvent: + try: + choice = chunk.choices[0] + delta = getattr(getattr(choice, "delta", None), "content", None) + if delta: + return StreamEvent(type="output_text.delta", delta=delta, raw=chunk) + except Exception: # pragma: no cover - defensive adapter guard + pass + + return StreamEvent(type="response.completed", raw=chunk) + + +__all__ = [ + "ChatCompletionsEventAdapter", + "ResponsesEventAdapter", +] diff --git a/src/openai/_streaming/unified.py b/src/openai/_streaming/unified.py new file mode 100644 index 0000000000..4c98ed853e --- /dev/null +++ b/src/openai/_streaming/unified.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Literal, Optional + +EventType = Literal[ + "message_start", + "output_text.delta", + "output_text.done", + "tool_call.delta", + "tool_call.done", + "response.completed", + "response.error", +] + + +@dataclass +class StreamEvent: + type: EventType + model: Optional[str] = None + delta: Optional[str] = None + full_text: Optional[str] = None + tool_call_id: Optional[str] = None + raw: Optional[Any] = None + + +def extract_text(event: StreamEvent) -> str: + """Return the text delta from a stream event.""" + + 
return event.delta or "" + + +__all__ = [ + "EventType", + "StreamEvent", + "extract_text", +] diff --git a/src/openai/_streaming/wrap.py b/src/openai/_streaming/wrap.py new file mode 100644 index 0000000000..8b88aae74b --- /dev/null +++ b/src/openai/_streaming/wrap.py @@ -0,0 +1,25 @@ +from contextlib import asynccontextmanager +from typing import Any, AsyncIterator, Callable +from .unified import StreamEvent + +async def _map_async_iter( + source: AsyncIterator[Any], + fn: Callable[[Any], StreamEvent], +) -> AsyncIterator[StreamEvent]: + async for item in source: + yield fn(item) + +@asynccontextmanager +async def _wrap_unified(cm, adapter_fn: Callable[[Any], StreamEvent]): + """ + Wrap an existing async context manager (cm) that yields an async iterator of raw events, + and expose a context manager that yields an async iterator of adapted StreamEvent. + """ + async with cm as underlying: + async def _mapped(): + async for raw in underlying: + yield adapter_fn(raw) + yield _mapped() + +# Optional alias if something imported the non-underscored name before +map_async_iter = _map_async_iter diff --git a/src/openai/resources/chat/completions/completions.py b/src/openai/resources/chat/completions/completions.py index 7e209ff0ee..38ec3a0a2a 100644 --- a/src/openai/resources/chat/completions/completions.py +++ b/src/openai/resources/chat/completions/completions.py @@ -3,8 +3,9 @@ from __future__ import annotations import inspect -from typing import Dict, List, Type, Union, Iterable, Optional, cast from functools import partial +from typing import Any, Dict, List, Type, Union, Iterable, Optional, AsyncIterator, AsyncContextManager, cast + from typing_extensions import Literal, overload import httpx @@ -2847,13 +2848,14 @@ def stream( user: str | NotGiven = NOT_GIVEN, verbosity: Optional[Literal["low", "medium", "high"]] | NotGiven = NOT_GIVEN, web_search_options: completion_create_params.WebSearchOptions | NotGiven = NOT_GIVEN, + unified: bool = False, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncChatCompletionStreamManager[ResponseFormatT]: + ) -> AsyncChatCompletionStreamManager[ResponseFormatT] | AsyncContextManager[AsyncIterator[Any]]: """Wrapper over the `client.chat.completions.create(stream=True)` method that provides a more granular event API and automatic accumulation of each delta. 
@@ -2923,12 +2925,19 @@ def stream( extra_body=extra_body, timeout=timeout, ) - return AsyncChatCompletionStreamManager( + manager: AsyncChatCompletionStreamManager[ResponseFormatT] = AsyncChatCompletionStreamManager( api_request, response_format=response_format, input_tools=tools, ) + if not unified: + return manager + + from openai._streaming import _wrap_unified, ChatCompletionsEventAdapter + + return _wrap_unified(manager, ChatCompletionsEventAdapter.adapt) + class CompletionsWithRawResponse: def __init__(self, completions: Completions) -> None: @@ -3040,8 +3049,6 @@ def __init__(self, completions: AsyncCompletions) -> None: @cached_property def messages(self) -> AsyncMessagesWithStreamingResponse: return AsyncMessagesWithStreamingResponse(self._completions.messages) - - def validate_response_format(response_format: object) -> None: if inspect.isclass(response_format) and issubclass(response_format, pydantic.BaseModel): raise TypeError( diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index 062fd491f2..fd7ecb4fa0 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Any, List, Type, Union, Iterable, Optional, cast +from typing import Any, List, Type, Union, Iterable, Optional, AsyncIterator, AsyncContextManager, cast from functools import partial from typing_extensions import Literal, overload @@ -895,7 +895,7 @@ def stream( store: Optional[bool] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, top_p: Optional[float] | NotGiven = NOT_GIVEN, truncation: Optional[Literal["auto", "disabled"]] | NotGiven = NOT_GIVEN, @@ -1057,7 +1057,7 @@ def parse( stream: Optional[Literal[False]] | Literal[True] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, tools: Iterable[ParseableToolParam] | NotGiven = NOT_GIVEN, top_logprobs: Optional[int] | NotGiven = NOT_GIVEN, @@ -2263,12 +2263,13 @@ def stream( text_format: type[TextFormatT] | NotGiven = NOT_GIVEN, starting_after: int | NotGiven = NOT_GIVEN, tools: Iterable[ParseableToolParam] | NotGiven = NOT_GIVEN, + unified: bool = False, # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncResponseStreamManager[TextFormatT]: ... + ) -> AsyncResponseStreamManager[TextFormatT] | AsyncContextManager[AsyncIterator[Any]]: ... 
@overload def stream( @@ -2289,18 +2290,19 @@ def stream( store: Optional[bool] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, top_p: Optional[float] | NotGiven = NOT_GIVEN, truncation: Optional[Literal["auto", "disabled"]] | NotGiven = NOT_GIVEN, user: str | NotGiven = NOT_GIVEN, + unified: bool = False, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncResponseStreamManager[TextFormatT]: ... + ) -> AsyncResponseStreamManager[TextFormatT] | AsyncContextManager[AsyncIterator[Any]]: ... def stream( self, @@ -2321,19 +2323,20 @@ def stream( store: Optional[bool] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, top_p: Optional[float] | NotGiven = NOT_GIVEN, truncation: Optional[Literal["auto", "disabled"]] | NotGiven = NOT_GIVEN, user: str | NotGiven = NOT_GIVEN, starting_after: int | NotGiven = NOT_GIVEN, + unified: bool = False, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. 
extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, - ) -> AsyncResponseStreamManager[TextFormatT]: + ) -> AsyncResponseStreamManager[TextFormatT] | AsyncContextManager[AsyncIterator[Any]]: new_response_args = { "input": input, "model": model, @@ -2405,7 +2408,7 @@ def stream( timeout=timeout, ) - return AsyncResponseStreamManager( + manager: AsyncResponseStreamManager[TextFormatT] = AsyncResponseStreamManager( api_request, text_format=text_format, input_tools=tools, @@ -2424,13 +2427,20 @@ def stream( extra_body=extra_body, timeout=timeout, ) - return AsyncResponseStreamManager( + manager = AsyncResponseStreamManager( api_request, text_format=text_format, input_tools=tools, starting_after=starting_after if is_given(starting_after) else None, ) + if not unified: + return manager + + from openai._streaming import _wrap_unified, ResponsesEventAdapter + + return _wrap_unified(manager, ResponsesEventAdapter.adapt) + async def parse( self, *, @@ -2455,7 +2465,7 @@ async def parse( stream: Optional[Literal[False]] | Literal[True] | NotGiven = NOT_GIVEN, stream_options: Optional[response_create_params.StreamOptions] | NotGiven = NOT_GIVEN, temperature: Optional[float] | NotGiven = NOT_GIVEN, - text: ResponseTextConfigParam| NotGiven = NOT_GIVEN, + text: ResponseTextConfigParam | NotGiven = NOT_GIVEN, tool_choice: response_create_params.ToolChoice | NotGiven = NOT_GIVEN, tools: Iterable[ParseableToolParam] | NotGiven = NOT_GIVEN, top_logprobs: Optional[int] | NotGiven = NOT_GIVEN, diff --git a/src/openai/streaming/adapters.py b/src/openai/streaming/adapters.py new file mode 100644 index 0000000000..e21af62d4d --- /dev/null +++ b/src/openai/streaming/adapters.py @@ -0,0 +1,37 @@ +from typing import Any +from .unified import StreamEvent + + +class ResponsesEventAdapter: + @staticmethod + def adapt(evt: Any) -> StreamEvent: + t = getattr(evt, "type", None) + if t == "response.output_text.delta": + return StreamEvent( + type="output_text.delta", + delta=getattr(evt, "delta", None), + raw=evt, + ) + if t == "response.completed": + return StreamEvent(type="response.completed", raw=evt) + # TODO: map additional event types (tool_call.*, errors, etc.) 
+ return StreamEvent(type="response.error", raw=evt) + + +class ChatCompletionsEventAdapter: + @staticmethod + def adapt(chunk: Any) -> StreamEvent: + # Try to extract delta.content from the first choice + try: + choice0 = chunk.choices[0] + delta = getattr(getattr(choice0, "delta", None), "content", None) + if delta: + return StreamEvent( + type="output_text.delta", + delta=delta, + raw=chunk, + ) + except Exception: + pass + # TODO: add heuristics for completed and error events + return StreamEvent(type="response.completed", raw=chunk) diff --git a/src/openai/streaming/unified.py b/src/openai/streaming/unified.py new file mode 100644 index 0000000000..a93dbc1ea5 --- /dev/null +++ b/src/openai/streaming/unified.py @@ -0,0 +1,24 @@ +from dataclasses import dataclass +from typing import Optional, Literal, Any + +EventType = Literal[ + "message_start", + "output_text.delta", + "output_text.done", + "tool_call.delta", + "tool_call.done", + "response.completed", + "response.error", +] + +@dataclass +class StreamEvent: + type: EventType + model: Optional[str] = None + delta: Optional[str] = None + full_text: Optional[str] = None + tool_call_id: Optional[str] = None + raw: Optional[Any] = None + +def extract_text(ev: "StreamEvent") -> str: + return ev.delta or "" diff --git a/src/openai/types/image.py b/src/openai/types/image.py index ecaef3fd58..d024bf8f3f 100644 --- a/src/openai/types/image.py +++ b/src/openai/types/image.py @@ -1,6 +1,6 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. -from typing import Optional +from typing import Any, Dict, Optional from .._models import BaseModel @@ -24,3 +24,10 @@ class Image(BaseModel): `response_format` is set to `url` (default value). Unsupported for `gpt-image-1`. """ + + content_filter_results: Optional[Dict[str, Any]] = None + """Optional content filter metadata returned by the API. + + Includes safety-related categories (e.g. sexual_minors, violence, etc.) + indicating whether the image was flagged or filtered. 
+ """ diff --git a/tests/conftest.py b/tests/conftest.py index 408bcf76c0..4cd6109426 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,14 @@ logging.getLogger("openai").setLevel(logging.DEBUG) +# Autouse fixture to ensure an API key is always set for tests +@pytest.fixture(autouse=True) +def _fake_openai_key(monkeypatch: pytest.MonkeyPatch) -> None: + # evita dependência real de credencial + monkeypatch.setenv("OPENAI_API_KEY", "test") + yield + + # automatically add `pytest.mark.asyncio()` to all of our async tests # so we don't have to add that boilerplate everywhere def pytest_collection_modifyitems(items: list[pytest.Function]) -> None: diff --git a/tests/retries/__init__.py b/tests/retries/__init__.py new file mode 100644 index 0000000000..e7938fcabb --- /dev/null +++ b/tests/retries/__init__.py @@ -0,0 +1 @@ +"""Tests related to retry behavior.""" diff --git a/tests/retries/test_retry_after.py b/tests/retries/test_retry_after.py new file mode 100644 index 0000000000..d394e4cb0c --- /dev/null +++ b/tests/retries/test_retry_after.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import os +from unittest import mock + +import httpx +import pytest +from respx import MockRouter + +from openai import OpenAI + +base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") + + +def _low_retry_timeout(*_args, **_kwargs) -> float: + return 0.01 + + +@mock.patch("openai._base_client.BaseClient._calculate_retry_timeout", _low_retry_timeout) +@pytest.mark.respx(base_url=base_url) +def test_retry_after_header_is_respected(respx_mock: MockRouter, client: OpenAI) -> None: + attempts = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + attempts["n"] += 1 + if attempts["n"] == 1: + return httpx.Response(429, headers={"Retry-After": "2"}, json={"err": "rate"}) + return httpx.Response(200, json={"ok": True}) + + respx_mock.post("/chat/completions").mock(side_effect=handler) + + client = client.with_options(max_retries=3) + + response = client.chat.completions.with_raw_response.create( + messages=[{"content": "hi", "role": "user"}], + model="gpt-4o", + ) + + assert response.retries_taken == 1 + assert int(response.http_request.headers.get("x-stainless-retry-count")) == 1 diff --git a/tests/test_images_missing_fields.py b/tests/test_images_missing_fields.py new file mode 100644 index 0000000000..c054c842f4 --- /dev/null +++ b/tests/test_images_missing_fields.py @@ -0,0 +1,52 @@ +import httpx +import pytest + +from openai import AsyncOpenAI, DefaultAsyncHttpxClient + + +@pytest.mark.anyio +async def test_images_generate_includes_content_filter_results_async(): + """ + Ensure the Image model exposes optional fields returned by the API, + specifically `content_filter_results` (keeping `revised_prompt` coverage). 
+ """ + mock_json = { + "created": 1711111111, + "data": [ + { + "url": "https://example.test/cat.png", + "revised_prompt": "a cute cat wearing sunglasses", + "content_filter_results": { + "sexual_minors": {"filtered": False}, + "violence": {"filtered": False}, + }, + } + ], + } + + # Async handler because we'll use AsyncOpenAI (httpx.AsyncClient under the hood) + async def ahandler(request: httpx.Request) -> httpx.Response: + assert "images" in str(request.url).lower() + return httpx.Response(200, json=mock_json) + + atransport = httpx.MockTransport(ahandler) + + client = AsyncOpenAI( + api_key="test", + http_client=DefaultAsyncHttpxClient(transport=atransport), + timeout=10.0, + ) + + resp = await client.images.generate(model="gpt-image-1", prompt="cat with glasses") # type: ignore + + assert hasattr(resp, "data") and isinstance(resp.data, list) and resp.data + item = resp.data[0] + + # existing field + assert item.revised_prompt == "a cute cat wearing sunglasses" + + # new optional field + cfr = item.content_filter_results + assert isinstance(cfr, dict), f"content_filter_results should be dict, got {type(cfr)}" + assert cfr.get("violence", {}).get("filtered") is False + assert cfr.get("sexual_minors", {}).get("filtered") is False diff --git a/tests/test_streaming_state_harness.py b/tests/test_streaming_state_harness.py new file mode 100644 index 0000000000..8c68108a02 --- /dev/null +++ b/tests/test_streaming_state_harness.py @@ -0,0 +1,70 @@ +import asyncio +import pytest +from dataclasses import dataclass + +pytestmark = pytest.mark.asyncio + +def test_pytest_collection_sanity(): + assert True + +@dataclass +class _SSE: + event: str + data: dict + +class _FakeStream: + def __init__(self, events): + self._events = events + + def __aiter__(self): + self._it = iter(self._events) + return self + + async def __anext__(self): + try: + await asyncio.sleep(0) + return next(self._it) + except StopIteration: + raise StopAsyncIteration + +def _mk_events_ok(): + return [ + _SSE("response.created", {"id": "resp_1", "type": "response", "model": "gpt-4o-mini"}), + _SSE("response.output_text.created", {"index": 0, "id": "txt_1"}), + _SSE("response.output_text.delta", {"index": 0, "value": "Hello, "}), + _SSE("response.output_text.delta", {"index": 0, "value": "world!"}), + _SSE("response.completed", {"id": "resp_1", "status": "ok"}), + ] + +async def _consume(stream): + seen = [] + buffers = {} + async for ev in stream: + seen.append(ev.event) + if ev.event == "response.output_text.delta": + idx = ev.data.get("index", 0) + buffers[idx] = buffers.get(idx, "") + ev.data.get("value", "") + return seen, buffers.get(0, "") + +async def test_unified_happy_path_harness(): + seen, text = await _consume(_FakeStream(_mk_events_ok())) + assert seen == [ + "response.created", + "response.output_text.created", + "response.output_text.delta", + "response.output_text.delta", + "response.completed", + ] + assert text == "Hello, world!" 
+ +@pytest.mark.parametrize("missing", [ + "response.created", + "response.output_text.created", + "response.completed", +]) +async def test_unified_resilience_harness(missing): + base = _mk_events_ok() + filtered = [e for e in base if e.event != missing] + seen, text = await _consume(_FakeStream(filtered)) + assert missing not in seen + assert isinstance(text, str) diff --git a/tests/test_streaming_unified_basic.py b/tests/test_streaming_unified_basic.py new file mode 100644 index 0000000000..f98f1fa336 --- /dev/null +++ b/tests/test_streaming_unified_basic.py @@ -0,0 +1,38 @@ +import pytest +from openai._streaming import ( + StreamEvent, + extract_text, + ResponsesEventAdapter, + ChatCompletionsEventAdapter, +) + +def test_responses_delta_mapping(): + class FakeEvt: + type = "response.output_text.delta" + delta = "foo" + ev: StreamEvent = ResponsesEventAdapter.adapt(FakeEvt()) + assert ev.type == "output_text.delta" + assert ev.delta == "foo" + +def test_responses_completed_mapping(): + class FakeEvt: + type = "response.completed" + ev = ResponsesEventAdapter.adapt(FakeEvt()) + assert ev.type == "response.completed" + +def test_chat_delta_mapping(): + class FakeDelta: content = "bar" + class FakeChoice: delta = FakeDelta() + class FakeChunk: choices = [FakeChoice()] + ev: StreamEvent = ChatCompletionsEventAdapter.adapt(FakeChunk()) + assert ev.type == "output_text.delta" + assert ev.delta == "bar" + +def test_chat_completed_when_no_delta(): + class FakeChunk: choices = [] # no delta available + ev = ChatCompletionsEventAdapter.adapt(FakeChunk()) + assert ev.type == "response.completed" + +def test_extract_text_returns_delta_or_empty(): + assert extract_text(StreamEvent(type="output_text.delta", delta="X")) == "X" + assert extract_text(StreamEvent(type="response.completed")) == "" diff --git a/tests/test_streaming_unified_integration.py b/tests/test_streaming_unified_integration.py new file mode 100644 index 0000000000..5b8a3bdc78 --- /dev/null +++ b/tests/test_streaming_unified_integration.py @@ -0,0 +1,88 @@ +import json +import httpx +import pytest +from httpx import MockTransport, Request, Response + +from openai import AsyncOpenAI +from openai._base_client import HTTPX_DEFAULT_TIMEOUT +from openai._streaming import StreamEvent, extract_text + + +def _sse(*events): + """ + Build a mock SSE stream from provided events and append [DONE] sentinel + so the client knows when to close the stream. + """ + body = b"".join([b"data: " + json.dumps(e).encode() + b"\n\n" for e in events]) + body += b"data: [DONE]\n\n" # IMPORTANT: closes the stream + return body + + +@pytest.mark.asyncio +async def test_responses_stream_unified_with_mock() -> None: + """ + Unified streaming should yield StreamEvent objects and concatenate text deltas. + MUST start with `response.created` to satisfy the streaming state machine. 
+ """ + async def handler(request: Request) -> Response: # transport stub + data = _sse( + { + "type": "response.created", + "response": {"id": "rsp_test", "type": "response"}, + "model": "gpt-4o-mini", + }, + {"type": "response.output_text.delta", "delta": "Hello "}, + {"type": "response.output_text.delta", "delta": "World"}, + {"type": "response.completed"}, + ) + return Response(200, content=data, headers={"content-type": "text/event-stream"}) + + async with httpx.AsyncClient( + transport=MockTransport(handler), + timeout=HTTPX_DEFAULT_TIMEOUT, + base_url="https://api.openai.com", + ) as httpx_client: + client = AsyncOpenAI(http_client=httpx_client) + parts: list[str] = [] + async with client.responses.stream(model="gpt-4o-mini", input="hi", unified=True) as stream: + async for ev in stream: # type: StreamEvent + text = extract_text(ev) + if ev.type == "output_text.delta" and text: + parts.append(text) + + assert "".join(parts) == "Hello World" + + +@pytest.mark.asyncio +async def test_responses_stream_legacy_shape_with_mock() -> None: + """ + Legacy streaming should still yield raw events. Consume until completed to avoid pending tasks. + """ + async def handler(request: Request) -> Response: # transport stub + data = _sse( + { + "type": "response.created", + "response": {"id": "rsp_test2", "type": "response"}, + "model": "gpt-4o-mini", + }, + {"type": "response.output_text.delta", "delta": "X"}, + {"type": "response.completed"}, + ) + return Response(200, content=data, headers={"content-type": "text/event-stream"}) + + async with httpx.AsyncClient( + transport=MockTransport(handler), + timeout=HTTPX_DEFAULT_TIMEOUT, + base_url="https://api.openai.com", + ) as httpx_client: + client = AsyncOpenAI(http_client=httpx_client) + + got_delta = False + async with client.responses.stream(model="gpt-4o-mini", input="hi") as stream: + async for evt in stream: + if getattr(evt, "type", None) == "response.output_text.delta": + got_delta = True + if getattr(evt, "type", None) == "response.completed": + break # exit cleanly + + assert got_delta is True diff --git a/tests/test_streaming_with_local_sse.py b/tests/test_streaming_with_local_sse.py new file mode 100644 index 0000000000..4121b35b75 --- /dev/null +++ b/tests/test_streaming_with_local_sse.py @@ -0,0 +1,120 @@ +import asyncio +import json +import pytest +import httpx + +from starlette.applications import Starlette +from starlette.responses import StreamingResponse +from starlette.routing import Route + +from openai import AsyncOpenAI +from openai._base_client import HTTPX_DEFAULT_TIMEOUT + +def _sse_frame(obj: dict) -> bytes: + # One JSON object per SSE "data:" frame + return (f"data: " + json.dumps(obj, separators=(",", ":")) + "\n\n").encode() + +def _unified_sequence() -> list[bytes]: + # Full init (generic output + text channel) before deltas + return [ + _sse_frame({"type": "response.created", + "response": {"id": "rsp_local_1", "type": "response"}, + "model": "gpt-4o-mini"}), + _sse_frame({"type": "response.output.created", + "output_index": 0, + "id": "txt_1", + "output": {"index": 0, "id": "txt_1", "type": "output_text"}, + "output_type": "output_text"}), + _sse_frame({"type": "response.output_text.created", + "output_index": 0, "index": 0, "id": "txt_1"}), + _sse_frame({"type": "response.output_text.delta", + "output_index": 0, "index": 0, "value": "Hello ", "delta": "Hello "}), + _sse_frame({"type": "response.output_text.delta", + "output_index": 0, "index": 0, "value": "World", "delta": "World"}), + _sse_frame({"type": 
"response.completed", "id": "rsp_local_1", "status": "ok"}), + b"data: [DONE]\n\n", + ] + +def _legacy_sequence() -> list[bytes]: + return [ + _sse_frame({"type": "response.created", + "response": {"id": "rsp_local_2", "type": "response"}, + "model": "gpt-4o-mini"}), + _sse_frame({"type": "response.output.created", + "output_index": 0, + "id": "txt_2", + "output": {"index": 0, "id": "txt_2", "type": "output_text"}, + "output_type": "output_text"}), + _sse_frame({"type": "response.output_text.created", + "output_index": 0, "index": 0, "id": "txt_2"}), + _sse_frame({"type": "response.output_text.delta", + "output_index": 0, "index": 0, "value": "X", "delta": "X"}), + _sse_frame({"type": "response.completed", "id": "rsp_local_2", "status": "ok"}), + b"data: [DONE]\n\n", + ] + +def _make_app(frames: list[bytes]) -> Starlette: + async def responses_endpoint(request): + async def gen(): + for chunk in frames: + await asyncio.sleep(0) # tiny yield to simulate I/O + yield chunk + return StreamingResponse(gen(), media_type="text/event-stream") + # IMPORTANT: the SDK will call base_url + "/responses" + return Starlette(routes=[Route("/v1/responses", responses_endpoint, methods=["POST"])]) + +def _make_openai_client(app: Starlette) -> AsyncOpenAI: + # httpx>=0.24 has public ASGITransport; older versions keep it under _transports + try: + from httpx import ASGITransport + except Exception: + from httpx._transports.asgi import ASGITransport # type: ignore + + transport = ASGITransport(app=app) + http_client = httpx.AsyncClient( + transport=transport, + base_url="http://testserver/v1", # so the POST goes to /v1/responses + timeout=HTTPX_DEFAULT_TIMEOUT, + ) + return AsyncOpenAI(api_key="test-key", http_client=http_client, base_url="http://testserver/v1") + +@pytest.mark.asyncio +async def test_unified_stream_via_local_sse() -> None: + app = _make_app(_unified_sequence()) + client = _make_openai_client(app) + assert client._client._base_url.host == "testserver" # sanity + + parts: list[str] = [] + async with client.responses.stream(model="gpt-4o-mini", input="hi") as stream: + async for ev in stream: + # Only aggregate on *delta* events; don't call extract_text on others + name = getattr(ev, "event", getattr(ev, "type", None)) + if name in ("response.output_text.delta", "output_text.delta"): + # prefer value; fallback to delta if needed + val = getattr(ev, "value", None) + if val is None and hasattr(ev, "delta"): + val = ev.delta + if val: + parts.append(val) + if name in ("response.completed", "completed"): + break + + await client.close() + assert "".join(parts) == "Hello World" + +@pytest.mark.asyncio +async def test_legacy_stream_via_local_sse() -> None: + app = _make_app(_legacy_sequence()) + client = _make_openai_client(app) + + got_delta = False + async with client.responses.stream(model="gpt-4o-mini", input="hi") as stream: + async for ev in stream: + name = getattr(ev, "event", getattr(ev, "type", None)) + if name in ("response.output_text.delta", "output_text.delta"): + got_delta = True + if name in ("response.completed", "completed"): + break + + await client.close() + assert got_delta is True diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py new file mode 100644 index 0000000000..533bea5f6f --- /dev/null +++ b/tests/test_timeouts.py @@ -0,0 +1,54 @@ +import pytest +import httpx +import asyncio + +@pytest.mark.anyio +async def test_asyncclient_raises_read_timeout_immediately(): + async def handler(request: httpx.Request) -> httpx.Response: + raise httpx.ReadTimeout("forced timeout", 
request=request) + + transport = httpx.MockTransport(handler) + client = httpx.AsyncClient( + transport=transport, + timeout=httpx.Timeout(0.05, connect=0.05, read=0.05, write=0.05), + ) + with pytest.raises(httpx.ReadTimeout): + await client.get("https://example.test/anything") + await client.aclose() + + +@pytest.mark.anyio +async def test_asyncclient_times_out_on_slow_body_via_tcp_server(): + async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + try: + # read until the end of the HTTP headers + await reader.readuntil(b"\r\n\r\n") + except Exception: + writer.close() + return + # send only the headers, with no body yet + writer.write(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n") + await writer.drain() + # delay the body past the client's read timeout + await asyncio.sleep(0.2) + writer.write(b"ok") + try: + await writer.drain() + finally: + writer.close() + + server = await asyncio.start_server(handle, host="127.0.0.1", port=0) + try: + port = server.sockets[0].getsockname()[1] + url = f"http://127.0.0.1:{port}/" + + client = httpx.AsyncClient( + timeout=httpx.Timeout(0.05, connect=0.05, read=0.05, write=0.05) + ) + with pytest.raises(httpx.ReadTimeout): + # reading the body should trip the read timeout + await client.get(url) + await client.aclose() + finally: + server.close() + await server.wait_closed() diff --git a/tests/test_timeouts_client.py b/tests/test_timeouts_client.py new file mode 100644 index 0000000000..7e0dd86410 --- /dev/null +++ b/tests/test_timeouts_client.py @@ -0,0 +1,27 @@ +import pytest +import httpx +from openai import AsyncOpenAI + +@pytest.mark.anyio +async def test_openai_client_bubbles_readtimeout_from_transport(): + async def handler(request: httpx.Request) -> httpx.Response: + # Force a ReadTimeout coming from the transport + raise httpx.ReadTimeout("forced timeout", request=request) + + transport = httpx.MockTransport(handler) + http_client = httpx.AsyncClient( + transport=transport, + timeout=httpx.Timeout(0.05, connect=0.05, read=0.05, write=0.05), + base_url="https://api.openai.test", + ) + client = AsyncOpenAI( + api_key="dummy", + http_client=http_client, + base_url="https://api.openai.test" + ) + + with pytest.raises(httpx.ReadTimeout): + # Any simple call works; MockTransport intercepts it + await client.models.list() + + await http_client.aclose() diff --git a/tests/timeouts/__init__.py b/tests/timeouts/__init__.py new file mode 100644 index 0000000000..480dc84f40 --- /dev/null +++ b/tests/timeouts/__init__.py @@ -0,0 +1 @@ +"""Tests related to timeout behavior.""" diff --git a/tests/timeouts/_util.py b/tests/timeouts/_util.py new file mode 100644 index 0000000000..d9cf6b82f2 --- /dev/null +++ b/tests/timeouts/_util.py @@ -0,0 +1,18 @@ +from __future__ import annotations + + +def assert_timeout_eq(value, expected: float) -> None: + """Assert that a timeout-like value equals the expected seconds. + + Supports plain numeric timeouts or httpx.Timeout instances.
+ """ + from httpx import Timeout + + if isinstance(value, (int, float)): + assert float(value) == expected + elif isinstance(value, Timeout): + assert any(getattr(value, f, None) in (None, expected) for f in ("read", "connect", "write")), ( + f"Timeout fields do not match {expected}: {value!r}" + ) + else: + raise AssertionError(f"Unexpected timeout type: {type(value)}") diff --git a/tests/timeouts/test_overrides.py b/tests/timeouts/test_overrides.py new file mode 100644 index 0000000000..e8440135a0 --- /dev/null +++ b/tests/timeouts/test_overrides.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import os + +import httpx + +from openai import OpenAI +from openai._models import FinalRequestOptions +from openai._base_client import DEFAULT_TIMEOUT + +base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") + + +def test_per_request_timeout_overrides_default(client: OpenAI) -> None: + # default timeout applied when none provided per-request + request = client._build_request(FinalRequestOptions(method="get", url="/foo")) + timeout = httpx.Timeout(**request.extensions["timeout"]) # type: ignore[arg-type] + assert timeout == DEFAULT_TIMEOUT + + # per-request timeout overrides the default + request = client._build_request(FinalRequestOptions(method="get", url="/foo", timeout=httpx.Timeout(100.0))) + timeout = httpx.Timeout(**request.extensions["timeout"]) # type: ignore[arg-type] + assert timeout == httpx.Timeout(100.0)