Example of a subscriber on an infinite loop #12

Closed
yanpozka opened this issue Jun 27, 2018 · 5 comments

Comments

yanpozka commented Jun 27, 2018

Hi, thanks for publishing a Python client for nats-streaming. It turns out that asyncio is relatively new for a lot of Python developers (like me). Is it possible to add an example with a subscriber listening for messages forever, i.e. inside an infinite loop or something equivalent in the asyncio way? I want to have a service listening for updates from NATS.

Generic examples:
https://gist.github.com/Integralist/6f34e23f71340a1a23e846cd2f64cf32
https://tutorialedge.net/python/concurrency/asyncio-event-loops-tutorial/

Thanks in advance!

wallyqs (Member) commented Jun 27, 2018

Thanks for the feedback. I will add an example that does exactly that. Sharing both examples here first:

Example consumer receiving messages forever:

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    sc = STAN()

    # Start session with NATS Streaming cluster using
    # the established NATS connection.
    await nc.connect(io_loop=loop)
    await sc.connect("test-cluster", "client-123", nats=nc)

    # Example async subscriber
    async def cb(msg):
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))

    # Subscribe to get all messages from the beginning.
    await sc.subscribe("greetings", start_at='first', cb=cb)

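if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # run() returns as soon as the subscription is registered...
    loop.run_until_complete(run(loop))
    # ...so keep the loop running for cb to keep receiving messages.
    loop.run_forever()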

Example producer sending messages forever:

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    sc = STAN()

    # First connect to NATS, then start session with NATS Streaming.
    await nc.connect(io_loop=loop)
    await sc.connect("test-cluster", "client-456", nats=nc)

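    # Periodically send a message
    while True:
        await sc.publish("greetings", b'Hello World!')
        # The `loop` argument to asyncio.sleep() was removed in Python 3.10.
        await asyncio.sleep(1)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # run() loops forever, so run_until_complete() never returns here
    # and no separate loop.run_forever() call is needed.
    loop.run_until_complete(run(loop))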

yanpozka (Author) commented

@wallyqs thanks a lot for your prompt response! I've tried your "Example consumer receiving messages forever", but it's not working: the Python client tries to subscribe and unsubscribe many times (probably because of the infinite loop). We can see the six subscription attempts in the NATS server logs:

2018/06/28 23:10:35.286569 [DBG] 172.18.0.1:43394 - cid:7 - Deferring actual UNSUB(_INBOX.lqc06jDlxe63OS3rz3jN8O): 1 max, 0 received
2018/06/28 23:10:35.286581 [TRC] 172.18.0.1:43394 - cid:7 - ->> [PUB _STAN.close.FUlqxXDk0C3F29nclnZ5Kh _INBOX.lqc06jDlxe63OS3rz3jN8O 21]
2018/06/28 23:10:35.286585 [TRC] 172.18.0.1:43394 - cid:7 - ->> MSG_PAYLOAD: [python-client-andry]
2018/06/28 23:10:35.286590 [TRC] 127.0.0.1:40514 - cid:2 - <<- [MSG _STAN.close.FUlqxXDk0C3F29nclnZ5Kh 7 _INBOX.lqc06jDlxe63OS3rz3jN8O 21]
2018/06/28 23:10:35.286657 [TRC] 127.0.0.1:40514 - cid:2 - ->> [PUB _INBOX.lqc06jDlxe63OS3rz3jN8O 0]
2018/06/28 23:10:35.286664 [TRC] 127.0.0.1:40514 - cid:2 - ->> MSG_PAYLOAD: []
2018/06/28 23:10:35.286672 [DBG] 127.0.0.1:40514 - cid:2 - Auto-unsubscribe limit of 1 reached for sid '6'
2018/06/28 23:10:35.286676 [TRC] 172.18.0.1:43394 - cid:7 - <<- [MSG _INBOX.lqc06jDlxe63OS3rz3jN8O 6 0]
2018/06/28 23:10:35.286681 [TRC] 172.18.0.1:43394 - cid:7 - <-> [DELSUB 6]
2018/06/28 23:10:35.286960 [DBG] 172.18.0.1:43394 - cid:7 - Client connection closed

Any idea of how to solve this? Thanks in advance

wallyqs (Member) commented Jun 29, 2018

@yanpozka which versions of the asyncio-nats-client and asyncio-nats-streaming clients are you using? The GIF below shows how it works with v0.7.0 of the NATS client and the latest asyncio-nats-streaming. Are you not receiving the messages in the consumer client?

[GIF: asyncio-nats]

yanpozka (Author) commented Jun 29, 2018

@wallyqs I'm using the latest versions of the server and client. I found the problem: I was closing the connections at the end of the run() method. It's working now 👍 Thank you very much for your help!
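
For readers who hit the same problem: closing the connections right after `subscribe()` ends the session before any message can arrive. One possible way to still close cleanly is to wait for a stop signal first. The sketch below is only an illustration under assumptions, not code from the maintainers: `serve_until_stopped`, `start`, `stop` and `main` are hypothetical names, the subject and IDs are copied from the consumer example above, and the signal handling assumes a Unix-like platform and Python 3.7+.

```python
import asyncio
import signal


async def serve_until_stopped(start, stop, shutdown):
    """Run start(), wait for the shutdown event, then always run stop()."""
    await start()
    try:
        await shutdown.wait()
    finally:
        await stop()


async def main():
    # Imported lazily so the helper above can be used without the clients installed.
    from nats.aio.client import Client as NATS
    from stan.aio.client import Client as STAN

    nc, sc = NATS(), STAN()
    shutdown = asyncio.Event()

    async def cb(msg):
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))

    async def start():
        await nc.connect()
        await sc.connect("test-cluster", "client-123", nats=nc)
        await sc.subscribe("greetings", start_at='first', cb=cb)

    async def stop():
        # Closing here, not right after subscribe(), keeps the subscription alive.
        await sc.close()
        await nc.close()

    # Assumption: Unix-like platform; add_signal_handler is unavailable on Windows.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)

    await serve_until_stopped(start, stop, shutdown)
```

To try it against a local NATS Streaming server, call `asyncio.run(main())` and stop the process with Ctrl+C; the connections are then closed only after the signal arrives.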

Olshansk commented Jun 9, 2019

@wallyqs If the nats server were to restart while the client is running, how can the client become aware of that and reconnect?

It seems like there is already a heartbeat mechanism implemented (https://github.com/nats-io/stan.py/blob/d5858b91501fc067ace49da6ea2bcde6ea1da7f5/stan/aio/client.py), but I believe I might be missing something to make use of it.
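
This question has no reply in the thread. As a possible starting point rather than a confirmed answer, the underlying NATS client accepts connection-event callbacks in `connect()`, which let a program observe disconnects and reconnects at the NATS level. The sketch below assumes a server at `nats://127.0.0.1:4222`; `make_callbacks` and `connect_with_callbacks` are hypothetical helper names. It does not cover whether the streaming session or its subscriptions survive a server restart.

```python
import asyncio


def make_callbacks(log):
    """Build connection-event callbacks that append event names to `log`."""

    async def disconnected_cb():
        log.append("disconnected")

    async def reconnected_cb():
        log.append("reconnected")

    async def closed_cb():
        log.append("closed")

    async def error_cb(e):
        log.append("error: {}".format(e))

    return {
        "disconnected_cb": disconnected_cb,
        "reconnected_cb": reconnected_cb,
        "closed_cb": closed_cb,
        "error_cb": error_cb,
    }


async def connect_with_callbacks(log):
    # Imported lazily so make_callbacks() can be used without the client installed.
    from nats.aio.client import Client as NATS

    nc = NATS()
    # Assumption: a NATS server is reachable at this address.
    await nc.connect(servers=["nats://127.0.0.1:4222"], **make_callbacks(log))
    return nc
```

The returned `nc` can then be passed to `sc.connect(..., nats=nc)` as in the examples above, and `log` inspected (or replaced with real handling) to react to connection changes.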
