Skip to content

Commit

Permalink
core-edge: minor changes in raft log shipper and membership state
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski authored and jimwebber committed Jul 14, 2016
1 parent 5cabecf commit d246e72
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 61 deletions.
Expand Up @@ -419,25 +419,17 @@ private void sendRange( long startIndex, long endIndex, LeaderContext leaderCont
return;
}

if ( (prevLogIndex == -1 && prevLogTerm != -1) || (prevLogTerm == -1 && prevLogIndex != -1) )
{
log.warn( "%s aborting append entry request since someone has pruned away the entries we needed." +
"Sending a LogCompactionInfo instead. Leader context=%s, prevLogTerm=%d",
statusAsString(), leaderContext, prevLogTerm );
outbound.send( follower, new RaftMessages.LogCompactionInfo( leader, leaderContext.term,
prevLogIndex ) );
return;
}

RaftMessages.AppendEntries.Request appendRequest =
new RaftMessages.AppendEntries.Request( leader, leaderContext.term, prevLogIndex, prevLogTerm,
entries, leaderContext.commitIndex );

boolean entryMissing = false;
try ( InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader( raftLog, inFlightMap, false ) )
{
for ( int offset = 0; offset < batchSize; offset++ )
{
entries[offset] = logEntrySupplier.get( startIndex + offset );
if ( entries[offset] == null )
{
entryMissing = true;
break;
}
if ( entries[offset].term() > leaderContext.term )
{
log.warn( "%s aborting send. Not leader anymore? %s, entryTerm=%d",
Expand All @@ -447,14 +439,45 @@ private void sendRange( long startIndex, long endIndex, LeaderContext leaderCont
}
}

outbound.send( follower, appendRequest );
if ( entryMissing || doesNotExistInLog( prevLogIndex, prevLogTerm ) )
{
if ( raftLog.prevIndex() >= prevLogIndex )
{
sendLogCompactionInfo( leaderContext );
}
else
{
log.error( "Could not send compaction info and entries were missing, but log is not behind." );
}
}
else
{
RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request(
leader, leaderContext.term, prevLogIndex, prevLogTerm, entries, leaderContext.commitIndex );

outbound.send( follower, appendRequest );
}
}
catch ( IOException e )
{
log.warn( statusAsString() + " exception during batch send", e );
}
}

private boolean doesNotExistInLog( long logIndex, long logTerm )
{
return logTerm == -1 && logIndex != -1;
}

private void sendLogCompactionInfo( LeaderContext leaderContext )
{
log.warn( "Sending log compaction info. Log pruned? Status=%s, LeaderContext=%s",
statusAsString(), leaderContext );

outbound.send( follower, new RaftMessages.LogCompactionInfo(
leader, leaderContext.term, raftLog.prevIndex() ) );
}

private String statusAsString()
{
return format( "%s[matchIndex: %d, lastSentIndex: %d, localAppendIndex: %d, mode: %s]", follower, matchIndex,
Expand Down
Expand Up @@ -72,9 +72,9 @@ public class RaftMembershipState extends LifecycleAdapter
private MembershipEntry appended;
long ordinal; // persistence ordinal must be increased each time we change committed or appended

public static RaftMembershipState startState()
public RaftMembershipState()
{
return new RaftMembershipState( -1, null, null );
this( -1, null, null );
}

RaftMembershipState( long ordinal, MembershipEntry committed, MembershipEntry appended )
Expand Down Expand Up @@ -175,14 +175,19 @@ public String toString()
'}';
}

public RaftMembershipState newInstance()
{
return new RaftMembershipState( ordinal, committed, appended );
}

public static class Marshal extends SafeStateMarshal<RaftMembershipState>
{
MembershipEntry.Marshal entryMarshal = new MembershipEntry.Marshal();

@Override
public RaftMembershipState startState()
{
return RaftMembershipState.startState();
return new RaftMembershipState();
}

@Override
Expand Down
Expand Up @@ -83,7 +83,7 @@ public void send( CoreMember to, Collection<RaftMessages.RaftMessage> raftMessag
private int maxAllowedShippingLag = 256;
private Supplier<DatabaseHealth> databaseHealthSupplier;
private StateStorage<RaftMembershipState> raftMembership =
new InMemoryStateStorage<>( RaftMembershipState.startState() );
new InMemoryStateStorage<>( new RaftMembershipState() );
private Monitors monitors = new Monitors();
private RaftStateMachine raftStateMachine = new EmptyStateMachine();
private final InFlightMap<Long,RaftLogEntry> inFlightMap;
Expand Down
Expand Up @@ -308,60 +308,22 @@ public void shouldSendMostRecentlyAvailableEntryIfPruningHappened() throws IOExc
public void shouldSendLogCompactionInfoToFollowerOnMatchIfEntryHasBeenPrunedAway() throws Exception
{
//given
AtomicBoolean afterInit = new AtomicBoolean();
final DoubleLatch latch = new DoubleLatch();
raftLog = new DelegatingRaftLog( raftLog )
{
@Override
public long readEntryTerm( long logIndex ) throws IOException
{
if ( afterInit.get() )
{
latch.start();
latch.awaitFinish();
}
return super.readEntryTerm( logIndex );
}
};

raftLog.append( entry0 );
raftLog.append( entry1 );
raftLog.append( entry2 );
raftLog.append( entry3 );

startLogShipper();

afterInit.set( true );

//when
outbound.clear();
Thread pruningThread = new Thread( "Pruning" )
{
@Override
public void run()
{
try
{
latch.awaitStart();
raftLog.prune( 2 );
latch.finish();
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}
};

pruningThread.start();
raftLog.prune( 2 );

logShipper.onMatch( 1, new LeaderContext( 0, 0 ) );

//then
assertTrue( outbound.hasAnyEntriesTo( follower ) );
assertThat( outbound.sentTo( follower ),
hasMessage( new RaftMessages.LogCompactionInfo( leader, 0, 1 ) ) );

pruningThread.join();
hasMessage( new RaftMessages.LogCompactionInfo( leader, 0, 2 ) ) );
}

}
Expand Up @@ -39,7 +39,7 @@

public class RaftMembershipStateTest
{
private RaftMembershipState state = RaftMembershipState.startState();
private RaftMembershipState state = new RaftMembershipState();

private Set<CoreMember> membersA = asSet( member( 0 ), member( 1 ), member( 2 ) );
private Set<CoreMember> membersB = asSet( member( 0 ), member( 1 ), member( 2 ), member( 3 ) );
Expand Down

0 comments on commit d246e72

Please sign in to comment.