diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java index 3e60f0d255b2..74c9e183ded0 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java @@ -26,6 +26,7 @@ import java.time.Clock; import java.time.Duration; import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -40,6 +41,7 @@ import org.neo4j.causalclustering.core.SupportedProtocolCreator; import org.neo4j.causalclustering.handlers.PipelineWrapper; import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory; +import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols; @@ -112,7 +114,9 @@ private BackupDelegator backupDelegatorFromConfig( PageCache pageCache, Config c { CatchUpClient catchUpClient = new CatchUpClient( logProvider, clock, INACTIVITY_TIMEOUT_MILLIS, channelInitializer( config ) ); TxPullClient txPullClient = new TxPullClient( catchUpClient, monitors ); - StoreCopyClient storeCopyClient = new StoreCopyClient( catchUpClient, logProvider ); + ExponentialBackoffStrategy backOffStrategy = + new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS ); + StoreCopyClient storeCopyClient = new StoreCopyClient( catchUpClient, logProvider, backOffStrategy ); RemoteStore remoteStore = new RemoteStore( logProvider, fileSystemAbstraction, pageCache, storeCopyClient, diff --git a/enterprise/backup/src/test/java/org/neo4j/causalclustering/ClusterSeedingIT.java b/enterprise/backup/src/test/java/org/neo4j/causalclustering/ClusterSeedingIT.java index 1b7338610613..b3c9312c0efc 100644 --- a/enterprise/backup/src/test/java/org/neo4j/causalclustering/ClusterSeedingIT.java +++ b/enterprise/backup/src/test/java/org/neo4j/causalclustering/ClusterSeedingIT.java @@ -39,10 +39,6 @@ import org.neo4j.causalclustering.discovery.CoreClusterMember; import org.neo4j.causalclustering.discovery.IpFamily; import org.neo4j.causalclustering.discovery.SharedDiscoveryServiceFactory; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; import org.neo4j.kernel.impl.store.format.standard.Standard; import org.neo4j.test.DbRepresentation; import org.neo4j.test.rule.SuppressOutput; @@ -76,14 +72,14 @@ public class ClusterSeedingIT private File baseBackupDir; @Parameterized.Parameters( name = "{0}" ) - public static Object[][] data() throws Exception + public static Object[][] data() { return new Object[][]{{new NoStore(), true}, {new EmptyBackupStore(), false}, {new BackupStoreWithSomeData(), false}, {new BackupStoreWithSomeDataButNoTransactionLogs(), false}}; } @Before - public void setup() throws Exception + public void setup() { this.fileCopyDetector = new FileCopyDetector(); backupCluster = new Cluster( testDir.directory( "cluster-for-backup" ), 3, 0, diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpChannelPool.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpChannelPool.java index f1c708814e82..dc3606e2b906 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpChannelPool.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpChannelPool.java @@ -19,6 +19,7 @@ */ package org.neo4j.causalclustering.catchup; +import java.net.ConnectException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -43,13 +44,23 @@ class CatchUpChannelPool this.factory = factory; } - CHANNEL acquire( AdvertisedSocketAddress catchUpAddress ) + CHANNEL acquire( AdvertisedSocketAddress catchUpAddress ) throws Exception { CHANNEL channel = getIdleChannel( catchUpAddress ); if ( channel == null ) { channel = factory.apply( catchUpAddress ); + try + { + channel.connect(); + assertActive( channel, catchUpAddress ); + } + catch ( Exception e ) + { + channel.close(); + throw e; + } } addActiveChannel( channel ); @@ -57,13 +68,27 @@ CHANNEL acquire( AdvertisedSocketAddress catchUpAddress ) return channel; } + private void assertActive( CHANNEL channel, AdvertisedSocketAddress address ) throws ConnectException + { + if ( !channel.isActive() ) + { + throw new ConnectException( "Unable to connect to " + address ); + } + } + private synchronized CHANNEL getIdleChannel( AdvertisedSocketAddress catchUpAddress ) { CHANNEL channel = null; LinkedList channels = idleChannels.get( catchUpAddress ); if ( channels != null ) { - channel = channels.poll(); + while ( (channel = channels.poll()) != null ) + { + if ( channel.isActive() ) + { + break; + } + } if ( channels.isEmpty() ) { idleChannels.remove( catchUpAddress ); @@ -116,6 +141,10 @@ interface Channel { AdvertisedSocketAddress destination(); + void connect() throws Exception; + + boolean isActive(); + void close(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java index 9ebc306ae2ce..9ac672da1183 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java @@ -22,15 +22,19 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import java.io.IOException; +import java.net.ConnectException; import java.time.Clock; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.function.BiConsumer; import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -68,26 +72,47 @@ public T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpReque { CompletableFuture future = new CompletableFuture<>(); - CatchUpChannel channel = pool.acquire( upstream ); - - future.whenComplete( ( result, e ) -> + CatchUpChannel channel = null; + try { - if ( e == null ) - { - pool.release( channel ); - } - else + channel = pool.acquire( upstream ); + channel.setResponseHandler( responseHandler, future ); + future.whenComplete( new ReleaseOnComplete( channel ) ); + channel.send( request ); + } + catch ( Exception e ) + { + if ( channel != null ) { pool.dispose( channel ); } - } ); + throw new CatchUpClientException( "Failed to send request", e ); + } + String operation = format( "Completed exceptionally when executing operation %s on %s ", request, upstream ); + return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log ); + } - channel.setResponseHandler( responseHandler, future ); - channel.send( request ); + private class ReleaseOnComplete implements BiConsumer + { + private CatchUpChannel catchUpChannel; - String operation = format( "Completed exceptionally when executing operation %s on %s ", request, upstream ); + ReleaseOnComplete( CatchUpChannel catchUpChannel ) + { + this.catchUpChannel = catchUpChannel; + } - return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log ); + @Override + public void accept( Object o, Throwable throwable ) + { + if ( throwable == null ) + { + pool.release( catchUpChannel ); + } + else + { + pool.dispose( catchUpChannel ); + } + } } private class CatchUpChannel implements CatchUpChannelPool.Channel @@ -95,18 +120,16 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel private final TrackingResponseHandler handler; private final AdvertisedSocketAddress destination; private Channel nettyChannel; + private final Bootstrap bootstrap; CatchUpChannel( AdvertisedSocketAddress destination ) { this.destination = destination; handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock ); - Bootstrap bootstrap = new Bootstrap() + bootstrap = new Bootstrap() .group( eventLoopGroup ) .channel( NioSocketChannel.class ) .handler( channelInitializer.apply( handler ) ); - - ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() ); - nettyChannel = channelFuture.awaitUninterruptibly().channel(); } void setResponseHandler( CatchUpResponseCallback responseHandler, CompletableFuture requestOutcomeSignal ) @@ -114,10 +137,14 @@ void setResponseHandler( CatchUpResponseCallback responseHandler, CompletableFut handler.setResponseHandler( responseHandler, requestOutcomeSignal ); } - void send( CatchUpRequest request ) + void send( CatchUpRequest request ) throws ConnectException { + if ( !isActive() ) + { + throw new ConnectException( "Channel is not connected" ); + } nettyChannel.write( request.messageType() ); - nettyChannel.writeAndFlush( request ); + nettyChannel.writeAndFlush( request ).addListener( ChannelFutureListener.CLOSE_ON_FAILURE ); } Optional millisSinceLastResponse() @@ -131,10 +158,26 @@ public AdvertisedSocketAddress destination() return destination; } + @Override + public void connect() throws Exception + { + ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() ); + nettyChannel = channelFuture.sync().channel(); + } + + @Override + public boolean isActive() + { + return nettyChannel.isActive(); + } + @Override public void close() { - nettyChannel.close(); + if ( nettyChannel != null ) + { + nettyChannel.close(); + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java index b6a7a2842a90..9be2d15f8afc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java @@ -19,13 +19,13 @@ */ package org.neo4j.causalclustering.catchup; -import java.util.ArrayList; -import java.util.List; - +import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; -import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; +import org.neo4j.function.ThrowingSupplier; import org.neo4j.helpers.AdvertisedSocketAddress; /** @@ -47,6 +47,11 @@ public interface CatchupAddressProvider */ AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException; + static CatchupAddressProvider fromSingleAddress( AdvertisedSocketAddress advertisedSocketAddress ) + { + return new SingleAddressProvider( advertisedSocketAddress ); + } + class SingleAddressProvider implements CatchupAddressProvider { private final AdvertisedSocketAddress socketAddress; @@ -69,15 +74,46 @@ public AdvertisedSocketAddress secondary() } } - class TopologyBasedAddressProvider implements CatchupAddressProvider + /** + * Uses given strategy for both primary and secondary address. + */ + class UpstreamStrategyBoundAddressProvider implements CatchupAddressProvider { - private final RaftMachine raftMachine; + private final UpstreamStrategyAddressSupplier upstreamStrategyAddressSupplier; + + public UpstreamStrategyBoundAddressProvider( TopologyService topologyService, UpstreamDatabaseStrategySelector strategySelector ) + { + upstreamStrategyAddressSupplier = new UpstreamStrategyAddressSupplier( strategySelector, topologyService ); + } + + @Override + public AdvertisedSocketAddress primary() throws CatchupAddressResolutionException + { + return upstreamStrategyAddressSupplier.get(); + } + + @Override + public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException + { + return upstreamStrategyAddressSupplier.get(); + } + } + + /** + * Uses leader address as primary and given upstream strategy as secondary address. + */ + class PrioritisingUpstreamStrategyBasedAddressProvider implements CatchupAddressProvider + { + private final LeaderLocator leaderLocator; private final TopologyService topologyService; + private UpstreamStrategyAddressSupplier secondaryUpstreamStrategyAddressSupplier; - public TopologyBasedAddressProvider( RaftMachine raftMachine, TopologyService topologyService ) + public PrioritisingUpstreamStrategyBasedAddressProvider( LeaderLocator leaderLocator, TopologyService topologyService, + UpstreamDatabaseStrategySelector strategySelector ) { - this.raftMachine = raftMachine; + this.leaderLocator = leaderLocator; this.topologyService = topologyService; + this.secondaryUpstreamStrategyAddressSupplier = new UpstreamStrategyAddressSupplier( strategySelector, topologyService ); } @Override @@ -85,7 +121,7 @@ public AdvertisedSocketAddress primary() throws CatchupAddressResolutionExceptio { try { - MemberId leadMember = raftMachine.getLeader(); + MemberId leadMember = leaderLocator.getLeader(); return topologyService.findCatchupAddress( leadMember ).orElseThrow( () -> new CatchupAddressResolutionException( leadMember ) ); } catch ( NoLeaderFoundException e ) @@ -97,18 +133,33 @@ public AdvertisedSocketAddress primary() throws CatchupAddressResolutionExceptio @Override public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException { - List potentialCoresAndReplicas = new ArrayList<>( raftMachine.votingMembers() ); - potentialCoresAndReplicas.addAll( raftMachine.replicationMembers() ); - int accountForRoundingDown = 1; - MemberId randomlySelectedCatchupServer = - potentialCoresAndReplicas.get( (int) (Math.random() * potentialCoresAndReplicas.size() + accountForRoundingDown) ); - return topologyService.findCatchupAddress( randomlySelectedCatchupServer ).orElseThrow( - () -> new CatchupAddressResolutionException( randomlySelectedCatchupServer ) ); + return secondaryUpstreamStrategyAddressSupplier.get(); } } - static CatchupAddressProvider fromSingleAddress( AdvertisedSocketAddress advertisedSocketAddress ) + class UpstreamStrategyAddressSupplier implements ThrowingSupplier { - return new SingleAddressProvider( advertisedSocketAddress ); + private final UpstreamDatabaseStrategySelector strategySelector; + private final TopologyService topologyService; + + private UpstreamStrategyAddressSupplier( UpstreamDatabaseStrategySelector strategySelector, TopologyService topologyService ) + { + this.strategySelector = strategySelector; + this.topologyService = topologyService; + } + + @Override + public AdvertisedSocketAddress get() throws CatchupAddressResolutionException + { + try + { + MemberId upstreamMember = strategySelector.bestUpstreamDatabase(); + return topologyService.findCatchupAddress( upstreamMember ).orElseThrow( () -> new CatchupAddressResolutionException( upstreamMember ) ); + } + catch ( UpstreamDatabaseSelectionException e ) + { + throw new CatchupAddressResolutionException( e ); + } + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressResolutionException.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressResolutionException.java index 150fecda3f3d..ded0d6b68e61 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressResolutionException.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressResolutionException.java @@ -24,12 +24,12 @@ public class CatchupAddressResolutionException extends TopologyLookupException { - public CatchupAddressResolutionException( MemberId memberId ) + CatchupAddressResolutionException( MemberId memberId ) { super( memberId ); } - public CatchupAddressResolutionException( Exception e ) + CatchupAddressResolutionException( Exception e ) { super( e ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java index 66c96c861a6c..b74840854332 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java @@ -42,13 +42,13 @@ public class GetIndexFilesRequest implements CatchUpRequest { private final StoreId expectedStoreId; private final long indexId; - private final long lastTransactionId; + private final long requiredTransactionId; - public GetIndexFilesRequest( StoreId expectedStoreId, long indexId, long lastTransactionId ) + public GetIndexFilesRequest( StoreId expectedStoreId, long indexId, long requiredTransactionId ) { this.expectedStoreId = expectedStoreId; this.indexId = indexId; - this.lastTransactionId = lastTransactionId; + this.requiredTransactionId = requiredTransactionId; } public StoreId expectedStoreId() @@ -56,9 +56,9 @@ public StoreId expectedStoreId() return expectedStoreId; } - public long requiredTransactionId() + long requiredTransactionId() { - return lastTransactionId; + return requiredTransactionId; } public long indexId() @@ -110,4 +110,11 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) out.add( getIndexFilesRequest ); } } + + @Override + public String toString() + { + return "GetIndexFilesRequest{" + "expectedStoreId=" + expectedStoreId + ", indexId=" + indexId + ", requiredTransactionId=" + requiredTransactionId + + '}'; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java index 78dbf8731b08..584769fdd295 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java @@ -117,4 +117,11 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) out.add( getStoreFileRequest ); } } + + @Override + public String toString() + { + return "GetStoreFileRequest{" + "expectedStoreId=" + expectedStoreId + ", file=" + file.getName() + ", requiredTransactionId=" + requiredTransactionId + + '}'; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalRetries.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalTime.java similarity index 50% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalRetries.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalTime.java index cc57d2ce4175..e48777939a52 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalRetries.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalTime.java @@ -20,45 +20,38 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.time.Clock; +import java.util.concurrent.TimeUnit; -public class MaximumTotalRetries implements TerminationCondition +import org.neo4j.time.Clocks; + +import static java.lang.String.format; + +public class MaximumTotalTime implements TerminationCondition { - private final int maxRetries; - private final long allowedInBetweenTimeMillis; + private final long endTime; private final Clock clock; - private int tries; - private long previousCheck; + private long time; + private TimeUnit timeUnit; - MaximumTotalRetries( int maxRetries, long allowedInBetweenTimeMillis ) + public MaximumTotalTime( long time, TimeUnit timeUnit ) { - this( maxRetries, allowedInBetweenTimeMillis, Clock.systemUTC() ); + this( time, timeUnit, Clocks.systemClock() ); } - MaximumTotalRetries( int maxRetries, long allowedInBetweenTimeMillis, Clock clock ) + MaximumTotalTime( long time, TimeUnit timeUnit, Clock clock ) { + this.endTime = clock.millis() + timeUnit.toMillis( time ); this.clock = clock; - this.maxRetries = maxRetries; - this.allowedInBetweenTimeMillis = allowedInBetweenTimeMillis; - this.previousCheck = 0; + this.time = time; + this.timeUnit = timeUnit; } @Override public void assertContinue() throws StoreCopyFailedException { - long currentTime = clock.millis(); - if ( timeHasExpired( previousCheck, currentTime ) ) - { - tries++; - previousCheck = currentTime; - } - if ( tries >= maxRetries ) + if ( clock.millis() > endTime ) { - throw new StoreCopyFailedException( "Maximum allowed retries exceeded: " + maxRetries ); + throw new StoreCopyFailedException( format( "Maximum time passed %d %s. Not allowed to continue", time, timeUnit ) ); } } - - private boolean timeHasExpired( long previousCheck, long currentTime ) - { - return (currentTime - previousCheck) > allowedInBetweenTimeMillis; - } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java index c91a46f1856e..dbece1331b14 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java @@ -21,16 +21,17 @@ import java.io.File; import java.io.IOException; -import java.util.function.Supplier; +import java.util.concurrent.TimeUnit; import org.neo4j.causalclustering.catchup.CatchUpClientException; +import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.catchup.CatchupAddressResolutionException; import org.neo4j.causalclustering.catchup.CatchupResult; -import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.catchup.TxPullRequestResult; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter; import org.neo4j.causalclustering.catchup.tx.TxPullClient; +import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; @@ -50,7 +51,6 @@ */ public class RemoteStore { - private static final Supplier DEFAULT_TERMINATION_CONDITIONS = () -> new MaximumTotalRetries( 10, 10 ); private final Log log; private final Config config; private final Monitors monitors; @@ -125,7 +125,9 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreI { long lastFlushedTxId; StreamToDiskProvider streamToDiskProvider = new StreamToDiskProvider( destDir, fs, pageCache, monitors ); - lastFlushedTxId = storeCopyClient.copyStoreFiles( addressProvider, expectedStoreId, streamToDiskProvider, DEFAULT_TERMINATION_CONDITIONS ); + lastFlushedTxId = storeCopyClient.copyStoreFiles( addressProvider, expectedStoreId, streamToDiskProvider, + () -> new MaximumTotalTime( config.get( CausalClusteringSettings.store_copy_max_retry_time_per_request ).getSeconds(), + TimeUnit.SECONDS ) ); log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java index e83cf93bcb9e..6c1a4e8b6216 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java @@ -28,23 +28,29 @@ import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.catchup.CatchupAddressResolutionException; +import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import static java.lang.String.format; +import static org.neo4j.causalclustering.catchup.storecopy.StoreCopyResponseAdaptors.filesCopyAdaptor; +import static org.neo4j.causalclustering.catchup.storecopy.StoreCopyResponseAdaptors.prepareStoreCopyAdaptor; public class StoreCopyClient { private final CatchUpClient catchUpClient; private final Log log; + private TimeoutStrategy backOffStrategy; - public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider ) + public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider, TimeoutStrategy backOffStrategy ) { this.catchUpClient = catchUpClient; log = logProvider.getLog( getClass() ); + this.backOffStrategy = backOffStrategy; } long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expectedStoreId, StoreFileStreamProvider storeFileStreamProvider, @@ -72,66 +78,67 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes long lastTransactionId = prepareStoreCopyResponse.lastTransactionId(); for ( File file : prepareStoreCopyResponse.getFiles() ) { - TerminationCondition terminationCondition = terminationConditions.get(); - boolean successful; - do - { - try - { - AdvertisedSocketAddress from = addressProvider.primary(); - log.info( format( "Downloading file '%s' from '%s'", file, from ) ); - StoreCopyFinishedResponse response = - catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), - StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) ); - successful = successfulFileDownload( response ); - } - catch ( CatchUpClientException | CatchupAddressResolutionException e ) - { - successful = false; - } - if ( !successful ) - { - log.error( "Failed to download file '%s'", file ); - terminationCondition.assertContinue(); - } - } - while ( !successful ); + persistentCall( new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), filesCopyAdaptor( storeFileStream, log ), addressProvider, + terminationConditions.get() ); } } private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, - CatchupAddressProvider addressProvider, StoreFileStreamProvider storeFileStream, Supplier terminationConditions ) - throws StoreCopyFailedException + CatchupAddressProvider addressProvider, + StoreFileStreamProvider storeFileStream, Supplier terminationConditions ) throws StoreCopyFailedException { long lastTransactionId = prepareStoreCopyResponse.lastTransactionId(); PrimitiveLongIterator indexIds = prepareStoreCopyResponse.getIndexIds().iterator(); while ( indexIds.hasNext() ) { long indexId = indexIds.next(); - TerminationCondition terminationCondition = terminationConditions.get(); - boolean successful; - do + persistentCall( new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ), filesCopyAdaptor( storeFileStream, log ), addressProvider, + terminationConditions.get() ); + } + } + + private void persistentCall( CatchUpRequest request, CatchUpResponseAdaptor copyHandler, CatchupAddressProvider addressProvider, + TerminationCondition terminationCondition ) throws StoreCopyFailedException + { + TimeoutStrategy.Timeout timeout = backOffStrategy.newTimeout(); + boolean successful; + do + { + try { - try - { - AdvertisedSocketAddress from = addressProvider.primary(); - log.info( format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) ); - StoreCopyFinishedResponse response = - catchUpClient.makeBlockingRequest( from, new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ), - StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) ); - successful = successfulFileDownload( response ); - } - catch ( CatchUpClientException | CatchupAddressResolutionException e ) - { - successful = false; - } - if ( !successful ) - { - log.error( "Failed to download files from index '%s'", indexId ); - terminationCondition.assertContinue(); - } + AdvertisedSocketAddress from = addressProvider.secondary(); + log.info( format( "Sending request '%s' to '%s'", request, from ) ); + StoreCopyFinishedResponse response = catchUpClient.makeBlockingRequest( from, request, copyHandler ); + successful = successfulFileDownload( response ); + } + catch ( CatchUpClientException | CatchupAddressResolutionException e ) + { + successful = false; + } + if ( !successful ) + { + log.error( format( "Request failed '%s'", request ) ); + terminationCondition.assertContinue(); + } + else + { + log.info( format( "Request was successful '%s'", request ) ); } - while ( !successful ); + awaitAndIncrementTimeout( timeout ); + } + while ( !successful ); + } + + private void awaitAndIncrementTimeout( TimeoutStrategy.Timeout timeout ) throws StoreCopyFailedException + { + try + { + Thread.sleep( timeout.getMillis() ); + timeout.increment(); + } + catch ( InterruptedException e ) + { + throw new StoreCopyFailedException( "Thread interrupted" ); } } @@ -139,8 +146,8 @@ private PrepareStoreCopyResponse prepareStoreCopy( AdvertisedSocketAddress from, throws CatchUpClientException, StoreCopyFailedException { log.info( "Requesting store listing from: " + from ); - PrepareStoreCopyResponse prepareStoreCopyResponse = catchUpClient.makeBlockingRequest( from, new PrepareStoreCopyRequest( expectedStoreId ), - StoreCopyResponseAdaptors.prepareStoreCopyAdaptor( storeFileStream, log ) ); + PrepareStoreCopyResponse prepareStoreCopyResponse = + catchUpClient.makeBlockingRequest( from, new PrepareStoreCopyRequest( expectedStoreId ), prepareStoreCopyAdaptor( storeFileStream, log ) ); if ( prepareStoreCopyResponse.status() != PrepareStoreCopyResponse.Status.SUCCESS ) { throw new StoreCopyFailedException( "Preparing store failed due to: " + prepareStoreCopyResponse.status() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java index 2feb9762e99b..45bc6182c22a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -39,8 +39,8 @@ import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; -import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException; -import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.internal.DatabaseHealth; @@ -303,22 +303,11 @@ public void onTxStreamFinishedResponse( CompletableFuture new TopologyLookupException( upstream ) ); - storeCopyProcess.replaceWithStoreFrom( CatchupAddressProvider.fromSingleAddress( fromAddress ), localStoreId ); + CatchupAddressProvider.UpstreamStrategyBoundAddressProvider upstreamStrategyBoundAddressProvider = + new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline ); + storeCopyProcess.replaceWithStoreFrom( upstreamStrategyBoundAddressProvider, localStoreId ); } - catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException | TopologyLookupException e ) + catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e ) { - log.warn( format( "Error copying store from: %s. Will retry shortly.", upstream ) ); + log.warn( "Error copying store. Will retry shortly.", e ); return; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 34e8849971e2..dfe9e85ccc8b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -293,6 +293,15 @@ public enum DiscoveryType public static final Setting catch_up_client_inactivity_timeout = setting( "causal_clustering.catch_up_client_inactivity_timeout", DURATION, "20s" ); + @Description( "Maximum retry time per request during store copy. Regular store files and indexes are downloaded in separate requests during store copy." + + " This configures the maximum time failed requests are allowed to resend. " ) + public static final Setting store_copy_max_retry_time_per_request = + setting( "causal_clustering.store_copy_max_retry_time_per_request", DURATION, "20m" ); + + @Description( "Maximum backoff timeout for store copy requests" ) + @Internal + public static final Setting store_copy_backoff_max_wait = setting( "causal_clustering.store_copy_backoff_max_wait", DURATION, "5s" ); + @Description( "Throttle limit for logging unknown cluster member address" ) public static final Setting unknown_address_logging_throttle = setting( "causal_clustering.unknown_address_logging_throttle", DURATION, "10000ms" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 0714084779f2..0a270b336e31 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -31,6 +31,7 @@ import java.util.stream.Stream; import org.neo4j.causalclustering.ReplicationModule; +import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.catchup.storecopy.StoreFiles; import org.neo4j.causalclustering.core.consensus.ConsensusModule; @@ -47,6 +48,7 @@ import org.neo4j.causalclustering.core.state.machines.id.FreeIdFilteredIdGeneratorFactory; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure; import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure; import org.neo4j.causalclustering.discovery.procedures.InstalledProtocolsProcedure; @@ -78,6 +80,11 @@ import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository; import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols; import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; +import org.neo4j.causalclustering.upstream.NoOpUpstreamDatabaseStrategiesLoader; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategiesLoader; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; +import org.neo4j.causalclustering.upstream.strategies.TypicallyConnectToRandomReadReplicaStrategy; import org.neo4j.causalclustering.routing.multi_cluster.procedure.GetSubClusterRoutersProcedure; import org.neo4j.causalclustering.routing.multi_cluster.procedure.GetSuperClusterRoutersProcedure; import org.neo4j.com.storecopy.StoreUtil; @@ -305,9 +312,17 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), clientPipelineBuilderFactory, serverPipelineBuilderFactory, serverInstalledProtocolHandler ); + TypicallyConnectToRandomReadReplicaStrategy defaultStrategy = new TypicallyConnectToRandomReadReplicaStrategy( 2 ); + defaultStrategy.inject( topologyService, config, logProvider, identityModule.myself() ); + UpstreamDatabaseStrategySelector catchupStrategySelector = + createUpstreamDatabaseStrategySelector( identityModule.myself(), config, logProvider, topologyService, defaultStrategy ); + + CatchupAddressProvider.PrioritisingUpstreamStrategyBasedAddressProvider catchupAddressProvider = + new CatchupAddressProvider.PrioritisingUpstreamStrategyBasedAddressProvider( consensusModule.raftMachine(), topologyService, + catchupStrategySelector ); RaftServerModule.createAndStart( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, serverPipelineBuilderFactory, - messageLogger, topologyService, supportedRaftProtocols, supportedModifierProtocols, serverInstalledProtocolHandler ); - this.serverInstalledProtocols = serverInstalledProtocolHandler::installedProtocols; + messageLogger, catchupAddressProvider, supportedRaftProtocols, supportedModifierProtocols, serverInstalledProtocolHandler ); + serverInstalledProtocols = serverInstalledProtocolHandler::installedProtocols; editionInvariants( platformModule, dependencies, config, logging, life ); @@ -316,6 +331,23 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, life.add( coreServerModule.membershipWaiterLifecycle ); } + private UpstreamDatabaseStrategySelector createUpstreamDatabaseStrategySelector( MemberId myself, Config config, LogProvider logProvider, + TopologyService topologyService, UpstreamDatabaseSelectionStrategy defaultStrategy ) + { + UpstreamDatabaseStrategiesLoader loader; + if ( config.get( CausalClusteringSettings.multi_dc_license ) ) + { + loader = new UpstreamDatabaseStrategiesLoader( topologyService, config, myself, logProvider ); + logProvider.getLog( getClass() ).info( "Multi-Data Center option enabled." ); + } + else + { + loader = new NoOpUpstreamDatabaseStrategiesLoader(); + } + + return new UpstreamDatabaseStrategySelector( defaultStrategy, loader, logProvider ); + } + private LogFiles buildLocalDatabaseLogFiles( PlatformModule platformModule, FileSystemAbstraction fileSystem, File storeDir ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java index d56720ff0838..402463ab7041 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.function.Function; +import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.consensus.ConsensusModule; import org.neo4j.causalclustering.core.consensus.ContinuousJob; @@ -34,8 +35,6 @@ import org.neo4j.causalclustering.core.consensus.RaftProtocolServerInstaller; import org.neo4j.causalclustering.core.server.CoreServerModule; import org.neo4j.causalclustering.core.state.RaftMessageApplier; -import org.neo4j.causalclustering.discovery.CoreTopologyService; -import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.logging.MessageLogger; import org.neo4j.causalclustering.messaging.ComposableMessageHandler; @@ -69,12 +68,13 @@ class RaftServerModule private final MessageLogger messageLogger; private final LogProvider logProvider; private final NettyPipelineBuilderFactory pipelineBuilderFactory; - private final TopologyService topologyService; + private CatchupAddressProvider.PrioritisingUpstreamStrategyBasedAddressProvider catchupAddressProvider; private final Collection supportedModifierProtocols; private RaftServerModule( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule, LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, MessageLogger messageLogger, - CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol, + CatchupAddressProvider.PrioritisingUpstreamStrategyBasedAddressProvider catchupAddressProvider, + ApplicationSupportedProtocols supportedApplicationProtocol, Collection supportedModifierProtocols, ChannelInboundHandler installedProtocolsHandler ) { this.platformModule = platformModule; @@ -85,7 +85,7 @@ private RaftServerModule( PlatformModule platformModule, ConsensusModule consens this.messageLogger = messageLogger; this.logProvider = platformModule.logging.getInternalLogProvider(); this.pipelineBuilderFactory = pipelineBuilderFactory; - this.topologyService = topologyService; + this.catchupAddressProvider = catchupAddressProvider; this.supportedModifierProtocols = supportedModifierProtocols; LifecycleMessageHandler> messageHandlerChain = createMessageHandlerChain( coreServerModule ); @@ -95,11 +95,12 @@ private RaftServerModule( PlatformModule platformModule, ConsensusModule consens static void createAndStart( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule, LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, - MessageLogger messageLogger, CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol, + MessageLogger messageLogger, CatchupAddressProvider.PrioritisingUpstreamStrategyBasedAddressProvider addressProvider, + ApplicationSupportedProtocols supportedApplicationProtocol, Collection supportedModifierProtocols, ChannelInboundHandler installedProtocolsHandler ) { new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, pipelineBuilderFactory, messageLogger, - topologyService, supportedApplicationProtocol, supportedModifierProtocols, installedProtocolsHandler ); + addressProvider, supportedApplicationProtocol, supportedModifierProtocols, installedProtocolsHandler ); } private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessageHandler> messageHandlerChain, @@ -138,7 +139,7 @@ private LifecycleMessageHandler> createM { RaftMessageApplier messageApplier = new RaftMessageApplier( localDatabase, logProvider, consensusModule.raftMachine(), coreServerModule.downloadService(), - coreServerModule.commandApplicationProcess(), topologyService ); + coreServerModule.commandApplicationProcess(), catchupAddressProvider ); ComposableMessageHandler monitoringHandler = RaftMessageMonitoringHandler.composable( platformModule.clock, platformModule.monitors ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index 870cd12f93fd..8620ca2d6295 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -43,10 +44,10 @@ import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TxPullClient; -import org.neo4j.causalclustering.core.SupportedProtocolCreator; -import org.neo4j.causalclustering.core.TransactionBackupServiceProvider; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.IdentityModule; +import org.neo4j.causalclustering.core.SupportedProtocolCreator; +import org.neo4j.causalclustering.core.TransactionBackupServiceProvider; import org.neo4j.causalclustering.core.consensus.ConsensusModule; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.log.pruning.PruningScheduler; @@ -243,8 +244,11 @@ private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPip private CoreStateDownloader createCoreStateDownloader( LifeSupport servicesToStopOnStoreCopy, CatchUpClient catchUpClient ) { + ExponentialBackoffStrategy storeCopyBackoffStrategy = + new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS ); + RemoteStore remoteStore = new RemoteStore( logProvider, platformModule.fileSystem, platformModule.pageCache, - new StoreCopyClient( catchUpClient, logProvider ), + new StoreCopyClient( catchUpClient, logProvider, storeCopyBackoffStrategy ), new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), config, platformModule.monitors ); CopiedStoreRecovery copiedStoreRecovery = platformModule.life.add( diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java index 2aec823ea4b5..69ea83a55a27 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java @@ -25,7 +25,6 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome; import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService; -import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; import org.neo4j.logging.Log; @@ -38,17 +37,17 @@ public class RaftMessageApplier implements LifecycleMessageHandler anyCoreMemberId() + public Optional randomCoreMemberId() { - return coreMembers.keySet().stream().findAny(); + if ( coreMembers.isEmpty() ) + { + return Optional.empty(); + } + return coreMembers.keySet().stream().skip( ThreadLocalRandom.current().nextInt( coreMembers.size() ) ).findFirst(); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index c63c9d888285..2304227674e2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; import org.neo4j.causalclustering.identity.MemberId; @@ -55,9 +56,13 @@ public String toString() return String.format( "{readReplicas=%s}", readReplicaMembers ); } - public Optional anyReadReplicaMemberId() + public Optional randomReadReplicaMemberId() { - return readReplicaMembers.keySet().stream().findAny(); + if ( readReplicaMembers.isEmpty() ) + { + return Optional.empty(); + } + return readReplicaMembers.keySet().stream().skip( ThreadLocalRandom.current().nextInt( readReplicaMembers.size() ) ).findFirst(); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 6b1e574ede44..0861f4dab5d6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -27,9 +27,7 @@ import java.time.Duration; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -83,6 +81,10 @@ import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer; import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository; import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols; +import org.neo4j.causalclustering.upstream.strategies.ConnectToRandomCoreServerStrategy; +import org.neo4j.causalclustering.upstream.NoOpUpstreamDatabaseStrategiesLoader; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategiesLoader; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; import org.neo4j.com.storecopy.StoreUtil; import org.neo4j.function.Predicates; import org.neo4j.graphdb.DependencyResolver; @@ -137,7 +139,6 @@ import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifecycleStatus; import org.neo4j.logging.LogProvider; -import org.neo4j.logging.NullLogProvider; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.time.Clocks; import org.neo4j.udc.UsageData; @@ -276,8 +277,11 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, databaseHealthSupplier, watcherService, platformModule.availabilityGuard, logProvider ); + ExponentialBackoffStrategy storeCopyBackoffStrategy = + new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS ); + RemoteStore remoteStore = new RemoteStore( platformModule.logging.getInternalLogProvider(), fileSystem, platformModule.pageCache, - new StoreCopyClient( catchUpClient, logProvider ), + new StoreCopyClient( catchUpClient, logProvider, storeCopyBackoffStrategy ), new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), config, platformModule.monitors ); @@ -293,19 +297,8 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, ConnectToRandomCoreServerStrategy defaultStrategy = new ConnectToRandomCoreServerStrategy(); defaultStrategy.inject( topologyService, config, logProvider, myself ); - UpstreamDatabaseStrategiesLoader loader; - if ( config.get( CausalClusteringSettings.multi_dc_license ) ) - { - loader = new UpstreamDatabaseStrategiesLoader( topologyService, config, myself, logProvider ); - logProvider.getLog( getClass() ).info( "Multi-Data Center option enabled." ); - } - else - { - loader = new NoOpUpstreamDatabaseStrategiesLoader(); - } - UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector = - new UpstreamDatabaseStrategySelector( defaultStrategy, loader, myself, logProvider ); + createUpstreamDatabaseStrategySelector( myself, config, logProvider, topologyService, defaultStrategy ); CatchupPollingProcess catchupProcess = new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, catchUpClient, upstreamDatabaseStrategySelector, @@ -353,6 +346,23 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, backupCatchupServer.ifPresent( life::add ); } + private UpstreamDatabaseStrategySelector createUpstreamDatabaseStrategySelector( MemberId myself, Config config, LogProvider logProvider, + TopologyService topologyService, ConnectToRandomCoreServerStrategy defaultStrategy ) + { + UpstreamDatabaseStrategiesLoader loader; + if ( config.get( CausalClusteringSettings.multi_dc_license ) ) + { + loader = new UpstreamDatabaseStrategiesLoader( topologyService, config, myself, logProvider ); + logProvider.getLog( getClass() ).info( "Multi-Data Center option enabled." ); + } + else + { + loader = new NoOpUpstreamDatabaseStrategiesLoader(); + } + + return new UpstreamDatabaseStrategySelector( defaultStrategy, loader, logProvider ); + } + protected void configureDiscoveryService( DiscoveryServiceFactory discoveryServiceFactory, Dependencies dependencies, Config config, LogProvider logProvider ) { @@ -407,33 +417,6 @@ public void setupSecurityModule( PlatformModule platformModule, Procedures proce EnterpriseEditionModule.setupEnterpriseSecurityModule( platformModule, procedures ); } - private class NoOpUpstreamDatabaseStrategiesLoader extends UpstreamDatabaseStrategiesLoader - { - NoOpUpstreamDatabaseStrategiesLoader() - { - super( null, null, null, NullLogProvider.getInstance() ); - } - - @Override - public Iterator iterator() - { - return new Iterator() - { - @Override - public boolean hasNext() - { - return false; - } - - @Override - public UpstreamDatabaseSelectionStrategy next() - { - throw new NoSuchElementException(); - } - }; - } - } - private static TopologyServiceRetryStrategy resolveStrategy( Config config, LogProvider logProvider ) { long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java index dfe5aaf5df22..7b1730768ff1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java @@ -33,6 +33,8 @@ import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.Log; @@ -169,7 +171,9 @@ private void syncStoreWithUpstream( MemberId source ) debugLog.info( "Copying store from upstream server %s", source ); localDatabase.delete(); - storeCopyProcess.replaceWithStoreFrom( CatchupAddressProvider.fromSingleAddress( fromAddress ), storeId ); + CatchupAddressProvider.UpstreamStrategyBoundAddressProvider addressProvider = + new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline ); + storeCopyProcess.replaceWithStoreFrom( addressProvider, storeId ); debugLog.info( "Restarting local database after copy.", source ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/NoOpUpstreamDatabaseStrategiesLoader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/NoOpUpstreamDatabaseStrategiesLoader.java new file mode 100644 index 000000000000..d057fa30b832 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/NoOpUpstreamDatabaseStrategiesLoader.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.upstream; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.neo4j.logging.NullLogProvider; + +public class NoOpUpstreamDatabaseStrategiesLoader extends UpstreamDatabaseStrategiesLoader +{ + public NoOpUpstreamDatabaseStrategiesLoader() + { + super( null, null, null, NullLogProvider.getInstance() ); + } + + @Override + public Iterator iterator() + { + return new Iterator() + { + @Override + public boolean hasNext() + { + return false; + } + + @Override + public UpstreamDatabaseSelectionStrategy next() + { + throw new NoSuchElementException(); + } + }; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionException.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseSelectionException.java similarity index 89% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionException.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseSelectionException.java index 235d9625559a..3b6c13b365d8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionException.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseSelectionException.java @@ -17,11 +17,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream; public class UpstreamDatabaseSelectionException extends Exception { - UpstreamDatabaseSelectionException( String message ) + public UpstreamDatabaseSelectionException( String message ) { super( message ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseSelectionStrategy.java similarity index 91% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseSelectionStrategy.java index e641c03f6e1d..5c55a3a86361 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseSelectionStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseSelectionStrategy.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream; import java.util.Optional; import java.util.stream.Collectors; @@ -46,7 +46,7 @@ public UpstreamDatabaseSelectionStrategy( String key, String... altKeys ) } // Service loader can't inject via the constructor - void inject( TopologyService topologyService, Config config, LogProvider logProvider, MemberId myself ) + public void inject( TopologyService topologyService, Config config, LogProvider logProvider, MemberId myself ) { this.topologyService = topologyService; this.config = config; @@ -59,7 +59,9 @@ void inject( TopologyService topologyService, Config config, LogProvider logProv init(); } - void init() {} + public void init() + { + } public abstract Optional upstreamDatabase() throws UpstreamDatabaseSelectionException; @@ -71,7 +73,6 @@ public String toString() private static String nicelyCommaSeparatedList( Iterable keys ) { - return StreamSupport.stream( keys.spliterator(), false ) - .collect( Collectors.joining( ", " ) ); + return StreamSupport.stream( keys.spliterator(), false ).collect( Collectors.joining( ", " ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategiesLoader.java similarity index 85% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategiesLoader.java index 9c828b1524f5..a3ce7bf2a19a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategiesLoader.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream; import java.util.Collection; import java.util.Iterator; @@ -44,7 +44,7 @@ public class UpstreamDatabaseStrategiesLoader implements Iterable iterator() { - Iterable allImplementationsOnClasspath = - Service.load( UpstreamDatabaseSelectionStrategy.class ); + Iterable allImplementationsOnClasspath = Service.load( UpstreamDatabaseSelectionStrategy.class ); LinkedHashSet candidates = new LinkedHashSet<>(); - for ( String key : config.get( CausalClusteringSettings.upstream_selection_strategy ) ) { for ( UpstreamDatabaseSelectionStrategy candidate : allImplementationsOnClasspath ) @@ -80,14 +78,11 @@ public Iterator iterator() private void log( LinkedHashSet candidates ) { - log.debug( "Upstream database strategies loaded in order of precedence: " + - nicelyCommaSeparatedList( candidates ) ); + log.debug( "Upstream database strategies loaded in order of precedence: " + nicelyCommaSeparatedList( candidates ) ); } private static String nicelyCommaSeparatedList( Collection items ) { - return items.stream() - .map( UpstreamDatabaseSelectionStrategy::toString ) - .collect( Collectors.joining( ", " ) ); + return items.stream().map( UpstreamDatabaseSelectionStrategy::toString ).collect( Collectors.joining( ", " ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategySelector.java similarity index 77% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategySelector.java index b0b44d4c39d0..06c305d126b5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelector.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategySelector.java @@ -17,8 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; - +package org.neo4j.causalclustering.upstream; import java.util.LinkedHashSet; import java.util.NoSuchElementException; @@ -33,19 +32,16 @@ public class UpstreamDatabaseStrategySelector { private LinkedHashSet strategies = new LinkedHashSet<>(); - private MemberId myself; private Log log; - UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy ) + public UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy ) { - this( defaultStrategy, empty(), null, NullLogProvider.getInstance() ); + this( defaultStrategy, empty(), NullLogProvider.getInstance() ); } - UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy, - Iterable otherStrategies, MemberId myself, - LogProvider logProvider ) + public UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy, Iterable otherStrategies, + LogProvider logProvider ) { - this.myself = myself; this.log = logProvider.getLog( getClass() ); if ( otherStrategies != null ) @@ -80,8 +76,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException if ( result == null ) { - throw new UpstreamDatabaseSelectionException( - "Could not find an upstream database with which to connect." ); + throw new UpstreamDatabaseSelectionException( "Could not find an upstream database with which to connect." ); } log.debug( "Selected upstream database [%s]", result ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupImpl.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupImpl.java similarity index 78% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupImpl.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupImpl.java index c2e48dcb700c..b417082e63d4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupImpl.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupImpl.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import java.util.List; import java.util.Map; @@ -30,30 +30,27 @@ import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; -class ConnectRandomlyToServerGroupImpl +public class ConnectRandomlyToServerGroupImpl { private final List groups; private final TopologyService topologyService; private final MemberId myself; - private final String dbName; private final Random random = new Random(); - ConnectRandomlyToServerGroupImpl( List groups, TopologyService topologyService, MemberId myself, String dbName ) + ConnectRandomlyToServerGroupImpl( List groups, TopologyService topologyService, MemberId myself ) { this.groups = groups; this.topologyService = topologyService; this.myself = myself; - this.dbName = dbName; } public Optional upstreamDatabase() { - Map replicas = topologyService.localReadReplicas().members(); + Map replicas = topologyService.localReadReplicas().members(); - List choices = groups.stream() - .flatMap( group -> replicas.entrySet().stream().filter( isMyGroupAndNotMe( group ) ) ) - .map( Map.Entry::getKey ) - .collect( Collectors.toList() ); + List choices = + groups.stream().flatMap( group -> replicas.entrySet().stream().filter( isMyGroupAndNotMe( group ) ) ).map( Map.Entry::getKey ).collect( + Collectors.toList() ); if ( choices.isEmpty() ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategy.java similarity index 88% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategy.java index a68b90d442e8..bf3ed8c8acb0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategy.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import java.util.List; import java.util.Optional; @@ -25,25 +25,25 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public class ConnectRandomlyToServerGroupStrategy extends UpstreamDatabaseSelectionStrategy { - static final String NAME = "connect-randomly-to-server-group"; + static final String IDENTITY = "connect-randomly-to-server-group"; private ConnectRandomlyToServerGroupImpl strategyImpl; public ConnectRandomlyToServerGroupStrategy() { - super( NAME ); + super( IDENTITY ); } @Override - void init() + public void init() { List groups = config.get( CausalClusteringSettings.connect_randomly_to_server_group_strategy ); - strategyImpl = new ConnectRandomlyToServerGroupImpl( groups, topologyService, - myself, dbName ); + strategyImpl = new ConnectRandomlyToServerGroupImpl( groups, topologyService, myself ); if ( groups.isEmpty() ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyWithinServerGroupStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyWithinServerGroupStrategy.java similarity index 82% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyWithinServerGroupStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyWithinServerGroupStrategy.java index 5378a2788cf8..3ad967e238ae 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyWithinServerGroupStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyWithinServerGroupStrategy.java @@ -17,31 +17,33 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import java.util.List; import java.util.Optional; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public class ConnectRandomlyWithinServerGroupStrategy extends UpstreamDatabaseSelectionStrategy { + public static final String IDENTITY = "connect-randomly-within-server-group"; private ConnectRandomlyToServerGroupImpl strategyImpl; public ConnectRandomlyWithinServerGroupStrategy() { - super( "connect-randomly-within-server-group" ); + super( IDENTITY ); } @Override - void init() + public void init() { List groups = config.get( CausalClusteringSettings.server_groups ); - strategyImpl = new ConnectRandomlyToServerGroupImpl( groups, topologyService, myself, dbName ); - log.warn( "Upstream selection strategy " + readableName + " is deprecated. Consider using " + ConnectRandomlyToServerGroupStrategy.NAME + " instead." ); + strategyImpl = new ConnectRandomlyToServerGroupImpl( groups, topologyService, myself ); + log.warn( "Upstream selection strategy " + readableName + " is deprecated. Consider using " + IDENTITY + " instead." ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectToRandomCoreServerStrategy.java similarity index 85% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectToRandomCoreServerStrategy.java index 0a07fffb27ba..00f124545384 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/ConnectToRandomCoreServerStrategy.java @@ -17,24 +17,26 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import java.util.Optional; import java.util.Random; -import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public class ConnectToRandomCoreServerStrategy extends UpstreamDatabaseSelectionStrategy { + public static final String IDENTITY = "connect-to-random-core-server"; private final Random random = new Random(); public ConnectToRandomCoreServerStrategy() { - super( "connect-to-random-core-server" ); + super( IDENTITY ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/TypicallyConnectToRandomReadReplicaStrategy.java similarity index 61% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/TypicallyConnectToRandomReadReplicaStrategy.java index 0496d2b5d83b..ec5c9493bba6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/TypicallyConnectToRandomReadReplicaStrategy.java @@ -17,22 +17,29 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import java.util.Optional; -import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public class TypicallyConnectToRandomReadReplicaStrategy extends UpstreamDatabaseSelectionStrategy { - private final ModuloCounter counter = new ModuloCounter( 10 ); + public static final String IDENTITY = "typically-connect-to-random-read-replica"; + private final ModuloCounter counter; public TypicallyConnectToRandomReadReplicaStrategy() { - super( "typically-connect-to-random-read-replica" ); + this( 10 ); + } + + public TypicallyConnectToRandomReadReplicaStrategy( int connectToCoreInterval ) + { + super( IDENTITY ); + this.counter = new ModuloCounter( connectToCoreInterval ); } @Override @@ -40,14 +47,29 @@ public Optional upstreamDatabase() { if ( counter.shouldReturnCoreMemberId() ) { - return topologyService.localCoreServers().anyCoreMemberId(); + return getCoreMemberId(); } else { - return topologyService.localReadReplicas().anyReadReplicaMemberId(); + Optional memberId = getReadReplicaMemberId(); + if ( !memberId.isPresent() ) + { + memberId = getCoreMemberId(); + } + return memberId; } } + private Optional getReadReplicaMemberId() + { + return topologyService.localReadReplicas().randomReadReplicaMemberId(); + } + + private Optional getCoreMemberId() + { + return topologyService.localCoreServers().randomCoreMemberId(); + } + private static class ModuloCounter { private final int modulo; @@ -55,8 +77,7 @@ private static class ModuloCounter ModuloCounter( int modulo ) { - // e.g. every 10th means 0-9 - this.modulo = modulo - 1; + this.modulo = modulo; } boolean shouldReturnCoreMemberId() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/UserDefinedConfigurationStrategy.java similarity index 79% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/UserDefinedConfigurationStrategy.java index e6875b5c1268..19b3d47e0aed 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/upstream/strategies/UserDefinedConfigurationStrategy.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import java.util.Map; import java.util.Objects; @@ -34,12 +34,14 @@ import org.neo4j.causalclustering.routing.load_balancing.plugins.server_policies.FilterConfigParser; import org.neo4j.causalclustering.routing.load_balancing.plugins.server_policies.InvalidFilterSpecification; import org.neo4j.causalclustering.routing.load_balancing.plugins.server_policies.ServerInfo; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; import org.neo4j.helpers.Service; @Service.Implementation( UpstreamDatabaseSelectionStrategy.class ) public class UserDefinedConfigurationStrategy extends UpstreamDatabaseSelectionStrategy { + public static final String IDENTITY = "user-defined"; // Empty if provided filter config cannot be parsed. // Ideally this class would not be created until config has been successfully parsed // in which case there would be no need for Optional @@ -47,11 +49,11 @@ public class UserDefinedConfigurationStrategy extends UpstreamDatabaseSelectionS public UserDefinedConfigurationStrategy() { - super( "user-defined" ); + super( IDENTITY ); } @Override - void init() + public void init() { String filterConfig = config.get( CausalClusteringSettings.user_defined_upstream_selection_strategy ); try @@ -63,8 +65,8 @@ void init() catch ( InvalidFilterSpecification invalidFilterSpecification ) { filters = Optional.empty(); - log.warn( "Cannot parse configuration '" + filterConfig + "' for upstream selection strategy " - + readableName + ". " + invalidFilterSpecification.getMessage() ); + log.warn( "Cannot parse configuration '" + filterConfig + "' for upstream selection strategy " + readableName + ". " + + invalidFilterSpecification.getMessage() ); } } @@ -75,27 +77,20 @@ public Optional upstreamDatabase() { Set possibleServers = possibleServers(); - return filters.apply( possibleServers ).stream() - .map( ServerInfo::memberId ) - .filter( memberId -> !Objects.equals( myself, memberId ) ) - .findFirst(); + return filters.apply( possibleServers ).stream().map( ServerInfo::memberId ).filter( memberId -> !Objects.equals( myself, memberId ) ).findFirst(); } ); } private Set possibleServers() { - Stream> infoMap = - Stream.of( topologyService.localReadReplicas(), topologyService.localCoreServers() ) - .map( Topology::members ) - .map( Map::entrySet ) - .flatMap( Set::stream ); + Stream> infoMap = + Stream.of( topologyService.localReadReplicas(), topologyService.localCoreServers() ).map( Topology::members ).map( Map::entrySet ).flatMap( + Set::stream ); - return infoMap - .map( this::toServerInfo ) - .collect( Collectors.toSet() ); + return infoMap.map( this::toServerInfo ).collect( Collectors.toSet() ); } - private ServerInfo toServerInfo( Map.Entry entry ) + private ServerInfo toServerInfo( Map.Entry entry ) { T server = entry.getValue(); MemberId memberId = entry.getKey(); diff --git a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy deleted file mode 100644 index 17ade9a169a7..000000000000 --- a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy +++ /dev/null @@ -1,3 +0,0 @@ -org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategy -org.neo4j.causalclustering.readreplica.ConnectRandomlyWithinServerGroupStrategy -org.neo4j.causalclustering.readreplica.ConnectRandomlyToServerGroupStrategy diff --git a/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy new file mode 100644 index 000000000000..ca79c7d6ae20 --- /dev/null +++ b/enterprise/causal-clustering/src/main/resources/META-INF/services/org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy @@ -0,0 +1,3 @@ +org.neo4j.causalclustering.upstream.strategies.ConnectToRandomCoreServerStrategy +org.neo4j.causalclustering.upstream.strategies.ConnectRandomlyWithinServerGroupStrategy +org.neo4j.causalclustering.upstream.strategies.ConnectRandomlyToServerGroupStrategy diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpChannelPoolTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpChannelPoolTest.java index d05c95ab9488..2416da0dd786 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpChannelPoolTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpChannelPoolTest.java @@ -21,16 +21,22 @@ import org.junit.Test; +import java.net.ConnectException; +import java.util.function.Function; + import org.neo4j.helpers.AdvertisedSocketAddress; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class CatchUpChannelPoolTest { @Test - public void shouldReUseAChannelThatWasReleased() + public void shouldReUseAChannelThatWasReleased() throws Exception { // given CatchUpChannelPool pool = new CatchUpChannelPool<>( TestChannel::new ); @@ -45,7 +51,7 @@ public void shouldReUseAChannelThatWasReleased() } @Test - public void shouldCreateANewChannelIfFirstChannelIsDisposed() + public void shouldCreateANewChannelIfFirstChannelIsDisposed() throws Exception { // given CatchUpChannelPool pool = new CatchUpChannelPool<>( TestChannel::new ); @@ -60,7 +66,7 @@ public void shouldCreateANewChannelIfFirstChannelIsDisposed() } @Test - public void shouldCreateANewChannelIfFirstChannelIsStillActive() + public void shouldCreateANewChannelIfFirstChannelIsStillActive() throws Exception { // given CatchUpChannelPool pool = new CatchUpChannelPool<>( TestChannel::new ); @@ -74,7 +80,7 @@ public void shouldCreateANewChannelIfFirstChannelIsStillActive() } @Test - public void shouldCleanUpOnClose() + public void shouldCleanUpOnClose() throws Exception { // given CatchUpChannelPool pool = new CatchUpChannelPool<>( TestChannel::new ); @@ -102,14 +108,86 @@ public void shouldCleanUpOnClose() assertTrue( channelF.closed ); } + @Test + public void shouldFailWithExceptionIsChannelIsNotActive() + { + CatchUpChannelPool pool = new CatchUpChannelPool<>( advertisedSocketAddress -> new TestChannel( advertisedSocketAddress, false ) ); + + try + { + pool.acquire( localAddress( 1 ) ); + } + catch ( Exception e ) + { + assertEquals( ConnectException.class, e.getClass() ); + assertEquals( "Unable to connect to localhost:1", e.getMessage() ); + return; + } + fail(); + } + + @Test + public void shouldCheckConnectionOnIdleChannelFirst() + { + // given + CatchUpChannelPool pool = new CatchUpChannelPool<>( new Function() + { + boolean firstIsActive = true; + + @Override + public TestChannel apply( AdvertisedSocketAddress address ) + { + TestChannel testChannel = new TestChannel( address, firstIsActive ); + firstIsActive = false; + return testChannel; + } + } ); + + TestChannel channel = null; + try + { + channel = pool.acquire( localAddress( 1 ) ); + assertNotNull( channel ); + } + catch ( Exception e ) + { + fail( "Not expected exception" ); + } + + // when channel loses connection in idle + channel.isActive = false; + pool.release( channel ); + + try + { + // then + pool.acquire( localAddress( 1 ) ); + } + catch ( Exception e ) + { + assertEquals( ConnectException.class, e.getClass() ); + assertEquals( "Unable to connect to localhost:1", e.getMessage() ); + return; + } + fail(); + } + private static class TestChannel implements CatchUpChannelPool.Channel { private final AdvertisedSocketAddress address; + private boolean isActive; private boolean closed; - TestChannel( AdvertisedSocketAddress address ) + TestChannel( AdvertisedSocketAddress address, boolean isActive ) { + this.address = address; + this.isActive = isActive; + } + + TestChannel( AdvertisedSocketAddress address ) + { + this( address, true ); } @Override @@ -118,6 +196,18 @@ public AdvertisedSocketAddress destination() return address; } + @Override + public void connect() + { + // do nothing + } + + @Override + public boolean isActive() + { + return isActive; + } + @Override public void close() { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalRetriesTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalTimeTest.java similarity index 51% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalRetriesTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalTimeTest.java index 58ae1c8c01c6..cc57cf1351c2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalRetriesTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/MaximumTotalTimeTest.java @@ -27,44 +27,24 @@ import org.neo4j.time.FakeClock; -public class MaximumTotalRetriesTest +public class MaximumTotalTimeTest { @Rule public ExpectedException expectedException = ExpectedException.none(); @Test - public void shouldRetryUntilMaximumRetries() throws Exception + public void shouldFailWhenAllowedTimeHasPassed() throws StoreCopyFailedException { - FakeClock clock = new FakeClock(); - MaximumTotalRetries maximumTotalRetries = new MaximumTotalRetries( 4, -1, clock ); + TimeUnit timeUnit = TimeUnit.SECONDS; + FakeClock fakeClock = new FakeClock( 0, timeUnit ); - maximumTotalRetries.assertContinue(); - maximumTotalRetries.assertContinue(); - maximumTotalRetries.assertContinue(); - expectedException.expect( StoreCopyFailedException.class ); - maximumTotalRetries.assertContinue(); - } - - @Test - public void shouldContinueIfAllowedInBetweenTimeIsMet() throws Exception - { - // given - FakeClock clock = new FakeClock(); - MaximumTotalRetries maximumTotalRetries = new MaximumTotalRetries( 1, 0, clock ); + MaximumTotalTime maximumTotalTime = new MaximumTotalTime( 5, timeUnit, fakeClock ); - // when we retry - maximumTotalRetries.assertContinue(); - - // then we can retry again because in between time == 0 - maximumTotalRetries.assertContinue(); - - //when we increment clock - clock.forward( 1, TimeUnit.MILLISECONDS ); - - //then we expected exception thrown + maximumTotalTime.assertContinue(); + fakeClock.forward( 5, timeUnit ); + maximumTotalTime.assertContinue(); expectedException.expect( StoreCopyFailedException.class ); - - // when we retry - maximumTotalRetries.assertContinue(); + fakeClock.forward( 1, timeUnit ); + maximumTotalTime.assertContinue(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java index b7118b3384ba..183e72e9cedc 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchupAddressProvider; @@ -50,6 +51,7 @@ import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.net.Server; import org.neo4j.collection.primitive.base.Empty; +import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.helpers.collection.Iterators; @@ -119,7 +121,10 @@ public void setup() CatchUpClient catchUpClient = new CatchupClientBuilder().build(); catchUpClient.start(); - subject = new StoreCopyClient( catchUpClient, logProvider ); + + ConstantTimeTimeoutStrategy storeCopyBackoffStrategy = new ConstantTimeTimeoutStrategy( 1, TimeUnit.MILLISECONDS ); + + subject = new StoreCopyClient( catchUpClient, logProvider, storeCopyBackoffStrategy ); } @After @@ -180,8 +185,8 @@ public void reconnectingWorks() throws StoreCopyFailedException, IOException // given local client has a store InMemoryStoreStreamProvider storeFileStream = new InMemoryStoreStreamProvider(); - // and file B is broken once (after retry it works) - fileB.setRemainingNoResponse( 1 ); + // and file B is broken once (after retry it works) + fileB.setRemainingNoResponse( 1 ); // when catchup is performed for valid transactionId and StoreId CatchupAddressProvider catchupAddressProvider = CatchupAddressProvider.fromSingleAddress( from( catchupServer.address().getPort() ) ); @@ -353,8 +358,8 @@ static String fileContent( File file, FileSystemAbstraction fsa ) throws IOExcep return stringBuilder.toString(); } - private String clientFileContents( InMemoryStoreStreamProvider storeStreamProvider, String filename ) + private String clientFileContents( InMemoryStoreStreamProvider storeFileStreamsProvider, String filename ) { - return storeStreamProvider.fileStreams().get( filename ).toString(); + return storeFileStreamsProvider.fileStreams().get( filename ).toString(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientTest.java index c4d72a84bcbf..65a456667f1f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientTest.java @@ -27,6 +27,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -34,6 +35,7 @@ import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchupAddressProvider; +import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.collection.primitive.Primitive; @@ -73,12 +75,14 @@ public class StoreCopyClientTest // helpers private File[] serverFiles = new File[]{new File( "fileA.txt" ), new File( "fileB.bmp" )}; private PrimitiveLongSet indexIds = Primitive.longSet(); + private ConstantTimeTimeoutStrategy backOffStrategy; @Before public void setup() { indexIds.add( 13 ); - subject = new StoreCopyClient( catchUpClient, logProvider ); + backOffStrategy = new ConstantTimeTimeoutStrategy( 1, TimeUnit.MILLISECONDS ); + subject = new StoreCopyClient( catchUpClient, logProvider, backOffStrategy ); } @Test @@ -132,7 +136,7 @@ public void storeIdCanBeRetrieved() throws StoreIdDownloadFailedException, Catch public void shouldFailIfTerminationConditionFails() throws CatchUpClientException { // given a file will fail an expected number of times - subject = new StoreCopyClient( catchUpClient, logProvider ); + subject = new StoreCopyClient( catchUpClient, logProvider, backOffStrategy ); // and requesting the individual file will fail when( catchUpClient.makeBlockingRequest( any(), any(), any() ) ).thenReturn( diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java index ec9f491f2948..35e140b29411 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java @@ -38,7 +38,7 @@ import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; -import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.internal.DatabaseHealth; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java index 9306b8855683..0e50647b962b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java @@ -40,6 +40,8 @@ import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.Service; import org.neo4j.io.fs.FileSystemAbstraction; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java index b889851fbc74..3fa87190d898 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaToReadReplicaCatchupIT.java @@ -28,10 +28,9 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.discovery.CoreClusterMember; -import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.ReadReplica; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy; +import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy; import org.neo4j.function.ThrowingSupplier; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategiesLoaderTest.java similarity index 88% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategiesLoaderTest.java index 46d3d6cc6d82..fbc14bfb5932 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategiesLoaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategiesLoaderTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream; import org.junit.Test; @@ -46,8 +46,7 @@ public void shouldReturnConfiguredClassesOnly() Config config = Config.defaults( upstream_selection_strategy, "dummy" ); UpstreamDatabaseStrategiesLoader strategies = - new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config, - myself, NullLogProvider.getInstance() ); + new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config, myself, NullLogProvider.getInstance() ); // when Set upstreamDatabaseSelectionStrategies = asSet( strategies.iterator() ); @@ -66,11 +65,9 @@ public void shouldReturnTheFirstStrategyThatWorksFromThoseConfigured() // when UpstreamDatabaseStrategiesLoader strategies = - new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config, - myself, NullLogProvider.getInstance() ); + new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config, myself, NullLogProvider.getInstance() ); // then - assertEquals( UpstreamDatabaseStrategySelectorTest.YetAnotherDummyUpstreamDatabaseSelectionStrategy.class, - strategies.iterator().next().getClass() ); + assertEquals( UpstreamDatabaseStrategySelectorTest.YetAnotherDummyUpstreamDatabaseSelectionStrategy.class, strategies.iterator().next().getClass() ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategySelectorTest.java similarity index 90% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategySelectorTest.java index 6be091599423..f6455652e90a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UpstreamDatabaseStrategySelectorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/UpstreamDatabaseStrategySelectorTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream; import org.junit.Test; @@ -26,12 +26,12 @@ import java.util.Optional; import java.util.UUID; -import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.upstream.strategies.ConnectToRandomCoreServerStrategy; import org.neo4j.helpers.Service; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.NullLogProvider; @@ -62,7 +62,7 @@ public void shouldReturnTheMemberIdFromFirstSucessfulStrategy() throws Exception when( goodOne.upstreamDatabase() ).thenReturn( Optional.of( theMemberId ) ); UpstreamDatabaseStrategySelector selector = - new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null, NullLogProvider.getInstance() ); + new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), NullLogProvider.getInstance() ); // when MemberId result = selector.bestUpstreamDatabase(); @@ -77,8 +77,8 @@ public void shouldDefaultToRandomCoreServerIfNoOtherStrategySpecified() throws E // given TopologyService topologyService = mock( TopologyService.class ); MemberId memberId = new MemberId( UUID.randomUUID() ); - when( topologyService.localCoreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, - mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); + when( topologyService.localCoreServers() ).thenReturn( + new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); ConnectToRandomCoreServerStrategy defaultStrategy = new ConnectToRandomCoreServerStrategy(); defaultStrategy.inject( topologyService, Config.defaults(), NullLogProvider.getInstance(), null ); @@ -98,8 +98,8 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception // given TopologyService topologyService = mock( TopologyService.class ); MemberId memberId = new MemberId( UUID.randomUUID() ); - when( topologyService.localCoreServers() ).thenReturn( new CoreTopology( new ClusterId( UUID.randomUUID() ), false, - mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); + when( topologyService.localCoreServers() ).thenReturn( + new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreServerInfo.class ) ) ) ); ConnectToRandomCoreServerStrategy shouldNotUse = new ConnectToRandomCoreServerStrategy(); @@ -107,7 +107,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) ); UpstreamDatabaseStrategySelector selector = - new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null, NullLogProvider.getInstance() ); + new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), NullLogProvider.getInstance() ); // when selector.bestUpstreamDatabase(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategyImplTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategyImplTest.java similarity index 92% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategyImplTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategyImplTest.java index c0ee37a75aa5..cc7111cf87ef 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategyImplTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategyImplTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import org.junit.Test; @@ -42,9 +42,9 @@ import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.isIn; -import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.fakeTopologyService; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.memberIDs; +import static org.neo4j.causalclustering.upstream.strategies.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.fakeTopologyService; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.memberIDs; public class ConnectRandomlyToServerGroupStrategyImplTest { @@ -57,13 +57,13 @@ public void shouldStayWithinGivenSingleServerGroup() MemberId[] myGroupMemberIds = memberIDs( 10 ); TopologyService topologyService = getTopologyService( myServerGroup, myGroupMemberIds, Collections.singletonList( "your_server_group" ) ); - ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( myServerGroup, topologyService, myGroupMemberIds[0], "default" ); + ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( myServerGroup, topologyService, myGroupMemberIds[0] ); // when Optional memberId = strategy.upstreamDatabase(); // then - assertThat( memberId, contains( isIn( myGroupMemberIds ) )); + assertThat( memberId, contains( isIn( myGroupMemberIds ) ) ); } @Test @@ -75,13 +75,13 @@ public void shouldSelectAnyFromMultipleServerGroups() MemberId[] myGroupMemberIds = memberIDs( 10 ); TopologyService topologyService = getTopologyService( myServerGroups, myGroupMemberIds, Arrays.asList( "x", "y", "z" ) ); - ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( myServerGroups, topologyService, myGroupMemberIds[0], "default" ); + ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( myServerGroups, topologyService, myGroupMemberIds[0] ); // when Optional memberId = strategy.upstreamDatabase(); // then - assertThat( memberId, contains( isIn( myGroupMemberIds ) )); + assertThat( memberId, contains( isIn( myGroupMemberIds ) ) ); } @Test @@ -91,7 +91,7 @@ public void shouldReturnEmptyIfNoGroupsInConfig() MemberId[] myGroupMemberIds = memberIDs( 10 ); TopologyService topologyService = getTopologyService( Collections.singletonList( "my_server_group" ), myGroupMemberIds, Arrays.asList( "x", "y", "z" ) ); - ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( Collections.emptyList(), topologyService, null, "default" ); + ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( Collections.emptyList(), topologyService, null ); // when Optional memberId = strategy.upstreamDatabase(); @@ -109,7 +109,7 @@ public void shouldReturnEmptyIfGroupOnlyContainsSelf() MemberId[] myGroupMemberIds = memberIDs( 1 ); TopologyService topologyService = getTopologyService( myServerGroup, myGroupMemberIds, Arrays.asList( "x", "y", "z" ) ); - ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( myServerGroup, topologyService, myGroupMemberIds[0], "default" ); + ConnectRandomlyToServerGroupImpl strategy = new ConnectRandomlyToServerGroupImpl( myServerGroup, topologyService, myGroupMemberIds[0] ); // when Optional memberId = strategy.upstreamDatabase(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategyTest.java similarity index 93% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategyTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategyTest.java index 7298c024a269..936aa809b082 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyToServerGroupStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyToServerGroupStrategyTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import org.junit.Test; @@ -33,7 +33,7 @@ import static co.unruly.matchers.OptionalMatchers.contains; import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertThat; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.memberIDs; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.memberIDs; public class ConnectRandomlyToServerGroupStrategyTest { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyWithinServerGroupStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyWithinServerGroupStrategyTest.java similarity index 93% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyWithinServerGroupStrategyTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyWithinServerGroupStrategyTest.java index dd277437790b..88b2f18fb465 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectRandomlyWithinServerGroupStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectRandomlyWithinServerGroupStrategyTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import org.junit.Test; @@ -33,7 +33,7 @@ import static co.unruly.matchers.OptionalMatchers.contains; import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertThat; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.memberIDs; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.memberIDs; public class ConnectRandomlyWithinServerGroupStrategyTest { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectToRandomCoreServerStrategyTest.java similarity index 80% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategyTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectToRandomCoreServerStrategyTest.java index 079e216d283f..50ed4fc1b89a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServerStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/ConnectToRandomCoreServerStrategyTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import org.junit.Test; @@ -26,7 +26,6 @@ import java.util.Optional; import java.util.UUID; -import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; @@ -42,7 +41,6 @@ import static org.hamcrest.core.AnyOf.anyOf; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.helpers.collection.Iterators.asSet; @@ -58,8 +56,7 @@ public void shouldConnectToRandomCoreServer() throws Exception MemberId memberId3 = new MemberId( UUID.randomUUID() ); TopologyService topologyService = mock( TopologyService.class ); - when( topologyService.localCoreServers() ) - .thenReturn( fakeCoreTopology( memberId1, memberId2, memberId3 ) ); + when( topologyService.localCoreServers() ).thenReturn( fakeCoreTopology( memberId1, memberId2, memberId3 ) ); ConnectToRandomCoreServerStrategy connectionStrategy = new ConnectToRandomCoreServerStrategy(); connectionStrategy.inject( topologyService, Config.defaults(), NullLogProvider.getInstance(), null ); @@ -83,10 +80,10 @@ static CoreTopology fakeCoreTopology( MemberId... memberIds ) for ( MemberId memberId : memberIds ) { - coreMembers.put( memberId, new CoreServerInfo( new AdvertisedSocketAddress( "localhost", 5000 + offset ), - new AdvertisedSocketAddress( "localhost", 6000 + offset ), new ClientConnectorAddresses( - singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, - new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ), asSet( "core" ), "default" ) ); + coreMembers.put( memberId, + new CoreServerInfo( new AdvertisedSocketAddress( "localhost", 5000 + offset ), new AdvertisedSocketAddress( "localhost", 6000 + offset ), + new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, + new AdvertisedSocketAddress( "localhost", 7000 + offset ) ) ) ), asSet( "core" ), "default" ) ); offset++; } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/TypicallyConnectToRandomReadReplicaStrategyTest.java similarity index 66% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategyTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/TypicallyConnectToRandomReadReplicaStrategyTest.java index 38b059b9fab6..b598138af3ab 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/TypicallyConnectToRandomReadReplicaStrategyTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import org.junit.Test; @@ -26,21 +26,19 @@ import java.util.UUID; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.NullLogProvider; -import org.neo4j.management.CausalClustering; import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.fakeReadReplicaTopology; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.fakeTopologyService; -import static org.neo4j.causalclustering.readreplica.UserDefinedConfigurationStrategyTest.memberIDs; +import static org.neo4j.causalclustering.upstream.strategies.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.fakeReadReplicaTopology; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.fakeTopologyService; +import static org.neo4j.causalclustering.upstream.strategies.UserDefinedConfigurationStrategyTest.memberIDs; public class TypicallyConnectToRandomReadReplicaStrategyTest { @@ -49,25 +47,27 @@ public void shouldConnectToCoreOneInTenTimesByDefault() { // given MemberId theCoreMemberId = new MemberId( UUID.randomUUID() ); - TopologyService topologyService = - fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ) ) ); + TopologyService topologyService = fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ) ) ); Config config = mock( Config.class ); when( config.get( CausalClusteringSettings.database ) ).thenReturn( "default" ); - TypicallyConnectToRandomReadReplicaStrategy connectionStrategy = - new TypicallyConnectToRandomReadReplicaStrategy(); + TypicallyConnectToRandomReadReplicaStrategy connectionStrategy = new TypicallyConnectToRandomReadReplicaStrategy( 2 ); connectionStrategy.inject( topologyService, config, NullLogProvider.getInstance(), null ); List responses = new ArrayList<>(); // when - for ( int i = 0; i < 10; i++ ) + for ( int i = 0; i < 3; i++ ) { - responses.add( connectionStrategy.upstreamDatabase().get() ); + for ( int j = 0; j < 2; j++ ) + { + responses.add( connectionStrategy.upstreamDatabase().get() ); + } + assertThat( responses, hasItem( theCoreMemberId ) ); + responses.clear(); } // then - assertThat( responses, hasItem( theCoreMemberId ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/UserDefinedConfigurationStrategyTest.java similarity index 93% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/UserDefinedConfigurationStrategyTest.java index 06294c2c6dd6..c6c993f31da4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/UserDefinedConfigurationStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/upstream/strategies/UserDefinedConfigurationStrategyTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.readreplica; +package org.neo4j.causalclustering.upstream.strategies; import org.junit.Test; @@ -54,7 +54,7 @@ import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertThat; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; -import static org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; +import static org.neo4j.causalclustering.upstream.strategies.ConnectToRandomCoreServerStrategyTest.fakeCoreTopology; import static org.neo4j.helpers.collection.Iterators.asSet; public class UserDefinedConfigurationStrategyTest @@ -68,7 +68,7 @@ public void shouldPickTheFirstMatchingServerIfCore() fakeTopologyService( fakeCoreTopology( theCoreMemberId ), fakeReadReplicaTopology( memberIDs( 100 ), this::noEastGroupGenerator ) ); UserDefinedConfigurationStrategy strategy = new UserDefinedConfigurationStrategy(); - Config config = Config.defaults( CausalClusteringSettings.user_defined_upstream_selection_strategy,"groups(east); groups(core); halt()" ); + Config config = Config.defaults( CausalClusteringSettings.user_defined_upstream_selection_strategy, "groups(east); groups(core); halt()" ); strategy.inject( topologyService, config, NullLogProvider.getInstance(), null ); @@ -183,15 +183,14 @@ static ReadReplicaTopology fakeReadReplicaTopology( MemberId[] readReplicaIds, F final AtomicInteger offset = new AtomicInteger( 10_000 ); - Function toReadReplicaInfo = memberId -> readReplicaInfo( memberId, offset, groupGenerator ); + Function toReadReplicaInfo = memberId -> readReplicaInfo( memberId, offset, groupGenerator ); - Map readReplicas = Stream.of( readReplicaIds ) - .collect( Collectors.toMap( Function.identity(), toReadReplicaInfo ) ); + Map readReplicas = Stream.of( readReplicaIds ).collect( Collectors.toMap( Function.identity(), toReadReplicaInfo ) ); return new ReadReplicaTopology( readReplicas ); } - private static ReadReplicaInfo readReplicaInfo( MemberId memberId, AtomicInteger offset, Function> groupGenerator ) + private static ReadReplicaInfo readReplicaInfo( MemberId memberId, AtomicInteger offset, Function> groupGenerator ) { return new ReadReplicaInfo( new ClientConnectorAddresses( singletonList( new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt, @@ -263,9 +262,7 @@ public void shutdown() static MemberId[] memberIDs( int howMany ) { - return Stream.generate( () -> new MemberId( UUID.randomUUID() ) ) - .limit( howMany ) - .toArray( MemberId[]::new ); + return Stream.generate( () -> new MemberId( UUID.randomUUID() ) ).limit( howMany ).toArray( MemberId[]::new ); } private final String northGroup = "north"; diff --git a/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy deleted file mode 100644 index f0995b4b4b2e..000000000000 --- a/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy +++ /dev/null @@ -1,5 +0,0 @@ -org.neo4j.causalclustering.readreplica.ReadReplicaStartupProcessTest$AlwaysChooseFirstMember -org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelectorTest$DummyUpstreamDatabaseSelectionStrategy -org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelectorTest$AnotherDummyUpstreamDatabaseSelectionStrategy -org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelectorTest$YetAnotherDummyUpstreamDatabaseSelectionStrategy -org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT$SpecificReplicaStrategy diff --git a/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy b/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy new file mode 100644 index 000000000000..ebba7555ea46 --- /dev/null +++ b/enterprise/causal-clustering/src/test/resources/META-INF/services/org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy @@ -0,0 +1,5 @@ +org.neo4j.causalclustering.readreplica.ReadReplicaStartupProcessTest$AlwaysChooseFirstMember +org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelectorTest$DummyUpstreamDatabaseSelectionStrategy +org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelectorTest$AnotherDummyUpstreamDatabaseSelectionStrategy +org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelectorTest$YetAnotherDummyUpstreamDatabaseSelectionStrategy +org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT$SpecificReplicaStrategy