Skip to content

Commit

Permalink
Make membership waiter block until leader commit is available
Browse files Browse the repository at this point in the history
before executing the catchup algorithm

At the moment we unnecessarily wait for up to 2 rounds because
in the first round of catchup the leader commit is nearly always
-1 since we go into this code path before the RAFT machinery is
up and running.

This commit introduces a job which blocks waiting for the leader
commit to be -1. The code in MembershipWaiter#waitUntilCaughtUpMember
relies on MembershipWaiterLifecycle to timeout if the leader commit
is never discovered.
  • Loading branch information
Mark Needham authored and Max Sumrall committed Oct 28, 2016
1 parent 51ccd5f commit 7689b08
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
* Waits until member has "fully joined" the raft membership.
* We consider a member fully joined where:
* <ul>
* <li>It is a member of the voting group
* (its opinion will count towards leader elections and committing entries), and</li>
* <li>It is sufficiently caught up with the leader,
* so that long periods of unavailability are unlikely, should the leader fail.</li>
* <li>It is a member of the voting group
* (its opinion will count towards leader elections and committing entries), and</li>
* <li>It is sufficiently caught up with the leader,
* so that long periods of unavailability are unlikely, should the leader fail.</li>
* </ul>
*
* <p>
* To determine whether the member is sufficiently caught up, we check periodically how far behind we are,
* once every {@code maxCatchupLag}. If the leader is always moving forwards we will never fully catch up,
* so all we look for is that we have caught up with where the leader was the <i>previous</i> time
Expand All @@ -55,15 +55,17 @@ public class MembershipWaiter
private final JobScheduler jobScheduler;
private final Supplier<DatabaseHealth> dbHealthSupplier;
private final long maxCatchupLag;
private LeaderCommitWaiter<MEMBER> waiter;
private final Log log;

public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier<DatabaseHealth> dbHealthSupplier,
long maxCatchupLag, LogProvider logProvider )
long maxCatchupLag, LeaderCommitWaiter<MemberId> leaderCommitWaiter, LogProvider logProvider )
{
this.myself = myself;
this.jobScheduler = jobScheduler;
this.dbHealthSupplier = dbHealthSupplier;
this.maxCatchupLag = maxCatchupLag;
this.waiter = leaderCommitWaiter;
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -73,11 +75,17 @@ CompletableFuture<Boolean> waitUntilCaughtUpMember( RaftMachine raft )

Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier );

JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring(
new JobScheduler.Group( getClass().toString(), POOLED ),
evaluator, maxCatchupLag, MILLISECONDS );
jobScheduler.schedule( new JobScheduler.Group( getClass().toString(), POOLED ), () -> {
while ( waiter.keepWaiting(raftState) )
{
waiter.waitMore();
}
JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring(
new JobScheduler.Group( getClass().toString(), POOLED ),
evaluator, maxCatchupLag, MILLISECONDS );

catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) );
catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) );
} );

return catchUpFuture;
}
Expand Down Expand Up @@ -132,6 +140,8 @@ private boolean caughtUpWithLeader()
if ( lastLeaderCommit != -1 )
{
caughtUpWithLeader = localCommit >= lastLeaderCommit;
log.info( "%s Caught up?: %b %d => %d (%d behind)%n", myself, caughtUpWithLeader, localCommit,
lastLeaderCommit, lastLeaderCommit - localCommit );
}
lastLeaderCommit = state.leaderCommit();
if ( lastLeaderCommit != -1 )
Expand All @@ -147,4 +157,5 @@ private boolean caughtUpWithLeader()
return caughtUpWithLeader;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
*/
package org.neo4j.causalclustering.core.consensus.membership;

import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
Expand Down Expand Up @@ -59,7 +61,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500,
NullLogProvider.getInstance() );
new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() );

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
Expand All @@ -75,16 +77,44 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();
jobScheduler.runJob();

future.get( 0, NANOSECONDS );
}

@Test
public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter<RaftTestMember> waiter = new MembershipWaiter<>( member( 0 ), jobScheduler, 500,
NullLogProvider.getInstance(), new LeaderCommitWaiter<>( new NoSleepSleeper() ) );

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
raftLog.commit( 0 );

RaftState<RaftTestMember> raftState = mock( RaftState.class );
when( raftState.leaderCommit() ).thenReturn( -1L, -1L, 0L, 0L );

when( raftState.entryLog() ).thenReturn( raftLog );

HashSet<RaftTestMember> members = new HashSet<>();
members.add( member( 0 ) );
when( raftState.votingMembers() ).thenReturn( members );

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
jobScheduler.runJob();
jobScheduler.runJob();

future.get( 1, TimeUnit.SECONDS );
}

@Test
public void shouldTimeoutIfCaughtUpButNotMember() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
NullLogProvider.getInstance() );
new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 1 ) )
Expand Down Expand Up @@ -114,7 +144,7 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
NullLogProvider.getInstance() );
new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 0 ), member( 1 ) )
Expand All @@ -138,4 +168,62 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
// expected
}
}

@Test
public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter<RaftTestMember> waiter = new MembershipWaiter<>( member( 0 ), jobScheduler, 1,
NullLogProvider.getInstance(), new LimitedTriesLeaderCommitWaiter<>( new NoSleepSleeper(), 1 ) );

RaftState<RaftTestMember> raftState = RaftStateBuilder.raftState()
.leaderCommit( -1L )
.build();

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
jobScheduler.runJob();

try
{
future.get( 10, MILLISECONDS );
fail( "Should have timed out." );
}
catch ( TimeoutException e )
{
// expected
}
}

private class NoSleepSleeper implements Sleeper
{
@Override
public void sleep( long millis )
{
// no need to sleep, only for tests
}
}

class LimitedTriesLeaderCommitWaiter<MEMBER> extends LeaderCommitWaiter<MEMBER>
{
private int attempts;

public LimitedTriesLeaderCommitWaiter( Sleeper sleeper, int attempts )
{
super( sleeper );
this.attempts = attempts;
}

@Override
public void waitMore()
{
attempts--;
}

@Override
public boolean keepWaiting( ReadableRaftState<MEMBER> raftState )
{
return super.keepWaiting( raftState ) && attempts > 0;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

public interface Sleeper
{
void sleep( long millis );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

public class ThreadSleeper implements Sleeper
{
@Override
public void sleep( long millis )
{
try
{
Thread.sleep( millis );
}
catch ( InterruptedException ignored )
{
// ignore me
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.membership;

import org.neo4j.coreedge.catchup.Sleeper;
import org.neo4j.coreedge.raft.state.ReadableRaftState;

public class LeaderCommitWaiter<MEMBER>
{
private Sleeper sleeper;

public LeaderCommitWaiter( Sleeper sleeper )
{
this.sleeper = sleeper;
}

public void waitMore()
{
sleeper.sleep( 100 );
}

public boolean keepWaiting( ReadableRaftState<MEMBER> raftState )
{
return raftState.leaderCommit() == -1;
}
}

0 comments on commit 7689b08

Please sign in to comment.