Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cancel specific consumer #191

Closed
wants to merge 5 commits into from

Conversation

rhsobr
Copy link
Contributor

@rhsobr rhsobr commented Oct 3, 2021

Possibility to cancel specific consumer after its setup.

Provides more granularity than cancelAll function.

Usage example:

let cancelConsumer = await channel.consume(queueName, (msg) => {
    //your code
}, { prefetch: 10 });

await cancelConsumer()

cancelConsumer = await channel.consume(queueName, (msg) => {
    //your code
}, { prefetch: 100 });

process.on('SIGTERM', () => cancelConsumer())

This PR also:

  • includes await when prefetch is setted up

src/ChannelWrapper.ts Outdated Show resolved Hide resolved
@rhsobr rhsobr requested a review from luddd3 January 6, 2022 21:55
@luddd3
Copy link
Collaborator

luddd3 commented Jan 7, 2022

@jwalton What do you think?

@jwalton
Copy link
Owner

jwalton commented Jan 7, 2022

The one thing I don't like about this is that our signature for consume now differs from amqp.consume. Might be better to make this channel.cancel(consumerTag) like amqp does it.

@luddd3
Copy link
Collaborator

luddd3 commented Jan 7, 2022

I agree

@rhsobr
Copy link
Contributor Author

rhsobr commented Jan 24, 2022

Sorry, guys. I did't undestand it.

Do you mean channel.consume should always return the consumerTag?
Then, should the developer call channel.cancel when it is appropriate?

@jwalton
Copy link
Owner

jwalton commented Jan 24, 2022

This library is meant to be a thin-wrapper around amqplib which provides automatic reconnects. So whenever possible, our API should be identical to amqplib's API; that way you can just change your import from ampplib to amqp-connection-manager and it should "just work". Or as close to that as we can get.

So, since ampqlib has a channel.canel(consumerTag), we should have the same for cancelling. And ideally, our consume() would return Promise<{ consumerTag: string; }>, since this is what ampqlib's Channel.consume() returns... unfortunately, we can't return the consumerTag, because we don't know what it is - it gets generated when the consumer is actually bound, and could change on a reconnect.

So I think we have two options here; first we can make it so if you want to cancel a consumer, you have to explicitly provide a consumerTag to us, rather than trying to get the generated one back. This is pretty easy to do.

The second option would be that we internally generate a unique consumerTag when we call into ampqlib.consume() (if one is not provided), and then pass this back. Then we have a "static" consumerTag we can pass again when we next reconnect. I suspect consumer tags have to be globally unique, so we'd need to make sure if we go down this road that our consumerTag doesn't conflict with consumerTags created by other clients running on other instances - maybe a UUID?

Actually; a third option which might be best; we could generate a "synthetic" consumerTag locally, and then keep a map of our synthetic tags to real tags. So when someone calls consume we'd generate a tag like "consumerTag-1", and then when we actually bind the consumer we'd set this.syntheticConsumerTags["consumerTag-1"] = realConsumerTag. Then when we want to cancel we can lookup the real consumer tag in this.syntheticConsumerTags. This is really nice, because then our consume() can return a consumerTag to the client. I'm not sure how many other places consumerTag is used and we might have to translate back and forth, though...

@luddd3
Copy link
Collaborator

luddd3 commented Jan 24, 2022 via email

@jwalton
Copy link
Owner

jwalton commented Jan 24, 2022

About the consumerTag, we can also do what the OP did before. That is to
remember the consumerTag that RabbitMq generates on first connect and then
reuse it for reconnects.

That would work, although our consume() would be unable to return a consumerTag if we're disconnected when it is called. You wouldn't be able to cancel a consumer until it connects at least once.

@luddd3
Copy link
Collaborator

luddd3 commented Jan 25, 2022 via email

@luddd3
Copy link
Collaborator

luddd3 commented Jan 26, 2022

I did a commit with a proposed solution. I use 16 random bytes instead of an actual uuid (same entropy) to create the consumer tag.
luddd3/node-amqp-connection-manager@5f3b2eb

@jwalton
Copy link
Owner

jwalton commented Feb 1, 2022

@luddd3 your solution looks good to me. :shipit:

@rhsobr
Copy link
Contributor Author

rhsobr commented Feb 1, 2022

Great @luddd3. I think your commit is good.

Closed this one.

@rhsobr rhsobr closed this Feb 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants