diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 03731b76..658f9714 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -16,6 +16,10 @@ async def test_read_batch( await reader.close() + async def test_link_to_client(self, driver, topic_path, topic_consumer): + reader = driver.topic_client.reader(topic_path, topic_consumer) + assert reader._parent is driver.topic_client + async def test_read_message( self, driver, topic_path, topic_with_messages, topic_consumer ): @@ -27,7 +31,18 @@ async def test_read_message( await reader.close() - async def test_read_and_commit_message( + async def test_read_and_commit_with_close_reader( + self, driver, topic_path, topic_with_messages, topic_consumer + ): + async with driver.topic_client.reader(topic_path, topic_consumer) as reader: + message = await reader.receive_message() + reader.commit(message) + + async with driver.topic_client.reader(topic_path, topic_consumer) as reader: + message2 = await reader.receive_message() + assert message != message2 + + async def test_read_and_commit_with_ack( self, driver, topic_path, topic_with_messages, topic_consumer ): @@ -84,6 +99,10 @@ def test_read_batch( reader.close() + def test_link_to_client(self, driver_sync, topic_path, topic_consumer): + reader = driver_sync.topic_client.reader(topic_path, topic_consumer) + assert reader._parent is driver_sync.topic_client + def test_read_message( self, driver_sync, topic_path, topic_with_messages, topic_consumer ): @@ -95,7 +114,18 @@ def test_read_message( reader.close() - def test_read_and_commit_message( + def test_read_and_commit_with_close_reader( + self, driver_sync, topic_path, topic_with_messages, topic_consumer + ): + with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader: + message = reader.receive_message() + reader.commit(message) + + with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader: + message2 = reader.receive_message() + assert message != message2 + + def test_read_and_commit_with_ack( self, driver_sync, topic_path, topic_with_messages, topic_consumer ): reader = driver_sync.topic_client.reader(topic_path, topic_consumer) diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index 68c34a8e..cfd65571 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -27,6 +27,10 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): init_info = await writer2.wait_init() assert init_info.last_seqno == 5 + async def test_link_to_client(self, driver, topic_path, topic_consumer): + writer = driver.topic_client.writer(topic_path) + assert writer._parent is driver.topic_client + async def test_random_producer_id( self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO ): @@ -138,6 +142,10 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path): init_info = writer.wait_init() assert init_info.last_seqno == last_seqno + def test_link_to_client(self, driver_sync, topic_path, topic_consumer): + writer = driver_sync.topic_client.writer(topic_path) + assert writer._parent is driver_sync.topic_client + def test_random_producer_id( self, driver_sync: ydb.aio.Driver, diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index d9cd87fc..f95f7976 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -61,11 +61,19 @@ class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool _reconnector: ReaderReconnector + _parent: typing.Any # need for prevent close parent client by GC - def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): + def __init__( + self, + driver: Driver, + settings: topic_reader.PublicReaderSettings, + *, + _parent=None, + ): self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = ReaderReconnector(driver, settings) + self._parent = _parent async def __aenter__(self): return self diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index ca6fde92..28dfd004 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -25,6 +25,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool + _parent: typing.Any # need for prevent stop the client by GC def __init__( self, @@ -32,6 +33,7 @@ def __init__( settings: PublicReaderSettings, *, eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, # need for prevent stop the client by GC ): self._closed = False @@ -49,6 +51,8 @@ async def create_reader(): create_reader(), loop ).result() + self._parent = _parent + def __del__(self): self.close(flush=False) @@ -122,7 +126,7 @@ def commit( """ self._check_closed() - self._caller.call_sync(self._async_reader.commit(mess)) + self._caller.call_sync(lambda: self._async_reader.commit(mess)) def commit_with_ack( self, diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index bc913123..481b797f 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -47,13 +47,20 @@ class WriterAsyncIO: _loop: asyncio.AbstractEventLoop _reconnector: "WriterAsyncIOReconnector" _closed: bool + _parent: typing.Any # need for prevent close parent client by GC - def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings): + def __init__( + self, + driver: SupportedDriverType, + settings: PublicWriterSettings, + _client=None, + ): self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = WriterAsyncIOReconnector( driver=driver, settings=WriterSettings(settings) ) + self._parent = _client async def __aenter__(self) -> "WriterAsyncIO": return self diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index de0ec41d..7eed3a47 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import typing from concurrent.futures import Future from typing import Union, List, Optional @@ -25,6 +26,7 @@ class WriterSync: _caller: CallFromSyncToAsync _async_writer: WriterAsyncIO _closed: bool + _parent: typing.Any # need for prevent close parent client by GC def __init__( self, @@ -32,6 +34,7 @@ def __init__( settings: PublicWriterSettings, *, eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, ): self._closed = False @@ -49,6 +52,7 @@ async def create_async_writer(): self._async_writer = self._caller.safe_call_with_result( create_async_writer(), None ) + self._parent = _parent def __enter__(self): return self diff --git a/ydb/topic.py b/ydb/topic.py index 7d983540..c8c4ffef 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -44,6 +44,7 @@ RetryPolicy as TopicWriterRetryPolicy, ) +from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter from ._topic_common.common import ( @@ -51,8 +52,6 @@ create_result_wrapper as _create_result_wrapper, ) -from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO - from ._grpc.grpcwrapper import ydb_topic as _ydb_topic from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401 @@ -174,7 +173,7 @@ def reader( settings = TopicReaderSettings(**args) - return TopicReaderAsyncIO(self._driver, settings) + return TopicReaderAsyncIO(self._driver, settings, _parent=self) def writer( self, @@ -201,7 +200,7 @@ def writer( if not settings.encoder_executor: settings.encoder_executor = self._executor - return TopicWriterAsyncIO(self._driver, settings) + return TopicWriterAsyncIO(self._driver, settings, _client=self) def close(self): if self._closed: @@ -331,7 +330,7 @@ def reader( settings = TopicReaderSettings(**args) - return TopicReader(self._driver, settings) + return TopicReader(self._driver, settings, _parent=self) def writer( self, @@ -359,7 +358,7 @@ def writer( if not settings.encoder_executor: settings.encoder_executor = self._executor - return TopicWriter(self._driver, settings) + return TopicWriter(self._driver, settings, _parent=self) def close(self): if self._closed: