/
drain_sub.py
47 lines (33 loc) · 1.05 KB
/
drain_sub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# [begin drain_sub]
import asyncio
from nats.aio.client import Client as NATS
async def example(loop):
nc = NATS()
await nc.connect("nats://127.0.0.1:4222", loop=loop)
async def handler(msg):
print("[Received] ", msg)
await nc.publish(msg.reply, b'I can help')
# Can check whether client is in draining state
if nc.is_draining:
print("Connection is draining")
sid = await nc.subscribe("help", "workers", cb=handler)
await nc.flush()
# Gracefully unsubscribe the subscription
await nc.drain(sid)
# [end drain_sub]
requests = []
for i in range(0, 100):
request = nc.request("help", b'help!', timeout=1)
requests.append(request)
# Wait for all the responses
try:
responses = []
responses = await asyncio.gather(*requests)
except:
pass
print("Received {} responses".format(len(responses)))
await nc.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(example(loop))
loop.close()