Skip to content

Commit

Permalink
Tidying up the Cluster class to separate creating a
Browse files Browse the repository at this point in the history
cluster and starting it up.
  • Loading branch information
Mark Needham committed Jul 1, 2016
1 parent 035c12b commit 50c1b23
Show file tree
Hide file tree
Showing 33 changed files with 779 additions and 672 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.raft.net.codecs.RaftMessageDecoder;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
Expand All @@ -44,10 +47,14 @@
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.store.StoreFailureException;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.RaftMessage>
{
private final ListenSocketAddress listenAddress;
Expand All @@ -58,6 +65,7 @@ public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages
private MessageHandler<RaftMessages.RaftMessage> messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;
private final List<MismatchedStoreListener> listeners = new ArrayList<>();

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );

Expand Down Expand Up @@ -131,31 +139,60 @@ public void registerHandler( Inbound.MessageHandler<RaftMessages.RaftMessage> ha
this.messageHandler = handler;
}

public void addMismatchedStoreListener( MismatchedStoreListener listener )
{
listeners.add( listener );
}

private class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.StoreIdAwareMessage>
{
@Override
protected void channelRead0( ChannelHandlerContext channelHandlerContext,
RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) throws Exception
{
RaftMessages.RaftMessage message = storeIdAwareMessage.message();
StoreId storeId = storeIdAwareMessage.storeId();

if ( storeId.equals( localDatabase.storeId() ) )
try
{
messageHandler.handle( message );
}
else
{
if ( localDatabase.isEmpty() )
RaftMessages.RaftMessage message = storeIdAwareMessage.message();
StoreId storeId = storeIdAwareMessage.storeId();

if ( storeId.equals( localDatabase.storeId() ) )
{
raftStateMachine.downloadSnapshot( message.from() );
messageHandler.handle( message );
}
else
{
log.info( "Discarding message owing to mismatched storeId and non-empty store. Expected: %s, " +
"Encountered: %s", storeId, localDatabase.storeId() );
if ( localDatabase.isEmpty() )
{
raftStateMachine.downloadSnapshot( message.from() );
}
else
{
log.info( "Discarding message owing to mismatched storeId and non-empty store. Expected: %s, " +
"Encountered: %s", storeId, localDatabase.storeId() );
listeners.forEach( l -> {
MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() );
l.onMismatchedStore( ex );
} );
}
}
}
catch ( Exception e )
{
log.error( format( "Failed to process message %s", storeIdAwareMessage ), e );
}
}
}

public interface MismatchedStoreListener
{
void onMismatchedStore(MismatchedStoreIdException ex);
}

public class MismatchedStoreIdException extends StoreFailureException
{
public MismatchedStoreIdException( StoreId expected, StoreId encountered )
{
super( "Expected:" + expected + ", encountered:" + encountered );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.cursor.IOCursor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog
private final long rotateAtSize;
private final ChannelMarshal<ReplicatedContent> contentMarshal;
private final FileNames fileNames;
private final Log log;

private boolean needsRecovery;
private final LogProvider logProvider;
Expand All @@ -87,6 +89,7 @@ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long
this.fileNames = new FileNames( directory );
this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock );
this.pruner = new SegmentedRaftLogPruner( pruningConfig, logProvider );
this.log = logProvider.getLog( getClass() );
}

@Override
Expand All @@ -99,6 +102,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException,

RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider, scanStats );
state = recoveryProtocol.run();
log.info( "log started with recovered state %s", state );
}

@Override
Expand Down Expand Up @@ -216,11 +220,14 @@ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
@Override
public synchronized long skip( long newIndex, long newTerm ) throws IOException
{
log.info( "Skipping from {index: %d, term: %d} to {index: %d, term: %d}", state.appendIndex, state.currentTerm, newIndex, newTerm );
if ( state.appendIndex < newIndex )
{
skipSegment( state.appendIndex, newIndex, newTerm );

state.prevTerm = newTerm;
state.currentTerm = newTerm;

state.prevIndex = newIndex;
state.appendIndex = newIndex;
}
Expand All @@ -239,6 +246,7 @@ private RaftLogEntry readLogEntry( long logIndex ) throws IOException
@Override
public long readEntryTerm( long logIndex ) throws IOException
{
log.info( "reading entry term at %d. prevIndex:%d, prevTerm:%d", logIndex, state.prevIndex, state.prevTerm );
if ( logIndex == state.prevIndex )
{
return state.prevTerm;
Expand All @@ -257,8 +265,20 @@ public long prune( long safeIndex ) throws IOException
{
long pruneIndex = pruner.getIndexToPruneFrom( safeIndex, state.segments );
SegmentFile oldestNotDisposed = state.segments.prune( pruneIndex );
state.prevIndex = oldestNotDisposed.header().prevIndex();
state.prevTerm = oldestNotDisposed.header().prevTerm();

long newPrevIndex = oldestNotDisposed.header().prevIndex();
long newPrevTerm = oldestNotDisposed.header().prevTerm();

if ( newPrevIndex > state.prevIndex )
{
state.prevIndex = newPrevIndex;
}

if ( newPrevTerm > state.prevTerm )
{
state.prevTerm = newPrevTerm;
}

return state.prevIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -51,30 +53,35 @@ public class MembershipWaiter
private final CoreMember myself;
private final JobScheduler jobScheduler;
private final long maxCatchupLag;
private final RaftServer raftServer;
private final Log log;

public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, LogProvider logProvider )
public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, RaftServer raftServer, LogProvider logProvider )
{
this.myself = myself;
this.jobScheduler = jobScheduler;
this.maxCatchupLag = maxCatchupLag;
this.raftServer = raftServer;
this.log = logProvider.getLog( getClass() );
}

public CompletableFuture<Boolean> waitUntilCaughtUpMember( ReadableRaftState raftState )
{
CompletableFuture<Boolean> catchUpFuture = new CompletableFuture<>();

Evaluator evaluator = new Evaluator( raftState, catchUpFuture );
raftServer.addMismatchedStoreListener( evaluator );

JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring(
new JobScheduler.Group( getClass().toString(), POOLED ),
new Evaluator( raftState, catchUpFuture ), maxCatchupLag, MILLISECONDS );
evaluator, maxCatchupLag, MILLISECONDS );

catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) );

