Skip to content

Commit

Permalink
Adds diagnostic logging to various CE components
Browse files Browse the repository at this point in the history
State changes of RAFT members are now logged, along with the reasons
 they happened
RaftLogShipper now prints which instance it manages in every
 log line it produces. It also prints its state changes, along with
 the reasons for them
Election timeouts are now logged
  • Loading branch information
digitalstain committed Jun 10, 2016
1 parent 9fb06ab commit d17aea9
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 24 deletions.
Expand Up @@ -143,6 +143,7 @@ private void initTimers()
{
electionTimer = renewableTimeoutService.create(
Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> {
log.info( "Election timeout triggered, base timeout value is %d%n", electionTimeout );
handle( new RaftMessages.Timeout.Election<>( myself ) );
timeout.renew();
} );
Expand Down
Expand Up @@ -139,7 +139,7 @@ public Object identity()

public synchronized void start()
{
log.info( "Starting log shipper to: " + follower );
log.info( "Starting log shipper: %s", statusAsString() );

try
{
Expand All @@ -154,15 +154,15 @@ public synchronized void start()
// TODO: service is a LifeCycle.
// TODO: Should we have and use one system level timeout service instead?

log.error( "Failed to start log shipper to: " + follower, e );
log.error( "Failed to start log shipper " + statusAsString(), e );
}

sendSingle( raftLog.appendIndex(), lastLeaderContext );
}

public synchronized void stop()
{
log.info( "Stopping log shipper to: " + follower );
log.info( "Stopping log shipper %s", statusAsString() );

try
{
Expand All @@ -171,7 +171,7 @@ public synchronized void stop()
}
catch ( Throwable e )
{
log.error( "Failed to start log shipper to: " + follower, e );
log.error( "Failed to start log shipper " + statusAsString(), e );
}
abortTimeout();
}
Expand All @@ -186,7 +186,8 @@ public synchronized void onMismatch( long lastRemoteAppendIndex, LeaderContext l
break;
case PIPELINE:
case CATCHUP:
log.info( format( "Mismatch in mode %s from follower %s", mode, follower ) );
log.info( "%s: mismatch in mode %s from follower %s, moving to MISMATCH mode",
statusAsString(), mode, follower );
mode = Mode.MISMATCH;
sendSingle( lastSentIndex, leaderContext );
break;
Expand All @@ -205,12 +206,12 @@ public synchronized void onMatch( long newMatchIndex, LeaderContext leaderContex
case MISMATCH:
if ( sendNextBatchAfterMatch( leaderContext ) )
{
log.info( format( "Caught up after mismatch: %s", follower ) );
log.info( "%s: caught up after mismatch, moving to PIPELINE mode", statusAsString() );
mode = Mode.PIPELINE;
}
else
{
log.info( format( "Starting catch up after mismatch: %s", follower ) );
log.info( "%s: starting catch up after mismatch, moving to CATCHUP mode", statusAsString() );
mode = Mode.CATCHUP;
}
break;
Expand All @@ -219,7 +220,7 @@ public synchronized void onMatch( long newMatchIndex, LeaderContext leaderContex
{
if ( sendNextBatchAfterMatch( leaderContext ) )
{
log.info( format( "Caught up: %s", follower ) );
log.info( "%s: caught up, moving to PIPELINE mode", statusAsString() );
mode = Mode.PIPELINE;
}
}
Expand Down Expand Up @@ -255,6 +256,7 @@ public synchronized void onNewEntries( long prevLogIndex, long prevLogTerm, Raft
/* The timer is still set at this point. Either we will send the next batch
* as soon as the follower has caught up with the last pipelined entry,
* or when we timeout and resend. */
log.info("%s: follower has fallen behind (target prevLogIndex was %d, maxAllowedShippingLag is %d), moving to CATCHUP mode", statusAsString(), prevLogIndex, maxAllowedShippingLag );
mode = Mode.CATCHUP;
break;
}
Expand Down Expand Up @@ -301,7 +303,7 @@ else if ( timeoutAbsoluteMillis != 0 )
}
catch ( Throwable e )
{
log.error( "Exception during timeout handling: " + follower, e );
log.error( "Exception during timeout handling: " + statusAsString(), e );
}
}

Expand All @@ -312,6 +314,7 @@ private void onTimeout() throws IOException
case PIPELINE:
/* we leave pipelined mode here, because the follower seems
* unresponsive and we do not want to spam it with new entries */
log.info( "%s: timed out, moving to CATCHUP mode", statusAsString() );
mode = Mode.CATCHUP;
/* fallthrough */
case CATCHUP:
Expand Down Expand Up @@ -399,8 +402,8 @@ private void sendSingle( long logIndex, LeaderContext leaderContext )

if ( prevLogTerm > leaderContext.term )
{
log.warn(
format( "Aborting send. Not leader anymore? %s, prevLogTerm=%d", leaderContext, prevLogTerm ) );
log.warn( "%s aborting send. Not leader anymore? %s, prevLogTerm=%d",
statusAsString(), leaderContext, prevLogTerm );
return;
}

Expand Down Expand Up @@ -431,9 +434,9 @@ private void sendSingle( long logIndex, LeaderContext leaderContext )
catch ( IOException e )
{
log.warn(
"Tried to send entry at index %d that can't be found in the raft log; it has likely been pruned. " +
"%s tried to send entry at index %d that can't be found in the raft log; it has likely been pruned. " +
"This is a temporary state and the system should recover automatically in a short while.",
logIndex );
statusAsString(), logIndex );
}
}

