93 changes: 37 additions & 56 deletions examples/topic/reader_async_example.py
@@ -10,66 +10,57 @@ async def connect():
connection_string="grpc://localhost:2135?database=/local",
credentials=ydb.credentials.AnonymousCredentials(),
)
reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer")
reader = db.topic_client.reader("/local/topic", consumer="consumer")
return reader


async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
with ydb.TopicClientAsyncIO(db).reader(
async with db.topic_client.reader(
"/database/topic/path", consumer="consumer"
) as reader:
async for message in reader.messages():
pass
) as reader: # noqa
...


async def print_message_content(reader: ydb.TopicReaderAsyncIO):
async for message in reader.messages():
while True:
message = await reader.receive_message()
print("text", message.data.read().decode("utf-8"))
# await of commit is needed only in sync commit mode - to wait for the ack from the server
await reader.commit(message)


async def process_messages_batch_explicit_commit(reader: ydb.TopicReaderAsyncIO):
async def process_messages_batch_with_commit(reader: ydb.TopicReaderAsyncIO):
# Explicit commit example
async for batch in reader.batches(max_messages=100, timeout=2):
async with asyncio.TaskGroup() as tg:
for message in batch.messages:
tg.create_task(_process(message))

# wait for all messages from the batch to be processed via the task group context manager
# and then commit the complete batch
while True:
batch = await reader.receive_batch()
...
await reader.commit(batch)


async def process_messages_batch_context_manager_commit(reader: ydb.TopicReaderAsyncIO):
# Commit with context manager
async for batch in reader.batches():
async with reader.commit_on_exit(batch), asyncio.TaskGroup() as tg:
for message in batch.messages:
tg.create_task(_process(message))


async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO):
try:
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
except TimeoutError:
except asyncio.TimeoutError:
print("Have no new messages in a second")
return

print("mess", message.data)


async def get_all_messages_with_small_wait(reader: ydb.TopicReaderAsyncIO):
async for message in reader.messages(timeout=1):
await _process(message)
print("Have no new messages in a second")
while True:
try:
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
await _process(message)
except asyncio.TimeoutError:
print("Have no new messages in a second")


async def get_a_message_from_external_loop(reader: ydb.TopicReaderAsyncIO):
for i in range(10):
try:
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
except TimeoutError:
except asyncio.TimeoutError:
return
await _process(message)

@@ -78,7 +69,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
for i in range(10):
try:
batch = await asyncio.wait_for(reader.receive_batch(), timeout=2)
except TimeoutError:
except asyncio.TimeoutError:
return

for message in batch.messages:
@@ -89,30 +80,23 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
async def auto_deserialize_message(db: ydb.aio.Driver):
# async and batch variants work similarly to this

async with ydb.TopicClientAsyncIO(db).reader(
async with db.topic_client.reader(
"/database/topic/path", consumer="asd", deserializer=json.loads
) as reader:
async for message in reader.messages():
while True:
message = await reader.receive_message()
print(
message.data.Name
) # message.data is replaced by json.loads(message.data) of the raw message
reader.commit(message)


async def commit_batch_with_context(reader: ydb.TopicReaderAsyncIO):
async for batch in reader.batches():
async with reader.commit_on_exit(batch):
for message in batch.messages:
if not batch.is_alive:
break
await _process(message)


async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO):
async for message in reader.messages():
time.sleep(1) # some work
while True:
message = await reader.receive_message()
time.sleep(123) # some work
if message.is_alive:
time.sleep(123) # some other work
time.sleep(1) # some other work
await reader.commit(message)


@@ -126,38 +110,34 @@ def process_batch(batch):
_process(message)
reader.commit(batch)

async for batch in reader.batches():
while True:
batch = await reader.receive_batch()
process_batch(batch)


async def connect_and_read_few_topics(db: ydb.aio.Driver):
with ydb.TopicClientAsyncIO(db).reader(
async with db.topic_client.reader(
[
"/database/topic/path",
ydb.TopicSelector("/database/second-topic", partitions=3),
]
) as reader:
async for message in reader.messages():
while True:
message = await reader.receive_message()
await _process(message)
await reader.commit(message)


async def handle_partition_graceful_stop_batch(reader: ydb.TopicReaderAsyncIO):
# no special handling, but the batch will contain fewer messages than the preferred count
async for batch in reader.batches():
await _process(batch)
reader.commit(batch)


async def advanced_commit_notify(db: ydb.aio.Driver):
def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
print(event.topic)
print(event.offset)

async with ydb.TopicClientAsyncIO(db).reader(
async with db.topic_client.reader(
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
) as reader:
async for message in reader.messages():
while True:
message = await reader.receive_message()
await _process(message)
await reader.commit(message)

@@ -171,12 +151,13 @@ async def on_get_partition_start_offset(
resp.start_offset = 123
return resp

async with ydb.TopicClient(db).reader(
async with db.topic_client.reader(
"/local/test",
consumer="consumer",
on_get_partition_start_offset=on_get_partition_start_offset,
) as reader:
async for mess in reader.messages():
while True:
mess = await reader.receive_message()
await _process(mess)
# save progress to your own database

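Note on the new API shape: throughout this file the PR replaces the reader.messages() / reader.batches() async iterators with explicit while True loops around await reader.receive_message() / await reader.receive_batch(), and constructs readers via db.topic_client.reader(...) instead of ydb.TopicClientAsyncIO(db).reader(...). The except asyncio.TimeoutError changes matter because asyncio.TimeoutError is an alias of the built-in TimeoutError only on Python 3.11+; catching the asyncio variant keeps the examples correct on older interpreters too. Below is a minimal end-to-end sketch of the new pattern - a sketch only, assuming a local YDB at grpc://localhost:2135 with topic /local/topic and consumer "consumer" already created, and assuming driver wait()/stop() usage as in the SDK's other examples:

import asyncio

import ydb


async def main():
    driver = ydb.aio.Driver(
        connection_string="grpc://localhost:2135?database=/local",
        credentials=ydb.credentials.AnonymousCredentials(),
    )
    await driver.wait(timeout=5)  # wait until the driver has connected
    try:
        async with driver.topic_client.reader("/local/topic", consumer="consumer") as reader:
            while True:
                try:
                    # receive_message() takes no timeout here, so wrap it in wait_for()
                    message = await asyncio.wait_for(reader.receive_message(), timeout=1)
                except asyncio.TimeoutError:
                    break  # topic has been idle for a second - stop reading
                print("text", message.data.read().decode("utf-8"))
                await reader.commit(message)  # awaiting matters only in sync commit mode
    finally:
        await driver.stop()


asyncio.run(main())
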
67 changes: 36 additions & 31 deletions examples/topic/reader_example.py
@@ -9,33 +9,37 @@ def connect():
connection_string="grpc://localhost:2135?database=/local",
credentials=ydb.credentials.AnonymousCredentials(),
)
reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer")
reader = db.topic_client.reader("/local/topic", consumer="consumer")
return reader


def create_reader_and_close_with_context_manager(db: ydb.Driver):
with ydb.TopicClient(db).reader(
with db.topic_client.reader(
"/database/topic/path", consumer="consumer", buffer_size_bytes=123
) as reader:
for message in reader:
while True:
message = reader.receive_message() # noqa
pass


def print_message_content(reader: ydb.TopicReader):
for message in reader.messages():
while True:
message = reader.receive_message()
print("text", message.data.read().decode("utf-8"))
reader.commit(message)


def process_messages_batch_explicit_commit(reader: ydb.TopicReader):
for batch in reader.batches(max_messages=100, timeout=2):
while True:
batch = reader.receive_batch()
for message in batch.messages:
_process(message)
reader.commit(batch)


def process_messages_batch_context_manager_commit(reader: ydb.TopicReader):
for batch in reader.batches(max_messages=100, timeout=2):
while True:
batch = reader.receive_batch()
with reader.commit_on_exit(batch):
for message in batch.messages:
_process(message)
@@ -52,9 +56,12 @@ def get_message_with_timeout(reader: ydb.TopicReader):


def get_all_messages_with_small_wait(reader: ydb.TopicReader):
for message in reader.messages(timeout=1):
_process(message)
print("Have no new messages in a second")
while True:
try:
message = reader.receive_message(timeout=1)
_process(message)
except TimeoutError:
print("Have no new messages in a second")


def get_a_message_from_external_loop(reader: ydb.TopicReader):
@@ -81,30 +88,23 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
def auto_deserialize_message(db: ydb.Driver):
# async and batch variants work similarly to this

reader = ydb.TopicClient(db).reader(
reader = db.topic_client.reader(
"/database/topic/path", consumer="asd", deserializer=json.loads
)
for message in reader.messages():
while True:
message = reader.receive_message()
print(
message.data.Name
) # message.data is replaced by json.loads(message.data) of the raw message
reader.commit(message)


def commit_batch_with_context(reader: ydb.TopicReader):
for batch in reader.batches():
with reader.commit_on_exit(batch):
for message in batch.messages:
if not batch.is_alive:
break
_process(message)


def handle_partition_stop(reader: ydb.TopicReader):
for message in reader.messages():
time.sleep(1) # some work
while True:
message = reader.receive_message()
time.sleep(123) # some work
if message.is_alive:
time.sleep(123) # some other work
time.sleep(1) # some other work
reader.commit(message)


@@ -118,25 +118,28 @@ def process_batch(batch):
_process(message)
reader.commit(batch)

for batch in reader.batches():
while True:
batch = reader.receive_batch()
process_batch(batch)


def connect_and_read_few_topics(db: ydb.Driver):
with ydb.TopicClient(db).reader(
with db.topic_client.reader(
[
"/database/topic/path",
ydb.TopicSelector("/database/second-topic", partitions=3),
]
) as reader:
for message in reader:
while True:
message = reader.receive_message()
_process(message)
reader.commit(message)


def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
# no special handling, but the batch will contain fewer messages than the preferred count
for batch in reader.batches():
while True:
batch = reader.receive_batch()
_process(batch)
reader.commit(batch)

@@ -146,10 +149,11 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
print(event.topic)
print(event.offset)

with ydb.TopicClient(db).reader(
with db.topic_client.reader(
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
) as reader:
for message in reader:
while True:
message = reader.receive_message()
with reader.commit_on_exit(message):
_process(message)

@@ -164,12 +168,13 @@ def on_get_partition_start_offset(
resp.start_offset = 123
return resp

with ydb.TopicClient(db).reader(
with db.topic_client.reader(
"/local/test",
consumer="consumer",
on_get_partition_start_offset=on_get_partition_start_offset,
) as reader:
for mess in reader:
while True:
mess = reader.receive_message()
_process(mess)
# save progress to your own database

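Note: unlike the async file above, the sync examples pass the timeout directly into the call (reader.receive_message(timeout=1)) and catch the built-in TimeoutError rather than wrapping the call in asyncio.wait_for(). A minimal sketch of that polling loop - the helper name drain_topic is hypothetical, and it assumes receive_message(timeout=...) raises the built-in TimeoutError on expiry, as the diff above shows:

import ydb


def drain_topic(reader: ydb.TopicReader) -> None:
    # Read until the topic has been idle for a full second.
    while True:
        try:
            message = reader.receive_message(timeout=1)
        except TimeoutError:
            print("No new messages in a second")
            break
        print("text", message.data.read().decode("utf-8"))
        reader.commit(message)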