diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 726c13855aeba..b08304d54762b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -317,6 +317,9 @@ public class CausalClusteringSettings implements LoadableConfig public static final Setting> upstream_selection_strategy = setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" ); + public static final Setting user_defined_upstream_selection_strategy = + setting( "causal_clustering.user_defined_upstream_strategy", STRING, "" ); + @Description( "Tags for the server used when configuring load balancing and replication policies." ) public static final Setting> server_tags = setting( "causal_clustering.server_tags", list( ",", STRING ), "" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/FilterConfigParser.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/FilterConfigParser.java index 07595818218ca..19ffa83d258a8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/FilterConfigParser.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/FilterConfigParser.java @@ -33,7 +33,7 @@ import static java.lang.String.format; import static java.util.Collections.singletonList; -class FilterConfigParser +public class FilterConfigParser { private static Filter filterFor( String filterName, String[] args ) throws InvalidFilterSpecification { @@ -84,7 +84,7 @@ private static Filter filterFor( String filterName, String[] args ) } } - static Filter parse( String filterConfig ) throws InvalidFilterSpecification + public static Filter parse( String filterConfig ) throws InvalidFilterSpecification { if ( filterConfig.length() == 0 ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerInfo.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerInfo.java index c34d1ee4149ab..cf478bbb3946f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerInfo.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerInfo.java @@ -22,22 +22,30 @@ import java.util.Objects; import java.util.Set; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; /** * Hold the server information that is interesting for load balancing purposes. */ -class ServerInfo +public class ServerInfo { private final AdvertisedSocketAddress boltAddress; + private MemberId memberId; private Set tags; - ServerInfo( AdvertisedSocketAddress boltAddress, Set tags ) + public ServerInfo( AdvertisedSocketAddress boltAddress, MemberId memberId, Set tags ) { this.boltAddress = boltAddress; + this.memberId = memberId; this.tags = tags; } + public MemberId memberId() + { + return memberId; + } + AdvertisedSocketAddress boltAddress() { return boltAddress; @@ -56,22 +64,19 @@ public boolean equals( Object o ) if ( o == null || getClass() != o.getClass() ) { return false; } ServerInfo that = (ServerInfo) o; - return Objects.equals( boltAddress, that.boltAddress ) && + return Objects.equals( boltAddress, that.boltAddress ) && Objects.equals( memberId, that.memberId ) && Objects.equals( tags, that.tags ); } @Override public int hashCode() { - return Objects.hash( boltAddress, tags ); + return Objects.hash( boltAddress, memberId, tags ); } @Override public String toString() { - return "ServerInfo{" + - "boltAddress=" + boltAddress + - ", tags=" + tags + - '}'; + return "ServerInfo{" + "boltAddress=" + boltAddress + ", memberId=" + memberId + ", tags=" + tags + '}'; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java index 4fd2e48ad72aa..d083e8291eea8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java @@ -28,6 +28,7 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.discovery.TopologyService; @@ -130,8 +131,9 @@ private List writeEndpoints( CoreTopology cores ) private List readEndpoints( CoreTopology coreTopology, ReadReplicaTopology rrTopology, Policy policy ) { - Set possibleReaders = rrTopology.allMemberInfo().stream() - .map( info -> new ServerInfo( info.connectors().boltAddress(), info.tags() ) ) + Set possibleReaders = rrTopology.members().entrySet().stream() + .map( entry -> new ServerInfo( entry.getValue().connectors().boltAddress(), entry.getKey(), + entry.getValue().tags() ) ) .collect( Collectors.toSet() ); if ( allowReadsOnFollowers || possibleReaders.size() == 0 ) @@ -147,9 +149,16 @@ private List readEndpoints( CoreTopology coreTopology, ReadReplicaTopo // we might end up using the leader for reading during this ttl, should be fine in general } - possibleReaders.addAll( validCores.stream().map( coreTopology::find ).map( Optional::get ) - .map( info -> new ServerInfo( info.connectors().boltAddress(), info.tags() ) ) - .collect( Collectors.toSet() ) ); + for ( MemberId validCore : validCores ) + { + Optional coreServerInfo = coreTopology.find( validCore ); + if ( coreServerInfo.isPresent() ) + { + CoreServerInfo serverInfo = coreServerInfo.get(); + possibleReaders.add( + new ServerInfo( serverInfo.connectors().boltAddress(), validCore, serverInfo.tags() ) ); + } + } } Set readers = policy.apply( possibleReaders ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategy.java similarity index 93% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategy.java index 4b17495f3344f..07ac37e6a53bb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategy.java @@ -28,11 +28,11 @@ import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) -public class ConnectToRandomCoreServer extends UpstreamDatabaseSelectionStrategy +public class ConnectToRandomCoreServerStrategy extends UpstreamDatabaseSelectionStrategy { private final Random random = new Random(); - public ConnectToRandomCoreServer() + public ConnectToRandomCoreServerStrategy() { super( "connect-to-random-core-server" ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenterStrategy.java similarity index 92% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenter.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenterStrategy.java index 1ee6f7689473b..2e18e2fda2f7c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectWithinDataCenterStrategy.java @@ -31,11 +31,11 @@ import org.neo4j.helpers.Service; @Service.Implementation(UpstreamDatabaseSelectionStrategy.class) -public class ConnectWithinDataCenter extends UpstreamDatabaseSelectionStrategy +public class ConnectWithinDataCenterStrategy extends UpstreamDatabaseSelectionStrategy { - private Random random = new Random();; + private Random random = new Random(); - public ConnectWithinDataCenter() + public ConnectWithinDataCenterStrategy() { super( "connect-within-data-center" ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index f998d760c4b73..5a800c868f516 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -248,7 +248,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data new StoreCopyProcess( fileSystem, pageCache, localDatabase, copiedStoreRecovery, remoteStore, logProvider ); - ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer(); + ConnectToRandomCoreServerStrategy defaultStrategy = new ConnectToRandomCoreServerStrategy(); defaultStrategy.setTopologyService( topologyService ); defaultStrategy.setConfig( config ); defaultStrategy.setMyself( myself ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategy.java similarity index 92% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategy.java index 012840c38b0b0..2dd52ca16b574 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategy.java @@ -25,11 +25,11 @@ import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) -public class TypicallyConnectToRandomReadReplica extends UpstreamDatabaseSelectionStrategy +public class TypicallyConnectToRandomReadReplicaStrategy extends UpstreamDatabaseSelectionStrategy { private final ModuloCounter counter = new ModuloCounter( 10 ); - public TypicallyConnectToRandomReadReplica() + public TypicallyConnectToRandomReadReplicaStrategy() { super( "typically-connect-to-random-read-replica" ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java new file mode 100644 index 0000000000000..fbc197f4d14e5 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.readreplica; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.discovery.CoreServerInfo; +import org.neo4j.causalclustering.discovery.CoreTopology; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.load_balancing.filters.Filter; +import org.neo4j.causalclustering.load_balancing.plugins.server_policies.FilterConfigParser; +import org.neo4j.causalclustering.load_balancing.plugins.server_policies.InvalidFilterSpecification; +import org.neo4j.causalclustering.load_balancing.plugins.server_policies.ServerInfo; +import org.neo4j.helpers.Service; + +@Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) +public class UserDefinedConfigurationStrategy extends UpstreamDatabaseSelectionStrategy +{ + public UserDefinedConfigurationStrategy() + { + super( "user-defined" ); + } + + @Override + public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException + { + try + { + Filter filters = FilterConfigParser + .parse( config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ) ); + + Set possibleReaders = topologyService.readReplicas().members().entrySet().stream() + .map( entry -> new ServerInfo( entry.getValue().connectors().boltAddress(), entry.getKey(), + entry.getValue().tags() ) ).collect( Collectors.toSet() ); + + CoreTopology coreTopology = topologyService.coreServers(); + for ( MemberId validCore : coreTopology.members().keySet() ) + { + Optional coreServerInfo = coreTopology.find( validCore ); + if ( coreServerInfo.isPresent() ) + { + CoreServerInfo serverInfo = coreServerInfo.get(); + possibleReaders.add( + new ServerInfo( serverInfo.connectors().boltAddress(), validCore, serverInfo.tags() ) ); + } + } + + return filters.apply( possibleReaders ).stream().map( ServerInfo::memberId ).findAny(); + } + catch ( InvalidFilterSpecification invalidFilterSpecification ) + { + return Optional.empty(); + } + } +} diff --git a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy index 19f6edf13de3d..f89b1a0d4c293 100644 --- a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy +++ b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy @@ -1,2 +1,2 @@ -org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServer -org.neo4j.causalclustering.readreplica.ConnectWithinDataCenter +org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategy +org.neo4j.causalclustering.readreplica.ConnectWithinDataCenterStrategy diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/AnyTagFilterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/AnyTagFilterTest.java index e31bb96bccc01..297deb0685c7d 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/AnyTagFilterTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/AnyTagFilterTest.java @@ -23,7 +23,9 @@ import java.util.HashSet; import java.util.Set; +import java.util.UUID; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import static org.junit.Assert.assertEquals; @@ -37,14 +39,14 @@ public void shouldReturnServersMatchingAnyTag() throws Exception // given AnyTagFilter tagFilter = new AnyTagFilter( asSet( "china-west", "europe" ) ); - ServerInfo serverA = new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "china-west" ) ); - ServerInfo serverB = new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "europe" ) ); - ServerInfo serverC = new ServerInfo( new AdvertisedSocketAddress( "bolt", 3 ), asSet( "china", "china-west" ) ); - ServerInfo serverD = new ServerInfo( new AdvertisedSocketAddress( "bolt", 4 ), asSet( "china-west", "china" ) ); - ServerInfo serverE = new ServerInfo( new AdvertisedSocketAddress( "bolt", 5 ), asSet( "china-east", "asia" ) ); - ServerInfo serverF = new ServerInfo( new AdvertisedSocketAddress( "bolt", 6 ), asSet( "europe-west" ) ); - ServerInfo serverG = new ServerInfo( new AdvertisedSocketAddress( "bolt", 7 ), asSet( "china-west", "europe" ) ); - ServerInfo serverH = new ServerInfo( new AdvertisedSocketAddress( "bolt", 8 ), asSet( "africa" ) ); + ServerInfo serverA = new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), new MemberId( UUID.randomUUID() ), asSet( "china-west" ) ); + ServerInfo serverB = new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), new MemberId( UUID.randomUUID() ), asSet( "europe" ) ); + ServerInfo serverC = new ServerInfo( new AdvertisedSocketAddress( "bolt", 3 ), new MemberId( UUID.randomUUID() ), asSet( "china", "china-west" ) ); + ServerInfo serverD = new ServerInfo( new AdvertisedSocketAddress( "bolt", 4 ), new MemberId( UUID.randomUUID() ), asSet( "china-west", "china" ) ); + ServerInfo serverE = new ServerInfo( new AdvertisedSocketAddress( "bolt", 5 ), new MemberId( UUID.randomUUID() ), asSet( "china-east", "asia" ) ); + ServerInfo serverF = new ServerInfo( new AdvertisedSocketAddress( "bolt", 6 ), new MemberId( UUID.randomUUID() ), asSet( "europe-west" ) ); + ServerInfo serverG = new ServerInfo( new AdvertisedSocketAddress( "bolt", 7 ), new MemberId( UUID.randomUUID() ), asSet( "china-west", "europe" ) ); + ServerInfo serverH = new ServerInfo( new AdvertisedSocketAddress( "bolt", 8 ), new MemberId( UUID.randomUUID() ), asSet( "africa" ) ); Set data = asSet( serverA, serverB, serverC, serverD, serverE, serverF, serverG, serverH ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/PoliciesTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/PoliciesTest.java index ae933e104c47c..5a9ca6352de92 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/PoliciesTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/PoliciesTest.java @@ -22,7 +22,9 @@ import org.junit.Test; import java.util.Set; +import java.util.UUID; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.logging.Log; @@ -46,8 +48,8 @@ public void shouldSupplyDefaultUnfilteredPolicyForEmptyContext() throws Exceptio // when Policy policy = policies.selectFor( emptyMap() ); Set input = asSet( - new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "tagA" ) ), - new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "tagB" ) ) + new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), new MemberId( UUID.randomUUID() ), asSet( "tagA" ) ), + new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), new MemberId( UUID.randomUUID() ), asSet( "tagB" ) ) ); Set output = policy.apply( input ); @@ -66,8 +68,8 @@ public void shouldSupplyDefaultUnfilteredPolicyForUnknownPolicyName() throws Exc // when Policy policy = policies.selectFor( stringMap( Policies.POLICY_KEY, "unknown-policy" ) ); Set input = asSet( - new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), asSet( "tagA" ) ), - new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), asSet( "tagB" ) ) + new ServerInfo( new AdvertisedSocketAddress( "bolt", 1 ), new MemberId( UUID.randomUUID() ), asSet( "tagA" ) ), + new ServerInfo( new AdvertisedSocketAddress( "bolt", 2 ), new MemberId( UUID.randomUUID() ), asSet( "tagB" ) ) ); Set output = policy.apply( input ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategyTest.java similarity index 92% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategyTest.java index a1f368b27c73f..70dc043aedcb7 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategyTest.java @@ -41,8 +41,9 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.helpers.collection.Iterators.asSet; -public class ConnectToRandomCoreServerTest +public class ConnectToRandomCoreServerStrategyTest { @Test public void shouldConnectToRandomCoreServer() throws Exception @@ -56,7 +57,7 @@ public void shouldConnectToRandomCoreServer() throws Exception when( topologyService.coreServers() ) .thenReturn( fakeCoreTopology( memberId1, memberId2, memberId3 ) ); - ConnectToRandomCoreServer connectionStrategy = new ConnectToRandomCoreServer(); + ConnectToRandomCoreServerStrategy connectionStrategy = new ConnectToRandomCoreServerStrategy(); connectionStrategy.setTopologyService( topologyService ); // when @@ -81,7 +82,7 @@ static CoreTopology fakeCoreTopology( MemberId... memberIds ) coreMembers.put( memberId, new CoreServerInfo( new AdvertisedSocketAddress( "localhost", 5000 + offset ), new AdvertisedSocketAddress( "localhost", 6000 + offset ), new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, - new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ) ) ); + new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ), asSet( "core" ) ) ); offset++; } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategyTest.java new file mode 100644 index 0000000000000..3da68d7c3b042 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategyTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.readreplica; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; +import org.neo4j.causalclustering.discovery.CoreTopology; +import org.neo4j.causalclustering.discovery.ReadReplicaInfo; +import org.neo4j.causalclustering.discovery.ReadReplicaTopology; +import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; + +import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.junit.Assert.assertThat; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; +import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; +import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.fakeReadReplicaTopology; +import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.fakeTopologyService; +import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.memberIDs; + +public class TypicallyConnectToRandomReadReplicaStrategyTest +{ + @Test + public void shouldConnectToCoreOneInTenTimesByDefault() throws Exception + { + // given + MemberId theCoreMemberId = new MemberId( UUID.randomUUID() ); + TopologyService topologyService = + fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ) ) ); + + TypicallyConnectToRandomReadReplicaStrategy connectionStrategy = + new TypicallyConnectToRandomReadReplicaStrategy(); + connectionStrategy.setTopologyService( topologyService ); + + List responses = new ArrayList<>(); + + // when + for ( int i = 0; i < 10; i++ ) + { + responses.add( connectionStrategy.upstreamDatabase().get() ); + } + + // then + assertThat( responses, hasItem( theCoreMemberId ) ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java index 8bfa581fded9a..035da45be9201 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java @@ -78,7 +78,7 @@ public void shouldDefaultToRandomCoreServerIfNoOtherStrategySpecified() throws E when( topologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); - ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer(); + ConnectToRandomCoreServerStrategy defaultStrategy = new ConnectToRandomCoreServerStrategy(); defaultStrategy.setTopologyService( topologyService ); UpstreamDatabaseStrategySelector selector = new UpstreamDatabaseStrategySelector( defaultStrategy ); @@ -99,7 +99,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception when( topologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); - ConnectToRandomCoreServer shouldNotUse = new ConnectToRandomCoreServer(); + ConnectToRandomCoreServerStrategy shouldNotUse = new ConnectToRandomCoreServerStrategy(); UpstreamDatabaseSelectionStrategy mockStrategy = mock( UpstreamDatabaseSelectionStrategy.class ); when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java similarity index 59% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java index 0363683148ddb..aef34232b341a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java @@ -21,13 +21,14 @@ import org.junit.Test; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; +import java.util.Set; import java.util.UUID; +import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.ReadReplicaInfo; @@ -35,40 +36,65 @@ import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.helpers.collection.MapUtil; +import org.neo4j.kernel.configuration.Config; import static java.util.Collections.singletonList; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; -import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerTest.fakeCoreTopology; +import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; +import static org.neo4j.helpers.collection.Iterators.asSet; +import static org.neo4j.helpers.collection.MapUtil.stringMap; -public class TypicallyConnectToRandomReadReplicaTest + +public class UserDefinedConfigurationStrategyTest { @Test - public void shouldConnectToCoreOneInTenTimesByDefault() throws Exception + public void shouldPickTheFirstMatchingServer() throws Exception { // given MemberId theCoreMemberId = new MemberId( UUID.randomUUID() ); - TopologyService topologyService = - fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ) ) ); + TopologyService topologyService = fakeTopologyService( fakeCoreTopology( theCoreMemberId ), + fakeReadReplicaTopology( memberIDs( 100 ), new NoEastTagGenerator() ) ); + + UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); + Config config = Config.defaults() + .with( stringMap( CausalClusteringSettings.user_defined_upstream_selection_strategy.name(), + "tags(east); tags(core); halt()" ) ); + + strategy.setConfig( config ); + strategy.setTopologyService( topologyService ); + + //when + + Optional memberId = strategy.upstreamDatabase(); + + // then + assertEquals( theCoreMemberId, memberId.get() ); + } - TypicallyConnectToRandomReadReplica connectionStrategy = new TypicallyConnectToRandomReadReplica(); - connectionStrategy.setTopologyService( topologyService ); + private ReadReplicaTopology fakeReadReplicaTopology( MemberId[] readReplicaIds, NoEastTagGenerator tagGenerator ) + { + assert readReplicaIds.length > 0; - List responses = new ArrayList<>(); + Map readReplicas = new HashMap<>(); - // when - for ( int i = 0; i < 10; i++ ) + int offset = 0; + + for ( MemberId memberId : readReplicaIds ) { - responses.add( connectionStrategy.upstreamDatabase().get() ); + readReplicas.put( memberId, new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( + new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, + new AdvertisedSocketAddress( "localhost", 11000 + offset ) ) ) ), + new AdvertisedSocketAddress( "localhost", 10000 + offset ), tagGenerator.get( memberId ) ) ); + + offset++; } - // then - assertThat( responses, hasItem( theCoreMemberId ) ); + return new ReadReplicaTopology( readReplicas ); } - private TopologyService fakeTopologyService( CoreTopology coreTopology, - ReadReplicaTopology readReplicaTopology ) + static TopologyService fakeTopologyService( CoreTopology coreTopology, ReadReplicaTopology readReplicaTopology ) { return new TopologyService() { @@ -119,7 +145,7 @@ public void shutdown() throws Throwable }; } - private MemberId[] memberIDs( int howMany ) + static MemberId[] memberIDs( int howMany ) { MemberId[] result = new MemberId[howMany]; @@ -131,7 +157,7 @@ private MemberId[] memberIDs( int howMany ) return result; } - private ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds ) + static ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds ) { assert readReplicaIds.length > 0; @@ -151,4 +177,15 @@ private ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds return new ReadReplicaTopology( readReplicas ); } + + private static class NoEastTagGenerator + { + private static final String[] SOME_ORDINALS = {"north", "south", "west"}; + private static final Random random = new Random(); + + public Set get( MemberId memberId ) + { + return asSet( SOME_ORDINALS[random.nextInt( SOME_ORDINALS.length )] ); + } + } }