From 7270fd9dc2c818eb13609f6bf3f6e079e1b8902b Mon Sep 17 00:00:00 2001 From: Hugo Firth Date: Thu, 19 Apr 2018 08:33:01 +0100 Subject: [PATCH] Address review feedback --- .../core/consensus/LeaderInfo.java | 16 ++++++++-------- .../core/consensus/RaftMachine.java | 15 +++------------ .../discovery/CoreTopologyService.java | 6 +++--- .../discovery/HazelcastClusterTopology.java | 17 ++++++++++++----- .../discovery/HazelcastCoreTopologyService.java | 6 ++---- .../causalclustering/discovery/Cluster.java | 4 +++- .../discovery/SharedDiscoveryCoreClient.java | 6 +++--- .../discovery/SharedDiscoveryService.java | 8 ++++++-- .../scenarios/ClusterOverviewIT.java | 1 - 9 files changed, 40 insertions(+), 39 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java index d58c351c0bc0d..515e2cae9c587 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java @@ -32,23 +32,23 @@ public class LeaderInfo implements Serializable private final MemberId memberId; private final long term; - private boolean isStepDown; + private boolean isSteppingDown; public LeaderInfo( MemberId memberId, long term ) { - this.memberId = memberId; - this.term = term; - this.isStepDown = false; + this( memberId, term, false ); } - public void stepDown() + public LeaderInfo( MemberId memberId, long term, boolean isSteppingDown ) { - this.isStepDown = true; + this.memberId = memberId; + this.term = term; + this.isSteppingDown = isSteppingDown; } - public boolean isStepDown() + public boolean isSteppingDown() { - return isStepDown; + return isSteppingDown; } public MemberId memberId() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java index b0579c74790bd..24e2577045c5d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java @@ -213,17 +213,11 @@ public synchronized ExposedRaftState state() private void notifyLeaderChanges( Outcome outcome ) { - LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() ); - boolean isStepDown = outcome.isSteppingDown(); - - if ( isStepDown ) - { - leaderInfo.stepDown(); - } + LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm(), outcome.isSteppingDown() ); for ( LeaderListener listener : leaderListeners ) { - if ( isStepDown ) + if ( outcome.isSteppingDown() ) { listener.onLeaderStepDown( leaderInfo ); } @@ -251,10 +245,7 @@ else if ( outcome.isSteppingDown() ) private boolean leaderChanged( Outcome outcome, MemberId oldLeader ) { - Optional oldLeaderOpt = Optional.ofNullable( oldLeader ); - Optional outcomeLeaderOpt = Optional.ofNullable( outcome.getLeader() ); - - return !oldLeaderOpt.equals( outcomeLeaderOpt ); + return !Objects.equals( oldLeader, outcome.getLeader() ); } public synchronized ConsensusOutcome handle( RaftMessages.RaftMessage incomingMessage ) throws IOException diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java index df811849917cd..d2df4eb40e9ee 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java @@ -56,10 +56,10 @@ public interface CoreTopologyService extends TopologyService * Set the leader memberId to null for a given database (i.e. Raft consensus group). * This is intended to trigger state cleanup for informational procedures like {@link ClusterOverviewProcedure} * - * @param leaderInfo - * @param dbName + * @param stepDownLeaderInfo Information about the stepdown event, including term + * @param dbName The database for which this topology member should handle a stepdown */ - void handleStepDown( LeaderInfo leaderInfo, String dbName ); + void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName ); interface Listener { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java index e84615bcf405b..52ed78b57d2e1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java @@ -216,18 +216,25 @@ static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderIn { IAtomicReference leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); - LeaderInfo expected = leaderRef.get(); + LeaderInfo current = leaderRef.get(); + Optional currentOpt = Optional.ofNullable( current ); - boolean noUpdate = Optional.ofNullable( expected ).map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); + boolean noUpdate = currentOpt.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); - boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= leaderInfo.term() ).orElse( false ); + int termComparison = currentOpt.map( l -> Long.compare( l.term(), leaderInfo.term() ) ).orElse( -1 ); - if ( (greaterOrEqualTermExists || noUpdate) && !leaderInfo.isStepDown() ) + boolean greaterTermExists = termComparison > 0; + + boolean invalidTerm = greaterTermExists || ( termComparison == 0 && !leaderInfo.isSteppingDown() ); + + boolean success = !( invalidTerm || noUpdate); + + if ( !success ) { return; } - leaderRef.compareAndSet( expected, leaderInfo ); + leaderRef.compareAndSet( current, leaderInfo ); } private static Optional getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index 8d45a28a62e7b..c0298050cabac 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -71,7 +71,6 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem private static final long HAZELCAST_IS_HEALTHY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis( 10 ); private static final int HAZELCAST_MIN_CLUSTER = 2; - private final int minimumConsensusSize; private final Config config; private final MemberId myself; private final Log log; @@ -101,7 +100,6 @@ public HazelcastCoreTopologyService( Config config, MemberId myself, JobSchedule TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { this.config = config; - this.minimumConsensusSize = config.get( CausalClusteringSettings.minimum_core_cluster_size_at_runtime ); this.myself = myself; this.listenerService = new CoreTopologyListenerService(); this.log = logProvider.getLog( getClass() ); @@ -143,12 +141,12 @@ public void setLeader( LeaderInfo leaderInfo, String dbName ) } @Override - public void handleStepDown( LeaderInfo leaderInfo, String dbName ) + public void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName ) { boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() ); if ( wasPreviousLeader ) { - HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, dbName ); + HazelcastClusterTopology.casLeaders( hazelcastInstance, stepDownLeaderInfo, dbName ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java index 9638d014df3d1..a39e469d0a0ae 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java @@ -395,7 +395,9 @@ public List getAllMembersWithAnyRole( String dbName, Role... Set roleSet = Arrays.stream( roles ).collect( toSet() ); return coreMembers.values().stream() - .filter( m -> m.database() != null && m.dbName().equals( dbName ) && roleSet.contains( m.database().getRole() ) ) + .filter( m -> m.database() != null ) + .filter( m -> m.dbName().equals( dbName ) ) + .filter( m -> roleSet.contains( m.database().getRole() ) ) .collect( Collectors.toList() ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java index 25b807be41cc7..68977d08be08b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java @@ -91,7 +91,7 @@ public Map allCoreRoles() @Override public void setLeader( LeaderInfo newLeader, String dbName ) { - if ( this.leaderInfo.term() < newLeader.term() ) + if ( this.leaderInfo.term() < newLeader.term() && newLeader.memberId() != null ) { this.leaderInfo = newLeader; sharedDiscoveryService.casLeaders( newLeader, localDBName ); @@ -143,12 +143,12 @@ public CoreTopology allCoreServers() } @Override - public void handleStepDown( LeaderInfo leaderInfo, String dbName ) + public void handleStepDown( LeaderInfo stepDownLeaderInfo, String dbName ) { boolean wasPreviousLeader = myself.equals( this.leaderInfo.memberId() ); if ( wasPreviousLeader ) { - sharedDiscoveryService.casLeaders( leaderInfo, dbName ); + sharedDiscoveryService.casLeaders( stepDownLeaderInfo, dbName ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index 5a736c34bdcff..30c8ac298f74c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -134,9 +134,13 @@ void casLeaders( LeaderInfo leaderInfo, String dbName ) boolean noUpdate = current.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); - boolean greaterOrEqualTermExists = current.map( l -> l.term() >= leaderInfo.term() ).orElse( false ); + int termComparison = current.map( l -> Long.compare( l.term(), leaderInfo.term() ) ).orElse( -1 ); - boolean success = !(greaterOrEqualTermExists || noUpdate) || leaderInfo.isStepDown(); + boolean greaterTermExists = termComparison > 0; + + boolean invalidTerm = greaterTermExists || ( termComparison == 0 && !leaderInfo.isSteppingDown() ); + + boolean success = !( invalidTerm || noUpdate); if ( success ) { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java index 7ca740e73a49a..587cfc4cc9ce6 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java @@ -277,7 +277,6 @@ public void shouldHaveNoLeaderIfMajorityCoreMembersDead() throws Exception clusterRule.withNumberOfCoreMembers( 2 ); Cluster cluster = clusterRule.startCluster(); - //TODO: address proliferation of Role enums List followers = cluster.getAllMembersWithRole( Role.FOLLOWER ); CoreClusterMember leader = cluster.getMemberWithRole( Role.LEADER ); followers.forEach( CoreClusterMember::shutdown );