Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def __post_init__(self):
self._loop = None

def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
self._ensure_not_closed()

waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future())
if end_offset <= self.committed_offset:
waiter._finish_ok()
Expand Down Expand Up @@ -121,7 +123,7 @@ def close(self):
return

self.state = PartitionSession.State.Stopped
exception = topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
exception = topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
for waiter in self._ack_waiters:
waiter._finish_error(exception)

Expand All @@ -131,7 +133,7 @@ def closed(self):

def _ensure_not_closed(self):
if self.state == PartitionSession.State.Stopped:
raise topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()

class State(enum.Enum):
Active = 1
Expand Down
8 changes: 7 additions & 1 deletion ydb/_topic_reader/datatypes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,15 @@ async def test_close_notify_waiters(self, session):
waiter = session.add_waiter(session.committed_offset + 1)
session.close()

with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError):
waiter.future.result()

async def test_close_twice(self, session):
session.close()
session.close()

async def test_commit_after_close(self, session):
session.close()

with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError):
session.add_waiter(session.committed_offset + 1)
27 changes: 22 additions & 5 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError):
pass


class TopicReaderCommitToExpiredPartition(TopicReaderError):
class PublicTopicReaderPartitionExpiredError(TopicReaderError):
"""
Commit message when partition read session are dropped.
It is ok - the message/batch will not commit to server and will receive in other read session
Expand Down Expand Up @@ -114,15 +114,22 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
Write commit message to a buffer.

For the method no way check the commit result
(for example if lost connection - commits will not re-send and committed messages will receive again)
(for example if lost connection - commits will not re-send and committed messages will receive again).
"""
self._reconnector.commit(batch)
try:
self._reconnector.commit(batch)
except PublicTopicReaderPartitionExpiredError:
pass

async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
"""
write commit message to a buffer and wait ack from the server.

use asyncio.wait_for for wait with timeout.

may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
to this or other reader).
"""
waiter = self._reconnector.commit(batch)
await waiter.future
Expand Down Expand Up @@ -174,6 +181,14 @@ async def _connection_loop(self):
await asyncio.sleep(retry_info.sleep_timeout_seconds)

attempt += 1
finally:
if self._stream_reader is not None:
# noinspection PyBroadException
try:
await self._stream_reader.close()
except BaseException:
# supress any error on close stream reader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass

async def wait_message(self):
while True:
Expand Down Expand Up @@ -366,10 +381,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
raise TopicReaderError("reader can commit only self-produced messages")

if partition_session.reader_stream_id != self._id:
raise TopicReaderCommitToExpiredPartition("commit messages after reconnect to server")
raise PublicTopicReaderPartitionExpiredError("commit messages after reconnect to server")

if partition_session.id not in self._partition_sessions:
raise TopicReaderCommitToExpiredPartition("commit messages after server stop the partition read session")
raise PublicTopicReaderPartitionExpiredError("commit messages after server stop the partition read session")

commit_range = batch._commit_get_offsets_range()
waiter = partition_session.add_waiter(commit_range.end)
Expand Down Expand Up @@ -617,6 +632,7 @@ async def flush(self):
async def close(self):
if self._closed:
return

self._closed = True

self._set_first_error(TopicReaderStreamClosedError())
Expand All @@ -625,6 +641,7 @@ async def close(self):

for session in self._partition_sessions.values():
session.close()
self._partition_sessions.clear()

for task in self._background_tasks:
task.cancel()
Expand Down
2 changes: 1 addition & 1 deletion ydb/_topic_reader/topic_reader_asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ async def test_close_ack_waiters_when_close_stream_reader(
waiter = partition_session.add_waiter(self.partition_session_committed_offset + 1)
await wait_for_fast(stream_reader_started.close())

with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError):
waiter.future.result()

async def test_flush(self, stream, stream_reader_started: ReaderStream, partition_session):
Expand Down
2 changes: 2 additions & 0 deletions ydb/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"TopicReaderAsyncIO",
"TopicReaderSelector",
"TopicReaderSettings",
"TopicReaderPartitionExpiredError",
"TopicStatWindow",
"TopicWriteResult",
"TopicWriter",
Expand Down Expand Up @@ -40,6 +41,7 @@

from ._topic_reader.topic_reader_asyncio import (
PublicAsyncIOReader as TopicReaderAsyncIO,
PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError,
)

from ._topic_writer.topic_writer import ( # noqa: F401
Expand Down