
Commit

custom thread pool (#855)
* custom thread pool, todo: DefaultFixedThreadsExecutorGroupFactory.toString

* format

* refactor

* add ThreadGroup

* add ThreadGroup

* Utils.runInThread

* Utils.runClosureInThread(com.alipay.sofa.jraft.Closure, com.alipay.sofa.jraft.Status)

* Utils.runClosureInThread(com.alipay.sofa.jraft.Closure)

* format

* unit test

* unit test

* unit test

* unit test

* unit test

* unit test

* format

* format

* format

* use global threadpool to send msg

* use global threadpool to send msg

* format

* add ThreadGroup

* unit test

* use global threadpool to send msg

* refactor bad smell

* refactor bad smell

* fix groupId is null

* fix groupId is null

* NodeTest.assertReadIndex try-finally

* fix bad smell, imports layout

* format code and fix bad smell

Co-authored-by: tynan.liu <tynan.liu@xinjifamily.com>
shihuili1218 and tynan.liu committed Jul 14, 2022
1 parent 03b579d commit cebea9c
Showing 47 changed files with 708 additions and 269 deletions.
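The core of the change: the JVM-wide `Utils.runInThread` / `Utils.runClosureInThread` helpers are replaced by group-scoped dispatch through the new `ThreadPoolsFactory`, so closures belonging to one Raft group run on that group's own pool. Below is a minimal sketch of the resulting call pattern. The `registerThreadPool(groupId, executor)` hook and the fallback to a shared default pool are assumptions, since `ThreadPoolsFactory` itself is not among the hunks shown on this page; the group id is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;

public class GroupThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        final String groupId = "counter_group"; // hypothetical group id

        // Assumption: ThreadPoolsFactory lets each group register its own executor and
        // falls back to a shared default pool for groups that never register one.
        final ThreadPoolExecutor custom = new ThreadPoolExecutor(4, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());
        ThreadPoolsFactory.registerThreadPool(groupId, custom);

        final CountDownLatch latch = new CountDownLatch(2);

        // The pattern applied throughout this diff: every former Utils.runInThread /
        // Utils.runClosureInThread call now passes the groupId as its first argument.
        ThreadPoolsFactory.runInThread(groupId, () -> {
            System.out.println("runs on the group's pool");
            latch.countDown();
        });

        final Closure done = status -> {
            System.out.println("closure finished with: " + status);
            latch.countDown();
        };
        ThreadPoolsFactory.runClosureInThread(groupId, done, new Status(RaftError.EPERM, "Leader stepped down"));

        latch.await();
        custom.shutdown();
    }
}
```

Calls that omit a status, such as `ThreadPoolsFactory.runClosureInThread(groupId, closure)`, follow the same pattern (see the `afterShutdown()` hunk in NodeImpl.java below).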
File: (path not shown)
@@ -61,7 +61,7 @@ public static RaftTimerFactory raftTimerFactory() {
* @return true if bootstrap success
*/
public static boolean bootstrap(final BootstrapOptions opts) throws InterruptedException {
final NodeImpl node = new NodeImpl();
final NodeImpl node = new NodeImpl(opts.getGroupId(), null);
final boolean ret = node.bootstrap(opts);
node.shutdown();
node.join();
File: (path not shown)
@@ -121,7 +121,7 @@ default boolean addReplicator(final PeerId peer, ReplicatorType replicatorType)
/**
* Wait the peer catchup.
*/
boolean waitCaughtUp(final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done);
boolean waitCaughtUp(final String groupId, final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done);

/**
* Get peer's last rpc send timestamp (monotonic time in milliseconds).
File: (path not shown)
@@ -29,7 +29,7 @@
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;

/**
* Closure queue implementation.
@@ -42,6 +42,7 @@ public class ClosureQueueImpl implements ClosureQueue {

private static final Logger LOG = LoggerFactory.getLogger(ClosureQueueImpl.class);

private String groupId;
private final Lock lock;
private long firstIndex;
private LinkedList<Closure> queue;
@@ -63,6 +64,11 @@ public ClosureQueueImpl() {
this.queue = new LinkedList<>();
}

public ClosureQueueImpl(final String groupId) {
this();
this.groupId = groupId;
}

@Override
public void clear() {
List<Closure> savedQueue;
@@ -76,7 +82,7 @@ public void clear() {
}

final Status status = new Status(RaftError.EPERM, "Leader stepped down");
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : savedQueue) {
if (done != null) {
done.run(status);
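`ClosureQueueImpl` now carries the group id of its owning node, so the pending closures it rejects in `clear()` run on that group's pool instead of the old global one. A minimal sketch of that behaviour follows; `appendPendingClosure` and the `com.alipay.sofa.jraft.closure` package come from the existing codebase rather than from this diff, the group id is hypothetical, and the fallback to a default pool for an unregistered group is an assumption.

```java
import java.util.concurrent.CountDownLatch;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.closure.ClosureQueueImpl;

public class ClosureQueueGroupExample {

    public static void main(String[] args) throws InterruptedException {
        // New constructor from the hunk above: the queue remembers its group id.
        final ClosureQueueImpl queue = new ClosureQueueImpl("counter_group"); // hypothetical group id

        final CountDownLatch latch = new CountDownLatch(1);
        final Closure done = status -> {
            // Per clear() above, expect RaftError.EPERM / "Leader stepped down".
            System.out.println("rejected with: " + status);
            latch.countDown();
        };
        queue.appendPendingClosure(done);

        // clear() hands every pending closure to ThreadPoolsFactory.runInThread(groupId, ...).
        queue.clear();
        latch.await();
    }
}
```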
File: (path not shown)
@@ -53,6 +53,7 @@
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
@@ -222,7 +223,8 @@ public synchronized void shutdown() {
if (this.taskQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(() -> this.taskQueue.publishEvent((task, sequence) -> {

ThreadPoolsFactory.runInThread(getNode().getGroupId(), () -> this.taskQueue.publishEvent((task, sequence) -> {
task.reset();
task.type = TaskType.SHUTDOWN;
task.shutdownLatch = latch;
@@ -468,6 +470,7 @@ public boolean isRunningOnFSMThread() {
return Thread.currentThread() == fsmThread;
}

@Override
public void onSnapshotSaveSync(SaveSnapshotClosure done) {
ApplyTask task = new ApplyTask();
task.type = TaskType.SNAPSHOT_SAVE;
@@ -595,8 +598,8 @@ private void doSnapshotSave(final SaveSnapshotClosure done) {
final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
if (confEntry == null || confEntry.isEmpty()) {
LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s",
lastAppliedIndex));
ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EINVAL,
"Empty conf entry for lastAppliedIndex=%s", lastAppliedIndex));
return;
}
for (final PeerId peer : confEntry.getConf()) {
File: (path not shown)
@@ -20,7 +20,6 @@
import java.util.concurrent.atomic.AtomicLong;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.LogEntry;
@@ -29,6 +28,7 @@
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;

/**
@@ -134,7 +134,7 @@ protected void runTheRestClosureWithError() {
Requires.requireNonNull(this.error, "error");
Requires.requireNonNull(this.error.getStatus(), "error.status");
final Status status = this.error.getStatus();
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.fsmCaller.getNode().getGroupId(), done, status);
}
}
}
60 changes: 36 additions & 24 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -118,6 +118,7 @@
import com.alipay.sofa.jraft.util.SignalHelper;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
import com.alipay.sofa.jraft.util.timer.RaftTimerFactory;
@@ -363,13 +364,15 @@ public ConfigurationCtx(final NodeImpl node) {
void start(final Configuration oldConf, final Configuration newConf, final Closure done) {
if (isBusy()) {
if (done != null) {
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Already in busy stage."));
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), done, new Status(RaftError.EBUSY,
"Already in busy stage."));
}
throw new IllegalStateException("Busy stage");
}
if (this.done != null) {
if (done != null) {
Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Already have done closure."));
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), done, new Status(RaftError.EINVAL,
"Already have done closure."));
}
throw new IllegalArgumentException("Already have done closure");
}
@@ -403,8 +406,8 @@ private void addNewPeers(final Configuration adding) {
}
final OnCaughtUp caughtUp = new OnCaughtUp(this.node, this.node.currTerm, newPeer, this.version);
final long dueTime = Utils.nowMs() + this.node.options.getElectionTimeoutMs();
if (!this.node.replicatorGroup.waitCaughtUp(newPeer, this.node.options.getCatchupMargin(), dueTime,
caughtUp)) {
if (!this.node.replicatorGroup.waitCaughtUp(this.node.getGroupId(), newPeer,
this.node.options.getCatchupMargin(), dueTime, caughtUp)) {
LOG.error("Node {} waitCaughtUp, peer={}.", this.node.getNodeId(), newPeer);
onCaughtUp(this.version, newPeer, false);
return;
@@ -463,8 +466,8 @@ void reset(final Status st) {
this.stage = Stage.STAGE_NONE;
this.nchanges = 0;
if (this.done != null) {
Utils.runClosureInThread(this.done, st != null ? st : new Status(RaftError.EPERM,
"Leader stepped down."));
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), this.done, st != null ? st : new Status(
RaftError.EPERM, "Leader stepped down."));
this.done = null;
}
}
@@ -577,6 +580,7 @@ private boolean initLogStorage() {
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
this.logManager = new LogManagerImpl();
final LogManagerOptions opts = new LogManagerOptions();
opts.setGroupId(this.groupId);
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
opts.setLogStorage(this.logStorage);
opts.setConfigurationManager(this.configManager);
@@ -610,7 +614,7 @@ private void handleSnapshotTimeout() {
this.writeLock.unlock();
}
// do_snapshot in another thread to avoid blocking the timer thread.
Utils.runInThread(() -> doSnapshot(null, false));
ThreadPoolsFactory.runInThread(this.groupId, () -> doSnapshot(null, false));
}

private void handleElectionTimeout() {
@@ -754,7 +758,7 @@ private boolean initFSMCaller(final LogId bootstrapId) {
LOG.error("Fail to init fsm caller, null instance, bootstrapId={}.", bootstrapId);
return false;
}
this.closureQueue = new ClosureQueueImpl();
this.closureQueue = new ClosureQueueImpl(this.groupId);
final FSMCallerOptions opts = new FSMCallerOptions();
opts.setAfterShutdown(status -> afterShutdown());
opts.setLogManager(this.logManager);
@@ -906,6 +910,10 @@ public boolean init(final NodeOptions opts) {
return false;
}

if (this.options.getAppendEntriesExecutors() == null) {
this.options.setAppendEntriesExecutors(Utils.getDefaultAppendEntriesExecutor());
}

this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(),
this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");

@@ -1044,7 +1052,7 @@ protected int adjustTimeout(final int timeoutMs) {

// TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it
this.replicatorGroup = new ReplicatorGroupImpl();
this.rpcService = new DefaultRaftClientService(this.replicatorGroup);
this.rpcService = new DefaultRaftClientService(this.replicatorGroup, this.options.getAppendEntriesExecutors());
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
@@ -1365,7 +1373,7 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
final List<Closure> dones = tasks.stream().map(ele -> ele.done)
.filter(Objects::nonNull).collect(Collectors.toList());
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : dones) {
done.run(st);
}
@@ -1381,14 +1389,14 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
if (task.done != null) {
final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
Utils.runClosureInThread(task.done, st);
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, st);
task.reset();
}
continue;
}
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
task.reset();
continue;
}
@@ -1428,7 +1436,8 @@ public JRaftServiceFactory getServiceFactory() {
@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.ENODESHUTDOWN,
"Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(done, "Null closure");
@@ -1603,7 +1612,7 @@ private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.
@Override
public void apply(final Task task) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(task, "Null task");
@@ -1626,7 +1635,7 @@ public void apply(final Task task) {
default:
if (!this.applyQueue.tryPublishEvent(translator)) {
String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize();
Utils.runClosureInThread(task.getDone(),
ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(),
new Status(RaftError.EBUSY, errorMsg));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
@@ -2176,7 +2185,8 @@ private void onCaughtUp(final PeerId peer, final long term, final long version,
LOG.debug("Node {} waits peer {} to catch up.", getNodeId(), peer);
final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version);
final long dueTime = Utils.nowMs() + this.options.getElectionTimeoutMs();
if (this.replicatorGroup.waitCaughtUp(peer, this.options.getCatchupMargin(), dueTime, caughtUp)) {
if (this.replicatorGroup.waitCaughtUp(this.groupId, peer, this.options.getCatchupMargin(), dueTime,
caughtUp)) {
return;
}
LOG.warn("Node {} waitCaughtUp failed, peer={}.", getNodeId(), peer);
@@ -2344,7 +2354,8 @@ private void unsafeApplyConfiguration(final Configuration newConf, final Configu
final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart);
// Use the new_conf to deal the quorum of this very log
if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) {
Utils.runClosureInThread(configurationChangeDone, new Status(RaftError.EINTERNAL, "Fail to append task."));
ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status(
RaftError.EINTERNAL, "Fail to append task."));
return;
}
final List<LogEntry> entries = new ArrayList<>();
@@ -2369,21 +2380,22 @@ private void unsafeRegisterConfChange(final Configuration oldConf, final Configu
} else {
status.setError(RaftError.EPERM, "Not leader");
}
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.groupId, done, status);
}
return;
}
// check concurrent conf change
if (this.confCtx.isBusy()) {
LOG.warn("Node {} refused configuration concurrent changing.", getNodeId());
if (done != null) {
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Doing another configuration change."));
ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EBUSY,
"Doing another configuration change."));
}
return;
}
// Return immediately when the new peers equals to current configuration
if (this.conf.getConf().equals(newConf)) {
Utils.runClosureInThread(done, Status.OK());
ThreadPoolsFactory.runClosureInThread(this.groupId, done, Status.OK());
return;
}
this.confCtx.start(oldConf, newConf, done);
Expand All @@ -2405,7 +2417,7 @@ private void afterShutdown() {
}
if (savedDoneList != null) {
for (final Closure closure : savedDoneList) {
Utils.runClosureInThread(closure);
ThreadPoolsFactory.runClosureInThread(this.groupId, closure);
}
}
}
@@ -2787,7 +2799,7 @@ public void shutdown(Closure done) {
if (this.applyQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(
ThreadPoolsFactory.runInThread(this.groupId,
() -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch));
} else {
final int num = GLOBAL_NUM_NODES.decrementAndGet();
@@ -2814,7 +2826,7 @@ public void shutdown(Closure done) {
}
// Call join() asynchronously
final Closure shutdownHook = done;
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
try {
join();
} catch (InterruptedException e) {
@@ -3148,7 +3160,7 @@ private void doSnapshot(final Closure done, boolean sync) {
} else {
if (done != null) {
final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported");
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.groupId, done, status);
}
}
}
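The other half of the change is visible in the `NodeImpl.init()` hunk above: the RPC client service is now constructed with a dedicated executor group for AppendEntries, falling back to `Utils.getDefaultAppendEntriesExecutor()` when the option is unset. A minimal sketch of setting the option explicitly, assuming `getAppendEntriesExecutors` / `setAppendEntriesExecutors` live on `NodeOptions` as the `this.options` calls in that hunk suggest; a custom group would presumably be built via the `DefaultFixedThreadsExecutorGroupFactory` named in the commit message, whose API is not part of the hunks shown here.

```java
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.util.Utils;

public class AppendEntriesExecutorsExample {

    public static void main(String[] args) {
        final NodeOptions opts = new NodeOptions();

        // New in this commit: the executor group handed to DefaultRaftClientService for
        // sending AppendEntries requests. Leaving it null makes NodeImpl.init() fall back
        // to Utils.getDefaultAppendEntriesExecutor(), per the @@ -906,6 +910,10 @@ hunk.
        opts.setAppendEntriesExecutors(Utils.getDefaultAppendEntriesExecutor());

        System.out.println("appendEntriesExecutors configured: " + (opts.getAppendEntriesExecutors() != null));
    }
}
```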
(Diff for the remaining changed files did not load and is not shown.)
