From 8083c90ae73e66509460fa7ed2dbcd8e70fb1d2d Mon Sep 17 00:00:00 2001 From: Max Sumrall Date: Mon, 31 Oct 2016 09:55:54 +0100 Subject: [PATCH] Have one check on increasing delay rather than two checks on different intervals. --- .../membership/LeaderCommitWaiter.java | 42 --------------- .../membership/MembershipWaiter.java | 33 ++++++------ .../core/consensus/membership/Sleeper.java | 25 --------- .../consensus/membership/ThreadSleeper.java | 36 ------------- .../core/server/CoreServerModule.java | 4 +- .../membership/MembershipWaiterTest.java | 51 ++----------------- 6 files changed, 23 insertions(+), 168 deletions(-) delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java deleted file mode 100644 index f28ac64690050..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.core.consensus.membership; - -import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; - -public class LeaderCommitWaiter -{ - private Sleeper sleeper; - - public LeaderCommitWaiter( Sleeper sleeper ) - { - this.sleeper = sleeper; - } - - public void waitMore() - { - sleeper.sleep( 100 ); - } - - public boolean keepWaiting( ExposedRaftState raftState ) - { - return raftState.leaderCommit() == -1; - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java index 8f601801bb7cb..aabe27805f2a0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java @@ -32,6 +32,7 @@ import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; /** @@ -55,17 +56,17 @@ public class MembershipWaiter private final JobScheduler jobScheduler; private final Supplier dbHealthSupplier; private final long maxCatchupLag; - private LeaderCommitWaiter waiter; + private long currentCatchupDelayInMs; private final Log log; public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier dbHealthSupplier, - long maxCatchupLag, LeaderCommitWaiter leaderCommitWaiter, LogProvider logProvider ) + long maxCatchupLag, LogProvider logProvider ) { this.myself = myself; this.jobScheduler = jobScheduler; this.dbHealthSupplier = dbHealthSupplier; this.maxCatchupLag = maxCatchupLag; - this.waiter = leaderCommitWaiter; + this.currentCatchupDelayInMs = maxCatchupLag; this.log = logProvider.getLog( getClass() ); } @@ -75,17 +76,11 @@ CompletableFuture waitUntilCaughtUpMember( RaftMachine raft ) Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier ); - jobScheduler.schedule( new JobScheduler.Group( getClass().toString(), POOLED ), () -> { - while ( waiter.keepWaiting( raft.state() ) ) - { - waiter.waitMore(); - } - JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring( - new JobScheduler.Group( getClass().toString(), POOLED ), - evaluator, maxCatchupLag, MILLISECONDS ); + JobScheduler.JobHandle jobHandle = jobScheduler.schedule( + new JobScheduler.Group( getClass().toString(), POOLED ), + evaluator, currentCatchupDelayInMs, MILLISECONDS ); - catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) ); - } ); + catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) ); return catchUpFuture; } @@ -118,6 +113,13 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() ) { catchUpFuture.complete( true ); } + else + { + currentCatchupDelayInMs += SECONDS.toMillis( 1 ); + long longerDelay = currentCatchupDelayInMs < maxCatchupLag ? currentCatchupDelayInMs : maxCatchupLag; + jobScheduler.schedule( new JobScheduler.Group( MembershipWaiter.class.toString(), POOLED ), this, + longerDelay, MILLISECONDS ); + } } private boolean iAmAVotingMember() @@ -136,11 +138,11 @@ private boolean caughtUpWithLeader() boolean caughtUpWithLeader = false; ExposedRaftState state = raft.state(); - long localCommit = state.commitIndex(); + lastLeaderCommit = state.leaderCommit(); if ( lastLeaderCommit != -1 ) { - caughtUpWithLeader = localCommit >= lastLeaderCommit; + caughtUpWithLeader = localCommit == lastLeaderCommit; long gap = lastLeaderCommit - localCommit; log.info( "%s Catchup: %d => %d (%d behind)", myself, localCommit, lastLeaderCommit, gap ); } @@ -148,7 +150,6 @@ private boolean caughtUpWithLeader() { log.info( "Leader commit unknown" ); } - return caughtUpWithLeader; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java deleted file mode 100644 index 9c0a2d7b30ecc..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.core.consensus.membership; - -public interface Sleeper -{ - void sleep( long millis ); -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java deleted file mode 100644 index 5118c6d8b2edf..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.core.consensus.membership; - -public class ThreadSleeper implements Sleeper -{ - @Override - public void sleep( long millis ) - { - try - { - Thread.sleep( millis ); - } - catch ( InterruptedException ignored ) - { - // ignore me - } - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index b3bd8ec1ea2df..cf59a5625d676 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -41,10 +41,8 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftServer; import org.neo4j.causalclustering.core.consensus.log.pruning.PruningScheduler; -import org.neo4j.causalclustering.core.consensus.membership.LeaderCommitWaiter; import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiter; import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiterLifecycle; -import org.neo4j.causalclustering.core.consensus.membership.ThreadSleeper; import org.neo4j.causalclustering.core.state.ClusteringModule; import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.core.state.CoreState; @@ -183,7 +181,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data MembershipWaiter membershipWaiter = new MembershipWaiter( identityModule.myself(), jobScheduler, dbHealthSupplier, electionTimeout * 4, - new LeaderCommitWaiter( new ThreadSleeper() ), logProvider ); + logProvider ); long joinCatchupTimeout = config.get( CausalClusteringSettings.join_catch_up_timeout ); membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, consensusModule.raftMachine(), logProvider ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java index b7557e0053bab..fc94f65540aa0 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java @@ -57,8 +57,7 @@ public void mocking() public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, - new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); @@ -83,8 +82,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, - new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); @@ -100,7 +98,6 @@ public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception CompletableFuture future = waiter.waitUntilCaughtUpMember( raft ); jobScheduler.runJob(); - jobScheduler.runJob(); future.get( 1, TimeUnit.SECONDS ); } @@ -109,9 +106,7 @@ public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception public void shouldTimeoutIfCaughtUpButNotMember() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new - LeaderCommitWaiter( new NoSleepSleeper() ), - NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() ); ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 1 ) ) @@ -140,9 +135,7 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new - LeaderCommitWaiter( new NoSleepSleeper() ), - NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() ); ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ), member( 1 ) ) @@ -171,8 +164,7 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, - new LimitedTriesLeaderCommitWaiter( new NoSleepSleeper(), 1 ), NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() ); ExposedRaftState raftState = RaftStateBuilder.raftState() .leaderCommit( -1 ) @@ -194,37 +186,4 @@ public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception // expected } } - - private class NoSleepSleeper implements Sleeper - { - @Override - public void sleep( long millis ) - { - // no need to sleep, only for tests - } - } - - class LimitedTriesLeaderCommitWaiter extends LeaderCommitWaiter - { - private int attempts; - - public LimitedTriesLeaderCommitWaiter( Sleeper sleeper, int attempts ) - { - super( sleeper ); - this.attempts = attempts; - } - - @Override - public void waitMore() - { - attempts--; - } - - @Override - public boolean keepWaiting( ExposedRaftState raftState ) - { - return super.keepWaiting( raftState ) && attempts > 0; - } - } - }