Skip to content

Commit

Permalink
Reintroduce this membership waiter change to the codebase.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Oct 28, 2016
1 parent 7689b08 commit 7636ea1
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 48 deletions.
Expand Up @@ -17,12 +17,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.membership;
package org.neo4j.causalclustering.core.consensus.membership;

import org.neo4j.coreedge.catchup.Sleeper;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState;

public class LeaderCommitWaiter<MEMBER>
public class LeaderCommitWaiter
{
private Sleeper sleeper;

Expand All @@ -36,7 +35,7 @@ public void waitMore()
sleeper.sleep( 100 );
}

public boolean keepWaiting( ReadableRaftState<MEMBER> raftState )
public boolean keepWaiting( ExposedRaftState raftState )
{
return raftState.leaderCommit() == -1;
}
Expand Down
Expand Up @@ -55,11 +55,11 @@ public class MembershipWaiter
private final JobScheduler jobScheduler;
private final Supplier<DatabaseHealth> dbHealthSupplier;
private final long maxCatchupLag;
private LeaderCommitWaiter<MEMBER> waiter;
private LeaderCommitWaiter waiter;
private final Log log;

public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier<DatabaseHealth> dbHealthSupplier,
long maxCatchupLag, LeaderCommitWaiter<MemberId> leaderCommitWaiter, LogProvider logProvider )
long maxCatchupLag, LeaderCommitWaiter leaderCommitWaiter, LogProvider logProvider )
{
this.myself = myself;
this.jobScheduler = jobScheduler;
Expand All @@ -76,7 +76,7 @@ CompletableFuture<Boolean> waitUntilCaughtUpMember( RaftMachine raft )
Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier );

jobScheduler.schedule( new JobScheduler.Group( getClass().toString(), POOLED ), () -> {
while ( waiter.keepWaiting(raftState) )
while ( waiter.keepWaiting( raft.state() ) )
{
waiter.waitMore();
}
Expand Down Expand Up @@ -107,6 +107,7 @@ private Evaluator( RaftMachine raft, CompletableFuture<Boolean> catchUpFuture,
this.dbHealthSupplier = dbHealthSupplier;
}

@Override
public void run()
{
if ( !dbHealthSupplier.get().isHealthy() )
Expand Down Expand Up @@ -140,12 +141,6 @@ private boolean caughtUpWithLeader()
if ( lastLeaderCommit != -1 )
{
caughtUpWithLeader = localCommit >= lastLeaderCommit;
log.info( "%s Caught up?: %b %d => %d (%d behind)%n", myself, caughtUpWithLeader, localCommit,
lastLeaderCommit, lastLeaderCommit - localCommit );
}
lastLeaderCommit = state.leaderCommit();
if ( lastLeaderCommit != -1 )
{
long gap = lastLeaderCommit - localCommit;
log.info( "%s Catchup: %d => %d (%d behind)", myself, localCommit, lastLeaderCommit, gap );
}
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;
package org.neo4j.causalclustering.core.consensus.membership;

public interface Sleeper
{
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;
package org.neo4j.causalclustering.core.consensus.membership;

public class ThreadSleeper implements Sleeper
{
Expand Down
Expand Up @@ -41,8 +41,10 @@
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.RaftServer;
import org.neo4j.causalclustering.core.consensus.log.pruning.PruningScheduler;
import org.neo4j.causalclustering.core.consensus.membership.LeaderCommitWaiter;
import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiter;
import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiterLifecycle;
import org.neo4j.causalclustering.core.consensus.membership.ThreadSleeper;
import org.neo4j.causalclustering.core.state.ClusteringModule;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.causalclustering.core.state.CoreState;
Expand Down Expand Up @@ -181,7 +183,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

MembershipWaiter membershipWaiter =
new MembershipWaiter( identityModule.myself(), jobScheduler, dbHealthSupplier, electionTimeout * 4,
logProvider );
new LeaderCommitWaiter( new ThreadSleeper() ), logProvider );
long joinCatchupTimeout = config.get( CausalClusteringSettings.join_catch_up_timeout );
membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
joinCatchupTimeout, consensusModule.raftMachine(), logProvider );
Expand Down
Expand Up @@ -19,14 +19,13 @@
*/
package org.neo4j.causalclustering.core.consensus.membership;

import java.util.HashSet;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
import org.junit.Test;

import org.neo4j.causalclustering.core.consensus.RaftMachine;
import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
Expand All @@ -38,10 +37,8 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.core.consensus.ReplicatedInteger.valueOf;
import static org.neo4j.causalclustering.identity.RaftTestMember.member;
Expand All @@ -61,7 +58,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500,
new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() );
new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() );

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
Expand All @@ -86,23 +83,22 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception
public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter<RaftTestMember> waiter = new MembershipWaiter<>( member( 0 ), jobScheduler, 500,
NullLogProvider.getInstance(), new LeaderCommitWaiter<>( new NoSleepSleeper() ) );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500,
new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() );

