2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
- 3.9.18
+ 3.12.9
31 changes: 30 additions & 1 deletion README.md
@@ -376,7 +376,7 @@ from openai import OpenAI

client = OpenAI()

- response = client.chat.responses.create(
+ response = client.responses.create(
      input=[
          {
              "role": "user",
@@ -622,6 +622,35 @@ client.with_options(timeout=5.0).chat.completions.create(
model="gpt-4o",
)
```
- from openai import OpenAI
+ import httpx
+ from openai import OpenAI


# Configure the default for all requests:
client = OpenAI(
    # 20 seconds (default is 10 minutes)
    timeout=20.0,
)

# More granular control:
client = OpenAI(
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)

# Override per-request:
client.with_options(timeout=5.0).chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "How can I list all files in a directory using Python?",
        }
    ],
    model="gpt-4o",
)


On timeout, an `APITimeoutError` is thrown.

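For completeness, a minimal sketch of handling that error (using the top-level `openai.APITimeoutError` export; the model name is illustrative):

```python
import openai
from openai import OpenAI

client = OpenAI(timeout=5.0)

try:
    client.responses.create(
        model="gpt-4o-mini",
        input="Say hello.",
    )
except openai.APITimeoutError:
    # The request exceeded the 5-second budget configured above.
    print("Request timed out; retry or raise the timeout.")
```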
22 changes: 22 additions & 0 deletions examples/async_chat_stream.py
@@ -0,0 +1,22 @@
#!/usr/bin/env -S rye run python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main() -> None:
    # Chat Completions is still supported indefinitely, but it's no longer the primary API.
    # This example remains useful for users who still rely on chat.completions.
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Say this is a test (streaming)."}],
        stream=True,
    )
    async for chunk in stream:
        # Some users prefer accessing delta objects; for demo purposes, printing the chunk is enough.
        print(chunk)


if __name__ == "__main__":
    asyncio.run(main())

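As a hedged aside to the example above, the per-token text can be pulled out of each chunk instead of printing the whole object (attribute names assumed from the streamed `ChatCompletionChunk` shape):

```python
# Drop-in replacement for the loop body above (assumed chunk shape):
async for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:  # the final chunk's delta content is typically None
        print(delta, end="")
```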
19 changes: 19 additions & 0 deletions examples/async_responses_stream.py
@@ -0,0 +1,19 @@
#!/usr/bin/env -S rye run python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main() -> None:
    # Async streaming with the Responses API (the recommended primary API).
    stream = await client.responses.create(
        model="gpt-4o-mini",
        input="Write a one-sentence bedtime story about a unicorn.",
        stream=True,
    )
    async for event in stream:
        # Each event may contain deltas and final results; printing directly is sufficient for demo purposes.
        print(event)


if __name__ == "__main__":
    asyncio.run(main())
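A hedged variant of the loop body that keeps only the text deltas, assuming the documented `response.output_text.delta` event type:

```python
# Drop-in replacement for the loop body above:
async for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="")
```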
25 changes: 25 additions & 0 deletions examples/async_stream_unified.py
@@ -0,0 +1,25 @@
#!/usr/bin/env -S rye run python
import asyncio
from openai import AsyncOpenAI
from openai._streaming.unified import extract_text, StreamEvent
from openai._streaming.adapters import ResponsesEventAdapter # or ChatCompletionsEventAdapter


async def main():
    client = AsyncOpenAI()
    # Example with Responses stream (manual adapter for now)
    async with client.responses.stream(
        model="gpt-4.1-mini",
        input="Write a single haiku about async streams.",
    ) as stream:
        async for raw_evt in stream:
            # Convert raw event into a unified StreamEvent
            ev: StreamEvent = ResponsesEventAdapter.adapt(raw_evt)
            txt = extract_text(ev)
            if txt:
                print(txt, end="")
    print()  # newline at the end


if __name__ == "__main__":
    asyncio.run(main())
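For symmetry, a sketch of the same loop driven by Chat Completions via `ChatCompletionsEventAdapter` (imported in place of `ResponsesEventAdapter` above; a plain `stream=True` call is used here):

```python
async def main_chat() -> None:
    client = AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Write a single haiku about async streams."}],
        stream=True,
    )
    async for chunk in stream:
        # Same unified handling as above, via the Chat Completions adapter.
        ev: StreamEvent = ChatCompletionsEventAdapter.adapt(chunk)
        txt = extract_text(ev)
        if txt:
            print(txt, end="")
    print()
```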
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
"sniffio",
"tqdm > 4",
"jiter>=0.4.0, <1",
"starlette>=0.48.0",
]
requires-python = ">= 3.8"
classifiers = [
@@ -56,7 +57,7 @@ dev-dependencies = [
"pyright==1.1.399",
"mypy",
"respx",
"pytest",
"pytest>=8.4.2",
"pytest-asyncio",
"ruff",
"time-machine",
17 changes: 5 additions & 12 deletions requirements-dev.lock
@@ -22,12 +22,11 @@ annotated-types==0.6.0
anyio==4.1.0
# via httpx
# via openai
+ # via starlette
argcomplete==3.1.2
# via nox
asttokens==2.4.1
# via inline-snapshot
- async-timeout==5.0.1
- # via aiohttp
attrs==24.2.0
# via aiohttp
# via outcome
@@ -57,10 +56,6 @@ distlib==0.3.7
# via virtualenv
distro==1.8.0
# via openai
- exceptiongroup==1.2.2
- # via anyio
- # via pytest
- # via trio
execnet==2.1.1
# via pytest-xdist
executing==2.2.0
@@ -146,7 +141,7 @@ pygments==2.18.0
pyjwt==2.8.0
# via msal
pyright==1.1.399
- pytest==8.4.1
+ pytest==8.4.2
# via inline-snapshot
# via pytest-asyncio
# via pytest-xdist
@@ -179,11 +174,9 @@ sortedcontainers==2.4.0
# via trio
sounddevice==0.5.1
# via openai
+ starlette==0.48.0
+ # via openai
time-machine==2.9.0
- tomli==2.0.2
- # via inline-snapshot
- # via mypy
- # via pytest
tqdm==4.66.5
# via openai
trio==0.27.0
@@ -194,12 +187,12 @@ types-tqdm==4.66.0.20240417
typing-extensions==4.12.2
# via azure-core
# via azure-identity
# via multidict
# via mypy
# via openai
# via pydantic
# via pydantic-core
# via pyright
+ # via starlette
tzdata==2024.1
# via pandas
urllib3==2.2.1
9 changes: 4 additions & 5 deletions requirements.lock
@@ -22,8 +22,7 @@ annotated-types==0.6.0
anyio==4.1.0
# via httpx
# via openai
- async-timeout==5.0.1
- # via aiohttp
+ # via starlette
attrs==25.3.0
# via aiohttp
certifi==2023.7.22
@@ -33,8 +32,6 @@ cffi==1.17.1
# via sounddevice
distro==1.8.0
# via openai
- exceptiongroup==1.2.2
- # via anyio
frozenlist==1.7.0
# via aiohttp
# via aiosignal
@@ -84,15 +81,17 @@ sniffio==1.3.0
# via openai
sounddevice==0.5.1
# via openai
+ starlette==0.48.0
+ # via openai
tqdm==4.66.5
# via openai
types-pytz==2024.2.0.20241003
# via pandas-stubs
typing-extensions==4.12.2
# via multidict
# via openai
# via pydantic
# via pydantic-core
+ # via starlette
tzdata==2024.1
# via pandas
websockets==15.0.1
24 changes: 20 additions & 4 deletions src/openai/_streaming.py → src/openai/_streaming/__init__.py
@@ -9,11 +9,11 @@

import httpx

- from ._utils import is_mapping, extract_type_var_from_base
- from ._exceptions import APIError
+ from .._utils import is_mapping, extract_type_var_from_base
+ from .._exceptions import APIError

if TYPE_CHECKING:
- from ._client import OpenAI, AsyncOpenAI
+ from .._client import OpenAI, AsyncOpenAI


_T = TypeVar("_T")
@@ -400,11 +400,27 @@ class MyStream(Stream[bytes]):
extract_stream_chunk_type(MyStream) -> bytes
```
"""
- from ._base_client import Stream, AsyncStream
+ from .._base_client import Stream, AsyncStream

    return extract_type_var_from_base(
        stream_cls,
        index=0,
        generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)),
        failure_message=failure_message,
    )


from .unified import StreamEvent, extract_text
from .adapters import ResponsesEventAdapter, ChatCompletionsEventAdapter
from .wrap import _wrap_unified


__all__ = [
    "Stream",
    "AsyncStream",
    "StreamEvent",
    "extract_text",
    "ResponsesEventAdapter",
    "ChatCompletionsEventAdapter",
    "_wrap_unified",
]
47 changes: 47 additions & 0 deletions src/openai/_streaming/adapters.py
@@ -0,0 +1,47 @@
from __future__ import annotations

from typing import Any

from .unified import StreamEvent


class ResponsesEventAdapter:
    """Map Responses API streaming events into the unified stream shape."""

    @staticmethod
    def adapt(event: Any) -> StreamEvent:
        event_type = getattr(event, "type", None)

        if event_type == "response.output_text.delta":
            return StreamEvent(
                type="output_text.delta",
                delta=getattr(event, "delta", None),
                raw=event,
            )

        if event_type == "response.completed":
            return StreamEvent(type="response.completed", raw=event)

        return StreamEvent(type="response.error", raw=event)


class ChatCompletionsEventAdapter:
    """Map Chat Completions streaming chunks into the unified stream shape."""

    @staticmethod
    def adapt(chunk: Any) -> StreamEvent:
        try:
            choice = chunk.choices[0]
            delta = getattr(getattr(choice, "delta", None), "content", None)
            if delta:
                return StreamEvent(type="output_text.delta", delta=delta, raw=chunk)
        except Exception:  # pragma: no cover - defensive adapter guard
            pass

        return StreamEvent(type="response.completed", raw=chunk)


__all__ = [
    "ChatCompletionsEventAdapter",
    "ResponsesEventAdapter",
]
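A quick, hedged sketch of exercising the adapters in isolation, with `types.SimpleNamespace` standing in for real SDK events:

```python
from types import SimpleNamespace

from openai._streaming.adapters import ResponsesEventAdapter
from openai._streaming.unified import extract_text

# A fake Responses event carrying the attributes the adapter reads.
fake = SimpleNamespace(type="response.output_text.delta", delta="hi")
ev = ResponsesEventAdapter.adapt(fake)
assert ev.type == "output_text.delta"
assert extract_text(ev) == "hi"

# Unrecognized event types fall through to "response.error".
other = SimpleNamespace(type="response.created")
assert ResponsesEventAdapter.adapt(other).type == "response.error"
```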
37 changes: 37 additions & 0 deletions src/openai/_streaming/unified.py
@@ -0,0 +1,37 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Literal, Optional

EventType = Literal[
    "message_start",
    "output_text.delta",
    "output_text.done",
    "tool_call.delta",
    "tool_call.done",
    "response.completed",
    "response.error",
]


@dataclass
class StreamEvent:
    type: EventType
    model: Optional[str] = None
    delta: Optional[str] = None
    full_text: Optional[str] = None
    tool_call_id: Optional[str] = None
    raw: Optional[Any] = None


def extract_text(event: StreamEvent) -> str:
    """Return the text delta from a stream event."""

    return event.delta or ""


__all__ = [
    "EventType",
    "StreamEvent",
    "extract_text",
]
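A small usage sketch, mainly to show the fallback behavior of `extract_text` on events that carry no delta:

```python
from openai._streaming.unified import StreamEvent, extract_text

delta_ev = StreamEvent(type="output_text.delta", delta="Hello")
done_ev = StreamEvent(type="response.completed")

print(extract_text(delta_ev))       # Hello
print(repr(extract_text(done_ev)))  # '' (no delta, so an empty string)
```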
25 changes: 25 additions & 0 deletions src/openai/_streaming/wrap.py
@@ -0,0 +1,25 @@
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable

from .unified import StreamEvent


async def _map_async_iter(
    source: AsyncIterator[Any],
    fn: Callable[[Any], StreamEvent],
) -> AsyncIterator[StreamEvent]:
    async for item in source:
        yield fn(item)


@asynccontextmanager
async def _wrap_unified(cm, adapter_fn: Callable[[Any], StreamEvent]):
    """
    Wrap an existing async context manager (cm) that yields an async iterator of raw events,
    and expose a context manager that yields an async iterator of adapted StreamEvent.
    """
    async with cm as underlying:
        async def _mapped():
            async for raw in underlying:
                yield adapter_fn(raw)

        yield _mapped()

# Optional alias if something imported the non-underscored name before
map_async_iter = _map_async_iter
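Finally, a hedged sketch of how `_wrap_unified` composes with the adapters, assuming the module layout introduced in this PR:

```python
import asyncio

from openai import AsyncOpenAI
from openai._streaming.adapters import ResponsesEventAdapter
from openai._streaming.unified import extract_text
from openai._streaming.wrap import _wrap_unified


async def demo() -> None:
    client = AsyncOpenAI()
    cm = client.responses.stream(
        model="gpt-4o-mini",
        input="Write a single haiku about async streams.",
    )
    # The wrapper adapts each raw event into a unified StreamEvent.
    async with _wrap_unified(cm, ResponsesEventAdapter.adapt) as events:
        async for ev in events:
            print(extract_text(ev), end="")
    print()


if __name__ == "__main__":
    asyncio.run(demo())
```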