Skip to content

Commit

Permalink
Fix #629: Adding subscriptions is very slow (#630)
Browse files Browse the repository at this point in the history
 - Sort CNode.children by token to change all searches to use Collections.binarySearch(). In this context Token is made Comparable.
- Changed inserts to respect sorting order
- Optimised away all instances that loop over all children: when deep diving into a tree to check if matches, instead of checking if any children matches, cjheck only in subtrees that matches `#` `+` or the exact token.
- Optimised away double lookup caused by "contains then get": all matchAnyChilder + childOf generated a double scan, one to check the presence the other to grab the instance. Moved to jus grab.
  • Loading branch information
hylkevds committed Jun 20, 2023
1 parent e33b7c9 commit 62cb2b3
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 98 deletions.
55 changes: 28 additions & 27 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

import java.util.*;

class CNode {
class CNode implements Comparable<CNode> {

private Token token;
private List<INode> children;
private final Token token;
private final List<INode> children;
Set<Subscription> subscriptions;

CNode() {
CNode(Token token) {
this.children = new ArrayList<>();
this.subscriptions = new HashSet<>();
this.token = token;
}

//Copy constructor
Expand All @@ -39,32 +40,21 @@ public Token getToken() {
return token;
}

public void setToken(Token token) {
this.token = token;
List<INode> allChildren() {
return new ArrayList<>(this.children);
}

boolean anyChildrenMatch(Token token) {
for (INode iNode : children) {
final CNode child = iNode.mainNode();
if (child.equalsToken(token)) {
return true;
}
Optional<INode> childOf(Token token) {
int idx = findIndexForToken(token);
if (idx < 0) {
return Optional.empty();
}
return false;
}

List<INode> allChildren() {
return this.children;
return Optional.of(children.get(idx));
}

INode childOf(Token token) {
for (INode iNode : children) {
final CNode child = iNode.mainNode();
if (child.equalsToken(token)) {
return iNode;
}
}
throw new IllegalArgumentException("Asked for a token that doesn't exists in any child [" + token + "]");
private int findIndexForToken(Token token) {
final INode tempTokenNode = new INode(new CNode(token));
return Collections.binarySearch(children, tempTokenNode, (INode node, INode tokenHolder) -> node.mainNode().token.compareTo(tokenHolder.mainNode().token));
}

private boolean equalsToken(Token token) {
Expand All @@ -81,11 +71,17 @@ CNode copy() {
}

public void add(INode newINode) {
this.children.add(newINode);
int idx = findIndexForToken(newINode.mainNode().token);
if (idx < 0) {
children.add(-1 - idx, newINode);
} else {
children.add(idx, newINode);
}
}

public void remove(INode node) {
this.children.remove(node);
int idx = findIndexForToken(node.mainNode().token);
this.children.remove(idx);
}

CNode addSubscription(Subscription newSubscription) {
Expand Down Expand Up @@ -136,4 +132,9 @@ void removeSubscriptionsFor(String clientId) {
}
this.subscriptions.removeAll(toRemove);
}

@Override
public int compareTo(CNode o) {
return token.compareTo(o.token);
}
}
134 changes: 78 additions & 56 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ private enum Action {
INode root;

CTrie() {
final CNode mainNode = new CNode();
mainNode.setToken(ROOT);
final CNode mainNode = new CNode(ROOT);
this.root = new INode(mainNode);
}

Optional<CNode> lookup(Topic topic) {
INode inode = this.root;
Token token = topic.headToken();
while (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
while (!topic.isEmpty()) {
Optional<INode> child = inode.mainNode().childOf(token);
if (child.isEmpty()) {
break;
}
topic = topic.exceptHeadToken();
inode = inode.mainNode().childOf(token);
inode = child.get();
token = topic.headToken();
}
if (inode == null || !topic.isEmpty()) {
Expand Down Expand Up @@ -79,11 +82,24 @@ private Set<Subscription> recursiveMatch(Topic topic, INode inode) {
}
Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topic : topic.exceptHeadToken();
Set<Subscription> subscriptions = new HashSet<>();

// We should only consider the maximum three children children of
// type #, + or exact match
Optional<INode> subInode = cnode.childOf(Token.MULTI);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get()));
}
subInode = cnode.childOf(Token.SINGLE);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get()));
}
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
}
for (INode subInode : cnode.allChildren()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
} else {
subInode = cnode.childOf(remainingTopic.headToken());
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get()));
}
}
return subscriptions;
}
Expand All @@ -96,34 +112,41 @@ public void addToTree(Subscription newSubscription) {
}

