From 353fa2ee114b0342e945aaf3fafe0dc718fd8610 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Tue, 14 Feb 2017 13:18:39 +0100 Subject: [PATCH] introduce filters and tags for server policy load balancing This is the first step in building up the server-side policy based load balancing strategy. The idea is that servers are tagged with one or more tags and these are used in a server-side configured scheme of filters to select those which match. Different policies can select different sets of tags and there can be other filters attached apart from tag-based filters. Every policy has a name which the client can use to bind to a particular policy by supplying that name in the client supplied context, as supported by the V2 version of GetServers. --- .../core/CausalClusteringSettings.java | 13 ++ .../discovery/ClusterTopology.java | 17 +- ...CoreAddresses.java => CoreServerInfo.java} | 36 ++- .../discovery/CoreTopology.java | 16 +- .../discovery/HazelcastClient.java | 15 +- .../discovery/HazelcastClusterTopology.java | 76 +++---- .../HazelcastCoreTopologyService.java | 7 + ...icaAddresses.java => ReadReplicaInfo.java} | 34 ++- .../discovery/ReadReplicaTopology.java | 10 +- .../discovery/TaggedServer.java | 27 +++ .../procedures/ClusterOverviewProcedure.java | 10 +- .../load_balancing/filters/Filter.java | 35 +++ .../load_balancing/filters/FilterChain.java | 46 ++++ .../filters/FirstValidRule.java | 52 +++++ .../filters/MinimumCountFilter.java | 40 ++++ .../procedure/GetServersProcedureV1.java | 12 +- .../strategy/AllServersStrategy.java | 4 +- .../strategy/server_policy/AnyTagFilter.java | 52 +++++ .../strategy/server_policy/Policies.java | 56 +++++ .../strategy/server_policy/PolicyLoader.java | 30 +++ .../strategy/server_policy/ServerInfo.java | 77 +++++++ .../server_policy/ServerPolicyStrategy.java | 126 +++++++++++ .../messaging/RaftOutbound.java | 8 +- .../discovery/CoreTopologyTest.java | 42 ++-- .../discovery/HazelcastClientTest.java | 214 ++++++++++++++++++ .../HazelcastClusterTopologyTest.java | 72 +++--- .../discovery/SharedDiscoveryCoreClient.java | 10 +- .../SharedDiscoveryReadReplicaClient.java | 4 +- .../discovery/SharedDiscoveryService.java | 16 +- .../discovery/TestTopology.java | 17 +- .../ClusterOverviewProcedureTest.java | 10 +- .../filters/FilterChainTest.java | 51 +++++ .../filters/FirstValidRuleTest.java | 57 +++++ .../filters/MinimumCountFilterTest.java | 76 +++++++ .../GetServersProcedureV1RoutingTest.java | 5 +- .../procedure/GetServersProcedureV1Test.java | 28 +-- .../server_policy/AnyTagFilterTest.java | 68 ++++++ .../strategy/server_policy/PoliciesTest.java | 72 ++++++ .../ConnectToRandomCoreServerTest.java | 14 +- ...picallyConnectToRandomReadReplicaTest.java | 6 +- .../UpstreamDatabaseStrategySelectorTest.java | 12 +- 41 files changed, 1336 insertions(+), 237 deletions(-) rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/{CoreAddresses.java => CoreServerInfo.java} (61%) rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/{ReadReplicaAddresses.java => ReadReplicaInfo.java} (61%) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TaggedServer.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/Filter.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FilterChain.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRule.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilter.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilter.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/Policies.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PolicyLoader.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerInfo.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerPolicyStrategy.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FilterChainTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRuleTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilterTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilterTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PoliciesTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 0e09e135b5153..3e477b8e6aedd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -313,4 +313,17 @@ public class CausalClusteringSettings implements LoadableConfig "upstream database server from which to pull transactional updates." ) public static final Setting> upstream_selection_strategy = setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" ); + + @Description( "The load balancing plugin to use. This must be the same for all core servers participating in the cluster." ) + public static Setting load_balancing_plugin = + setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" ); + + @Description( "The configuration must be valid for the configured plugin." ) + public static Setting load_balancing_config = + setting( "causal_clustering.load_balancing.config", STRING, "" ); + + @Description( "Tags for the server used when configuring load balancing and replication policies." + + " Multiple tags can be configured by separating with a comma." ) + public static Setting> server_tags = + setting( "causal_clustering.server_tags", list( ",", STRING ), "" ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java index 2f9bbc9e84a61..60c2dd1d3fe43 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java @@ -36,20 +36,9 @@ public ClusterTopology( CoreTopology coreTopology, ReadReplicaTopology readRepli public Optional find( MemberId upstream ) { - Optional coreAddresses = coreTopology.find( upstream ); - Optional readReplicaAddresses = readReplicaTopology.findAddressFor( upstream ); + Optional coreCatchupAddress = coreTopology.find( upstream ).map( a -> (CatchupServerAddress) a ); + Optional readCatchupAddress = readReplicaTopology.find( upstream ).map( a -> (CatchupServerAddress) a ); - if ( coreAddresses.isPresent() ) - { - return Optional.of( coreAddresses.get() ); - } - else if ( readReplicaAddresses.isPresent() ) - { - return Optional.of( readReplicaAddresses.get() ); - } - else - { - return Optional.empty(); - } + return coreCatchupAddress.map( Optional::of ).orElse( readCatchupAddress ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java similarity index 61% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java index 60e785dcdba61..044c5af627c95 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreServerInfo.java @@ -19,20 +19,32 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.Set; + import org.neo4j.helpers.AdvertisedSocketAddress; -public class CoreAddresses implements CatchupServerAddress, ClientConnector +import static java.util.Collections.emptySet; + +public class CoreServerInfo implements CatchupServerAddress, ClientConnector, TaggedServer { private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress catchupServer; private final ClientConnectorAddresses clientConnectorAddresses; + private Set tags; + + public CoreServerInfo( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer, + ClientConnectorAddresses clientConnectors ) + { + this( raftServer, catchupServer, clientConnectors, emptySet() ); + } - public CoreAddresses( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer, - ClientConnectorAddresses clientConnectorAddresses ) + public CoreServerInfo( AdvertisedSocketAddress raftServer, AdvertisedSocketAddress catchupServer, + ClientConnectorAddresses clientConnectorAddresses, Set tags ) { this.raftServer = raftServer; this.catchupServer = catchupServer; this.clientConnectorAddresses = clientConnectorAddresses; + this.tags = tags; } public AdvertisedSocketAddress getRaftServer() @@ -46,18 +58,26 @@ public AdvertisedSocketAddress getCatchupServer() return catchupServer; } + @Override public ClientConnectorAddresses connectors() { return clientConnectorAddresses; } + @Override + public Set tags() + { + return tags; + } + @Override public String toString() { - return "CoreAddresses{" + - "raftServer=" + raftServer + - ", catchupServer=" + catchupServer + - ", clientConnectorAddresses=" + clientConnectorAddresses + - '}'; + return "CoreServerInfo{" + + "raftServer=" + raftServer + + ", catchupServer=" + catchupServer + + ", clientConnectorAddresses=" + clientConnectorAddresses + + ", tags=" + tags + + '}'; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java index 610f941efa6b6..c3cd134202f2c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java @@ -38,9 +38,9 @@ public class CoreTopology private final ClusterId clusterId; private final boolean canBeBootstrapped; - private final Map coreMembers; + private final Map coreMembers; - public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map coreMembers ) + public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map coreMembers ) { this.clusterId = clusterId; this.canBeBootstrapped = canBeBootstrapped; @@ -57,7 +57,7 @@ public ClusterId clusterId() return clusterId; } - public Collection addresses() + public Collection allMemberInfo() { return coreMembers.values(); } @@ -67,7 +67,7 @@ public boolean canBeBootstrapped() return canBeBootstrapped; } - public Optional find( MemberId memberId ) + public Optional find( MemberId memberId ) { return Optional.ofNullable( coreMembers.get( memberId ) ); } @@ -139,18 +139,18 @@ public String toString() private class Difference { private MemberId memberId; - private CoreAddresses coreAddresses; + private CoreServerInfo coreServerInfo; - Difference( MemberId memberId, CoreAddresses coreAddresses ) + Difference( MemberId memberId, CoreServerInfo coreServerInfo ) { this.memberId = memberId; - this.coreAddresses = coreAddresses; + this.coreServerInfo = coreServerInfo; } @Override public String toString() { - return String.format( "{memberId=%s, coreAddresses=%s}", memberId, coreAddresses ); + return String.format( "{memberId=%s, coreServerInfo=%s}", memberId, coreServerInfo ); } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index 3909233938c8d..a674e3ef9b370 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -21,7 +21,9 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceNotActiveException; +import com.hazelcast.core.MultiMap; +import java.util.List; import java.util.function.Function; import org.neo4j.causalclustering.core.CausalClusteringSettings; @@ -37,6 +39,9 @@ import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology; class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologyService { @@ -46,6 +51,7 @@ class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologySer private final HazelcastConnector connector; private final RenewableTimeoutService renewableTimeoutService; private final AdvertisedSocketAddress transactionSource; + private final List tags; private HazelcastInstance hazelcastInstance; private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer; private final long readReplicaTimeToLiveTimeout; @@ -62,6 +68,7 @@ class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologySer this.log = logProvider.getLog( getClass() ); this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); this.transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address ); + this.tags = config.get( CausalClusteringSettings.server_tags ); this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout; this.myself = myself; } @@ -71,7 +78,7 @@ public CoreTopology coreServers() { try { - return retry( ( hazelcastInstance ) -> HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ) ); + return retry( ( hazelcastInstance ) -> getCoreTopology( hazelcastInstance, log ) ); } catch ( Exception e ) { @@ -87,8 +94,7 @@ public ReadReplicaTopology readReplicas() { try { - return retry( ( hazelcastInstance ) -> HazelcastClusterTopology - .getReadReplicaTopology( hazelcastInstance, log ) ); + return retry( ( hazelcastInstance ) -> getReadReplicaTopology( hazelcastInstance, log ) ); } catch ( Exception e ) { @@ -132,6 +138,9 @@ private Void addReadReplica( HazelcastInstance hazelcastInstance ) hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) .put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); + MultiMap tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); + tags.forEach( tag -> tagsMap.put( uuid, tag ) ); + return null; // return value not used. } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java index 369d7c87ecf88..ad7a7ad724599 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java @@ -24,6 +24,7 @@ import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.IMap; import com.hazelcast.core.Member; +import com.hazelcast.core.MultiMap; import java.util.HashMap; import java.util.Iterator; @@ -35,30 +36,32 @@ import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; -import org.neo4j.helpers.collection.Pair; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Log; import static java.util.Collections.emptyMap; -import static java.util.stream.Collectors.toSet; import static org.neo4j.helpers.SocketAddressFormat.socketAddress; +import static org.neo4j.helpers.collection.Iterables.asSet; class HazelcastClusterTopology { - // hz client uuid string -> boltAddress string - static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read-replicas"; - static final String READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers"; - static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids"; - private static final String CLUSTER_UUID = "cluster_uuid"; + // per server attributes + private static final String DISCOVERY_SERVER = "discovery_server"; // not currently used static final String MEMBER_UUID = "member_uuid"; static final String TRANSACTION_SERVER = "transaction_server"; - private static final String DISCOVERY_SERVER = "discovery_server"; static final String RAFT_SERVER = "raft_server"; static final String CLIENT_CONNECTOR_ADDRESSES = "client_connector_addresses"; + // cluster-wide attributes + private static final String CLUSTER_UUID = "cluster_uuid"; + static final String SERVER_TAGS_MULTIMAP_NAME = "tags"; + static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read_replicas"; // hz client uuid string -> boltAddress string + static final String READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers"; + static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids"; + static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastInstance, Log log ) { - Map readReplicas = emptyMap(); + Map readReplicas = emptyMap(); if ( hazelcastInstance != null ) { @@ -74,7 +77,7 @@ static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastIn static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Log log ) { - Map coreMembers = emptyMap(); + Map coreMembers = emptyMap(); boolean canBeBootstrapped = false; ClusterId clusterId = null; @@ -83,7 +86,7 @@ static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Log lo Set hzMembers = hazelcastInstance.getCluster().getMembers(); canBeBootstrapped = canBeBootstrapped( hzMembers ); - coreMembers = toCoreMemberMap( hzMembers, log ); + coreMembers = toCoreMemberMap( hzMembers, log, hazelcastInstance ); clusterId = getClusterId( hazelcastInstance ); } @@ -108,16 +111,16 @@ static boolean casClusterId( HazelcastInstance hazelcastInstance, ClusterId clus return uuidReference.compareAndSet( null, clusterId.uuid() ) || uuidReference.get().equals( clusterId.uuid() ); } - private static Map readReplicas( HazelcastInstance hazelcastInstance ) + private static Map readReplicas( HazelcastInstance hazelcastInstance ) { IMap clientAddressMap = hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ); IMap txServerMap = hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ); - IMap memberIdMap = hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ); + MultiMap serverTags = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); - Map result = new HashMap<>( ); + Map result = new HashMap<>(); for ( String hzUUID : clientAddressMap.keySet() ) { @@ -125,7 +128,7 @@ private static Map readReplicas( HazelcastInstanc AdvertisedSocketAddress catchupAddress = socketAddress( txServerMap.get( hzUUID ), AdvertisedSocketAddress::new ); result.put( new MemberId( UUID.fromString( memberIdMap.get( hzUUID ) ) ), - new ReadReplicaAddresses( clientConnectorAddresses, catchupAddress ) ) ; + new ReadReplicaInfo( clientConnectorAddresses, catchupAddress, asSet( serverTags.get( hzUUID ) ) ) ); } return result; } @@ -136,36 +139,24 @@ private static boolean canBeBootstrapped( Set coreMembers ) return iterator.hasNext() && iterator.next().localMember(); } - static Map toCoreMemberMap( Set members, Log log ) + static Map toCoreMemberMap( Set members, Log log, HazelcastInstance hazelcastInstance ) { - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); + MultiMap serverTagsMMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); for ( Member member : members ) { try { - Pair pair = extractMemberAttributesForCore( member ); - coreMembers.put( pair.first(), pair.other() ); - } - catch ( IllegalArgumentException e ) - { - log.warn( "Incomplete member attributes supplied from Hazelcast", e ); - } - } + MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) ); - return coreMembers; - } - - static Map toReadReplicaMemberMap( Set members, Log log ) - { - Map coreMembers = new HashMap<>(); + CoreServerInfo coreServerInfo = new CoreServerInfo( + socketAddress( member.getStringAttribute( RAFT_SERVER ), AdvertisedSocketAddress::new ), + socketAddress( member.getStringAttribute( TRANSACTION_SERVER ), AdvertisedSocketAddress::new ), + ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ), + asSet( serverTagsMMap.get( memberId.getUuid().toString() ) ) ); - for ( Member member : members ) - { - try - { - Pair pair = extractMemberAttributesForCore( member ); - coreMembers.put( pair.first(), pair.other() ); + coreMembers.put( memberId, coreServerInfo ); } catch ( IllegalArgumentException e ) { @@ -193,16 +184,7 @@ static MemberAttributeConfig buildMemberAttributesForCore( MemberId myself, Conf ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); memberAttributeConfig.setStringAttribute( CLIENT_CONNECTOR_ADDRESSES, clientConnectorAddresses.toString() ); - return memberAttributeConfig; - } - static Pair extractMemberAttributesForCore( Member member ) - { - MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) ); - - return Pair.of( memberId, new CoreAddresses( - socketAddress( member.getStringAttribute( RAFT_SERVER ), AdvertisedSocketAddress::new ), - socketAddress( member.getStringAttribute( TRANSACTION_SERVER ), AdvertisedSocketAddress::new ), - ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ) ) ); + return memberAttributeConfig; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index 092f0633523c7..68a1f77cc7be6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -30,6 +30,7 @@ import com.hazelcast.core.MemberAttributeEvent; import com.hazelcast.core.MembershipEvent; import com.hazelcast.core.MembershipListener; +import com.hazelcast.core.MultiMap; import java.util.List; import java.util.concurrent.TimeUnit; @@ -53,6 +54,7 @@ import static com.hazelcast.spi.properties.GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS; import static com.hazelcast.spi.properties.GroupProperty.WAIT_SECONDS_BEFORE_JOIN; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME; import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService @@ -209,6 +211,11 @@ private HazelcastInstance createHazelcastInstance() throw new RuntimeException( e ); } + List tags = config.get( CausalClusteringSettings.server_tags ); + + MultiMap tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME ); + tags.forEach( tag -> tagsMap.put( myself.getUuid().toString(), tag ) ); + return hazelcastInstance; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java similarity index 61% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java index b9700ff7e3af2..f777f09513d0f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaInfo.java @@ -19,33 +19,55 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.Set; + import org.neo4j.helpers.AdvertisedSocketAddress; -public class ReadReplicaAddresses implements CatchupServerAddress, ClientConnector +import static java.util.Collections.emptySet; + +public class ReadReplicaInfo implements CatchupServerAddress, ClientConnector, TaggedServer { private final AdvertisedSocketAddress catchupServerAddress; private final ClientConnectorAddresses clientConnectorAddresses; + private final Set tags; + + public ReadReplicaInfo( ClientConnectorAddresses clientConnectorAddresses, AdvertisedSocketAddress catchupServerAddress ) + { + this( clientConnectorAddresses, catchupServerAddress, emptySet() ); + } - public ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses, AdvertisedSocketAddress catchupServerAddress ) + public ReadReplicaInfo( ClientConnectorAddresses clientConnectorAddresses, AdvertisedSocketAddress catchupServerAddress, Set tags ) { this.clientConnectorAddresses = clientConnectorAddresses; this.catchupServerAddress = catchupServerAddress; + this.tags = tags; } + @Override public ClientConnectorAddresses connectors() { return clientConnectorAddresses; } @Override - public String toString() + public AdvertisedSocketAddress getCatchupServer() { - return String.format( "ReadReplicaAddresses{clientConnectorAddresses=%s}", clientConnectorAddresses ); + return catchupServerAddress; } @Override - public AdvertisedSocketAddress getCatchupServer() + public Set tags() { - return catchupServerAddress; + return tags; + } + + @Override + public String toString() + { + return "ReadReplicaInfo{" + + "catchupServerAddress=" + catchupServerAddress + + ", clientConnectorAddresses=" + clientConnectorAddresses + + ", tags=" + tags + + '}'; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index 0d9c8e7e6720e..e2414a6996f8b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -22,10 +22,8 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; -import java.util.Set; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.helpers.collection.Pair; import static java.util.Collections.emptyMap; @@ -34,19 +32,19 @@ public class ReadReplicaTopology { static final ReadReplicaTopology EMPTY = new ReadReplicaTopology( emptyMap() ); - private final Map readReplicaMembers; + private final Map readReplicaMembers; - public ReadReplicaTopology( Map readReplicaMembers ) + public ReadReplicaTopology( Map readReplicaMembers ) { this.readReplicaMembers = readReplicaMembers; } - public Collection addresses() + public Collection allMemberInfo() { return readReplicaMembers.values(); } - Optional findAddressFor( MemberId memberId ) + Optional find( MemberId memberId ) { return Optional.ofNullable( readReplicaMembers.get( memberId ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TaggedServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TaggedServer.java new file mode 100644 index 0000000000000..a127d4ceaa1bd --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TaggedServer.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.Set; + +interface TaggedServer +{ + Set tags(); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java index c3f74fabf0ea4..bda88295cdee2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java @@ -30,10 +30,10 @@ import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopologyService; -import org.neo4j.causalclustering.discovery.ReadReplicaAddresses; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.collection.RawIterator; import org.neo4j.kernel.api.exceptions.ProcedureException; @@ -88,7 +88,7 @@ public RawIterator apply( Context ctx, Object[] inp for ( MemberId memberId : coreMembers ) { Optional clientConnectorAddresses = - coreTopology.find( memberId ).map( CoreAddresses::connectors ); + coreTopology.find( memberId ).map( CoreServerInfo::connectors ); if ( clientConnectorAddresses.isPresent() ) { Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER; @@ -99,9 +99,9 @@ public RawIterator apply( Context ctx, Object[] inp log.debug( "No Address found for " + memberId ); } } - for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().addresses() ) + for ( ReadReplicaInfo readReplicaInfo : discoveryService.readReplicas().allMemberInfo() ) { - endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.connectors(), Role.READ_REPLICA ) ); + endpoints.add( new ReadWriteEndPoint( readReplicaInfo.connectors(), Role.READ_REPLICA ) ); } Collections.sort( endpoints, ( o1, o2 ) -> o1.addresses().toString().compareTo( o2.addresses().toString() ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/Filter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/Filter.java new file mode 100644 index 0000000000000..7d43e23648b3a --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/Filter.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import java.util.Set; + +/** + * A filter for sets. + * + * A convention used for filters is to return an empty set if the result is to + * be interpreted as invalid. This is used for example in rule-lists where the first + * rule to return a valid non-empty result will be used. + */ +@FunctionalInterface +public interface Filter +{ + Set apply( Set data ); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FilterChain.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FilterChain.java new file mode 100644 index 0000000000000..9030dd3bbab61 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FilterChain.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import java.util.List; +import java.util.Set; + +/** + * Filters the set through each filter of the chain in order. + */ +public class FilterChain implements Filter +{ + private List> chain; + + public FilterChain( List> chain ) + { + this.chain = chain; + } + + @Override + public Set apply( Set data ) + { + for ( Filter filter : chain ) + { + data = filter.apply( data ); + } + return data; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRule.java new file mode 100644 index 0000000000000..7845542fe8e50 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRule.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import java.util.List; +import java.util.Set; + +/** + * Each chain of filters is considered a rule and they are evaluated in order. The result + * of the first rule to return a valid result (non-empty set) will be the final result. + */ +public class FirstValidRule implements Filter +{ + private List> chains; + + public FirstValidRule( List> chains ) + { + this.chains = chains; + } + + @Override + public Set apply( Set input ) + { + Set output = input; + for ( Filter chain : chains ) + { + output = chain.apply( input ); + if ( !output.isEmpty() ) + { + break; + } + } + return output; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilter.java new file mode 100644 index 0000000000000..026b7c9c34527 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilter.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import java.util.Set; + +import static java.util.Collections.emptySet; + +public class MinimumCountFilter implements Filter +{ + private final int minCount; + + public MinimumCountFilter( int minCount ) + { + this.minCount = minCount; + } + + @Override + public Set apply( Set data ) + { + return data.size() >= minCount ? data : emptySet(); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java index 6e2a4166b69d4..6cdfd87445ed4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java @@ -28,7 +28,7 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.load_balancing.Endpoint; @@ -117,7 +117,7 @@ private Optional leaderBoltAddress() private List routeEndpoints() { Stream routers = discoveryService.coreServers() - .addresses().stream().map( extractBoltAddress() ); + .allMemberInfo().stream().map( extractBoltAddress() ); List routeEndpoints = routers.map( Endpoint::route ).collect( toList() ); Collections.shuffle( routeEndpoints ); return routeEndpoints; @@ -130,7 +130,7 @@ private List writeEndpoints() private List readEndpoints() { - List readReplicas = discoveryService.readReplicas().addresses().stream() + List readReplicas = discoveryService.readReplicas().allMemberInfo().stream() .map( extractBoltAddress() ).collect( toList() ); boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers ); Stream readCore = addFollowers ? coreReadEndPoints() : Stream.empty(); @@ -143,12 +143,12 @@ private List readEndpoints() private Stream coreReadEndPoints() { Optional leader = leaderBoltAddress(); - Collection coreAddresses = discoveryService.coreServers().addresses(); + Collection coreServerInfo = discoveryService.coreServers().allMemberInfo(); Stream boltAddresses = discoveryService.coreServers() - .addresses().stream().map( extractBoltAddress() ); + .allMemberInfo().stream().map( extractBoltAddress() ); // if the leader is present and it is not alone filter it out from the read end points - if ( leader.isPresent() && coreAddresses.size() > 1 ) + if ( leader.isPresent() && coreServerInfo.size() > 1 ) { AdvertisedSocketAddress advertisedSocketAddress = leader.get(); return boltAddresses.filter( address -> !advertisedSocketAddress.equals( address ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java index f88f5b3d0acaa..747d03ead0394 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java @@ -69,7 +69,7 @@ public Result run( Map context ) private List routeEndpoints( CoreTopology cores ) { - return cores.addresses().stream().map( extractBoltAddress() ) + return cores.allMemberInfo().stream().map( extractBoltAddress() ) .map( Endpoint::route ).collect( Collectors.toList() ); } @@ -94,7 +94,7 @@ private List writeEndpoints( CoreTopology cores ) private List readEndpoints( CoreTopology cores, ReadReplicaTopology readers ) { - return concat( readers.addresses().stream(), cores.addresses().stream() ) + return concat( readers.allMemberInfo().stream(), cores.allMemberInfo().stream() ) .map( extractBoltAddress() ) .map( Endpoint::read ) .collect( Collectors.toList() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilter.java new file mode 100644 index 0000000000000..a4ab34027ba7c --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilter.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.neo4j.causalclustering.load_balancing.filters.Filter; + +public class AnyTagFilter implements Filter +{ + private final Predicate matchesAnyTag; + + AnyTagFilter( Set tags ) + { + this.matchesAnyTag = serverInfo -> + { + for ( String tag : serverInfo.tags() ) + { + if ( tags.contains( tag ) ) + { + return true; + } + } + return false; + }; + } + + @Override + public Set apply( Set data ) + { + return data.stream().filter( matchesAnyTag ).collect( Collectors.toSet() ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/Policies.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/Policies.java new file mode 100644 index 0000000000000..5bb7100f9ff67 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/Policies.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import java.util.HashMap; +import java.util.Map; + +import org.neo4j.causalclustering.load_balancing.filters.Filter; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +class Policies +{ + static final String POLICY_KEY = "load_balancing.policy"; // TODO: move somewhere (driver support package?) + private static final Filter DEFAULT_POLICY = input -> input; + + private final Map> policies = new HashMap<>(); + private final Log log; + + Policies( LogProvider logProvider ) + { + this.log = logProvider.getLog( getClass() ); + policies.put( null, DEFAULT_POLICY ); + } + + void addPolicy( String policyName, Filter filter ) + { + Filter oldPolicy = policies.putIfAbsent( policyName, filter ); + if ( oldPolicy != null ) + { + log.error( "Policy name conflict for: " + policyName ); + } + } + + Filter selectFor( Map context ) + { + return policies.get( context.get( POLICY_KEY ) ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PolicyLoader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PolicyLoader.java new file mode 100644 index 0000000000000..87dd2cf93deb8 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PolicyLoader.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import org.neo4j.kernel.configuration.Config; + +public class PolicyLoader +{ + PolicyLoader( Config config, Policies policies ) + { + // TODO: Load policies from config into policies. + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerInfo.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerInfo.java new file mode 100644 index 0000000000000..05740cdc3380c --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerInfo.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import java.util.Objects; +import java.util.Set; + +import org.neo4j.helpers.AdvertisedSocketAddress; + +/** + * Hold the server information that is interesting for load balancing purposes. + */ +class ServerInfo +{ + private final AdvertisedSocketAddress boltAddress; + private Set tags; + + ServerInfo( AdvertisedSocketAddress boltAddress, Set tags ) + { + this.boltAddress = boltAddress; + this.tags = tags; + } + + AdvertisedSocketAddress boltAddress() + { + return boltAddress; + } + + Set tags() + { + return tags; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { return true; } + if ( o == null || getClass() != o.getClass() ) + { return false; } + ServerInfo that = (ServerInfo) o; + return Objects.equals( boltAddress, that.boltAddress ) && + Objects.equals( tags, that.tags ); + } + + @Override + public int hashCode() + { + return Objects.hash( boltAddress, tags ); + } + + @Override + public String toString() + { + return "ServerInfo{" + + "boltAddress=" + boltAddress + + ", tags=" + tags + + '}'; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerPolicyStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerPolicyStrategy.java new file mode 100644 index 0000000000000..0e14786749577 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/ServerPolicyStrategy.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.LeaderLocator; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; +import org.neo4j.causalclustering.discovery.CoreTopology; +import org.neo4j.causalclustering.discovery.CoreTopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopology; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.load_balancing.Endpoint; +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; +import org.neo4j.causalclustering.load_balancing.filters.Filter; +import org.neo4j.kernel.configuration.Config; + +import static java.util.Collections.emptyList; +import static org.neo4j.causalclustering.load_balancing.Util.asList; +import static org.neo4j.causalclustering.load_balancing.Util.extractBoltAddress; + +// TODO: This is work in progress. Currently mostly copies V1 behaviour. +public class ServerPolicyStrategy implements LoadBalancingStrategy +{ + private final CoreTopologyService topologyService; + private final LeaderLocator leaderLocator; + private final Long timeToLive; + private final boolean allowReadsOnFollowers; + private final Policies policies; + + public ServerPolicyStrategy( CoreTopologyService topologyService, + LeaderLocator leaderLocator, Policies policies, Config config ) + { + this.topologyService = topologyService; + this.leaderLocator = leaderLocator; + this.timeToLive = config.get( CausalClusteringSettings.cluster_routing_ttl ); + this.allowReadsOnFollowers = config.get( CausalClusteringSettings.cluster_allow_reads_on_followers ); + this.policies = policies; + } + + @Override + public Result run( Map context ) + { + CoreTopology coreTopology = topologyService.coreServers(); + ReadReplicaTopology rrTopology = topologyService.readReplicas(); + + return new LoadBalancingResult( routeEndpoints( coreTopology ), writeEndpoints( coreTopology ), + readEndpoints( coreTopology, rrTopology, policies.selectFor( context ) ), timeToLive ); + } + + private List routeEndpoints( CoreTopology cores ) + { + return cores.allMemberInfo().stream().map( extractBoltAddress() ) + .map( Endpoint::route ).collect( Collectors.toList() ); + } + + private List writeEndpoints( CoreTopology cores ) + { + MemberId leader; + try + { + leader = leaderLocator.getLeader(); + } + catch ( NoLeaderFoundException e ) + { + return emptyList(); + } + + Optional endPoint = cores.find( leader ) + .map( extractBoltAddress() ) + .map( Endpoint::write ); + + return asList( endPoint ); + } + + private List readEndpoints( CoreTopology coreTopology, ReadReplicaTopology rrTopology, Filter policyFilter ) + { + Set possibleReaders = rrTopology.allMemberInfo().stream() + .map( info -> new ServerInfo( info.connectors().boltAddress(), info.tags() ) ) + .collect( Collectors.toSet() ); + + if ( allowReadsOnFollowers || possibleReaders.size() == 0 ) + { + Set validCores = coreTopology.members(); + try + { + MemberId leader = leaderLocator.getLeader(); + validCores = validCores.stream().filter( memberId -> !memberId.equals( leader ) ).collect( Collectors.toSet() ); + } + catch ( NoLeaderFoundException ignored ) + { + // we might end up using the leader for reading during this ttl, should be fine in general + } + + possibleReaders.addAll( validCores.stream().map( coreTopology::find ).map( Optional::get ) + .map( info -> new ServerInfo( info.connectors().boltAddress(), info.tags() ) ) + .collect( Collectors.toSet() ) ); + } + + Set readers = policyFilter.apply( possibleReaders ); + return readers.stream().map( r -> Endpoint.read( r.boltAddress() ) ).collect( Collectors.toList() ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftOutbound.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftOutbound.java index f0c8de635cf7f..c4417b1cdbe81 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftOutbound.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftOutbound.java @@ -24,7 +24,7 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages.ClusterIdAwareMessage; import org.neo4j.causalclustering.core.consensus.RaftMessages.RaftMessage; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -62,10 +62,10 @@ public void send( MemberId to, RaftMessage message ) return; } - Optional coreAddresses = discoveryService.coreServers().find( to ); - if ( coreAddresses.isPresent() ) + Optional coreServerInfo = discoveryService.coreServers().find( to ); + if ( coreServerInfo.isPresent() ) { - outbound.send( coreAddresses.get().getRaftServer(), new ClusterIdAwareMessage( clusterId.get(), message ) ); + outbound.send( coreServerInfo.get().getRaftServer(), new ClusterIdAwareMessage( clusterId.get(), message ) ); } else { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java index fd304d3029d52..2bf8585b3960c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/CoreTopologyTest.java @@ -41,9 +41,9 @@ public void identicalTopologiesShouldHaveNoDifference() throws Exception UUID one = UUID.randomUUID(); UUID two = UUID.randomUUID(); - Map coreMembers = new HashMap<>(); - coreMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); - coreMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + Map coreMembers = new HashMap<>(); + coreMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); + coreMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers ); @@ -62,14 +62,14 @@ public void shouldDetectAddedMembers() throws Exception UUID one = UUID.randomUUID(); UUID two = UUID.randomUUID(); - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); - initialMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + Map initialMembers = new HashMap<>(); + initialMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); + initialMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); - newMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + Map newMembers = new HashMap<>(); + newMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); + newMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); + newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); @@ -88,12 +88,12 @@ public void shouldDetectRemovedMembers() throws Exception UUID one = UUID.randomUUID(); UUID two = UUID.randomUUID(); - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( one ), mock(CoreAddresses.class) ); - initialMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + Map initialMembers = new HashMap<>(); + initialMembers.put( new MemberId( one ), mock(CoreServerInfo.class) ); + initialMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( two ), mock(CoreAddresses.class) ); + Map newMembers = new HashMap<>(); + newMembers.put( new MemberId( two ), mock(CoreServerInfo.class) ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); @@ -110,13 +110,13 @@ public void shouldDetectAddedAndRemovedMembers() throws Exception { // given - Map initialMembers = new HashMap<>(); - initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); - initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + Map initialMembers = new HashMap<>(); + initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); + initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); - Map newMembers = new HashMap<>(); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); - newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) ); + Map newMembers = new HashMap<>(); + newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); + newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreServerInfo.class) ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java index 758ac08e6f95c..2f6f481ee70ac 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java @@ -37,6 +37,7 @@ import com.hazelcast.core.Member; import com.hazelcast.core.MemberSelector; import com.hazelcast.core.MultiExecutionCallback; +import com.hazelcast.core.MultiMap; import com.hazelcast.map.EntryProcessor; import com.hazelcast.map.MapInterceptor; import com.hazelcast.map.listener.MapListener; @@ -46,6 +47,7 @@ import com.hazelcast.mapreduce.aggregation.Supplier; import com.hazelcast.monitor.LocalExecutorStats; import com.hazelcast.monitor.LocalMapStats; +import com.hazelcast.monitor.LocalMultiMapStats; import com.hazelcast.query.Predicate; import org.junit.Test; @@ -128,6 +130,7 @@ public void shouldReturnTopologyUsingHazelcastMembers() throws Exception when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); + when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); com.hazelcast.core.Cluster cluster = mock( Cluster.class ); when( hazelcastInstance.getCluster() ).thenReturn( cluster ); @@ -156,6 +159,7 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Exception when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); + when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); when( hazelcastInstance.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); com.hazelcast.core.Cluster cluster = mock( Cluster.class ); @@ -250,8 +254,10 @@ public void shouldReconnectIfHazelcastUnavailable() throws Exception when( hazelcastInstance1.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance1.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); + when( hazelcastInstance1.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); when( hazelcastInstance2.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance2.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); + when( hazelcastInstance2.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); when( hazelcastInstance1.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); when( hazelcastInstance2.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); @@ -971,6 +977,214 @@ public void destroy() } } + private class HazelcastMultiMap implements MultiMap + { + private Map delegate = new HashMap<>(); + + @Override + public String getPartitionKey() + { + throw new UnsupportedOperationException(); + } + + @Override + public String getName() + { + throw new UnsupportedOperationException(); + } + + @Override + public String getServiceName() + { + throw new UnsupportedOperationException(); + } + + @Override + public void destroy() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean put( Object key, Object value ) + { + if ( delegate.get( key ) != null ) + { + throw new UnsupportedOperationException( "This is not a true multimap" ); + } + delegate.put( key, value ); + return true; + } + + @Override + public Collection get( Object key ) + { + return asSet( delegate.get( key ) ); + } + + @Override + public boolean remove( Object key, Object value ) + { + return delegate.remove( key, value ); + } + + @Override + public Collection remove( Object key ) + { + return asSet( delegate.remove( key ) ); + } + + @Override + public Set localKeySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { + return delegate.values(); + } + + @Override + public Set> entrySet() + { + return delegate.entrySet(); + } + + @Override + public boolean containsKey( Object key ) + { + return delegate.containsKey( key ); + } + + @Override + public boolean containsValue( Object value ) + { + return delegate.containsValue( value ); + } + + @Override + public boolean containsEntry( Object key, Object value ) + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return delegate.size(); + } + + @Override + public void clear() + { + delegate.clear(); + } + + @Override + public int valueCount( Object key ) + { + throw new UnsupportedOperationException(); + } + + @Override + public String addLocalEntryListener( EntryListener listener ) + { + throw new UnsupportedOperationException(); + } + + @Override + public String addEntryListener( EntryListener listener, boolean includeValue ) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeEntryListener( String registrationId ) + { + throw new UnsupportedOperationException(); + } + + @Override + public String addEntryListener( EntryListener listener, Object key, boolean includeValue ) + { + throw new UnsupportedOperationException(); + } + + @Override + public void lock( Object key ) + { + throw new UnsupportedOperationException(); + } + + @Override + public void lock( Object key, long leaseTime, TimeUnit timeUnit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLocked( Object key ) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock( Object key ) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock( Object key, long time, TimeUnit timeunit ) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock( Object key, long time, TimeUnit timeunit, long leaseTime, TimeUnit leaseTimeunit ) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public void unlock( Object key ) + { + throw new UnsupportedOperationException(); + } + + @Override + public void forceUnlock( Object key ) + { + throw new UnsupportedOperationException(); + } + + @Override + public LocalMultiMapStats getLocalMultiMapStats() + { + throw new UnsupportedOperationException(); + } + + @Override + public Result aggregate( Supplier supplier, Aggregation aggregation ) + { + throw new UnsupportedOperationException(); + } + + @Override + public Result aggregate( Supplier supplier, Aggregation aggregation, JobTracker jobTracker ) + { + throw new UnsupportedOperationException(); + } + } + private class HazelcastSet implements ISet { private Set delegate; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java index e5f5af78a6a6e..58e2e090f28d5 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java @@ -19,6 +19,14 @@ */ package org.neo4j.causalclustering.discovery; +import com.hazelcast.client.impl.MemberImpl; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.Member; +import com.hazelcast.core.MultiMap; +import com.hazelcast.nio.Address; +import org.junit.Before; +import org.junit.Test; + import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -27,15 +35,9 @@ import java.util.Set; import java.util.UUID; -import com.hazelcast.client.impl.MemberImpl; -import com.hazelcast.core.Member; -import com.hazelcast.nio.Address; -import org.junit.Test; - import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; -import org.neo4j.helpers.collection.Pair; import org.neo4j.kernel.configuration.BoltConnector; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.NullLog; @@ -44,40 +46,27 @@ import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; - +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.buildMemberAttributesForCore; -import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractMemberAttributesForCore; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.toCoreMemberMap; +import static org.neo4j.helpers.collection.Iterators.asSet; public class HazelcastClusterTopologyTest { - @Test - public void shouldStoreMemberIdentityAndAddressesAsMemberAttributes() throws Exception - { - // given - MemberId memberId = new MemberId( UUID.randomUUID() ); - Config config = Config.defaults(); - HashMap settings = new HashMap<>(); - settings.put( CausalClusteringSettings.transaction_advertised_address.name(), "tx:1001" ); - settings.put( CausalClusteringSettings.raft_advertised_address.name(), "raft:2001" ); - settings.put( new BoltConnector( "bolt" ).type.name(), "BOLT" ); - settings.put( new BoltConnector( "bolt" ).enabled.name(), "true" ); - settings.put( new BoltConnector( "bolt" ).advertised_address.name(), "bolt:3001" ); - settings.put( new BoltConnector( "http" ).type.name(), "HTTP" ); - settings.put( new BoltConnector( "http" ).enabled.name(), "true" ); - settings.put( new BoltConnector( "http" ).advertised_address.name(), "http:3001" ); - config.augment( settings ); + private static final Set TAGS = asSet( "tag1", "tag2", "tag3" ); - // when - Map attributes = buildMemberAttributesForCore( memberId, config ).getAttributes(); - Pair extracted = extractMemberAttributesForCore( new MemberImpl( null, null, attributes, - false ) ); + private final HazelcastInstance hzInstance = mock( HazelcastInstance.class ); - // then - assertEquals( memberId, extracted.first() ); - CoreAddresses addresses = extracted.other(); - assertEquals( new AdvertisedSocketAddress( "tx", 1001 ), addresses.getCatchupServer() ); - assertEquals( new AdvertisedSocketAddress( "raft", 2001 ), addresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt", 3001 ), addresses.connectors().boltAddress() ); + @Before + public void setup() + { + @SuppressWarnings( "unchecked" ) + MultiMap serverTagsMMap = mock( MultiMap.class ); + when( serverTagsMMap.get( any() ) ).thenReturn( TAGS ); + when( hzInstance.getMultiMap( anyString() ) ).thenReturn( (MultiMap) serverTagsMMap ); } @Test @@ -108,16 +97,16 @@ public void shouldCollectMembersAsAMap() throws Exception } // when - Map coreMemberMap = - HazelcastClusterTopology.toCoreMemberMap( hazelcastMembers, NullLog.getInstance() ); + Map coreMemberMap = toCoreMemberMap( hazelcastMembers, NullLog.getInstance(), hzInstance ); // then for ( int i = 0; i < 5; i++ ) { - CoreAddresses coreAddresses = coreMemberMap.get( coreMembers.get( i ) ); - assertEquals( new AdvertisedSocketAddress( "tx", (i + 1) ), coreAddresses.getCatchupServer() ); - assertEquals( new AdvertisedSocketAddress( "raft", (i + 1) ), coreAddresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt", (i + 1) ), coreAddresses.connectors().boltAddress() ); + CoreServerInfo coreServerInfo = coreMemberMap.get( coreMembers.get( i ) ); + assertEquals( new AdvertisedSocketAddress( "tx", (i + 1) ), coreServerInfo.getCatchupServer() ); + assertEquals( new AdvertisedSocketAddress( "raft", (i + 1) ), coreServerInfo.getRaftServer() ); + assertEquals( new AdvertisedSocketAddress( "bolt", (i + 1) ), coreServerInfo.connectors().boltAddress() ); + assertEquals( coreServerInfo.tags(), TAGS ); } } @@ -149,8 +138,7 @@ public void shouldLogAndExcludeMembersWithMissingAttributes() throws Exception hazelcastMembers.add( new MemberImpl( new Address( "localhost", i ), null, attributes, false ) ); } // when - Map map = - HazelcastClusterTopology.toCoreMemberMap( hazelcastMembers, NullLog.getInstance() ); + Map map = toCoreMemberMap( hazelcastMembers, NullLog.getInstance(), hzInstance ); // then assertThat( map.keySet(), hasItems( coreMembers.get( 0 ), coreMembers.get( 1 ), coreMembers.get( 3 ) ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java index 73f89cc1f3e02..14fc786ba2b77 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java @@ -35,7 +35,7 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology { private final SharedDiscoveryService sharedDiscoveryService; private final MemberId member; - private final CoreAddresses coreAddresses; + private final CoreServerInfo coreServerInfo; private final Set listeners = new LinkedHashSet<>(); private final Log log; @@ -46,7 +46,7 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology { this.sharedDiscoveryService = sharedDiscoveryService; this.member = member; - this.coreAddresses = extractAddresses( config ); + this.coreServerInfo = extractCoreServerInfo( config ); this.log = logProvider.getLog( getClass() ); } @@ -72,7 +72,7 @@ public void refreshCoreTopology() @Override public void start() throws InterruptedException { - sharedDiscoveryService.registerCoreMember( member, coreAddresses, this ); + sharedDiscoveryService.registerCoreMember( member, coreServerInfo, this ); log.info( "Registered core server %s", member ); sharedDiscoveryService.waitForClusterFormation(); log.info( "Cluster formed" ); @@ -119,12 +119,12 @@ synchronized void onReadReplicaTopologyChange( ReadReplicaTopology readReplicaTo this.readReplicaTopology = readReplicaTopology; } - private static CoreAddresses extractAddresses( Config config ) + private static CoreServerInfo extractCoreServerInfo( Config config ) { AdvertisedSocketAddress raftAddress = config.get( CausalClusteringSettings.raft_advertised_address ); AdvertisedSocketAddress transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address ); ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); - return new CoreAddresses( raftAddress, transactionSource, clientConnectorAddresses ); + return new CoreServerInfo( raftAddress, transactionSource, clientConnectorAddresses ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java index 7e12d162fc85a..2c7de5fd1a787 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java @@ -32,7 +32,7 @@ class SharedDiscoveryReadReplicaClient extends LifecycleAdapter implements ReadReplicaTopologyService { private final SharedDiscoveryService sharedDiscoveryService; - private final ReadReplicaAddresses addresses; + private final ReadReplicaInfo addresses; private final MemberId memberId; private final Log log; @@ -40,7 +40,7 @@ class SharedDiscoveryReadReplicaClient extends LifecycleAdapter implements ReadR LogProvider logProvider ) { this.sharedDiscoveryService = sharedDiscoveryService; - this.addresses = new ReadReplicaAddresses( ClientConnectorAddresses.extractFromConfig( config ), + this.addresses = new ReadReplicaInfo( ClientConnectorAddresses.extractFromConfig( config ), socketAddress( config.get( CausalClusteringSettings.transaction_advertised_address ).toString(), AdvertisedSocketAddress::new ) ); this.memberId = memberId; this.log = logProvider.getLog( getClass() ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index b6662f5f9d2aa..41635130fcc75 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -39,8 +39,8 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory { - private final Map coreMembers = new HashMap<>(); - private final Map readReplicaAddresses = new HashMap<>(); + private final Map coreMembers = new HashMap<>(); + private final Map readReplicaInfoMap = new HashMap<>(); private final List coreClients = new ArrayList<>(); private final Lock lock = new ReentrantLock(); @@ -101,7 +101,7 @@ ReadReplicaTopology readReplicaTopology() lock.lock(); try { - return new ReadReplicaTopology( unmodifiableMap( readReplicaAddresses ) ); + return new ReadReplicaTopology( unmodifiableMap( readReplicaInfoMap ) ); } finally { @@ -109,12 +109,12 @@ ReadReplicaTopology readReplicaTopology() } } - void registerCoreMember( MemberId memberId, CoreAddresses coreAddresses, SharedDiscoveryCoreClient client ) + void registerCoreMember( MemberId memberId, CoreServerInfo coreServerInfo, SharedDiscoveryCoreClient client ) { lock.lock(); try { - coreMembers.put( memberId, coreAddresses ); + coreMembers.put( memberId, coreServerInfo ); coreClients.add( client ); enoughMembers.signalAll(); notifyCoreClients(); @@ -149,12 +149,12 @@ private void notifyCoreClients() } } - void registerReadReplica( MemberId memberId, ReadReplicaAddresses readReplicaAddresses ) + void registerReadReplica( MemberId memberId, ReadReplicaInfo readReplicaInfo ) { lock.lock(); try { - this.readReplicaAddresses.put( memberId, readReplicaAddresses ); + this.readReplicaInfoMap.put( memberId, readReplicaInfo ); notifyCoreClients(); } finally @@ -168,7 +168,7 @@ void unRegisterReadReplica( MemberId memberId ) lock.lock(); try { - ReadReplicaAddresses removed = this.readReplicaAddresses.remove( memberId ); + ReadReplicaInfo removed = this.readReplicaInfoMap.remove( memberId ); notifyCoreClients(); } finally diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java index a77540ecb17fa..808fb44bec9dd 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java @@ -19,10 +19,6 @@ */ package org.neo4j.causalclustering.discovery; -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; - import org.neo4j.helpers.AdvertisedSocketAddress; import static java.util.Collections.singletonList; @@ -30,30 +26,25 @@ public class TestTopology { - public static Set addressesForReadReplicas( int... ids ) - { - return Arrays.stream( ids ).mapToObj( TestTopology::addressesForReadReplica ).collect( Collectors.toSet() ); - } - private static ClientConnectorAddresses wrapAsClientConnectorAddresses( AdvertisedSocketAddress advertisedSocketAddress ) { return new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ); } - public static CoreAddresses adressesForCore( int id ) + public static CoreServerInfo adressesForCore( int id ) { AdvertisedSocketAddress raftServerAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); AdvertisedSocketAddress catchupServerAddress = new AdvertisedSocketAddress( "localhost", (4000 + id) ); AdvertisedSocketAddress boltServerAddress = new AdvertisedSocketAddress( "localhost", (5000 + id) ); - return new CoreAddresses( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ) ); + return new CoreServerInfo( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ) ); } - public static ReadReplicaAddresses addressesForReadReplica( int id ) + public static ReadReplicaInfo addressesForReadReplica( int id ) { AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) ); ClientConnectorAddresses clientConnectorAddresses = new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ); - return new ReadReplicaAddresses( clientConnectorAddresses, advertisedSocketAddress ); + return new ReadReplicaInfo( clientConnectorAddresses, advertisedSocketAddress ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java index 8347c01ccf606..b65c0746424e3 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java @@ -28,10 +28,10 @@ import java.util.UUID; import org.neo4j.causalclustering.core.consensus.LeaderLocator; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopologyService; -import org.neo4j.causalclustering.discovery.ReadReplicaAddresses; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.logging.NullLogProvider; @@ -40,7 +40,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; -import static org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1Test.addresses; +import static org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1Test.readReplicaInfoMap; import static org.neo4j.helpers.collection.Iterators.asList; public class ClusterOverviewProcedureTest @@ -51,7 +51,7 @@ public void shouldProvideOverviewOfCoreServersAndReadReplicas() throws Exception // given final CoreTopologyService topologyService = mock( CoreTopologyService.class ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); MemberId theLeader = new MemberId( UUID.randomUUID() ); MemberId follower1 = new MemberId( UUID.randomUUID() ); MemberId follower2 = new MemberId( UUID.randomUUID() ); @@ -60,7 +60,7 @@ public void shouldProvideOverviewOfCoreServersAndReadReplicas() throws Exception coreMembers.put( follower1, adressesForCore( 1 ) ); coreMembers.put( follower2, adressesForCore( 2 ) ); - Map readReplicas = addresses( 4, 5 ); + Map readReplicas = readReplicaInfoMap( 4, 5 ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( null, false, coreMembers ) ); when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( readReplicas ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FilterChainTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FilterChainTest.java new file mode 100644 index 0000000000000..f56d955ac2543 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FilterChainTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import org.junit.Test; + +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; +import static org.junit.Assert.assertEquals; +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class FilterChainTest +{ + @Test + public void shouldFilterThroughAll() throws Exception + { + // given + Filter removeValuesOfFive = data -> data.stream().filter( value -> value != 5 ).collect( Collectors.toSet() ); + Filter mustHaveThreeValues = data -> data.size() == 3 ? data : emptySet(); + Filter keepValuesBelowTen = data -> data.stream().filter( value -> value < 10 ).collect( Collectors.toSet() ); + + FilterChain filterChain = new FilterChain<>( asList( removeValuesOfFive, mustHaveThreeValues, keepValuesBelowTen ) ); + Set data = asSet( 5, 5, 5, 3, 5, 10, 9 ); // carefully crafted to check order as well + + // when + data = filterChain.apply( data ); + + // then + assertEquals( asSet( 3, 9 ), data ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRuleTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRuleTest.java new file mode 100644 index 0000000000000..33ce47b388111 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/FirstValidRuleTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class FirstValidRuleTest +{ + @Test + public void shouldUseResultOfFirstNonEmpty() throws Exception + { + // given + Filter removeValuesOfFive = data -> data.stream().filter( value -> value != 5 ).collect( Collectors.toSet() ); + Filter countMoreThanFour = data -> data.size() > 4 ? data : Collections.emptySet(); + Filter countMoreThanThree = data -> data.size() > 3 ? data : Collections.emptySet(); + + FilterChain ruleA = new FilterChain<>( asList( removeValuesOfFive, countMoreThanFour ) ); // should not succeed + FilterChain ruleB = new FilterChain<>( asList( removeValuesOfFive, countMoreThanThree ) ); // should succeed + FilterChain ruleC = new FilterChain<>( singletonList( countMoreThanFour ) ); // never reached + + FirstValidRule firstValidRule = new FirstValidRule<>( asList( ruleA, ruleB, ruleC ) ); + + Set data = asSet( 5, 1, 5, 2, 5, 3, 5, 4 ); + + // when + data = firstValidRule.apply( data ); + + // then + assertEquals( asSet( 1, 2, 3, 4 ), data ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilterTest.java new file mode 100644 index 0000000000000..3ea371014fe86 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/filters/MinimumCountFilterTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.filters; + +import org.junit.Test; + +import java.util.Set; + +import static java.util.Collections.emptySet; +import static org.junit.Assert.assertEquals; +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class MinimumCountFilterTest +{ + @Test + public void shouldFilterBelowCount() throws Exception + { + // given + MinimumCountFilter minFilter = new MinimumCountFilter<>( 3 ); + + Set input = asSet( 1, 2 ); + + // when + Set output = minFilter.apply( input ); + + // then + assertEquals( emptySet(), output ); + } + + @Test + public void shouldPassAtCount() throws Exception + { + // given + MinimumCountFilter minFilter = new MinimumCountFilter<>( 3 ); + + Set input = asSet( 1, 2, 3 ); + + // when + Set output = minFilter.apply( input ); + + // then + assertEquals( input, output ); + } + + @Test + public void shouldPassAboveCount() throws Exception + { + // given + MinimumCountFilter minFilter = new MinimumCountFilter<>( 3 ); + + Set input = asSet( 1, 2, 3, 4 ); + + // when + Set output = minFilter.apply( input ); + + // then + assertEquals( input, output ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java index 56e2c89a9d40f..67a0d1399ea6c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java @@ -32,7 +32,7 @@ import java.util.UUID; import org.neo4j.causalclustering.core.consensus.LeaderLocator; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; @@ -41,7 +41,6 @@ import org.neo4j.kernel.configuration.Config; import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static org.junit.Assert.assertFalse; import static org.junit.runners.Parameterized.Parameters; import static org.mockito.Mockito.mock; @@ -75,7 +74,7 @@ public void shouldReturnEndpointsInDifferentOrders() throws Exception LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( member( 0 ) ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); coreMembers.put( member( 0 ), adressesForCore( 0 ) ); coreMembers.put( member( 1 ), adressesForCore( 1 ) ); coreMembers.put( member( 2 ), adressesForCore( 2 ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java index d5835d8bb0468..c2034abb79c33 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java @@ -39,10 +39,10 @@ import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopologyService; -import org.neo4j.causalclustering.discovery.ReadReplicaAddresses; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -149,7 +149,7 @@ public void shouldProvideReaderAndRouterForSingleCoreSetup() throws Exception LeaderLocator leaderLocator = mock( LeaderLocator.class ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); coreMembers.put( member( 0 ), adressesForCore( 0 ) ); final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers ); @@ -179,7 +179,7 @@ public void shouldReturnCoreServersWithRouteAllCoresButLeaderAsReadAndSingleWrit LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( member( 0 ) ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); coreMembers.put( member( 0 ), adressesForCore( 0 ) ); coreMembers.put( member( 1 ), adressesForCore( 1 ) ); coreMembers.put( member( 2 ), adressesForCore( 2 ) ); @@ -215,7 +215,7 @@ public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( member( 0 ) ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); coreMembers.put( member( 0 ), adressesForCore( 0 ) ); final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers ); @@ -243,12 +243,12 @@ public void shouldReturnTheCoreLeaderForWriteAndReadReplicasAndCoresForReads() t // given final CoreTopologyService topologyService = mock( CoreTopologyService.class ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); MemberId theLeader = member( 0 ); coreMembers.put( theLeader, adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); - when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( addresses( 1 ) ) ); + when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( readReplicaInfoMap( 1 ) ) ); LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( theLeader ); @@ -278,7 +278,7 @@ public void shouldReturnCoreMemberAsReadServerIfNoReadReplicasAvailable() throws // given final CoreTopologyService topologyService = mock( CoreTopologyService.class ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); MemberId theLeader = member( 0 ); coreMembers.put( theLeader, adressesForCore( 0 ) ); @@ -309,7 +309,7 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoLeader() throws Exception // given final CoreTopologyService topologyService = mock( CoreTopologyService.class ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); coreMembers.put( member( 0 ), adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); @@ -338,7 +338,7 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws // given final CoreTopologyService topologyService = mock( CoreTopologyService.class ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); coreMembers.put( member( 0 ), adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); @@ -370,16 +370,16 @@ private ClusterView run( GetServersProcedureV1 proc ) throws ProcedureException return ClusterView.parse( (List>) rows[1] ); } - public static Map addresses( int... ids ) + public static Map readReplicaInfoMap( int... ids ) { - return Arrays.stream( ids ).mapToObj( GetServersProcedureV1Test::readReplicaAddresses ).collect( Collectors + return Arrays.stream( ids ).mapToObj( GetServersProcedureV1Test::readReplicaInfo ).collect( Collectors .toMap( (p) -> new MemberId( UUID.randomUUID() ), Function.identity() ) ); } - private static ReadReplicaAddresses readReplicaAddresses( int id ) + private static ReadReplicaInfo readReplicaInfo( int id ) { AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) ); - return new ReadReplicaAddresses( + return new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ), new AdvertisedSocketAddress( "localhost", 4000 + id )); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilterTest.java new file mode 100644 index 0000000000000..e4c3da9df2ee1 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/AnyTagFilterTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import org.neo4j.helpers.AdvertisedSocketAddress; + +import static org.junit.Assert.assertEquals; +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class AnyTagFilterTest +{ + @Test + public void shouldReturnServersMatchingAnyTag() throws Exception + { + // given + AnyTagFilter tagFilter = new AnyTagFilter( asSet( "china-west", "europe" ) ); + + ServerInfo serverA = new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "china-west" ) ); + ServerInfo serverB = new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "europe" ) ); + ServerInfo serverC = new ServerInfo( new AdvertisedSocketAddress( "bolt", 3 ), asSet( "china", "china-west" ) ); + ServerInfo serverD = new ServerInfo( new AdvertisedSocketAddress( "bolt", 4 ), asSet( "china-west", "china" ) ); + ServerInfo serverE = new ServerInfo( new AdvertisedSocketAddress( "bolt", 5 ), asSet( "china-east", "asia" ) ); + ServerInfo serverF = new ServerInfo( new AdvertisedSocketAddress( "bolt", 6 ), asSet( "europe-west" ) ); + ServerInfo serverG = new ServerInfo( new AdvertisedSocketAddress( "bolt", 7 ), asSet( "china-west", "europe" ) ); + ServerInfo serverH = new ServerInfo( new AdvertisedSocketAddress( "bolt", 8 ), asSet( "africa" ) ); + + Set data = asSet( serverA, serverB, serverC, serverD, serverE, serverF, serverG, serverH ); + + // when + Set output = tagFilter.apply( data ); + + // then + Set ports = new HashSet<>(); + for ( ServerInfo info : output ) + { + ports.add( info.boltAddress().getPort() ); + } + + assertEquals( asSet( 1, 2, 3, 4, 7 ), ports ); + } + + ServerInfo serverInfo( String... tags ) + { + return new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( tags ) ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PoliciesTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PoliciesTest.java new file mode 100644 index 0000000000000..61356f479d027 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/server_policy/PoliciesTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy.server_policy; + +import org.junit.Test; + +import java.util.Set; + +import org.neo4j.causalclustering.load_balancing.filters.Filter; +import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.logging.NullLogProvider; + +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertEquals; +import static org.neo4j.helpers.collection.Iterators.asSet; +import static org.neo4j.helpers.collection.MapUtil.stringMap; + +public class PoliciesTest +{ + @Test + public void shouldSupplyDefaultUnfilteredPolicy() throws Exception + { + // given + Policies policies = new Policies( NullLogProvider.getInstance() ); + + // when + Filter filter = policies.selectFor( emptyMap() ); + Set input = asSet( + new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "tagA" ) ), + new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "tagB" ) ) + ); + + Set output = filter.apply( input ); + + // then + assertEquals( input, output ); + } + + @Test + public void shouldAllowLookupOfAddedPolicy() throws Exception + { + // given + Policies policies = new Policies( NullLogProvider.getInstance() ); + + String myPolicyName = "china"; + Filter myPolicy = data -> data; + + // when + policies.addPolicy( myPolicyName, myPolicy ); + Filter selectedPolicy = policies.selectFor( stringMap( Policies.POLICY_KEY, myPolicyName ) ); + + // then + assertEquals( myPolicy, selectedPolicy ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java index 3e8a14a1d8162..dc7ee65967776 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java @@ -23,10 +23,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.identity.ClusterId; @@ -37,6 +38,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.AnyOf.anyOf; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -46,7 +48,6 @@ public class ConnectToRandomCoreServerTest public void shouldConnectToRandomCoreServer() throws Exception { // given - MemberId memberId1 = new MemberId( UUID.randomUUID() ); MemberId memberId2 = new MemberId( UUID.randomUUID() ); MemberId memberId3 = new MemberId( UUID.randomUUID() ); @@ -59,10 +60,11 @@ public void shouldConnectToRandomCoreServer() throws Exception connectionStrategy.setDiscoveryService( readReplicaTopologyService ); // when - MemberId memberId = connectionStrategy.upstreamDatabase().get(); + Optional memberId = connectionStrategy.upstreamDatabase(); // then - assertThat( memberId, anyOf( equalTo( memberId1 ), equalTo( memberId2 ), equalTo( memberId3 ) ) ); + assertTrue( memberId.isPresent() ); + assertThat( memberId.get(), anyOf( equalTo( memberId1 ), equalTo( memberId2 ), equalTo( memberId3 ) ) ); } static CoreTopology fakeCoreTopology( MemberId... memberIds ) @@ -70,13 +72,13 @@ static CoreTopology fakeCoreTopology( MemberId... memberIds ) assert memberIds.length > 0; ClusterId clusterId = new ClusterId( UUID.randomUUID() ); - Map coreMembers = new HashMap<>(); + Map coreMembers = new HashMap<>(); int offset = 0; for ( MemberId memberId : memberIds ) { - coreMembers.put( memberId, new CoreAddresses( new AdvertisedSocketAddress( "localhost", 5000 + offset ), + coreMembers.put( memberId, new CoreServerInfo( new AdvertisedSocketAddress( "localhost", 5000 + offset ), new AdvertisedSocketAddress( "localhost", 6000 + offset ), new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ) ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java index ce24cb83e5e4c..bcf8b58aef28a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java @@ -30,7 +30,7 @@ import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; import org.neo4j.causalclustering.discovery.ClusterTopology; import org.neo4j.causalclustering.discovery.CoreTopology; -import org.neo4j.causalclustering.discovery.ReadReplicaAddresses; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.identity.MemberId; @@ -131,13 +131,13 @@ private ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds { assert readReplicaIds.length > 0; - Map readReplicas = new HashMap<>(); + Map readReplicas = new HashMap<>(); int offset = 0; for ( MemberId memberId : readReplicaIds ) { - readReplicas.put( memberId, new ReadReplicaAddresses( new ClientConnectorAddresses( singletonList( + readReplicas.put( memberId, new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, new AdvertisedSocketAddress( "localhost", 11000 + offset ) ) ) ), new AdvertisedSocketAddress( "localhost", 10000 + offset ) ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java index 89c2905d324ba..5a59cd3b8f442 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java @@ -26,7 +26,7 @@ import java.util.Optional; import java.util.UUID; -import org.neo4j.causalclustering.discovery.CoreAddresses; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.identity.ClusterId; @@ -75,7 +75,7 @@ public void shouldDefaultToRandomCoreServerIfNoOtherStrategySpecified() throws E ReadReplicaTopologyService readReplicaTopologyService = mock( ReadReplicaTopologyService.class ); MemberId memberId = new MemberId( UUID.randomUUID() ); when( readReplicaTopologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, - mapOf( memberId, mock( CoreAddresses.class ) ) ) ); + mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer(); defaultStrategy.setDiscoveryService( readReplicaTopologyService ); @@ -96,7 +96,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception ReadReplicaTopologyService readReplicaTopologyService = mock( ReadReplicaTopologyService.class ); MemberId memberId = new MemberId( UUID.randomUUID() ); when( readReplicaTopologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, - mapOf( memberId, mock( CoreAddresses.class ) ) ) ); + mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); ConnectToRandomCoreServer shouldNotUse = new ConnectToRandomCoreServer(); @@ -165,11 +165,11 @@ public Optional upstreamDatabase() throws UpstreamDatabaseSelectionExc } } - private Map mapOf( MemberId memberId, CoreAddresses coreAddresses ) + private Map mapOf( MemberId memberId, CoreServerInfo coreServerInfo ) { - HashMap map = new HashMap<>(); + HashMap map = new HashMap<>(); - map.put( memberId, coreAddresses ); + map.put( memberId, coreServerInfo ); return map; }