Fix #629: Adding subscriptions is very slow #630
hylkevds
commented
Sep 15, 2021
- Sort CNode.children by token
- made Token comparable
- made CNode.token final
- Changed all searches to use Collections.binarySearch()
- Changed inserts to respect sorting order
- Optimised away all instances that loop over all children
- Optimised away double lookup caused by "contains then get"
2f77a27
to
a816d01
Compare
This not only massively speeds up adding subscriptions; matching publishes to subscriptions is also much more efficient when there are many topics with subscriptions.
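As a rough illustration of the sorted-children idea from the commit list, here is a minimal sketch. The class `SortedChildren` and its use of plain `String` tokens are assumptions made for this example; it is not Moquette's actual `CNode`. It keeps the list ordered on insert so that every lookup can use `Collections.binarySearch()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a node's child list; not Moquette's CNode.
class SortedChildren {
    // Invariant: always sorted by token, so binary search stays valid.
    private final List<String> tokens = new ArrayList<>();

    /** Inserts the token at its sorted position; returns false if it is already present. */
    boolean add(String token) {
        int idx = Collections.binarySearch(tokens, token);
        if (idx >= 0) {
            return false; // already a child, nothing to do
        }
        // For a missing key binarySearch returns -(insertionPoint) - 1.
        tokens.add(-idx - 1, token);
        return true;
    }

    /** O(log n) membership test instead of a linear scan over all children. */
    boolean contains(String token) {
        return Collections.binarySearch(tokens, token) >= 0;
    }

    /** Copy of the current children, in sorted order. */
    List<String> snapshot() {
        return new ArrayList<>(tokens);
    }

    public static void main(String[] args) {
        SortedChildren children = new SortedChildren();
        children.add("mainTopic-2");
        children.add("mainTopic-1");
        System.out.println(children.snapshot());
    }
}
```

Inserting into an `ArrayList` still shifts elements, so an insert is not free, but the search that precedes it no longer visits every sibling.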
I think the idea of switching to binarySearch is good. However, because some of the optimizations are not easy to grasp, maybe each one could be separated into its own PR that describes the optimization and why it is needed.
I would appreciate it if the code were accompanied by a test that shows the slowness with a huge set of sibling subscriptions. For example, an insert or a search that fails after 5 minutes.
That way, just switching from the linear scan to binary search shows that the time is reduced a lot.
@@ -162,7 +162,7 @@ void onePublishTriggerManySubscriptionsNotifications() throws MqttException, Int
    }

    private void segmentedParallelSubscriptions(BiConsumer<IMqttAsyncClient, IMqttActionListener> biConsumer) throws InterruptedException {
        int openSlotCount = COMMAND_QUEUE_SIZE;
I don't understand the reason for this change.
Timing...
Because the code is now faster, it can flood the queue since each action generates more than one command on the Queue.
It used to work because the slowness of the insert delayed the creation of commands enough for the worker threads to catch up.
It surprised me too, but you should try it out for yourself :)
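One common way to keep a fast producer from flooding a bounded queue is to cap the number of outstanding operations. The sketch below is only an illustration of that idea and not the code from this PR: `BoundedInFlight`, `runThrottled` and the pool size are all made-up names and values, and a `Semaphore` stands in for whatever slot counting the test actually uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps the number of operations that are in flight at once.
class BoundedInFlight {

    /** Submits `total` tasks with at most `maxInFlight` outstanding; returns the observed peak. */
    static int runThrottled(int total, int maxInFlight) {
        Semaphore slots = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < total; i++) {
                slots.acquire(); // producer blocks here until a slot is free
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                workers.execute(() -> {
                    try {
                        // Placeholder for the real work, e.g. one subscribe call.
                    } finally {
                        inFlight.decrementAndGet();
                        slots.release(); // hand the slot back to the producer
                    }
                });
            }
            workers.shutdown();
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while throttling", e);
        } finally {
            workers.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + runThrottled(1000, 8));
    }
}
```

Because each action can generate more than one command, the cap would in practice have to be somewhat lower than the queue size.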
broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
if (idx < 0)
    return null;
In the previous code, when childOf wasn't able to find an INode associated with a Token, it raised an IllegalArgumentException. I don't understand the reason for moving to a null return.
That's part of the "optimised away double lookup" commit. Often the code did an anyChildrenMatch() followed by a childOf(), meaning it did the expensive lookup twice. That's really inefficient, of course. So to remove this superfluous anyChildrenMatch() call, we make childOf() return null if there is no such child. We could return an Optional, but since it's not a public interface that seemed redundant.
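To make the cost of "contains then get" concrete, here is a small sketch that counts searches. The class `SingleLookup` and its `searches` counter are assumptions for illustration only; the method names mirror the ones discussed above, but the bodies are not Moquette's.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; only the method names come from the discussion.
class SingleLookup {
    private final List<String> sortedTokens = new ArrayList<>();
    int searches = 0; // number of binary searches performed, for illustration

    SingleLookup(List<String> tokens) {
        sortedTokens.addAll(tokens);
        Collections.sort(sortedTokens);
    }

    private int find(String token) {
        searches++;
        return Collections.binarySearch(sortedTokens, token);
    }

    /** Existence check only; a following childOf() repeats the same search. */
    boolean anyChildrenMatch(String token) {
        return find(token) >= 0;
    }

    /** Returns the matching child, or null when there is none. */
    String childOf(String token) {
        int idx = find(token);
        return idx < 0 ? null : sortedTokens.get(idx);
    }

    public static void main(String[] args) {
        SingleLookup node = new SingleLookup(List.of("b", "a", "c"));
        // Old pattern: two searches for one answer.
        if (node.anyChildrenMatch("b")) {
            node.childOf("b");
        }
        System.out.println("searches with contains-then-get: " + node.searches);
    }
}
```

Calling `childOf()` alone and checking the result for null gives the same answer with a single search.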
Going step by step is much easier, and now I can see the motivation for the changes. Maybe add a more descriptive comment for the "Optimised away all instances that loop over all children" case. Something like:
Not all next-level children have to be verified recursively, only the multi-level wildcard (#), the single-level wildcard (+), and the single node that matches the next head token.
Sure, I can update the commit message with a longer explanation.
broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
That's what the separate commits are for. Separating those into their own PRs makes no sense, since they don't work separately. Just go through the separate commits. You can even check them out individually with Git to try them out locally.
That should be possible :)
511b07e
to
4059f20
Compare
I've added two tests that together run in ~4 seconds with the new code on my machine, but time out after 5 minutes with the old code.
4570ad8
to
49da768
Compare
Thanks @hylkevds for your patience. The PR is pretty good, just a couple of final touches.
}
int idx = findIndexForToken(token);
if (idx < 0) {
    return null;
I think it would be better to be more explicit. null is speedy but could lead us to an NPE. I think it would be better to explicitly return an Optional; it's designed for cases such as this.
No Problem, updated!
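For readers following along, an Optional-returning lookup could look roughly like this. It is a sketch under assumptions: `OptionalChild` is a made-up class, children are plain strings, and the real `childOf()` signature in the PR may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of an Optional-returning lookup; not the PR's actual code.
class OptionalChild {

    /** Looks up `token` in a list that must already be sorted. */
    static Optional<String> childOf(List<String> sortedTokens, String token) {
        int idx = Collections.binarySearch(sortedTokens, token);
        return idx < 0 ? Optional.empty() : Optional.of(sortedTokens.get(idx));
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("a", "b", "c");
        // The caller has to handle the empty case explicitly, so there is no NPE to trip over.
        System.out.println(childOf(children, "b").orElse("<none>"));
        System.out.println(childOf(children, "x").orElse("<none>"));
    }
}
```

It is still one search per lookup; only the way absence is signalled changes.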
@@ -0,0 +1,118 @@
/*
 * Copyright (c) 2012-2018 The original author or authors
Suggested change:
-  * Copyright (c) 2012-2018 The original author or authors
+  * Copyright (c) 2012-2023 The original author or authors
Good point, I've updated it.
In FROST-Server we're now using spotless-maven-plugin to automatically check the licence header and code formatting. When something isn't how it should be it can also automatically apply the corrections.
for (int b = 0; b < TOTAL_SUBSCRIPTIONS / 10; b++) {
    for (int a = 0; a < 10; a++) {
        count++;
        results.add(clientSubOnTopic("Client-" + a, "mainTopic-" + b));
    }
}
Suggested change:
- for (int b = 0; b < TOTAL_SUBSCRIPTIONS / 10; b++) {
-     for (int a = 0; a < 10; a++) {
-         count++;
-         results.add(clientSubOnTopic("Client-" + a, "mainTopic-" + b));
-     }
- }
+ for (int topicIdx = 0; topicIdx < TOTAL_SUBSCRIPTIONS / 10; topicIdx++) {
+     for (int clientIdx = 0; clientIdx < 10; clientIdx++) {
+         count++;
+         results.add(clientSubOnTopic("Client-" + clientIdx, "mainTopic-" + topicIdx));
+     }
+ }
Instead of a and b, using something more expressive would help readability.
Sure, no problem. Also for the other method.
for (int a = 0; a < countPerLevel; a++) {
    for (int b = 0; b < countPerLevel; b++) {
        for (int c = 0; c < countPerLevel; c++) {
            for (int d = 0; d < countPerLevel; d++) {
                count++;
                results.add(clientSubOnTopic("Client-" + a, "mainTopic-" + b + "/subTopic-" + c + "/subSubTopic" + d));
                if (count >= TOTAL_SUBSCRIPTIONS) {
                    break outerloop;
                }
            }
        }
    }
}
Suggested change:
- for (int a = 0; a < countPerLevel; a++) {
-     for (int b = 0; b < countPerLevel; b++) {
-         for (int c = 0; c < countPerLevel; c++) {
-             for (int d = 0; d < countPerLevel; d++) {
-                 count++;
-                 results.add(clientSubOnTopic("Client-" + a, "mainTopic-" + b + "/subTopic-" + c + "/subSubTopic" + d));
-                 if (count >= TOTAL_SUBSCRIPTIONS) {
-                     break outerloop;
-                 }
-             }
-         }
-     }
- }
+ for (int clientIdx = 0; clientIdx < countPerLevel; clientIdx++) {
+     for (int rootLayerIdx = 0; rootLayerIdx < countPerLevel; rootLayerIdx++) {
+         for (int firstLayerIdx = 0; firstLayerIdx < countPerLevel; firstLayerIdx++) {
+             for (int secondLayerIdx = 0; secondLayerIdx < countPerLevel; secondLayerIdx++) {
+                 count++;
+                 results.add(clientSubOnTopic("Client-" + clientIdx, "mainTopic-" + rootLayerIdx + "/subTopic-" + firstLayerIdx + "/subSubTopic" + secondLayerIdx));
+                 if (count >= TOTAL_SUBSCRIPTIONS) {
+                     break outerloop;
+                 }
+             }
+         }
+     }
+ }
The same spirit could be applied here; better naming could increase readability.
49da768
to
e46fb2d
Compare
Only 3 children can possibly match: +, # and the exact match. Instead of looping over all children trying to match each, we specifically fetch these three and only handle them.
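The "only three candidates" observation can be sketched as follows. This is an assumption-laden illustration: `CandidateChildren` is a made-up class, a `TreeMap` stands in for the sorted child list, and the values are plain strings rather than real nodes. It also assumes the published topic token is never itself a wildcard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; a TreeMap replaces the sorted list used in the PR.
class CandidateChildren {

    /** Returns the children that can match `token`: the exact one, then "+", then "#". */
    static List<String> candidates(Map<String, String> children, String token) {
        List<String> result = new ArrayList<>(3);
        for (String key : new String[] {token, "+", "#"}) {
            String child = children.get(key); // one keyed lookup per candidate
            if (child != null) {
                result.add(child);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> children = new TreeMap<>();
        children.put("mainTopic-1", "node(mainTopic-1)");
        children.put("mainTopic-2", "node(mainTopic-2)");
        children.put("+", "node(+)");
        children.put("#", "node(#)");
        System.out.println(candidates(children, "mainTopic-2"));
    }
}
```

However many siblings exist, matching one topic level costs at most three lookups instead of a pass over all of them.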
The calls to anyChildrenMatch are always followed by a call to childOf.
When updating we must first get the cnode, then analyse how to do the update and then compareAndSet the new cnode. If we first analyse and then get the cnode, the cnode may have changed in the meantime, and the analysis result may no longer be valid.
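The get, analyse, compareAndSet order described here is the usual retry loop around an `AtomicReference`. The sketch below shows that general pattern only; `CasUpdate` and its immutable list of string tokens are assumptions for the example, not the structure Moquette uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of get -> analyse -> compareAndSet with retry.
class CasUpdate {
    // The "cnode" here is just an immutable, sorted list of child tokens.
    private final AtomicReference<List<String>> cnode =
        new AtomicReference<>(Collections.emptyList());

    void addChild(String token) {
        while (true) {
            List<String> current = cnode.get();          // 1. get the cnode first
            if (current.contains(token)) {               // 2. analyse that exact snapshot
                return;
            }
            List<String> updated = new ArrayList<>(current);
            updated.add(token);
            Collections.sort(updated);
            // 3. publish only if nobody replaced the snapshot we analysed
            if (cnode.compareAndSet(current, Collections.unmodifiableList(updated))) {
                return;
            }
            // Another thread won the race: loop and redo the analysis on the new cnode.
        }
    }

    List<String> children() {
        return cnode.get();
    }

    public static void main(String[] args) {
        CasUpdate node = new CasUpdate();
        node.addChild("b");
        node.addChild("a");
        System.out.println(node.children());
    }
}
```

If the analysis ran before the `get()`, a concurrent update could slip in between the two, and the `compareAndSet` would then succeed against a cnode the analysis never saw.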
Moquette also generates commands on the queue internally, so pushing too many concurrent subscriptions can overflow the queue.
e46fb2d
to
c24d92b
Compare
Great work LGTM!