From 677edfdcb8a1607ab90d6776d2eb29ec24b5af1e Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 21 Mar 2023 19:55:26 +0300 Subject: [PATCH 1/2] sync --- examples/topic/reader_async_example.py | 78 ++++++++++---------------- 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/examples/topic/reader_async_example.py b/examples/topic/reader_async_example.py index fb7f4c26..448e3ded 100644 --- a/examples/topic/reader_async_example.py +++ b/examples/topic/reader_async_example.py @@ -15,44 +15,32 @@ async def connect(): async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver): - with ydb.TopicClientAsyncIO(db).reader( + async with ydb.TopicClientAsyncIO(db).reader( "/database/topic/path", consumer="consumer" ) as reader: - async for message in reader.messages(): - pass + ... async def print_message_content(reader: ydb.TopicReaderAsyncIO): - async for message in reader.messages(): + while True: + message = await reader.receive_message() print("text", message.data.read().decode("utf-8")) # await and async_commit need only for sync commit mode - for wait ack from servr await reader.commit(message) -async def process_messages_batch_explicit_commit(reader: ydb.TopicReaderAsyncIO): +async def process_messages_batch_with_commit(reader: ydb.TopicReaderAsyncIO): # Explicit commit example - async for batch in reader.batches(max_messages=100, timeout=2): - async with asyncio.TaskGroup() as tg: - for message in batch.messages: - tg.create_task(_process(message)) - - # wait complete of process all messages from batch be taskgroup context manager - # and commit complete batch + while True: + batch = await reader.receive_batch() + ... await reader.commit(batch) -async def process_messages_batch_context_manager_commit(reader: ydb.TopicReaderAsyncIO): - # Commit with context manager - async for batch in reader.batches(): - async with reader.commit_on_exit(batch), asyncio.TaskGroup() as tg: - for message in batch.messages: - tg.create_task(_process(message)) - - async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO): try: message = await asyncio.wait_for(reader.receive_message(), timeout=1) - except TimeoutError: + except asyncio.TimeoutError: print("Have no new messages in a second") return @@ -60,16 +48,19 @@ async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO): async def get_all_messages_with_small_wait(reader: ydb.TopicReaderAsyncIO): - async for message in reader.messages(timeout=1): - await _process(message) - print("Have no new messages in a second") + while True: + try: + message = await reader.receive_message() + await _process(message) + except asyncio.TimeoutError: + print("Have no new messages in a second") async def get_a_message_from_external_loop(reader: ydb.TopicReaderAsyncIO): for i in range(10): try: message = await asyncio.wait_for(reader.receive_message(), timeout=1) - except TimeoutError: + except asyncio.TimeoutError: return await _process(message) @@ -78,7 +69,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO) for i in range(10): try: batch = await asyncio.wait_for(reader.receive_batch(), timeout=2) - except TimeoutError: + except asyncio.TimeoutError: return for message in batch.messages: @@ -92,27 +83,20 @@ async def auto_deserialize_message(db: ydb.aio.Driver): async with ydb.TopicClientAsyncIO(db).reader( "/database/topic/path", consumer="asd", deserializer=json.loads ) as reader: - async for message in reader.messages(): + while True: + message = await reader.receive_message() print( 
message.data.Name ) # message.data replaces by json.loads(message.data) of raw message reader.commit(message) -async def commit_batch_with_context(reader: ydb.TopicReaderAsyncIO): - async for batch in reader.batches(): - async with reader.commit_on_exit(batch): - for message in batch.messages: - if not batch.is_alive: - break - await _process(message) - - async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO): - async for message in reader.messages(): - time.sleep(1) # some work + while True: + message = await reader.receive_message() + time.sleep(123) # some work if message.is_alive: - time.sleep(123) # some other work + time.sleep(1) # some other work await reader.commit(message) @@ -126,7 +110,8 @@ def process_batch(batch): _process(message) reader.commit(batch) - async for batch in reader.batches(): + while True: + batch = await reader.receive_batch() process_batch(batch) @@ -137,18 +122,12 @@ async def connect_and_read_few_topics(db: ydb.aio.Driver): ydb.TopicSelector("/database/second-topic", partitions=3), ] ) as reader: - async for message in reader.messages(): + while True: + message = await reader.receive_message() await _process(message) await reader.commit(message) -async def handle_partition_graceful_stop_batch(reader: ydb.TopicReaderAsyncIO): - # no special handle, but batch will contain less than prefer count messages - async for batch in reader.batches(): - await _process(batch) - reader.commit(batch) - - async def advanced_commit_notify(db: ydb.aio.Driver): def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None: print(event.topic) @@ -157,7 +136,8 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None: async with ydb.TopicClientAsyncIO(db).reader( "/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit ) as reader: - async for message in reader.messages(): + while True: + message = await reader.receive_message() await _process(message) await reader.commit(message) From 1f94f7b5094048e7d96f89851232ff6c48611750 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 21 Mar 2023 20:23:14 +0300 Subject: [PATCH 2/2] fix examples --- examples/topic/reader_async_example.py | 17 ++++--- examples/topic/reader_example.py | 67 ++++++++++++++------------ examples/topic/writer_async_example.py | 28 +++++------ examples/topic/writer_example.py | 24 ++++----- 4 files changed, 71 insertions(+), 65 deletions(-) diff --git a/examples/topic/reader_async_example.py b/examples/topic/reader_async_example.py index 448e3ded..41825ab4 100644 --- a/examples/topic/reader_async_example.py +++ b/examples/topic/reader_async_example.py @@ -10,14 +10,14 @@ async def connect(): connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials(), ) - reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer") + reader = db.topic_client.reader("/local/topic", consumer="consumer") return reader async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).reader( + async with db.topic_client.reader( "/database/topic/path", consumer="consumer" - ) as reader: + ) as reader: # noqa ... 
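# A minimal end-to-end sketch of the read loop these patches converge on: a
# `while True` over reader.receive_message() (bounded with asyncio.wait_for, as in
# get_message_with_timeout above) instead of the removed `async for ... in
# reader.messages()`. The topic path and consumer name are placeholders.
import asyncio

import ydb


async def read_loop_sketch(db: ydb.aio.Driver):
    async with db.topic_client.reader(
        "/database/topic/path", consumer="consumer"
    ) as reader:
        while True:
            try:
                message = await asyncio.wait_for(reader.receive_message(), timeout=1)
            except asyncio.TimeoutError:
                print("Have no new messages in a second")
                continue
            print("text", message.data.read().decode("utf-8"))
            await reader.commit(message)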
@@ -80,7 +80,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO) async def auto_deserialize_message(db: ydb.aio.Driver): # async, batch work similar to this - async with ydb.TopicClientAsyncIO(db).reader( + async with db.topic_client.reader( "/database/topic/path", consumer="asd", deserializer=json.loads ) as reader: while True: @@ -116,7 +116,7 @@ def process_batch(batch): async def connect_and_read_few_topics(db: ydb.aio.Driver): - with ydb.TopicClientAsyncIO(db).reader( + with db.topic_client.reader( [ "/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3), @@ -133,7 +133,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None: print(event.topic) print(event.offset) - async with ydb.TopicClientAsyncIO(db).reader( + async with db.topic_client.reader( "/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit ) as reader: while True: @@ -151,12 +151,13 @@ async def on_get_partition_start_offset( resp.start_offset = 123 return resp - async with ydb.TopicClient(db).reader( + async with db.topic_client.reader( "/local/test", consumer="consumer", on_get_partition_start_offset=on_get_partition_start_offset, ) as reader: - async for mess in reader.messages(): + while True: + mess = reader.receive_message() await _process(mess) # save progress to own database diff --git a/examples/topic/reader_example.py b/examples/topic/reader_example.py index 183c51d6..8de33c7e 100644 --- a/examples/topic/reader_example.py +++ b/examples/topic/reader_example.py @@ -9,33 +9,37 @@ def connect(): connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials(), ) - reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer") + reader = db.topic_client.reader("/local/topic", consumer="consumer") return reader def create_reader_and_close_with_context_manager(db: ydb.Driver): - with ydb.TopicClient(db).reader( + with db.topic_client.reader( "/database/topic/path", consumer="consumer", buffer_size_bytes=123 ) as reader: - for message in reader: + while True: + message = reader.receive_message() # noqa pass def print_message_content(reader: ydb.TopicReader): - for message in reader.messages(): + while True: + message = reader.receive_message() print("text", message.data.read().decode("utf-8")) reader.commit(message) def process_messages_batch_explicit_commit(reader: ydb.TopicReader): - for batch in reader.batches(max_messages=100, timeout=2): + while True: + batch = reader.receive_batch() for message in batch.messages: _process(message) reader.commit(batch) def process_messages_batch_context_manager_commit(reader: ydb.TopicReader): - for batch in reader.batches(max_messages=100, timeout=2): + while True: + batch = reader.receive_batch() with reader.commit_on_exit(batch): for message in batch.messages: _process(message) @@ -52,9 +56,12 @@ def get_message_with_timeout(reader: ydb.TopicReader): def get_all_messages_with_small_wait(reader: ydb.TopicReader): - for message in reader.messages(timeout=1): - _process(message) - print("Have no new messages in a second") + while True: + try: + message = reader.receive_message(timeout=1) + _process(message) + except TimeoutError: + print("Have no new messages in a second") def get_a_message_from_external_loop(reader: ydb.TopicReader): @@ -81,30 +88,23 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader): def auto_deserialize_message(db: ydb.Driver): # async, batch work similar to this - reader = ydb.TopicClient(db).reader( + 
reader = db.topic_client.reader( "/database/topic/path", consumer="asd", deserializer=json.loads ) - for message in reader.messages(): + while True: + message = reader.receive_message() print( message.data.Name ) # message.data replaces by json.loads(message.data) of raw message reader.commit(message) -def commit_batch_with_context(reader: ydb.TopicReader): - for batch in reader.batches(): - with reader.commit_on_exit(batch): - for message in batch.messages: - if not batch.is_alive: - break - _process(message) - - def handle_partition_stop(reader: ydb.TopicReader): - for message in reader.messages(): - time.sleep(1) # some work + while True: + message = reader.receive_message() + time.sleep(123) # some work if message.is_alive: - time.sleep(123) # some other work + time.sleep(1) # some other work reader.commit(message) @@ -118,25 +118,28 @@ def process_batch(batch): _process(message) reader.commit(batch) - for batch in reader.batches(): + while True: + batch = reader.receive_batch() process_batch(batch) def connect_and_read_few_topics(db: ydb.Driver): - with ydb.TopicClient(db).reader( + with db.topic_client.reader( [ "/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3), ] ) as reader: - for message in reader: + while True: + message = reader.receive_message() _process(message) reader.commit(message) def handle_partition_graceful_stop_batch(reader: ydb.TopicReader): # no special handle, but batch will contain less than prefer count messages - for batch in reader.batches(): + while True: + batch = reader.receive_batch() _process(batch) reader.commit(batch) @@ -146,10 +149,11 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None: print(event.topic) print(event.offset) - with ydb.TopicClient(db).reader( + with db.topic_client.reader( "/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit ) as reader: - for message in reader: + while True: + message = reader.receive_message() with reader.commit_on_exit(message): _process(message) @@ -164,12 +168,13 @@ def on_get_partition_start_offset( resp.start_offset = 123 return resp - with ydb.TopicClient(db).reader( + with db.topic_client.reader( "/local/test", consumer="consumer", on_get_partition_start_offset=on_get_partition_start_offset, ) as reader: - for mess in reader: + while True: + mess = reader.receive_message() _process(mess) # save progress to own database diff --git a/examples/topic/writer_async_example.py b/examples/topic/writer_async_example.py index c5144685..28a17f52 100644 --- a/examples/topic/writer_async_example.py +++ b/examples/topic/writer_async_example.py @@ -1,6 +1,6 @@ import asyncio +import datetime import json -import time from typing import Dict, List import ydb @@ -8,7 +8,7 @@ async def create_writer(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).writer( + async with db.topic_client.writer( "/database/topic/path", producer_id="producer-id", ) as writer: @@ -16,15 +16,16 @@ async def create_writer(db: ydb.aio.Driver): async def connect_and_wait(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).writer( + async with db.topic_client.writer( "/database/topic/path", producer_id="producer-id", ) as writer: - writer.wait_init() + info = await writer.wait_init() # noqa + ... 
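# A short sketch of the asyncio writer flow used in this file after the patch:
# open the writer via db.topic_client.writer, optionally await wait_init(), write a
# few messages with created_at as a datetime (replacing created_at_ns), then flush()
# to wait for server acknowledgement. Topic path and producer_id are placeholders.
import datetime

import ydb


async def write_loop_sketch(db: ydb.aio.Driver):
    async with db.topic_client.writer(
        "/database/topic/path", producer_id="producer-id"
    ) as writer:
        await writer.wait_init()
        for i in range(3):
            await writer.write(
                ydb.TopicWriterMessage("message-%s" % i, created_at=datetime.datetime.now())
            )
        await writer.flush()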
async def connect_without_context_manager(db: ydb.aio.Driver): - writer = ydb.TopicClientAsyncIO(db).writer( + writer = db.topic_client.writer( "/database/topic/path", producer_id="producer-id", ) @@ -49,7 +50,7 @@ async def send_messages(writer: ydb.TopicWriterAsyncIO): # with meta await writer.write( - ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()) + ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()) ) @@ -71,7 +72,7 @@ async def send_messages_with_manual_seqno(writer: ydb.TopicWriter): async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO): # future wait - await writer.write_with_result( + await writer.write_with_ack( [ ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2), @@ -84,10 +85,10 @@ async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO): async def send_json_message(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).writer( + async with db.topic_client.writer( "/database/path/topic", serializer=json.dumps ) as writer: - writer.write({"a": 123}) + await writer.write({"a": 123}) async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAsyncIO): @@ -99,14 +100,11 @@ async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAs async def send_messages_and_wait_all_commit_with_results( writer: ydb.TopicWriterAsyncIO, ): - last_future = None for i in range(10): content = "%s" % i - last_future = await writer.write_with_ack(content) + await writer.write(content) - await asyncio.wait(last_future) - if last_future.exception() is not None: - raise last_future.exception() + await writer.flush() async def switch_messages_with_many_producers( @@ -118,7 +116,7 @@ async def switch_messages_with_many_producers( # select writer for the msg writer_idx = msg[:1] writer = writers[writer_idx] - future = await writer.write_with_ack(msg) + future = await writer.write_with_ack_future(msg) futures.append(future) # wait acks from all writes diff --git a/examples/topic/writer_example.py b/examples/topic/writer_example.py index 1465dba5..63f6a108 100644 --- a/examples/topic/writer_example.py +++ b/examples/topic/writer_example.py @@ -1,6 +1,6 @@ import concurrent.futures +import datetime import json -import time from typing import Dict, List from concurrent.futures import Future, wait @@ -8,20 +8,20 @@ from ydb import TopicWriterMessage -async def connect(): - db = ydb.aio.Driver( +def connect(): + db = ydb.Driver( connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials(), ) - writer = ydb.TopicClientAsyncIO(db).writer( + writer = db.topic_client.writer( "/local/topic", producer_id="producer-id", ) - await writer.write(TopicWriterMessage("asd")) + writer.write(TopicWriterMessage("asd")) def create_writer(db: ydb.Driver): - with ydb.TopicClient(db).writer( + with db.topic_client.writer( "/database/topic/path", producer_id="producer-id", ) as writer: @@ -29,15 +29,15 @@ def create_writer(db: ydb.Driver): def connect_and_wait(db: ydb.Driver): - with ydb.TopicClient(db).writer( + with db.topic_client.writer( "/database/topic/path", producer_id="producer-id", ) as writer: - writer.wait() + info = writer.wait_init() # noqa def connect_without_context_manager(db: ydb.Driver): - writer = ydb.TopicClient(db).writer( + writer = db.topic_client.writer( "/database/topic/path", producer_id="producer-id", ) @@ -61,7 +61,9 @@ def send_messages(writer: ydb.TopicWriter): ) # send few messages by one call # with meta 
-    writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()))
+    writer.write(
+        ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now())
+    )
 
 
 def send_message_without_block_if_internal_buffer_is_full(
@@ -101,7 +103,7 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
 
 
 def send_json_message(db: ydb.Driver):
-    with ydb.TopicClient(db).writer(
+    with db.topic_client.writer(
         "/database/path/topic", serializer=json.dumps
     ) as writer:
         writer.write({"a": 123})
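Taken together, the two patches converge on one pattern: topic clients come from db.topic_client, writers send ydb.TopicWriterMessage (or rely on a serializer), and readers loop over receive_message()/receive_batch() instead of iterating reader.messages(). Below is a minimal sketch, under the same assumptions as the examples in this patch (placeholder topic path, consumer name, and no timeout or error handling), that round-trips a JSON payload with the synchronous client:

import json

import ydb


def json_roundtrip_sketch(db: ydb.Driver):
    # write a dict; serializer=json.dumps turns it into the raw message payload
    with db.topic_client.writer("/database/path/topic", serializer=json.dumps) as writer:
        writer.write({"a": 123})

    # read it back; with deserializer=json.loads, message.data is the parsed dict
    with db.topic_client.reader(
        "/database/path/topic", consumer="consumer", deserializer=json.loads
    ) as reader:
        while True:
            message = reader.receive_message()
            print(message.data["a"])
            reader.commit(message)

As in the examples above, the explicit receive_message() call makes it straightforward to add per-call timeouts, either via receive_message(timeout=1) in the synchronous reader or asyncio.wait_for in the asyncio one.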