Skip to content

Commit

Permalink
chore(internal): add internal helpers (#1092)
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-bot committed Jan 22, 2024
1 parent 15488ce commit 629bde5
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 1 deletion.
39 changes: 38 additions & 1 deletion src/openai/_compat.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Union, TypeVar, cast
from typing import TYPE_CHECKING, Any, Union, Generic, TypeVar, Callable, cast, overload
from datetime import date, datetime
from typing_extensions import Self

import pydantic
from pydantic.fields import FieldInfo

from ._types import StrBytesIntFloat

_T = TypeVar("_T")
_ModelT = TypeVar("_ModelT", bound=pydantic.BaseModel)

# --------------- Pydantic v2 compatibility ---------------
Expand Down Expand Up @@ -178,8 +180,43 @@ class GenericModel(pydantic.generics.GenericModel, pydantic.BaseModel):
# cached properties
if TYPE_CHECKING:
cached_property = property

# we define a separate type (copied from typeshed)
# that represents that `cached_property` is `set`able
# at runtime, which differs from `@property`.
#
# this is a separate type as editors likely special case
# `@property` and we don't want to cause issues just to have
# more helpful internal types.

class typed_cached_property(Generic[_T]):
func: Callable[[Any], _T]
attrname: str | None

def __init__(self, func: Callable[[Any], _T]) -> None:
...

@overload
def __get__(self, instance: None, owner: type[Any] | None = None) -> Self:
...

@overload
def __get__(self, instance: object, owner: type[Any] | None = None) -> _T:
...

def __get__(self, instance: object, owner: type[Any] | None = None) -> _T | Self:
raise NotImplementedError()

def __set_name__(self, owner: type[Any], name: str) -> None:
...

# __set__ is not defined at runtime, but @cached_property is designed to be settable
def __set__(self, instance: object, value: _T) -> None:
...
else:
try:
from functools import cached_property as cached_property
except ImportError:
from cached_property import cached_property as cached_property

typed_cached_property = cached_property
1 change: 1 addition & 0 deletions src/openai/_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ._sync import asyncify as asyncify
from ._proxy import LazyProxy as LazyProxy
from ._utils import (
flatten as flatten,
Expand Down
64 changes: 64 additions & 0 deletions src/openai/_utils/_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

import functools
from typing import TypeVar, Callable, Awaitable
from typing_extensions import ParamSpec

import anyio
import anyio.to_thread

T_Retval = TypeVar("T_Retval")
T_ParamSpec = ParamSpec("T_ParamSpec")


# copied from `asyncer`, https://github.com/tiangolo/asyncer
def asyncify(
function: Callable[T_ParamSpec, T_Retval],
*,
cancellable: bool = False,
limiter: anyio.CapacityLimiter | None = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
"""
Take a blocking function and create an async one that receives the same
positional and keyword arguments, and that when called, calls the original function
in a worker thread using `anyio.to_thread.run_sync()`. Internally,
`asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
keyword arguments additional to positional arguments and it adds better support for
autocompletion and inline errors for the arguments of the function called and the
return value.
If the `cancellable` option is enabled and the task waiting for its completion is
cancelled, the thread will still run its course but its return value (or any raised
exception) will be ignored.
Use it like this:
```Python
def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
# Do work
return "Some result"
result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
print(result)
```
## Arguments
`function`: a blocking regular callable (e.g. a function)
`cancellable`: `True` to allow cancellation of the operation
`limiter`: capacity limiter to use to limit the total amount of threads running
(if omitted, the default limiter is used)
## Return
An async function that takes the same positional and keyword arguments as the
original one, that when called runs the same original function in a thread worker
and returns the result.
"""

async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
partial_f = functools.partial(function, *args, **kwargs)
return await anyio.to_thread.run_sync(partial_f, cancellable=cancellable, limiter=limiter)

return wrapper

0 comments on commit 629bde5

Please sign in to comment.