From 21a8e690069e5b264db7cdc9a4f187c1d90daaad Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 8 Jun 2023 18:45:57 +0300 Subject: [PATCH] Fix race condition between recreate stream reader and add new commit waiters (commit_with_ack) Close #301 --- ydb/_topic_reader/datatypes.py | 6 +++-- ydb/_topic_reader/datatypes_test.py | 8 +++++- ydb/_topic_reader/topic_reader_asyncio.py | 27 +++++++++++++++---- .../topic_reader_asyncio_test.py | 2 +- ydb/topic.py | 2 ++ 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 4c209f27..1b767e7c 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -82,6 +82,8 @@ def __post_init__(self): self._loop = None def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter": + self._ensure_not_closed() + waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future()) if end_offset <= self.committed_offset: waiter._finish_ok() @@ -121,7 +123,7 @@ def close(self): return self.state = PartitionSession.State.Stopped - exception = topic_reader_asyncio.TopicReaderCommitToExpiredPartition() + exception = topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() for waiter in self._ack_waiters: waiter._finish_error(exception) @@ -131,7 +133,7 @@ def closed(self): def _ensure_not_closed(self): if self.state == PartitionSession.State.Stopped: - raise topic_reader_asyncio.TopicReaderCommitToExpiredPartition() + raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() class State(enum.Enum): Active = 1 diff --git a/ydb/_topic_reader/datatypes_test.py b/ydb/_topic_reader/datatypes_test.py index f7c2a193..19a8bf36 100644 --- a/ydb/_topic_reader/datatypes_test.py +++ b/ydb/_topic_reader/datatypes_test.py @@ -192,9 +192,15 @@ async def test_close_notify_waiters(self, session): waiter = session.add_waiter(session.committed_offset + 1) session.close() - with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition): + with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError): waiter.future.result() async def test_close_twice(self, session): session.close() session.close() + + async def test_commit_after_close(self, session): + session.close() + + with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError): + session.add_waiter(session.committed_offset + 1) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index ebe7bd6b..886d76c8 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError): pass -class TopicReaderCommitToExpiredPartition(TopicReaderError): +class PublicTopicReaderPartitionExpiredError(TopicReaderError): """ Commit message when partition read session are dropped. It is ok - the message/batch will not commit to server and will receive in other read session @@ -114,15 +114,22 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa Write commit message to a buffer. For the method no way check the commit result - (for example if lost connection - commits will not re-send and committed messages will receive again) + (for example if lost connection - commits will not re-send and committed messages will receive again). """ - self._reconnector.commit(batch) + try: + self._reconnector.commit(batch) + except PublicTopicReaderPartitionExpiredError: + pass async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]): """ write commit message to a buffer and wait ack from the server. use asyncio.wait_for for wait with timeout. + + may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server + before receive commit ack. Message may be acked or not (if not - it will send in other read session, + to this or other reader). """ waiter = self._reconnector.commit(batch) await waiter.future @@ -174,6 +181,14 @@ async def _connection_loop(self): await asyncio.sleep(retry_info.sleep_timeout_seconds) attempt += 1 + finally: + if self._stream_reader is not None: + # noinspection PyBroadException + try: + await self._stream_reader.close() + except BaseException: + # supress any error on close stream reader + pass async def wait_message(self): while True: @@ -366,10 +381,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co raise TopicReaderError("reader can commit only self-produced messages") if partition_session.reader_stream_id != self._id: - raise TopicReaderCommitToExpiredPartition("commit messages after reconnect to server") + raise PublicTopicReaderPartitionExpiredError("commit messages after reconnect to server") if partition_session.id not in self._partition_sessions: - raise TopicReaderCommitToExpiredPartition("commit messages after server stop the partition read session") + raise PublicTopicReaderPartitionExpiredError("commit messages after server stop the partition read session") commit_range = batch._commit_get_offsets_range() waiter = partition_session.add_waiter(commit_range.end) @@ -617,6 +632,7 @@ async def flush(self): async def close(self): if self._closed: return + self._closed = True self._set_first_error(TopicReaderStreamClosedError()) @@ -625,6 +641,7 @@ async def close(self): for session in self._partition_sessions.values(): session.close() + self._partition_sessions.clear() for task in self._background_tasks: task.cancel() diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index c1019b02..f438141c 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -363,7 +363,7 @@ async def test_close_ack_waiters_when_close_stream_reader( waiter = partition_session.add_waiter(self.partition_session_committed_offset + 1) await wait_for_fast(stream_reader_started.close()) - with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition): + with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError): waiter.future.result() async def test_flush(self, stream, stream_reader_started: ReaderStream, partition_session): diff --git a/ydb/topic.py b/ydb/topic.py index 190f5329..d598fb7d 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -13,6 +13,7 @@ "TopicReaderAsyncIO", "TopicReaderSelector", "TopicReaderSettings", + "TopicReaderPartitionExpiredError", "TopicStatWindow", "TopicWriteResult", "TopicWriter", @@ -40,6 +41,7 @@ from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, + PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError, ) from ._topic_writer.topic_writer import ( # noqa: F401