Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ async def test_read_batch(

await reader.close()

async def test_link_to_client(self, driver, topic_path, topic_consumer):
reader = driver.topic_client.reader(topic_path, topic_consumer)
assert reader._parent is driver.topic_client

async def test_read_message(
self, driver, topic_path, topic_with_messages, topic_consumer
):
Expand All @@ -27,7 +31,18 @@ async def test_read_message(

await reader.close()

async def test_read_and_commit_message(
async def test_read_and_commit_with_close_reader(
self, driver, topic_path, topic_with_messages, topic_consumer
):
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
message = await reader.receive_message()
reader.commit(message)

async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
message2 = await reader.receive_message()
assert message != message2

async def test_read_and_commit_with_ack(
self, driver, topic_path, topic_with_messages, topic_consumer
):

Expand Down Expand Up @@ -84,6 +99,10 @@ def test_read_batch(

reader.close()

def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
assert reader._parent is driver_sync.topic_client

def test_read_message(
self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
Expand All @@ -95,7 +114,18 @@ def test_read_message(

reader.close()

def test_read_and_commit_message(
def test_read_and_commit_with_close_reader(
self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
message = reader.receive_message()
reader.commit(message)

with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
message2 = reader.receive_message()
assert message != message2

def test_read_and_commit_with_ack(
self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
Expand Down
8 changes: 8 additions & 0 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
init_info = await writer2.wait_init()
assert init_info.last_seqno == 5

async def test_link_to_client(self, driver, topic_path, topic_consumer):
writer = driver.topic_client.writer(topic_path)
assert writer._parent is driver.topic_client

async def test_random_producer_id(
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
):
Expand Down Expand Up @@ -138,6 +142,10 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
init_info = writer.wait_init()
assert init_info.last_seqno == last_seqno

def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
writer = driver_sync.topic_client.writer(topic_path)
assert writer._parent is driver_sync.topic_client

def test_random_producer_id(
self,
driver_sync: ydb.aio.Driver,
Expand Down
10 changes: 9 additions & 1 deletion ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,19 @@ class PublicAsyncIOReader:
_loop: asyncio.AbstractEventLoop
_closed: bool
_reconnector: ReaderReconnector
_parent: typing.Any # need for prevent close parent client by GC

def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings):
def __init__(
self,
driver: Driver,
settings: topic_reader.PublicReaderSettings,
*,
_parent=None,
):
self._loop = asyncio.get_running_loop()
self._closed = False
self._reconnector = ReaderReconnector(driver, settings)
self._parent = _parent

async def __aenter__(self):
return self
Expand Down
6 changes: 5 additions & 1 deletion ydb/_topic_reader/topic_reader_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ class TopicReaderSync:
_caller: CallFromSyncToAsync
_async_reader: PublicAsyncIOReader
_closed: bool
_parent: typing.Any # need for prevent stop the client by GC

def __init__(
self,
driver: SupportedDriverType,
settings: PublicReaderSettings,
*,
eventloop: Optional[asyncio.AbstractEventLoop] = None,
_parent=None, # need for prevent stop the client by GC
):
self._closed = False

Expand All @@ -49,6 +51,8 @@ async def create_reader():
create_reader(), loop
).result()

self._parent = _parent

def __del__(self):
self.close(flush=False)

Expand Down Expand Up @@ -122,7 +126,7 @@ def commit(
"""
self._check_closed()

self._caller.call_sync(self._async_reader.commit(mess))
self._caller.call_sync(lambda: self._async_reader.commit(mess))

def commit_with_ack(
self,
Expand Down
9 changes: 8 additions & 1 deletion ydb/_topic_writer/topic_writer_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@ class WriterAsyncIO:
_loop: asyncio.AbstractEventLoop
_reconnector: "WriterAsyncIOReconnector"
_closed: bool
_parent: typing.Any # need for prevent close parent client by GC

def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings):
def __init__(
self,
driver: SupportedDriverType,
settings: PublicWriterSettings,
_client=None,
):
self._loop = asyncio.get_running_loop()
self._closed = False
self._reconnector = WriterAsyncIOReconnector(
driver=driver, settings=WriterSettings(settings)
)
self._parent = _client

async def __aenter__(self) -> "WriterAsyncIO":
return self
Expand Down
4 changes: 4 additions & 0 deletions ydb/_topic_writer/topic_writer_sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import typing
from concurrent.futures import Future
from typing import Union, List, Optional

Expand All @@ -25,13 +26,15 @@ class WriterSync:
_caller: CallFromSyncToAsync
_async_writer: WriterAsyncIO
_closed: bool
_parent: typing.Any # need for prevent close parent client by GC

def __init__(
self,
driver: SupportedDriverType,
settings: PublicWriterSettings,
*,
eventloop: Optional[asyncio.AbstractEventLoop] = None,
_parent=None,
):

self._closed = False
Expand All @@ -49,6 +52,7 @@ async def create_async_writer():
self._async_writer = self._caller.safe_call_with_result(
create_async_writer(), None
)
self._parent = _parent

def __enter__(self):
return self
Expand Down
11 changes: 5 additions & 6 deletions ydb/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@
RetryPolicy as TopicWriterRetryPolicy,
)

from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter

from ._topic_common.common import (
wrap_operation as _wrap_operation,
create_result_wrapper as _create_result_wrapper,
)

from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO

from ._grpc.grpcwrapper import ydb_topic as _ydb_topic
from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types
from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401
Expand Down Expand Up @@ -174,7 +173,7 @@ def reader(

settings = TopicReaderSettings(**args)

return TopicReaderAsyncIO(self._driver, settings)
return TopicReaderAsyncIO(self._driver, settings, _parent=self)

def writer(
self,
Expand All @@ -201,7 +200,7 @@ def writer(
if not settings.encoder_executor:
settings.encoder_executor = self._executor

return TopicWriterAsyncIO(self._driver, settings)
return TopicWriterAsyncIO(self._driver, settings, _client=self)

def close(self):
if self._closed:
Expand Down Expand Up @@ -331,7 +330,7 @@ def reader(

settings = TopicReaderSettings(**args)

return TopicReader(self._driver, settings)
return TopicReader(self._driver, settings, _parent=self)

def writer(
self,
Expand Down Expand Up @@ -359,7 +358,7 @@ def writer(
if not settings.encoder_executor:
settings.encoder_executor = self._executor

return TopicWriter(self._driver, settings)
return TopicWriter(self._driver, settings, _parent=self)

def close(self):
if self._closed:
Expand Down