private Action insert(Topic topic, final INode inode, Subscription newSubscription) {
Token token = topic.headToken();
if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return insert(remainingTopic, nextInode, newSubscription);
} else {
if (topic.isEmpty()) {
return insertSubscription(inode, newSubscription);
} else {
return createNodeAndInsertSubscription(topic, inode, newSubscription);
final Token token = topic.headToken();
final CNode cnode = inode.mainNode();
if (!topic.isEmpty()) {
Optional<INode> nextInode = cnode.childOf(token);
if (nextInode.isPresent()) {
Topic remainingTopic = topic.exceptHeadToken();
return insert(remainingTopic, nextInode.get(), newSubscription);
}
}
if (topic.isEmpty()) {
return insertSubscription(inode, cnode, newSubscription);
} else {
return createNodeAndInsertSubscription(topic, inode, cnode, newSubscription);
}
}

private Action insertSubscription(INode inode, Subscription newSubscription) {
CNode cnode = inode.mainNode();
CNode updatedCnode = cnode.copy().addSubscription(newSubscription);
if (inode.compareAndSet(cnode, updatedCnode)) {
return Action.OK;
private Action insertSubscription(INode inode, CNode cnode, Subscription newSubscription) {
final CNode updatedCnode;
if (cnode instanceof TNode) {
updatedCnode = new CNode(cnode.getToken());
} else {
return Action.REPEAT;
updatedCnode = cnode.copy();
}
updatedCnode.addSubscription(newSubscription);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
}

private Action createNodeAndInsertSubscription(Topic topic, INode inode, Subscription newSubscription) {
INode newInode = createPathRec(topic, newSubscription);
CNode cnode = inode.mainNode();
CNode updatedCnode = cnode.copy();
private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode cnode, Subscription newSubscription) {
final INode newInode = createPathRec(topic, newSubscription);
final CNode updatedCnode;
if (cnode instanceof TNode) {
updatedCnode = new CNode(cnode.getToken());
} else {
updatedCnode = cnode.copy();
}
updatedCnode.add(newInode);

return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
Expand All @@ -133,8 +156,7 @@ private INode createPathRec(Topic topic, Subscription newSubscription) {
Topic remainingTopic = topic.exceptHeadToken();
if (!remainingTopic.isEmpty()) {
INode inode = createPathRec(remainingTopic, newSubscription);
CNode cnode = new CNode();
cnode.setToken(topic.headToken());
CNode cnode = new CNode(topic.headToken());
cnode.add(inode);
return new INode(cnode);
} else {
Expand All @@ -143,8 +165,7 @@ private INode createPathRec(Topic topic, Subscription newSubscription) {
}

private INode createLeafNodes(Token token, Subscription newSubscription) {
CNode newLeafCnode = new CNode();
newLeafCnode.setToken(token);
CNode newLeafCnode = new CNode(token);
newLeafCnode.addSubscription(newSubscription);

return new INode(newLeafCnode);
Expand All @@ -159,33 +180,34 @@ public void removeFromTree(Topic topic, String clientID) {

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
Token token = topic.headToken();
if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return remove(clientId, remainingTopic, nextInode, inode);
} else {
final CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
final CNode cnode = inode.mainNode();
if (!topic.isEmpty()) {
Optional<INode> nextInode = cnode.childOf(token);
if (nextInode.isPresent()) {
Topic remainingTopic = topic.exceptHeadToken();
return remove(clientId, remainingTopic, nextInode.get(), inode);
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
if (inode == this.root) {
return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
}
TNode tnode = new TNode();
return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
} else if (cnode.contains(clientId) && topic.isEmpty()) {
CNode updatedCnode = cnode.copy();
updatedCnode.removeSubscriptionsFor(clientId);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
//someone else already removed
return Action.OK;
}
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
if (inode == this.root) {
return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
}
TNode tnode = new TNode(cnode.getToken());
return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
} else if (cnode.contains(clientId) && topic.isEmpty()) {
CNode updatedCnode = cnode.copy();
updatedCnode.removeSubscriptionsFor(clientId);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
//someone else already removed
return Action.OK;
}
}

Expand Down
18 changes: 5 additions & 13 deletions broker/src/main/java/io/moquette/broker/subscriptions/TNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
*/
package io.moquette.broker.subscriptions;

import java.util.Optional;

class TNode extends CNode {

@Override
public Token getToken() {
throw new IllegalStateException("Can't be invoked on TNode");
public TNode(Token token) {
super(token);
}

@Override
public void setToken(Token token) {
throw new IllegalStateException("Can't be invoked on TNode");
}

@Override
INode childOf(Token token) {
Optional<INode> childOf(Token token) {
throw new IllegalStateException("Can't be invoked on TNode");
}

Expand Down Expand Up @@ -62,8 +58,4 @@ void removeSubscriptionsFor(String clientId) {
throw new IllegalStateException("Can't be invoked on TNode");
}

@Override
boolean anyChildrenMatch(Token token) {
return false;
}
}
16 changes: 15 additions & 1 deletion broker/src/main/java/io/moquette/broker/subscriptions/Token.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* Internal use only class.
*/
public class Token {
public class Token implements Comparable<Token> {

static final Token EMPTY = new Token("");
static final Token MULTI = new Token("#");
Expand Down Expand Up @@ -72,4 +72,18 @@ public boolean equals(Object obj) {
public String toString() {
return name;
}

@Override
public int compareTo(Token other) {
if (name == null) {
if (other.name == null) {
return 0;
}
return 1;
}
if (other.name == null) {
return -1;
}
return name.compareTo(other.name);
}
}
Loading

0 comments on commit 62cb2b3

Please sign in to comment.