Supervising multiple consumers #35

Closed
desmondmonster opened this issue Sep 15, 2016 · 6 comments

Comments

@desmondmonster

Hi,

Our application has many consumers - one for each queue. We don't want to reopen a connection in each one (as per the example in the README) and think it'd be better to create a connection in one GenServer, then pass that in to many consumer GenServers and have them each open their own channel.

At the same time, we want to recover from failures on both the channel and the connection. Since we can't pass a conn around a supervision tree we'll have to do this manually, but in #31 you suggest not using supervision at all because of the speedy connection retry. I think a channel failure should only require the consumer to be restarted, while a connection failure should result in a new connection and a whole new set of consumers. Is there a good pattern for this problem?

@pma
Owner

pma commented Sep 15, 2016

@desmondmonster I think a channel failure will only happen if the underlying connection is killed/closed.

One solution that comes to mind is:

Have a GenServer hold the single connection (let's call this the ConnectionManager) like you describe. This can implement the suggested connection retry mechanism that does not trigger a supervisor restart.

Define your consumers as separate GenServers that, on start, simply make a call to the ConnectionManager to request a new channel. The ConnectionManager does not return the channel immediately, because it could still be trying to connect. Instead, it keeps the consumer pid and does a GenServer.cast(consumer_pid, {:setup_channel, chan}). This cast can be sent immediately if the connection is already established, or the consumer_pid can be saved in a list (in the ConnectionManager state) and the cast sent once the connection is finally reestablished.

In the handle_cast({:setup_channel, chan}, state) of the consumer you can do the normal exchange/queue declares, set up the bindings and start consuming.

Your consumers can sit under a Supervisor, and each one monitors the channel pid it was given. If the channel dies, the consumer handles the resulting :DOWN message and requests a new channel from the ConnectionManager.
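A minimal sketch of the consumer side described above, assuming the `amqp` library. The module name `QueueConsumer`, the `:manager`, `:queue` and `:on_channel` options, and the `{:request_channel, pid}` call message are all hypothetical names chosen for illustration; `:on_channel` exists only so the setup step can be swapped out when no broker is available.

```elixir
defmodule QueueConsumer do
  use GenServer

  # opts: :manager (pid or registered name of the ConnectionManager),
  # :queue, and an optional :on_channel hook. All names are hypothetical.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      manager: Keyword.fetch!(opts, :manager),
      queue: Keyword.fetch!(opts, :queue),
      on_channel: Keyword.get(opts, :on_channel, &default_setup/2),
      chan: nil
    }

    # The manager only acknowledges the request; the channel itself
    # arrives later as a {:setup_channel, chan} cast.
    :ok = request_channel(state)
    {:ok, state}
  end

  @impl true
  def handle_cast({:setup_channel, chan}, state) do
    # Monitor the channel so we notice when it goes away.
    Process.monitor(chan.pid)
    state.on_channel.(chan, state.queue)
    {:noreply, %{state | chan: chan}}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{chan: %{pid: pid}} = state) do
    # Our channel died: go idle and ask for a replacement.
    :ok = request_channel(state)
    {:noreply, %{state | chan: nil}}
  end

  def handle_info({:basic_deliver, _payload, %{delivery_tag: tag}}, state) do
    # Real message handling goes here; this sketch only acknowledges.
    AMQP.Basic.ack(state.chan, tag)
    {:noreply, state}
  end

  # Ignore consume_ok/cancel notifications and stale :DOWN messages.
  def handle_info(_other, state), do: {:noreply, state}

  defp request_channel(state) do
    GenServer.call(state.manager, {:request_channel, self()})
  end

  # Default setup: declare the queue and start consuming from it.
  defp default_setup(chan, queue) do
    {:ok, _info} = AMQP.Queue.declare(chan, queue, durable: true)
    {:ok, _consumer_tag} = AMQP.Basic.consume(chan, queue)
    :ok
  end
end
```

Because the consumer only monitors the channel, a channel failure leaves the consumer process running; it simply waits for the next `{:setup_channel, chan}` cast.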

Edit: One issue with the above is that if your consumer GenServer crashes, the channel would remain open. An option is to have the consumer link to (instead of monitor) the channel. But this would just cause every consumer, channel and the connection to die if just one consumer crashes...

Another option is to have the ConnectionManager monitor the consumers (it knows about them when they request a channel) and keep a mapping between consumer and channel. If the consumer dies, the ConnectionManager can clean up and close the channel.
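The ConnectionManager side, including the consumer monitoring from the last paragraph, could be sketched roughly as follows. This is an illustration under assumptions, not tested against a live broker: the `:connect`, `:open_channel` and `:close_channel` options are hypothetical hooks that default to the corresponding `amqp` library calls, the `{:request_channel, pid}` message shape is made up for this example, and the one-second retry interval is arbitrary.

```elixir
defmodule ConnectionManager do
  use GenServer

  @retry_ms 1_000

  # The options are hypothetical hooks so the sketch can run without a
  # broker; by default they call into the `amqp` library.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Returns :ok right away; the channel arrives later as a
  # {:setup_channel, chan} cast to `consumer`.
  def request_channel(manager, consumer \\ self()) do
    GenServer.call(manager, {:request_channel, consumer})
  end

  @impl true
  def init(opts) do
    send(self(), :connect)

    {:ok,
     %{
       connect: Keyword.get(opts, :connect, fn -> AMQP.Connection.open() end),
       open_channel: Keyword.get(opts, :open_channel, fn conn -> AMQP.Channel.open(conn) end),
       close_channel: Keyword.get(opts, :close_channel, fn chan -> AMQP.Channel.close(chan) end),
       conn: nil,
       # consumer pid => channel, or nil while the consumer is still waiting
       consumers: %{}
     }}
  end

  @impl true
  def handle_call({:request_channel, consumer}, _from, state) do
    # Monitor each consumer once so its channel can be closed if it dies.
    if not Map.has_key?(state.consumers, consumer), do: Process.monitor(consumer)
    state = %{state | consumers: Map.put(state.consumers, consumer, nil)}
    {:reply, :ok, give_channel(consumer, state)}
  end

  @impl true
  def handle_info(:connect, state) do
    case state.connect.() do
      {:ok, conn} ->
        # Monitor rather than link: a lost connection becomes a retry
        # here, not a supervisor restart.
        Process.monitor(conn.pid)
        waiting = for {pid, nil} <- state.consumers, do: pid
        {:noreply, Enum.reduce(waiting, %{state | conn: conn}, &give_channel/2)}

      {:error, _reason} ->
        Process.send_after(self(), :connect, @retry_ms)
        {:noreply, state}
    end
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{conn: %{pid: pid}} = state) do
    # Connection lost: reconnect. Consumers see their channel die and
    # ask for a new one themselves.
    send(self(), :connect)
    {:noreply, %{state | conn: nil}}
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # A consumer died: close its channel so it does not leak.
    {chan, consumers} = Map.pop(state.consumers, pid)
    if chan && Process.alive?(chan.pid), do: state.close_channel.(chan)
    {:noreply, %{state | consumers: consumers}}
  end

  # No connection yet: the consumer stays in the map with a nil channel.
  defp give_channel(_consumer, %{conn: nil} = state), do: state

  defp give_channel(consumer, state) do
    case state.open_channel.(state.conn) do
      {:ok, chan} ->
        GenServer.cast(consumer, {:setup_channel, chan})
        %{state | consumers: Map.put(state.consumers, consumer, chan)}

      {:error, _reason} ->
        state
    end
  end
end
```

Keeping waiting consumers in the same map as served ones (with a `nil` channel) stands in for the separate list of pending pids mentioned above.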

@rafaeljesus

@desmondmonster was that solved? Can you close the issue mate?

@pma
Owner

pma commented Nov 25, 2016

@desmondmonster @rafaeljesus In this implementation of a gen_stage based interface (https://github.com/pma/wabbit/), a single connection can be reused by multiple consumers. It also handles reconnects. Can you give it a try?

@rafaeljesus

@pma Yes, of course. My application is going to be deployed next week; we have 6 consumers. Once it is running in production, I can try it on the consumers that get less traffic for testing.

@rafaeljesus

@pma Are you using that in any production system of yours?

@desmondmonster
Author

@pma @rafaeljesus We ended up with the following config:

A ConnectionManager GenServer opens, maintains, and links to a connection. It starts and supervises the consumers, which are also GenServers and which request a channel from the ConnectionManager when they start. The ConnectionManager maintains a mapping of opened channels to consumers so that if a consumer dies and restarts, it's given the same channel used by its predecessor. If a connection is unavailable during a channel request (e.g. it's still connecting to RabbitMQ), the request is saved and processed once a connection becomes available. Consumers request channels asynchronously; they sit idle until they receive a channel.

The benefit of this supervision tree is that individual consumers can come and go without connection or channel churn; reconnection logic lives only in the ConnectionManager. Channels are reused, if possible, and share a single connection. If either the ConnectionManager or its stored connection dies, the ConnectionManager and all consumers are killed and restarted.
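The channel-reuse lookup could look something like the sketch below. The actual code is not shown in this thread, so everything here is an assumption: the `ChannelMap` module and `fetch_or_open/3` are hypothetical names, and the map is keyed by a stable consumer identifier (for example the consumer's module name) rather than by pid, since a restarted consumer gets a new pid.

```elixir
defmodule ChannelMap do
  # Returns {channel, updated_map}.
  #
  # `key` identifies a consumer across restarts and `open` is a zero-arity
  # function that opens a fresh channel, e.g.
  # fn -> AMQP.Channel.open(conn) end. Both are assumptions of this sketch.
  def fetch_or_open(map, key, open) do
    case Map.fetch(map, key) do
      {:ok, %{pid: pid} = chan} ->
        # Reuse the predecessor's channel only if it is still alive.
        if Process.alive?(pid), do: {chan, map}, else: open_and_store(map, key, open)

      :error ->
        open_and_store(map, key, open)
    end
  end

  defp open_and_store(map, key, open) do
    {:ok, chan} = open.()
    {chan, Map.put(map, key, chan)}
  end
end
```

A restarted consumer that asks for a channel under the same key gets the stored one back, so no new channel is opened unless the old one has died.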

N.B. We found that a channel closing does not also bring down the connection.

We've been running this in production for a couple of months and it's worked pretty well.
