Skip to content

Commit

Permalink
Add async/await syntax example, update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
Waldemar Quevedo committed May 10, 2016
1 parent ea6b579 commit 70ffd5a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 5 deletions.
52 changes: 52 additions & 0 deletions examples/basic-async-await.py
@@ -0,0 +1,52 @@
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
nc = NATS()

await nc.connect(io_loop=loop)

async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))

# Simple publisher and async subscriber via coroutine.
sid = await nc.subscribe("foo", cb=message_handler)

# Stop receiving after 2 messages.
await nc.auto_unsubscribe(sid, 2)
await nc.publish("foo", b'Hello')
await nc.publish("foo", b'World')
await nc.publish("foo", b'!!!!!')

async def help_request(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
await nc.publish(reply, b'I can help')

# Use queue named 'workers' for distributing requests
# among subscribers.
await nc.subscribe("help", "workers", help_request)

# Send a request and expect a single response
# and trigger timeout if not faster than 50 ms.
try:
response = await nc.timed_request("help", b'help me', 0.050)
print("Received response: {message}".format(message=response.data.decode()))
except ErrTimeout:
print("Request timed out")

await asyncio.sleep(1, loop=loop)
await nc.close()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
74 changes: 69 additions & 5 deletions readme.md
Expand Up @@ -32,15 +32,15 @@ def run(loop):
yield from nc.connect(io_loop=loop)

@asyncio.coroutine
def foo_subscription(msg):
def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))

# Simple publisher and async subscriber via coroutine.
sid = yield from nc.subscribe("foo", cb=foo_subscription)
sid = yield from nc.subscribe("foo", cb=message_handler)

# Stop receiving after 2 messages.
yield from nc.auto_unsubscribe(sid, 2)
Expand Down Expand Up @@ -193,6 +193,10 @@ def run(loop):
"io_loop": loop,
}

# Will try to connect to servers in order of configuration,
# by defaults it connect to one in the pool randomly.
options["dont_randomize"] = True

# Optionally set reconnect wait and max reconnect attempts.
# This example means 10 seconds total per backend.
options["max_reconnect_attempts"] = 5
Expand All @@ -219,6 +223,10 @@ def run(loop):
def closed_cb():
print("Connection is closed")

@asyncio.coroutine
def subscribe_handler(msg):
print("Got message: ", msg.subject, msg.reply, msg.data)

# Setup callbacks to be notified when there is an error
# or connection is closed.
options["error_cb"] = error_cb
Expand All @@ -232,9 +240,9 @@ def run(loop):
return

if nc.is_connected:
yield from nc.subscribe("help.*")
yield from nc.subscribe("help.*", cb=subscribe_handler)

max_messages = 1000
max_messages = 1000000
start_time = datetime.now()
print("Sending {} messages to NATS...".format(max_messages))

Expand All @@ -246,7 +254,6 @@ def run(loop):
print("Connection closed prematurely.")
break
except ErrTimeout as e:
# Can occur during while reconnecting...
print("Timeout occured when publishing msg i={}: {}".format(i, e))

end_time = datetime.now()
Expand All @@ -269,6 +276,63 @@ if __name__ == '__main__':
loop.close()
```

## Async/Await example (Python 3.5 only)

```python
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
nc = NATS()

await nc.connect(io_loop=loop)

async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))

# Simple publisher and async subscriber via coroutine.
sid = await nc.subscribe("foo", cb=message_handler)

# Stop receiving after 2 messages.
await nc.auto_unsubscribe(sid, 2)
await nc.publish("foo", b'Hello')
await nc.publish("foo", b'World')
await nc.publish("foo", b'!!!!!')

async def help_request(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
await nc.publish(reply, b'I can help')

# Use queue named 'workers' for distributing requests
# among subscribers.
await nc.subscribe("help", "workers", help_request)

# Send a request and expect a single response
# and trigger timeout if not faster than 50 ms.
try:
response = await nc.timed_request("help", b'help me', 0.050)
print("Received response: {message}".format(message=response.data.decode()))
except ErrTimeout:
print("Request timed out")

await asyncio.sleep(1, loop=loop)
await nc.close()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
```

## License

(The MIT License)
Expand Down

0 comments on commit 70ffd5a

Please sign in to comment.