return catchUpFuture;
}

private class Evaluator implements Runnable
private class Evaluator implements Runnable, RaftServer.MismatchedStoreListener
{
private final ReadableRaftState raftState;
private final CompletableFuture<Boolean> catchUpFuture;
Expand Down Expand Up @@ -119,10 +126,8 @@ private boolean caughtUpWithLeader()
lastLeaderCommit = raftState.leaderCommit();
if ( lastLeaderCommit != -1 )
{
log.info( "%s Catchup: %d => %d (%d behind)",
myself,
localCommit, lastLeaderCommit,
lastLeaderCommit - localCommit );
long gap = lastLeaderCommit - localCommit;
log.info( "%s Catchup: %d => %d (%d behind)", myself, localCommit, lastLeaderCommit, gap );
}
else
{
Expand All @@ -131,5 +136,11 @@ private boolean caughtUpWithLeader()

return caughtUpWithLeader;
}

@Override
public void onMismatchedStore(RaftServer.MismatchedStoreIdException ex)
{
catchUpFuture.completeExceptionally( ex );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;

public class Appending
{
static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome,
RaftMessages.AppendEntries.Request request ) throws IOException
RaftMessages.AppendEntries.Request request, Log log ) throws IOException

{
if ( request.leaderTerm() < state.term() )
{
Expand All @@ -51,7 +53,7 @@ static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome
outcome.setLeader( request.from() );
outcome.setLeaderCommit( request.leaderCommit() );

if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm() ) )
if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm(), log ) )
{
assert request.prevLogIndex() > -1 && request.prevLogTerm() > -1;
RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx,
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving heartbeat from %s at term %d (i am at %d)",
req.from(), req.leaderTerm(), ctx.term() );
Heart.beat( ctx, outcome, (RaftMessages.Heartbeat) message );
Heart.beat( ctx, outcome, (RaftMessages.Heartbeat) message, log );
break;
}

Expand All @@ -75,7 +75,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx,
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving append entries request from %s at term %d (i am at %d)n",
req.from(), req.leaderTerm(), ctx.term() );
Appending.handleAppendEntriesRequest( ctx, outcome, req );
Appending.handleAppendEntriesRequest( ctx, outcome, req, log );
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

class Follower implements RaftMessageHandler
{
static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm )
static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm, Log log )
throws IOException
{
// NOTE: A prevLogIndex before or at our log's prevIndex means that we
Expand All @@ -44,8 +44,18 @@ static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long
// NOTE: The entry term for a non existing log index is defined as -1,
// so the history for a non existing log entry never matches.

return prevLogIndex <= ctx.entryLog().prevIndex() ||
ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm;
long logPrevIndex = ctx.entryLog().prevIndex();
long logPrevTerm = ctx.entryLog().readEntryTerm( prevLogIndex );

boolean logHistoryMatches = prevLogIndex <= logPrevIndex || logPrevTerm == prevLogTerm;

if ( !logHistoryMatches )
{
log.info( "Log history mismatch: index:[%s, %s], term:[%s, %s]",
logPrevIndex, prevLogIndex, logPrevTerm, prevLogTerm );
}

return logHistoryMatches;
}

static void commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry, long leaderCommit, Outcome outcome )
Expand Down Expand Up @@ -80,13 +90,13 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx,
{
case HEARTBEAT:
{
Heart.beat( ctx, outcome, (Heartbeat) message );
Heart.beat( ctx, outcome, (Heartbeat) message, log );
break;
}

case APPEND_ENTRIES_REQUEST:
{
Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message );
Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message, log );
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;

class Heart
{
static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws IOException
static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request, Log log ) throws IOException
{
if ( request.leaderTerm() < state.term() )
{
Expand All @@ -39,7 +40,7 @@ static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartb
outcome.setLeader( request.from() );
outcome.setLeaderCommit( request.commitIndex() );

if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm() ) )
if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm(), log ) )
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx,
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Heart.beat( ctx, outcome, (Heartbeat) message );
Heart.beat( ctx, outcome, (Heartbeat) message, log );
break;
}

Expand Down Expand Up @@ -114,15 +114,14 @@ else if ( req.leaderTerm() == ctx.term() )
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Appending.handleAppendEntriesRequest( ctx, outcome, req );
Appending.handleAppendEntriesRequest( ctx, outcome, req, log );
break;
}
}

case APPEND_ENTRIES_RESPONSE:
{
RaftMessages.AppendEntries.Response response =
(RaftMessages.AppendEntries.Response) message;
RaftMessages.AppendEntries.Response response = (RaftMessages.AppendEntries.Response) message;

if ( response.term() < ctx.term() )
{
Expand Down

0 comments on commit 50c1b23

Please sign in to comment.