Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for :amqp_channel.register_confirm_handler(channel, confirm_handler) #41

Closed
miwee opened this issue Oct 24, 2016 · 10 comments · Fixed by #125
Closed

support for :amqp_channel.register_confirm_handler(channel, confirm_handler) #41

miwee opened this issue Oct 24, 2016 · 10 comments · Fixed by #125

Comments

@miwee
Copy link

miwee commented Oct 24, 2016

The only facility in Elixir amqp client for receiving confirms is the blocking calls Confirm.wait**** and Confirm.wait****die. Registration of a confirm_handler process is async way of processing.

@lucacorti
Copy link
Contributor

lucacorti commented Oct 4, 2017

This makes currently impossible to return publish errors in a GenServer.call for unroutable messages with the mandatory flag set since basic.return is delivered before basic.ack

Adding Confirm.confirm/Confirm.confirm_cancel to register a confirm handler and receiving basic.acks directly could be a way to detect and handling the resulting errors in a GenServer.
This was actually implemented in pr #8, but the register_confirm_handler part was not merged.

Another could be to make Confirm.wait_for_confirms/1 take into account basic.return received before basic.ack. pika does this.

Any reason neither is implemented? Is either option worth considering for inclusion? I'd be keen to write a PR for either.

@ono
Copy link
Collaborator

ono commented Oct 5, 2017

My comment on Basic.return PR explains my thoughts on this too. TL; DR - PR is welcome :)

I think naming them Confirm.confirm and Confirm.cancel_confirm is the straight forward and you can follow #63 for the implementation.

@lucacorti
Copy link
Contributor

lucacorti commented Oct 5, 2017

Thanks for the feedback. I have put together an implementation based off #63. However there is an issue with message ordering.

This is a catch all handle_info logging in my GenServer after calling Confirm.confirm(channel, self()) and Basic.return(channel, self()):

[warn] MESSAGE: {:basic_return, "test", %{app_id: :undefined, cluster_id: :undefined, content_encoding: :undefined, content_type: :undefined, correlation_id
: :undefined, exchange: "sms", expiration: :undefined, headers: :undefined, message_id: :undefined, persistent: true, priority: :undefined, reply_code: 312,
 reply_text: "NO_ROUTE", reply_to: :undefined, routing_key: "dlr", timestamp: :undefined, type: :undefined, user_id: :undefined}}
[warn] MESSAGE: {:basic_ack, 1, false}
[warn] MESSAGE: {:basic_return, "test", %{app_id: :undefined, cluster_id: :undefined, content_encoding: :undefined, content_type: :undefined, correl
ation_id: :undefined, exchange: "sms", expiration: :undefined, headers: :undefined, message_id: :undefined, persistent: true, priority: :undefined, reply_co
de: 312, reply_text: "NO_ROUTE", reply_to: :undefined, routing_key: "dlr", timestamp: :undefined, type: :undefined, user_id: :undefined}}
[warn] MESSAGE: {:basic_ack, 2, false}
[warn] MESSAGE: {:basic_ack, 3, false}
[warn] MESSAGE: {:basic_return, "test", %{app_id: :undefined, cluster_id: :undefined, content_encoding: :undefined, content_type: :undefined, correlation_id
: :undefined, exchange: "sms", expiration: :undefined, headers: :undefined, message_id: :undefined, persistent: true, priority: :undefined, reply_code: 312, reply_text: "NO_ROUTE", reply_to: :undefined, routing_key: "dlr", timestamp: :undefined, type: :undefined, user_id: :undefined}}
[warn] MESSAGE: {:basic_return, "test", %{app_id: :undefined, cluster_id: :undefined, content_encoding: :undefined, content_type: :undefined, correl
ation_id: :undefined, exchange: "sms", expiration: :undefined, headers: :undefined, message_id: :undefined, persistent: true, priority: :undefined, reply_co
de: 312, reply_text: "NO_ROUTE", reply_to: :undefined, routing_key: "dlr", timestamp: :undefined, type: :undefined, user_id: :undefined}}
[warn] MESSAGE: {:basic_ack, 4, false}

