Permalink
Browse files

Return a value from StateMachine applyOperation, and use that for the…

… Future result

We wait until the result has been applied to the StateMachine, not just committed
to the Raft log.  This is important e.g. for Read-Your-Writes
  • Loading branch information...
justinsb committed Dec 11, 2013
1 parent ba7fe56 commit dfdd07f5b0620eb9f975239ba589664e7265eb1a
@@ -100,13 +100,13 @@ protected void doStop() {
}
public ListenableFuture<Boolean> commitAsync(final byte[] operation) throws RaftException {
public ListenableFuture<Object> commitAsync(final byte[] operation) throws RaftException {
// Make sure this happens on the Barge thread
ListenableFuture<ListenableFuture<Boolean>> response =
executor.submit(new Callable<ListenableFuture<Boolean>>() {
ListenableFuture<ListenableFuture<Object>> response =
executor.submit(new Callable<ListenableFuture<Object>>() {
@Override
public ListenableFuture<Boolean> call() throws Exception {
public ListenableFuture<Object> call() throws Exception {
// System.out.println("Sending operation");
return ctx.commitOperation(operation);
}
@@ -116,7 +116,7 @@ protected void doStop() {
}
public boolean commit(final byte[] operation) throws RaftException, InterruptedException {
public Object commit(final byte[] operation) throws RaftException, InterruptedException {
try {
return commitAsync(operation).get();
} catch (ExecutionException e) {
@@ -21,7 +21,7 @@
public interface StateMachine {
void applyOperation(@Nonnull ByteBuffer entry);
Object applyOperation(@Nonnull ByteBuffer entry);
// void takeSnapshot(@Nonnull OutputStream snapshot);
//
@@ -28,6 +28,7 @@
import com.google.protobuf.ByteString;
import journal.io.api.Journal;
import org.robotninjas.barge.ClusterConfig;
import com.google.common.util.concurrent.SettableFuture;
import org.robotninjas.barge.Replica;
import org.robotninjas.barge.rpc.RaftExecutor;
import org.slf4j.Logger;
@@ -44,6 +45,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.TreeMap;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -111,7 +113,7 @@ public void append(Entry entry, long index) {
}
});
fireComitted();
fireComitted(null);
LOGGER.info("Finished replaying log lastIndex {}, currentTerm {}, commitIndex {}, lastVotedFor {}",
lastLogIndex, currentTerm, commitIndex, votedFor.orNull());
@@ -205,12 +207,16 @@ public GetEntriesResult getEntriesFrom(@Nonnegative long beginningIndex, @Nonneg
}
void fireComitted() {
void fireComitted(Map<Long, SettableFuture<Object>> listeners) {
try {
for (long i = lastApplied + 1; i <= Math.min(commitIndex, lastLogIndex); ++i, ++lastApplied) {
byte[] rawCommand = log.get(i).getCommand().toByteArray();
final ByteBuffer operation = ByteBuffer.wrap(rawCommand).asReadOnlyBuffer();
stateMachine.dispatchOperation(operation);
SettableFuture<Object> listener = null;
if (listeners != null) {
listener = listeners.get(i);
}
stateMachine.dispatchOperation(i, operation, listener);
}
} catch (Exception e) {
throw propagate(e);
@@ -229,10 +235,10 @@ public long commitIndex() {
return commitIndex;
}
public void commitIndex(long index) {
public void commitIndex(long index, Map<Long, SettableFuture<Object>> listeners) {
commitIndex = index;
journal.appendCommit(index);
fireComitted();
fireComitted(listeners);
}
public long currentTerm() {
@@ -4,6 +4,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.robotninjas.barge.StateMachine;
@@ -12,6 +13,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
@@ -35,14 +37,23 @@
}
@Nonnull
public ListenableFuture dispatchOperation(@Nonnull final ByteBuffer op) {
public ListenableFuture dispatchOperation(final long index, @Nonnull final ByteBuffer op, final SettableFuture<Object> listener) {
checkNotNull(op);
return executor.submit(new Runnable() {
@Override
public void run() {
stateMachine.applyOperation(op.asReadOnlyBuffer());
try {
Object result = stateMachine.applyOperation(op.asReadOnlyBuffer());
if (listener != null) {
listener.set(result);
}
} catch (Throwable t) {
if (listener != null) {
listener.setException(t);
}
}
}
});
@@ -172,7 +172,7 @@ public AppendEntriesResponse appendEntries(@Nonnull RaftStateContext ctx, @Nonnu
@Nonnull
@Override
public ListenableFuture<Boolean> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
throw new NoLeaderException();
}
@@ -116,7 +116,7 @@ public AppendEntriesResponse appendEntries(@Nonnull RaftStateContext ctx, @Nonnu
success = log.append(request);
if (request.getCommitIndex() > log.commitIndex()) {
log.commitIndex(Math.min(request.getCommitIndex(), log.lastLogIndex()));
log.commitIndex(Math.min(request.getCommitIndex(), log.lastLogIndex()), null);
}
}
@@ -131,7 +131,7 @@ public AppendEntriesResponse appendEntries(@Nonnull RaftStateContext ctx, @Nonnu
@Nonnull
@Override
public ListenableFuture<Boolean> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
if (leader.isPresent()) {
throw new NotLeaderException(leader.get());
} else {
@@ -23,10 +23,12 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import org.robotninjas.barge.RaftException;
import org.robotninjas.barge.Replica;
import org.robotninjas.barge.log.RaftLog;
import org.robotninjas.barge.rpc.RaftExecutor;
import org.robotninjas.barge.rpc.RaftScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -56,17 +59,19 @@
private final RaftLog log;
private final ScheduledExecutorService scheduler;
private final ListeningExecutorService executor;
private final long timeout;
private final Map<Replica, ReplicaManager> managers = Maps.newHashMap();
private final ReplicaManagerFactory replicaManagerFactory;
private ScheduledFuture<?> heartbeatTask;
private final SortedMap<Long, SettableFuture<Boolean>> requests = Maps.newTreeMap();
private final SortedMap<Long, SettableFuture<Object>> requests = Maps.newTreeMap();
@Inject
Leader(RaftLog log, @RaftScheduler ScheduledExecutorService scheduler,
Leader(RaftLog log, @RaftExecutor ListeningExecutorService executor, @RaftScheduler ScheduledExecutorService scheduler,
@ElectionTimeout @Nonnegative long timeout, ReplicaManagerFactory replicaManagerFactory) {
this.log = checkNotNull(log);
this.executor = checkNotNull(executor);
this.scheduler = checkNotNull(scheduler);
checkArgument(timeout > 0);
this.timeout = timeout;
@@ -159,14 +164,30 @@ public void run() {
@Nonnull
@Override
public ListenableFuture<Boolean> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
resetTimeout(ctx);
long index = log.append(operation);
requests.put(index, SettableFuture.<Boolean>create());
final SettableFuture<Object> future = SettableFuture.create();
requests.put(index, future);
List<ListenableFuture<AppendEntriesResponse>> responses = sendRequests(ctx);
return majorityResponse(responses, appendSuccessul());
final ListenableFuture<Boolean> sendMessageFuture = majorityResponse(responses, appendSuccessul());
sendMessageFuture.addListener(new Runnable() {
@Override
public void run() {
try {
Boolean sent = sendMessageFuture.get();
if (sent == Boolean.TRUE) {
// Okay, updateCommitted will be called and we will handle it there
} else {
future.setException(new IOException());
}
} catch (Throwable t) {
future.setException(t);
}
}
}, executor);
return future;
}
/**
@@ -186,12 +207,13 @@ public int compare(ReplicaManager o, ReplicaManager o2) {
final int middle = (int) Math.ceil(sorted.size() / 2.0);
final long committed = sorted.get(middle).getMatchIndex();
log.commitIndex(committed);
SortedMap<Long, SettableFuture<Boolean>> entries = requests.headMap(committed + 1);
for (SettableFuture<Boolean> f : entries.values()) {
f.set(true);
}
SortedMap<Long, SettableFuture<Object>> entries = requests.headMap(committed + 1);
// We need to make a copy, because updateCommitIndex is async
final Map<Long, SettableFuture<Object>> snapshot = Maps.newHashMap(entries);
log.commitIndex(committed, snapshot);
entries.clear();
@@ -63,7 +63,7 @@ public AppendEntriesResponse appendEntries(@Nonnull AppendEntries request) {
}
@Nonnull
public ListenableFuture<Boolean> commitOperation(@Nonnull byte[] op) throws RaftException {
public ListenableFuture<Object> commitOperation(@Nonnull byte[] op) throws RaftException {
checkNotNull(op);
return delegate.commitOperation(this, op);
}
@@ -34,6 +34,6 @@
AppendEntriesResponse appendEntries(@Nonnull RaftStateContext ctx, @Nonnull AppendEntries request);
@Nonnull
ListenableFuture<Boolean> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException;
ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException;
}
@@ -172,7 +172,7 @@ public void init(@Nonnull RaftStateContext ctx) {
@Nonnull
@Override
public ListenableFuture<Boolean> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
return null;
}
@@ -11,6 +11,7 @@
import org.robotninjas.barge.proto.RaftProto;
import org.robotninjas.barge.rpc.Client;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -79,7 +80,7 @@ public void testRequestVoteWithNewerTerm() throws Exception {
verify(mockRaftLog).lastVotedFor(Optional.of(mockCandidate));
verify(mockRaftLog, times(2)).lastVotedFor(any(Optional.class));
verify(mockRaftLog, never()).commitIndex(anyLong());
verify(mockRaftLog, never()).commitIndex(anyLong(), any(Map.class));
verify(mockRaftStateContext, times(1)).setState(FOLLOWER);
verifyNoMoreInteractions(mockRaftStateContext);
@@ -112,7 +113,7 @@ public void testRequestVoteWithOlderTerm() throws Exception {
verify(mockRaftLog, never()).lastVotedFor(Optional.of(mockCandidate));
verify(mockRaftLog, times(1)).lastVotedFor(any(Optional.class));
verify(mockRaftLog, never()).commitIndex(anyLong());
verify(mockRaftLog, never()).commitIndex(anyLong(), any(Map.class));
verifyZeroInteractions(mockRaftStateContext);
@@ -143,7 +144,7 @@ public void testRequestVoteWithSameTerm() throws Exception {
verify(mockRaftLog, never()).lastVotedFor(Optional.of(mockCandidate));
verify(mockRaftLog, times(1)).lastVotedFor(any(Optional.class));
verify(mockRaftLog, never()).commitIndex(anyLong());
verify(mockRaftLog, never()).commitIndex(anyLong(), any(Map.class));
verify(mockRaftStateContext, never()).setState(any(RaftStateContext.StateType.class));
@@ -178,7 +179,7 @@ public void testAppendEntriesWithNewerTerm() throws Exception {
verify(mockRaftLog).lastVotedFor(Optional.of(self));
verify(mockRaftLog, times(1)).lastVotedFor(any(Optional.class));
verify(mockRaftLog, never()).commitIndex(anyLong());
verify(mockRaftLog, never()).commitIndex(anyLong(), any(Map.class));
verify(mockRaftStateContext).setState(FOLLOWER);
verifyNoMoreInteractions(mockRaftStateContext);
@@ -212,7 +213,7 @@ public void testAppendEntriesWithOlderTerm() throws Exception {
verify(mockRaftLog).lastVotedFor(Optional.of(self));
verify(mockRaftLog, times(1)).lastVotedFor(any(Optional.class));
verify(mockRaftLog, never()).commitIndex(anyLong());
verify(mockRaftLog, never()).commitIndex(anyLong(), any(Map.class));
verifyZeroInteractions(mockRaftStateContext);
@@ -243,7 +244,7 @@ public void testAppendEntriesWithSameTerm() throws Exception {
verify(mockRaftLog).lastVotedFor(Optional.of(self));
verify(mockRaftLog, times(1)).lastVotedFor(any(Optional.class));
verify(mockRaftLog, never()).commitIndex(anyLong());
verify(mockRaftLog, never()).commitIndex(anyLong(), any(Map.class));
verify(mockRaftStateContext).setState(FOLLOWER);
verifyNoMoreInteractions(mockRaftStateContext);

0 comments on commit dfdd07f

Please sign in to comment.