From a8c48abe46fad75e348df7e0c318b8c8d6e2ccb6 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 22 Mar 2023 23:23:41 +0300 Subject: [PATCH 1/4] allow save link to parent client - for prevent stop it (and underlay executors) by GC. --- tests/topics/test_topic_reader.py | 8 +++ tests/topics/test_topic_writer.py | 8 +++ ydb/_topic_reader/topic_reader_asyncio.py | 59 +++++++++++++---------- ydb/_topic_reader/topic_reader_sync.py | 4 ++ ydb/_topic_writer/topic_writer_asyncio.py | 11 ++++- ydb/_topic_writer/topic_writer_sync.py | 31 +++++++----- ydb/topic.py | 11 ++--- 7 files changed, 87 insertions(+), 45 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 03731b76..d7a6d7a4 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -16,6 +16,10 @@ async def test_read_batch( await reader.close() + async def test_link_to_client(self, driver, topic_path, topic_consumer): + reader = driver.topic_client.reader(topic_path, topic_consumer) + assert reader._parent is driver.topic_client + async def test_read_message( self, driver, topic_path, topic_with_messages, topic_consumer ): @@ -84,6 +88,10 @@ def test_read_batch( reader.close() + def test_link_to_client(self, driver_sync, topic_path, topic_consumer): + reader = driver_sync.topic_client.reader(topic_path, topic_consumer) + assert reader._parent is driver_sync.topic_client + def test_read_message( self, driver_sync, topic_path, topic_with_messages, topic_consumer ): diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index 68c34a8e..cfd65571 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -27,6 +27,10 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): init_info = await writer2.wait_init() assert init_info.last_seqno == 5 + async def test_link_to_client(self, driver, topic_path, topic_consumer): + writer = driver.topic_client.writer(topic_path) + assert writer._parent is driver.topic_client + async def test_random_producer_id( self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO ): @@ -138,6 +142,10 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path): init_info = writer.wait_init() assert init_info.last_seqno == last_seqno + def test_link_to_client(self, driver_sync, topic_path, topic_consumer): + writer = driver_sync.topic_client.writer(topic_path) + assert writer._parent is driver_sync.topic_client + def test_random_producer_id( self, driver_sync: ydb.aio.Driver, diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index d9cd87fc..6c8cfc17 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -26,6 +26,7 @@ Codec, ) from .._errors import check_retriable_error +from .. import topic class TopicReaderError(YdbError): @@ -61,11 +62,19 @@ class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool _reconnector: ReaderReconnector + _parent: typing.Any # need for prevent close parent client by GC - def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): + def __init__( + self, + driver: Driver, + settings: topic_reader.PublicReaderSettings, + *, + _parent=None, + ): self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = ReaderReconnector(driver, settings) + self._parent = _parent async def __aenter__(self): return self @@ -78,7 +87,7 @@ def __del__(self): self._loop.create_task(self.close(flush=False), name="close reader") async def receive_batch( - self, + self, ) -> typing.Union[datatypes.PublicBatch, None]: """ Get one messages batch from reader. @@ -99,7 +108,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: return self._reconnector.receive_message_nowait() def commit( - self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] + self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] ): """ Write commit message to a buffer. @@ -110,7 +119,7 @@ def commit( self._reconnector.commit(batch) async def commit_with_ack( - self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] + self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] ): """ write commit message to a buffer and wait ack from the server. @@ -195,7 +204,7 @@ def receive_message_nowait(self): return self._stream_reader.receive_message_nowait() def commit( - self, batch: datatypes.ICommittable + self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: return self._stream_reader.commit(batch) @@ -254,10 +263,10 @@ class ReaderStream: _get_token_function: Callable[[], str] def __init__( - self, - reader_reconnector_id: int, - settings: topic_reader.PublicReaderSettings, - get_token_function: Optional[Callable[[], str]] = None, + self, + reader_reconnector_id: int, + settings: topic_reader.PublicReaderSettings, + get_token_function: Optional[Callable[[], str]] = None, ): self._loop = asyncio.get_running_loop() self._id = ReaderStream._static_id_counter.inc_and_get() @@ -286,9 +295,9 @@ def __init__( @staticmethod async def create( - reader_reconnector_id: int, - driver: SupportedDriverType, - settings: topic_reader.PublicReaderSettings, + reader_reconnector_id: int, + driver: SupportedDriverType, + settings: topic_reader.PublicReaderSettings, ) -> "ReaderStream": stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto) @@ -306,7 +315,7 @@ async def create( return reader async def _start( - self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest + self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest ): if self._started: raise TopicReaderError("Double start ReaderStream") @@ -372,7 +381,7 @@ def receive_message_nowait(self): return message def commit( - self, batch: datatypes.ICommittable + self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: partition_session = batch._commit_get_partition_session() @@ -426,19 +435,19 @@ async def _read_messages_loop(self): self._on_read_response(message.server_message) elif isinstance( - message.server_message, StreamReadMessage.CommitOffsetResponse + message.server_message, StreamReadMessage.CommitOffsetResponse ): self._on_commit_response(message.server_message) elif isinstance( - message.server_message, - StreamReadMessage.StartPartitionSessionRequest, + message.server_message, + StreamReadMessage.StartPartitionSessionRequest, ): self._on_start_partition_session(message.server_message) elif isinstance( - message.server_message, - StreamReadMessage.StopPartitionSessionRequest, + message.server_message, + StreamReadMessage.StopPartitionSessionRequest, ): self._on_partition_session_stop(message.server_message) @@ -470,12 +479,12 @@ async def _update_token(self, token: str): self._update_token_event.clear() def _on_start_partition_session( - self, message: StreamReadMessage.StartPartitionSessionRequest + self, message: StreamReadMessage.StartPartitionSessionRequest ): try: if ( - message.partition_session.partition_session_id - in self._partition_sessions + message.partition_session.partition_session_id + in self._partition_sessions ): raise TopicReaderError( "Double start partition session: %s" @@ -506,7 +515,7 @@ def _on_start_partition_session( self._set_first_error(err) def _on_partition_session_stop( - self, message: StreamReadMessage.StopPartitionSessionRequest + self, message: StreamReadMessage.StopPartitionSessionRequest ): if message.partition_session_id not in self._partition_sessions: # may if receive stop partition with graceful=false after response on stop partition @@ -554,7 +563,7 @@ def _buffer_release_bytes(self, bytes_size): ) def _read_response_to_batches( - self, message: StreamReadMessage.ReadResponse + self, message: StreamReadMessage.ReadResponse ) -> typing.List[datatypes.PublicBatch]: batches = [] @@ -564,7 +573,7 @@ def _read_response_to_batches( bytes_per_batch = message.bytes_size // batch_count additional_bytes_to_last_batch = ( - message.bytes_size - bytes_per_batch * batch_count + message.bytes_size - bytes_per_batch * batch_count ) for partition_data in message.partition_data: diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index ca6fde92..3da91d8d 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -25,6 +25,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool + _parent: typing.Any # need for prevent stop the client by GC def __init__( self, @@ -32,6 +33,7 @@ def __init__( settings: PublicReaderSettings, *, eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, # need for prevent stop the client by GC ): self._closed = False @@ -49,6 +51,8 @@ async def create_reader(): create_reader(), loop ).result() + self._parent = _parent + def __del__(self): self.close(flush=False) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index bc913123..58a92897 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -40,6 +40,8 @@ GrpcWrapperAsyncIO, ) +from .. import topic + logger = logging.getLogger(__name__) @@ -47,13 +49,20 @@ class WriterAsyncIO: _loop: asyncio.AbstractEventLoop _reconnector: "WriterAsyncIOReconnector" _closed: bool + _parent: typing.Any # need for prevent close parent client by GC - def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings): + def __init__( + self, + driver: SupportedDriverType, + settings: PublicWriterSettings, + _client=None, + ): self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = WriterAsyncIOReconnector( driver=driver, settings=WriterSettings(settings) ) + self._parent = _client async def __aenter__(self) -> "WriterAsyncIO": return self diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index de0ec41d..f884e299 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -1,9 +1,11 @@ from __future__ import annotations import asyncio +import typing from concurrent.futures import Future from typing import Union, List, Optional +from .. import topic from .._grpc.grpcwrapper.common_utils import SupportedDriverType from .topic_writer import ( PublicWriterSettings, @@ -25,13 +27,15 @@ class WriterSync: _caller: CallFromSyncToAsync _async_writer: WriterAsyncIO _closed: bool + _parent: typing.Any # need for prevent close parent client by GC def __init__( - self, - driver: SupportedDriverType, - settings: PublicWriterSettings, - *, - eventloop: Optional[asyncio.AbstractEventLoop] = None, + self, + driver: SupportedDriverType, + settings: PublicWriterSettings, + *, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None ): self._closed = False @@ -49,6 +53,7 @@ async def create_async_writer(): self._async_writer = self._caller.safe_call_with_result( create_async_writer(), None ) + self._parent = _parent def __enter__(self): return self @@ -96,17 +101,17 @@ def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo: ) def write( - self, - messages: Union[Message, List[Message]], - timeout: TimeoutType = None, + self, + messages: Union[Message, List[Message]], + timeout: TimeoutType = None, ): self._check_closed() self._caller.safe_call_with_result(self._async_writer.write(messages), timeout) def async_write_with_ack( - self, - messages: Union[Message, List[Message]], + self, + messages: Union[Message, List[Message]], ) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]: self._check_closed() @@ -115,9 +120,9 @@ def async_write_with_ack( ) def write_with_ack( - self, - messages: Union[Message, List[Message]], - timeout: Union[float, None] = None, + self, + messages: Union[Message, List[Message]], + timeout: Union[float, None] = None, ) -> Union[PublicWriteResult, List[PublicWriteResult]]: self._check_closed() diff --git a/ydb/topic.py b/ydb/topic.py index 7d983540..c8c4ffef 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -44,6 +44,7 @@ RetryPolicy as TopicWriterRetryPolicy, ) +from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter from ._topic_common.common import ( @@ -51,8 +52,6 @@ create_result_wrapper as _create_result_wrapper, ) -from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO - from ._grpc.grpcwrapper import ydb_topic as _ydb_topic from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401 @@ -174,7 +173,7 @@ def reader( settings = TopicReaderSettings(**args) - return TopicReaderAsyncIO(self._driver, settings) + return TopicReaderAsyncIO(self._driver, settings, _parent=self) def writer( self, @@ -201,7 +200,7 @@ def writer( if not settings.encoder_executor: settings.encoder_executor = self._executor - return TopicWriterAsyncIO(self._driver, settings) + return TopicWriterAsyncIO(self._driver, settings, _client=self) def close(self): if self._closed: @@ -331,7 +330,7 @@ def reader( settings = TopicReaderSettings(**args) - return TopicReader(self._driver, settings) + return TopicReader(self._driver, settings, _parent=self) def writer( self, @@ -359,7 +358,7 @@ def writer( if not settings.encoder_executor: settings.encoder_executor = self._executor - return TopicWriter(self._driver, settings) + return TopicWriter(self._driver, settings, _parent=self) def close(self): if self._closed: From c40f579a39652a4c285c8ca9fdce148e426379ff Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 23 Mar 2023 01:10:55 +0300 Subject: [PATCH 2/4] fix call commit from sync reader --- ydb/_topic_reader/topic_reader_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index 3da91d8d..28dfd004 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -126,7 +126,7 @@ def commit( """ self._check_closed() - self._caller.call_sync(self._async_reader.commit(mess)) + self._caller.call_sync(lambda: self._async_reader.commit(mess)) def commit_with_ack( self, From 041c2b4a024019349767e49d7f04c6a615ccd29c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 23 Mar 2023 02:25:06 +0300 Subject: [PATCH 3/4] add tests for commit without ack --- tests/topics/test_topic_reader.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index d7a6d7a4..658f9714 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -31,7 +31,18 @@ async def test_read_message( await reader.close() - async def test_read_and_commit_message( + async def test_read_and_commit_with_close_reader( + self, driver, topic_path, topic_with_messages, topic_consumer + ): + async with driver.topic_client.reader(topic_path, topic_consumer) as reader: + message = await reader.receive_message() + reader.commit(message) + + async with driver.topic_client.reader(topic_path, topic_consumer) as reader: + message2 = await reader.receive_message() + assert message != message2 + + async def test_read_and_commit_with_ack( self, driver, topic_path, topic_with_messages, topic_consumer ): @@ -103,7 +114,18 @@ def test_read_message( reader.close() - def test_read_and_commit_message( + def test_read_and_commit_with_close_reader( + self, driver_sync, topic_path, topic_with_messages, topic_consumer + ): + with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader: + message = reader.receive_message() + reader.commit(message) + + with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader: + message2 = reader.receive_message() + assert message != message2 + + def test_read_and_commit_with_ack( self, driver_sync, topic_path, topic_with_messages, topic_consumer ): reader = driver_sync.topic_client.reader(topic_path, topic_consumer) From d8b4f3e1e0a22197caf36979ddc2746cc7701a1c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 23 Mar 2023 03:48:00 +0300 Subject: [PATCH 4/4] group messages to batches --- ydb/_topic_reader/topic_reader_asyncio.py | 59 +++++++++++------------ ydb/_topic_writer/topic_writer_asyncio.py | 10 ++-- ydb/_topic_writer/topic_writer_sync.py | 29 ++++++----- 3 files changed, 47 insertions(+), 51 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 6c8cfc17..f95f7976 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -26,7 +26,6 @@ Codec, ) from .._errors import check_retriable_error -from .. import topic class TopicReaderError(YdbError): @@ -65,11 +64,11 @@ class PublicAsyncIOReader: _parent: typing.Any # need for prevent close parent client by GC def __init__( - self, - driver: Driver, - settings: topic_reader.PublicReaderSettings, - *, - _parent=None, + self, + driver: Driver, + settings: topic_reader.PublicReaderSettings, + *, + _parent=None, ): self._loop = asyncio.get_running_loop() self._closed = False @@ -87,7 +86,7 @@ def __del__(self): self._loop.create_task(self.close(flush=False), name="close reader") async def receive_batch( - self, + self, ) -> typing.Union[datatypes.PublicBatch, None]: """ Get one messages batch from reader. @@ -108,7 +107,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: return self._reconnector.receive_message_nowait() def commit( - self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] + self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] ): """ Write commit message to a buffer. @@ -119,7 +118,7 @@ def commit( self._reconnector.commit(batch) async def commit_with_ack( - self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] + self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] ): """ write commit message to a buffer and wait ack from the server. @@ -204,7 +203,7 @@ def receive_message_nowait(self): return self._stream_reader.receive_message_nowait() def commit( - self, batch: datatypes.ICommittable + self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: return self._stream_reader.commit(batch) @@ -263,10 +262,10 @@ class ReaderStream: _get_token_function: Callable[[], str] def __init__( - self, - reader_reconnector_id: int, - settings: topic_reader.PublicReaderSettings, - get_token_function: Optional[Callable[[], str]] = None, + self, + reader_reconnector_id: int, + settings: topic_reader.PublicReaderSettings, + get_token_function: Optional[Callable[[], str]] = None, ): self._loop = asyncio.get_running_loop() self._id = ReaderStream._static_id_counter.inc_and_get() @@ -295,9 +294,9 @@ def __init__( @staticmethod async def create( - reader_reconnector_id: int, - driver: SupportedDriverType, - settings: topic_reader.PublicReaderSettings, + reader_reconnector_id: int, + driver: SupportedDriverType, + settings: topic_reader.PublicReaderSettings, ) -> "ReaderStream": stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto) @@ -315,7 +314,7 @@ async def create( return reader async def _start( - self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest + self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest ): if self._started: raise TopicReaderError("Double start ReaderStream") @@ -381,7 +380,7 @@ def receive_message_nowait(self): return message def commit( - self, batch: datatypes.ICommittable + self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: partition_session = batch._commit_get_partition_session() @@ -435,19 +434,19 @@ async def _read_messages_loop(self): self._on_read_response(message.server_message) elif isinstance( - message.server_message, StreamReadMessage.CommitOffsetResponse + message.server_message, StreamReadMessage.CommitOffsetResponse ): self._on_commit_response(message.server_message) elif isinstance( - message.server_message, - StreamReadMessage.StartPartitionSessionRequest, + message.server_message, + StreamReadMessage.StartPartitionSessionRequest, ): self._on_start_partition_session(message.server_message) elif isinstance( - message.server_message, - StreamReadMessage.StopPartitionSessionRequest, + message.server_message, + StreamReadMessage.StopPartitionSessionRequest, ): self._on_partition_session_stop(message.server_message) @@ -479,12 +478,12 @@ async def _update_token(self, token: str): self._update_token_event.clear() def _on_start_partition_session( - self, message: StreamReadMessage.StartPartitionSessionRequest + self, message: StreamReadMessage.StartPartitionSessionRequest ): try: if ( - message.partition_session.partition_session_id - in self._partition_sessions + message.partition_session.partition_session_id + in self._partition_sessions ): raise TopicReaderError( "Double start partition session: %s" @@ -515,7 +514,7 @@ def _on_start_partition_session( self._set_first_error(err) def _on_partition_session_stop( - self, message: StreamReadMessage.StopPartitionSessionRequest + self, message: StreamReadMessage.StopPartitionSessionRequest ): if message.partition_session_id not in self._partition_sessions: # may if receive stop partition with graceful=false after response on stop partition @@ -563,7 +562,7 @@ def _buffer_release_bytes(self, bytes_size): ) def _read_response_to_batches( - self, message: StreamReadMessage.ReadResponse + self, message: StreamReadMessage.ReadResponse ) -> typing.List[datatypes.PublicBatch]: batches = [] @@ -573,7 +572,7 @@ def _read_response_to_batches( bytes_per_batch = message.bytes_size // batch_count additional_bytes_to_last_batch = ( - message.bytes_size - bytes_per_batch * batch_count + message.bytes_size - bytes_per_batch * batch_count ) for partition_data in message.partition_data: diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 58a92897..481b797f 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -40,8 +40,6 @@ GrpcWrapperAsyncIO, ) -from .. import topic - logger = logging.getLogger(__name__) @@ -52,10 +50,10 @@ class WriterAsyncIO: _parent: typing.Any # need for prevent close parent client by GC def __init__( - self, - driver: SupportedDriverType, - settings: PublicWriterSettings, - _client=None, + self, + driver: SupportedDriverType, + settings: PublicWriterSettings, + _client=None, ): self._loop = asyncio.get_running_loop() self._closed = False diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index f884e299..7eed3a47 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -5,7 +5,6 @@ from concurrent.futures import Future from typing import Union, List, Optional -from .. import topic from .._grpc.grpcwrapper.common_utils import SupportedDriverType from .topic_writer import ( PublicWriterSettings, @@ -30,12 +29,12 @@ class WriterSync: _parent: typing.Any # need for prevent close parent client by GC def __init__( - self, - driver: SupportedDriverType, - settings: PublicWriterSettings, - *, - eventloop: Optional[asyncio.AbstractEventLoop] = None, - _parent=None + self, + driver: SupportedDriverType, + settings: PublicWriterSettings, + *, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, ): self._closed = False @@ -101,17 +100,17 @@ def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo: ) def write( - self, - messages: Union[Message, List[Message]], - timeout: TimeoutType = None, + self, + messages: Union[Message, List[Message]], + timeout: TimeoutType = None, ): self._check_closed() self._caller.safe_call_with_result(self._async_writer.write(messages), timeout) def async_write_with_ack( - self, - messages: Union[Message, List[Message]], + self, + messages: Union[Message, List[Message]], ) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]: self._check_closed() @@ -120,9 +119,9 @@ def async_write_with_ack( ) def write_with_ack( - self, - messages: Union[Message, List[Message]], - timeout: Union[float, None] = None, + self, + messages: Union[Message, List[Message]], + timeout: Union[float, None] = None, ) -> Union[PublicWriteResult, List[PublicWriteResult]]: self._check_closed()