Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/_topic_common/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def wait_condition(


async def wait_for_fast(
awaitable: typing.Awaitable,
awaitable: typing.Union[typing.Awaitable, typing.Coroutine],
timeout: typing.Optional[typing.Union[float, int]] = None,
):
fut = asyncio.ensure_future(awaitable)
Expand Down
69 changes: 69 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,75 @@ def session_count():
with pytest.raises(asyncio.QueueEmpty):
stream.from_client.get_nowait()

@pytest.mark.parametrize(
"graceful",
(
[True],
[False],
),
)
async def test_free_buffer_after_partition_stop(self, stream, stream_reader, partition_session, graceful):
initial_buffer_size = stream_reader._buffer_size_bytes
message_size = initial_buffer_size - 1

t = datetime.datetime.now()

stream.from_server.put_nowait(
StreamReadMessage.FromServer(
server_status=ServerStatus(issues.StatusCode.SUCCESS, []),
server_message=StreamReadMessage.ReadResponse(
bytes_size=message_size,
partition_data=[
StreamReadMessage.ReadResponse.PartitionData(
partition_session_id=partition_session.id,
batches=[
StreamReadMessage.ReadResponse.Batch(
message_data=[
StreamReadMessage.ReadResponse.MessageData(
partition_session.committed_offset + 1,
seq_no=123,
created_at=t,
data=bytes(),
uncompresed_size=message_size,
message_group_id="test-message-group",
)
],
producer_id="asd",
write_session_meta={},
codec=Codec.CODEC_RAW,
written_at=t,
)
],
)
],
),
)
)

def message_received():
return len(stream_reader._message_batches) > 0

await wait_condition(message_received)

assert stream_reader._buffer_size_bytes == initial_buffer_size - message_size

stream.from_server.put_nowait(
StreamReadMessage.FromServer(
server_status=ServerStatus(issues.StatusCode.SUCCESS, []),
server_message=StreamReadMessage.StopPartitionSessionRequest(
partition_session_id=partition_session.id,
graceful=graceful,
committed_offset=partition_session.committed_offset,
),
)
)

await wait_condition(lambda: partition_session.closed)

batch = stream_reader.receive_batch_nowait()
assert not batch.alive
assert stream_reader._buffer_size_bytes == initial_buffer_size

async def test_receive_message_from_server(
self,
stream_reader,
Expand Down