(feat) Refuse requests when the node is overloaded, #138 #144

Merged 7 commits on May 7, 2019
4 changes: 4 additions & 0 deletions jraft-core/pom.xml
@@ -23,6 +23,10 @@
<artifactId>junit-dep</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java
@@ -47,16 +47,19 @@
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
* The finite state machine caller implementation.
@@ -153,7 +156,7 @@ public void onEvent(final ApplyTask event, final long sequence, final boolean en
private NodeImpl node;
private volatile TaskType currTask;
private final AtomicLong applyingIndex;
private RaftException error;
private volatile RaftException error;
private Disruptor<ApplyTask> disruptor;
private RingBuffer<ApplyTask> taskQueue;
private volatile CountDownLatch shutdownLatch;
@@ -178,8 +181,13 @@ public boolean init(final FSMCallerOptions opts) {
this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
this.lastAppliedTerm = opts.getBootstrapId().getTerm();
this.disruptor = new Disruptor<>(new ApplyTaskFactory(), opts.getDisruptorBufferSize(), new NamedThreadFactory(
"JRaft-FSMCaller-disruptor-", true));
this.disruptor = DisruptorBuilder.<ApplyTask> newInstance() //
.setEventFactory(new ApplyTaskFactory()) //
.setRingBufferSize(opts.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
this.disruptor.handleEventsWith(new ApplyTaskHandler());
this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.disruptor.start();
@@ -321,6 +329,10 @@ public void run(final Status st) {

@Override
public boolean onError(final RaftException error) {
if (!this.error.getStatus().isOk()) {
LOG.warn("FSMCaller already in error status, ignore new error: {}", error);
return false;
}
final OnErrorClosure c = new OnErrorClosure(error);
return enqueueTask((task, sequence) -> {
task.type = TaskType.ERROR;
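Note on the FSMCallerImpl change above: the PR swaps the two-argument Disruptor constructor for the new DisruptorBuilder util so that the producer type and wait strategy are stated explicitly. The following is a minimal sketch of an equivalent construction against the raw LMAX 3.x API, assuming DisruptorBuilder simply forwards these settings to the matching constructor; the ApplyTask stub and the class/method names are illustrative, not part of this PR.

import java.util.concurrent.ThreadFactory;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public final class DisruptorConstructionSketch {

    // Stand-in event type, only for this sketch.
    static final class ApplyTask {
    }

    static Disruptor<ApplyTask> build(final int bufferSize, final ThreadFactory threadFactory) {
        final EventFactory<ApplyTask> factory = ApplyTask::new;
        // Raw LMAX 3.x constructor: multi-producer ring buffer (published from
        // several threads) with a blocking wait strategy for the consumer side.
        return new Disruptor<>(factory, bufferSize /* must be a power of two */, threadFactory,
            ProducerType.MULTI, new BlockingWaitStrategy());
    }
}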
45 changes: 38 additions & 7 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -96,18 +96,23 @@
import com.alipay.sofa.jraft.storage.SnapshotExecutor;
import com.alipay.sofa.jraft.storage.impl.LogManagerImpl;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.RepeatedTimer;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.Message;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
* The raft replica node implementation.
@@ -118,6 +123,9 @@
*/
public class NodeImpl implements Node, RaftServerService {

// Max retry times when applying tasks.
private static final int MAX_APPLY_RETRY_TIMES = 3;

private static final Logger LOG = LoggerFactory.getLogger(NodeImpl.class);

public static final AtomicInteger GLOBAL_NUM_NODES = new AtomicInteger(0);
@@ -710,8 +718,13 @@ protected void onTrigger() {

this.configManager = new ConfigurationManager();

this.applyDisruptor = new Disruptor<>(new LogEntryAndClosureFactory(),
this.raftOptions.getDisruptorBufferSize(), new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true));
this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure> newInstance() //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setEventFactory(new LogEntryAndClosureFactory()) //
.setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.applyDisruptor.start();
@@ -1292,14 +1305,30 @@ public void apply(final Task task) {

final LogEntry entry = new LogEntry();
entry.setData(task.getData());

int retryTimes = 0;
try {
this.applyQueue.publishEvent((event, sequence) -> {
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
});
};
while (true) {
if (this.applyQueue.tryPublishEvent(translator)) {
break;
} else {
retryTimes++;
if (retryTimes > MAX_APPLY_RETRY_TIMES) {
Utils.runClosureInThread(task.getDone(),
new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
return;
}
ThreadHelper.onSpinWait();
}
}

} catch (final Exception e) {
Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
}
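The hunk above is the heart of the change: NodeImpl#apply now uses tryPublishEvent with a bounded spin-retry (MAX_APPLY_RETRY_TIMES) and fails the task with RaftError.EBUSY instead of blocking forever on a full ring buffer. Below is a hedged caller-side sketch of how application code can observe that rejection, assuming the usual jraft Task/Closure/Status API; the class and method names are illustrative.

import java.nio.ByteBuffer;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;

public final class ApplyBackpressureSketch {

    // Submits data to the node and distinguishes the new overload rejection.
    static void submit(final Node node, final byte[] data) {
        final Closure done = status -> {
            if (status.isOk()) {
                return; // task applied to the state machine
            }
            if (status.getRaftError() == RaftError.EBUSY) {
                // The apply queue was full and the node refused the task:
                // shed load or retry with backoff instead of spinning.
            } else {
                // Other failures, e.g. EPERM when the node is shutting down.
            }
        };
        node.apply(new Task(ByteBuffer.wrap(data), done));
    }
}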
@@ -2321,8 +2350,10 @@ public void shutdown(final Closure done) {
this.rpcService.shutdown();
}
if (this.applyQueue != null) {
this.shutdownLatch = new CountDownLatch(1);
this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch);
Utils.runInThread(() -> {
this.shutdownLatch = new CountDownLatch(1);
this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch);
});
} else {
final int num = GLOBAL_NUM_NODES.decrementAndGet();
LOG.info("The number of active nodes decrement to {}.", num);
jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeMetrics.java
@@ -32,7 +32,7 @@ public class NodeMetrics {

private final MetricRegistry metrics;

public NodeMetrics(boolean enableMetrics) {
public NodeMetrics(final boolean enableMetrics) {
if (enableMetrics) {
this.metrics = new MetricRegistry();
} else {
@@ -62,7 +62,18 @@ public MetricRegistry getMetricRegistry() {
}

/**
* Recorded operation batch size.
* Records operation times.
* @param key
* @param times
*/
public void recordTimes(final String key, final long times) {
if (this.metrics != null) {
this.metrics.counter(key).inc(times);
}
}

/**
* Records operation batch size.
*
* @param key key of operation
* @param size size of operation
@@ -74,7 +85,7 @@ public void recordSize(final String key, final long size) {
}

/**
* Record operation latency.
* Records operation latency.
*
* @param key key of operation
* @param duration duration of operation
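The new NodeMetrics#recordTimes above backs each key with a Dropwizard counter, so the overload rejections recorded by NodeImpl are visible in the node's MetricRegistry. A small usage sketch follows, assuming metrics are enabled and that NodeMetrics lives in com.alipay.sofa.jraft.core as the diff suggests; the class name is illustrative.

import com.alipay.sofa.jraft.core.NodeMetrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public final class OverloadCounterSketch {

    static void demo() {
        final NodeMetrics nodeMetrics = new NodeMetrics(true); // metrics enabled
        // What NodeImpl#apply records each time the disruptor queue rejects a task:
        nodeMetrics.recordTimes("apply-task-overload-times", 1);
        nodeMetrics.recordTimes("apply-task-overload-times", 1);

        // The key is backed by an ordinary Dropwizard counter in the registry.
        final MetricRegistry registry = nodeMetrics.getMetricRegistry();
        final Counter counter = registry.counter("apply-task-overload-times");
        System.out.println(counter.getCount()); // prints 2
    }
}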
jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
@@ -28,6 +28,9 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.FSMCaller;
import com.alipay.sofa.jraft.FSMCaller.LastAppliedLogIndexListener;
import com.alipay.sofa.jraft.ReadOnlyService;
@@ -42,15 +45,20 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.ReadIndexResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.util.Bytes;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.ZeroByteStringHelper;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
* Read-only service implementation.
@@ -59,19 +67,25 @@
*/
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {

private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
/** Disruptor to run readonly service. */
private Disruptor<ReadIndexEvent> readIndexDisruptor;
private RingBuffer<ReadIndexEvent> readIndexQueue;
private RaftOptions raftOptions;
private NodeImpl node;
private final Lock lock = new ReentrantLock();
private final Lock lock = new ReentrantLock();
private FSMCaller fsmCaller;
private volatile CountDownLatch shutdownLatch;

private ScheduledExecutorService scheduledExecutorService;

private NodeMetrics nodeMetrics;

// <logIndex, statusList>
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();

private static final Logger LOG = LoggerFactory
.getLogger(ReadOnlyServiceImpl.class);

private static class ReadIndexEvent {
Bytes requestContext;
@@ -90,10 +104,12 @@ public ReadIndexEvent newInstance() {

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
// task list for batch
private List<ReadIndexEvent> events = new ArrayList<>(ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());
private final List<ReadIndexEvent> events = new ArrayList<>(
ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

@Override
public void onEvent(ReadIndexEvent newEvent, long sequence, boolean endOfBatch) throws Exception {
public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
throws Exception {
if (newEvent.shutdownLatch != null) {
executeReadIndexEvents(this.events);
this.events.clear();
@@ -119,7 +135,7 @@ class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexRespon
final List<ReadIndexState> states;
final ReadIndexRequest request;

public ReadIndexResponseClosure(List<ReadIndexState> states, ReadIndexRequest request) {
public ReadIndexResponseClosure(final List<ReadIndexState> states, final ReadIndexRequest request) {
super();
this.states = states;
this.request = request;
@@ -140,7 +156,8 @@ public void run(final Status status) {
return;
}
// Success
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request, readIndexResponse.getIndex());
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
readIndexResponse.getIndex());
for (final ReadIndexState state : this.states) {
// Records current commit log index.
state.setIndex(readIndexResponse.getIndex());
@@ -156,8 +173,8 @@ public void run(final Status status) {
notifySuccess(readIndexStatus);
} else {
// Not applied, add it to pending-notify cache.
ReadOnlyServiceImpl.this.pendingNotifyStatus.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10))
.add(readIndexStatus);
ReadOnlyServiceImpl.this.pendingNotifyStatus
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)).add(readIndexStatus);
}
} finally {
if (doUnlock) {
@@ -169,8 +186,7 @@ public void run(final Status status) {
private void notifyFail(final Status status) {
final long nowMs = Utils.monotonicMs();
for (final ReadIndexState state : this.states) {
ReadOnlyServiceImpl.this.node.getNodeMetrics().recordLatency("read-index",
nowMs - state.getStartTimeMs());
ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", nowMs - state.getStartTimeMs());
final ReadIndexClosure done = state.getDone();
if (done != null) {
final Bytes reqCtx = state.getRequestContext();
@@ -202,15 +218,22 @@ private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
@Override
public boolean init(final ReadOnlyServiceOptions opts) {
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.fsmCaller = opts.getFsmCaller();
this.raftOptions = opts.getRaftOptions();

this.scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
this.readIndexDisruptor = new Disruptor<>(new ReadIndexEventFactory(), this.raftOptions.getDisruptorBufferSize(),
new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true));
this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance() //
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
this.readIndexDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
this.readIndexDisruptor
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
this.readIndexDisruptor.start();
this.readIndexQueue = this.readIndexDisruptor.getRingBuffer();

@@ -219,7 +242,7 @@ public boolean init(final ReadOnlyServiceOptions opts) {

// start scanner
this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
}

@@ -249,11 +272,27 @@ public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
throw new IllegalStateException("Service already shutdown.");
}
try {
this.readIndexQueue.publishEvent((event, sequence) -> {
EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
event.done = closure;
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
});
};
int retryTimes = 0;
while (true) {
if (this.readIndexQueue.tryPublishEvent(translator)) {
break;
} else {
retryTimes++;
if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
Utils.runClosureInThread(closure,
new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
return;
}
ThreadHelper.onSpinWait();
}
}
} catch (final Exception e) {
Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
}
@@ -309,7 +348,7 @@ void flush() throws InterruptedException {

@OnlyForTest
TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {
return pendingNotifyStatus;
return this.pendingNotifyStatus;
}

private void notifySuccess(final ReadIndexStatus status) {
Expand All @@ -320,7 +359,7 @@ private void notifySuccess(final ReadIndexStatus status) {
final ReadIndexState task = states.get(i);
final ReadIndexClosure done = task.getDone(); // stack copy
if (done != null) {
this.node.getNodeMetrics().recordLatency("read-index", nowMs - task.getStartTimeMs());
this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
done.setResult(task.getIndex(), task.getRequestContext().get());
done.run(Status.OK());
}
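ReadOnlyServiceImpl#addRequest gets the same treatment as NodeImpl#apply: bounded tryPublishEvent retries with ThreadHelper.onSpinWait, then an EBUSY rejection plus a "read-index-overload-times" counter. The sketch below shows how a read-index caller can observe the new rejection, assuming the usual Node#readIndex and ReadIndexClosure signatures; the class name is illustrative.

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.error.RaftError;

public final class ReadIndexBackpressureSketch {

    // Issues a linearizable read and reacts to the new EBUSY rejection.
    static void readIndex(final Node node, final byte[] requestContext) {
        node.readIndex(requestContext, new ReadIndexClosure() {

            @Override
            public void run(final Status status, final long index, final byte[] reqCtx) {
                if (status.isOk()) {
                    // The state machine has applied at least up to `index`,
                    // so it is safe to serve the read from local state.
                    return;
                }
                if (status.getRaftError() == RaftError.EBUSY) {
                    // readIndexQueue was full and the request was refused:
                    // degrade (e.g. read through the raft log) or retry with backoff.
                }
            }
        });
    }
}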