Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] [client] Add two compnents to support imprecise priority messages #20

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

poorbarcode
Copy link
Member

@poorbarcode poorbarcode commented May 4, 2023

Motivation

Requirement where some messages are classified in to high priority and some messages are classified as low priority. They experience very high traffic on low priority messages compare to high priority messages. But we want their consumers to process the high priority messages first than lower priority messages as high priority messages are time sensitive.


Modifications

Note:

  • Batch send is not supported
  • The mode Key_Shared is not supported.

We define multiple partitions of a topic as high-priority and low-priority partitions, which can be defined by PriorityDefinition. And Each message is marked with a priority by property MSG_PROP_PRIORITY when it is sent.

    /** Which partitions are declared in each priority. **/
    private final Map<Integer/* priority */, List<Integer>/* partitions */> mapping = new TreeMap<>();
    @Getter
    /** The priority of partition which is not defined in {@link #mapping}. **/
    private final int defaultPriority;
    /** Total partition count of the topic. **/
    private final int partitionCount;

The new component PriorityMessageRouter will send the message to the specified partition according to the rules defined in PriorityDefinition. It will calculate the priority by the property MSG_PROP_PRIORITY of each message, and if there has no property in the message, it will use the default priority.


The new component PriorityConsumerInterceptor will collect the consumers created by the client for each partition when receiving messages, calculate the priority of the message through the property MSG_PROP_PRIORITY, then marks the priority of each consumer through its first message, and pause the low priority consumer to make the high priority message to be processed faster.

Here's how it works: If the high-priority partition has many messages, it suspends receiving messages from the low-priority partition. If some messages with low priority have already been received in the memory, they will be consumed as
high-level priority.

Note: If the priority of a partition is changed, the change takes effect only client restarts after the old messages are consumed.


Sample: see PriorityMessageSample.


Documentation

Need to update docs?

  • doc-required
  • no-need-doc
  • doc

@poorbarcode poorbarcode requested a review from a team as a code owner May 4, 2023 16:07
@poorbarcode poorbarcode self-assigned this May 7, 2023
Comment on lines +109 to +126
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
triggerPauseOrResume();
}

@Override
public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
triggerPauseOrResume();
}

@Override
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) {
triggerPauseOrResume();
}

@Override
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> messageIds) {
triggerPauseOrResume();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this solution, if users won't call ack or nack for many messages. The priority delivery will not be applied, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this solution, if users won't call ack or nack for many messages. The priority delivery will not be applied, right?

Yes. We can explain this scenario in other words: the messages received in the first round will not be prioritized.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants