diff --git a/CHANGELOG.md b/CHANGELOG.md index 61a2c4d9..0d348aa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +* Fix exception for convert sync to async iterator +* Fixed start many sync writers/readers in parallel + ## 3.3.0 ## * Added support to set many topics and topic reader settings for read in one reader * Added ydb.TopicWriterInitInfo, ydb.TopicWriteResult as public types diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index d93ff2e2..327cb81e 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -1,3 +1,6 @@ +from __future__ import annotations +from typing import List + import pytest import ydb.aio @@ -196,3 +199,16 @@ def test_write_encoded(self, driver_sync: ydb.Driver, topic_path: str, codec): writer.write("a" * 1000) writer.write("b" * 1000) writer.write("c" * 1000) + + def test_start_many_sync_writers_in_parallel(self, driver_sync: ydb.Driver, topic_path): + target_count = 100 + writers = [] # type: List[ydb.TopicWriter] + for i in range(target_count): + writer = driver_sync.topic_client.writer(topic_path) + writers.append(writer) + + for i, writer in enumerate(writers): + writer.write(str(i)) + + for writer in writers: + writer.close() diff --git a/ydb/_grpc/grpcwrapper/common_utils.py b/ydb/_grpc/grpcwrapper/common_utils.py index faec03c2..bc294025 100644 --- a/ydb/_grpc/grpcwrapper/common_utils.py +++ b/ydb/_grpc/grpcwrapper/common_utils.py @@ -2,6 +2,7 @@ import abc import asyncio +import concurrent.futures import contextvars import datetime import functools @@ -111,19 +112,20 @@ def __next__(self): return item -class SyncIteratorToAsyncIterator: - def __init__(self, sync_iterator: Iterator): +class SyncToAsyncIterator: + def __init__(self, sync_iterator: Iterator, executor: concurrent.futures.Executor): self._sync_iterator = sync_iterator + self._executor = executor def __aiter__(self): return self async def __anext__(self): try: - res = await to_thread(self._sync_iterator.__next__) + res = await to_thread(self._sync_iterator.__next__, executor=self._executor) return res - except StopAsyncIteration: - raise StopIteration() + except StopIteration: + raise StopAsyncIteration() class IGrpcWrapperAsyncIO(abc.ABC): @@ -149,12 +151,17 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO): convert_server_grpc_to_wrapper: Callable[[Any], Any] _connection_state: str _stream_call: Optional[Union[grpc.aio.StreamStreamCall, "grpc._channel._MultiThreadedRendezvous"]] + _wait_executor: Optional[concurrent.futures.ThreadPoolExecutor] def __init__(self, convert_server_grpc_to_wrapper): self.from_client_grpc = asyncio.Queue() self.convert_server_grpc_to_wrapper = convert_server_grpc_to_wrapper self._connection_state = "new" self._stream_call = None + self._wait_executor = None + + def __del__(self): + self._clean_executor(wait=False) async def start(self, driver: SupportedDriverType, stub, method): if asyncio.iscoroutinefunction(driver.__call__): @@ -168,6 +175,12 @@ def close(self): if self._stream_call: self._stream_call.cancel() + self._clean_executor(wait=True) + + def _clean_executor(self, wait: bool): + if self._wait_executor: + self._wait_executor.shutdown(wait) + async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method): requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc) stream_call = await driver( @@ -180,14 +193,11 @@ async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method): async def _start_sync_driver(self, driver: ydb.Driver, stub, method): requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc) - stream_call = await to_thread( - driver, - requests_iterator, - stub, - method, - ) + self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + + stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor) self._stream_call = stream_call - self.from_server_grpc = SyncIteratorToAsyncIterator(stream_call.__iter__()) + self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor) async def receive(self) -> Any: # todo handle grpc exceptions and convert it to internal exceptions @@ -255,7 +265,7 @@ def callback_from_asyncio(callback: Union[Callable, Coroutine]) -> [asyncio.Futu return loop.run_in_executor(None, callback) -async def to_thread(func, /, *args, **kwargs): +async def to_thread(func, *args, executor: Optional[concurrent.futures.Executor], **kwargs): """Asynchronously run function *func* in a separate thread. Any *args and **kwargs supplied for this function are directly passed @@ -271,7 +281,7 @@ async def to_thread(func, /, *args, **kwargs): loop = asyncio.get_running_loop() ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) - return await loop.run_in_executor(None, func_call) + return await loop.run_in_executor(executor, func_call) def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> Optional[ProtoDuration]: