Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling shutdown reason in direct_consumer, and clean up consumer afterwards. #184

Merged
merged 3 commits into from Mar 1, 2021

Conversation

benonymus
Copy link
Contributor

Hey there,

We encountered countless :consumer_died error logs in our test cases, and this pr aims to rectify that, and additionally to fix another possible headache for other users with disorderly shutdowns.

The changes are that we handle :shutdown and :normal exit reasons to avoid messy error logs, and return nil as the consumer. Now we also need to handle that, so that no message is being forwarded anymore in case if the channel was not shut down, if you were to try to send messages then we return an error.

I hope this makes sense.

@ono
Copy link
Collaborator

ono commented Feb 17, 2021

Hi @benonymus. Could you share a sample test case which reproduces the error? Thanks!

@benonymus
Copy link
Contributor Author

@ono Hey, it is hard to reproduce, because it doesn't actually cause problems.
Just spams error logs.

I think I saw someone else mentioning this as well, I will try to find it

@ono
Copy link
Collaborator

ono commented Feb 17, 2021

Cool. yeah, it will be great if you can create a sample project or unit test that spams error logs. By the way, channel is closed explicitly with AMQP.Channel.close/1?

https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_direct_consumer.erl#L23-L29

@haljin
Copy link
Contributor

haljin commented Feb 17, 2021

Hi @ono

The way the channels are handled inside amqp_client is what is causing a bit of a problem here. When you start a new process that then calls AMQP.Channel.open with a DirectConsumer you get the following situation:

(amqp_channel_sup)
 |                         |
 |                 (amqp_channel)
 |
(amqp_gen_consumer) -  -  -  [monitor] -  -  -  -  -  -  - (your process)

So the gen_consumer, executing the DirectConsumer code actually monitors your process. Now if your process decides to call AMQP.Channel.close and then shuts down (so e.g. you do this in GenServers terminate callback) this sometimes causes a race condition. The channel is being shut down but your process has gone down first, so the amqp_gen_consumer gets a DOWN signal from that and handles it before the channel has shut down (which will lead to amqp_channel_sup to shut down and take amqp_gen_consumer with it). That results in an error log that's coming from DirectConsumer here:

  def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do
    {:ok, consumer}
  end

  def handle_info({:DOWN, _mref, :process, consumer, info}, consumer) do
    {:error, {:consumer_died, info}, consumer}
  end

because the info is :shutdown and not :normal. It can be reproduced by creating a simple GenServer that opens a channel in init and then closes it in terminate but it's not 100% reliable as it is a race condition inside the amqp_client lib.

@ono
Copy link
Collaborator

ono commented Feb 24, 2021

Hey @haljin. Thanks for the detailed explanation!

My thoughts:

  • the user should explicitly close the channel as it's recommended here. Maybe we should highlight that in our documentation?
  • in the unexpected situation and the consumer dies, it still makes sense to down DirectConsumer too. I also hesitate to change its behaviour from Erlang library

There is also related discussion in #186.

@haljin
Copy link
Contributor

haljin commented Feb 24, 2021

@ono But in the race condition I described the user does explicitly close the channel, that's what causes the race condition. In the unexpected situation, the DOWN signal from the process will not have reasons :normal nor :shutdown so that case is handled.

@ono
Copy link
Collaborator

ono commented Feb 24, 2021

@haljin

are you explaining a same thing @benonymus is experiencing and solving to fix here? or expanding the topic? I'd like to make sure if we want to fix countless error logs or a race condition here.

I think user's consumer process shouldn't be linked to any processes when using DirectConsumer. That will avoid the race condition?

@haljin
Copy link
Contributor

haljin commented Feb 25, 2021

@ono Please re-read my original description. The countless errors logs are caused by a race condition between the amqp_channel closing (because amqp_channel:close was called explicitly by the user process) and the user process itself closing.

The consumer process is only linked to the DirectConsumer itself as it should be.

@ono
Copy link
Collaborator

ono commented Feb 25, 2021

Don't worry - I understand that. I wanted to check if @benonymus is experiencing this despite he closes the channel explicitly on their test cases.

Let's use a simple test case as an example:

  test "simple test" do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()})

    on_exit(fn ->
      AMQP.Channel.close(channel)
    end)
  end

Unfortunately :amqp_channel.close/1 is async so the DOWN message can arrive to DirectConsumer if the test suite shutdown the test process before channel is shut down properly. I managed to reproduce it only once so I am not sure if it happens countlessly but I guess the root cause is same anyway.

You can avoid the race condition with the following:

  def ensure_close(%{pid: pid} = chan, retry \\ 0) do
    if Process.alive?(pid) && retry < 100 do
      :timer.sleep(10)
      ensure_close(chan, retry + 1)
    else
      :ok
    end
  end

  test "avoid a race condition" do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()})

    on_exit(fn ->
      AMQP.Channel.close(channel)
      ensure_close(channel)
    end)
  end

I understand it's tedious but I hesitate to change the current behaviour as it can give a side effect to other users. However we can enhance the module.

What do you guys think about adding an option?

AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), [monitor: false]}})

If the monitor option is given and it is false, DirectConsumer would act as the proposed behaviour.

@haljin
Copy link
Contributor

haljin commented Feb 25, 2021

Maybe we should've clarified that @benonymus is on my team and we have been working on this together. :)

I think a flag could be a good solution, although I think it should be called something like ignore_shutdown as the DirectConsumer always monitors the user process (and it's a very bad idea for it not to), it's just it should handle the :shutdown reason in the same way as :normal. The additional safeguard of closing the consumer if a message is received, but the consumer is nil (as it's gone now) should make this recover if something goes wrong, as Rabbit will just retransmit the message.

@ono
Copy link
Collaborator

ono commented Feb 25, 2021

Got it. Yeah, getting back to the initial discussion point - handling :shutdown with a same way it does for :normal makes sense to me. The user consumer process was intentionally killed so DirectConsumer shouldn't react.

However I would keep the behaviour as similar as possible with amqp_client version so let's use the option for now. I like ignore_shutdown 👍 We can change the state like %{consumer: pid, ignore_shutdown: boolean} instead of pid.

@benonymus
Copy link
Contributor Author

Hey @ono I can confirm that me and @haljin are indeed working in the same team, and describing the same problem - solution.

@benonymus
Copy link
Contributor Author

benonymus commented Feb 25, 2021

Hey @ono I pushed the updated state.
It works exactly like you recommended just with the flag name suggested by @haljin.

AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), [ignore_shutdown: true]}})

Copy link
Collaborator

@ono ono left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, @benonymus and @haljin. After a bit more thinking, I am thinking of changing the option like below.

opts = [
  on_consumer_down: [
    cascade: true,
    ignore: [:normal, :shutdown]
  ]
]

with cascade: false, it simply ignores consumer down for any reasons.

Don't worry to make the further changes though. I will work on them with a separate PR after merging this PR.

Don't worry about the code review comments either - I can fix them on my PR for the option change at the same time.

@@ -61,7 +61,7 @@ defmodule ConnectionTest do

test "open connection with uri, name, and options (deprected but still spported)" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')
Connection.open("amqp://nonexistent:5672", name: "my-connection", host: 'localhost')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intended. Testing a deprecated option for backward compatibility.

# there's no support for direct connection
# this callback implementation should be added with library support
{:error, :undefined}
end

@impl true
def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do
{:ok, consumer}
def handle_info({:DOWN, _mref, :process, state, reason}, %{ignore_shutdown: true} = _state)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't want to drop the pattern match for consumer pid here and other places.

@benonymus
Copy link
Contributor Author

Hey @ono thanks, sounds good to me!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants