Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(internal): add internal helpers #1092

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/openai/_compat.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Union, TypeVar, cast
from typing import TYPE_CHECKING, Any, Union, Generic, TypeVar, Callable, cast, overload
from datetime import date, datetime
from typing_extensions import Self

import pydantic
from pydantic.fields import FieldInfo

from ._types import StrBytesIntFloat

_T = TypeVar("_T")
_ModelT = TypeVar("_ModelT", bound=pydantic.BaseModel)

# --------------- Pydantic v2 compatibility ---------------
Expand Down Expand Up @@ -178,8 +180,43 @@ class GenericModel(pydantic.generics.GenericModel, pydantic.BaseModel):
# cached properties
if TYPE_CHECKING:
cached_property = property

# we define a separate type (copied from typeshed)
# that represents that `cached_property` is `set`able
# at runtime, which differs from `@property`.
#
# this is a separate type as editors likely special case
# `@property` and we don't want to cause issues just to have
# more helpful internal types.

class typed_cached_property(Generic[_T]):
func: Callable[[Any], _T]
attrname: str | None

def __init__(self, func: Callable[[Any], _T]) -> None:
...

@overload
def __get__(self, instance: None, owner: type[Any] | None = None) -> Self:
...

@overload
def __get__(self, instance: object, owner: type[Any] | None = None) -> _T:
...

def __get__(self, instance: object, owner: type[Any] | None = None) -> _T | Self:
raise NotImplementedError()

def __set_name__(self, owner: type[Any], name: str) -> None:
...

# __set__ is not defined at runtime, but @cached_property is designed to be settable
def __set__(self, instance: object, value: _T) -> None:
...
else:
try:
from functools import cached_property as cached_property
except ImportError:
from cached_property import cached_property as cached_property

typed_cached_property = cached_property
1 change: 1 addition & 0 deletions src/openai/_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ._sync import asyncify as asyncify
from ._proxy import LazyProxy as LazyProxy
from ._utils import (
flatten as flatten,
Expand Down
64 changes: 64 additions & 0 deletions src/openai/_utils/_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

import functools
from typing import TypeVar, Callable, Awaitable
from typing_extensions import ParamSpec

import anyio
import anyio.to_thread

T_Retval = TypeVar("T_Retval")
T_ParamSpec = ParamSpec("T_ParamSpec")


# copied from `asyncer`, https://github.com/tiangolo/asyncer
def asyncify(
function: Callable[T_ParamSpec, T_Retval],
*,
cancellable: bool = False,
limiter: anyio.CapacityLimiter | None = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
"""
Take a blocking function and create an async one that receives the same
positional and keyword arguments, and that when called, calls the original function
in a worker thread using `anyio.to_thread.run_sync()`. Internally,
`asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
keyword arguments additional to positional arguments and it adds better support for
autocompletion and inline errors for the arguments of the function called and the
return value.

If the `cancellable` option is enabled and the task waiting for its completion is
cancelled, the thread will still run its course but its return value (or any raised
exception) will be ignored.

Use it like this:

```Python
def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
# Do work
return "Some result"


result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
print(result)
```

## Arguments

`function`: a blocking regular callable (e.g. a function)
`cancellable`: `True` to allow cancellation of the operation
`limiter`: capacity limiter to use to limit the total amount of threads running
(if omitted, the default limiter is used)

## Return

An async function that takes the same positional and keyword arguments as the
original one, that when called runs the same original function in a thread worker
and returns the result.
"""

async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
partial_f = functools.partial(function, *args, **kwargs)
return await anyio.to_thread.run_sync(partial_f, cancellable=cancellable, limiter=limiter)

return wrapper