diff --git a/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java b/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java index 9e35ed5ac7ea7..f8f171dab1898 100644 --- a/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java +++ b/community/common/src/main/java/org/neo4j/kernel/lifecycle/LifeSupport.java @@ -435,6 +435,8 @@ public void start() } catch ( Throwable e ) { + System.out.println("problems --> "); + e.printStackTrace(); currentStatus = changedStatus( instance, currentStatus, LifecycleStatus.STOPPED ); if( e instanceof LifecycleException ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java index 18502aeb478cb..b718b2117942c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java @@ -31,8 +31,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import org.neo4j.causalclustering.discovery.CoreAddresses; -import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.CatchupServerAddress; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -48,7 +48,7 @@ public class CatchUpClient extends LifecycleAdapter { private final LogProvider logProvider; - private final TopologyService discoveryService; + private final ReadReplicaTopologyService discoveryService; private final Log log; private final Clock clock; private final Monitors monitors; @@ -57,7 +57,7 @@ public class CatchUpClient extends LifecycleAdapter private NioEventLoopGroup 
eventLoopGroup; - public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, Clock clock, + public CatchUpClient( ReadReplicaTopologyService discoveryService, LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors ) { this.logProvider = logProvider; @@ -68,12 +68,13 @@ public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, this.monitors = monitors; } - public T makeBlockingRequest( MemberId target, CatchUpRequest request, + public T makeBlockingRequest( MemberId upstream, CatchUpRequest request, CatchUpResponseCallback responseHandler ) throws CatchUpClientException { CompletableFuture future = new CompletableFuture<>(); Optional catchUpAddress = - discoveryService.coreServers().find( target ).map( CoreAddresses::getCatchupServer ); + discoveryService.allServers().find( upstream ).map( CatchupServerAddress::getCatchupServer ); + CatchUpChannel channel = pool.acquire( catchUpAddress.orElseThrow( () -> new CatchUpClientException( "Cannot find the target member socket address" ) ) ); @@ -93,7 +94,7 @@ public T makeBlockingRequest( MemberId target, CatchUpRequest request, channel.send( request ); String operation = String.format( "Timed out executing operation %s on %s (%s)", - request, target, catchUpAddress.get() ); + request, upstream, catchUpAddress.get() ); return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java index ad03225ece2c1..492673390dfa9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java @@ -137,11 +137,8 @@ public synchronized void start() throws Throwable 
workerGroup = new NioEventLoopGroup( 0, threadFactory ); - ServerBootstrap bootstrap = new ServerBootstrap() - .group( workerGroup ) - .channel( NioServerSocketChannel.class ) - .localAddress( listenAddress.socketAddress() ) - .childHandler( new ChannelInitializer() + ServerBootstrap bootstrap = new ServerBootstrap().group( workerGroup ).channel( NioServerSocketChannel.class ) + .localAddress( listenAddress.socketAddress() ).childHandler( new ChannelInitializer() { @Override protected void initChannel( SocketChannel ch ) @@ -175,14 +172,21 @@ protected void initChannel( SocketChannel ch ) transactionIdStoreSupplier, logicalTransactionStoreSupplier, txPullBatchSize, monitors, logProvider ) ); pipeline.addLast( new ChunkedWriteHandler() ); + pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier, checkPointerSupplier, fs, pageCache, logProvider, storeCopyCheckPointMutex ) ); + pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) ); - pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) ); + + if ( coreState != null ) + { + pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) ); + } pipeline.addLast( new ExceptionLoggingHandler( log ) ); pipeline.addLast( new ExceptionMonitoringHandler( - monitors.newMonitor( ExceptionMonitoringHandler.Monitor.class, CatchupServer.class ) ) ); + monitors.newMonitor( ExceptionMonitoringHandler.Monitor.class, + CatchupServer.class ) ) ); pipeline.addLast( new ExceptionSwallowingHandler() ); } } ); @@ -191,15 +195,19 @@ protected void initChannel( SocketChannel ch ) { channel = bootstrap.bind().syncUninterruptibly().channel(); } - catch( Exception e ) + catch ( Exception e ) { // thanks to netty we need to catch everything and do an instanceof because it does not declare properly // checked exception but it still throws them with some black magic at runtime. 
//noinspection ConstantConditions if ( e instanceof BindException ) { - userLog.error( "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + listenAddress ); - log.error( "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + listenAddress, e ); + userLog.error( + "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + + " with value: " + listenAddress ); + log.error( + "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + + " with value: " + listenAddress, e ); throw e; } } @@ -207,12 +215,11 @@ protected void initChannel( SocketChannel ch ) private ChannelInboundHandler decoders( CatchupServerProtocol protocol ) { - RequestDecoderDispatcher decoderDispatcher = - new RequestDecoderDispatcher<>( protocol, logProvider ); + RequestDecoderDispatcher decoderDispatcher = new RequestDecoderDispatcher<>( protocol, logProvider ); decoderDispatcher.register( State.TX_PULL, new TxPullRequestDecoder() ); decoderDispatcher.register( State.GET_STORE, new GetStoreRequestDecoder() ); decoderDispatcher.register( State.GET_STORE_ID, new SimpleRequestDecoder( GetStoreIdRequest::new ) ); - decoderDispatcher.register( State.GET_CORE_SNAPSHOT, new SimpleRequestDecoder( CoreSnapshotRequest::new) ); + decoderDispatcher.register( State.GET_CORE_SNAPSHOT, new SimpleRequestDecoder( CoreSnapshotRequest::new ) ); return decoderDispatcher; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java index f7451523cc0a4..dcaf90921ab7d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java +++ 
b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -287,10 +287,10 @@ public void onTxStreamFinishedResponse( CompletableFuture signal, private void copyStore() { - MemberId core; + MemberId upstream; try { - core = selectionStrategyPipeline.bestUpstreamDatabase(); + upstream = selectionStrategyPipeline.bestUpstreamDatabase(); } catch ( UpstreamDatabaseSelectionException e ) { @@ -299,10 +299,10 @@ private void copyStore() } StoreId localStoreId = localDatabase.storeId(); - downloadDatabase( core, localStoreId ); + downloadDatabase( upstream, localStoreId ); } - private void downloadDatabase( MemberId core, StoreId localStoreId ) + private void downloadDatabase( MemberId upstream, StoreId localStoreId ) { try { @@ -316,11 +316,11 @@ private void downloadDatabase( MemberId core, StoreId localStoreId ) try { - storeCopyProcess.replaceWithStoreFrom( core, localStoreId ); + storeCopyProcess.replaceWithStoreFrom( upstream, localStoreId ); } catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e ) { - log.warn( String.format( "Error copying store from: %s. Will retry shortly.", core ) ); + log.warn( String.format( "Error copying store from: %s. Will retry shortly.", upstream ) ); return; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CatchupServerAddress.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CatchupServerAddress.java new file mode 100644 index 0000000000000..d33a7efba058d --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CatchupServerAddress.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import org.neo4j.helpers.AdvertisedSocketAddress; + +public interface CatchupServerAddress +{ + AdvertisedSocketAddress getCatchupServer(); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java new file mode 100644 index 0000000000000..c749dbeb3a005 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ClusterTopology.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.Optional; + +import org.neo4j.causalclustering.identity.MemberId; + +public class ClusterTopology +{ + private final CoreTopology coreTopology; + private final ReadReplicaTopology readReplicaTopology; + + public ClusterTopology( CoreTopology coreTopology, ReadReplicaTopology readReplicaTopology ) + { + this.coreTopology = coreTopology; + this.readReplicaTopology = readReplicaTopology; + } + + public Optional find( MemberId upstream ) + { + Optional coreAddresses = coreTopology.find( upstream ); + Optional readReplicaAddresses = readReplicaTopology.find( upstream ); + + if ( coreAddresses.isPresent() ) + { + return Optional.of( coreAddresses.get() ); + } + else if ( readReplicaAddresses.isPresent() ) + { + return Optional.of( readReplicaAddresses.get() ); + } + else + { + return Optional.empty(); + } + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java index b6003d26fef03..60e785dcdba61 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreAddresses.java @@ -21,7 +21,7 @@ import org.neo4j.helpers.AdvertisedSocketAddress; -public class CoreAddresses implements ClientConnector +public class CoreAddresses implements CatchupServerAddress, ClientConnector { private final AdvertisedSocketAddress raftServer; private final AdvertisedSocketAddress catchupServer; @@ -40,6 +40,7 @@ public AdvertisedSocketAddress getRaftServer() return raftServer; } + @Override public AdvertisedSocketAddress getCatchupServer() { return catchupServer; diff --git 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java index d4ba29d069acb..5f63028176493 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java @@ -21,10 +21,8 @@ import org.neo4j.causalclustering.identity.ClusterId; -public interface CoreTopologyService extends TopologyService +public interface CoreTopologyService extends ReadReplicaTopologyService { - ReadReplicaTopology readReplicas(); - void addCoreTopologyListener( Listener listener ); /** diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java index e7dd06469915e..64b2ee2bfb11a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java @@ -30,6 +30,7 @@ public interface DiscoveryServiceFactory CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider, LogProvider userLogProvider ); - TopologyService readReplicaDiscoveryService( Config config, LogProvider logProvider, - DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate ); + ReadReplicaTopologyService readReplicaTopologyService( Config config, LogProvider logProvider, + DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, + long readReplicaRefreshRate, MemberId myself ); } diff --git 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index ed910f78c5142..3909233938c8d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -24,7 +24,10 @@ import java.util.function.Function; +import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; @@ -32,28 +35,35 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME; -class HazelcastClient extends LifecycleAdapter implements TopologyService +class HazelcastClient extends LifecycleAdapter implements ReadReplicaTopologyService { static final RenewableTimeoutService.TimeoutName REFRESH_READ_REPLICA = () -> "Refresh Read Replica"; private final Log log; private final ClientConnectorAddresses connectorAddresses; private final HazelcastConnector connector; private final RenewableTimeoutService renewableTimeoutService; + private final AdvertisedSocketAddress transactionSource; private HazelcastInstance hazelcastInstance; private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer; private final long 
readReplicaTimeToLiveTimeout; private final long readReplicaRefreshRate; + private MemberId myself; HazelcastClient( HazelcastConnector connector, LogProvider logProvider, Config config, - RenewableTimeoutService renewableTimeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate ) + RenewableTimeoutService renewableTimeoutService, long readReplicaTimeToLiveTimeout, + long readReplicaRefreshRate, MemberId myself ) { this.connector = connector; this.renewableTimeoutService = renewableTimeoutService; this.readReplicaRefreshRate = readReplicaRefreshRate; this.log = logProvider.getLog( getClass() ); this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); + this.transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address ); this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout; + this.myself = myself; } @Override @@ -61,35 +71,68 @@ public CoreTopology coreServers() { try { - return retry( ( hazelcastInstance ) -> - HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ) ); + return retry( ( hazelcastInstance ) -> HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ) ); } catch ( Exception e ) { - log.info( "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " - + "Connection will be reattempted on next polling attempt.", e ); + log.info( + "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " + + "Connection will be reattempted on next polling attempt.", e ); return CoreTopology.EMPTY; } } + @Override + public ReadReplicaTopology readReplicas() + { + try + { + return retry( ( hazelcastInstance ) -> HazelcastClusterTopology + .getReadReplicaTopology( hazelcastInstance, log ) ); + } + catch ( Exception e ) + { + log.info( + "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. 
" + + "Connection will be reattempted on next polling attempt.", e ); + return ReadReplicaTopology.EMPTY; + } + } + + @Override + public ClusterTopology allServers() + { + return new ClusterTopology( coreServers(), readReplicas() ); + } + @Override public void start() throws Throwable { - readReplicaRefreshTimer = renewableTimeoutService.create( REFRESH_READ_REPLICA, readReplicaRefreshRate, 0, timeout -> { - timeout.renew(); - retry( this::addReadReplica ); - } ); + readReplicaRefreshTimer = + renewableTimeoutService.create( REFRESH_READ_REPLICA, readReplicaRefreshRate, 0, timeout -> + { + timeout.renew(); + retry( this::addReadReplica ); + } ); } - private Object addReadReplica( HazelcastInstance hazelcastInstance ) + private Void addReadReplica( HazelcastInstance hazelcastInstance ) { String uuid = hazelcastInstance.getLocalEndpoint().getUuid(); String addresses = connectorAddresses.toString(); - log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses ); + log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses ); - return hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ) + hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ) .put( uuid, addresses, readReplicaTimeToLiveTimeout, MILLISECONDS ); + + hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ) + .put( uuid, transactionSource.toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); + + hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) + .put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); + + return null; // return value not used. 
} @Override @@ -114,7 +157,7 @@ public synchronized void stop() throws Throwable readReplicaRefreshTimer.cancel(); } - private synchronized T retry( Function hazelcastOperation ) + private synchronized T retry( Function hazelcastOperation ) { boolean attemptedConnection = false; HazelcastInstanceNotActiveException exception = null; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java index a2861ac3a120f..369d7c87ecf88 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java @@ -19,18 +19,18 @@ */ package org.neo4j.causalclustering.discovery; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - import com.hazelcast.config.MemberAttributeConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.IMap; import com.hazelcast.core.Member; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -40,15 +40,15 @@ import org.neo4j.logging.Log; import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toSet; - import static org.neo4j.helpers.SocketAddressFormat.socketAddress; class HazelcastClusterTopology { // hz client uuid string -> boltAddress string static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read-replicas"; + static final String 
READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers"; + static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids"; private static final String CLUSTER_UUID = "cluster_uuid"; static final String MEMBER_UUID = "member_uuid"; static final String TRANSACTION_SERVER = "transaction_server"; @@ -58,7 +58,7 @@ class HazelcastClusterTopology static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastInstance, Log log ) { - Set readReplicas = emptySet(); + Map readReplicas = emptyMap(); if ( hazelcastInstance != null ) { @@ -108,15 +108,26 @@ static boolean casClusterId( HazelcastInstance hazelcastInstance, ClusterId clus return uuidReference.compareAndSet( null, clusterId.uuid() ) || uuidReference.get().equals( clusterId.uuid() ); } - private static Set readReplicas( HazelcastInstance hazelcastInstance ) + private static Map readReplicas( HazelcastInstance hazelcastInstance ) { - IMap readReplicaMap = hazelcastInstance.getMap( - READ_REPLICA_BOLT_ADDRESS_MAP_NAME ); + IMap clientAddressMap = + hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ); + + IMap txServerMap = hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ); + + IMap memberIdMap = hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ); - return readReplicaMap - .entrySet().stream() - .map( entry -> new ReadReplicaAddresses( ClientConnectorAddresses.fromString( entry.getValue() ) ) ) - .collect( toSet() ); + Map result = new HashMap<>( ); + + for ( String hzUUID : clientAddressMap.keySet() ) + { + ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.fromString( clientAddressMap.get( hzUUID ) ); + AdvertisedSocketAddress catchupAddress = socketAddress( txServerMap.get( hzUUID ), AdvertisedSocketAddress::new ); + + result.put( new MemberId( UUID.fromString( memberIdMap.get( hzUUID ) ) ), + new ReadReplicaAddresses( clientConnectorAddresses, catchupAddress ) ) ; + } + return 
result; } private static boolean canBeBootstrapped( Set coreMembers ) @@ -133,7 +144,27 @@ static Map toCoreMemberMap( Set members, Log log { try { - Pair pair = extractMemberAttributes( member ); + Pair pair = extractMemberAttributesForCore( member ); + coreMembers.put( pair.first(), pair.other() ); + } + catch ( IllegalArgumentException e ) + { + log.warn( "Incomplete member attributes supplied from Hazelcast", e ); + } + } + + return coreMembers; + } + + static Map toReadReplicaMemberMap( Set members, Log log ) + { + Map coreMembers = new HashMap<>(); + + for ( Member member : members ) + { + try + { + Pair pair = extractMemberAttributesForCore( member ); coreMembers.put( pair.first(), pair.other() ); } catch ( IllegalArgumentException e ) @@ -145,13 +176,12 @@ static Map toCoreMemberMap( Set members, Log log return coreMembers; } - static MemberAttributeConfig buildMemberAttributes( MemberId myself, Config config ) + static MemberAttributeConfig buildMemberAttributesForCore( MemberId myself, Config config ) { MemberAttributeConfig memberAttributeConfig = new MemberAttributeConfig(); memberAttributeConfig.setStringAttribute( MEMBER_UUID, myself.getUuid().toString() ); - AdvertisedSocketAddress discoveryAddress = - config.get( CausalClusteringSettings.discovery_advertised_address ); + AdvertisedSocketAddress discoveryAddress = config.get( CausalClusteringSettings.discovery_advertised_address ); memberAttributeConfig.setStringAttribute( DISCOVERY_SERVER, discoveryAddress.toString() ); AdvertisedSocketAddress transactionSource = @@ -166,14 +196,13 @@ static MemberAttributeConfig buildMemberAttributes( MemberId myself, Config conf return memberAttributeConfig; } - static Pair extractMemberAttributes( Member member ) + static Pair extractMemberAttributesForCore( Member member ) { MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) ); return Pair.of( memberId, new CoreAddresses( socketAddress( member.getStringAttribute( 
RAFT_SERVER ), AdvertisedSocketAddress::new ), socketAddress( member.getStringAttribute( TRANSACTION_SERVER ), AdvertisedSocketAddress::new ), - ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ) - ) ); + ClientConnectorAddresses.fromString( member.getStringAttribute( CLIENT_CONNECTOR_ADDRESSES ) ) ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index a69b375b75754..092f0633523c7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -182,7 +182,7 @@ private HazelcastInstance createHazelcastInstance() c.setNetworkConfig( networkConfig ); - MemberAttributeConfig memberAttributeConfig = HazelcastClusterTopology.buildMemberAttributes( myself, config ); + MemberAttributeConfig memberAttributeConfig = HazelcastClusterTopology.buildMemberAttributesForCore( myself, config ); c.setMemberAttributeConfig( memberAttributeConfig ); userLog.info( "Waiting for other members to join cluster before continuing..." 
); @@ -223,6 +223,12 @@ public ReadReplicaTopology readReplicas() return latestReadReplicaTopology; } + @Override + public ClusterTopology allServers() + { + return new ClusterTopology( coreServers(), readReplicas() ); + } + @Override public CoreTopology coreServers() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java index c73acb776777d..ea43bc7115e06 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java @@ -37,14 +37,14 @@ public CoreTopologyService coreTopologyService( Config config, MemberId myself, } @Override - public TopologyService readReplicaDiscoveryService( Config config, - LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, - long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate ) + public ReadReplicaTopologyService readReplicaTopologyService( Config config, LogProvider logProvider, + DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, + long readReplicaRefreshRate, MemberId myself ) { configureHazelcast( config ); return new HazelcastClient( new HazelcastClientConnector( config ), logProvider, config, timeoutService, - readReplicaTimeToLiveTimeout, readReplicaRefreshRate ); + readReplicaTimeToLiveTimeout, readReplicaRefreshRate, myself ); } private static void configureHazelcast( Config config ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java index 33d7fa2ae0523..b9700ff7e3af2 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaAddresses.java @@ -19,13 +19,17 @@ */ package org.neo4j.causalclustering.discovery; -public class ReadReplicaAddresses implements ClientConnector +import org.neo4j.helpers.AdvertisedSocketAddress; + +public class ReadReplicaAddresses implements CatchupServerAddress, ClientConnector { + private final AdvertisedSocketAddress catchupServerAddress; private final ClientConnectorAddresses clientConnectorAddresses; - ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses ) + public ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses, AdvertisedSocketAddress catchupServerAddress ) { this.clientConnectorAddresses = clientConnectorAddresses; + this.catchupServerAddress = catchupServerAddress; } public ClientConnectorAddresses connectors() @@ -38,4 +42,10 @@ public String toString() { return String.format( "ReadReplicaAddresses{clientConnectorAddresses=%s}", clientConnectorAddresses ); } + + @Override + public AdvertisedSocketAddress getCatchupServer() + { + return catchupServerAddress; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index 2be2293ddace9..e3b2fb96efa98 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -19,36 +19,48 @@ */ package org.neo4j.causalclustering.discovery; -import java.util.HashSet; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import 
org.neo4j.causalclustering.identity.ClusterId; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.collection.Pair; +import static java.util.Collections.emptyMap; + + public class ReadReplicaTopology { - private final Set readReplicaMembers; + public static final ReadReplicaTopology EMPTY = new ReadReplicaTopology( emptyMap() ); + + private final Map readReplicaMembers; - public ReadReplicaTopology( Set readReplicaMembers ) + public ReadReplicaTopology( Map readReplicaMembers ) { this.readReplicaMembers = readReplicaMembers; } - public Set members() + public Collection members() { - return readReplicaMembers; + return readReplicaMembers.values(); } - public Set difference( ReadReplicaTopology other ) + public Optional find( MemberId memberId ) { - Pair, Set> split = split( readReplicaMembers, other.members() ); - Set big = split.first(); - Set small = split.other(); - - return big.stream().filter( n -> !small.contains( n ) ).collect( Collectors.toSet() ); + return Optional.ofNullable( readReplicaMembers.get( memberId ) ); } +// public Set difference( ReadReplicaTopology other ) +// { +// Pair, Set> split = split( readReplicaMembers, other.members() ); +// Set big = split.first(); +// Set small = split.other(); +// +// return big.stream().filter( n -> !small.contains( n ) ).collect( Collectors.toSet() ); +// } + private Pair, Set> split( Set one, Set two ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopologyService.java similarity index 87% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopologyService.java index c333e52c6e068..2845cf45513ac 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopologyService.java @@ -21,7 +21,11 @@ import org.neo4j.kernel.lifecycle.Lifecycle; -public interface TopologyService extends Lifecycle +public interface ReadReplicaTopologyService extends Lifecycle { CoreTopology coreServers(); + + ReadReplicaTopology readReplicas(); + + ClusterTopology allServers(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java index 40f7a253ade74..84dda0ecd2f67 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java @@ -38,7 +38,7 @@ public MemberId( UUID uuid ) { Objects.requireNonNull( uuid ); this.uuid = uuid; - shortName = uuid.toString().substring( 0, 8 ); + shortName = uuid.toString();//.substring( 0, 8 ); } public UUID getUuid() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomUpstreamCoreServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java similarity index 89% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomUpstreamCoreServer.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java index 8043a8ac06610..1739cba2a6677 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomUpstreamCoreServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java @@ -28,11 
+28,11 @@ import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) -public class ConnectToRandomUpstreamCoreServer extends UpstreamDatabaseSelectionStrategy +public class ConnectToRandomCoreServer extends UpstreamDatabaseSelectionStrategy { private final Random random = new Random(); - public ConnectToRandomUpstreamCoreServer() + public ConnectToRandomCoreServer() { super( "random" ); } @@ -40,7 +40,7 @@ public ConnectToRandomUpstreamCoreServer() @Override public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException { - final CoreTopology coreTopology = topologyService.coreServers(); + final CoreTopology coreTopology = readReplicaTopologyService.coreServers(); if ( coreTopology.members().size() == 0 ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 2e6724767c960..f35908a9ef1ef 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -20,12 +20,15 @@ package org.neo4j.causalclustering.readreplica; import java.io.File; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.neo4j.backup.OnlineBackupKernelExtension; import org.neo4j.backup.OnlineBackupSettings; import org.neo4j.causalclustering.catchup.CatchUpClient; +import org.neo4j.causalclustering.catchup.CatchupServer; +import org.neo4j.causalclustering.catchup.CheckpointerSupplier; import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import 
org.neo4j.causalclustering.catchup.storecopy.RemoteStore; @@ -39,9 +42,10 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; -import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.discovery.procedures.ReadReplicaRoleProcedure; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.io.fs.FileSystemAbstraction; @@ -78,6 +82,7 @@ import org.neo4j.kernel.impl.store.id.IdReuseEligibility; import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; +import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.state.DataSourceManager; @@ -107,7 +112,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke } EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, - final DiscoveryServiceFactory discoveryServiceFactory ) + final DiscoveryServiceFactory discoveryServiceFactory, MemberId myself ) { LogService logging = platformModule.logging; @@ -172,14 +177,16 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke long readReplicaTimeToLiveTimeout = config.get( CausalClusteringSettings.read_replica_time_to_live ); long readReplicaRefreshRate = config.get( CausalClusteringSettings.read_replica_refresh_rate ); - TopologyService topologyService = discoveryServiceFactory - 
.readReplicaDiscoveryService( config, logProvider, refreshReadReplicaTimeoutService, - readReplicaTimeToLiveTimeout, readReplicaRefreshRate ); - life.add( dependencies.satisfyDependency( topologyService ) ); + logProvider.getLog( getClass() ).info( String.format( "Generated new id: %s", myself) ); + + ReadReplicaTopologyService readReplicaTopologyService = discoveryServiceFactory + .readReplicaTopologyService( config, logProvider, refreshReadReplicaTimeoutService, + readReplicaTimeToLiveTimeout, readReplicaRefreshRate, myself ); + life.add( dependencies.satisfyDependency( readReplicaTopologyService ) ); long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ); CatchUpClient catchUpClient = life.add( - new CatchUpClient( topologyService, logProvider, Clocks.systemClock(), inactivityTimeoutMillis, + new CatchUpClient( readReplicaTopologyService, logProvider, Clocks.systemClock(), inactivityTimeoutMillis, monitors ) ); @@ -241,16 +248,16 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fileSystem, pageCache, localDatabase, copiedStoreRecovery, remoteStore, logProvider ); - ConnectToRandomUpstreamCoreServer defaultStrategy = new ConnectToRandomUpstreamCoreServer(); - defaultStrategy.setDiscoveryService( topologyService ); + ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer(); + defaultStrategy.setDiscoveryService( readReplicaTopologyService ); - UpstreamDatabaseStrategySelector selectionStrategyPipeline = + UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector = new UpstreamDatabaseStrategySelector( defaultStrategy, - new UpstreamDatabaseStrategiesLoader( topologyService, config ) ); + new UpstreamDatabaseStrategiesLoader( readReplicaTopologyService, config ), myself ); CatchupPollingProcess catchupProcess = new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, 
catchUpClient, - selectionStrategyPipeline, catchupTimeoutService, + upstreamDatabaseStrategySelector, catchupTimeoutService, config.get( CausalClusteringSettings.pull_interval ), batchingTxApplier, platformModule.monitors, storeCopyProcess, databaseHealthSupplier ); dependencies.satisfyDependencies( catchupProcess ); @@ -261,10 +268,21 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data txPulling.add( new WaitForUpToDateStore( catchupProcess, logProvider ) ); ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS ); - life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, selectionStrategyPipeline, + life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, upstreamDatabaseStrategySelector, retryStrategy, logProvider, platformModule.logging.getUserLogProvider(), storeCopyProcess ) ); + CatchupServer catchupServer = new CatchupServer( platformModule.logging.getInternalLogProvider(), + platformModule.logging.getUserLogProvider(), localDatabase::storeId, + platformModule.dependencies.provideDependency( TransactionIdStore.class ), + platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), + localDatabase::dataSource, localDatabase::isAvailable, null, config, platformModule.monitors, + new CheckpointerSupplier( platformModule.dependencies ), fileSystem ); + + servicesToStopOnStoreCopy.add( catchupServer ); + dependencies.satisfyDependency( createSessionTracker() ); + + life.add( catchupServer ); // must start last and stop first, since it handles external requests } private void registerRecovery( final DatabaseInfo databaseInfo, LifeSupport life, diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java index 1df88416d57b6..2be51248c2790 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java @@ -20,10 +20,12 @@ package org.neo4j.causalclustering.readreplica; import java.io.File; +import java.util.UUID; import java.util.function.Function; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.EditionModule; @@ -36,14 +38,15 @@ public class ReadReplicaGraphDatabase extends GraphDatabaseFacade { public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies dependencies ) { - this( storeDir, config, dependencies, new HazelcastDiscoveryServiceFactory() ); + this( storeDir, config, dependencies, new HazelcastDiscoveryServiceFactory(), new MemberId( UUID.randomUUID() ) ); } public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies dependencies, - DiscoveryServiceFactory discoveryServiceFactory ) + DiscoveryServiceFactory discoveryServiceFactory, MemberId memberId ) { Function factory = - ( platformModule ) -> new EnterpriseReadReplicaEditionModule( platformModule, discoveryServiceFactory ); + ( platformModule ) -> new EnterpriseReadReplicaEditionModule( platformModule, + discoveryServiceFactory, memberId ); new GraphDatabaseFacadeFactory( DatabaseInfo.READ_REPLICA, factory ).initFacade( storeDir, config, dependencies, this ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java index 26557fefd570b..c37d6a8bd41d5 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java @@ -79,22 +79,22 @@ private String issueOf( String operation, int attempt ) @Override public void start() throws IOException { - boolean syncedWithCore = false; + boolean syncedWithUpstream = false; RetryStrategy.Timeout timeout = retryStrategy.newTimeout(); int attempt = 0; - while ( !syncedWithCore ) + while ( !syncedWithUpstream ) { attempt++; MemberId source = null; try { source = selectionStrategyPipeline.bestUpstreamDatabase(); - syncStoreWithCore( source ); - syncedWithCore = true; + syncStoreWithUpstream( source ); + syncedWithUpstream = true; } catch ( UpstreamDatabaseSelectionException e ) { - lastIssue = issueOf( "finding core member", attempt ); + lastIssue = issueOf( "finding upstream member", attempt ); debugLog.warn( lastIssue ); } catch ( StoreCopyFailedException e ) @@ -127,7 +127,9 @@ public void start() throws IOException } } - if ( !syncedWithCore ) + System.out.println("--> Got out of this loop"); + + if ( !syncedWithUpstream ) { userLog.error( lastIssue ); throw new RuntimeException( lastIssue ); @@ -144,18 +146,18 @@ public void start() throws IOException } } - private void syncStoreWithCore( MemberId source ) + private void syncStoreWithUpstream( MemberId source ) throws IOException, StoreIdDownloadFailedException, StoreCopyFailedException, StreamingTransactionsFailedException { if ( localDatabase.isEmpty() ) { - debugLog.info( "Local database is empty, attempting to replace with copy from core server %s", source ); + debugLog.info( "Local database is empty, attempting to replace with copy from upstream server %s", source ); - debugLog.info( "Finding store id of core server %s", source ); + debugLog.info( "Finding store id of upstream server %s", source ); StoreId storeId = remoteStore.getStoreId( source ); - 
debugLog.info( "Copying store from core server %s", source ); + debugLog.info( "Copying store from upstream server %s", source ); localDatabase.delete(); storeCopyProcess.replaceWithStoreFrom( source, storeId ); @@ -167,10 +169,10 @@ private void syncStoreWithCore( MemberId source ) } } - private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFailedException + private void ensureSameStoreIdAs( MemberId upstream ) throws StoreIdDownloadFailedException { StoreId localStoreId = localDatabase.storeId(); - StoreId remoteStoreId = remoteStore.getStoreId( remoteCore ); + StoreId remoteStoreId = remoteStore.getStoreId( upstream ); if ( !localStoreId.equals( remoteStoreId ) ) { throw new IllegalStateException( format( "This read replica cannot join the cluster. " + diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java index fd3b58968489b..7ee4f3dad74ca 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java @@ -21,13 +21,13 @@ import java.util.Optional; -import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.Service; public abstract class UpstreamDatabaseSelectionStrategy extends Service { - protected TopologyService topologyService; + protected ReadReplicaTopologyService readReplicaTopologyService; public UpstreamDatabaseSelectionStrategy( String key, String... altKeys ) { @@ -35,9 +35,9 @@ public UpstreamDatabaseSelectionStrategy( String key, String... 
altKeys ) } // Service loaded can't inject this via the constructor - void setDiscoveryService( TopologyService topologyService ) + void setDiscoveryService( ReadReplicaTopologyService readReplicaTopologyService ) { - this.topologyService = topologyService; + this.readReplicaTopologyService = readReplicaTopologyService; } public abstract Optional upstreamDatabase() throws UpstreamDatabaseSelectionException; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java index f89b9b8b81c14..951698046e4f2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java @@ -23,22 +23,22 @@ import java.util.LinkedHashSet; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.helpers.Service; import org.neo4j.kernel.configuration.Config; /** * Loads and initialises any service implementations of UpstreamDatabaseSelectionStrategy. - * Exposes configured instances of that interface via an interator. + * Exposes configured instances of that interface via an iterator. 
*/ public class UpstreamDatabaseStrategiesLoader implements Iterable { - private final TopologyService topologyService; + private final ReadReplicaTopologyService readReplicaTopologyService; private final Config config; - public UpstreamDatabaseStrategiesLoader( TopologyService topologyService, Config config ) + public UpstreamDatabaseStrategiesLoader( ReadReplicaTopologyService readReplicaTopologyService, Config config ) { - this.topologyService = topologyService; + this.readReplicaTopologyService = readReplicaTopologyService; this.config = config; } @@ -50,14 +50,22 @@ public Iterator iterator() LinkedHashSet candidates = new LinkedHashSet<>(); + System.out.println("Registered strategies --> "); + for ( UpstreamDatabaseSelectionStrategy upstreamDatabaseSelectionStrategy : allImplementationsOnClasspath ) + { + System.out.println(upstreamDatabaseSelectionStrategy.getClass()); + } + for ( String key : config.get( CausalClusteringSettings.upstream_selection_strategy ) ) { + System.out.println( "Looking for key --> " + key ); for ( UpstreamDatabaseSelectionStrategy candidate : allImplementationsOnClasspath ) { if ( candidate.getKeys().iterator().next().equals( key ) ) { - candidate.setDiscoveryService( topologyService ); + candidate.setDiscoveryService( readReplicaTopologyService ); candidates.add( candidate ); + System.out.println( "Added strategy --> " + candidate.getClass() ); } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java index d76f2f0c9a31b..e6889ac226660 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java @@ -20,41 +20,53 @@ package 
org.neo4j.causalclustering.readreplica; -import java.util.LinkedList; +import java.util.LinkedHashSet; import java.util.NoSuchElementException; import org.neo4j.causalclustering.identity.MemberId; +import static org.neo4j.helpers.collection.Iterables.count; + public class UpstreamDatabaseStrategySelector { - private LinkedList strategies = new LinkedList<>(); + private LinkedHashSet strategies = new LinkedHashSet<>(); + private MemberId myself; UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy ) { - this( defaultStrategy, null ); + this( defaultStrategy, null, null ); } UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy, - Iterable otherStrategies ) + Iterable otherStrategies, MemberId myself ) { - strategies.push( defaultStrategy ); + this.myself = myself; + System.out.println( "No of additional strategies --> " + count( otherStrategies ) ); if ( otherStrategies != null ) { for ( UpstreamDatabaseSelectionStrategy otherStrategy : otherStrategies ) { - strategies.push( otherStrategy ); + strategies.add( otherStrategy ); } } + strategies.add( defaultStrategy ); + + System.out.println( "Loaded strategies --> " ); + for ( UpstreamDatabaseSelectionStrategy strategy : strategies ) + { + System.out.println( strategy.getClass() ); + } } public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException { MemberId result = null; - - for ( UpstreamDatabaseSelectionStrategy strategy : strategies ) + System.out.println("--------------------------------------------------------------------------------"); + for ( UpstreamDatabaseSelectionStrategy strategy : this.strategies ) { try { + System.out.println( "Trying " + myself + " --> " + strategy.getClass() ); result = strategy.upstreamDatabase().get(); break; } @@ -63,6 +75,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException // Do nothing, this strategy failed } } + 
System.out.println("--------------------------------------------------------------------------------"); if ( result == null ) { diff --git a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy index 27053e883f838..d1c1cddf7b9a4 100644 --- a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy +++ b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy @@ -1 +1 @@ -org.neo4j.causalclustering.readreplica.ConnectToRandomUpstreamCoreServer +org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServer diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java index 1e0cdac1cd556..758ac08e6f95c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java @@ -71,6 +71,7 @@ import java.util.function.Function; import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.BoltConnector; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Log; @@ -96,6 +97,8 @@ public class HazelcastClientTest { + private MemberId myself = new MemberId( UUID.randomUUID() ); + private Config config() { Config defaults = Config.defaults(); @@ -118,7 +121,7 @@ public void shouldReturnTopologyUsingHazelcastMembers() throws Exception 
// given HazelcastConnector connector = mock( HazelcastConnector.class ); HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000 ); + ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); @@ -146,7 +149,7 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Exception // given HazelcastConnector connector = mock( HazelcastConnector.class ); HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000 ); + ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); @@ -189,7 +192,7 @@ public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Excep when( hazelcastInstance.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); HazelcastClient client = new HazelcastClient( connector, logProvider, config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000 ); + ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); com.hazelcast.core.Cluster cluster = mock( Cluster.class ); when( hazelcastInstance.getCluster() ).thenReturn( cluster ); @@ -211,7 +214,7 @@ public void shouldReturnEmptyTopologyIfInitiallyConnectedToHazelcastButItsNowUna // given HazelcastConnector connector = mock( HazelcastConnector.class ); HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000 ); + ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() 
).thenReturn( hazelcastInstance ); @@ -233,7 +236,7 @@ public void shouldReconnectIfHazelcastUnavailable() throws Exception // given HazelcastConnector connector = mock( HazelcastConnector.class ); HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000 ); + ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); HazelcastInstance hazelcastInstance1 = mock( HazelcastInstance.class ); HazelcastInstance hazelcastInstance2 = mock( HazelcastInstance.class ); @@ -303,7 +306,7 @@ public void shouldRegisterReadReplicaInTopology() throws Throwable ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), - renewableTimeoutService, 60_000, 5_000 ); + renewableTimeoutService, 60_000, 5_000, myself ); hazelcastClient.start(); @@ -347,7 +350,7 @@ public void shouldRemoveReadReplicasOnGracefulShutdown() throws Throwable ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), - renewableTimeoutService, 60_000, 5_000 ); + renewableTimeoutService, 60_000, 5_000, myself ); hazelcastClient.start(); @@ -378,7 +381,7 @@ public void shouldSwallowNPEFromHazelcast() throws Throwable ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), - renewableTimeoutService, 60_000, 5_000 ); + renewableTimeoutService, 60_000, 5_000, myself ); hazelcastClient.start(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java 
b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java index 3a01a0458dd26..e5f5af78a6a6e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java @@ -45,8 +45,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.buildMemberAttributes; -import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractMemberAttributes; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.buildMemberAttributesForCore; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractMemberAttributesForCore; public class HazelcastClusterTopologyTest { @@ -68,8 +68,8 @@ public void shouldStoreMemberIdentityAndAddressesAsMemberAttributes() throws Exc config.augment( settings ); // when - Map attributes = buildMemberAttributes( memberId, config ).getAttributes(); - Pair extracted = extractMemberAttributes( new MemberImpl( null, null, attributes, + Map attributes = buildMemberAttributesForCore( memberId, config ).getAttributes(); + Pair extracted = extractMemberAttributesForCore( new MemberImpl( null, null, attributes, false ) ); // then @@ -102,7 +102,7 @@ public void shouldCollectMembersAsAMap() throws Exception settings.put( new BoltConnector( "http" ).advertised_address.name(), "http:" + (i + 1) ); config.augment( settings ); - Map attributes = buildMemberAttributes( memberId, config ).getAttributes(); + Map attributes = buildMemberAttributesForCore( memberId, config ).getAttributes(); hazelcastMembers.add( new MemberImpl( new Address( "localhost", i ), null, attributes, false ) ); } @@ -141,7 +141,7 @@ public void shouldLogAndExcludeMembersWithMissingAttributes() 
throws Exception settings.put( new BoltConnector( "http" ).advertised_address.name(), "http:" + (i + 1) ); config.augment( settings ); - Map attributes = buildMemberAttributes( memberId, config ).getAttributes(); + Map attributes = buildMemberAttributesForCore( memberId, config ).getAttributes(); if ( i == 2 ) { attributes.remove( HazelcastClusterTopology.RAFT_SERVER ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java index 22121c20ee33d..4da2e9686ffd3 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java @@ -22,20 +22,23 @@ import java.io.File; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.UUID; import java.util.function.IntFunction; import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.GraphDatabaseDependencies; import org.neo4j.kernel.configuration.BoltConnector; import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.configuration.HttpConnector; -import org.neo4j.logging.Level; import org.neo4j.kernel.configuration.HttpConnector.Encryption; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.logging.Level; import static java.lang.String.format; import static java.util.stream.Collectors.joining; @@ -43,7 +46,7 @@ public class ReadReplica implements ClusterMember { - private final Map config = stringMap(); 
+ private final Map config = stringMap(); private final DiscoveryServiceFactory discoveryServiceFactory; private final File neo4jHome; private final File storeDir; @@ -53,15 +56,16 @@ public class ReadReplica implements ClusterMember private Monitors monitors; public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discoveryServiceFactory, - List coreMemberHazelcastAddresses, Map extraParams, - Map> instanceExtraParams, String recordFormat, Monitors monitors ) + List coreMemberHazelcastAddresses, Map extraParams, + Map> instanceExtraParams, String recordFormat, Monitors monitors ) { this.memberId = memberId; int boltPort = 9000 + memberId; int httpPort = 11000 + memberId; + int txPort = 20000 + memberId; - String initialHosts = coreMemberHazelcastAddresses.stream() - .map( AdvertisedSocketAddress::toString ).collect( joining( "," ) ); + String initialHosts = coreMemberHazelcastAddresses.stream().map( AdvertisedSocketAddress::toString ) + .collect( joining( "," ) ); config.put( "dbms.mode", "READ_REPLICA" ); config.put( CausalClusteringSettings.initial_discovery_members.name(), initialHosts ); @@ -71,7 +75,7 @@ public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discov config.put( GraphDatabaseSettings.auth_store.name(), new File( parentDir, "auth" ).getAbsolutePath() ); config.putAll( extraParams ); - for ( Map.Entry> entry : instanceExtraParams.entrySet() ) + for ( Map.Entry> entry : instanceExtraParams.entrySet() ) { config.put( entry.getKey(), entry.getValue().apply( memberId ) ); } @@ -88,6 +92,8 @@ public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discov this.neo4jHome = new File( parentDir, "read-replica-" + memberId ); config.put( GraphDatabaseSettings.neo4j_home.name(), neo4jHome.getAbsolutePath() ); + + config.put( CausalClusteringSettings.transaction_listen_address.name(), "127.0.0.1:" + txPort ); config.put( GraphDatabaseSettings.logs_directory.name(), new File( neo4jHome, "logs" 
).getAbsolutePath() ); this.discoveryServiceFactory = discoveryServiceFactory; @@ -111,8 +117,9 @@ public String routingURI() public void start() { + System.out.println( config ); database = new ReadReplicaGraphDatabase( storeDir, Config.embeddedDefaults( config ), - GraphDatabaseDependencies.newDependencies().monitors( monitors ), discoveryServiceFactory ); + GraphDatabaseDependencies.newDependencies().monitors( monitors ), discoveryServiceFactory, memberId().get() ); } @Override @@ -157,8 +164,19 @@ public String directURI() return String.format( "bolt://%s", boltAdvertisedAddress ); } + public File homeDir() { return neo4jHome; } + + public void setUpstreamDatabaseSelectionStrategy( String key ) + { + config.put( CausalClusteringSettings.upstream_selection_strategy.name(), key ); + } + + public Optional memberId() + { + return Optional.of( new MemberId( new UUID( memberId, 0 ) ) ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java index a41f7c0f09d29..73f89cc1f3e02 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java @@ -91,6 +91,12 @@ public ReadReplicaTopology readReplicas() return readReplicaTopology; } + @Override + public ClusterTopology allServers() + { + return new ClusterTopology( coreServers(), readReplicas() ); + } + @Override public CoreTopology coreServers() { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java index 5ee5a9d81e8b7..327f5546f4a10 100644 --- 
a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java @@ -19,22 +19,27 @@ */ package org.neo4j.causalclustering.discovery; +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -class SharedDiscoveryReadReplicaClient extends LifecycleAdapter implements TopologyService +import static org.neo4j.helpers.SocketAddressFormat.socketAddress; + +class SharedDiscoveryReadReplicaClient extends LifecycleAdapter implements ReadReplicaTopologyService { private final SharedDiscoveryService sharedDiscoveryService; private final ReadReplicaAddresses addresses; private final Log log; SharedDiscoveryReadReplicaClient( SharedDiscoveryService sharedDiscoveryService, Config config, - LogProvider logProvider ) + LogProvider logProvider ) { this.sharedDiscoveryService = sharedDiscoveryService; - this.addresses = new ReadReplicaAddresses( ClientConnectorAddresses.extractFromConfig( config ) ); + this.addresses = new ReadReplicaAddresses( ClientConnectorAddresses.extractFromConfig( config ), + socketAddress( config.get( CausalClusteringSettings.transaction_advertised_address ).toString(), AdvertisedSocketAddress::new ) ); this.log = logProvider.getLog( getClass() ); } @@ -58,4 +63,18 @@ public CoreTopology coreServers() log.info( "Core topology is %s", topology ); return topology; } + + @Override + public ReadReplicaTopology readReplicas() + { + ReadReplicaTopology topology = sharedDiscoveryService.readReplicaTopology(); + log.info( "Read replica topology is %s", topology ); + return topology; + } + + @Override + public ClusterTopology allServers() + { + return new 
ClusterTopology( coreServers(), readReplicas() ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index bf5b47bf75d4e..c7ddd30d36402 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -21,10 +21,9 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -38,12 +37,11 @@ import org.neo4j.logging.LogProvider; import static java.util.Collections.unmodifiableMap; -import static java.util.Collections.unmodifiableSet; public class SharedDiscoveryService implements DiscoveryServiceFactory { private final Map coreMembers = new HashMap<>(); - private final Set readReplicaAddresses = new HashSet<>(); + private final Map readReplicaAddresses = new HashMap<>(); private final List coreClients = new ArrayList<>(); private final Lock lock = new ReentrantLock(); @@ -54,16 +52,17 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory public CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider, LogProvider userLogProvider ) { - SharedDiscoveryCoreClient sharedDiscoveryCoreClient = new SharedDiscoveryCoreClient( this, myself, logProvider, config ); + SharedDiscoveryCoreClient sharedDiscoveryCoreClient = + new SharedDiscoveryCoreClient( this, myself, logProvider, config ); sharedDiscoveryCoreClient.onCoreTopologyChange( coreTopology( sharedDiscoveryCoreClient ) ); 
sharedDiscoveryCoreClient.onReadReplicaTopologyChange( readReplicaTopology() ); return sharedDiscoveryCoreClient; } @Override - public TopologyService readReplicaDiscoveryService( Config config, LogProvider logProvider, - DelayedRenewableTimeoutService timeoutService, - long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate ) + public ReadReplicaTopologyService readReplicaTopologyService( Config config, LogProvider logProvider, + DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, + long readReplicaRefreshRate, MemberId myself ) { return new SharedDiscoveryReadReplicaClient( this, config, logProvider ); } @@ -89,11 +88,8 @@ CoreTopology coreTopology( SharedDiscoveryCoreClient client ) lock.lock(); try { - return new CoreTopology( - clusterId, - coreClients.size() > 0 && coreClients.get( 0 ) == client, - unmodifiableMap( coreMembers ) - ); + return new CoreTopology( clusterId, coreClients.size() > 0 && coreClients.get( 0 ) == client, + unmodifiableMap( coreMembers ) ); } finally { @@ -101,12 +97,12 @@ CoreTopology coreTopology( SharedDiscoveryCoreClient client ) } } - private ReadReplicaTopology readReplicaTopology() + public ReadReplicaTopology readReplicaTopology() { lock.lock(); try { - return new ReadReplicaTopology( unmodifiableSet( readReplicaAddresses ) ); + return new ReadReplicaTopology( unmodifiableMap( readReplicaAddresses ) ); } finally { @@ -150,7 +146,7 @@ private void notifyCoreClients() for ( SharedDiscoveryCoreClient coreClient : coreClients ) { coreClient.onCoreTopologyChange( coreTopology( coreClient ) ); - coreClient.onReadReplicaTopologyChange( readReplicaTopology( ) ); + coreClient.onReadReplicaTopologyChange( readReplicaTopology() ); } } @@ -159,7 +155,7 @@ void registerReadReplica( ReadReplicaAddresses readReplicaAddresses ) lock.lock(); try { - this.readReplicaAddresses.add( readReplicaAddresses ); + this.readReplicaAddresses.put( new MemberId( UUID.randomUUID() ), readReplicaAddresses ); 
notifyCoreClients(); } finally diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java index 28aa216ca7d1a..f58532a083908 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedureTest.java @@ -62,7 +62,7 @@ public void shouldProvideOverviewOfCoreServersAndReadReplicas() throws Exception coreMembers.put( follower1, adressesForCore( 1 ) ); coreMembers.put( follower2, adressesForCore( 2 ) ); - Set readReplicas = addressesForReadReplicas( 4, 5 ); + Map readReplicas = addresses( 4, 5 ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( null, false, coreMembers ) ); when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( readReplicas ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java index 612b412473632..56e2c89a9d40f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1RoutingTest.java @@ -40,6 +40,7 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; +import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.junit.Assert.assertFalse; import static 
org.junit.runners.Parameterized.Parameters; @@ -81,7 +82,7 @@ public void shouldReturnEndpointsInDifferentOrders() throws Exception final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers ); when( coreTopologyService.coreServers() ).thenReturn( clusterTopology ); - when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); final GetServersProcedureV1 proc = new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java index f88d10c59df24..fd94838a3411c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1Test.java @@ -32,13 +32,17 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; +import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; import org.neo4j.causalclustering.discovery.CoreAddresses; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.CoreTopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaAddresses; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; import org.neo4j.causalclustering.identity.ClusterId; import 
org.neo4j.causalclustering.identity.MemberId; @@ -50,7 +54,8 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; -import static java.util.Collections.emptySet; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; @@ -61,9 +66,9 @@ import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers; import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_routing_ttl; -import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplicas; -import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; +import static org.neo4j.causalclustering.discovery.ClientConnectorAddresses.Scheme.bolt; import static org.neo4j.causalclustering.discovery.TestTopology.addressesForReadReplica; +import static org.neo4j.causalclustering.discovery.TestTopology.adressesForCore; import static org.neo4j.causalclustering.identity.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asList; import static org.neo4j.helpers.collection.MapUtil.stringMap; @@ -104,7 +109,7 @@ public void ttlShouldBeInSeconds() throws Exception final CoreTopology clusterTopology = new CoreTopology( clusterId, false, new HashMap<>() ); when( coreTopologyService.coreServers() ).thenReturn( clusterTopology ); - when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); // set the TTL in minutes config = config.augment( stringMap( cluster_routing_ttl.name(), "10m" ) ); @@ -149,7 +154,7 @@ public void shouldProvideReaderAndRouterForSingleCoreSetup() throws Exception final CoreTopology 
clusterTopology = new CoreTopology( clusterId, false, coreMembers ); when( coreTopologyService.coreServers() ).thenReturn( clusterTopology ); - when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); final GetServersProcedureV1 proc = new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() ); @@ -181,7 +186,7 @@ public void shouldReturnCoreServersWithRouteAllCoresButLeaderAsReadAndSingleWrit final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers ); when( coreTopologyService.coreServers() ).thenReturn( clusterTopology ); - when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); final GetServersProcedureV1 proc = new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() ); @@ -215,7 +220,7 @@ public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers ); when( coreTopologyService.coreServers() ).thenReturn( clusterTopology ); - when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); final GetServersProcedureV1 proc = new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() ); @@ -243,7 +248,7 @@ public void shouldReturnTheCoreLeaderForWriteAndReadReplicasAndCoresForReads() t coreMembers.put( theLeader, adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); - when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( addressesForReadReplicas( 1 ) ) ); + when( 
topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( addresses( 1 ) ) ); LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( theLeader ); @@ -278,7 +283,7 @@ public void shouldReturnCoreMemberAsReadServerIfNoReadReplicasAvailable() throws coreMembers.put( theLeader, adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); - when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( theLeader ); @@ -308,7 +313,7 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoLeader() throws Exception coreMembers.put( member( 0 ), adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); - when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenThrow( new NoLeaderFoundException() ); @@ -337,7 +342,7 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws coreMembers.put( member( 0 ), adressesForCore( 0 ) ); when( topologyService.coreServers() ).thenReturn( new CoreTopology( clusterId, false, coreMembers ) ); - when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptySet() ) ); + when( topologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) ); LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( member( 1 ) ); @@ -365,6 +370,27 @@ private ClusterView run( GetServersProcedureV1 proc ) throws 
ProcedureException return ClusterView.parse( (List>) rows[1] ); } + static Map addresses( int... ids ) + { + return Arrays.stream( ids ).mapToObj( GetServersProcedureV1Test::readReplicaAddresses ).collect( Collectors + .toMap( (p) -> new MemberId( UUID.randomUUID() ), Function.identity() ) ); + } + + static CoreAddresses coreAddresses( int id ) + { + AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); + return new CoreAddresses( advertisedSocketAddress, advertisedSocketAddress, + new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ) ); + } + + private static ReadReplicaAddresses readReplicaAddresses( int id ) + { + AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress( "localhost", (3000 + id) ); + return new ReadReplicaAddresses( + new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( bolt, advertisedSocketAddress ) ) ), + new AdvertisedSocketAddress( "localhost", 4000 + id )); + } + private static class ClusterView { private final Map> clusterView; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java index b06713c8320d0..a34ae15527e56 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java @@ -33,7 +33,7 @@ import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException; import org.neo4j.causalclustering.discovery.CoreTopology; -import org.neo4j.causalclustering.discovery.TopologyService; +import 
org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; @@ -62,7 +62,7 @@ public class ReadReplicaStartupProcessTest private FileSystemAbstraction fs = mock( FileSystemAbstraction.class ); private final PageCache pageCache = mock( PageCache.class ); private LocalDatabase localDatabase = mock( LocalDatabase.class ); - private TopologyService topologyService = mock( TopologyService.class ); + private ReadReplicaTopologyService readReplicaTopologyService = mock( ReadReplicaTopologyService.class ); private CoreTopology clusterTopology = mock( CoreTopology.class ); private Lifecycle txPulling = mock( Lifecycle.class ); @@ -77,7 +77,7 @@ public void commonMocking() throws StoreIdDownloadFailedException, IOException when( pageCache.streamFilesRecursive( any(File.class) ) ).thenAnswer( ( f ) -> Stream.empty() ); when( localDatabase.storeDir() ).thenReturn( storeDir ); when( localDatabase.storeId() ).thenReturn( localStoreId ); - when( topologyService.coreServers() ).thenReturn( clusterTopology ); + when( readReplicaTopologyService.coreServers() ).thenReturn( clusterTopology ); when( clusterTopology.members() ).thenReturn( asSet( memberId ) ); } @@ -104,7 +104,7 @@ public void shouldReplaceEmptyStoreWithRemote() throws Throwable private UpstreamDatabaseStrategySelector chooseFirstMember() { AlwaysChooseFirstMember firstMember = new AlwaysChooseFirstMember(); - firstMember.setDiscoveryService( topologyService ); + firstMember.setDiscoveryService( readReplicaTopologyService ); return new UpstreamDatabaseStrategySelector( firstMember ); } @@ -189,7 +189,7 @@ public AlwaysChooseFirstMember() @Override public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException { - CoreTopology coreTopology = topologyService.coreServers(); + CoreTopology coreTopology = 
readReplicaTopologyService.coreServers(); return Optional.ofNullable( coreTopology.members().iterator().next() ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java index 76e64df678a40..05510ea58556b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java @@ -23,7 +23,7 @@ import java.util.Set; -import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.kernel.configuration.Config; import static org.junit.Assert.assertEquals; @@ -41,7 +41,7 @@ public void shouldReturnConfiguredClassesOnly() throws Exception config.augment( stringMap( "causal_clustering.upstream_selection_strategy", "dummy" ) ); UpstreamDatabaseStrategiesLoader - strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config ); + strategies = new UpstreamDatabaseStrategiesLoader( mock( ReadReplicaTopologyService.class ), config ); // when Set upstreamDatabaseSelectionStrategies = asSet( strategies.iterator() ); @@ -62,7 +62,7 @@ public void shouldReturnTheFirstStrategyThatWorksFromThoseConfigured() throws Ex // when UpstreamDatabaseStrategiesLoader - strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config ); + strategies = new UpstreamDatabaseStrategiesLoader( mock( ReadReplicaTopologyService.class ), config ); // then assertEquals( UpstreamDatabaseStrategySelectorTest.YetAnotherDummyUpstreamDatabaseSelectionStrategy.class, diff --git 
a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java index 8119447d932cd..794bc8cc61d9b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java @@ -28,7 +28,7 @@ import org.neo4j.causalclustering.discovery.CoreAddresses; import org.neo4j.causalclustering.discovery.CoreTopology; -import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.Service; @@ -59,7 +59,7 @@ public void shouldReturnTheMemberIdFromFirstSucessfulStrategy() throws Exception when( goodOne.upstreamDatabase() ).thenReturn( Optional.of( theMemberId ) ); UpstreamDatabaseStrategySelector selector = - new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ) ); + new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null ); // when MemberId result = selector.bestUpstreamDatabase(); @@ -72,13 +72,13 @@ public void shouldReturnTheMemberIdFromFirstSucessfulStrategy() throws Exception public void shouldDefaultToRandomCoreServerIfNoOtherStrategySpecified() throws Exception { // given - TopologyService topologyService = mock( TopologyService.class ); + ReadReplicaTopologyService readReplicaTopologyService = mock( ReadReplicaTopologyService.class ); MemberId memberId = new MemberId( UUID.randomUUID() ); - when( topologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, + when( 
readReplicaTopologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreAddresses.class ) ) ) ); - ConnectToRandomUpstreamCoreServer defaultStrategy = new ConnectToRandomUpstreamCoreServer(); - defaultStrategy.setDiscoveryService( topologyService ); + ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer(); + defaultStrategy.setDiscoveryService( readReplicaTopologyService ); UpstreamDatabaseStrategySelector selector = new UpstreamDatabaseStrategySelector( defaultStrategy ); @@ -93,18 +93,18 @@ public void shouldDefaultToRandomCoreServerIfNoOtherStrategySpecified() throws E public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception { // given - TopologyService topologyService = mock( TopologyService.class ); + ReadReplicaTopologyService readReplicaTopologyService = mock( ReadReplicaTopologyService.class ); MemberId memberId = new MemberId( UUID.randomUUID() ); - when( topologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, + when( readReplicaTopologyService.coreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreAddresses.class ) ) ) ); - ConnectToRandomUpstreamCoreServer shouldNotUse = new ConnectToRandomUpstreamCoreServer(); + ConnectToRandomCoreServer shouldNotUse = new ConnectToRandomCoreServer(); UpstreamDatabaseSelectionStrategy mockStrategy = mock( UpstreamDatabaseSelectionStrategy.class ); when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) ); UpstreamDatabaseStrategySelector selector = - new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ) ); + new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null ); // when selector.bestUpstreamDatabase(); diff --git 
a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java
new file mode 100644
index 0000000000000..9123f4d4e85af
--- /dev/null
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.causalclustering.scenarios;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor;
+import org.neo4j.causalclustering.core.CausalClusteringSettings;
+import org.neo4j.causalclustering.discovery.Cluster;
+import org.neo4j.causalclustering.discovery.CoreClusterMember;
+import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory;
+import org.neo4j.causalclustering.discovery.ReadReplica;
+import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException;
+import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy;
+import org.neo4j.function.ThrowingSupplier;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Label;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Transaction;
+import org.neo4j.helpers.Service;
+import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
+import org.neo4j.kernel.monitoring.Monitors;
+import org.neo4j.test.causalclustering.ClusterRule;
+
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.SpecificReplicaStrategy.upstreamFactory;
+import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME;
+import static org.neo4j.helpers.collection.Iterables.count;
+import static org.neo4j.test.assertion.Assert.assertEventually;
+
+/**
+ * Scenario test in which a second read replica is configured, through the test-only
+ * {@code "specific"} upstream selection strategy, to use an already caught-up read replica as its
+ * upstream, and is then expected to end up with the same data as the first one.
+ */
+public class ReadReplicaToReadReplicaCatchupIT
+{
+    @Rule
+    public final ClusterRule clusterRule =
+            new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 0 )
+                    .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" )
+                    .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() );
+
+    @Test
+    public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Exception
+    {
+        // given
+        Cluster cluster = clusterRule.startCluster();
+        int numberOfTransactions = 100;
+        int nodesPerTransaction = 1;
+        long expectedNodeCount = (long) numberOfTransactions * nodesPerTransaction;
+
+        cluster.coreTx( ( db, tx ) ->
+        {
+            db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create();
+            tx.success();
+        } );
+
+        for ( int i = 0; i < numberOfTransactions; i++ )
+        {
+            cluster.coreTx( ( db, tx ) ->
+            {
+                createData( db, nodesPerTransaction );
+                tx.success();
+            } );
+        }
+
+        AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false );
+        Monitors monitors = new Monitors();
+        ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, monitors );
+
+        File labelScanStore = LabelScanStoreProvider
+                .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) );
+
+        // NOTE(review): this flag is recorded but never asserted on; either assert it after the first
+        // replica has caught up or drop the listener.
+        monitors.addMonitorListener( (FileCopyMonitor) file ->
+        {
+            if ( file.getParent().contains( labelScanStore.getPath() ) )
+            {
+                labelScanStoreCorrectlyPlaced.set( true );
+            }
+        } );
+
+        firstReadReplica.start();
+
+        assertDataReplicatedToReadReplicas( cluster, expectedNodeCount );
+
+        // when
+        upstreamFactory.setCurrent( firstReadReplica );
+        ReadReplica secondReadReplica = cluster.addReadReplicaWithId( 202 );
+        secondReadReplica.setUpstreamDatabaseSelectionStrategy( "specific" );
+
+        secondReadReplica.start();
+
+        // then
+        assertDataReplicatedToReadReplicas( cluster, expectedNodeCount );
+    }
+
+    /**
+     * Waits, with a one minute timeout per replica, until every read replica in the cluster reports the
+     * expected number of nodes, then checks the {@code foobar} property of each node.
+     *
+     * @param cluster the cluster whose read replicas are inspected
+     * @param expectedNodeCount number of nodes each read replica must eventually contain
+     * @throws Exception if counting nodes fails or the wait is interrupted
+     */
+    private void assertDataReplicatedToReadReplicas( Cluster cluster, long expectedNodeCount ) throws Exception
+    {
+        for ( final ReadReplica server : cluster.readReplicas() )
+        {
+            GraphDatabaseService readReplica = server.database();
+            try ( Transaction tx = readReplica.beginTx() )
+            {
+                ThrowingSupplier<Long,Exception> nodeCount = () -> count( readReplica.getAllNodes() );
+                assertEventually( "node to appear on read replica", nodeCount, is( expectedNodeCount ), 1, MINUTES );
+
+                for ( Node node : readReplica.getAllNodes() )
+                {
+                    assertThat( node.getProperty( "foobar" ).toString(), startsWith( "baz_bat" ) );
+                }
+
+                tx.success();
+            }
+        }
+    }
+
+    /**
+     * Creates {@code amount} nodes labelled {@code Foo}, each carrying the properties {@code foobar} and
+     * {@code foobar1}..{@code foobar8} with random {@code baz_bat}-prefixed values.
+     *
+     * @param db database to create the nodes in; no transaction is opened here
+     * @param amount number of nodes to create
+     */
+    private void createData( GraphDatabaseService db, int amount )
+    {
+        for ( int i = 0; i < amount; i++ )
+        {
+            Node node = db.createNode( Label.label( "Foo" ) );
+            node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) );
+            for ( int suffix = 1; suffix <= 8; suffix++ )
+            {
+                node.setProperty( "foobar" + suffix, format( "baz_bat%s", UUID.randomUUID() ) );
+            }
+        }
+    }
+
+    /**
+     * Test-only strategy, registered under the name {@code "specific"}, that always nominates the read
+     * replica held by {@link #upstreamFactory} at construction time.
+     */
+    @Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
+    public static class SpecificReplicaStrategy extends UpstreamDatabaseSelectionStrategy
+    {
+        // Strategies are created by the Service loader through the no-arg constructor, so a static holder
+        // is the only stable point through which the test can hand over the replica to use.
+        public static final UpstreamFactory upstreamFactory = new UpstreamFactory();
+
+        private final ReadReplica upstream;
+
+        public SpecificReplicaStrategy()
+        {
+            super( "specific" );
+            this.upstream = upstreamFactory.current();
+        }
+
+        @Override
+        public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
+        {
+            // Hand back the Optional untouched: calling get() on it merely to log would throw if it
+            // is empty.
+            return upstream.memberId();
+        }
+    }
+
+    /** Mutable holder for the read replica that {@link SpecificReplicaStrategy} should nominate. */
+    private static class UpstreamFactory
+    {
+        private ReadReplica current;
+
+        public void setCurrent( ReadReplica readReplica )
+        {
+            this.current = readReplica;
+        }
+
+        public ReadReplica current()
+        {
+            return current;
+        }
+    }
+}
diff --git a/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy
index 030cae4b22b24..f0995b4b4b2ec 100644
--- a/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy
+++ b/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy
@@ -2,3 +2,4 @@ org.neo4j.causalclustering.readreplica.ReadReplicaStartupProcessTest$AlwaysChoos
 org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelectorTest$DummyUpstreamDatabaseSelectionStrategy
 org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelectorTest$AnotherDummyUpstreamDatabaseSelectionStrategy
 org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelectorTest$YetAnotherDummyUpstreamDatabaseSelectionStrategy
+org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT$SpecificReplicaStrategy