From 9c448c534eb43089140b4d76aa5ddc3819e3598a Mon Sep 17 00:00:00 2001 From: jimwebber Date: Mon, 6 Feb 2017 16:18:42 +0000 Subject: [PATCH] Fixing the Shared Discovery Service IT. MemberId is now the primary identifier for cores and RRs across whole codebase. Retrofitted some unit level testing against RR-to-RR strategies. IDE reformatted some code sensibly. --- .../neo4j/kernel/lifecycle/LifeSupport.java | 2 - .../discovery/CoreTopology.java | 32 ++-- .../discovery/ReadReplicaTopology.java | 13 ++ .../ConnectToRandomCoreServer.java | 2 +- .../EnterpriseReadReplicaEditionModule.java | 4 +- .../TypicallyConnectToRandomReadReplica.java | 46 ++++-- .../causalclustering/discovery/Cluster.java | 116 +++++++------- .../discovery/ReadReplica.java | 5 +- .../SharedDiscoveryReadReplicaClient.java | 11 +- .../discovery/SharedDiscoveryService.java | 13 +- .../ConnectToRandomCoreServerTest.java | 21 ++- ...picallyConnectToRandomReadReplicaTest.java | 150 ++++++++++++++++++ 12 files changed, 304 insertions(+), 111 deletions(-) create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java diff --git a/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java b/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java index f8f171dab1898..9e35ed5ac7ea7 100644 --- a/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java +++ b/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java @@ -435,8 +435,6 @@ public void start() } catch ( Throwable e ) { - System.out.println("problems --> "); - e.printStackTrace(); currentStatus = changedStatus( instance, currentStatus, LifecycleStatus.STOPPED ); if( e instanceof LifecycleException ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java index 86f75d15574b1..23e2b29043b87 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java @@ -22,29 +22,25 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.helpers.collection.Pair; import static java.lang.String.format; import static java.util.stream.Collectors.toSet; public class CoreTopology { - public static CoreTopology EMPTY = new CoreTopology( null, false, Collections.emptyMap() ); + static CoreTopology EMPTY = new CoreTopology( null, false, Collections.emptyMap() ); private final ClusterId clusterId; private final boolean canBeBootstrapped; - private final Map coreMembers; + private final Map coreMembers; - public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map coreMembers ) + public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map coreMembers ) { this.clusterId = clusterId; this.canBeBootstrapped = canBeBootstrapped; @@ -79,8 +75,8 @@ public Optional find( MemberId memberId ) @Override public String toString() { - return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", - clusterId, canBeBootstrapped(), coreMembers ); + return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), + coreMembers ); } TopologyDifference difference( CoreTopology other ) @@ -88,11 +84,11 @@ TopologyDifference difference( CoreTopology other ) Set members = coreMembers.keySet(); Set otherMembers = other.coreMembers.keySet(); - Set added = otherMembers.stream().filter( m -> !members.contains(m) ) + Set added = otherMembers.stream().filter( m -> !members.contains( m ) ) .map( memberId -> asDifference( other, memberId ) ).collect( toSet() ); - Set removed = members.stream().filter( m -> !otherMembers.contains(m) ) - .map( memberId -> asDifference(CoreTopology.this, memberId ) ).collect( toSet() ); + Set removed = members.stream().filter( m -> !otherMembers.contains( m ) ) + .map( memberId -> asDifference( CoreTopology.this, memberId ) ).collect( toSet() ); return new TopologyDifference( added, removed ); } @@ -102,6 +98,18 @@ private Difference asDifference( CoreTopology topology, MemberId memberId ) return new Difference( memberId, topology.find( memberId ).orElse( null ) ); } + public Optional anyCoreMemberId() + { + if ( coreMembers.keySet().size() == 0 ) + { + return Optional.empty(); + } + else + { + return coreMembers.keySet().stream().findAny(); + } + } + class TopologyDifference { private Set added; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index a67d6d221bb15..7ab0497b598ff 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -56,4 +56,17 @@ public String toString() { return String.format( "{readReplicas=%s}", readReplicaMembers ); } + + public Optional anyReadReplicaMemberId() + { + if ( readReplicaMembers.keySet().size() == 0 ) + { + return Optional.empty(); + } + else + { + return readReplicaMembers.keySet().stream().findAny(); + } + + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java index 1739cba2a6677..2d755da8bccf0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java @@ -34,7 +34,7 @@ public class ConnectToRandomCoreServer extends UpstreamDatabaseSelectionStrategy public ConnectToRandomCoreServer() { - super( "random" ); + super( "connect-to-random-core-server" ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index ab6d85fc78eca..bf7624f09ac36 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -186,9 +186,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ); CatchUpClient catchUpClient = life.add( new CatchUpClient( readReplicaTopologyService, logProvider, Clocks.systemClock(), - inactivityTimeoutMillis, - - monitors ) ); + inactivityTimeoutMillis, monitors ) ); final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java index 4592be96432c6..e3b32cb75a0d6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplica.java @@ -19,45 +19,57 @@ */ package org.neo4j.causalclustering.readreplica; -import java.util.Iterator; import java.util.Optional; -import java.util.Random; -import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public class TypicallyConnectToRandomReadReplica extends UpstreamDatabaseSelectionStrategy { - private final Random random = new Random(); + private final ModuloCounter counter = new ModuloCounter( 10 ); public TypicallyConnectToRandomReadReplica() { - super( "random" ); + super( "typically-connect-to-random-read-replica" ); } @Override public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException { - final CoreTopology coreTopology = readReplicaTopologyService.coreServers(); - - if ( coreTopology.members().size() == 0 ) + if ( counter.shouldReturnCoreMemberId() ) { - throw new UpstreamDatabaseSelectionException( "No core servers available" ); + return readReplicaTopologyService.coreServers().anyCoreMemberId(); } + else + { + return readReplicaTopologyService.readReplicas().anyReadReplicaMemberId(); + } + } - int skippedServers = random.nextInt( coreTopology.members().size() ); - - final Iterator iterator = coreTopology.members().iterator(); + private static class ModuloCounter + { + private final int modulo; + private int counter = 0; - MemberId member; - do + ModuloCounter( int modulo ) { - member = iterator.next(); + // e.g. every 10th means 0-9 + this.modulo = modulo -1; } - while ( skippedServers-- > 0 ); - return Optional.ofNullable( member ); + boolean shouldReturnCoreMemberId() + { + if ( counter == modulo ) + { + counter = 0; + return true; + } + else + { + counter++; + return false; + } + } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java index 80bc1b3715c78..7d5a52771c5a8 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java @@ -54,7 +54,6 @@ import org.neo4j.graphdb.security.WriteOperationsNotAllowedException; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.NamedThreadFactory; -import org.neo4j.kernel.impl.store.format.standard.Standard; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException; @@ -86,10 +85,9 @@ public class Cluster private Map readReplicas = new ConcurrentHashMap<>(); public Cluster( File parentDir, int noOfCoreMembers, int noOfReadReplicas, - DiscoveryServiceFactory discoveryServiceFactory, - Map coreParams, Map> instanceCoreParams, - Map readReplicaParams, Map> instanceReadReplicaParams, - String recordFormat ) + DiscoveryServiceFactory discoveryServiceFactory, Map coreParams, + Map> instanceCoreParams, Map readReplicaParams, + Map> instanceReadReplicaParams, String recordFormat ) { this.discoveryServiceFactory = discoveryServiceFactory; this.parentDir = parentDir; @@ -125,9 +123,8 @@ public void start() throws InterruptedException, ExecutionException public Set healthyCoreMembers() { - return coreMembers.values().stream() - .filter( db -> db.database().getDependencyResolver().resolveDependency( DatabaseHealth.class ) - .isHealthy() ) + return coreMembers.values().stream().filter( + db -> db.database().getDependencyResolver().resolveDependency( DatabaseHealth.class ).isHealthy() ) .collect( Collectors.toSet() ); } @@ -147,22 +144,19 @@ public CoreClusterMember addCoreMemberWithId( int memberId ) return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, advertisedAddress ); } - public CoreClusterMember addCoreMemberWithIdAndInitialMembers( - int memberId, List initialMembers ) + public CoreClusterMember addCoreMemberWithIdAndInitialMembers( int memberId, + List initialMembers ) { return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, initialMembers ); } - private CoreClusterMember addCoreMemberWithId( - int memberId, - Map extraParams, - Map> instanceExtraParams, - String recordFormat, + private CoreClusterMember addCoreMemberWithId( int memberId, Map extraParams, + Map> instanceExtraParams, String recordFormat, List advertisedAddress ) { - CoreClusterMember coreClusterMember = new CoreClusterMember( - memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, discoveryServiceFactory, recordFormat, parentDir, - extraParams, instanceExtraParams ); + CoreClusterMember coreClusterMember = + new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, discoveryServiceFactory, + recordFormat, parentDir, extraParams, instanceExtraParams ); coreMembers.put( memberId, coreClusterMember ); return coreClusterMember; } @@ -185,8 +179,9 @@ public ReadReplica addReadReplicaWithIdAndMonitors( int memberId, Monitors monit private ReadReplica addReadReplica( int memberId, String recordFormat, Monitors monitors ) { List hazelcastAddresses = buildAddresses( coreMembers.keySet() ); - ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory, - hazelcastAddresses, readReplicaParams, instanceReadReplicaParams, recordFormat, monitors ); + ReadReplica member = + new ReadReplica( parentDir, memberId, discoveryServiceFactory, hazelcastAddresses, readReplicaParams, + instanceReadReplicaParams, recordFormat, monitors ); readReplicas.put( memberId, member ); return member; } @@ -312,10 +307,10 @@ public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeU public int numberOfCoreMembersReportedByTopology() { - CoreClusterMember aCoreGraphDb = coreMembers.values().stream() - .filter( ( member ) -> member.database() != null ).findAny().get(); - CoreTopologyService coreTopologyService = aCoreGraphDb.database().getDependencyResolver() - .resolveDependency( CoreTopologyService.class ); + CoreClusterMember aCoreGraphDb = + coreMembers.values().stream().filter( ( member ) -> member.database() != null ).findAny().get(); + CoreTopologyService coreTopologyService = + aCoreGraphDb.database().getDependencyResolver().resolveDependency( CoreTopologyService.class ); return coreTopologyService.coreServers().members().size(); } @@ -371,31 +366,31 @@ private boolean isTransientFailure( Throwable e ) // TODO: This should really catch all cases of transient failures. Must be able to express that in a clearer // manner... return (e instanceof IdGenerationException) || isLockExpired( e ) || isLockOnFollower( e ) || - isWriteNotOnLeader( e ); + isWriteNotOnLeader( e ); } private boolean isWriteNotOnLeader( Throwable e ) { return e instanceof WriteOperationsNotAllowedException && - e.getMessage().startsWith( String.format( LeaderCanWrite.NOT_LEADER_ERROR_MSG, "" ) ); + e.getMessage().startsWith( String.format( LeaderCanWrite.NOT_LEADER_ERROR_MSG, "" ) ); } private boolean isLockOnFollower( Throwable e ) { return e instanceof AcquireLockTimeoutException && - (e.getMessage().equals( LeaderOnlyLockManager.LOCK_NOT_ON_LEADER_ERROR_MESSAGE ) || - e.getCause() instanceof NoLeaderFoundException); + (e.getMessage().equals( LeaderOnlyLockManager.LOCK_NOT_ON_LEADER_ERROR_MESSAGE ) || + e.getCause() instanceof NoLeaderFoundException); } private boolean isLockExpired( Throwable e ) { return e instanceof TransactionFailureException && - e.getCause() instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException && - ((org.neo4j.kernel.api.exceptions.TransactionFailureException) e.getCause()).status() == - LockSessionExpired; + e.getCause() instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException && + ((org.neo4j.kernel.api.exceptions.TransactionFailureException) e.getCause()).status() == + LockSessionExpired; } - public static List buildAddresses( Set coreServerIds ) + private static List buildAddresses( Set coreServerIds ) { return coreServerIds.stream().map( Cluster::socketAddressForServer ).collect( toList() ); } @@ -405,14 +400,14 @@ public static AdvertisedSocketAddress socketAddressForServer( int id ) return new AdvertisedSocketAddress( "localhost", (5000 + id) ); } - private void createCoreMembers( final int noOfCoreMembers, - List addresses, Map extraParams, - Map> instanceExtraParams, String recordFormat ) + private void createCoreMembers( final int noOfCoreMembers, List addresses, + Map extraParams, Map> instanceExtraParams, String recordFormat ) { for ( int i = 0; i < noOfCoreMembers; i++ ) { - CoreClusterMember coreClusterMember = new CoreClusterMember( i, noOfCoreMembers, addresses, - discoveryServiceFactory, recordFormat, parentDir, extraParams, instanceExtraParams ); + CoreClusterMember coreClusterMember = + new CoreClusterMember( i, noOfCoreMembers, addresses, discoveryServiceFactory, recordFormat, + parentDir, extraParams, instanceExtraParams ); coreMembers.put( i, coreClusterMember ); } } @@ -455,16 +450,14 @@ private void startReadReplicas( ExecutorService executor ) throws InterruptedExc } } - private void createReadReplicas( int noOfReadReplicas, - final List coreMemberAddresses, - Map extraParams, - Map> instanceExtraParams, - String recordFormat ) + private void createReadReplicas( int noOfReadReplicas, final List coreMemberAddresses, + Map extraParams, Map> instanceExtraParams, String recordFormat ) { for ( int i = 0; i < noOfReadReplicas; i++ ) { - readReplicas.put( i, new ReadReplica( parentDir, i, discoveryServiceFactory, coreMemberAddresses, - extraParams, instanceExtraParams, recordFormat, new Monitors() ) ); + readReplicas.put( i, + new ReadReplica( parentDir, i, discoveryServiceFactory, coreMemberAddresses, extraParams, + instanceExtraParams, recordFormat, new Monitors() ) ); } } @@ -479,32 +472,31 @@ private void shutdownReadReplicas() * memberToLookLike are picked up. */ public static void dataOnMemberEventuallyLooksLike( CoreClusterMember memberThatChanges, - CoreClusterMember memberToLookLike ) - throws TimeoutException, InterruptedException + CoreClusterMember memberToLookLike ) throws TimeoutException, InterruptedException { - await( () -> { - try - { - // We recalculate the DbRepresentation of both source and target, so changes can be picked up - DbRepresentation representationToLookLike = DbRepresentation.of( memberToLookLike.database() ); - DbRepresentation representationThatChanges = DbRepresentation.of( memberThatChanges.database() ); - return representationToLookLike.equals( representationThatChanges ); - } - catch( DatabaseShutdownException e ) - { + await( () -> + { + try + { + // We recalculate the DbRepresentation of both source and target, so changes can be picked up + DbRepresentation representationToLookLike = DbRepresentation.of( memberToLookLike.database() ); + DbRepresentation representationThatChanges = DbRepresentation.of( memberThatChanges.database() ); + return representationToLookLike.equals( representationThatChanges ); + } + catch ( DatabaseShutdownException e ) + { /* * This can happen if the database is still in the process of starting. Yes, the naming * of the exception is unfortunate, since it is thrown when the database lifecycle is not * in RUNNING state and therefore signals general unavailability (e.g still starting) and not * necessarily a database that is shutting down. */ - } - return false; - }, - DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); + } + return false; + }, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); } - public static void dataMatchesEventually( ClusterMember source, Collection targets ) + public static void dataMatchesEventually( ClusterMember source, Collection targets ) throws TimeoutException, InterruptedException { dataMatchesEventually( DbRepresentation.of( source.database() ), targets ); @@ -515,7 +507,7 @@ public static void dataMatchesEventually( ClusterMembe * member. Changes in the member database contents after this method is called do not get * picked up and are not part of the comparison. * - * @param source The database to check against + * @param source The database to check against * @param targets The databases expected to match the contents of member */ public static void dataMatchesEventually( DbRepresentation source, Collection targets ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java index ab81100df8763..ded9ff28112b7 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java @@ -118,7 +118,8 @@ public String routingURI() public void start() { database = new ReadReplicaGraphDatabase( storeDir, Config.embeddedDefaults( config ), - GraphDatabaseDependencies.newDependencies().monitors( monitors ), discoveryServiceFactory, memberId().get() ); + GraphDatabaseDependencies.newDependencies().monitors( monitors ), discoveryServiceFactory, + memberId().get() ); } @Override @@ -127,8 +128,8 @@ public void shutdown() if ( database != null ) { database.shutdown(); + database = null; } - database = null; } public CatchupPollingProcess txPollingClient() diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java index 327f5546f4a10..7e12d162fc85a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.discovery; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -32,28 +33,30 @@ class SharedDiscoveryReadReplicaClient extends LifecycleAdapter implements ReadR { private final SharedDiscoveryService sharedDiscoveryService; private final ReadReplicaAddresses addresses; + private final MemberId memberId; private final Log log; - SharedDiscoveryReadReplicaClient( SharedDiscoveryService sharedDiscoveryService, Config config, + SharedDiscoveryReadReplicaClient( SharedDiscoveryService sharedDiscoveryService, Config config, MemberId memberId, LogProvider logProvider ) { this.sharedDiscoveryService = sharedDiscoveryService; this.addresses = new ReadReplicaAddresses( ClientConnectorAddresses.extractFromConfig( config ), socketAddress( config.get( CausalClusteringSettings.transaction_advertised_address ).toString(), AdvertisedSocketAddress::new ) ); + this.memberId = memberId; this.log = logProvider.getLog( getClass() ); } @Override public void start() throws Throwable { - sharedDiscoveryService.registerReadReplica( addresses ); - log.info( "Registered read replica at %s", addresses ); + sharedDiscoveryService.registerReadReplica( memberId, addresses ); + log.info( "Registered read replica member id: %s at %s", memberId, addresses ); } @Override public void stop() throws Throwable { - sharedDiscoveryService.unRegisterReadReplica( addresses ); + sharedDiscoveryService.unRegisterReadReplica( memberId); } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index c7ddd30d36402..b6662f5f9d2aa 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -64,7 +63,7 @@ public ReadReplicaTopologyService readReplicaTopologyService( Config config, Log DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate, MemberId myself ) { - return new SharedDiscoveryReadReplicaClient( this, config, logProvider ); + return new SharedDiscoveryReadReplicaClient( this, config, myself, logProvider ); } void waitForClusterFormation() throws InterruptedException @@ -97,7 +96,7 @@ CoreTopology coreTopology( SharedDiscoveryCoreClient client ) } } - public ReadReplicaTopology readReplicaTopology() + ReadReplicaTopology readReplicaTopology() { lock.lock(); try @@ -150,12 +149,12 @@ private void notifyCoreClients() } } - void registerReadReplica( ReadReplicaAddresses readReplicaAddresses ) + void registerReadReplica( MemberId memberId, ReadReplicaAddresses readReplicaAddresses ) { lock.lock(); try { - this.readReplicaAddresses.put( new MemberId( UUID.randomUUID() ), readReplicaAddresses ); + this.readReplicaAddresses.put( memberId, readReplicaAddresses ); notifyCoreClients(); } finally @@ -164,12 +163,12 @@ void registerReadReplica( ReadReplicaAddresses readReplicaAddresses ) } } - void unRegisterReadReplica( ReadReplicaAddresses readReplicaAddresses ) + void unRegisterReadReplica( MemberId memberId ) { lock.lock(); try { - this.readReplicaAddresses.remove( readReplicaAddresses ); + ReadReplicaAddresses removed = this.readReplicaAddresses.remove( memberId ); notifyCoreClients(); } finally diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java index 824f16bb60ca0..3e8a14a1d8162 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerTest.java @@ -1,3 +1,22 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package org.neo4j.causalclustering.readreplica; import org.junit.Test; @@ -46,7 +65,7 @@ public void shouldConnectToRandomCoreServer() throws Exception assertThat( memberId, anyOf( equalTo( memberId1 ), equalTo( memberId2 ), equalTo( memberId3 ) ) ); } - private CoreTopology fakeCoreTopology( MemberId... memberIds ) + static CoreTopology fakeCoreTopology( MemberId... memberIds ) { assert memberIds.length > 0; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java new file mode 100644 index 0000000000000..ce24cb83e5e4c --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.readreplica; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; +import org.neo4j.causalclustering.discovery.ClusterTopology; +import org.neo4j.causalclustering.discovery.CoreTopology; +import org.neo4j.causalclustering.discovery.ReadReplicaAddresses; +import org.neo4j.causalclustering.discovery.ReadReplicaTopology; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; + +import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.junit.Assert.assertThat; +import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerTest.fakeCoreTopology; + +public class TypicallyConnectToRandomReadReplicaTest +{ + @Test + public void shouldConnectToCoreOneInTenTimesByDefault() throws Exception + { + // given + MemberId theCoreMemberId = new MemberId( UUID.randomUUID() ); + ReadReplicaTopologyService readReplicaTopologyService = + fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ) ) ); + + TypicallyConnectToRandomReadReplica connectionStrategy = new TypicallyConnectToRandomReadReplica(); + connectionStrategy.setDiscoveryService( readReplicaTopologyService ); + + List responses = new ArrayList<>(); + + // when + for ( int i = 0; i < 10; i++ ) + { + responses.add( connectionStrategy.upstreamDatabase().get() ); + } + + // then + assertThat( responses, hasItem( theCoreMemberId ) ); + } + + private ReadReplicaTopologyService fakeTopologyService( CoreTopology coreTopology, + ReadReplicaTopology readReplicaTopology ) + { + return new ReadReplicaTopologyService() + { + @Override + public CoreTopology coreServers() + { + return coreTopology; + } + + @Override + public ReadReplicaTopology readReplicas() + { + return readReplicaTopology; + } + + @Override + public ClusterTopology allServers() + { + return new ClusterTopology( coreTopology, readReplicaTopology ); + } + + @Override + public void init() throws Throwable + { + + } + + @Override + public void start() throws Throwable + { + + } + + @Override + public void stop() throws Throwable + { + + } + + @Override + public void shutdown() throws Throwable + { + + } + }; + } + + private MemberId[] memberIDs( int howMany ) + { + MemberId[] result = new MemberId[howMany]; + + for ( int i = 0; i < howMany; i++ ) + { + result[i] = new MemberId( UUID.randomUUID() ); + } + + return result; + } + + private ReadReplicaTopology fakeReadReplicaTopology( MemberId... readReplicaIds ) + { + assert readReplicaIds.length > 0; + + Map readReplicas = new HashMap<>(); + + int offset = 0; + + for ( MemberId memberId : readReplicaIds ) + { + readReplicas.put( memberId, new ReadReplicaAddresses( new ClientConnectorAddresses( singletonList( + new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, + new AdvertisedSocketAddress( "localhost", 11000 + offset ) ) ) ), + new AdvertisedSocketAddress( "localhost", 10000 + offset ) ) ); + + offset++; + } + + return new ReadReplicaTopology( readReplicas ); + } +}