-
Notifications
You must be signed in to change notification settings - Fork 4k
Add rabbit_queue_type behaviour #2052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Great idea 👍 |
a2376e7 to
cc8393e
Compare
And use the implementing module as the value of the amqqueue record `type` field. This will allow for easy dispatch to the queue type implementation.
First step to a greater abstraction!
cc8393e to
5277cbb
Compare
To a more generic an extensible opaque queue type specific map.
src/amqqueue.erl
Outdated
| -spec get_quorum_nodes(amqqueue()) -> [node()]. | ||
| -spec get_type_state(amqqueue()) -> map(). | ||
| get_type_state(#amqqueue{type_state = TState}) -> TState; | ||
| get_type_state(_) -> []. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type_state is documented as a map. Why default to empty list here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch - that is what it returned when it was just quorum nodes - the value is separated by so much whitespace it's quite hard to read :) - will fix
| %% the assumption here is that the queue state has been cleaned up and | ||
| %% this is a residual Ra notification | ||
| noreply_coalesce(State0) | ||
| handle_info({ra_event, From, _} = Evt, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not very generic 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no it isn't - we can consider at some point make all queues send messages with a particular format - we'd have to make a change in Ra to allow custom ra event formatting to do so though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be easier to convert the Ra-specific message to something more generic here as another clause, like:
handle_info({ra_event, From, Content}, ...) ->
handle_info({generic_event, From, Content}, ...);
handle_info({generic_event, From, _} = Evt, ...) ->
...| State = #ch{unconfirmed = UC, | ||
| queue_names = QNames}) | ||
| queue_states = QStates}) | ||
| when ?IS_CLASSIC(QPid) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These macros are not generic either. Should we have a rabbit_queue_type:needs_handle_publishing_queue_down or something similar?
The same applies to all other ocurrences of this and the ?IS_QUORUM macro. Now that we have types they should disappear
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree and to do that we'd have to move all the confirm accounting inside the queue type module. I change I found a bit scary at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So having pondered this a bit more the current scope of the queue type abstraction isn't complete, it would need to handle confirms, downs etc as well to be considered so. That said I think at this point it achieves one goal: it makes it easier for us to deliver new queue types. The secondary goal of completing the queue type abstraction and making it easier for us to write new channel processes is one we can choose to defer at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the abstraction does not need to be complete right away. What I would like us to keep in mind however, is the API we expose and the plugin ecosystem outside of our team: it would be nice that a plugin developer does not have to maintain and/or publish one version of his plugin per RabbitMQ branch.
All mirrors need to be present as queue names. This introduces context linking allowing additional queue refs to be linked to a single "master" queue ref contining the actual queue context.
Also update queue type smoke test to use a cluster by default.
dumbbell
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a first batch of comments.
I didn't finish to review the patch yet: I'm at a park and I've just been surrounded by two dozens kids, like they believe it's a good idea to sit on the same bench as me and shout and so on... Relocating to a more peaceful place!
| ((?is_amqqueue_v2(Q) andalso | ||
| ?amqqueue_v2_field_type(Q) =:= classic) orelse | ||
| ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue) orelse | ||
| ?is_amqqueue_v1(Q))). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To follow up on Diana's comments, we can probably replace those functions & associated macros by something more generic. @lukebakken did change the initial generic functions/macros to those specific more readable versions. @lukebakken: I think you linked an article about that kind of API but I don't rember the content, what was the reason behind this already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My goal was to put all "knowledge" of how to determine if an amqqueue is classic into this macro. This way, when the identification of a classic queue changes, we only have to change this macro and not all the places where it is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Tell Don't Ask"
| -spec get_leader(amqqueue_v2()) -> node(). | ||
|
|
||
| get_leader(#amqqueue{type = quorum, pid = {_, Leader}}) -> Leader. | ||
| get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the "PID" be an opaque type as well?
Again, that "leader" thing is very quorum queues/Ra specific.
| %% messages pending consumer acknowledgement | ||
| unacked_message_q, | ||
| %% a map of queue ref to queue name | ||
| queue_names, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure that the #ch{} record is never exchanged with remote nodes or used as a Mnesia table schema?
| %% the assumption here is that the queue state has been cleaned up and | ||
| %% this is a residual Ra notification | ||
| noreply_coalesce(State0) | ||
| handle_info({ra_event, From, _} = Evt, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be easier to convert the Ra-specific message to something more generic here as another clause, like:
handle_info({ra_event, From, Content}, ...) ->
handle_info({generic_event, From, Content}, ...);
handle_info({generic_event, From, _} = Evt, ...) ->
...| State = #ch{unconfirmed = UC, | ||
| queue_names = QNames}) | ||
| queue_states = QStates}) | ||
| when ?IS_CLASSIC(QPid) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the abstraction does not need to be complete right away. What I would like us to keep in mind however, is the API we expose and the plugin ecosystem outside of our team: it would be nice that a plugin developer does not have to maintain and/or publish one version of his plugin per RabbitMQ branch.
| {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> | ||
| case catch basic_consume( %% [0] | ||
| rabbit_log:info("recovering consumer ~p", [CTag]), | ||
| case catch basic_consume( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The %% [0] comment callout is removed: is the associated comment irrelevant now? In which case it should be removed as well.
| self(), Acc0) | ||
| fun({QRef, CTag}, MsgIds, Acc0) -> | ||
| rabbit_queue_type:reject(QRef, CTag, Requeue, | ||
| MsgIds, Acc0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Picking a random line for a more general comment: I find the rabbit_queue_type name a bit confusing. All functions such as reject() act on a queue, not a queue type; e.g. the line above wants to reject a message in a queue, not a queue type (which makes little sense).
Did you think about the simpler rabbit_queue name already?
dumbbell
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, second half done. I mostly focused on the API, not on the actual changes to channels and other RabbitMQ internals, as I don't really master them.
| queue_state()) -> | ||
| {ok, queue_state(), actions()} | {error, term()} | eol. | ||
|
|
||
| -callback deliver([{amqqueue:amqqueue(), queue_state()}], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a bit more thoughts, I find this callback (and all others working on a list of queues) a bit more weird in fact compared to last week:
- As I said, from an API PoV, this looks inconsistent: some callbacks take a list of queues, others a single queue, and the name doesn't reflect this change of behavior.
- Should the queues in that list share the same queue type? What if a queue type implementation finds a queue from a different queue type in the middle of that list?
It seems to me that this feature should be outside of the new behavior and it should call an actual deliver() callback which takes a queue as argument. Again, my comment applies to other callbacks such as recover().
| queue_state(). | ||
|
|
||
| -callback dequeue(NoAck :: boolean(), LimiterPid :: pid(), | ||
| rabbit_types:ctag(), queue_state()) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the ctag third here?
|
More work is needed to make this PR fit for purpose. The confirm logic needs to be moved inside the queue type abstraction and DOWN events need to be passed down to each queue type so that they can mutate their state and run logic as appropriate. #2062 has been created to ensure the breaking changes to the amqqueue record are included in 3.8 allowing us to work on this further and include it in a future release. |
First cut queue type behaviour
Uses the implementing module as the value of the amqqueue record
typefield. This will allow for easy dispatch to the queue typeimplementation.
Provides a stateful API around queue interactions. Currently the classic queue type will need some special handling when implementing processes that interact with queues but this should simplify this as well as the development of new queue types.