InMemoryRaftLog raftLog = new InMemoryRaftLog();
raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
raftLog.commit( 0 );

RaftState<RaftTestMember> raftState = mock( RaftState.class );
when( raftState.leaderCommit() ).thenReturn( -1L, -1L, 0L, 0L );

when( raftState.entryLog() ).thenReturn( raftLog );
ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 0 ) )
.leaderCommit( 0 )
.entryLog( raftLog )
.commitIndex( 0L )
.build().copy();

HashSet<RaftTestMember> members = new HashSet<>();
members.add( member( 0 ) );
when( raftState.votingMembers() ).thenReturn( members );
RaftMachine raft = mock( RaftMachine.class );
when( raft.state() ).thenReturn( raftState );

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();
jobScheduler.runJob();

Expand All @@ -113,8 +109,9 @@ public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception
public void shouldTimeoutIfCaughtUpButNotMember() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new
LeaderCommitWaiter( new NoSleepSleeper() ),
NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 1 ) )
Expand Down Expand Up @@ -143,8 +140,9 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception
public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new
LeaderCommitWaiter( new NoSleepSleeper() ),
NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.votingMembers( member( 0 ), member( 1 ) )
Expand Down Expand Up @@ -173,14 +171,17 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception
public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception
{
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
MembershipWaiter<RaftTestMember> waiter = new MembershipWaiter<>( member( 0 ), jobScheduler, 1,
NullLogProvider.getInstance(), new LimitedTriesLeaderCommitWaiter<>( new NoSleepSleeper(), 1 ) );
MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1,
new LimitedTriesLeaderCommitWaiter( new NoSleepSleeper(), 1 ), NullLogProvider.getInstance() );

ExposedRaftState raftState = RaftStateBuilder.raftState()
.leaderCommit( -1 )
.build().copy();

RaftState<RaftTestMember> raftState = RaftStateBuilder.raftState()
.leaderCommit( -1L )
.build();
RaftMachine raft = mock( RaftMachine.class );
when(raft.state()).thenReturn( raftState );

CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raftState );
CompletableFuture<Boolean> future = waiter.waitUntilCaughtUpMember( raft );
jobScheduler.runJob();

try
Expand All @@ -203,7 +204,7 @@ public void sleep( long millis )
}
}

class LimitedTriesLeaderCommitWaiter<MEMBER> extends LeaderCommitWaiter<MEMBER>
class LimitedTriesLeaderCommitWaiter extends LeaderCommitWaiter
{
private int attempts;

Expand All @@ -220,7 +221,7 @@ public void waitMore()
}

@Override
public boolean keepWaiting( ReadableRaftState<MEMBER> raftState )
public boolean keepWaiting( ExposedRaftState raftState )
{
return super.keepWaiting( raftState ) && attempts > 0;
}
Expand Down
Empty file.

0 comments on commit 7636ea1

Please sign in to comment.