Skip to content

Commit

Permalink
fix long recovery leading to oom in in-flight map
Browse files Browse the repository at this point in the history
A long recovery during startup could lead to the in-flight map
being filled up with enough entries to exhaust memory. This was
an unnoticed side-effect of a recent refactoring which was
previously protected against by holding the core state monitor
during the entire recovery step.
  • Loading branch information
martinfurmanski committed Apr 25, 2017
1 parent a5e4640 commit ccfd5f8
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 27 deletions.
Expand Up @@ -63,6 +63,7 @@
public class RaftMachine implements LeaderLocator, CoreMetaData
{
private final LeaderNotFoundMonitor leaderNotFoundMonitor;
private final InFlightMap<RaftLogEntry> inFlightMap;
private RenewableTimeoutService.RenewableTimeout heartbeatTimer;

public enum Timeouts implements RenewableTimeoutService.TimeoutName
Expand Down Expand Up @@ -112,13 +113,19 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS
this.refuseToBecomeLeader = refuseToBecomeLeader;
this.clock = clock;

this.inFlightMap = inFlightMap;
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap,
logProvider );

leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );
}

public synchronized void startTimers()
/**
* This should be called after the major recovery operations are complete. Before this is called
* this instance cannot become a leader (the timers are disabled) and entries will not be cached
* in the in-flight map, because the application process is not running and ready to consume them.
*/
public synchronized void postRecoveryActions()
{
if ( !refuseToBecomeLeader )
{
Expand All @@ -128,6 +135,8 @@ public synchronized void startTimers()
heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0,
renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) );
}

inFlightMap.enable();
}

public synchronized void stopTimers()
Expand Down
Expand Up @@ -27,6 +27,22 @@
public class InFlightMap<V>
{
private final SortedMap<Long,V> map = new ConcurrentSkipListMap<>();
private volatile boolean enabled;

public InFlightMap()
{
this ( false );
}

public InFlightMap( boolean enabled )
{
this.enabled = enabled;
}

public void enable()
{
this.enabled = true;
}

/**
* Adds a new mapping.
Expand All @@ -37,6 +53,11 @@ public class InFlightMap<V>
*/
public void put( Long key, V value )
{
if ( !enabled )
{
return;
}

V previousValue = map.putIfAbsent( key, value );

if ( previousValue != null )
Expand Down
Expand Up @@ -81,7 +81,7 @@ public synchronized void start() throws Throwable
localDatabase.start();
coreStateMachines.installCommitProcess( localDatabase.getCommitProcess() );
applicationProcess.start();
raftMachine.startTimers();
raftMachine.postRecoveryActions();
}

@Override
Expand Down
Expand Up @@ -78,14 +78,13 @@ public class RaftMachineBuilder
new InMemoryStateStorage<>( new RaftMembershipState() );
private Monitors monitors = new Monitors();
private CommitListener commitListener = commitIndex -> {};
private final InFlightMap<RaftLogEntry> inFlightMap;
private InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();

public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder )
{
this.member = member;
this.expectedClusterSize = expectedClusterSize;
this.memberSetBuilder = memberSetBuilder;
inFlightMap = new InFlightMap<>();
}

public RaftMachine build()
Expand Down Expand Up @@ -162,6 +161,12 @@ public RaftMachineBuilder raftLog( RaftLog raftLog )
return this;
}

public RaftMachineBuilder inFlightMap( InFlightMap<RaftLogEntry> inFlightMap )
{
this.inFlightMap = inFlightMap;
return this;
}

public RaftMachineBuilder clock( Clock clock )
{
this.clock = clock;
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogCursor;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet;
import org.neo4j.causalclustering.core.consensus.membership.MembershipEntry;
import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService;
Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.causalclustering.core.consensus.RaftMachine.Timeouts.ELECTION;
Expand All @@ -67,6 +69,7 @@ public class RaftMachineTest
private MemberId member4 = member( 4 );

private ReplicatedInteger data1 = ReplicatedInteger.valueOf( 1 );
private ReplicatedInteger data2 = ReplicatedInteger.valueOf( 2 );

private RaftLog raftLog = new InMemoryRaftLog();

Expand Down Expand Up @@ -96,7 +99,7 @@ public void shouldRequestVotesOnElectionTimeout() throws Exception
.build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

// When
timeouts.invokeTimeout( ELECTION );
Expand All @@ -121,7 +124,7 @@ public void shouldBecomeLeaderInMajorityOf3() throws Exception
.timeoutService( timeouts ).clock( fakeClock ).build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );
assertThat( raft.isLeader(), is( false ) );
Expand All @@ -144,7 +147,7 @@ public void shouldBecomeLeaderInMajorityOf5() throws Exception

raft.installCoreState( new RaftCoreState(
new MembershipEntry( 0, asSet( myself, member1, member2, member3, member4 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );

Expand All @@ -169,7 +172,7 @@ public void shouldNotBecomeLeaderOnMultipleVotesFromSameMember() throws Exceptio

raft.installCoreState( new RaftCoreState(
new MembershipEntry( 0, asSet( myself, member1, member2, member3, member4 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );

Expand All @@ -191,7 +194,7 @@ public void shouldNotBecomeLeaderWhenVotingOnItself() throws Exception
.timeoutService( timeouts ).clock( fakeClock ).build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );

Expand All @@ -212,7 +215,7 @@ public void shouldNotBecomeLeaderWhenMembersVoteNo() throws Exception
.timeoutService( timeouts ).clock( fakeClock ).build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );

Expand All @@ -234,7 +237,7 @@ public void shouldNotBecomeLeaderByVotesFromOldTerm() throws Exception
.timeoutService( timeouts ).clock( fakeClock ).build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );
// When
Expand All @@ -260,7 +263,7 @@ public void shouldVoteFalseForCandidateInOldTerm() throws Exception
.build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

// When
raft.handle( voteRequest().from( member1 ).term( -1 ).candidate( member1 )
Expand All @@ -281,7 +284,7 @@ public void shouldNotBecomeLeaderByVotesFromFutureTerm() throws Exception
.timeoutService( timeouts ).clock( fakeClock ).build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );

Expand Down Expand Up @@ -310,7 +313,7 @@ public void shouldAppendNewLeaderBarrierAfterBecomingLeader() throws Exception
.build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

// When
timeouts.invokeTimeout( ELECTION );
Expand All @@ -335,7 +338,7 @@ public void leaderShouldSendHeartBeatsOnHeartbeatTimeout() throws Exception
.build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

timeouts.invokeTimeout( ELECTION );
raft.handle( voteResponse().from( member1 ).term( 1 ).grant().build() );
Expand All @@ -359,7 +362,7 @@ public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() thr
.timeoutService( timeouts ).clock( fakeClock ).build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

try
{
Expand Down Expand Up @@ -424,7 +427,7 @@ public void handle( RaftMessages.RaftMessage message )
.build();

raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) );
raft.startTimers();
raft.postRecoveryActions();

// We make ourselves the leader
timeouts.invokeTimeout( ELECTION );
Expand Down Expand Up @@ -473,6 +476,40 @@ public void shouldMonitorLeaderNotFound() throws Exception
}
}

@Test
public void shouldNotCacheInFlightEntriesUntilAfterRecovery() throws Exception
{
// given
FakeClock fakeClock = Clocks.fakeClock();
InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();
ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock );
RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE )
.timeoutService( timeouts )
.clock( fakeClock )
.raftLog( raftLog )
.inFlightMap( inFlightMap )
.build();

raftLog.append( new RaftLogEntry(0, new MemberIdSet(asSet( myself, member1, member2 ))) );

// when
raft.handle( appendEntriesRequest().from( member1 ).prevLogIndex( 0 ).prevLogTerm( 0 ).leaderTerm( 0 )
.logEntry( new RaftLogEntry( 0, data1 ) ).build() );

// then
assertEquals( data1, readLogEntry( raftLog, 1 ).content() );
assertNull( inFlightMap.get( 1L ) );

// when
raft.postRecoveryActions();
raft.handle( appendEntriesRequest().from( member1 ).prevLogIndex( 1 ).prevLogTerm( 0 ).leaderTerm( 0 )
.logEntry( new RaftLogEntry( 0, data2 ) ).build() );

// then
assertEquals( data2, readLogEntry( raftLog, 2 ).content() );
assertEquals( data2, inFlightMap.get( 2L ).content() );
}

private static class ExplodingRaftLog implements RaftLog
{
private boolean startExploding = false;
Expand Down
Expand Up @@ -91,7 +91,7 @@ public void bootstrap( MemberId[] members ) throws RaftMachine.BootstrapExceptio
{
member.raftLog().append( new RaftLogEntry(0, new MemberIdSet(asSet( members ))) );
member.raftInstance().installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( members )) ) );
member.raftInstance().startTimers();
member.raftInstance().postRecoveryActions();
}
}

Expand Down
Expand Up @@ -106,7 +106,7 @@ void boot() throws BootstrapException, TimeoutException, InterruptedException, I
{
raft.raftLog().append( new RaftLogEntry(0, new MemberIdSet(asSet( members ))) );
raft.raftMachine().installCoreState( new RaftCoreState( new MembershipEntry( 0, members ) ) );
raft.raftMachine.startTimers();
raft.raftMachine.postRecoveryActions();
}
net.start();
awaitBootstrapped();
Expand Down
Expand Up @@ -25,16 +25,30 @@
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

public class InFlightLogEntriesCacheTest
{
@Test
public void shouldNotCacheUntilEnabled() throws Exception
{
InFlightMap<Object> cache = new InFlightMap<>();
Object entry = new Object();

cache.put( 1L, entry );
assertNull( cache.get( 1L ) );

cache.enable();
cache.put( 1L, entry );
assertEquals( entry, cache.get( 1L ) );
}

@Test
public void shouldRegisterAndUnregisterValues() throws Exception
{
InFlightMap<Object> entries = new InFlightMap<>();
entries.enable();

Map<Long, Object> logEntryList = new HashMap<>();
logEntryList.put(1L, new Object() );
Expand Down Expand Up @@ -65,6 +79,7 @@ public void shouldRegisterAndUnregisterValues() throws Exception
public void shouldNotReinsertValues() throws Exception
{
InFlightMap<Object> entries = new InFlightMap<>();
entries.enable();
Object addedObject = new Object();
entries.put( 1L, addedObject );
entries.put( 1L, addedObject );
Expand All @@ -74,6 +89,7 @@ public void shouldNotReinsertValues() throws Exception
public void shouldNotReplaceRegisteredValues() throws Exception
{
InFlightMap<Object> cache = new InFlightMap<>();
cache.enable();
Object first = new Object();
Object second = new Object();

Expand Down
Expand Up @@ -90,7 +90,7 @@ public void applyTo() throws Exception

BatchAppendLogEntries batchAppend = new BatchAppendLogEntries( baseIndex, offset, entries );

InFlightMap<RaftLogEntry> cache = new InFlightMap<>();
InFlightMap<RaftLogEntry> cache = new InFlightMap<>( true );

//when
batchAppend.applyTo( cache, log );
Expand Down
Expand Up @@ -44,7 +44,7 @@ public void applyTo() throws Exception
Log log = logProvider.getLog( getClass() );
long fromIndex = 2L;
TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex );
InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();
InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>( true );

inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) );
inFlightMap.put( 1L, new RaftLogEntry( 1L, valueOf( 1 ) ) );
Expand All @@ -71,7 +71,7 @@ public void shouldTruncateWithGaps() throws Exception
long fromIndex = 1L;
TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex );

InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();
InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>( true );

inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) );
inFlightMap.put( 2L, new RaftLogEntry( 1L, valueOf( 1 ) ) );
Expand Down

0 comments on commit ccfd5f8

Please sign in to comment.