Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
producer_id="test",
auto_seqno=False,
) as writer:
await writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))
ret = await writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))
assert ret.offset == 0

async with driver.topic_client.writer(
topic_path,
Expand Down Expand Up @@ -62,12 +63,14 @@ async def test_write_multi_message_with_ack(
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
):
async with driver.topic_client.writer(topic_path) as writer:
await writer.write_with_ack(
res1, res2 = await writer.write_with_ack(
[
ydb.TopicWriterMessage(data="123".encode()),
ydb.TopicWriterMessage(data="456".encode()),
]
)
assert res1.offset == 0
assert res2.offset == 1

batch = await topic_reader.receive_batch()

Expand Down
11 changes: 10 additions & 1 deletion ydb/_topic_writer/topic_writer_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
TopicWriterStopped,
TopicWriterError,
messages_to_proto_requests,
PublicWriteResult,
PublicWriteResultTypes,
Message,
)
Expand Down Expand Up @@ -505,7 +506,15 @@ def _handle_receive_ack(self, ack):
"internal error - receive unexpected ack. Expected seqno: %s, received seqno: %s"
% (current_message.seq_no, ack.seq_no)
)
message_future.set_result(None) # todo - return result with offset or skip status
write_ack_msg = StreamWriteMessage.WriteResponse.WriteAck
status = ack.message_write_status
if isinstance(status, write_ack_msg.StatusSkipped):
result = PublicWriteResult.Skipped()
elif isinstance(status, write_ack_msg.StatusWritten):
result = PublicWriteResult.Written(offset=status.offset)
else:
raise TopicWriterError("internal error - receive unexpected ack message.")
message_future.set_result(result)

async def _send_loop(self, writer: "WriterAsyncIOStream"):
try:
Expand Down