diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java index a81f248631368..88ed0be669dad 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java @@ -37,6 +37,7 @@ import org.neo4j.coreedge.core.consensus.roles.Role; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService; import org.neo4j.coreedge.core.consensus.shipping.RaftLogShippingManager; +import org.neo4j.coreedge.core.consensus.state.ExposedRaftState; import org.neo4j.coreedge.core.consensus.state.RaftState; import org.neo4j.coreedge.core.consensus.state.ReadableRaftState; import org.neo4j.coreedge.core.consensus.term.TermState; @@ -163,7 +164,7 @@ public synchronized RaftCoreState coreState() return new RaftCoreState( membershipManager.getCommitted() ); } - public void installCoreState( RaftCoreState coreState ) throws IOException + public synchronized void installCoreState( RaftCoreState coreState ) throws IOException { membershipManager.install( coreState.committed() ); } @@ -204,7 +205,7 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup memberSet ) thro } } - public void setTargetMembershipSet( Set targetMembers ) + public synchronized void setTargetMembershipSet( Set targetMembers ) { membershipManager.setTargetMembershipSet( targetMembers ); @@ -255,9 +256,14 @@ public synchronized void unregisterListener( Listener listener ) leaderListeners.remove( listener ); } - public ReadableRaftState state() + /** + * Every call to state() gives you an immutable copy of the current state. + * + * @return A fresh view of the state. + */ + public synchronized ExposedRaftState state() { - return state; + return state.copy(); } private void notifyLeaderChanges( Outcome outcome ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiter.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiter.java index 32b5025672182..b10652c0affaa 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiter.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiter.java @@ -23,7 +23,8 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import org.neo4j.coreedge.core.consensus.state.ReadableRaftState; +import org.neo4j.coreedge.core.consensus.RaftMachine; +import org.neo4j.coreedge.core.consensus.state.ExposedRaftState; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.internal.DatabaseHealth; @@ -31,7 +32,6 @@ import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; /** @@ -67,11 +67,11 @@ public MembershipWaiter( MemberId myself, JobScheduler jobScheduler, Supplier waitUntilCaughtUpMember( ReadableRaftState raftState ) + CompletableFuture waitUntilCaughtUpMember( RaftMachine raft ) { CompletableFuture catchUpFuture = new CompletableFuture<>(); - Evaluator evaluator = new Evaluator( raftState, catchUpFuture, dbHealthSupplier ); + Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier ); JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring( new JobScheduler.Group( getClass().toString(), POOLED ), @@ -84,18 +84,18 @@ CompletableFuture waitUntilCaughtUpMember( ReadableRaftState raftState private class Evaluator implements Runnable { - private final ReadableRaftState raftState; + private final RaftMachine raft; private final CompletableFuture catchUpFuture; private long lastLeaderCommit; private final Supplier dbHealthSupplier; - private Evaluator( ReadableRaftState raftState, CompletableFuture catchUpFuture, + private Evaluator( RaftMachine raft, CompletableFuture catchUpFuture, Supplier dbHealthSupplier ) { - this.raftState = raftState; + this.raft = raft; this.catchUpFuture = catchUpFuture; - this.lastLeaderCommit = raftState.leaderCommit(); + this.lastLeaderCommit = raft.state().leaderCommit(); this.dbHealthSupplier = dbHealthSupplier; } @@ -113,7 +113,7 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() ) private boolean iAmAVotingMember() { - Set votingMembers = raftState.votingMembers(); + Set votingMembers = raft.state().votingMembers(); boolean votingMember = votingMembers.contains( myself ); if ( !votingMember ) { @@ -126,12 +126,14 @@ private boolean caughtUpWithLeader() { boolean caughtUpWithLeader = false; - long localCommit = raftState.commitIndex(); + ExposedRaftState state = raft.state(); + + long localCommit = state.commitIndex(); if ( lastLeaderCommit != -1 ) { caughtUpWithLeader = localCommit >= lastLeaderCommit; } - lastLeaderCommit = raftState.leaderCommit(); + lastLeaderCommit = state.leaderCommit(); if ( lastLeaderCommit != -1 ) { long gap = lastLeaderCommit - localCommit; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterLifecycle.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterLifecycle.java index c4667ed97d18c..b94152ced93ff 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterLifecycle.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterLifecycle.java @@ -50,7 +50,7 @@ public MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, Long joinCa @Override public void start() throws Throwable { - CompletableFuture caughtUp = membershipWaiter.waitUntilCaughtUpMember( raft.state() ); + CompletableFuture caughtUp = membershipWaiter.waitUntilCaughtUpMember( raft ); try { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java index 0a2ec6954c2a7..3b597067cbfb8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.time.Clock; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.function.LongSupplier; @@ -65,8 +66,8 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe private final int expectedClusterSize; - private volatile Set votingMembers = new HashSet<>(); - private volatile Set replicationMembers = new HashSet<>(); // votingMembers + additionalReplicationMembers + private volatile Set votingMembers = Collections.unmodifiableSet( new HashSet<>() ); + private volatile Set replicationMembers = Collections.unmodifiableSet( new HashSet<>() ); // votingMembers + additionalReplicationMembers private Set listeners = new HashSet<>(); private Set additionalReplicationMembers = new HashSet<>(); @@ -137,12 +138,12 @@ private Set missingMembers() */ private void updateMemberSets() { - votingMembers = state.getLatest(); + votingMembers = Collections.unmodifiableSet( state.getLatest() ); HashSet newReplicationMembers = new HashSet<>( votingMembers ); newReplicationMembers.addAll( additionalReplicationMembers ); - replicationMembers = newReplicationMembers; + replicationMembers = Collections.unmodifiableSet( newReplicationMembers ); listeners.forEach( Listener::onMembershipChanged ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/ExposedRaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/ExposedRaftState.java new file mode 100644 index 0000000000000..78d71690d6b02 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/ExposedRaftState.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.core.consensus.state; + +import java.util.Set; + +import org.neo4j.coreedge.identity.MemberId; + +public interface ExposedRaftState +{ + long leaderCommit(); + + long commitIndex(); + + long appendIndex(); + + long term(); + + Set votingMembers(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java index 2acb87d330d09..2b36737408028 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java @@ -207,4 +207,47 @@ private void logIfLeaderChanged( MemberId leader ) log.info( "Leader changed from %s to %s", this.leader, leader ); } } + + public ExposedRaftState copy() + { + return new ExposedRaftState() + { + final long leaderCommit = RaftState.this.leaderCommit(); + final long commitIndex = RaftState.this.commitIndex(); + final long appendIndex = RaftState.this.entryLog().appendIndex(); + final long term = RaftState.this.term(); + + final Set votingMembers = RaftState.this.votingMembers(); // returned set is never mutated + + @Override + public long leaderCommit() + { + return this.leaderCommit; + } + + @Override + public long commitIndex() + { + return this.commitIndex; + } + + @Override + public long appendIndex() + { + return this.appendIndex; + } + + @Override + public long term() + { + return this.term; + } + + @Override + public Set votingMembers() + { + return this.votingMembers; + } + }; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java index 698167c4cb50b..4b88aff6ec86e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java @@ -176,7 +176,7 @@ public void start() throws Throwable private boolean haveState() { - return raftMachine.state().entryLog().appendIndex() > -1; + return raftMachine.state().appendIndex() > -1; } @Override diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterTest.java index 075154bb9db10..ed982503dc468 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/membership/MembershipWaiterTest.java @@ -25,9 +25,10 @@ import org.junit.Before; import org.junit.Test; +import org.neo4j.coreedge.core.consensus.RaftMachine; import org.neo4j.coreedge.core.consensus.log.InMemoryRaftLog; import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; -import org.neo4j.coreedge.core.consensus.state.RaftState; +import org.neo4j.coreedge.core.consensus.state.ExposedRaftState; import org.neo4j.coreedge.core.consensus.state.RaftStateBuilder; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.logging.NullLogProvider; @@ -62,14 +63,17 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); - RaftState raftState = RaftStateBuilder.raftState() + ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ) ) .leaderCommit( 0 ) .entryLog( raftLog ) .commitIndex( 0L ) - .build(); + .build().copy(); - CompletableFuture future = waiter.waitUntilCaughtUpMember( raftState ); + RaftMachine raft = mock( RaftMachine.class ); + when( raft.state() ).thenReturn( raftState ); + + CompletableFuture future = waiter.waitUntilCaughtUpMember( raft ); jobScheduler.runJob(); future.get( 0, NANOSECONDS ); @@ -82,12 +86,15 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() ); - RaftState raftState = RaftStateBuilder.raftState() + ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 1 ) ) .leaderCommit( 0 ) - .build(); + .build().copy(); + + RaftMachine raft = mock( RaftMachine.class ); + when( raft.state() ).thenReturn( raftState ); - CompletableFuture future = waiter.waitUntilCaughtUpMember( raftState ); + CompletableFuture future = waiter.waitUntilCaughtUpMember( raft ); jobScheduler.runJob(); jobScheduler.runJob(); @@ -109,12 +116,15 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, () -> dbHealth, 1, NullLogProvider.getInstance() ); - RaftState raftState = RaftStateBuilder.raftState() + ExposedRaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ), member( 1 ) ) .leaderCommit( 0 ) - .build(); + .build().copy(); + + RaftMachine raft = mock( RaftMachine.class ); + when( raft.state() ).thenReturn( raftState ); - CompletableFuture future = waiter.waitUntilCaughtUpMember( raftState ); + CompletableFuture future = waiter.waitUntilCaughtUpMember( raft ); jobScheduler.runJob(); jobScheduler.runJob();