Skip to content

Commit

Permalink
Fixed concurrency issue due to get-after-analyse
Browse files Browse the repository at this point in the history
When updating we must first get the cnode, then analyse how to do the
update and then compareAndSet the new cnode. If we first analyse and
then get the cnode, the cnode may have changed in the meantime, and the
analysis result may no longer be valid.
  • Loading branch information
hylkevds committed May 28, 2023
1 parent e24c808 commit a816d01
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,35 +112,30 @@ public void addToTree(Subscription newSubscription) {
}

private Action insert(Topic topic, final INode inode, Subscription newSubscription) {
Token token = topic.headToken();
final Token token = topic.headToken();
final CNode cnode = inode.mainNode();
if (!topic.isEmpty()) {
INode nextInode = inode.mainNode().childOf(token);
INode nextInode = cnode.childOf(token);
if (nextInode != null) {
Topic remainingTopic = topic.exceptHeadToken();
return insert(remainingTopic, nextInode, newSubscription);
}
}
if (topic.isEmpty()) {
return insertSubscription(inode, newSubscription);
return insertSubscription(inode, cnode, newSubscription);
} else {
return createNodeAndInsertSubscription(topic, inode, newSubscription);
return createNodeAndInsertSubscription(topic, inode, cnode, newSubscription);
}
}

private Action insertSubscription(INode inode, Subscription newSubscription) {
CNode cnode = inode.mainNode();
CNode updatedCnode = cnode.copy().addSubscription(newSubscription);
if (inode.compareAndSet(cnode, updatedCnode)) {
return Action.OK;
} else {
return Action.REPEAT;
}
private Action insertSubscription(INode inode, CNode cnode, Subscription newSubscription) {
final CNode updatedCnode = cnode.copy().addSubscription(newSubscription);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
}

private Action createNodeAndInsertSubscription(Topic topic, INode inode, Subscription newSubscription) {
INode newInode = createPathRec(topic, newSubscription);
CNode cnode = inode.mainNode();
CNode updatedCnode = cnode.copy();
private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode cnode, Subscription newSubscription) {
final INode newInode = createPathRec(topic, newSubscription);
final CNode updatedCnode = cnode.copy();
updatedCnode.add(newInode);

return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
Expand Down Expand Up @@ -174,14 +169,14 @@ public void removeFromTree(Topic topic, String clientID) {

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
Token token = topic.headToken();
final CNode cnode = inode.mainNode();
if (!topic.isEmpty()) {
INode nextInode = inode.mainNode().childOf(token);
INode nextInode = cnode.childOf(token);
if (nextInode != null) {
Topic remainingTopic = topic.exceptHeadToken();
return remove(clientId, remainingTopic, nextInode, inode);
}
}
final CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
Expand Down

0 comments on commit a816d01

Please sign in to comment.