Skip to content

Commit 6ac50fe

Browse files
committed
fix examples
1 parent 677edfd commit 6ac50fe

File tree

4 files changed

+68
-62
lines changed

4 files changed

+68
-62
lines changed

examples/topic/reader_async_example.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ async def connect():
1010
connection_string="grpc://localhost:2135?database=/local",
1111
credentials=ydb.credentials.AnonymousCredentials(),
1212
)
13-
reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer")
13+
reader = db.topic_client.reader("/local/topic", consumer="consumer")
1414
return reader
1515

1616

1717
async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
18-
async with ydb.TopicClientAsyncIO(db).reader(
18+
async with db.topic_client.reader(
1919
"/database/topic/path", consumer="consumer"
2020
) as reader:
2121
...
@@ -80,7 +80,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
8080
async def auto_deserialize_message(db: ydb.aio.Driver):
8181
# async, batch work similar to this
8282

83-
async with ydb.TopicClientAsyncIO(db).reader(
83+
async with db.topic_client.reader(
8484
"/database/topic/path", consumer="asd", deserializer=json.loads
8585
) as reader:
8686
while True:
@@ -116,7 +116,7 @@ def process_batch(batch):
116116

117117

118118
async def connect_and_read_few_topics(db: ydb.aio.Driver):
119-
with ydb.TopicClientAsyncIO(db).reader(
119+
with db.topic_client.reader(
120120
[
121121
"/database/topic/path",
122122
ydb.TopicSelector("/database/second-topic", partitions=3),
@@ -133,7 +133,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
133133
print(event.topic)
134134
print(event.offset)
135135

136-
async with ydb.TopicClientAsyncIO(db).reader(
136+
async with db.topic_client.reader(
137137
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
138138
) as reader:
139139
while True:
@@ -151,12 +151,13 @@ async def on_get_partition_start_offset(
151151
resp.start_offset = 123
152152
return resp
153153

154-
async with ydb.TopicClient(db).reader(
154+
async with db.topic_client.reader(
155155
"/local/test",
156156
consumer="consumer",
157157
on_get_partition_start_offset=on_get_partition_start_offset,
158158
) as reader:
159-
async for mess in reader.messages():
159+
while True:
160+
mess = reader.receive_message()
160161
await _process(mess)
161162
# save progress to own database
162163

examples/topic/reader_example.py

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,37 @@ def connect():
99
connection_string="grpc://localhost:2135?database=/local",
1010
credentials=ydb.credentials.AnonymousCredentials(),
1111
)
12-
reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer")
12+
reader = db.topic_client.reader("/local/topic", consumer="consumer")
1313
return reader
1414

1515

1616
def create_reader_and_close_with_context_manager(db: ydb.Driver):
17-
with ydb.TopicClient(db).reader(
17+
with db.topic_client.reader(
1818
"/database/topic/path", consumer="consumer", buffer_size_bytes=123
1919
) as reader:
20-
for message in reader:
20+
while True:
21+
message = reader.receive_message()
2122
pass
2223

2324

2425
def print_message_content(reader: ydb.TopicReader):
25-
for message in reader.messages():
26+
while True:
27+
message = reader.receive_message()
2628
print("text", message.data.read().decode("utf-8"))
2729
reader.commit(message)
2830

2931

3032
def process_messages_batch_explicit_commit(reader: ydb.TopicReader):
31-
for batch in reader.batches(max_messages=100, timeout=2):
33+
while True:
34+
batch = reader.receive_batch()
3235
for message in batch.messages:
3336
_process(message)
3437
reader.commit(batch)
3538

3639

3740
def process_messages_batch_context_manager_commit(reader: ydb.TopicReader):
38-
for batch in reader.batches(max_messages=100, timeout=2):
41+
while True:
42+
batch = reader.receive_batch()
3943
with reader.commit_on_exit(batch):
4044
for message in batch.messages:
4145
_process(message)
@@ -52,9 +56,12 @@ def get_message_with_timeout(reader: ydb.TopicReader):
5256

5357

5458
def get_all_messages_with_small_wait(reader: ydb.TopicReader):
55-
for message in reader.messages(timeout=1):
56-
_process(message)
57-
print("Have no new messages in a second")
59+
while True:
60+
try:
61+
message = reader.receive_message(timeout=1)
62+
_process(message)
63+
except TimeoutError:
64+
print("Have no new messages in a second")
5865

5966

6067
def get_a_message_from_external_loop(reader: ydb.TopicReader):
@@ -81,30 +88,23 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
8188
def auto_deserialize_message(db: ydb.Driver):
8289
# async, batch work similar to this
8390

84-
reader = ydb.TopicClient(db).reader(
91+
reader = db.topic_client.reader(
8592
"/database/topic/path", consumer="asd", deserializer=json.loads
8693
)
87-
for message in reader.messages():
94+
while True:
95+
message = reader.receive_message()
8896
print(
8997
message.data.Name
9098
) # message.data replaces by json.loads(message.data) of raw message
9199
reader.commit(message)
92100

93101

94-
def commit_batch_with_context(reader: ydb.TopicReader):
95-
for batch in reader.batches():
96-
with reader.commit_on_exit(batch):
97-
for message in batch.messages:
98-
if not batch.is_alive:
99-
break
100-
_process(message)
101-
102-
103102
def handle_partition_stop(reader: ydb.TopicReader):
104-
for message in reader.messages():
105-
time.sleep(1) # some work
103+
while True:
104+
message = reader.receive_message()
105+
time.sleep(123) # some work
106106
if message.is_alive:
107-
time.sleep(123) # some other work
107+
time.sleep(1) # some other work
108108
reader.commit(message)
109109

110110

@@ -118,25 +118,28 @@ def process_batch(batch):
118118
_process(message)
119119
reader.commit(batch)
120120

121-
for batch in reader.batches():
121+
while True:
122+
batch = reader.receive_batch()
122123
process_batch(batch)
123124

124125

125126
def connect_and_read_few_topics(db: ydb.Driver):
126-
with ydb.TopicClient(db).reader(
127+
with db.topic_client.reader(
127128
[
128129
"/database/topic/path",
129130
ydb.TopicSelector("/database/second-topic", partitions=3),
130131
]
131132
) as reader:
132-
for message in reader:
133+
while True:
134+
message = reader.receive_message()
133135
_process(message)
134136
reader.commit(message)
135137

136138

137139
def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
138140
# no special handle, but batch will contain less than prefer count messages
139-
for batch in reader.batches():
141+
while True:
142+
batch = reader.receive_batch()
140143
_process(batch)
141144
reader.commit(batch)
142145

@@ -146,10 +149,11 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
146149
print(event.topic)
147150
print(event.offset)
148151

149-
with ydb.TopicClient(db).reader(
152+
with db.topic_client.reader(
150153
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
151154
) as reader:
152-
for message in reader:
155+
while True:
156+
message = reader.receive_message()
153157
with reader.commit_on_exit(message):
154158
_process(message)
155159

@@ -164,12 +168,13 @@ def on_get_partition_start_offset(
164168
resp.start_offset = 123
165169
return resp
166170

167-
with ydb.TopicClient(db).reader(
171+
with db.topic_client.reader(
168172
"/local/test",
169173
consumer="consumer",
170174
on_get_partition_start_offset=on_get_partition_start_offset,
171175
) as reader:
172-
for mess in reader:
176+
while True:
177+
mess = reader.receive_message()
173178
_process(mess)
174179
# save progress to own database
175180

examples/topic/writer_async_example.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import datetime
23
import json
34
import time
45
from typing import Dict, List
@@ -8,23 +9,24 @@
89

910

1011
async def create_writer(db: ydb.aio.Driver):
11-
async with ydb.TopicClientAsyncIO(db).writer(
12+
async with db.topic_client.writer(
1213
"/database/topic/path",
1314
producer_id="producer-id",
1415
) as writer:
1516
await writer.write(TopicWriterMessage("asd"))
1617

1718

1819
async def connect_and_wait(db: ydb.aio.Driver):
19-
async with ydb.TopicClientAsyncIO(db).writer(
20+
async with db.topic_client.writer(
2021
"/database/topic/path",
2122
producer_id="producer-id",
2223
) as writer:
23-
writer.wait_init()
24+
info = await writer.wait_init()
25+
...
2426

2527

2628
async def connect_without_context_manager(db: ydb.aio.Driver):
27-
writer = ydb.TopicClientAsyncIO(db).writer(
29+
writer = db.topic_client.writer(
2830
"/database/topic/path",
2931
producer_id="producer-id",
3032
)
@@ -49,7 +51,7 @@ async def send_messages(writer: ydb.TopicWriterAsyncIO):
4951

5052
# with meta
5153
await writer.write(
52-
ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns())
54+
ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now())
5355
)
5456

5557

@@ -71,7 +73,7 @@ async def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
7173

7274
async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
7375
# future wait
74-
await writer.write_with_result(
76+
await writer.write_with_ack(
7577
[
7678
ydb.TopicWriterMessage("mess", seqno=1),
7779
ydb.TopicWriterMessage("mess", seqno=2),
@@ -84,10 +86,10 @@ async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
8486

8587

8688
async def send_json_message(db: ydb.aio.Driver):
87-
async with ydb.TopicClientAsyncIO(db).writer(
89+
async with db.topic_client.writer(
8890
"/database/path/topic", serializer=json.dumps
8991
) as writer:
90-
writer.write({"a": 123})
92+
await writer.write({"a": 123})
9193

9294

9395
async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAsyncIO):
@@ -99,14 +101,11 @@ async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAs
99101
async def send_messages_and_wait_all_commit_with_results(
100102
writer: ydb.TopicWriterAsyncIO,
101103
):
102-
last_future = None
103104
for i in range(10):
104105
content = "%s" % i
105-
last_future = await writer.write_with_ack(content)
106+
last_future = await writer.write(content)
106107

107-
await asyncio.wait(last_future)
108-
if last_future.exception() is not None:
109-
raise last_future.exception()
108+
await writer.flush()
110109

111110

112111
async def switch_messages_with_many_producers(
@@ -118,7 +117,7 @@ async def switch_messages_with_many_producers(
118117
# select writer for the msg
119118
writer_idx = msg[:1]
120119
writer = writers[writer_idx]
121-
future = await writer.write_with_ack(msg)
120+
future = await writer.write_with_ack_future(msg)
122121
futures.append(future)
123122

124123
# wait acks from all writes

examples/topic/writer_example.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import concurrent.futures
2+
import datetime
23
import json
34
import time
45
from typing import Dict, List
@@ -8,36 +9,36 @@
89
from ydb import TopicWriterMessage
910

1011

11-
async def connect():
12-
db = ydb.aio.Driver(
12+
def connect():
13+
db = ydb.Driver(
1314
connection_string="grpc://localhost:2135?database=/local",
1415
credentials=ydb.credentials.AnonymousCredentials(),
1516
)
16-
writer = ydb.TopicClientAsyncIO(db).writer(
17+
writer = db.topic_client.writer(
1718
"/local/topic",
1819
producer_id="producer-id",
1920
)
20-
await writer.write(TopicWriterMessage("asd"))
21+
writer.write(TopicWriterMessage("asd"))
2122

2223

2324
def create_writer(db: ydb.Driver):
24-
with ydb.TopicClient(db).writer(
25+
with db.topic_client.writer(
2526
"/database/topic/path",
2627
producer_id="producer-id",
2728
) as writer:
2829
writer.write(TopicWriterMessage("asd"))
2930

3031

3132
def connect_and_wait(db: ydb.Driver):
32-
with ydb.TopicClient(db).writer(
33+
with db.topic_client.writer(
3334
"/database/topic/path",
3435
producer_id="producer-id",
3536
) as writer:
36-
writer.wait()
37+
info = writer.wait_init()
3738

3839

3940
def connect_without_context_manager(db: ydb.Driver):
40-
writer = ydb.TopicClient(db).writer(
41+
writer = db.topic_client.writer(
4142
"/database/topic/path",
4243
producer_id="producer-id",
4344
)
@@ -61,7 +62,7 @@ def send_messages(writer: ydb.TopicWriter):
6162
) # send few messages by one call
6263

6364
# with meta
64-
writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()))
65+
writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()))
6566

6667

6768
def send_message_without_block_if_internal_buffer_is_full(
@@ -101,7 +102,7 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
101102

102103

103104
def send_json_message(db: ydb.Driver):
104-
with ydb.TopicClient(db).writer(
105+
with db.topic_client.writer(
105106
"/database/path/topic", serializer=json.dumps
106107
) as writer:
107108
writer.write({"a": 123})

0 commit comments

Comments
 (0)