diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index da234d0330176..af846dd18e410 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -21,7 +21,6 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceNotActiveException; -import com.hazelcast.core.MultiMap; import java.util.List; import java.util.function.Function; @@ -39,9 +38,9 @@ import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME; -import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags; class HazelcastClient extends LifecycleAdapter implements TopologyService { @@ -140,8 +139,7 @@ private Void addReadReplica( HazelcastInstance hazelcastInstance ) hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) .put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); - MultiMap tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); - tags.forEach( tag -> tagsMap.put( uuid, tag ) ); + refreshTags( hazelcastInstance, uuid, tags ); return null; // return value not used. } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java index 9b5eed16ee04b..a65ec87e129ce 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java @@ -26,10 +26,13 @@ import com.hazelcast.core.Member; import com.hazelcast.core.MultiMap; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.ClusterId; @@ -56,9 +59,8 @@ class HazelcastClusterTopology // cluster-wide attributes private static final String CLUSTER_UUID = "cluster_uuid"; static final String SERVER_TAGS_MULTIMAP_NAME = "tags"; - static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read_replicas"; - // hz client uuid string -> boltAddress string static final String READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers"; + static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read_replicas"; // hz client uuid string -> boltAddress string static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids"; static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastInstance, Log log ) @@ -196,6 +198,18 @@ static Map toCoreMemberMap( Set members, Log lo return coreMembers; } + static void refreshTags( HazelcastInstance hazelcastInstance, String memberId, List tags ) + { + MultiMap tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); + Collection existing = tagsMap.get( memberId ); + + Set superfluous = existing.stream().filter( t -> !tags.contains( t ) ).collect( Collectors.toSet() ); + Set missing = tags.stream().filter( t -> !existing.contains( t ) ).collect( Collectors.toSet() ); + + missing.forEach( tag -> tagsMap.put( memberId, tag ) ); + superfluous.forEach( tag -> tagsMap.remove( memberId, tag ) ); + } + static MemberAttributeConfig buildMemberAttributesForCore( MemberId myself, Config config ) { MemberAttributeConfig memberAttributeConfig = new MemberAttributeConfig(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index ab362b83f0580..2b5aafe5717a0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -55,6 +55,7 @@ import static com.hazelcast.spi.properties.GroupProperty.WAIT_SECONDS_BEFORE_JOIN; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags; import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService @@ -99,7 +100,7 @@ public boolean setClusterId( ClusterId clusterId ) } @Override - public void start() + public void start() throws Throwable { hazelcastInstance = createHazelcastInstance(); log.info( "Cluster discovery service started" ); @@ -108,18 +109,9 @@ public void start() refreshReadReplicaTopology(); listenerService.notifyListeners( coreServers() ); - try - { - scheduler.start(); - } - catch ( Throwable throwable ) - { - log.debug( "Failed to start job scheduler." ); - return; - } - - JobScheduler.Group group = new JobScheduler.Group( "Scheduler", POOLED ); - jobHandle = this.scheduler.scheduleRecurring( group, () -> + scheduler.start(); + JobScheduler.Group group = new JobScheduler.Group( "TopologyRefresh", POOLED ); + jobHandle = scheduler.scheduleRecurring( group, () -> { refreshCoreTopology(); refreshReadReplicaTopology(); @@ -211,9 +203,7 @@ private HazelcastInstance createHazelcastInstance() } List tags = config.get( CausalClusteringSettings.server_tags ); - - MultiMap tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); - tags.forEach( tag -> tagsMap.put( myself.getUuid().toString(), tag ) ); + refreshTags( hazelcastInstance, myself.getUuid().toString(), tags ); return hazelcastInstance; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index e2414a6996f8b..602e1bc83af3f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -44,6 +44,11 @@ public Collection allMemberInfo() return readReplicaMembers.values(); } + public Map replicaMembers() + { + return readReplicaMembers; + } + Optional find( MemberId memberId ) { return Optional.ofNullable( readReplicaMembers.get( memberId ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java index 0ba209eeb8acc..e32c94381a0b4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java @@ -298,10 +298,12 @@ public void shouldRegisterReadReplicaInTopology() throws Throwable when( clientService.getConnectedClients() ).thenReturn( asSet( client ) ); HazelcastMap hazelcastMap = new HazelcastMap(); + HazelcastMultiMap hazelcastMultiMap = new HazelcastMultiMap(); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance.getMap( anyString() ) ).thenReturn( hazelcastMap ); + when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( hazelcastMultiMap ); when( hazelcastInstance.getLocalEndpoint() ).thenReturn( endpoint ); when( hazelcastInstance.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); when( hazelcastInstance.getCluster() ).thenReturn( cluster ); @@ -346,6 +348,7 @@ public void shouldRemoveReadReplicasOnGracefulShutdown() throws Throwable HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance.getMap( anyString() ) ).thenReturn( hazelcastMap ); + when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); when( hazelcastInstance.getLocalEndpoint() ).thenReturn( endpoint ); when( hazelcastInstance.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); when( hazelcastInstance.getCluster() ).thenReturn( cluster ); @@ -379,6 +382,7 @@ public void shouldSwallowNPEFromHazelcast() throws Throwable HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( hazelcastInstance.getLocalEndpoint() ).thenReturn( endpoint ); when( hazelcastInstance.getMap( anyString() ) ).thenReturn( new HazelcastMap() ); + when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); doThrow( new NullPointerException( "boom!!!" ) ).when( hazelcastInstance ).shutdown(); HazelcastConnector connector = mock( HazelcastConnector.class ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ServerTagsIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ServerTagsIT.java new file mode 100644 index 0000000000000..e79e683c0475b --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ServerTagsIT.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.scenarios; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntFunction; + +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.CoreGraphDatabase; +import org.neo4j.causalclustering.discovery.Cluster; +import org.neo4j.causalclustering.discovery.CoreClusterMember; +import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; +import org.neo4j.graphdb.Result; +import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.enterprise.api.security.EnterpriseSecurityContext; +import org.neo4j.kernel.impl.coreapi.InternalTransaction; +import org.neo4j.kernel.impl.store.format.standard.Standard; +import org.neo4j.test.rule.TestDirectory; +import org.neo4j.test.rule.fs.DefaultFileSystemRule; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.neo4j.test.assertion.Assert.assertEventually; + +public class ServerTagsIT +{ + @Rule + public TestDirectory testDir = TestDirectory.testDirectory(); + @Rule + public DefaultFileSystemRule fsRule = new DefaultFileSystemRule(); + + private Cluster cluster; + + @After + public void after() throws Exception + { + if ( cluster != null ) + { + cluster.shutdown(); + } + } + + @Test + public void shouldUpdateTagsOnStart() throws Exception + { + AtomicReference suffix = new AtomicReference<>( "before" ); + List> expected; + + Map> instanceCoreParams = new HashMap<>(); + instanceCoreParams.put( CausalClusteringSettings.server_tags.name(), ( id ) -> String.join( ", ", makeCoreTags( suffix.get(), id ) ) ); + + Map> instanceReplicaParams = new HashMap<>(); + instanceReplicaParams.put( CausalClusteringSettings.server_tags.name(), ( id ) -> String.join( ", ", makeReplicaTags( suffix.get(), id ) ) ); + + int nServers = 3; + cluster = new Cluster( testDir.directory( "cluster" ), nServers, nServers, + new HazelcastDiscoveryServiceFactory(), emptyMap(), instanceCoreParams, + emptyMap(), instanceReplicaParams, Standard.LATEST_NAME ); + + // when + cluster.start(); + + // then + expected = new ArrayList<>(); + for ( CoreClusterMember core : cluster.coreMembers() ) + { + expected.add( makeCoreTags( suffix.get(), core.serverId() ) ); + expected.add( makeReplicaTags( suffix.get(), core.serverId() ) ); + } + + for ( CoreClusterMember core : cluster.coreMembers() ) + { + assertEventually( core + " should have tags", () -> getServerTags( core.database() ), + new TagsMatcher( expected ), 30, SECONDS ); + } + + // when + expected.remove( makeCoreTags( suffix.get(), 1 ) ); + expected.remove( makeReplicaTags( suffix.get(), 2 ) ); + cluster.getCoreMemberById( 1 ).shutdown(); + cluster.getReadReplicaById( 2 ).shutdown(); + + suffix.set( "after" ); // should update tags of restarted servers + cluster.addCoreMemberWithId( 1 ).start(); + cluster.addReadReplicaWithId( 2 ).start(); + expected.add( makeCoreTags( suffix.get(), 1 ) ); + expected.add( makeReplicaTags( suffix.get(), 2 ) ); + + // then + for ( CoreClusterMember core : cluster.coreMembers() ) + { + assertEventually( core + " should have tags", () -> getServerTags( core.database() ), + new TagsMatcher( expected ), 30, SECONDS ); + } + } + + class TagsMatcher extends TypeSafeMatcher>> + { + private final List> expected; + + TagsMatcher( List> expected ) + { + this.expected = expected; + } + + @Override + protected boolean matchesSafely( List> actual ) + { + if ( actual.size() != expected.size() ) + { + return false; + } + + for ( List actualTags : actual ) + { + boolean matched = false; + for ( List expectedTags : expected ) + { + if ( actualTags.size() != expectedTags.size() ) + { + continue; + } + + if ( !actualTags.containsAll( expectedTags ) ) + { + continue; + } + + matched = true; + break; + } + + if ( !matched ) + { + return false; + } + } + + return true; + } + + @Override + public void describeTo( Description description ) + { + description.appendText( expected.toString() ); + } + } + + private List makeCoreTags( String suffix, int id ) + { + return asList( format( "core-%d-%s", id, suffix ), "core" ); + } + + private List makeReplicaTags( String suffix, int id ) + { + return asList( format( "replica-%d-%s", id, suffix ), "replica" ); + } + + private List> getServerTags( CoreGraphDatabase db ) + { + List> serverTags = new ArrayList<>(); + try ( InternalTransaction tx = db.beginTransaction( KernelTransaction.Type.explicit, EnterpriseSecurityContext.AUTH_DISABLED ) ) + { + try ( Result result = db.execute( tx, "CALL dbms.cluster.overview", emptyMap() ) ) + { + while ( result.hasNext() ) + { + @SuppressWarnings( "unchecked" ) + List tags = (List) result.next().get( "tags" ); + serverTags.add( tags ); + } + } + } + return serverTags; + } +}