diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/LeaderCommitWaiter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java similarity index 80% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/LeaderCommitWaiter.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java index 5aca166ddf9dc..f28ac64690050 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/LeaderCommitWaiter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/LeaderCommitWaiter.java @@ -17,12 +17,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.raft.membership; +package org.neo4j.causalclustering.core.consensus.membership; -import org.neo4j.coreedge.catchup.Sleeper; -import org.neo4j.coreedge.raft.state.ReadableRaftState; +import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; -public class LeaderCommitWaiter +public class LeaderCommitWaiter { private Sleeper sleeper; @@ -36,7 +35,7 @@ public void waitMore() sleeper.sleep( 100 ); } - public boolean keepWaiting( ReadableRaftState raftState ) + public boolean keepWaiting( ExposedRaftState raftState ) { return raftState.leaderCommit() == -1; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java index f0ce83d2c6bf4..8f601801bb7cb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java @@ -55,11 +55,11 @@ public class MembershipWaiter private final JobScheduler jobScheduler; private final Supplier dbHealthSupplier; private final long maxCatchupLag; - private LeaderCommitWaiter waiter; + private LeaderCommitWaiter waiter; private final Log log; public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier dbHealthSupplier, - long maxCatchupLag, LeaderCommitWaiter leaderCommitWaiter, LogProvider logProvider ) + long maxCatchupLag, LeaderCommitWaiter leaderCommitWaiter, LogProvider logProvider ) { this.myself = myself; this.jobScheduler = jobScheduler; @@ -76,7 +76,7 @@ CompletableFuture waitUntilCaughtUpMember( RaftMachine raft ) Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier ); jobScheduler.schedule( new JobScheduler.Group( getClass().toString(), POOLED ), () -> { - while ( waiter.keepWaiting(raftState) ) + while ( waiter.keepWaiting( raft.state() ) ) { waiter.waitMore(); } @@ -107,6 +107,7 @@ private Evaluator( RaftMachine raft, CompletableFuture catchUpFuture, this.dbHealthSupplier = dbHealthSupplier; } + @Override public void run() { if ( !dbHealthSupplier.get().isHealthy() ) @@ -140,12 +141,6 @@ private boolean caughtUpWithLeader() if ( lastLeaderCommit != -1 ) { caughtUpWithLeader = localCommit >= lastLeaderCommit; - log.info( "%s Caught up?: %b %d => %d (%d behind)%n", myself, caughtUpWithLeader, localCommit, - lastLeaderCommit, lastLeaderCommit - localCommit ); - } - lastLeaderCommit = state.leaderCommit(); - if ( lastLeaderCommit != -1 ) - { long gap = lastLeaderCommit - localCommit; log.info( "%s Catchup: %d => %d (%d behind)", myself, localCommit, lastLeaderCommit, gap ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/Sleeper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java similarity index 93% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/Sleeper.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java index f3ae9cd826334..9c0a2d7b30ecc 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/Sleeper.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/Sleeper.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.catchup; +package org.neo4j.causalclustering.core.consensus.membership; public interface Sleeper { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ThreadSleeper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java similarity index 94% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ThreadSleeper.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java index b43517053a2ce..5118c6d8b2edf 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ThreadSleeper.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/ThreadSleeper.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.catchup; +package org.neo4j.causalclustering.core.consensus.membership; public class ThreadSleeper implements Sleeper { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index cf59a5625d676..b3bd8ec1ea2df 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -41,8 +41,10 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftServer; import org.neo4j.causalclustering.core.consensus.log.pruning.PruningScheduler; +import org.neo4j.causalclustering.core.consensus.membership.LeaderCommitWaiter; import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiter; import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiterLifecycle; +import org.neo4j.causalclustering.core.consensus.membership.ThreadSleeper; import org.neo4j.causalclustering.core.state.ClusteringModule; import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.core.state.CoreState; @@ -181,7 +183,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data MembershipWaiter membershipWaiter = new MembershipWaiter( identityModule.myself(), jobScheduler, dbHealthSupplier, electionTimeout * 4, - logProvider ); + new LeaderCommitWaiter( new ThreadSleeper() ), logProvider ); long joinCatchupTimeout = config.get( CausalClusteringSettings.join_catch_up_timeout ); membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, consensusModule.raftMachine(), logProvider ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java index 8fe8e308f7586..b7557e0053bab 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiterTest.java @@ -19,14 +19,13 @@ */ package org.neo4j.causalclustering.core.consensus.membership; -import java.util.HashSet; +import org.junit.Before; +import org.junit.Test; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.junit.Before; -import org.junit.Test; - import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; @@ -38,10 +37,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; - import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; - import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.core.consensus.ReplicatedInteger.valueOf; import static org.neo4j.causalclustering.identity.RaftTestMember.member; @@ -61,7 +58,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, - new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() ); + new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); @@ -86,23 +83,22 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter<>( member( 0 ), jobScheduler, 500, - NullLogProvider.getInstance(), new LeaderCommitWaiter<>( new NoSleepSleeper() ) ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 500, + new LeaderCommitWaiter( new NoSleepSleeper() ), NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); - raftLog.commit( 0 ); - - RaftState raftState = mock( RaftState.class ); - when( raftState.leaderCommit() ).thenReturn( -1L, -1L, 0L, 0L ); - - when( raftState.entryLog() ).thenReturn( raftLog ); + ExposedRaftState raftState = RaftStateBuilder.raftState() + .votingMembers( member( 0 ) ) + .leaderCommit( 0 ) + .entryLog( raftLog ) + .commitIndex( 0L ) + .build().copy(); - HashSet members = new HashSet<>(); - members.add( member( 0 ) ); - when( raftState.votingMembers() ).thenReturn( members ); + RaftMachine raft = mock( RaftMachine.class ); + when( raft.state() ).thenReturn( raftState ); - CompletableFuture future = waiter.waitUntilCaughtUpMember( raftState ); + CompletableFuture future = waiter.waitUntilCaughtUpMember( raft ); jobScheduler.runJob(); jobScheduler.runJob(); @@ -113,8 +109,9 @@ public void shouldWaitUntilLeaderCommitIsAvailable() throws Exception public void shouldTimeoutIfCaughtUpButNotMember() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, - new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new + LeaderCommitWaiter( new NoSleepSleeper() ), + NullLogProvider.getInstance() ); ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 1 ) ) @@ -143,8 +140,9 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, - new LeaderCommitWaiter<>( new NoSleepSleeper() ), NullLogProvider.getInstance() ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, new + LeaderCommitWaiter( new NoSleepSleeper() ), + NullLogProvider.getInstance() ); ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ), member( 1 ) ) @@ -173,14 +171,17 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception public void shouldTimeoutIfLeaderCommitIsNeverKnown() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - MembershipWaiter waiter = new MembershipWaiter<>( member( 0 ), jobScheduler, 1, - NullLogProvider.getInstance(), new LimitedTriesLeaderCommitWaiter<>( new NoSleepSleeper(), 1 ) ); + MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, + new LimitedTriesLeaderCommitWaiter( new NoSleepSleeper(), 1 ), NullLogProvider.getInstance() ); + + ExposedRaftState raftState = RaftStateBuilder.raftState() + .leaderCommit( -1 ) + .build().copy(); - RaftState raftState = RaftStateBuilder.raftState() - .leaderCommit( -1L ) - .build(); + RaftMachine raft = mock( RaftMachine.class ); + when(raft.state()).thenReturn( raftState ); - CompletableFuture future = waiter.waitUntilCaughtUpMember( raftState ); + CompletableFuture future = waiter.waitUntilCaughtUpMember( raft ); jobScheduler.runJob(); try @@ -203,7 +204,7 @@ public void sleep( long millis ) } } - class LimitedTriesLeaderCommitWaiter extends LeaderCommitWaiter + class LimitedTriesLeaderCommitWaiter extends LeaderCommitWaiter { private int attempts; @@ -220,7 +221,7 @@ public void waitMore() } @Override - public boolean keepWaiting( ReadableRaftState raftState ) + public boolean keepWaiting( ExposedRaftState raftState ) { return super.keepWaiting( raftState ) && attempts > 0; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java deleted file mode 100644 index e69de29bb2d1d..0000000000000