Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrong order of callbacks #90

Closed
nxsofsys opened this issue Sep 18, 2018 · 7 comments
Closed

Wrong order of callbacks #90

nxsofsys opened this issue Sep 18, 2018 · 7 comments

Comments

@nxsofsys
Copy link

nxsofsys commented Sep 18, 2018

Hello,

NATS documentation says, that messages from one publisher are received in the same order as they were published. But if we make few subscriptions, and publish messages for each one few times, callbacks will be called in different order. This is because each subscriber has own message queue. Here is example:

receive.py

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):

    await nc.connect("nats://nats:4222", loop=loop)

    async def message_handler_A(msg):
        print('message_handler_A')

    async def message_handler_B(msg):
        print('message_handler_B')

    async def message_handler_C(msg):
        print('message_handler_C')

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

publish.py:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("nats://nats:4222", loop=loop)
    for i in range(10):
      await nc.publish("message_handler_B", b"")
      await nc.publish("message_handler_C", b"")
      await nc.publish("message_handler_A", b"")

if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Just run publish.py after receive.py.

@wallyqs
Copy link
Member

wallyqs commented Sep 18, 2018

Protocol wise the message will be delivered with respective to the order in which the messages were published and asyncio will dispatch for the callback that is ready.

In your example the B messages will be delivered before C and A. If the callbacks relinquish control so that other tasks can process messages then you would see that the messages are processed in that order:

import asyncio
import logging
import time
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("nats://demo.nats.io:4222", loop=loop)

    async def message_handler_A(msg):
        print(f'message_handler_A: {msg}')
        await asyncio.sleep(0)

    async def message_handler_B(msg):
        print(f'message_handler_B: {msg}')
        await asyncio.sleep(0)

    async def message_handler_C(msg):
        print(f'message_handler_C: {msg}')
        await asyncio.sleep(0)

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
message_handler_B: <Msg: subject='message_handler_B' reply='' data='0...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='0...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='0...'>
message_handler_B: <Msg: subject='message_handler_B' reply='' data='1...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='1...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='1...'>
message_handler_B: <Msg: subject='message_handler_B' reply='' data='2...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='2...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='2...'>

@nxsofsys
Copy link
Author

nxsofsys commented Sep 18, 2018

But if we add one more sleep to one of callbacks, situation changes:

import asyncio
import logging
import time
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("nats://nats:4222", loop=loop)

    async def message_handler_A(msg):
        print(f'message_handler_A')
        await asyncio.sleep(0)

    async def message_handler_B(msg):
        print(f'message_handler_B')
        await asyncio.sleep(0)
        await asyncio.sleep(0)

    async def message_handler_C(msg):
        print(f'message_handler_C')
        await asyncio.sleep(0)

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

then log looks like:

message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_B
message_handler_B
message_handler_B
message_handler_B

This is more common situation because callbacks in real case have more returns into loop.

@wallyqs
Copy link
Member

wallyqs commented Sep 18, 2018

Right, because when the B handler starts processing the messages it will continue to do so until it the event loop allows other callback to execute (https://github.com/nats-io/asyncio-nats/blob/master/nats/aio/client.py#L529-L549). But internally the messages have been received in the same order as the source, if you relinquishing control as part of the callback using await asyncio.sleep(0) then you can opt to allow other callbacks to be able to have time to process the messages.

@nxsofsys
Copy link
Author

nxsofsys commented Sep 18, 2018

I found, that socket receives messages in right order, but in real case callbacks comes in random order, and it completely unusable to publish messages with different topics in loop - for example if I have two subjects - create_user and set_user_password - then often second topic comes before first, and trying to setup password to user which not created yet. Btw, for same subject callbacks always work in right order.

@wallyqs
Copy link
Member

wallyqs commented Sep 18, 2018

For that type of usage I'd recommend using request/response functionality from the client as it would give you a stronger ordering guarantee and better decoupling with the subscribers, sharing an example below:

Subscribers:

import asyncio
import time
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("demo.nats.io", loop=loop)

    async def create_user(msg):
        print(f'create_user      : {msg}')
        await nc.publish(msg.reply, b'user created')

    async def set_user_password(msg):
        print(f'set_user_password: {msg}')
        await nc.publish(msg.reply, b'password set')

    await nc.subscribe("create_user", cb=create_user)
    await nc.subscribe("set_user_password", cb=set_user_password)

if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Publish:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("demo.nats.io", loop=loop)

    for i in range(10):
        response = await nc.request("create_user", f"user:user-{i}".encode())
        print(f"Result [{i}]: {response.data.decode()}")
        response = await nc.request("set_user_password", f"password:foo".encode())
        print(f"Result [{i}]: {response.data.decode()}")

if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Result:

Result [0]: user created
Result [0]: password set
Result [1]: user created
Result [1]: password set
Result [2]: user created
Result [2]: password set
...

@nxsofsys
Copy link
Author

I've found that asyncio.Lock doing job. I added lock around callbacks which should run in ordered way.

@wallyqs
Copy link
Member

wallyqs commented Sep 19, 2018

Closing as that is how the client is intended to work right now (similar behavior as in the Go client).

@wallyqs wallyqs closed this as completed Sep 19, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants