Skip to content

Commit

Permalink
Have one check on increasing delay rather than two checks on differen…
Browse files Browse the repository at this point in the history
…t intervals.
  • Loading branch information
Max Sumrall committed Oct 31, 2016
1 parent 7636ea1 commit 8083c90
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 168 deletions.

This file was deleted.

Expand Up @@ -32,6 +32,7 @@
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

/**
Expand All @@ -55,17 +56,17 @@ public class MembershipWaiter
private final JobScheduler jobScheduler;
private final Supplier<DatabaseHealth> dbHealthSupplier;
private final long maxCatchupLag;
private LeaderCommitWaiter waiter;
private long currentCatchupDelayInMs;
private final Log log;

public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier<DatabaseHealth> dbHealthSupplier,
long maxCatchupLag, LeaderCommitWaiter leaderCommitWaiter, LogProvider logProvider )
long maxCatchupLag, LogProvider logProvider )
{
this.myself = myself;
this.jobScheduler = jobScheduler;
this.dbHealthSupplier = dbHealthSupplier;
this.maxCatchupLag = maxCatchupLag;
this.waiter = leaderCommitWaiter;
this.currentCatchupDelayInMs = maxCatchupLag;
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -75,17 +76,11 @@ CompletableFuture<Boolean> waitUntilCaughtUpMember( RaftMachine raft )

Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier );

jobScheduler.schedule( new JobScheduler.Group( getClass().toString(), POOLED ), () -> {
while ( waiter.keepWaiting( raft.state() ) )
{
waiter.waitMore();
}
JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring(
new JobScheduler.Group( getClass().toString(), POOLED ),
evaluator, maxCatchupLag, MILLISECONDS );
JobScheduler.JobHandle jobHandle = jobScheduler.schedule(
new JobScheduler.Group( getClass().toString(), POOLED ),
evaluator, currentCatchupDelayInMs, MILLISECONDS );

catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) );
} );
catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) );

return catchUpFuture;
}
Expand Down Expand Up @@ -118,6 +113,13 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() )
{
catchUpFuture.complete( true );
}
else
{
currentCatchupDelayInMs += SECONDS.toMillis( 1 );
long longerDelay = currentCatchupDelayInMs < maxCatchupLag ? currentCatchupDelayInMs : maxCatchupLag;
jobScheduler.schedule( new JobScheduler.Group( MembershipWaiter.class.toString(), POOLED ), this,
longerDelay, MILLISECONDS );
}
}

private boolean iAmAVotingMember()
Expand All @@ -136,19 +138,18 @@ private boolean caughtUpWithLeader()
boolean caughtUpWithLeader = false;

ExposedRaftState state = raft.state();

long localCommit = state.commitIndex();
lastLeaderCommit = state.leaderCommit();
if ( lastLeaderCommit != -1 )
{
caughtUpWithLeader = localCommit >= lastLeaderCommit;
caughtUpWithLeader = localCommit == lastLeaderCommit;
long gap = lastLeaderCommit - localCommit;
log.info( "%s Catchup: %d => %d (%d behind)", myself, localCommit, lastLeaderCommit, gap );
}
else
{
log.info( "Leader commit unknown" );
}

return caughtUpWithLeader;
}
}
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -41,10 +41,8 @@
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.RaftServer;
import org.neo4j.causalclustering.core.consensus.log.pruning.PruningScheduler;
import org.neo4j.causalclustering.core.consensus.membership.LeaderCommitWaiter;
import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiter;
import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiterLifecycle;
import org.neo4j.causalclustering.core.consensus.membership.ThreadSleeper;
import org.neo4j.causalclustering.core.state.ClusteringModule;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.causalclustering.core.state.CoreState;
Expand Down Expand Up @@ -183,7 +181,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

MembershipWaiter membershipWaiter =
new MembershipWaiter( identityModule.myself(), jobScheduler, dbHealthSupplier, electionTimeout * 4,
new LeaderCommitWaiter( new ThreadSleeper() ), logProvider );
logProvider );
long joinCatchupTimeout = config.get( CausalClusteringSettings.join_catch_up_timeout );
membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
joinCatchupTimeout, consensusModule.raftMachine(), logProvider );
Expand Down
Expand Up @@ -57,8 +57,7 @@ public void mocking()
public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500,
new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, NullLogProvider.getInstance() );

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
Expand All @@ -83,8 +82,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception
public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500,
new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, NullLogProvider.getInstance() );

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
Expand All @@ -100,7 +98,6 @@ public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();
jobScheduler.runJob();

future.get( 1, TimeUnit.SECONDS );
}
Expand All @@ -109,9 +106,7 @@ public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception
public void shouldTimeoutIfCaughtUpButNotMember() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new
LeaderCommitWaiter( new NoSleepSleeper() ),
NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 1 ) )
Expand Down Expand Up @@ -140,9 +135,7 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception
public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new
LeaderCommitWaiter( new NoSleepSleeper() ),
NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 0 ), member( 1 ) )
Expand Down Expand Up @@ -171,8 +164,7 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
new LimitedTriesLeaderCommitWaiter( new NoSleepSleeper(), 1 ), NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.leaderCommit( -1 )
Expand All @@ -194,37 +186,4 @@ public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception
// expected
}
}

private class NoSleepSleeper implements Sleeper
{
@Override
public void sleep( long millis )
{
// no need to sleep, only for tests
}
}

class LimitedTriesLeaderCommitWaiter extends LeaderCommitWaiter
{
private int attempts;

public LimitedTriesLeaderCommitWaiter( Sleeper sleeper, int attempts )
{
super( sleeper );
this.attempts = attempts;
}

@Override
public void waitMore()
{
attempts--;
}

@Override
public boolean keepWaiting( ExposedRaftState raftState )
{
return super.keepWaiting( raftState ) && attempts > 0;
}
}

}

0 comments on commit 8083c90

Please sign in to comment.