From fa263d2807a78c89839fe8bb869ce01c5bf59f14 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 8 Jun 2023 17:40:37 +0300 Subject: [PATCH] Fix release buffer while read messages one by one --- ydb/_topic_reader/topic_reader_asyncio.py | 5 +- .../topic_reader_asyncio_test.py | 48 ++++++++++++++++--- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index ebe7bd6b..36e1aa6e 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -348,6 +348,9 @@ def receive_batch_nowait(self): return batch def receive_message_nowait(self): + if self._get_first_error(): + raise self._get_first_error() + try: batch = self._message_batches[0] message = batch.pop_message() @@ -355,7 +358,7 @@ def receive_message_nowait(self): return None if batch.empty(): - self._message_batches.popleft() + self.receive_batch_nowait() return message diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index c1019b02..4c6849f4 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -213,6 +213,16 @@ def create_message( ) async def send_message(self, stream_reader, message: PublicMessage): + await self.send_batch(stream_reader, [message]) + + async def send_batch(self, stream_reader, batch: typing.List[PublicMessage]): + if len(batch) == 0: + return + + first_message = batch[0] + for message in batch: + assert message._partition_session is first_message._partition_session + def batch_count(): return len(stream_reader._message_batches) @@ -225,7 +235,7 @@ def batch_count(): server_message=StreamReadMessage.ReadResponse( partition_data=[ StreamReadMessage.ReadResponse.PartitionData( - partition_session_id=message._partition_session.id, + partition_session_id=first_message._partition_session.id, batches=[ StreamReadMessage.ReadResponse.Batch( message_data=[ @@ -237,11 +247,12 @@ def batch_count(): uncompresed_size=len(message.data), message_group_id=message.message_group_id, ) + for message in batch ], - producer_id=message.producer_id, - write_session_meta=message.session_metadata, + producer_id=first_message.producer_id, + write_session_meta=first_message.session_metadata, codec=Codec.CODEC_RAW, - written_at=message.written_at, + written_at=first_message.written_at, ) ], ) @@ -1066,13 +1077,15 @@ async def test_read_message( async def test_receive_batch_nowait(self, stream, stream_reader, partition_session): assert stream_reader.receive_batch_nowait() is None + initial_buffer_size = stream_reader._buffer_size_bytes + mess1 = self.create_message(partition_session, 1, 1) await self.send_message(stream_reader, mess1) mess2 = self.create_message(partition_session, 2, 1) await self.send_message(stream_reader, mess2) - initial_buffer_size = stream_reader._buffer_size_bytes + assert stream_reader._buffer_size_bytes == initial_buffer_size - 2 * self.default_batch_size received = stream_reader.receive_batch_nowait() assert received == PublicBatch( @@ -1090,7 +1103,7 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi _codec=Codec.CODEC_RAW, ) - assert stream_reader._buffer_size_bytes == initial_buffer_size + 2 * self.default_batch_size + assert stream_reader._buffer_size_bytes == initial_buffer_size assert StreamReadMessage.ReadRequest(self.default_batch_size) == stream.from_client.get_nowait().client_message assert StreamReadMessage.ReadRequest(self.default_batch_size) == stream.from_client.get_nowait().client_message @@ -1098,6 +1111,29 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi with pytest.raises(asyncio.QueueEmpty): stream.from_client.get_nowait() + async def test_receive_message_nowait(self, stream, stream_reader, partition_session): + assert stream_reader.receive_batch_nowait() is None + + initial_buffer_size = stream_reader._buffer_size_bytes + + await self.send_batch( + stream_reader, [self.create_message(partition_session, 1, 1), self.create_message(partition_session, 2, 1)] + ) + await self.send_batch( + stream_reader, + [ + self.create_message(partition_session, 10, 1), + ], + ) + + assert stream_reader._buffer_size_bytes == initial_buffer_size - 2 * self.default_batch_size + + for expected_seqno in [1, 2, 10]: + mess = stream_reader.receive_message_nowait() + assert mess.seqno == expected_seqno + + assert stream_reader._buffer_size_bytes == initial_buffer_size + async def test_update_token(self, stream): settings = PublicReaderSettings( consumer="test-consumer",