From 800e17c17f34fd5131dd5cd2b01096571955f64e Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Wed, 14 Dec 2016 17:00:59 +0000 Subject: [PATCH] more logging to make it easier to see what's happening in membership land --- .../membership/RaftMembershipManager.java | 30 +++- .../discovery/CoreTopology.java | 79 ++++++++++- .../HazelcastCoreTopologyService.java | 19 ++- .../discovery/ReadReplicaTopology.java | 27 +++- .../discovery/CoreTopologyTest.java | 130 ++++++++++++++++++ .../discovery/SharedDiscoveryService.java | 3 +- .../scenarios/CoreReplicationIT.java | 4 + 7 files changed, 282 insertions(+), 10 deletions(-) create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipManager.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipManager.java index a545d09ae13c..8a6fef7267fd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipManager.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipManager.java @@ -113,9 +113,15 @@ public void start() throws IOException public void setTargetMembershipSet( Set targetMembers ) { + boolean targetMembershipChanged = !targetMembers.equals( this.targetMembers ); + this.targetMembers = new HashSet<>( targetMembers ); - log.info( "Target membership: " + targetMembers ); + if ( targetMembershipChanged ) + { + log.info( "Target membership: " + targetMembers ); + } + membershipChanger.onTargetChanged( targetMembers ); checkForStartCondition(); @@ -175,7 +181,20 @@ void removeAdditionalReplicationMember( MemberId member ) private boolean isSafeToRemoveMember() { - return votingMembers() != null && votingMembers().size() > expectedClusterSize; + Set votingMembers = votingMembers(); + boolean safeToRemoveMember = votingMembers != null && votingMembers.size() > expectedClusterSize; + + if ( !safeToRemoveMember ) + { + Set membersToRemove = superfluousMembers(); + + log.info( "Not safe to remove %s %s because it would reduce the number of voting members below the expected " + + "cluster size of %d. Voting members: %s", + membersToRemove.size() > 1 ? "members" : "member", + membersToRemove, expectedClusterSize, votingMembers ); + } + + return safeToRemoveMember; } private Set superfluousMembers() @@ -196,7 +215,7 @@ private void checkForStartCondition() { membershipChanger.onMissingMember( first( missingMembers() ) ); } - else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 ) + else if ( superfluousMembers().size() > 0 && isSafeToRemoveMember( ) ) { membershipChanger.onSuperfluousMember( first( superfluousMembers() ) ); } @@ -209,6 +228,7 @@ else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 ) */ void doConsensus( Set newVotingMemberSet ) { + log.info( "Getting consensus on new voting member set %s", newVotingMemberSet ); sendToMyself.replicate( memberSetBuilder.build( newVotingMemberSet ) ); } @@ -288,12 +308,14 @@ public void append( long baseIndex, RaftLogEntry... entries ) throws IOException if ( state.append( baseIndex, new HashSet<>( raftGroup.getMembers() ) ) ) { + log.info( "Appending new member set %s", state ); storage.persistStoreData( state ); updateMemberSets(); } else { - log.warn( "Appending member set was ignored. Current state: %s, Appended set: %s, Log index: %d%n", state, raftGroup, baseIndex ); + log.warn( "Appending member set was ignored. Current state: %s, Appended set: %s, Log index: %d%n", + state, raftGroup, baseIndex ); } } baseIndex++; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java index 5c13522af7c7..86f75d15574b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java @@ -22,12 +22,19 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.collection.Pair; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toSet; public class CoreTopology { @@ -72,7 +79,77 @@ public Optional find( MemberId memberId ) @Override public String toString() { - return String.format( "{coreMembers=%s, bootstrappable=%s}", coreMembers, canBeBootstrapped() ); + return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", + clusterId, canBeBootstrapped(), coreMembers ); + } + + TopologyDifference difference( CoreTopology other ) + { + Set members = coreMembers.keySet(); + Set otherMembers = other.coreMembers.keySet(); + + Set added = otherMembers.stream().filter( m -> !members.contains(m) ) + .map( memberId -> asDifference( other, memberId ) ).collect( toSet() ); + + Set removed = members.stream().filter( m -> !otherMembers.contains(m) ) + .map( memberId -> asDifference(CoreTopology.this, memberId ) ).collect( toSet() ); + + return new TopologyDifference( added, removed ); + } + + private Difference asDifference( CoreTopology topology, MemberId memberId ) + { + return new Difference( memberId, topology.find( memberId ).orElse( null ) ); + } + + class TopologyDifference + { + private Set added; + private Set removed; + + TopologyDifference( Set added, Set removed ) + { + this.added = added; + this.removed = removed; + } + + Set added() + { + return added; + } + + Set removed() + { + return removed; + } + + boolean hasChanges() + { + return added.size() > 0 || removed.size() > 0; + } + + @Override + public String toString() + { + return String.format( "{added=%s, removed=%s}", added, removed ); + } } + private class Difference + { + private MemberId memberId; + private CoreAddresses coreAddresses; + + Difference( MemberId memberId, CoreAddresses coreAddresses ) + { + this.memberId = memberId; + this.coreAddresses = coreAddresses; + } + + @Override + public String toString() + { + return String.format( "{memberId=%s, coreAddresses=%s}", memberId, coreAddresses ); + } + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index 993e7c820a18..a69b375b7575 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -232,8 +232,23 @@ public CoreTopology coreServers() @Override public void refreshCoreTopology() { - latestCoreTopology = HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ); - log.info( "Current core topology is %s", coreServers() ); + CoreTopology newCoreTopology = HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ); + + if ( coreServers() != null ) + { + CoreTopology.TopologyDifference difference = coreServers().difference( newCoreTopology ); + if ( difference.hasChanges() ) + { + log.info( "Core topology changed %s", difference ); + } + } + else + { + log.info( "Initial Core topology %s", newCoreTopology ); + } + + latestCoreTopology = newCoreTopology; + listenerService.notifyListeners( coreServers() ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index e1f8a0d5dfa3..2be2293ddace 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -19,9 +19,12 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import org.neo4j.causalclustering.identity.ClusterId; +import org.neo4j.helpers.collection.Pair; public class ReadReplicaTopology { @@ -37,9 +40,31 @@ public Set members() return readReplicaMembers; } + public Set difference( ReadReplicaTopology other ) + { + Pair, Set> split = split( readReplicaMembers, other.members() ); + Set big = split.first(); + Set small = split.other(); + + return big.stream().filter( n -> !small.contains( n ) ).collect( Collectors.toSet() ); + } + + private Pair, Set> split( + Set one, Set two ) + { + if ( one.size() > two.size() ) + { + return Pair.pair( one, two ); + } + else + { + return Pair.pair( two, one ); + } + } + @Override public String toString() { - return String.format( "ReadReplicaTopology{readReplicas=%s}", readReplicaMembers ); + return String.format( "{readReplicas=%s}", readReplicaMembers ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java new file mode 100644 index 000000000000..fd304d3029d5 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.hamcrest.Matchers; +import org.junit.Test; + +import org.neo4j.causalclustering.identity.ClusterId; +import org.neo4j.causalclustering.identity.MemberId; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +public class CoreTopologyTest +{ + @Test + public void identicalTopologiesShouldHaveNoDifference() throws Exception + { + // given + UUID one = UUID.randomUUID(); + UUID two = UUID.randomUUID(); + + Map coreMembers = new HashMap<>(); + coreMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); + coreMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + + CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers ); + + // when + CoreTopology.TopologyDifference diff = topology.difference(topology); + + // then + assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); + assertThat( diff.removed().size(), Matchers.equalTo( 0 ) ); + } + + @Test + public void shouldDetectAddedMembers() throws Exception + { + // given + UUID one = UUID.randomUUID(); + UUID two = UUID.randomUUID(); + + Map initialMembers = new HashMap<>(); + initialMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); + initialMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + + Map newMembers = new HashMap<>(); + newMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); + newMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + + CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); + + // when + CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); + + // then + assertThat( diff.added().size(), Matchers.equalTo( 1 ) ); + assertThat( diff.removed().size(), Matchers.equalTo( 0 ) ); + } + + @Test + public void shouldDetectRemovedMembers() throws Exception + { + // given + UUID one = UUID.randomUUID(); + UUID two = UUID.randomUUID(); + + Map initialMembers = new HashMap<>(); + initialMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); + initialMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + + Map newMembers = new HashMap<>(); + newMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + + CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); + + // when + CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); + + // then + assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); + assertThat( diff.removed().size(), Matchers.equalTo( 1 ) ); + } + + @Test + public void shouldDetectAddedAndRemovedMembers() throws Exception + { + // given + + Map initialMembers = new HashMap<>(); + initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + + Map newMembers = new HashMap<>(); + newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + + CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); + + // when + CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); + + // then + assertThat( diff.added().size(), Matchers.equalTo( 2 ) ); + assertThat( diff.removed().size(), Matchers.equalTo( 2 ) ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index 31fac6beb80d..bf5b47bf75d4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -106,8 +106,7 @@ private ReadReplicaTopology readReplicaTopology() lock.lock(); try { - return new ReadReplicaTopology( unmodifiableSet( readReplicaAddresses ) - ); + return new ReadReplicaTopology( unmodifiableSet( readReplicaAddresses ) ); } finally { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java index f9b0cf4a9e88..4290ad0dd115 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java @@ -31,6 +31,7 @@ import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.discovery.CoreClusterMember; +import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; @@ -161,7 +162,10 @@ private void awaitForDataToBeApplied( CoreClusterMember leader ) throws Interrup public void shouldReplicateTransactionToCoreMemberAddedAfterInitialStartUp() throws Exception { // given + cluster.getCoreMemberById( 0 ).shutdown(); + cluster.addCoreMemberWithId( 3 ).start(); + cluster.getCoreMemberById( 0 ).start(); cluster.coreTx( ( db, tx ) -> {