Skip to content

Commit

Permalink
feat: 添加节点参与投票异常监听器
Browse files Browse the repository at this point in the history
添加节点异常监听器,以便为用户提供自动加入集群的时机;
  • Loading branch information
zxuanhong committed Apr 10, 2024
1 parent 890033a commit 285715a
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 5 deletions.
57 changes: 57 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,32 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
List<Replicator.ReplicatorStateListener> getReplicatorStatueListeners();

/**
* SOFAJRaft users can implement the NodeStateListener interface by themselves.
* So users can do their own logical operator in this listener when node error, destroyed or had some errors.
*
* @param nodeStateListener added NodeStateListener which is implemented by users.
*/
void addNodeStateListener(final Node.NodeStateListener nodeStateListener);

/**
* End User can remove their implement the NodeStateListener interface by themselves.
*
* @param nodeStateListener need to remove the NodeStateListener which has been added by users.
*/
void removeNodeStateListener(final Node.NodeStateListener nodeStateListener);

/**
* Remove all the NodeStateListener which have been added by users.
*/
void clearNodeStateListener();

/**
* Get the NodeStateListener which have been added by users.
* @return node's nodeStateListener which have been added by users.
*/
List<Node.NodeStateListener> getNodeStateListeners();

/**
* Get the node's target election priority value.
*
Expand Down Expand Up @@ -380,4 +406,35 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* @since 1.3.12
*/
long getLastAppliedLogIndex();

/**
* User can implement the NodeStateListener interface by themselves.
* So they can do some their own logic codes when node error, destroyed or had some errors.
*
* @author zxuanhong
*
* 2024-04-10 09:23
*/
interface NodeStateListener {

/**
* Called when this can't do preVote as it is not in conf
*
* @param nodeId current node id
* @param options current node options
*/
default void peerNotInConf(NodeId nodeId, NodeOptions options) {

}

/**
* Called when this PreVoteResponse no voting granted.
*
* @param nodeId current node id
* @param options current node options
*/
default void noVotingGranted(NodeId nodeId, NodeOptions options) {

}
}
}
39 changes: 34 additions & 5 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.stream.Collectors;

import com.alipay.sofa.jraft.rpc.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,9 +82,6 @@
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import com.alipay.sofa.jraft.option.SnapshotExecutorOptions;
import com.alipay.sofa.jraft.rpc.RaftClientService;
import com.alipay.sofa.jraft.rpc.RaftServerService;
import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesResponse;
import com.alipay.sofa.jraft.rpc.RpcRequests.InstallSnapshotRequest;
Expand All @@ -94,8 +92,6 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.RequestVoteResponse;
import com.alipay.sofa.jraft.rpc.RpcRequests.TimeoutNowRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.TimeoutNowResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.LogStorage;
Expand Down Expand Up @@ -220,6 +216,10 @@ public class NodeImpl implements Node, RaftServerService {

/** ReplicatorStateListeners */
private final CopyOnWriteArrayList<Replicator.ReplicatorStateListener> replicatorStateListeners = new CopyOnWriteArrayList<>();

/** nodeStateListeners */
private final CopyOnWriteArrayList<Node.NodeStateListener> nodeStateListeners = new CopyOnWriteArrayList<>();

/** Node's target leader election priority value */
private volatile int targetPriority;
/** The number of elections time out for current node */
Expand Down Expand Up @@ -2628,6 +2628,10 @@ public void handlePreVoteResponse(final PeerId peerId, final long term, final Re
doUnlock = false;
electSelf();
}
}else{
for(Node.NodeStateListener listener : this.nodeStateListeners) {
RpcUtils.runInThread(() -> listener.noVotingGranted(getNodeId(), options));
}
}
} finally {
if (doUnlock) {
Expand Down Expand Up @@ -2674,6 +2678,9 @@ private void preVote() {
}
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
for(Node.NodeStateListener listener : this.nodeStateListeners) {
RpcUtils.runInThread(() -> listener.peerNotInConf(getNodeId(), options));
}
return;
}
oldTerm = this.currTerm;
Expand Down Expand Up @@ -3448,6 +3455,28 @@ public List<Replicator.ReplicatorStateListener> getReplicatorStatueListeners() {
return this.replicatorStateListeners;
}

@Override
public void addNodeStateListener(NodeStateListener nodeStateListener) {
Requires.requireNonNull(nodeStateListener, "nodeStateListener");
this.nodeStateListeners.add(nodeStateListener);
}

@Override
public void removeNodeStateListener(NodeStateListener nodeStateListener) {
Requires.requireNonNull(nodeStateListener, "nodeStateListener");
this.nodeStateListeners.remove(nodeStateListener);
}

@Override
public void clearNodeStateListener() {
this.nodeStateListeners.clear();
}

@Override
public List<NodeStateListener> getNodeStateListeners() {
return this.nodeStateListeners;
}

@Override
public int getNodeTargetPriority() {
return this.targetPriority;
Expand Down

0 comments on commit 285715a

Please sign in to comment.