Ack messages with multiple connections #44

Closed
thijsterlouw opened this issue Sep 3, 2015 · 16 comments

Comments

@thijsterlouw

Use case: my Python application connects to multiple RabbitMQ servers. These are non-clustered. Our events are sharded across these servers. We want to consume events from all of them, and these events need to be ack'ed.

Problem: I register the consumer with
channel.basic_consume(queue_name, callback=callback)
and the callback looks like:
def callback(body, envelope, properties):
So inside the callback there is no way to tell which server the event came from.
Ticket #39 suggests creating a separate channel to ack. The problem is that I don't know which server to create that channel for.

Initially I thought I would just create a lambda for the callback and bind the ack channel to it, but then I found commit
ff74ee7
So that's probably not going to work. I wonder why this change was made anyway.

I'm pretty new to Python, so I am not sure what I'm supposed to do in this case. Should I wrap these functions in objects, and can the callback point to a member function of the object? The object would then have access to the server/channel, so we would be able to send an ack.

@dzen
Contributor

dzen commented Sep 3, 2015

Hello @thijsterlouw,

Thank you for reporting your use case. This issue really shows that we didn't solve the problem correctly. For instance, we should probably pass the channel instance to the callback so that it can ack this specific message.
The problem is that this channel is currently flooded by incoming messages and won't send any message to the server :/

@thijsterlouw
Author

Hi @dzen, thanks for your quick reply. I've written a little Python code to test the idea, and wrapping my code in objects seems like a good way forward. That way I can pass my context around in the objects and have each object create a reply channel to ack the messages (as #39 suggested).

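import asyncio


@asyncio.coroutine
def tester():
    print('tester')
    # Each instance carries its own context (here just a number).
    x = MyClass(1)
    y = MyClass(2)
    print('created classes')
    # Check that a bound method is still seen as a coroutine function.
    print(asyncio.iscoroutinefunction(x.f))
    val_x = yield from x.f()
    val_y = yield from y.f()
    print('x: {0}, y: {1}'.format(val_x, val_y))


class MyClass:
    def __init__(self, i):
        self.i = i

    @asyncio.coroutine
    def f(self):
        # The coroutine method can read per-instance state through self.
        return 'hello world {0}'.format(self.i)


if __name__ == '__main__':
    print('main')
    loop = asyncio.get_event_loop()
    # run_until_complete returns once tester() has finished, so the
    # run_forever()/stop() pair is not needed (run_forever would block).
    loop.run_until_complete(tester())
    loop.close()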

@thijsterlouw
Author

I have everything set up nicely now: one instance of a class for each server, which wraps the callback and provides the necessary context. Acking now works great (even when using the same channel). So for me: problem solved :)
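
For readers following along, the "one object per server" approach described above can be sketched roughly as follows. This is a minimal illustration, not the author's actual code: `FakeChannel`, `ServerConsumer`, and the server names are hypothetical, the fake channel only records acks instead of talking to RabbitMQ, and the sketch uses `async`/`await` rather than the thread's `yield from` style because `@asyncio.coroutine` was removed in Python 3.11.

```python
import asyncio
from types import SimpleNamespace


class FakeChannel:
    """Hypothetical stand-in for a real channel; it only records acks."""

    def __init__(self, server_name):
        self.server_name = server_name
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class ServerConsumer:
    """One instance per server; the bound method carries the context."""

    def __init__(self, server_name, channel):
        self.server_name = server_name
        self.channel = channel
        self.seen = []

    async def callback(self, body, envelope, properties):
        # Same three arguments as in the original report; `self` supplies
        # the server and channel that the plain function was missing.
        self.seen.append(body)
        await self.channel.basic_client_ack(envelope.delivery_tag)


async def main():
    consumers = [
        ServerConsumer(name, FakeChannel(name))
        for name in ("rabbit-a", "rabbit-b")  # hypothetical server names
    ]
    # Simulate each server delivering one message to its own consumer.
    for tag, consumer in enumerate(consumers, start=1):
        envelope = SimpleNamespace(delivery_tag=tag)
        await consumer.callback(b"event", envelope, None)
    return consumers


if __name__ == "__main__":
    for consumer in asyncio.run(main()):
        print(consumer.server_name, consumer.channel.acked)
```

With a real connection, the bound method `consumer.callback` would presumably be what gets passed as `callback=` in the `basic_consume` call shown in the first comment.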

Initially I had a small application bug where I didn't ack invalid events, but that was easily fixed.

@dzen
Contributor

dzen commented Sep 29, 2015

Hi @thijsterlouw,

I tried a little thing: the callback is converted into a Task, and the channel instance is passed to the callback. This way it's possible to ack the message from the coroutine that is called when a message is received. The only problem is that we create a Task for each received message, so the event loop could grow very large if your coroutine is slow.

I've got a publisher which publishes at 20k msg/s. My computer gets very slow :(
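
The Task-per-message concern can be illustrated with plain asyncio, independent of any AMQP library. The sketch below is an assumption-laden toy: `slow_handler` and `flood` are hypothetical names, and the "messages" are just integers. It schedules one Task per message and counts how many are still pending while the handler is slow.

```python
import asyncio


async def slow_handler(body):
    # Stands in for a slow consumer callback.
    await asyncio.sleep(0.05)


async def flood(message_count):
    """Schedule one Task per message; return how many are pending at once."""
    tasks = [
        asyncio.ensure_future(slow_handler(i)) for i in range(message_count)
    ]
    # Yield once so every task starts running, but none has finished yet.
    await asyncio.sleep(0)
    pending = sum(1 for task in tasks if not task.done())
    # Wait for all handlers so nothing is left running on exit.
    await asyncio.gather(*tasks)
    return pending


if __name__ == "__main__":
    print(asyncio.run(flood(1000)))
```

Every message received while the handlers are still sleeping adds another pending Task, which is why a fast publisher combined with a slow callback makes the loop's workload grow.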

@dzen
Contributor

dzen commented Sep 29, 2015

[Image: screenshot from 2015-09-29 18:15:49]

@dzen
Contributor

dzen commented Sep 30, 2015

@mpaolini hello. Any thoughts on this PR / issue?

@mpaolini
Contributor

mpaolini commented Oct 1, 2015

@dzen I will have a look later this weekend.

@dzen
Contributor

dzen commented Oct 13, 2015

Hello @thijsterlouw,

Did you try this branch against your code?

Beware: with this branch, your callback should be defined like this:

@asyncio.coroutine
def callback(channel, body, envelope, properties):
    # stuff
    yield from channel.basic_client_ack(envelope.delivery_tag)

@ObjReponse

Hi,
I tried out that branch and I have a question. How do I perform a BASIC_ACK? Neither basic_client_ack nor basic_server_ack worked.

@dzen
Contributor

dzen commented Oct 31, 2015

Do you have any problem using basic_client_ack? What error is generated?

@ObjReponse

Yes, I do: it does not work. No error, no ack, and the task is still in the queue.

@dzen
Contributor

dzen commented Oct 31, 2015

Can you paste some code?

@ObjReponse

@dzen sorry for misleading you. I forgot about yield from; when I use yield from ch.basic_client_ack... it works now. Sorry again.
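
The "no error, no ack" symptom reported here is what one would expect when a coroutine is called without being awaited: the call only creates a coroutine object and its body never runs. A minimal sketch of that behaviour follows, using a hypothetical `FakeChannel` that records acks rather than a real channel, and `await` in place of the thread's `yield from` (the same reasoning applies to both).

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a real channel; it only records acks."""

    def __init__(self):
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


async def demo():
    channel = FakeChannel()

    # Without await: this only builds a coroutine object, nothing is acked.
    forgotten = channel.basic_client_ack(1)
    acked_without_await = list(channel.acked)
    forgotten.close()  # discard it to avoid a "never awaited" warning

    # With await: the coroutine body runs and the ack is recorded.
    await channel.basic_client_ack(2)
    return acked_without_await, channel.acked


if __name__ == "__main__":
    print(asyncio.run(demo()))
```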

@dzen
Contributor

dzen commented Oct 31, 2015

No problem. I haven't written enough documentation yet. It's planned.

@dzen
Contributor

dzen commented Dec 15, 2015

Hello @thijsterlouw. Did you try the latest release (0.5.1, since we had a packaging problem)?

@dzen
Contributor

dzen commented Dec 19, 2015

@thijsterlouw you now have the channel handle in the callback, so you can ack a message correctly.

@dzen dzen closed this as completed Dec 19, 2015