For this feature to be useful basic.return needs to be always received before basic.ack to be able to tell if the message was correctly routed. This is what rabbitmq does, but the current process spawning logic does not preserve message ordering, while registering my process handlers directly with amqp_client for return/ack seems to respect correct ordering:

[warn] MESSAGE: {{:"basic.return", 312, "NO_ROUTE", "sms", "dlr"}, {:amqp_msg, {:P_basic, :undefined, :undefined, :undefined, 2, :undefined, :undefi
ned, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined}, "send"}}
[warn] MESSAGE: {:"basic.ack", 1, false}
[warn] MESSAGE: {{:"basic.return", 312, "NO_ROUTE", "sms", "dlr"}, {:amqp_msg, {:P_basic, :undefined, :undefined, :undefined, 2, :undefined, :undefi
ned, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined}, "send"}}
[warn] MESSAGE: {:"basic.ack", 2, false}
[warn] MESSAGE: {{:"basic.return", 312, "NO_ROUTE", "sms", "dlr"}, {:amqp_msg, {:P_basic, :undefined, :undefined, :undefined, 2, :undefined, :undefi
ned, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined}, "send"}}
[warn] MESSAGE: {:"basic.ack", 3, false}

So, this implementation is not working. Do you think it is acceptable to simplify implementation by just registering for confirm/return the calling process directly? Otherwise I guess we need to spawn a single process handling both confirm and returns. Do you have better ideas?

@ono
Copy link
Collaborator

ono commented Oct 5, 2017

For this feature to be useful basic.return needs to be always received before basic.ack to be able to tell if the message was correctly routed.

I probably need to understand more details about the use case. Why can't they be independent? I believe you can write the logic that doesn't rely on the order?

with amqp_client for return/ack seems to respect correct ordering

That might be true most of cases but writing code that relies on the assumption is dangerous as amqp_client doesn't guarantee the order is always like that.

Return handler on AMQP indeed has a overhead(one send plus Record to Map conversion) but that should be discussed separately from the issue.

@lucacorti
Copy link
Contributor

lucacorti commented Oct 5, 2017

My use case is returning errors when publishing unroutable messages with the mandatory flag set on a channel is in confirm mode. In this case the error condition is signalled to the publisher by sending basic.return before basic.ack as per AMQP spec.

I agree overhead is a different problem, my only concern at the moment is message ordering. In this case the ack must come after the return. I think amqp_client has to guarantee this ordering, since this behaviour is mandated by AMQP and is the only way to detect this error condition AFAIU.

The section on confirm here explains the logic: https://www.rabbitmq.com/amqp-0-9-1-reference.html

@ono
Copy link
Collaborator

ono commented Oct 5, 2017

Could you link to the source which says basic.return should come before basic.ack on protocol specification (not implementation)?

If you can also point to the code amqp_client guarantees the order, it will be also great. I suppose ack and return would be handled by separate processes. If amqp_client guarantees the order 100%, there must be a logic to synchronise the messages. Don't worry if you can't. I will look into it when I have time but it might take sometime.

@lucacorti
Copy link
Contributor

lucacorti commented Oct 5, 2017

Sorry, publisher confirms are a RabbitMQ extension, so this behaviour (basic.return before basic.ack to signal unroutable messages after basic.select) is guaranteed by RabbitMQ not AMQP.

Looking at amqp_client source, it looks like the channel is handled by a GenServer which registers callbacks in handle_info for both return and ack, so ordering should be guaranteed: https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_channel.erl

@ono
Copy link
Collaborator

ono commented Oct 5, 2017

OK, I think we can define the problem like this?

"If a process receives messages from a same channel, the order of the messages in the channel should be preserved."

Yeah, the current implementation of Basic.return can break this because of the extra process in the middle. That's totally a fair point.

Let me think about a solution. (note to myself - Basic.consume has has a similar issue)

@lucacorti
Copy link
Contributor

Yes, I guess that's the point. The first solutions which come to my mind are directly registering the caller to handlers or using a single process to register all messages coming from the same channel, including Basic.consume I guess.

Thanks a lot for your time and thinking this through 👍

@ono
Copy link
Collaborator

ono commented Apr 17, 2019

This should have been addressed 1.2.0-rc.0. Thanks!

@ono ono closed this as completed Apr 17, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants