From 4e5bb646309fe15a24a36304de752c362cc52063 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Tue, 7 Feb 2017 12:55:17 +0100 Subject: [PATCH] add more to the GetServersV2 framework Extract common functionality from the V1 code, split procedure functionality from load balancing logic and add an injectable strategy. There is a concrete AllServersStrategy hooked in, but it is merely a plot device for driving out the framework. --- .../core/EnterpriseCoreEditionModule.java | 12 +- .../discovery/ClientConnector.java | 25 ++ .../discovery/ClientConnectorAddresses.java | 2 +- .../discovery/CoreAddresses.java | 4 +- .../discovery/ReadReplicaAddresses.java | 6 +- .../procedures/ClusterOverviewProcedure.java | 4 +- .../load_balancing/EndPoint.java | 26 ++- .../load_balancing/GetServersProcedureV2.java | 216 ------------------ .../load_balancing/LoadBalancingResult.java | 88 +++++++ .../load_balancing/LoadBalancingStrategy.java | 62 +++++ .../causalclustering/load_balancing/Util.java | 43 ++++ .../GetServersProcedureV1.java | 102 ++------- .../procedure/GetServersProcedureV2.java | 85 +++++++ .../ParameterNames.java} | 6 +- .../{ => procedure}/ProcedureNames.java | 4 +- .../procedure/ResultFormatV1.java | 136 +++++++++++ .../strategy/AllServersStrategy.java | 102 +++++++++ .../strategy/ServerShufflingStrategy.java | 53 +++++ .../HazelcastClusterTopologyTest.java | 4 +- ...{TopologyHelper.java => TestTopology.java} | 21 +- .../ClusterOverviewProcedureTest.java | 14 +- .../GetServersProcedureV1RoutingTest.java | 4 +- .../GetServersProcedureV1Test.java | 60 ++--- .../procedure/GetServersProcedureV2Test.java | 76 ++++++ .../procedure/ResultFormatV1Test.java | 70 ++++++ .../strategy/ServerShufflingStrategyTest.java | 101 ++++++++ .../scenarios/ClusterDiscoveryIT.java | 2 +- .../scenarios/ClusterMembershipChangeIT.java | 2 +- .../scenarios/ClusterOverviewIT.java | 2 +- 29 files changed, 969 insertions(+), 363 deletions(-) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnector.java delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV2.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingResult.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingStrategy.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/Util.java rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/{ => procedure}/GetServersProcedureV1.java (58%) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2.java rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/{GetServersParameters.java => procedure/ParameterNames.java} (93%) rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/{ => procedure}/ProcedureNames.java (95%) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategy.java rename enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/{TopologyHelper.java => TestTopology.java} (56%) rename enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/{ => procedure}/GetServersProcedureV1RoutingTest.java (96%) rename enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/{ => procedure}/GetServersProcedureV1Test.java (87%) create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2Test.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1Test.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategyTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 3db7cbefe29e5..eb8c78d097ae9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -42,8 +42,11 @@ import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure; import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.load_balancing.GetServersProcedureV1; -import org.neo4j.causalclustering.load_balancing.GetServersProcedureV2; +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; +import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1; +import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV2; +import org.neo4j.causalclustering.load_balancing.strategy.AllServersStrategy; +import org.neo4j.causalclustering.load_balancing.strategy.ServerShufflingStrategy; import org.neo4j.causalclustering.logging.BetterMessageLogger; import org.neo4j.causalclustering.logging.MessageLogger; import org.neo4j.causalclustering.logging.NullMessageLogger; @@ -110,11 +113,14 @@ public enum RaftLogImplementation @Override public void registerEditionSpecificProcedures( Procedures procedures ) throws KernelException { + LoadBalancingStrategy loadBalancingStrategy = new ServerShufflingStrategy( + new AllServersStrategy( topologyService, consensusModule.raftMachine(), config ) ); + procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true ); procedures.register( new GetServersProcedureV1( topologyService, consensusModule.raftMachine(), config, logProvider ) ); procedures.register( - new GetServersProcedureV2( topologyService, consensusModule.raftMachine(), config, logProvider ) ); + new GetServersProcedureV2( loadBalancingStrategy ) ); procedures.register( new ClusterOverviewProcedure( topologyService, consensusModule.raftMachine(), logProvider ) ); procedures.register( new CoreRoleProcedure( consensusModule.raftMachine() ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnector.java new file mode 100644 index 0000000000000..32aa3c670c356 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnector.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +public interface ClientConnector +{ + ClientConnectorAddresses connectors(); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnectorAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnectorAddresses.java index 44b27e491ea59..d37e9461695e0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnectorAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClientConnectorAddresses.java @@ -66,7 +66,7 @@ static ClientConnectorAddresses extractFromConfig( Config config ) return new ClientConnectorAddresses( connectorUris ); } - public AdvertisedSocketAddress getBoltAddress() + public AdvertisedSocketAddress boltAddress() { return connectorUris.stream().filter( connectorUri -> connectorUri.scheme == bolt ).findFirst().orElseThrow( () -> new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java index 076d003ced7e8..b6003d26fef03 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java @@ -21,7 +21,7 @@ import org.neo4j.helpers.AdvertisedSocketAddress; -public class CoreAddresses +public class CoreAddresses implements ClientConnector { private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress catchupServer; @@ -45,7 +45,7 @@ public AdvertisedSocketAddress getCatchupServer() return catchupServer; } - public ClientConnectorAddresses getClientConnectorAddresses() + public ClientConnectorAddresses connectors() { return clientConnectorAddresses; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java index 8857821682ea2..33d7fa2ae0523 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java @@ -19,16 +19,16 @@ */ package org.neo4j.causalclustering.discovery; -public class ReadReplicaAddresses +public class ReadReplicaAddresses implements ClientConnector { private final ClientConnectorAddresses clientConnectorAddresses; - public ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses ) + ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses ) { this.clientConnectorAddresses = clientConnectorAddresses; } - public ClientConnectorAddresses getClientConnectorAddresses() + public ClientConnectorAddresses connectors() { return clientConnectorAddresses; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java index 7a5cf84089b36..893925bd1c0d0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java @@ -88,7 +88,7 @@ public RawIterator apply( Context ctx, Object[] inp for ( MemberId memberId : coreMembers ) { Optional clientConnectorAddresses = - coreTopology.find( memberId ).map( CoreAddresses::getClientConnectorAddresses ); + coreTopology.find( memberId ).map( CoreAddresses::connectors ); if ( clientConnectorAddresses.isPresent() ) { Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER; @@ -101,7 +101,7 @@ public RawIterator apply( Context ctx, Object[] inp } for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().members() ) { - endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.getClientConnectorAddresses(), Role.READ_REPLICA ) ); + endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.connectors(), Role.READ_REPLICA ) ); } Collections.sort( endpoints, ( o1, o2 ) -> o1.addresses().toString().compareTo( o2.addresses().toString() ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/EndPoint.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/EndPoint.java index 69727de043999..86179a3e246fb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/EndPoint.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/EndPoint.java @@ -19,13 +19,15 @@ */ package org.neo4j.causalclustering.load_balancing; +import java.util.Objects; + import org.neo4j.helpers.AdvertisedSocketAddress; /** * This class binds a certain role with an address and * thus defines a reachable endpoint with defined capabilities. */ -class EndPoint +public class EndPoint { private final AdvertisedSocketAddress address; private final Role role; @@ -35,7 +37,7 @@ public String address() return address.toString(); } - private EndPoint( AdvertisedSocketAddress address, Role role ) + public EndPoint( AdvertisedSocketAddress address, Role role ) { this.address = address; this.role = role; @@ -51,11 +53,29 @@ public static EndPoint read( AdvertisedSocketAddress address ) return new EndPoint( address, Role.READ ); } - static EndPoint route( AdvertisedSocketAddress address ) + public static EndPoint route( AdvertisedSocketAddress address ) { return new EndPoint( address, Role.ROUTE ); } + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { return true; } + if ( o == null || getClass() != o.getClass() ) + { return false; } + EndPoint endPoint = (EndPoint) o; + return Objects.equals( address, endPoint.address ) && + role == endPoint.role; + } + + @Override + public int hashCode() + { + return Objects.hash( address, role ); + } + @Override public String toString() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV2.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV2.java deleted file mode 100644 index ed081b692a0b0..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV2.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.load_balancing; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.stream.Stream; - -import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.core.consensus.LeaderLocator; -import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; -import org.neo4j.causalclustering.discovery.CoreAddresses; -import org.neo4j.causalclustering.discovery.CoreTopologyService; -import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.collection.RawIterator; -import org.neo4j.helpers.AdvertisedSocketAddress; -import org.neo4j.kernel.api.exceptions.ProcedureException; -import org.neo4j.kernel.api.proc.CallableProcedure; -import org.neo4j.kernel.api.proc.Context; -import org.neo4j.kernel.api.proc.Neo4jTypes; -import org.neo4j.kernel.api.proc.ProcedureSignature; -import org.neo4j.kernel.configuration.Config; -import org.neo4j.logging.Log; -import org.neo4j.logging.LogProvider; - -import static java.util.Collections.emptyList; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Stream.concat; -import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; -import static org.neo4j.causalclustering.load_balancing.Role.READ; -import static org.neo4j.causalclustering.load_balancing.Role.ROUTE; -import static org.neo4j.causalclustering.load_balancing.Role.WRITE; -import static org.neo4j.causalclustering.load_balancing.GetServersParameters.CONTEXT; -import static org.neo4j.causalclustering.load_balancing.GetServersParameters.SERVERS; -import static org.neo4j.causalclustering.load_balancing.GetServersParameters.TTL; -import static org.neo4j.causalclustering.load_balancing.ProcedureNames.GET_SERVERS_V2; -import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; - -/** - * Returns endpoints and their capabilities. - * - * TODO: Detail signature (input and output). - * TODO: This is just a copy of the V1 behaviour right now, to be implemented appropriately. - */ -public class GetServersProcedureV2 implements CallableProcedure -{ - private final String DESCRIPTION = "Returns cluster endpoints and their capabilities."; - - private final ProcedureSignature procedureSignature = - procedureSignature( GET_SERVERS_V2.fullyQualifiedProcedureName() ) - .in( CONTEXT.parameterName(), Neo4jTypes.NTMap ) - .out( TTL.parameterName(), Neo4jTypes.NTInteger ) - .out( SERVERS.parameterName(), Neo4jTypes.NTMap ) - .description( DESCRIPTION ) - .build(); - - private final CoreTopologyService discoveryService; - private final LeaderLocator leaderLocator; - private final Config config; - private final Log log; - - public GetServersProcedureV2( CoreTopologyService discoveryService, LeaderLocator leaderLocator, - Config config, LogProvider logProvider ) - { - this.discoveryService = discoveryService; - this.leaderLocator = leaderLocator; - this.config = config; - this.log = logProvider.getLog( getClass() ); - } - - @Override - public ProcedureSignature signature() - { - return procedureSignature; - } - - @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException - { - List routeEndpoints = routeEndpoints(); - List writeEndpoints = writeEndpoints(); - List readEndpoints = readEndpoints(); - return wrapUpEndpoints( routeEndpoints, writeEndpoints, readEndpoints ); - } - - private Optional leaderAdvertisedSocketAddress() - { - MemberId leader; - try - { - leader = leaderLocator.getLeader(); - } - catch ( NoLeaderFoundException e ) - { - log.debug( "No leader server found. This can happen during a leader switch. No write end points available" ); - return Optional.empty(); - } - - return discoveryService.coreServers().find( leader ) - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); - } - - private List routeEndpoints() - { - Stream routers = - discoveryService.coreServers().addresses().stream() - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); - List routeEndpoints = routers.map( EndPoint::route ).collect( toList() ); - Collections.shuffle( routeEndpoints ); - return routeEndpoints; - } - - private List writeEndpoints() - { - return leaderAdvertisedSocketAddress() - .map( EndPoint::write ).map( Collections::singletonList ).orElse( emptyList() ); - } - - private List readEndpoints() - { - List readReplicas = discoveryService.readReplicas().members().stream() - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ).collect( toList() ); - boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers ); - Stream readCore = addFollowers ? coreReadEndPoints() : Stream.empty(); - List readEndPoints = - concat( readReplicas.stream(), readCore ).map( EndPoint::read ).collect( toList() ); - Collections.shuffle( readEndPoints ); - return readEndPoints; - } - - private Stream coreReadEndPoints() - { - Optional leader = leaderAdvertisedSocketAddress(); - Collection addresses = discoveryService.coreServers().addresses(); - Stream allAddresses = addresses.stream() - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); - - // if the leader is present and it is not alone filter it out from the read end points - if ( leader.isPresent() && addresses.size() > 1 ) - { - AdvertisedSocketAddress advertisedSocketAddress = leader.get(); - return allAddresses.filter( address -> !advertisedSocketAddress.equals( address ) ); - } - - // if there is only the leader return it as read end point - // or if we cannot locate the leader return all cores as read end points - return allAddresses; - } - - private RawIterator wrapUpEndpoints( List routeEndpoints, - List writeEndpoints, List readEndpoints ) - { - Object[] routers = routeEndpoints.stream().map( EndPoint::address ).toArray(); - Object[] readers = readEndpoints.stream().map( EndPoint::address ).toArray(); - Object[] writers = writeEndpoints.stream().map( EndPoint::address ).toArray(); - - List> servers = new ArrayList<>(); - - if ( writers.length > 0 ) - { - Map map = new TreeMap<>(); - - map.put( "role", WRITE.name() ); - map.put( "addresses", writers ); - - servers.add( map ); - } - - if ( readers.length > 0 ) - { - Map map = new TreeMap<>(); - - map.put( "role", READ.name() ); - map.put( "addresses", readers ); - - servers.add( map ); - } - - if ( routers.length > 0 ) - { - Map map = new TreeMap<>(); - - map.put( "role", ROUTE.name() ); - map.put( "addresses", routers ); - - servers.add( map ); - } - - long ttl = MILLISECONDS.toSeconds( config.get( CausalClusteringSettings.cluster_routing_ttl ) ); - Object[] row = new Object[]{ttl, servers}; - return RawIterator.of( row ); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingResult.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingResult.java new file mode 100644 index 0000000000000..5ace2555cc8b6 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingResult.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing; + +import java.util.List; +import java.util.Objects; + +/** + * The outcome of applying a load balancing strategy, which will be used by client + * software for scheduling work at the endpoints. + */ +public class LoadBalancingResult implements LoadBalancingStrategy.Result +{ + private final List routeEndpoints; + private final List writeEndpoints; + private final List readEndpoints; + private final long timeToLiveMillis; + + public LoadBalancingResult( List routeEndpoints, List writeEndpoints, + List readEndpoints, long timeToLiveMillis ) + { + this.routeEndpoints = routeEndpoints; + this.writeEndpoints = writeEndpoints; + this.readEndpoints = readEndpoints; + this.timeToLiveMillis = timeToLiveMillis; + } + + @Override + public long getTimeToLiveMillis() + { + return timeToLiveMillis; + } + + @Override + public List routeEndpoints() + { + return routeEndpoints; + } + + @Override + public List writeEndpoints() + { + return writeEndpoints; + } + + @Override + public List readEndpoints() + { + return readEndpoints; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { return true; } + if ( o == null || getClass() != o.getClass() ) + { return false; } + LoadBalancingResult that = (LoadBalancingResult) o; + return timeToLiveMillis == that.timeToLiveMillis && + Objects.equals( routeEndpoints, that.routeEndpoints ) && + Objects.equals( writeEndpoints, that.writeEndpoints ) && + Objects.equals( readEndpoints, that.readEndpoints ); + } + + @Override + public int hashCode() + { + return Objects.hash( routeEndpoints, writeEndpoints, readEndpoints, timeToLiveMillis ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingStrategy.java new file mode 100644 index 0000000000000..2943d980f80aa --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/LoadBalancingStrategy.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing; + +import java.util.List; +import java.util.Map; + +/** + * Defines the interface for an implementation of the GetServersV2 + * cluster discovery and load balancing procedure. + */ +public interface LoadBalancingStrategy +{ + interface Result + { + /** + * @return The time-to-live of the returned result. + */ + long getTimeToLiveMillis(); + + /** + * @return List of ROUTE-capable endpoints. + */ + List routeEndpoints(); + + /** + * @return List of WRITE-capable endpoints. + */ + List writeEndpoints(); + + /** + * @return List of READ-capable endpoints. + */ + List readEndpoints(); + } + + /** + * Runs the procedure using the supplied client context + * and returns the result. + * + * @param context The client supplied context. + * @return The result of invoking the procedure. + */ + Result run( Map context ); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/Util.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/Util.java new file mode 100644 index 0000000000000..ac0338ba037d2 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/Util.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import org.neo4j.causalclustering.discovery.ClientConnector; +import org.neo4j.helpers.AdvertisedSocketAddress; + +import static java.util.Collections.emptyList; + +public class Util +{ + public static List asList( @SuppressWarnings( "OptionalUsedAsFieldOrParameterType" ) Optional optional ) + { + return optional.map( Collections::singletonList ).orElse( emptyList() ); + } + + public static Function extractBoltAddress() + { + return c -> c.connectors().boltAddress(); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java similarity index 58% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java index c827f77f3bac9..76b31bea085b8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java @@ -17,15 +17,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.load_balancing; +package org.neo4j.causalclustering.load_balancing.procedure; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.TreeMap; import java.util.stream.Stream; import org.neo4j.causalclustering.core.CausalClusteringSettings; @@ -34,6 +31,8 @@ import org.neo4j.causalclustering.discovery.CoreAddresses; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.load_balancing.EndPoint; +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; import org.neo4j.collection.RawIterator; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.api.exceptions.ProcedureException; @@ -45,30 +44,24 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static java.util.Collections.emptyList; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Stream.concat; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; -import static org.neo4j.causalclustering.load_balancing.Role.READ; -import static org.neo4j.causalclustering.load_balancing.Role.ROUTE; -import static org.neo4j.causalclustering.load_balancing.Role.WRITE; -import static org.neo4j.causalclustering.load_balancing.GetServersParameters.SERVERS; -import static org.neo4j.causalclustering.load_balancing.GetServersParameters.TTL; -import static org.neo4j.causalclustering.load_balancing.ProcedureNames.GET_SERVERS_V1; -import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; +import static org.neo4j.causalclustering.load_balancing.Util.asList; +import static org.neo4j.causalclustering.load_balancing.Util.extractBoltAddress; +import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.SERVERS; +import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.TTL; +import static org.neo4j.causalclustering.load_balancing.procedure.ProcedureNames.GET_SERVERS_V1; /** * Returns endpoints and their capabilities. - * - * TODO: Detail signature (input and output). */ public class GetServersProcedureV1 implements CallableProcedure { private final String DESCRIPTION = "Returns cluster endpoints and their capabilities."; private final ProcedureSignature procedureSignature = - procedureSignature( GET_SERVERS_V1.fullyQualifiedProcedureName() ) + ProcedureSignature.procedureSignature( GET_SERVERS_V1.fullyQualifiedProcedureName() ) .out( TTL.parameterName(), Neo4jTypes.NTInteger ) .out( SERVERS.parameterName(), Neo4jTypes.NTMap ) .description( DESCRIPTION ) @@ -100,10 +93,12 @@ public RawIterator apply( Context ctx, Object[] inp List routeEndpoints = routeEndpoints(); List writeEndpoints = writeEndpoints(); List readEndpoints = readEndpoints(); - return wrapUpEndpoints( routeEndpoints, writeEndpoints, readEndpoints ); + + return ResultFormatV1.build( new LoadBalancingResult( routeEndpoints, writeEndpoints, readEndpoints, + config.get( CausalClusteringSettings.cluster_routing_ttl ) ) ); } - private Optional leaderAdvertisedSocketAddress() + private Optional leaderBoltAddress() { MemberId leader; try @@ -116,15 +111,13 @@ private Optional leaderAdvertisedSocketAddress() return Optional.empty(); } - return discoveryService.coreServers().find( leader ) - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); + return discoveryService.coreServers().find( leader ).map( extractBoltAddress() ); } private List routeEndpoints() { - Stream routers = - discoveryService.coreServers().addresses().stream() - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); + Stream routers = discoveryService.coreServers() + .addresses().stream().map( extractBoltAddress() ); List routeEndpoints = routers.map( EndPoint::route ).collect( toList() ); Collections.shuffle( routeEndpoints ); return routeEndpoints; @@ -132,14 +125,13 @@ private List routeEndpoints() private List writeEndpoints() { - return leaderAdvertisedSocketAddress() - .map( EndPoint::write ).map( Collections::singletonList ).orElse( emptyList() ); + return asList( leaderBoltAddress().map( EndPoint::write ) ); } private List readEndpoints() { List readReplicas = discoveryService.readReplicas().members().stream() - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ).collect( toList() ); + .map( extractBoltAddress() ).collect( toList() ); boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers ); Stream readCore = addFollowers ? coreReadEndPoints() : Stream.empty(); List readEndPoints = @@ -150,64 +142,20 @@ private List readEndpoints() private Stream coreReadEndPoints() { - Optional leader = leaderAdvertisedSocketAddress(); - Collection addresses = discoveryService.coreServers().addresses(); - Stream allAddresses = addresses.stream() - .map( server -> server.getClientConnectorAddresses().getBoltAddress() ); + Optional leader = leaderBoltAddress(); + Collection coreAddresses = discoveryService.coreServers().addresses(); + Stream boltAddresses = discoveryService.coreServers() + .addresses().stream().map( extractBoltAddress() ); // if the leader is present and it is not alone filter it out from the read end points - if ( leader.isPresent() && addresses.size() > 1 ) + if ( leader.isPresent() && coreAddresses.size() > 1 ) { AdvertisedSocketAddress advertisedSocketAddress = leader.get(); - return allAddresses.filter( address -> !advertisedSocketAddress.equals( address ) ); + return boltAddresses.filter( address -> !advertisedSocketAddress.equals( address ) ); } // if there is only the leader return it as read end point // or if we cannot locate the leader return all cores as read end points - return allAddresses; - } - - private RawIterator wrapUpEndpoints( List routeEndpoints, - List writeEndpoints, List readEndpoints ) - { - Object[] routers = routeEndpoints.stream().map( EndPoint::address ).toArray(); - Object[] readers = readEndpoints.stream().map( EndPoint::address ).toArray(); - Object[] writers = writeEndpoints.stream().map( EndPoint::address ).toArray(); - - List> servers = new ArrayList<>(); - - if ( writers.length > 0 ) - { - Map map = new TreeMap<>(); - - map.put( "role", WRITE.name() ); - map.put( "addresses", writers ); - - servers.add( map ); - } - - if ( readers.length > 0 ) - { - Map map = new TreeMap<>(); - - map.put( "role", READ.name() ); - map.put( "addresses", readers ); - - servers.add( map ); - } - - if ( routers.length > 0 ) - { - Map map = new TreeMap<>(); - - map.put( "role", ROUTE.name() ); - map.put( "addresses", routers ); - - servers.add( map ); - } - - long ttl = MILLISECONDS.toSeconds( config.get( CausalClusteringSettings.cluster_routing_ttl ) ); - Object[] row = new Object[]{ttl, servers}; - return RawIterator.of( row ); + return boltAddresses; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2.java new file mode 100644 index 0000000000000..d3b957e550865 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.procedure; + +import java.util.Map; + +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; +import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.proc.CallableProcedure; +import org.neo4j.kernel.api.proc.Context; +import org.neo4j.kernel.api.proc.Neo4jTypes; +import org.neo4j.kernel.api.proc.ProcedureSignature; + +import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.CONTEXT; +import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.SERVERS; +import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.TTL; +import static org.neo4j.causalclustering.load_balancing.procedure.ProcedureNames.GET_SERVERS_V2; +import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; + +/** + * Returns endpoints and their capabilities. + * + * GetServersV2 extends upon V1 by allowing a client context consisting of + * key-value pairs to be supplied to and used by the concrete load + * balancing strategies. + */ +public class GetServersProcedureV2 implements CallableProcedure +{ + private final String DESCRIPTION = "Returns cluster endpoints and their capabilities."; + + private final ProcedureSignature procedureSignature = + procedureSignature( GET_SERVERS_V2.fullyQualifiedProcedureName() ) + .in( CONTEXT.parameterName(), Neo4jTypes.NTMap ) + .out( TTL.parameterName(), Neo4jTypes.NTInteger ) + .out( SERVERS.parameterName(), Neo4jTypes.NTMap ) + .description( DESCRIPTION ) + .build(); + + private final LoadBalancingStrategy loadBalancingStrategy; + + public GetServersProcedureV2( LoadBalancingStrategy loadBalancingStrategy ) + { + this.loadBalancingStrategy = loadBalancingStrategy; + } + + @Override + public ProcedureSignature signature() + { + return procedureSignature; + } + + @Override + public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + { + @SuppressWarnings( "unchecked" ) + Map clientContext = (Map) input[0]; + + LoadBalancingStrategy.Result result = loadBalancingStrategy.run( clientContext ); + + return ResultFormatV1.build( new LoadBalancingResult( + result.routeEndpoints(), + result.writeEndpoints(), + result.readEndpoints(), + result.getTimeToLiveMillis() ) ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersParameters.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ParameterNames.java similarity index 93% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersParameters.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ParameterNames.java index fd99af4581b09..58dbaf9179659 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/GetServersParameters.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ParameterNames.java @@ -17,13 +17,13 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.load_balancing; +package org.neo4j.causalclustering.load_balancing.procedure; /** * Enumerates the parameter names used for the GetServers * procedures in a causal cluster. */ -public enum GetServersParameters +public enum ParameterNames { /** * Type: IN @@ -58,7 +58,7 @@ public enum GetServersParameters private final String parameterName; - GetServersParameters( String parameterName ) + ParameterNames( String parameterName ) { this.parameterName = parameterName; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/ProcedureNames.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ProcedureNames.java similarity index 95% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/ProcedureNames.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ProcedureNames.java index b53dc59780610..57fe06bd128f0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/ProcedureNames.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ProcedureNames.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.load_balancing; +package org.neo4j.causalclustering.load_balancing.procedure; import java.util.Arrays; @@ -34,7 +34,7 @@ public enum ProcedureNames { GET_SERVERS_V1( "getServers" ), - GET_SERVERS_V2( "getServers_v2" ); + GET_SERVERS_V2( "getServersV2" ); private static final String[] nameSpace = new String[]{"dbms", "cluster", "routing"}; private final String name; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1.java new file mode 100644 index 0000000000000..1d543c4f99ef6 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.procedure; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Stream; + +import org.neo4j.causalclustering.load_balancing.EndPoint; +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; +import org.neo4j.causalclustering.load_balancing.Role; +import org.neo4j.collection.RawIterator; +import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.kernel.api.exceptions.ProcedureException; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toList; +import static org.neo4j.causalclustering.load_balancing.Role.READ; +import static org.neo4j.causalclustering.load_balancing.Role.ROUTE; +import static org.neo4j.causalclustering.load_balancing.Role.WRITE; + +/** + * The result format of GetServersV1 and GetServersV2 procedures. + */ +class ResultFormatV1 +{ + private static final String ROLE_KEY = "role"; + private static final String ADDRESSES_KEY = "addresses"; + + static RawIterator build( LoadBalancingResult result ) + { + Object[] routers = result.routeEndpoints().stream().map( EndPoint::address ).toArray(); + Object[] readers = result.readEndpoints().stream().map( EndPoint::address ).toArray(); + Object[] writers = result.writeEndpoints().stream().map( EndPoint::address ).toArray(); + + List> servers = new ArrayList<>(); + + if ( writers.length > 0 ) + { + Map map = new TreeMap<>(); + + map.put( ROLE_KEY, WRITE.name() ); + map.put( ADDRESSES_KEY, writers ); + + servers.add( map ); + } + + if ( readers.length > 0 ) + { + Map map = new TreeMap<>(); + + map.put( ROLE_KEY, READ.name() ); + map.put( ADDRESSES_KEY, readers ); + + servers.add( map ); + } + + if ( routers.length > 0 ) + { + Map map = new TreeMap<>(); + + map.put( ROLE_KEY, ROUTE.name() ); + map.put( ADDRESSES_KEY, routers ); + + servers.add( map ); + } + + long timeToLiveSeconds = MILLISECONDS.toSeconds( result.getTimeToLiveMillis() ); + Object[] row = new Object[]{timeToLiveSeconds, servers}; + return RawIterator.of( row ); + } + + static LoadBalancingResult parse( RawIterator records ) throws ProcedureException + { + Object[] record = records.next(); + + long timeToLiveSeconds = (long) record[0]; + @SuppressWarnings( "unchecked" ) + List> endpointData = (List>) record[1]; + + Map> endpoints = parse( endpointData ); + assert !records.hasNext(); + + return new LoadBalancingResult( + endpoints.get( ROUTE ), + endpoints.get( WRITE ), + endpoints.get( READ ), + timeToLiveSeconds * 1000 ); + } + + private static Map> parse( List> result ) + { + Map> endpoints = new HashMap<>(); + for ( Map single : result ) + { + Role role = Role.valueOf( (String) single.get( "role" ) ); + List addresses = parse( (Object[]) single.get( "addresses" ), role ); + endpoints.put( role, addresses ); + } + return endpoints; + } + + private static List parse( Object[] addresses, Role role ) + { + return Stream.of( addresses ) + .map( rawAddress -> parse( (String) rawAddress ) ) + .map( address -> new EndPoint( address, role ) ) + .collect( toList() ); + } + + private static AdvertisedSocketAddress parse( String address ) + { + String[] split = address.split( ":" ); + return new AdvertisedSocketAddress( split[0], Integer.valueOf( split[1] ) ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java new file mode 100644 index 0000000000000..ab04445e01870 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/AllServersStrategy.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.LeaderLocator; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; +import org.neo4j.causalclustering.discovery.CoreTopology; +import org.neo4j.causalclustering.discovery.CoreTopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopology; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.load_balancing.EndPoint; +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; +import org.neo4j.kernel.configuration.Config; + +import static java.util.Collections.emptyList; +import static java.util.stream.Stream.concat; +import static org.neo4j.causalclustering.load_balancing.Util.extractBoltAddress; +import static org.neo4j.causalclustering.load_balancing.Util.asList; + +/** + * This is just a simple strategy and not intended for actual use. Will be replaced. + */ +public class AllServersStrategy implements LoadBalancingStrategy +{ + private final CoreTopologyService topologyService; + private final LeaderLocator leaderLocator; + private final Long timeToLive; + + public AllServersStrategy( CoreTopologyService topologyService, LeaderLocator leaderLocator, Config config ) + { + this.topologyService = topologyService; + this.leaderLocator = leaderLocator; + this.timeToLive = config.get( CausalClusteringSettings.cluster_routing_ttl ); + } + + @Override + public Result run( Map context ) + { + CoreTopology cores = topologyService.coreServers(); + ReadReplicaTopology readers = topologyService.readReplicas(); + + return new LoadBalancingResult( routeEndpoints( cores ), writeEndpoints( cores ), + readEndpoints( cores, readers ), timeToLive ); + } + + private List routeEndpoints( CoreTopology cores ) + { + return cores.addresses().stream().map( extractBoltAddress() ) + .map( EndPoint::route ).collect( Collectors.toList() ); + } + + private List writeEndpoints( CoreTopology cores ) + { + MemberId leader; + try + { + leader = leaderLocator.getLeader(); + } + catch ( NoLeaderFoundException e ) + { + return emptyList(); + } + + Optional endPoint = cores.find( leader ) + .map( extractBoltAddress() ) + .map( EndPoint::write ); + + return asList( endPoint ); + } + + private List readEndpoints( CoreTopology cores, ReadReplicaTopology readers ) + { + return concat( readers.members().stream(), cores.addresses().stream() ) + .map( extractBoltAddress() ) + .map( EndPoint::read ) + .collect( Collectors.toList() ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategy.java new file mode 100644 index 0000000000000..dee9dfacfafb6 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategy.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy; + +import java.util.Collections; +import java.util.Map; + +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; + +/** + * Shuffles the servers of the delegate around so that every client + * invocation gets a a little bit of that extra entropy spice. + * + * N.B: Lists are shuffled in place. + */ +public class ServerShufflingStrategy implements LoadBalancingStrategy +{ + private final LoadBalancingStrategy delegate; + + public ServerShufflingStrategy( LoadBalancingStrategy delegate ) + { + this.delegate = delegate; + } + + @Override + public Result run( Map context ) + { + Result result = delegate.run( context ); + + Collections.shuffle( result.routeEndpoints() ); + Collections.shuffle( result.writeEndpoints() ); + Collections.shuffle( result.readEndpoints() ); + + return result; + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java index 62d993456d870..3a01a0458dd26 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java @@ -77,7 +77,7 @@ public void shouldStoreMemberIdentityAndAddressesAsMemberAttributes() throws Exc CoreAddresses addresses = extracted.other(); assertEquals( new AdvertisedSocketAddress( "tx", 1001 ), addresses.getCatchupServer() ); assertEquals( new AdvertisedSocketAddress( "raft", 2001 ), addresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt", 3001 ), addresses.getClientConnectorAddresses().getBoltAddress() ); + assertEquals( new AdvertisedSocketAddress( "bolt", 3001 ), addresses.connectors().boltAddress() ); } @Test @@ -117,7 +117,7 @@ public void shouldCollectMembersAsAMap() throws Exception CoreAddresses coreAddresses = coreMemberMap.get( coreMembers.get( i ) ); assertEquals( new AdvertisedSocketAddress( "tx", (i + 1) ), coreAddresses.getCatchupServer() ); assertEquals( new AdvertisedSocketAddress( "raft", (i + 1) ), coreAddresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt", (i + 1) ), coreAddresses.getClientConnectorAddresses().getBoltAddress() ); + assertEquals( new AdvertisedSocketAddress( "bolt", (i + 1) ), coreAddresses.connectors().boltAddress() ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TopologyHelper.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java similarity index 56% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TopologyHelper.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java index e394becc48424..0bfc09d929df6 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TopologyHelper.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/TestTopology.java @@ -28,24 +28,29 @@ import static java.util.Collections.singletonList; import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt; -public class TopologyHelper +public class TestTopology { public static Set addressesForReadReplicas( int... ids ) { - return Arrays.stream( ids ).mapToObj( TopologyHelper::addressesForReadReplica ).collect( Collectors.toSet() ); + return Arrays.stream( ids ).mapToObj( TestTopology::addressesForReadReplica ).collect( Collectors.toSet() ); + } + + private static ClientConnectorAddresses wrapAsClientConnectorAddresses( AdvertisedSocketAddress advertisedSocketAddress ) + { + return new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ); } public static CoreAddresses adressesForCore( int id ) { - AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); - return new CoreAddresses( advertisedSocketAddress, advertisedSocketAddress, - new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ) ); + AdvertisedSocketAddress raftServerAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); + AdvertisedSocketAddress catchupServerAddress = new AdvertisedSocketAddress( "localhost", (4000 + id) ); + AdvertisedSocketAddress boltServerAddress = new AdvertisedSocketAddress( "localhost", (5000 + id) ); + return new CoreAddresses( raftServerAddress, catchupServerAddress, wrapAsClientConnectorAddresses( boltServerAddress ) ); } public static ReadReplicaAddresses addressesForReadReplica( int id ) { - AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); - return new ReadReplicaAddresses( - new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ) ); + AdvertisedSocketAddress boltServerAddress = new AdvertisedSocketAddress( "localhost", (6000 + id) ); + return new ReadReplicaAddresses( wrapAsClientConnectorAddresses( boltServerAddress ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java index 518ff5067853b..28aa216ca7d1a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java @@ -41,8 +41,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.causalclustering.discovery.TopologyHelper.addressesForReadReplicas; -import static org.neo4j.causalclustering.discovery.TopologyHelper.adressesForCore; +import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplicas; +import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; import static org.neo4j.helpers.collection.Iterators.asList; public class ClusterOverviewProcedureTest @@ -78,11 +78,11 @@ public void shouldProvideOverviewOfCoreServersAndReadReplicas() throws Exception // then assertThat( members, IsIterableContainingInOrder.contains( - new Object[]{theLeader.getUuid().toString(), new String[] {"bolt://localhost:3000"}, "LEADER"}, - new Object[]{follower1.getUuid().toString(), new String[] {"bolt://localhost:3001"}, "FOLLOWER"}, - new Object[]{follower2.getUuid().toString(), new String[] {"bolt://localhost:3002"}, "FOLLOWER"}, - new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:3004"}, "READ_REPLICA"}, - new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:3005"}, "READ_REPLICA"} + new Object[]{theLeader.getUuid().toString(), new String[] {"bolt://localhost:5000"}, "LEADER"}, + new Object[]{follower1.getUuid().toString(), new String[] {"bolt://localhost:5001"}, "FOLLOWER"}, + new Object[]{follower2.getUuid().toString(), new String[] {"bolt://localhost:5002"}, "FOLLOWER"}, + new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:6004"}, "READ_REPLICA"}, + new Object[]{"00000000-0000-0000-0000-000000000000", new String[] {"bolt://localhost:6005"}, "READ_REPLICA"} ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1RoutingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java similarity index 96% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1RoutingTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java index 6cc084a6910e6..612b412473632 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1RoutingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.load_balancing; +package org.neo4j.causalclustering.load_balancing.procedure; import org.junit.Test; import org.junit.runner.RunWith; @@ -45,7 +45,7 @@ import static org.junit.runners.Parameterized.Parameters; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.causalclustering.discovery.TopologyHelper.adressesForCore; +import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; import static org.neo4j.causalclustering.identity.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.logging.NullLogProvider.getInstance; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java similarity index 87% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1Test.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java index 6751a325c1e5b..f88d10c59df24 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/GetServersProcedureV1Test.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.load_balancing; +package org.neo4j.causalclustering.load_balancing.procedure; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,10 +42,10 @@ import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.load_balancing.Role; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.FieldSignature; -import org.neo4j.kernel.api.proc.Neo4jTypes; import org.neo4j.kernel.api.proc.ProcedureSignature; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; @@ -61,12 +61,13 @@ import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_routing_ttl; -import static org.neo4j.causalclustering.discovery.TopologyHelper.addressesForReadReplicas; -import static org.neo4j.causalclustering.discovery.TopologyHelper.adressesForCore; -import static org.neo4j.causalclustering.discovery.TopologyHelper.addressesForReadReplica; +import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplicas; +import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; +import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica; import static org.neo4j.causalclustering.identity.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.helpers.collection.MapUtil.stringMap; +import static org.neo4j.kernel.api.proc.Neo4jTypes.NTInteger; import static org.neo4j.kernel.api.proc.Neo4jTypes.NTMap; import static org.neo4j.logging.NullLogProvider.getInstance; @@ -76,7 +77,7 @@ public class GetServersProcedureV1Test private final ClusterId clusterId = new ClusterId( UUID.randomUUID() ); @Parameterized.Parameter( 0 ) - public String ignored; // <- JUnit is happy only if this is here! + public String description; @Parameterized.Parameter( 1 ) public Config config; @Parameterized.Parameter( 2 ) @@ -130,7 +131,8 @@ public void shouldHaveCorrectSignature() throws Exception ProcedureSignature signature = proc.signature(); // then - assertThat( signature.outputSignature(), containsInAnyOrder( new FieldSignature( "ttl", Neo4jTypes.NTInteger ), + assertThat( signature.outputSignature(), containsInAnyOrder( + new FieldSignature( "ttl", NTInteger ), new FieldSignature( "servers", NTMap ) ) ); } @@ -157,8 +159,8 @@ public void shouldProvideReaderAndRouterForSingleCoreSetup() throws Exception // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.readAddress( adressesForCore( 0 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.readAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } @@ -189,12 +191,12 @@ public void shouldReturnCoreServersWithRouteAllCoresButLeaderAsReadAndSingleWrit // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.writeAddress( adressesForCore( 0 ).getRaftServer() ); - builder.readAddress( adressesForCore( 1 ).getRaftServer() ); - builder.readAddress( adressesForCore( 2 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 1 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 2 ).getRaftServer() ); + builder.writeAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.readAddress( adressesForCore( 1 ).connectors().boltAddress() ); + builder.readAddress( adressesForCore( 2 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 1 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 2 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } @@ -223,9 +225,9 @@ public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.writeAddress( adressesForCore( 0 ).getRaftServer() ); - builder.readAddress( adressesForCore( 0 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.writeAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.readAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } @@ -254,13 +256,13 @@ public void shouldReturnTheCoreLeaderForWriteAndReadReplicasAndCoresForReads() t // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.writeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.writeAddress( adressesForCore( 0 ).connectors().boltAddress() ); if ( expectFollowersAsReadEndPoints ) { - builder.readAddress( adressesForCore( 0 ).getRaftServer() ); + builder.readAddress( adressesForCore( 0 ).connectors().boltAddress() ); } - builder.readAddress( addressesForReadReplica( 1 ).getClientConnectorAddresses().getBoltAddress() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.readAddress( addressesForReadReplica( 1 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } @@ -289,9 +291,9 @@ public void shouldReturnCoreMemberAsReadServerIfNoReadReplicasAvailable() throws // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.writeAddress( adressesForCore( 0 ).getRaftServer() ); - builder.readAddress( adressesForCore( 0 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.writeAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.readAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } @@ -319,8 +321,8 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoLeader() throws Exception // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.readAddress( adressesForCore( 0 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.readAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } @@ -349,8 +351,8 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws // then ClusterView.Builder builder = new ClusterView.Builder(); - builder.readAddress( adressesForCore( 0 ).getRaftServer() ); - builder.routeAddress( adressesForCore( 0 ).getRaftServer() ); + builder.readAddress( adressesForCore( 0 ).connectors().boltAddress() ); + builder.routeAddress( adressesForCore( 0 ).connectors().boltAddress() ); assertEquals( builder.build(), clusterView ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2Test.java new file mode 100644 index 0000000000000..0b1c3475230ff --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV2Test.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.procedure; + +import org.junit.Test; + +import java.util.Map; + +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; +import org.neo4j.kernel.api.proc.FieldSignature; +import org.neo4j.kernel.api.proc.ProcedureSignature; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.helpers.collection.MapUtil.stringMap; +import static org.neo4j.kernel.api.proc.Neo4jTypes.NTInteger; +import static org.neo4j.kernel.api.proc.Neo4jTypes.NTMap; + +public class GetServersProcedureV2Test +{ + @Test + public void shouldHaveCorrectSignature() throws Exception + { + // given + GetServersProcedureV2 proc = new GetServersProcedureV2( null ); + + // when + ProcedureSignature signature = proc.signature(); + + // then + assertThat( signature.inputSignature(), containsInAnyOrder( + new FieldSignature( "context", NTMap ) ) ); + + assertThat( signature.outputSignature(), containsInAnyOrder( + new FieldSignature( "ttl", NTInteger ), + new FieldSignature( "servers", NTMap ) ) ); + } + + @Test + public void shouldPassClientContextToStrategy() throws Exception + { + // given + LoadBalancingStrategy strategy = mock( LoadBalancingStrategy.class ); + when( strategy.run( anyMap() ) ).thenReturn( mock( LoadBalancingStrategy.Result.class ) ); + GetServersProcedureV2 getServers = new GetServersProcedureV2( strategy ); + Map clientContext = stringMap( "key", "value", "key2", "value2" ); + + // when + getServers.apply( null, new Object[] { clientContext } ); + + // then + verify( strategy ).run( clientContext ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1Test.java new file mode 100644 index 0000000000000..2be1a44cd7d6f --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/ResultFormatV1Test.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.procedure; + +import org.junit.Test; + +import java.util.List; + +import org.neo4j.causalclustering.load_balancing.EndPoint; +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; +import org.neo4j.collection.RawIterator; +import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.kernel.api.exceptions.ProcedureException; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ResultFormatV1Test +{ + @Test + public void shouldSerializeToAndFromRecordFormat() throws Exception + { + // given + List writers = asList( + EndPoint.write( new AdvertisedSocketAddress( "write", 1 ) ), + EndPoint.write( new AdvertisedSocketAddress( "write", 2 ) ), + EndPoint.write( new AdvertisedSocketAddress( "write", 3 ) ) ); + List readers = asList( + EndPoint.read( new AdvertisedSocketAddress( "read", 4 ) ), + EndPoint.read( new AdvertisedSocketAddress( "read", 5 ) ), + EndPoint.read( new AdvertisedSocketAddress( "read", 6 ) ), + EndPoint.read( new AdvertisedSocketAddress( "read", 7 ) ) ); + List routers = singletonList( + EndPoint.route( new AdvertisedSocketAddress( "route", 8 ) ) + ); + + long ttlSeconds = 5; + LoadBalancingResult original = new LoadBalancingResult( routers, writers, readers, ttlSeconds * 1000 ); + + // when + RawIterator records = ResultFormatV1.build( original ); + + // then + assertTrue( records.hasNext() ); + LoadBalancingResult parsed = ResultFormatV1.parse( records ); + + assertFalse( records.hasNext() ); + assertEquals( original, parsed ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategyTest.java new file mode 100644 index 0000000000000..9f6453e9a51dd --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/strategy/ServerShufflingStrategyTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.load_balancing.strategy; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.neo4j.causalclustering.load_balancing.EndPoint; +import org.neo4j.causalclustering.load_balancing.LoadBalancingResult; +import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy; +import org.neo4j.helpers.AdvertisedSocketAddress; + +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ServerShufflingStrategyTest +{ + @Test + public void shouldShuffleServers() throws Exception + { + // given + LoadBalancingStrategy delegate = mock( LoadBalancingStrategy.class ); + + List routers = asList( + EndPoint.route( new AdvertisedSocketAddress( "route", 1 ) ), + EndPoint.route( new AdvertisedSocketAddress( "route", 2 ) ) ); + List writers = asList( + EndPoint.write( new AdvertisedSocketAddress( "write", 3 ) ), + EndPoint.write( new AdvertisedSocketAddress( "write", 4 ) ), + EndPoint.write( new AdvertisedSocketAddress( "write", 5 ) ) ); + List readers = asList( + EndPoint.read( new AdvertisedSocketAddress( "read", 6 ) ), + EndPoint.read( new AdvertisedSocketAddress( "read", 7 ) ), + EndPoint.read( new AdvertisedSocketAddress( "read", 8 ) ), + EndPoint.read( new AdvertisedSocketAddress( "read", 9 ) ) ); + + long ttl = 1000; + LoadBalancingStrategy.Result result = new LoadBalancingResult( + new ArrayList<>( routers ), + new ArrayList<>( writers ), + new ArrayList<>( readers ), + ttl ); + + when( delegate.run( any() ) ).thenReturn( result ); + + ServerShufflingStrategy strategy = new ServerShufflingStrategy( delegate ); + + boolean completeShuffle = false; + for ( int i = 0; i < 1000; i++ ) // we try many times to make false negatives extremely unlikely + { + // when + LoadBalancingStrategy.Result shuffledResult = strategy.run( Collections.emptyMap() ); + + // then: should still contain the same endpoints + assertThat( shuffledResult.routeEndpoints(), containsInAnyOrder( routers.toArray() ) ); + assertThat( shuffledResult.writeEndpoints(), containsInAnyOrder( writers.toArray() ) ); + assertThat( shuffledResult.readEndpoints(), containsInAnyOrder( readers.toArray() ) ); + assertEquals( shuffledResult.getTimeToLiveMillis(), ttl ); + + // but possibly in a different order + boolean readersEqual = shuffledResult.readEndpoints().equals( readers ); + boolean writersEqual = shuffledResult.writeEndpoints().equals( writers ); + boolean routersEqual = shuffledResult.routeEndpoints().equals( routers ); + + if ( !readersEqual && !writersEqual && !routersEqual ) + { + // we don't stop until it is completely different + completeShuffle = true; + break; + } + } + + assertTrue( completeShuffle ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterDiscoveryIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterDiscoveryIT.java index 87a7c16b0ac54..8f4be27e26180 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterDiscoveryIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterDiscoveryIT.java @@ -47,7 +47,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; -import static org.neo4j.causalclustering.load_balancing.ProcedureNames.GET_SERVERS_V1; +import static org.neo4j.causalclustering.load_balancing.procedure.ProcedureNames.GET_SERVERS_V1; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureName; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterMembershipChangeIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterMembershipChangeIT.java index 8ce2b26bc512b..ad395aebd3405 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterMembershipChangeIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterMembershipChangeIT.java @@ -41,7 +41,7 @@ import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.neo4j.causalclustering.load_balancing.ProcedureNames.GET_SERVERS_V1; +import static org.neo4j.causalclustering.load_balancing.procedure.ProcedureNames.GET_SERVERS_V1; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureName; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java index 65212215caf7d..f43b190f70c5c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterOverviewIT.java @@ -305,7 +305,7 @@ protected boolean matchesSafely( MemberInfo item ) public void describeTo( Description description ) { description.appendText( "MemberInfo with addresses: " ) - .appendValue( coreClusterMember.clientConnectorAddresses().getBoltAddress() ); + .appendValue( coreClusterMember.clientConnectorAddresses().boltAddress() ); } } ).collect( toList() ) );