Expand Down Expand Up @@ -469,8 +472,8 @@ private void sendRange( long startIndex, long endIndex, LeaderContext leaderCont

if ( prevLogTerm > leaderContext.term )
{
log.warn(
format( "Aborting send. Not leader anymore? %s, prevLogTerm=%d", leaderContext, prevLogTerm ) );
log.warn( "%s aborting send. Not leader anymore? %s, prevLogTerm=%d",
statusAsString(), leaderContext, prevLogTerm );
return;
}

Expand All @@ -486,8 +489,8 @@ private void sendRange( long startIndex, long endIndex, LeaderContext leaderCont
entries[offset] = cursor.get();
if ( entries[offset].term() > leaderContext.term )
{
log.warn( format( "Aborting send. Not leader anymore? %s, entryTerm=%d", leaderContext,
entries[offset].term() ) );
log.warn( "%s aborting send. Not leader anymore? %s, entryTerm=%d",
statusAsString(), leaderContext, entries[offset].term() );
return;
}
offset++;
Expand All @@ -498,7 +501,13 @@ private void sendRange( long startIndex, long endIndex, LeaderContext leaderCont
}
catch ( IOException e )
{
log.warn( "Exception during batch send", e );
log.warn( statusAsString() + " exception during batch send", e );
}
}

/**
 * Renders a one-line summary of this shipper for use in log messages.
 * The summary consists of the follower followed by, in brackets, the current
 * matchIndex, lastSentIndex, the append index reported by raftLog, and the mode.
 *
 * @return the formatted status string
 */
private String statusAsString()
{
String template = "%s[matchIndex: %d, lastSentIndex: %d, localAppendIndex: %d, mode: %s]";
return format( template, follower, matchIndex, lastSentIndex, raftLog.appendIndex(), mode );
}
}
Expand Up @@ -53,6 +53,8 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message
}

outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving heartbeat from %s at term %d (i am at %d)%n",
req.from(), req.leaderTerm(), ctx.term() );
Heart.beat( ctx, outcome, (RaftMessages.Heartbeat<MEMBER>) message );
break;
}
Expand All @@ -72,6 +74,8 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message
}

outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving append entries request from %s at term %d (i am at %d)%n",
req.from(), req.leaderTerm(), ctx.term() );
Appending.handleAppendEntriesRequest( ctx, outcome, req );
break;
}
Expand All @@ -84,6 +88,8 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message
{
outcome.setNextTerm( res.term() );
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving vote response from %s at term %d (i am at %d)%n",
res.from(), res.term(), ctx.term() );
break;
}
else if ( res.term() < ctx.term() || !res.voteGranted() )
Expand All @@ -98,15 +104,15 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )

if ( isQuorum( ctx.votingMembers().size(), outcome.getVotesForMe().size() ) )
{
log.info( "In term %d %s ELECTED AS LEADER voted for by %s%n",
ctx.term(), ctx.myself(), outcome.getVotesForMe() );

outcome.setLeader( ctx.myself() );
Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() );

outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() );
outcome.electedLeader();
outcome.setNextRole( LEADER );
log.info( "Moving to LEADER state at term %d (i am %s), voted for by %s%n",
ctx.term(), ctx.myself(), outcome.getVotesForMe() );
}
break;
}
Expand All @@ -119,6 +125,8 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )
{
outcome.getVotesForMe().clear();
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving vote request from %s at term %d (i am at %d)%n",
req.from(), req.term(), ctx.term() );
Voting.handleVoteRequest( ctx, outcome, req );
break;
}
Expand All @@ -129,8 +137,9 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )

case ELECTION_TIMEOUT:
{
if ( !Election.start( ctx, outcome ) )
if ( !Election.start( ctx, outcome, log ) )
{
log.info( "Moving to FOLLOWER state after failing to start election" );
outcome.setNextRole( FOLLOWER );
}
break;
Expand Down
Expand Up @@ -25,14 +25,17 @@
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;

public class Election
{
public static <MEMBER> boolean start( ReadableRaftState<MEMBER> ctx, Outcome<MEMBER> outcome ) throws IOException
public static <MEMBER> boolean start( ReadableRaftState<MEMBER> ctx, Outcome<MEMBER> outcome, Log log ) throws IOException
{
Set<MEMBER> currentMembers = ctx.votingMembers();
if ( currentMembers == null || !currentMembers.contains( ctx.myself() ) )
{
log.info( "Election attempted but not started, current members are %s, i am %s%n",
currentMembers, ctx.myself() );
return false;
}

Expand All @@ -51,6 +54,7 @@ public static <MEMBER> boolean start( ReadableRaftState<MEMBER> ctx, Outcome<MEM
}

outcome.setVotedFor( ctx.myself() );
log.info( "Election started with vote request: %s and members: %s%n", voteForMe, currentMembers );
return true;
}
}
Expand Up @@ -105,9 +105,10 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message

case ELECTION_TIMEOUT:
{
if ( Election.start( ctx, outcome ) )
if ( Election.start( ctx, outcome, log ) )
{
outcome.setNextRole( CANDIDATE );
log.info( "Moving to CANDIDATE state after successfully starting election %n" );
}
break;
}
Expand Down
Expand Up @@ -46,7 +46,7 @@ private static <MEMBER> Iterable<MEMBER> replicationTargets( final ReadableRaftS
return new FilteringIterable<>( ctx.replicationMembers(), member -> !member.equals( ctx.myself() ) );
}

static <MEMBER> void sendHeartbeats( ReadableRaftState<MEMBER> ctx, Outcome<MEMBER> outcome ) throws IOException
private static <MEMBER> void sendHeartbeats( ReadableRaftState<MEMBER> ctx, Outcome<MEMBER> outcome ) throws IOException
{
long commitIndex = ctx.leaderCommit();
long commitIndexTerm = ctx.entryLog().readEntryTerm( commitIndex );
Expand Down Expand Up @@ -76,6 +76,8 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message

outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " +
"%d) from %s%n", req.leaderTerm(), ctx.term(), req.from() );
Heart.beat( ctx, outcome, (Heartbeat<MEMBER>) message );
break;
}
Expand Down Expand Up @@ -108,6 +110,8 @@ else if ( req.leaderTerm() == ctx.term() )
// There is a new leader in a later term, we should revert to follower. (§5.1)
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " +
"%d) from %s%n", req.leaderTerm(), ctx.term(), req.from() );
Appending.handleAppendEntriesRequest( ctx, outcome, req );
break;
}
Expand All @@ -128,6 +132,8 @@ else if ( response.term() > ctx.term() )
outcome.setNextTerm( response.term() );
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving append response at term %d (my term is " +
"%d) from %s%n", response.term(), ctx.term(), response.from() );
outcome.replaceFollowerStates( new FollowerStates<>() );
break;
}
Expand Down Expand Up @@ -197,6 +203,8 @@ else if ( response.term() > ctx.term() )
if ( req.term() > ctx.term() )
{
outcome.steppingDown();
log.info( "Moving to FOLLOWER state after receiving vote request at term %d (my term is " +
"%d) from %s%n", req.term(), ctx.term(), req.from() );
outcome.setNextRole( FOLLOWER );
Voting.handleVoteRequest( ctx, outcome, req );
break;
Expand Down

0 comments on commit d17aea9

Please sign in to comment.