diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java index 514c6d7448192..c4c50462e416d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java @@ -31,8 +31,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import org.neo4j.causalclustering.discovery.TopologyService; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.NamedThreadFactory; @@ -42,13 +40,13 @@ import org.neo4j.logging.LogProvider; import org.neo4j.ssl.SslPolicy; +import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.neo4j.causalclustering.catchup.TimeoutLoop.waitForCompletion; public class CatchUpClient extends LifecycleAdapter { private final LogProvider logProvider; - private final TopologyService topologyService; private final Log log; private final Clock clock; private final Monitors monitors; @@ -58,11 +56,10 @@ public class CatchUpClient extends LifecycleAdapter private NioEventLoopGroup eventLoopGroup; - public CatchUpClient( TopologyService topologyService, LogProvider logProvider, Clock clock, + public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors, SslPolicy sslPolicy ) { this.logProvider = logProvider; - this.topologyService = topologyService; this.log = logProvider.getLog( getClass() ); this.clock = clock; this.inactivityTimeoutMillis = inactivityTimeoutMillis; @@ -70,18 +67,13 @@ public CatchUpClient( TopologyService topologyService, LogProvider logProvider, this.sslPolicy = sslPolicy; } - public T makeBlockingRequest( MemberId upstream, CatchUpRequest request, - CatchUpResponseCallback responseHandler ) throws CatchUpClientException + public T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, + CatchUpResponseCallback responseHandler ) + throws CatchUpClientException { CompletableFuture future = new CompletableFuture<>(); - Optional catchUpAddress = topologyService.findCatchupAddress( upstream ); - if ( !catchUpAddress.isPresent() ) - { - throw new CatchUpClientException( "Cannot find the target member socket address" ); - } - - CatchUpChannel channel = pool.acquire( catchUpAddress.get() ); + CatchUpChannel channel = pool.acquire( upstream ); future.whenComplete( ( result, e ) -> { @@ -98,8 +90,8 @@ public T makeBlockingRequest( MemberId upstream, CatchUpRequest request, channel.setResponseHandler( responseHandler, future ); channel.send( request ); - String operation = String.format( "Timed out executing operation %s on %s (%s)", - request, upstream, catchUpAddress.get() ); + String operation = format( "Timed out executing operation %s on %s", + request, upstream ); return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java index 5cacfa4ed772d..8b333f9cba356 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java @@ -30,6 +30,7 @@ import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; @@ -132,13 +133,13 @@ private long getPullIndex( File storeDir ) throws IOException } } - public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File storeDir ) throws StoreCopyFailedException, IOException + public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir ) throws StoreCopyFailedException, IOException { long pullIndex = getPullIndex( storeDir ); return pullTransactions( from, expectedStoreId, storeDir, pullIndex, false ); } - public void copy( MemberId from, StoreId expectedStoreId, File destDir ) + public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File destDir ) throws StoreCopyFailedException, StreamingTransactionsFailedException { try @@ -164,7 +165,7 @@ public void copy( MemberId from, StoreId expectedStoreId, File destDir ) } } - private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy ) throws IOException, StoreCopyFailedException + private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy ) throws IOException, StoreCopyFailedException { try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider, fromTxId, asPartOfStoreCopy ) ) { @@ -189,7 +190,7 @@ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, } } - public StoreId getStoreId( MemberId from ) throws StoreIdDownloadFailedException + public StoreId getStoreId( AdvertisedSocketAddress from ) throws StoreIdDownloadFailedException { return storeCopyClient.fetchStoreId( from ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java index d65ca2cc037f7..f1040b0fcd017 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java @@ -20,16 +20,22 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; +import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.String.format; + public class StoreCopyClient { private final CatchUpClient catchUpClient; @@ -41,7 +47,7 @@ public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider ) log = logProvider.getLog( getClass() ); } - long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) + long copyStoreFiles( AdvertisedSocketAddress from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) throws StoreCopyFailedException { try @@ -82,7 +88,7 @@ public void onFileStreamingComplete( CompletableFuture signal, } } - StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException + StoreId fetchStoreId( AdvertisedSocketAddress fromAddress ) throws StoreIdDownloadFailedException { try { @@ -95,7 +101,7 @@ public void onGetStoreIdResponse( CompletableFuture signal, signal.complete( response.storeId() ); } }; - return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), responseHandler ); + return catchUpClient.makeBlockingRequest( fromAddress, new GetStoreIdRequest(), responseHandler ); } catch ( CatchUpClientException e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java index 1111f20d462ea..cde27d566c720 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java @@ -21,8 +21,8 @@ import java.io.IOException; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.logging.Log; @@ -48,7 +48,7 @@ public StoreCopyProcess( FileSystemAbstraction fs, PageCache pageCache, LocalDat this.log = logProvider.getLog( getClass() ); } - public void replaceWithStoreFrom( MemberId source, StoreId expectedStoreId ) + public void replaceWithStoreFrom( AdvertisedSocketAddress source, StoreId expectedStoreId ) throws IOException, StoreCopyFailedException, StreamingTransactionsFailedException { try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, pageCache, diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java index a51b8cf1d2097..38f340ac9df78 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -34,10 +34,13 @@ import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName; +import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException; import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.Lifecycle; @@ -47,7 +50,6 @@ import org.neo4j.logging.LogProvider; import static java.lang.String.format; - import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.CANCELLED; import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC; import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING; @@ -88,6 +90,7 @@ enum State private final long txPullIntervalMillis; private final BatchingTxApplier applier; private final PullRequestMonitor pullRequestMonitor; + private final TopologyService topologyService; private RenewableTimeout timeout; private volatile State state = TX_PULLING; @@ -99,7 +102,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba Lifecycle startStopOnStoreCopy, CatchUpClient catchUpClient, UpstreamDatabaseStrategySelector selectionStrategy, RenewableTimeoutService timeoutService, long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, - StoreCopyProcess storeCopyProcess, Supplier databaseHealthSupplier ) + StoreCopyProcess storeCopyProcess, Supplier databaseHealthSupplier, TopologyService topologyService ) { this.localDatabase = localDatabase; @@ -113,6 +116,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); this.storeCopyProcess = storeCopyProcess; this.databaseHealthSupplier = databaseHealthSupplier; + this.topologyService = topologyService; } @Override @@ -246,10 +250,11 @@ private boolean pullAndApplyBatchOfTransactions( MemberId upstream, StoreId loca TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId ); log.debug( "Pull transactions from %s where tx id > %d [batch #%d]", upstream, lastQueuedTxId, batchCount ); + AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( upstream ).orElseThrow( () -> new TopologyLookupException( upstream ) ); TxStreamFinishedResponse response; try { - response = catchUpClient.makeBlockingRequest( upstream, txPullRequest, new CatchUpResponseAdaptor() + response = catchUpClient.makeBlockingRequest( fromAddress, txPullRequest, new CatchUpResponseAdaptor() { @Override public void onTxPullResponse( CompletableFuture signal, TxPullResponse response ) @@ -323,9 +328,10 @@ private void downloadDatabase( MemberId upstream, StoreId localStoreId ) throw new RuntimeException( throwable ); } + AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( upstream ).orElseThrow( () -> new TopologyLookupException( upstream ) ); try { - storeCopyProcess.replaceWithStoreFrom( upstream, localStoreId ); + storeCopyProcess.replaceWithStoreFrom( fromAddress, localStoreId ); } catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java index 0ee9f4ebed74a..0727e8d7c8750 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java @@ -25,8 +25,8 @@ import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; import org.neo4j.causalclustering.catchup.TxPullRequestResult; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.monitoring.Monitors; public class TxPullClient @@ -40,12 +40,11 @@ public TxPullClient( CatchUpClient catchUpClient, Monitors monitors ) this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); } - public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long previousTxId, - TxPullResponseListener txPullResponseListener ) + public TxPullRequestResult pullTransactions( AdvertisedSocketAddress fromAddress, StoreId storeId, long previousTxId, TxPullResponseListener txPullResponseListener ) throws CatchUpClientException { pullRequestMonitor.txPullRequest( previousTxId ); - return catchUpClient.makeBlockingRequest( from, new TxPullRequest( previousTxId, storeId ), + return catchUpClient.makeBlockingRequest( fromAddress, new TxPullRequest( previousTxId, storeId ), new CatchUpResponseAdaptor() { private long lastTxIdReceived = previousTxId; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 8302094d0f3ba..24456b71e026e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -27,7 +27,7 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.replication.session.LocalSessionPool; import org.neo4j.causalclustering.core.replication.session.OperationContext; -import org.neo4j.causalclustering.helper.RetryStrategy; +import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.graphdb.DatabaseShutdownException; @@ -46,21 +46,21 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List private final Outbound outbound; private final ProgressTracker progressTracker; private final LocalSessionPool sessionPool; - private final RetryStrategy retryStrategy; + private final TimeoutStrategy timeoutStrategy; private final AvailabilityGuard availabilityGuard; private final LeaderLocator leaderLocator; private final Log log; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, - ProgressTracker progressTracker, RetryStrategy retryStrategy, AvailabilityGuard availabilityGuard, + ProgressTracker progressTracker, TimeoutStrategy timeoutStrategy, AvailabilityGuard availabilityGuard, LogProvider logProvider ) { this.me = me; this.outbound = outbound; this.progressTracker = progressTracker; this.sessionPool = sessionPool; - this.retryStrategy = retryStrategy; + this.timeoutStrategy = timeoutStrategy; this.availabilityGuard = availabilityGuard; this.leaderLocator = leaderLocator; @@ -76,7 +76,7 @@ public Future replicate( ReplicatedContent command, boolean trackResult DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() ); Progress progress = progressTracker.start( operation ); - RetryStrategy.Timeout timeout = retryStrategy.newTimeout(); + TimeoutStrategy.Timeout timeout = timeoutStrategy.newTimeout(); do { assertDatabaseNotShutdown(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index c274e953e53ee..b02fd15a18deb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -56,6 +56,7 @@ import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloader; import org.neo4j.causalclustering.core.state.storage.DurableStateStorage; import org.neo4j.causalclustering.core.state.storage.StateStorage; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.logging.MessageLogger; import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal; @@ -99,6 +100,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla final LifeSupport life = platformModule.life; final Monitors monitors = platformModule.monitors; final JobScheduler jobScheduler = platformModule.jobScheduler; + final TopologyService topologyService = clusteringModule.topologyService(); LogProvider logProvider = logging.getInternalLogProvider(); LogProvider userLogProvider = logging.getUserLogProvider(); @@ -120,13 +122,10 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis(); CatchUpClient catchUpClient = life - .add( new CatchUpClient( clusteringModule.topologyService(), logProvider, Clocks.systemClock(), - inactivityTimeoutMillis, monitors, sslPolicy ) ); + .add(new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMillis, monitors, sslPolicy ) ); - RemoteStore remoteStore = new RemoteStore( logProvider, fileSystem, platformModule.pageCache, - new StoreCopyClient( catchUpClient, logProvider ), - new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), - platformModule.monitors ); + RemoteStore remoteStore = new RemoteStore( logProvider, fileSystem, platformModule.pageCache, new StoreCopyClient( catchUpClient, logProvider ), + new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), platformModule.monitors ); CopiedStoreRecovery copiedStoreRecovery = new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(), platformModule.pageCache ); @@ -177,7 +176,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, servicesToStopOnStoreCopy, remoteStore, catchUpClient, logProvider, storeCopyProcess, coreStateMachinesModule.coreStateMachines, - snapshotService, commandApplicationProcess ); + snapshotService, commandApplicationProcess, topologyService ); RaftMessageHandler messageHandler = new RaftMessageHandler( localDatabase, logProvider, consensusModule.raftMachine(), downloader, commandApplicationProcess ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java index 5faef957510bd..65c8d4d95ed5f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java @@ -23,11 +23,14 @@ import java.util.Optional; import java.util.function.Supplier; +import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.state.storage.SimpleFileStorage; import org.neo4j.causalclustering.core.state.storage.SimpleStorage; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.HostnameResolver; +import org.neo4j.causalclustering.discovery.TopologyServiceMultiRetryStrategy; +import org.neo4j.causalclustering.discovery.TopologyServiceRetryStrategy; import org.neo4j.causalclustering.identity.ClusterBinder; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -62,7 +65,7 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member topologyService = discoveryServiceFactory .coreTopologyService( config, sslPolicy, myself, platformModule.jobScheduler, logProvider, - userLogProvider, hostnameResolver ); + userLogProvider, hostnameResolver, resolveStrategy( config ) ); life.add( topologyService ); @@ -79,6 +82,15 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member () -> sleep( 100 ), 300_000, coreBootstrapper ); } + private static TopologyServiceRetryStrategy resolveStrategy( Config config ) + { + long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); + int pollingFrequencyWithinRefreshWindow = 2; + int numberOfRetries = + pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period + return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); + } + public CoreTopologyService topologyService() { return topologyService; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java index fff214ffcc551..54230d982a67d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java @@ -31,12 +31,15 @@ import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.core.state.CoreSnapshotService; import org.neo4j.causalclustering.core.state.machines.CoreStateMachines; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.String.format; import static org.neo4j.causalclustering.catchup.CatchupResult.E_TRANSACTION_PRUNED; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM; @@ -53,11 +56,13 @@ public class CoreStateDownloader private final CoreStateMachines coreStateMachines; private final CoreSnapshotService snapshotService; private final CommandApplicationProcess applicationProcess; + private final TopologyService topologyService; public CoreStateDownloader( LocalDatabase localDatabase, Lifecycle startStopOnStoreCopy, RemoteStore remoteStore, CatchUpClient catchUpClient, LogProvider logProvider, StoreCopyProcess storeCopyProcess, CoreStateMachines coreStateMachines, - CoreSnapshotService snapshotService, CommandApplicationProcess applicationProcess ) + CoreSnapshotService snapshotService, CommandApplicationProcess applicationProcess, + TopologyService topologyService ) { this.localDatabase = localDatabase; this.startStopOnStoreCopy = startStopOnStoreCopy; @@ -68,6 +73,7 @@ public CoreStateDownloader( LocalDatabase localDatabase, Lifecycle startStopOnSt this.coreStateMachines = coreStateMachines; this.snapshotService = snapshotService; this.applicationProcess = applicationProcess; + this.topologyService = topologyService; } public void downloadSnapshot( MemberId source ) throws StoreCopyFailedException @@ -85,7 +91,8 @@ public void downloadSnapshot( MemberId source ) throws StoreCopyFailedException localDatabase.stop(); } - StoreId remoteStoreId = remoteStore.getStoreId( source ); + AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source )); + StoreId remoteStoreId = remoteStore.getStoreId( fromAddress ); if ( !isEmptyStore && !remoteStoreId.equals( localDatabase.storeId() ) ) { throw new StoreCopyFailedException( "StoreId mismatch and not empty" ); @@ -103,7 +110,7 @@ public void downloadSnapshot( MemberId source ) throws StoreCopyFailedException * are ahead, and the correct decisions for their applicability have already been taken as encapsulated * in the copied store. */ - CoreSnapshot coreSnapshot = catchUpClient.makeBlockingRequest( source, new CoreSnapshotRequest(), + CoreSnapshot coreSnapshot = catchUpClient.makeBlockingRequest( fromAddress, new CoreSnapshotRequest(), new CatchUpResponseAdaptor() { @Override @@ -115,19 +122,19 @@ public void onCoreSnapshot( CompletableFuture signal, CoreSnapshot if ( isEmptyStore ) { - storeCopyProcess.replaceWithStoreFrom( source, remoteStoreId ); + storeCopyProcess.replaceWithStoreFrom( fromAddress, remoteStoreId ); } else { StoreId localStoreId = localDatabase.storeId(); - CatchupResult catchupResult = remoteStore.tryCatchingUp( source, localStoreId, localDatabase.storeDir() ); + CatchupResult catchupResult = remoteStore.tryCatchingUp( fromAddress, localStoreId, localDatabase.storeDir() ); if ( catchupResult == E_TRANSACTION_PRUNED ) { - log.info( "Failed to pull transactions from " + source + ". They may have been pruned away." ); + log.info( format( "Failed to pull transactions from %s (%s). They may have been pruned away", source, fromAddress ) ); localDatabase.delete(); - storeCopyProcess.replaceWithStoreFrom( source, localStoreId ); + storeCopyProcess.replaceWithStoreFrom( fromAddress, localStoreId ); } else if ( catchupResult != SUCCESS_END_OF_STREAM ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/TopologyLookupException.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/TopologyLookupException.java new file mode 100644 index 0000000000000..b8fd949fd6f0d --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/TopologyLookupException.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.state.snapshot; + +import org.neo4j.causalclustering.identity.MemberId; + +import static java.lang.String.format; + +public class TopologyLookupException extends RuntimeException +{ + public TopologyLookupException( Throwable cause ) + { + super( cause ); + } + + public TopologyLookupException( MemberId memberId ) + { + super( format( "Cannot find the target member %s socket address", memberId ) ); + } + + public TopologyLookupException( String message ) + { + super( message ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java index 6c79b926767bd..587deefadcec8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java @@ -29,8 +29,9 @@ public interface DiscoveryServiceFactory { CoreTopologyService coreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider, LogProvider userLogProvider, - HostnameResolver hostnameResolver ); + HostnameResolver hostnameResolver, TopologyServiceRetryStrategy topologyServiceRetryStrategy ); TopologyService topologyService( Config config, SslPolicy sslPolicy, LogProvider logProvider, - JobScheduler jobScheduler, MemberId myself, HostnameResolver hostnameResolver ); + JobScheduler jobScheduler, MemberId myself, HostnameResolver hostnameResolver, + TopologyServiceRetryStrategy topologyServiceRetryStrategy ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index ee20bf0a84a12..7f6fe798ee3c3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -19,6 +19,7 @@ */ package org.neo4j.causalclustering.discovery; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,11 +30,13 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.Integer.parseInt; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME; @@ -44,6 +47,7 @@ import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshGroups; +import static org.neo4j.kernel.configuration.Settings.DURATION; class HazelcastClient extends LifecycleAdapter implements TopologyService { @@ -57,6 +61,7 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService private final AdvertisedSocketAddress transactionSource; private final MemberId myself; private final List groups; + private final TopologyServiceRetryStrategy topologyServiceRetryStrategy; private JobScheduler.JobHandle keepAliveJob; private JobScheduler.JobHandle refreshTopologyJob; @@ -66,7 +71,7 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY; HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, - MemberId myself ) + MemberId myself, TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { this.hzInstance = new RobustHazelcastWrapper( connector ); this.config = config; @@ -78,6 +83,15 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); this.myself = myself; this.groups = config.get( CausalClusteringSettings.server_groups ); + this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod ); + } + + private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis ) + { + int pollingFrequencyWithinRefreshWindow = 2; + int numberOfRetries = + pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period + return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); } @Override @@ -94,6 +108,11 @@ public ReadReplicaTopology readReplicas() @Override public Optional findCatchupAddress( MemberId memberId ) + { + return topologyServiceRetryStrategy.apply( memberId, this::retrieveSocketAddress, Optional::isPresent ); + } + + private Optional retrieveSocketAddress( MemberId memberId ) { return Optional.ofNullable( catchupAddressMap.get( memberId ) ); } @@ -147,18 +166,15 @@ private void keepReadReplicaAlive() throws HazelcastInstanceNotActiveException String addresses = connectorAddresses.toString(); log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses ); - hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ) - .put( uuid, transactionSource.toString(), timeToLive, MILLISECONDS ); + hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ).put( uuid, transactionSource.toString(), timeToLive, MILLISECONDS ); - hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) - .put( uuid, myself.getUuid().toString(), timeToLive, MILLISECONDS ); + hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ).put( uuid, myself.getUuid().toString(), timeToLive, MILLISECONDS ); refreshGroups( hazelcastInstance, uuid, groups ); // this needs to be last as when we read from it in HazelcastClusterTopology.readReplicas // we assume that all the other maps have been populated if an entry exists in this one - hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ) - .put( uuid, addresses, timeToLive, MILLISECONDS ); + hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).put( uuid, addresses, timeToLive, MILLISECONDS ); } ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index 034d9fe7f59f5..45529a2704d29 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -31,6 +31,7 @@ import com.hazelcast.core.MembershipEvent; import com.hazelcast.core.MembershipListener; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,6 +66,7 @@ import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshGroups; import static org.neo4j.causalclustering.discovery.HazelcastSslConfiguration.configureSsl; +import static org.neo4j.kernel.configuration.Settings.DURATION; class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService { @@ -79,6 +81,7 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol private final long refreshPeriod; private final LogProvider logProvider; private final HostnameResolver hostnameResolver; + private final TopologyServiceRetryStrategy topologyServiceRetryStrategy; private String membershipRegistrationId; private JobScheduler.JobHandle refreshJob; @@ -92,7 +95,8 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol private volatile boolean stopped; HazelcastCoreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler, - LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver ) + LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver, + TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { this.config = config; this.sslPolicy = sslPolicy; @@ -104,6 +108,7 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol this.userLog = userLogProvider.getLog( getClass() ); this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); this.hostnameResolver = hostnameResolver; + this.topologyServiceRetryStrategy = topologyServiceRetryStrategy; } @Override @@ -309,6 +314,11 @@ public ReadReplicaTopology readReplicas() @Override public Optional findCatchupAddress( MemberId memberId ) + { + return topologyServiceRetryStrategy.apply( memberId, this::retrieveSocketAddress, Optional::isPresent ); + } + + private Optional retrieveSocketAddress( MemberId memberId ) { return Optional.ofNullable( catchupAddressMap.get( memberId ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java index db02fcfc2f3f1..2b7031b09b2e6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java @@ -32,18 +32,21 @@ public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory { @Override public CoreTopologyService coreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler, - LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver ) + LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver, + TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { configureHazelcast( config ); - return new HazelcastCoreTopologyService( config, sslPolicy, myself, jobScheduler, logProvider, userLogProvider, hostnameResolver ); + return new HazelcastCoreTopologyService( config, sslPolicy, myself, jobScheduler, logProvider, userLogProvider, hostnameResolver, + topologyServiceRetryStrategy ); } @Override public TopologyService topologyService( Config config, SslPolicy sslPolicy, LogProvider logProvider, - JobScheduler jobScheduler, MemberId myself, HostnameResolver hostnameResolver ) + JobScheduler jobScheduler, MemberId myself, HostnameResolver hostnameResolver, + TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { configureHazelcast( config ); - return new HazelcastClient( new HazelcastClientConnector( config, logProvider, sslPolicy, hostnameResolver ), jobScheduler, logProvider, config, myself ); + return new HazelcastClient( new HazelcastClientConnector( config, logProvider, sslPolicy, hostnameResolver ), jobScheduler, logProvider, config, myself, topologyServiceRetryStrategy ); } private static void configureHazelcast( Config config ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java new file mode 100644 index 0000000000000..c7e20c58c8bd6 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.function.Function; +import java.util.function.Predicate; + +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; +import org.neo4j.logging.NullLogProvider; + +/** + * Implementation of the RetryStrategy that repeats the retriable function until the correct result has been retrieved or the limit of retries has been + * encountered. + * There is a fixed delay between each retry. + * + * @param the type of input of the retriable function + * @param the type of output of the retriable function + */ +public class MultiRetryStrategy implements RetryStrategy +{ + private final long delayInMillis; + private final long retries; + private final LogProvider logProvider; + + /** + * @param delayInMillis number of milliseconds between each attempt at getting the desired result + * @param retries the number of attempts to perform before giving up + */ + public MultiRetryStrategy( long delayInMillis, long retries ) + { + this( delayInMillis, retries, NullLogProvider.getInstance() ); + } + + /** + * @param delayInMillis number of milliseconds between each attempt at getting the desired result + * @param retries the number of attempts to perform before giving up + * @param logProvider {@see LogProvider} + */ + public MultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider ) + { + this.delayInMillis = delayInMillis; + this.retries = retries; + this.logProvider = logProvider; + } + + /** + * {@inheritDoc} + */ + @Override + public E apply( I input, Function retriable, Predicate wasRetrySuccessful ) + { + Log log = logProvider.getLog( MultiRetryStrategy.class ); + E result = retriable.apply( input ); + int currentIteration = 0; + while ( !wasRetrySuccessful.test( result ) && currentIteration++ < retries ) + { + log.debug( "Try attempt was unsuccessful for input: %s\n", input ); + try + { + Thread.sleep( delayInMillis ); + } + catch ( InterruptedException e ) + { + throw new RuntimeException( e ); + } + result = retriable.apply( input ); + } + return result; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/NoRetriesStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/NoRetriesStrategy.java new file mode 100644 index 0000000000000..0d049a73f9a98 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/NoRetriesStrategy.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.function.Function; +import java.util.function.Predicate; + +public class NoRetriesStrategy implements RetryStrategy +{ + @Override + public E apply( I input, Function retriable, Predicate shouldRetry ) + { + return retriable.apply( input ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RetryStrategy.java new file mode 100644 index 0000000000000..1038a711fcdb7 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RetryStrategy.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.function.Function; +import java.util.function.Predicate; + +/** + * A strategy pattern for deciding how retries will be handled. + *

+ * Depending on the implementation, it is assumed the retriable function will be executed until conditions satisyfing desired output are met and then the latest + * (or most valid) + * result will be returned + *

+ * + * @param Type of input used for the input function (assumes 1-parameter input functions) + * @param Type of output returned from retriable function + */ +public interface RetryStrategy +{ + /** + * Run a given function until a satisfying result is achieved + * + * @param input the input parameter that is given to the retriable function + * @param retriable a function that will be executed multiple times until it returns a valid output + * @param shouldRetry a predicate deciding if the output of the retriable function is valid. Assume that the function will retry if this returns false and + * exit if it returns true + * @return the latest (or most valid) output of the retriable function, depending on implementation + */ + E apply( I input, Function retriable, Predicate shouldRetry ); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java new file mode 100644 index 0000000000000..6c79ff01d0ce2 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.Optional; + +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; + +public class TopologyServiceMultiRetryStrategy extends MultiRetryStrategy> implements TopologyServiceRetryStrategy +{ + public TopologyServiceMultiRetryStrategy( long delayInMillis, long retries ) + { + super( delayInMillis, retries ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceNoRetriesStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceNoRetriesStrategy.java new file mode 100644 index 0000000000000..9f83eca1b63fd --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceNoRetriesStrategy.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.Optional; + +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; + +public class TopologyServiceNoRetriesStrategy extends NoRetriesStrategy> implements TopologyServiceRetryStrategy +{ +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceRetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceRetryStrategy.java new file mode 100644 index 0000000000000..bac12e9db48de --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceRetryStrategy.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import java.util.Optional; + +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; + +public interface TopologyServiceRetryStrategy extends RetryStrategy> +{ +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ConstantTimeRetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ConstantTimeTimeoutStrategy.java similarity index 90% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ConstantTimeRetryStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ConstantTimeTimeoutStrategy.java index 7badf8f7e433e..9b554e3b39eac 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ConstantTimeRetryStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ConstantTimeTimeoutStrategy.java @@ -21,11 +21,11 @@ import java.util.concurrent.TimeUnit; -public class ConstantTimeRetryStrategy implements RetryStrategy +public class ConstantTimeTimeoutStrategy implements TimeoutStrategy { private final Timeout constantTimeout; - public ConstantTimeRetryStrategy( long backoffTime, TimeUnit timeUnit ) + public ConstantTimeTimeoutStrategy( long backoffTime, TimeUnit timeUnit ) { long backoffTimeMillis = timeUnit.toMillis( backoffTime ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategy.java index 9e9efbfc5997a..a4a6ebe5122bd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategy.java @@ -24,7 +24,7 @@ /** * Exponential backoff strategy helper class. Exponent is always 2. */ -public class ExponentialBackoffStrategy implements RetryStrategy +public class ExponentialBackoffStrategy implements TimeoutStrategy { private final long initialBackoffTimeMillis; private final long upperBoundBackoffTimeMillis; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/TimeoutStrategy.java similarity index 96% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RetryStrategy.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/TimeoutStrategy.java index f0fffc17362a9..dc3ee9fd5d011 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RetryStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/TimeoutStrategy.java @@ -19,7 +19,7 @@ */ package org.neo4j.causalclustering.helper; -public interface RetryStrategy +public interface TimeoutStrategy { Timeout newTimeout(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 368462eef84d3..05db48a38d5d8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -45,6 +45,8 @@ import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.HostnameResolver; import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.discovery.TopologyServiceMultiRetryStrategy; +import org.neo4j.causalclustering.discovery.TopologyServiceRetryStrategy; import org.neo4j.causalclustering.discovery.procedures.ReadReplicaRoleProcedure; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.identity.MemberId; @@ -189,13 +191,14 @@ public class EnterpriseReadReplicaEditionModule extends EditionModule HostnameResolver hostnameResolver = chooseResolver( config, logProvider, userLogProvider ); TopologyService topologyService = discoveryServiceFactory.topologyService( config, clusterSslPolicy, - logProvider, platformModule.jobScheduler, myself, hostnameResolver ); + logProvider, platformModule.jobScheduler, myself, + hostnameResolver, resolveStrategy( config ) ); life.add( dependencies.satisfyDependency( topologyService ) ); long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis(); CatchUpClient catchUpClient = life.add( - new CatchUpClient( topologyService, logProvider, Clocks.systemClock(), + new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMillis, monitors, clusterSslPolicy ) ); final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); @@ -283,7 +286,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, catchUpClient, upstreamDatabaseStrategySelector, catchupTimeoutService, config.get( CausalClusteringSettings.pull_interval ).toMillis(), batchingTxApplier, - platformModule.monitors, storeCopyProcess, databaseHealthSupplier ); + platformModule.monitors, storeCopyProcess, databaseHealthSupplier, topologyService ); dependencies.satisfyDependencies( catchupProcess ); txPulling.add( batchingTxApplier ); @@ -294,7 +297,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS ); life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, upstreamDatabaseStrategySelector, - retryStrategy, logProvider, platformModule.logging.getUserLogProvider(), storeCopyProcess ) ); + retryStrategy, logProvider, platformModule.logging.getUserLogProvider(), storeCopyProcess, topologyService ) ); CatchupServer catchupServer = new CatchupServer( platformModule.logging.getInternalLogProvider(), platformModule.logging.getUserLogProvider(), localDatabase::storeId, @@ -382,4 +385,13 @@ public UpstreamDatabaseSelectionStrategy next() }; } } + + private static TopologyServiceRetryStrategy resolveStrategy( Config config ) + { + long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); + int pollingFrequencyWithinRefreshWindow = 2; + int numberOfRetries = + pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period + return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java index f77d9a4ffc38f..5509c2cb5ca58 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java @@ -27,9 +27,12 @@ import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException; import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException; -import org.neo4j.causalclustering.helper.RetryStrategy; +import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException; +import org.neo4j.causalclustering.discovery.TopologyService; +import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -44,24 +47,26 @@ class ReadReplicaStartupProcess implements Lifecycle private final Log debugLog; private final Log userLog; - private final RetryStrategy retryStrategy; + private final TimeoutStrategy timeoutStrategy; private final UpstreamDatabaseStrategySelector selectionStrategyPipeline; + private final TopologyService topologyService; private String lastIssue; private final StoreCopyProcess storeCopyProcess; - ReadReplicaStartupProcess( RemoteStore remoteStore, LocalDatabase localDatabase, - Lifecycle txPulling, UpstreamDatabaseStrategySelector selectionStrategyPipeline, RetryStrategy retryStrategy, - LogProvider debugLogProvider, LogProvider userLogProvider, StoreCopyProcess storeCopyProcess ) + ReadReplicaStartupProcess( RemoteStore remoteStore, LocalDatabase localDatabase, Lifecycle txPulling, + UpstreamDatabaseStrategySelector selectionStrategyPipeline, TimeoutStrategy timeoutStrategy, LogProvider debugLogProvider, + LogProvider userLogProvider, StoreCopyProcess storeCopyProcess, TopologyService topologyService ) { this.remoteStore = remoteStore; this.localDatabase = localDatabase; this.txPulling = txPulling; this.selectionStrategyPipeline = selectionStrategyPipeline; - this.retryStrategy = retryStrategy; + this.timeoutStrategy = timeoutStrategy; this.debugLog = debugLogProvider.getLog( getClass() ); this.userLog = userLogProvider.getLog( getClass() ); this.storeCopyProcess = storeCopyProcess; + this.topologyService = topologyService; } @Override @@ -80,7 +85,7 @@ private String issueOf( String operation, int attempt ) public void start() throws IOException { boolean syncedWithUpstream = false; - RetryStrategy.Timeout timeout = retryStrategy.newTimeout(); + TimeoutStrategy.Timeout timeout = timeoutStrategy.newTimeout(); int attempt = 0; while ( !syncedWithUpstream ) { @@ -145,19 +150,20 @@ public void start() throws IOException } private void syncStoreWithUpstream( MemberId source ) - throws IOException, StoreIdDownloadFailedException, StoreCopyFailedException, - StreamingTransactionsFailedException + throws IOException, StoreIdDownloadFailedException, StoreCopyFailedException, StreamingTransactionsFailedException { if ( localDatabase.isEmpty() ) { debugLog.info( "Local database is empty, attempting to replace with copy from upstream server %s", source ); debugLog.info( "Finding store id of upstream server %s", source ); - StoreId storeId = remoteStore.getStoreId( source ); + AdvertisedSocketAddress fromAddress = + topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) ); + StoreId storeId = remoteStore.getStoreId( fromAddress ); debugLog.info( "Copying store from upstream server %s", source ); localDatabase.delete(); - storeCopyProcess.replaceWithStoreFrom( source, storeId ); + storeCopyProcess.replaceWithStoreFrom( fromAddress, storeId ); debugLog.info( "Restarting local database after copy.", source ); } @@ -170,12 +176,13 @@ private void syncStoreWithUpstream( MemberId source ) private void ensureSameStoreIdAs( MemberId upstream ) throws StoreIdDownloadFailedException { StoreId localStoreId = localDatabase.storeId(); - StoreId remoteStoreId = remoteStore.getStoreId( upstream ); + AdvertisedSocketAddress advertisedSocketAddress = + topologyService.findCatchupAddress( upstream ).orElseThrow( () -> new TopologyLookupException( upstream ) ); + StoreId remoteStoreId = remoteStore.getStoreId( advertisedSocketAddress ); if ( !localStoreId.equals( remoteStoreId ) ) { throw new IllegalStateException( format( "This read replica cannot join the cluster. " + - "The local database is not empty and has a mismatching storeId: expected %s actual %s.", - remoteStoreId, localStoreId ) ); + "The local database is not empty and has a mismatching storeId: expected %s actual %s.", remoteStoreId, localStoreId ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java index b81b76d8bc7f3..5c7bcbae26b5a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java @@ -23,15 +23,14 @@ import java.io.File; import java.io.IOException; -import java.util.UUID; import org.neo4j.causalclustering.catchup.TxPullRequestResult; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter; import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.catchup.tx.TxPullResponseListener; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.monitoring.Monitors; @@ -64,7 +63,7 @@ public void shouldCopyStoreFilesAndPullTransactions() throws Exception null, storeCopyClient, txPullClient, factory( writer ), new Monitors() ); // when - MemberId localhost = new MemberId( UUID.randomUUID() ); + AdvertisedSocketAddress localhost = new AdvertisedSocketAddress( "127.0.0.1", 1234 ); remoteStore.copy( localhost, storeId, new File( "destination" ) ); // then @@ -78,7 +77,7 @@ public void shouldSetLastPulledTransactionId() throws Exception // given long lastFlushedTxId = 12; StoreId wantedStoreId = new StoreId( 1, 2, 3, 4 ); - MemberId localhost = new MemberId( UUID.randomUUID() ); + AdvertisedSocketAddress localhost = new AdvertisedSocketAddress( "127.0.0.1", 1234 ); StoreCopyClient storeCopyClient = mock( StoreCopyClient.class ); when( storeCopyClient.copyStoreFiles( eq( localhost ), eq( wantedStoreId ), any( StoreFileStreams.class ) ) ) @@ -115,7 +114,7 @@ public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception storeCopyClient, txPullClient, factory( writer ), new Monitors() ); doThrow( StoreCopyFailedException.class ).when( txPullClient ) - .pullTransactions( any( MemberId.class ), eq( storeId ), anyLong(), any( TransactionLogCatchUpWriter.class ) ); + .pullTransactions( any( AdvertisedSocketAddress.class ), eq( storeId ), anyLong(), any( TransactionLogCatchUpWriter.class ) ); // when try diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java index 22dfaa002c088..5230f5d7921f8 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java @@ -22,6 +22,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -31,9 +32,11 @@ import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.Lifecycle; @@ -70,17 +73,19 @@ public class CatchupPollingProcessTest private final StoreCopyProcess storeCopyProcess = mock( StoreCopyProcess.class ); private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); private final LocalDatabase localDatabase = mock( LocalDatabase.class ); + private final TopologyService topologyService = mock( TopologyService.class ); + private final AdvertisedSocketAddress coreMemberAddress = new AdvertisedSocketAddress( "hostname", 1234 ); { when( localDatabase.storeId() ).thenReturn( storeId ); + when( topologyService.findCatchupAddress( coreMemberId ) ).thenReturn( Optional.of( coreMemberAddress ) ); } private final Lifecycle startStopOnStoreCopy = mock( Lifecycle.class ); private final CatchupPollingProcess txPuller = - new CatchupPollingProcess( NullLogProvider.getInstance(), localDatabase, startStopOnStoreCopy, - catchUpClient, strategyPipeline, timeoutService, txPullIntervalMillis, txApplier, new Monitors(), - storeCopyProcess, () -> mock( DatabaseHealth.class ) ); + new CatchupPollingProcess( NullLogProvider.getInstance(), localDatabase, startStopOnStoreCopy, catchUpClient, strategyPipeline, timeoutService, + txPullIntervalMillis, txApplier, new Monitors(), storeCopyProcess, () -> mock( DatabaseHealth.class ), topologyService ); @Before public void before() throws Throwable @@ -101,8 +106,7 @@ public void shouldSendPullRequestOnTick() throws Throwable timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - any( CatchUpResponseCallback.class ) ); + verify( catchUpClient ).makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), any( CatchUpResponseCallback.class ) ); } @Test @@ -114,16 +118,14 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable when( txApplier.lastQueuedTxId() ).thenReturn( lastAppliedTxId ); // when - when( catchUpClient.makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - any( CatchUpResponseCallback.class ) ) ) - .thenReturn( - new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_BATCH, 10 ), - new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_STREAM, 10 ) ); + when( catchUpClient.makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), + any( CatchUpResponseCallback.class ) ) ).thenReturn( new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_BATCH, 10 ), + new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_STREAM, 10 ) ); timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( catchUpClient, times( 2 ) ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), + verify( catchUpClient, times( 2 ) ).makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), any( CatchUpResponseCallback.class ) ); } @@ -132,9 +134,8 @@ public void shouldRenewTxPullTimeoutOnSuccessfulTxPulling() throws Throwable { // when txPuller.start(); - when( catchUpClient.makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - any( CatchUpResponseCallback.class ) ) ).thenReturn( - new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_STREAM, 0 ) ); + when( catchUpClient.makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), + any( CatchUpResponseCallback.class ) ) ).thenReturn( new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_STREAM, 0 ) ); timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); @@ -147,9 +148,8 @@ public void nextStateShouldBeStoreCopyingIfRequestedTransactionHasBeenPrunedAway { // when txPuller.start(); - when( catchUpClient.makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - any( CatchUpResponseCallback.class ) ) ).thenReturn( - new TxStreamFinishedResponse( CatchupResult.E_TRANSACTION_PRUNED, 0 ) ); + when( catchUpClient.makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), + any( CatchUpResponseCallback.class ) ) ).thenReturn( new TxStreamFinishedResponse( CatchupResult.E_TRANSACTION_PRUNED, 0 ) ); timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); @@ -162,9 +162,8 @@ public void nextStateShouldBeTxPullingAfterASuccessfulStoreCopy() throws Throwab { // given txPuller.start(); - when( catchUpClient.makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - any( CatchUpResponseCallback.class ) ) ).thenReturn( - new TxStreamFinishedResponse( CatchupResult.E_TRANSACTION_PRUNED, 0 ) ); + when( catchUpClient.makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), + any( CatchUpResponseCallback.class ) ) ).thenReturn( new TxStreamFinishedResponse( CatchupResult.E_TRANSACTION_PRUNED, 0 ) ); // when (tx pull) timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); @@ -175,7 +174,7 @@ public void nextStateShouldBeTxPullingAfterASuccessfulStoreCopy() throws Throwab // then verify( localDatabase ).stopForStoreCopy(); verify( startStopOnStoreCopy ).stop(); - verify( storeCopyProcess ).replaceWithStoreFrom( any( MemberId.class ), eq( storeId ) ); + verify( storeCopyProcess ).replaceWithStoreFrom( any( AdvertisedSocketAddress.class ), eq( storeId ) ); verify( localDatabase ).start(); verify( startStopOnStoreCopy ).start(); verify( txApplier ).refreshFromNewStore(); @@ -191,8 +190,8 @@ public void shouldNotRenewTheTimeoutIfInPanicState() throws Throwable txPuller.start(); CatchUpResponseCallback callback = mock( CatchUpResponseCallback.class ); - doThrow( new RuntimeException( "Panic all the things" ) ).when( callback ) - .onTxPullResponse( any( CompletableFuture.class ), any( TxPullResponse.class ) ); + doThrow( new RuntimeException( "Panic all the things" ) ).when( callback ).onTxPullResponse( any( CompletableFuture.class ), + any( TxPullResponse.class ) ); // when timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); @@ -206,12 +205,10 @@ public void shouldNotRenewTheTimeoutIfInPanicState() throws Throwable public void shouldNotSignalOperationalUntilPulling() throws Throwable { // given - when( catchUpClient.makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - any( CatchUpResponseCallback.class ) ) ) - .thenReturn( - new TxStreamFinishedResponse( CatchupResult.E_TRANSACTION_PRUNED, 0), - new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_BATCH, 10), - new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_STREAM, 15) ); + when( catchUpClient.makeBlockingRequest( any( AdvertisedSocketAddress.class ), any( TxPullRequest.class ), + any( CatchUpResponseCallback.class ) ) ).thenReturn( new TxStreamFinishedResponse( CatchupResult.E_TRANSACTION_PRUNED, 0), + new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_BATCH, 10), + new TxStreamFinishedResponse( CatchupResult.SUCCESS_END_OF_STREAM, 15) ); // when txPuller.start(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index 7ca21ab32772b..929c8bae2816e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -35,8 +35,8 @@ import org.neo4j.causalclustering.core.replication.session.GlobalSession; import org.neo4j.causalclustering.core.replication.session.LocalSessionPool; import org.neo4j.causalclustering.core.state.Result; -import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy; -import org.neo4j.causalclustering.helper.RetryStrategy; +import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy; +import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.Message; import org.neo4j.causalclustering.messaging.Outbound; @@ -68,7 +68,7 @@ public class RaftReplicatorTest private MemberId leader = new MemberId( UUID.randomUUID() ); private GlobalSession session = new GlobalSession( UUID.randomUUID(), myself ); private LocalSessionPool sessionPool = new LocalSessionPool( session ); - private RetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 0, MILLISECONDS ); + private TimeoutStrategy timeoutStrategy = new ConstantTimeTimeoutStrategy( 0, MILLISECONDS ); private AvailabilityGuard availabilityGuard = new AvailabilityGuard( Clocks.systemClock(), NullLog.getInstance() ); @Test @@ -81,7 +81,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, sessionPool, - capturedProgress, retryStrategy, availabilityGuard, NullLogProvider.getInstance() ); + capturedProgress, timeoutStrategy, availabilityGuard, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -108,7 +108,7 @@ public void shouldResendAfterTimeout() throws Exception CapturingOutbound outbound = new CapturingOutbound(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, retryStrategy, availabilityGuard, NullLogProvider.getInstance() ); + sessionPool, capturedProgress, timeoutStrategy, availabilityGuard, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -132,7 +132,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception CapturingOutbound outbound = new CapturingOutbound(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, retryStrategy, availabilityGuard, NullLogProvider.getInstance() ); + sessionPool, capturedProgress, timeoutStrategy, availabilityGuard, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, true ); @@ -162,7 +162,7 @@ public void stopReplicationOnShutdown() throws NoLeaderFoundException, Interrupt CapturingOutbound outbound = new CapturingOutbound(); RaftReplicator replicator = - new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, retryStrategy, + new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, timeoutStrategy, availabilityGuard, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java index 4b19f89fedfa7..d87cc6a69cdf7 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; +import java.util.Optional; import java.util.UUID; import org.neo4j.causalclustering.catchup.CatchUpClient; @@ -34,8 +35,10 @@ import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.core.state.CoreSnapshotService; import org.neo4j.causalclustering.core.state.machines.CoreStateMachines; +import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.NullLogProvider; @@ -57,24 +60,27 @@ public class CoreStateDownloaderTest private final StoreCopyProcess storeCopyProcess = mock( StoreCopyProcess.class ); private CoreSnapshotService snaptshotService = mock( CoreSnapshotService.class ); private CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); + private TopologyService topologyService = mock( TopologyService.class ); private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); private final NullLogProvider logProvider = NullLogProvider.getInstance(); private final MemberId remoteMember = new MemberId( UUID.randomUUID() ); + private final AdvertisedSocketAddress remoteAddress = new AdvertisedSocketAddress( "remoteAddress", 1234 ); private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); private final File storeDir = new File( "graph.db" ); private final CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, startStopLife, remoteStore, catchUpClient, logProvider, - storeCopyProcess, coreStateMachines, snaptshotService, applicationProcess ); + storeCopyProcess, coreStateMachines, snaptshotService, applicationProcess, topologyService ); @Before public void commonMocking() throws IOException { when( localDatabase.storeId() ).thenReturn( storeId ); when( localDatabase.storeDir() ).thenReturn( storeDir ); + when( topologyService.findCatchupAddress( remoteMember ) ).thenReturn( Optional.of( remoteAddress ) ); } @Test @@ -82,7 +88,7 @@ public void shouldDownloadCompleteStoreWhenEmpty() throws Throwable { // given StoreId remoteStoreId = new StoreId( 5, 6, 7, 8 ); - when( remoteStore.getStoreId( remoteMember ) ).thenReturn( remoteStoreId ); + when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( remoteStoreId ); when( localDatabase.isEmpty() ).thenReturn( true ); // when @@ -90,7 +96,7 @@ public void shouldDownloadCompleteStoreWhenEmpty() throws Throwable // then verify( remoteStore, never() ).tryCatchingUp( any(), any(), any() ); - verify( storeCopyProcess ).replaceWithStoreFrom( remoteMember, remoteStoreId ); + verify( storeCopyProcess ).replaceWithStoreFrom( remoteAddress, remoteStoreId ); } @Test @@ -115,7 +121,7 @@ public void shouldNotOverwriteNonEmptyMismatchingStore() throws Exception // given when( localDatabase.isEmpty() ).thenReturn( false ); StoreId remoteStoreId = new StoreId( 5, 6, 7, 8 ); - when( remoteStore.getStoreId( remoteMember ) ).thenReturn( remoteStoreId ); + when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( remoteStoreId ); // when try @@ -138,14 +144,14 @@ public void shouldCatchupIfPossible() throws Exception { // given when( localDatabase.isEmpty() ).thenReturn( false ); - when( remoteStore.getStoreId( remoteMember ) ).thenReturn( storeId ); - when( remoteStore.tryCatchingUp( remoteMember, storeId, storeDir ) ).thenReturn( SUCCESS_END_OF_STREAM ); + when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( storeId ); + when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir ) ).thenReturn( SUCCESS_END_OF_STREAM ); // when downloader.downloadSnapshot( remoteMember ); // then - verify( remoteStore ).tryCatchingUp( remoteMember, storeId, storeDir ); + verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir ); verify( remoteStore, never() ).copy( any(), any(), any() ); } @@ -154,14 +160,14 @@ public void shouldDownloadWholeStoreIfCannotCatchUp() throws Exception { // given when( localDatabase.isEmpty() ).thenReturn( false ); - when( remoteStore.getStoreId( remoteMember ) ).thenReturn( storeId ); - when( remoteStore.tryCatchingUp( remoteMember, storeId, storeDir ) ).thenReturn( E_TRANSACTION_PRUNED ); + when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( storeId ); + when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir ) ).thenReturn( E_TRANSACTION_PRUNED ); // when downloader.downloadSnapshot( remoteMember ); // then - verify( remoteStore ).tryCatchingUp( remoteMember, storeId, storeDir ); - verify( storeCopyProcess ).replaceWithStoreFrom( remoteMember, storeId ); + verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir ); + verify( storeCopyProcess ).replaceWithStoreFrom( remoteAddress, storeId ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java index b232145b7b313..263df05180984 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java @@ -99,6 +99,7 @@ public class HazelcastClientTest { private MemberId myself = new MemberId( UUID.randomUUID() ); + private TopologyServiceRetryStrategy topologyServiceRetryStrategy = new TopologyServiceNoRetriesStrategy(); private Config config() { @@ -123,7 +124,8 @@ public void shouldReturnTopologyUsingHazelcastMembers() throws Throwable HazelcastConnector connector = mock( HazelcastConnector.class ); OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); + HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself, + topologyServiceRetryStrategy ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); @@ -155,7 +157,8 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Throwable HazelcastConnector connector = mock( HazelcastConnector.class ); OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); + HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself, + topologyServiceRetryStrategy ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); @@ -203,7 +206,7 @@ public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Throw OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - HazelcastClient client = new HazelcastClient( connector, jobScheduler, logProvider, config(), myself ); + HazelcastClient client = new HazelcastClient( connector, jobScheduler, logProvider, config(), myself, topologyServiceRetryStrategy ); com.hazelcast.core.Cluster cluster = mock( Cluster.class ); when( hazelcastInstance.getCluster() ).thenReturn( cluster ); @@ -253,7 +256,8 @@ public void shouldRegisterReadReplicaInTopology() throws Throwable when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); + HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself, + topologyServiceRetryStrategy ); // when hazelcastClient.start(); @@ -296,7 +300,8 @@ public void shouldRemoveReadReplicasOnGracefulShutdown() throws Throwable when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); + HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself, + topologyServiceRetryStrategy ); hazelcastClient.start(); @@ -327,7 +332,8 @@ public void shouldSwallowNPEFromHazelcast() throws Throwable OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); + HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself, + topologyServiceRetryStrategy ); hazelcastClient.start(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java new file mode 100644 index 0000000000000..da465c8fc80c0 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.discovery; + +import org.junit.Test; + +import java.util.function.Function; +import java.util.function.Predicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MultiRetryStrategyTest +{ + private static final Predicate ALWAYS_VALID = i -> true; + private static final Predicate NEVER_VALID = i -> false; + private static final Predicate VALID_ON_SECOND_TIME = new Predicate() + { + private boolean nextSuccessful; + @Override + public boolean test( Integer integer ) + { + if ( !nextSuccessful ) + { + nextSuccessful = true; + return false; + } + return true; + } + }; + + @Test + public void successOnRetryCausesNoDelay() + { + // given + int delay = 1000; + int retries = 10; + MultiRetryStrategy subject = new MultiRetryStrategy<>( delay, retries ); + + // when + long startTime = System.currentTimeMillis(); + Integer result = subject.apply( 3, Function.identity(), ALWAYS_VALID ); + long endTime = System.currentTimeMillis(); + + // then + long duration = endTime - startTime; + assertTrue( duration < delay ); + assertEquals( 3, result.intValue() ); + } + + @Test + public void numberOfIterationsDoesNotExceedMaximum() + { + // given + int delay = 20; + int retries = 10; + MultiRetryStrategy subject = new MultiRetryStrategy<>( delay, retries ); + + // when + long startTime = System.currentTimeMillis(); + Integer result = subject.apply( 3, Function.identity(), NEVER_VALID ); + long endTime = System.currentTimeMillis(); + + // then + long duration = endTime - startTime; + double durationInSeconds = duration / 1000.0; + double expectedDurationInSeconds = (delay * retries) / 1000.0; + double marginOfErrorInSeconds = (delay / 1000.0) / 2; + assertEquals( expectedDurationInSeconds, durationInSeconds, marginOfErrorInSeconds ); + } + + @Test + public void successfulRetriesBreakTheRetryLoop() + { + // given + int delay = 200; + int retries = 10; + MultiRetryStrategy subject = new MultiRetryStrategy<>( delay, retries ); + + // when + long startTime = System.currentTimeMillis(); + Integer result = subject.apply( 3, Function.identity(), VALID_ON_SECOND_TIME ); + long endTime = System.currentTimeMillis(); + + // then + long duration = endTime - startTime; + double durationInSeconds = duration / 1000.0; + double expectedDurationInSeconds = delay / 1000.0; + double marginOfErrorInSeconds = (delay / 1000.0) / 4; + assertEquals( expectedDurationInSeconds, durationInSeconds, marginOfErrorInSeconds ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java index 2c0dc12b2bd59..c9d5fd142e9eb 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java @@ -89,8 +89,10 @@ public ReadReplicaTopology readReplicas() @Override public Optional findCatchupAddress( MemberId upstream ) { - return coreTopology.find( upstream ).map( info -> Optional.of( info.getCatchupServer() ) ) - .orElseGet( () -> readReplicaTopology.find( upstream ).map( ReadReplicaInfo::getCatchupServer ) ); + return coreTopology.find( upstream ) + .map( info -> Optional.of( info.getCatchupServer() ) ) + .orElseGet( () -> readReplicaTopology.find( upstream ) + .map( ReadReplicaInfo::getCatchupServer ) ); } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java index 0d74aed708c82..d384906856b0c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java @@ -80,7 +80,11 @@ public ReadReplicaTopology readReplicas() @Override public Optional findCatchupAddress( MemberId upstream ) { - return sharedDiscoveryService.coreTopology( null ).find( upstream ).map( info -> Optional.of( info.getCatchupServer() ) ) - .orElseGet( () -> sharedDiscoveryService.readReplicaTopology().find( upstream ).map( ReadReplicaInfo::getCatchupServer ) ); + return sharedDiscoveryService.coreTopology( null ) + .find( upstream ) + .map( info -> Optional.of( info.getCatchupServer() ) ) + .orElseGet( () -> sharedDiscoveryService.readReplicaTopology() + .find( upstream ) + .map( ReadReplicaInfo::getCatchupServer ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index c8febaa68b544..a822438f17825 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -49,7 +49,8 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory @Override public CoreTopologyService coreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler, - LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver ) + LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver, + TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { SharedDiscoveryCoreClient sharedDiscoveryCoreClient = new SharedDiscoveryCoreClient( this, myself, logProvider, config ); @@ -60,7 +61,8 @@ public CoreTopologyService coreTopologyService( Config config, SslPolicy sslPoli @Override public TopologyService topologyService( Config config, SslPolicy sslPolicy, LogProvider logProvider, - JobScheduler jobScheduler, MemberId myself, HostnameResolver hostnameResolver ) + JobScheduler jobScheduler, MemberId myself, HostnameResolver hostnameResolver, + TopologyServiceRetryStrategy topologyServiceRetryStrategy ) { return new SharedDiscoveryReadReplicaClient( this, config, myself, logProvider ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryServiceIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryServiceIT.java index 4139420504103..07d46063ed303 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryServiceIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryServiceIT.java @@ -97,7 +97,8 @@ private Callable createDiscoveryJob( MemberId member, DiscoveryServiceFact HostnameResolver hostnameResolver = new NoOpHostnameResolver(); CoreTopologyService topologyService = disoveryServiceFactory - .coreTopologyService( config(), null, member, jobScheduler, logProvider, userLogProvider, hostnameResolver ); + .coreTopologyService( config(), null, member, jobScheduler, logProvider, userLogProvider, hostnameResolver, + new TopologyServiceNoRetriesStrategy() ); return sharedClientStarter( topologyService, expectedTargetSet ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategyTest.java index c1ebec812369d..7c6a9b594b542 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/ExponentialBackoffStrategyTest.java @@ -33,7 +33,7 @@ public void shouldDoubleEachTime() throws Exception { // given ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, 1 << NUMBER_OF_ACCESSES, MILLISECONDS ); - RetryStrategy.Timeout timeout = strategy.newTimeout(); + TimeoutStrategy.Timeout timeout = strategy.newTimeout(); // when for ( int i = 0; i < NUMBER_OF_ACCESSES; i++ ) @@ -50,7 +50,7 @@ public void shouldProvidePreviousTimeout() throws Exception { // given ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, 1 << NUMBER_OF_ACCESSES, MILLISECONDS ); - RetryStrategy.Timeout timeout = strategy.newTimeout(); + TimeoutStrategy.Timeout timeout = strategy.newTimeout(); // when for ( int i = 0; i < NUMBER_OF_ACCESSES; i++ ) @@ -68,7 +68,7 @@ public void shouldRespectUpperBound() throws Exception // given long upperBound = (1 << NUMBER_OF_ACCESSES) - 5; ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, upperBound, MILLISECONDS ); - RetryStrategy.Timeout timeout = strategy.newTimeout(); + TimeoutStrategy.Timeout timeout = strategy.newTimeout(); // when for ( int i = 0; i < NUMBER_OF_ACCESSES; i++ ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java index 27eee79d15e85..5f47e0a9d206b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java @@ -37,9 +37,10 @@ import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.TopologyService; -import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy; +import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.Service; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.lifecycle.Lifecycle; @@ -57,7 +58,7 @@ public class ReadReplicaStartupProcessTest { - private ConstantTimeRetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 1, MILLISECONDS ); + private ConstantTimeTimeoutStrategy retryStrategy = new ConstantTimeTimeoutStrategy( 1, MILLISECONDS ); private StoreCopyProcess storeCopyProcess = mock( StoreCopyProcess.class ); private RemoteStore remoteStore = mock( RemoteStore.class ); private final PageCache pageCache = mock( PageCache.class ); @@ -67,6 +68,7 @@ public class ReadReplicaStartupProcessTest private Lifecycle txPulling = mock( Lifecycle.class ); private MemberId memberId = new MemberId( UUID.randomUUID() ); + private AdvertisedSocketAddress fromAddress = new AdvertisedSocketAddress( "127.0.0.1", 123 ); private StoreId localStoreId = new StoreId( 1, 2, 3, 4 ); private StoreId otherStoreId = new StoreId( 5, 6, 7, 8 ); private File storeDir = new File( "store-dir" ); @@ -82,6 +84,7 @@ public void commonMocking() throws StoreIdDownloadFailedException, IOException when( localDatabase.storeId() ).thenReturn( localStoreId ); when( topologyService.coreServers() ).thenReturn( clusterTopology ); when( clusterTopology.members() ).thenReturn( members ); + when( topologyService.findCatchupAddress( memberId ) ).thenReturn( Optional.of( fromAddress ) ); } @Test @@ -89,11 +92,12 @@ public void shouldReplaceEmptyStoreWithRemote() throws Throwable { // given when( localDatabase.isEmpty() ).thenReturn( true ); + when( topologyService.findCatchupAddress( any() )).thenReturn( Optional.of( fromAddress ) ); when( remoteStore.getStoreId( any() ) ).thenReturn( otherStoreId ); ReadReplicaStartupProcess readReplicaStartupProcess = - new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), - retryStrategy, NullLogProvider.getInstance(), NullLogProvider.getInstance(), storeCopyProcess ); + new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), retryStrategy, NullLogProvider.getInstance(), + NullLogProvider.getInstance(), storeCopyProcess, topologyService ); // when readReplicaStartupProcess.start(); @@ -120,8 +124,8 @@ public void shouldNotStartWithMismatchedNonEmptyStore() throws Throwable when( remoteStore.getStoreId( any() ) ).thenReturn( otherStoreId ); ReadReplicaStartupProcess readReplicaStartupProcess = - new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), - retryStrategy, NullLogProvider.getInstance(), NullLogProvider.getInstance(), storeCopyProcess ); + new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), retryStrategy, NullLogProvider.getInstance(), + NullLogProvider.getInstance(), storeCopyProcess, topologyService ); // when try @@ -132,9 +136,8 @@ public void shouldNotStartWithMismatchedNonEmptyStore() throws Throwable catch ( Exception ex ) { //expected. - assertThat( ex.getMessage(), containsString( - "This read replica cannot join the cluster. The local database is not empty and has a " + - "mismatching storeId" ) ); + assertThat( ex.getMessage(), + containsString( "This read replica cannot join the cluster. The local database is not empty and has a " + "mismatching storeId" ) ); } // then @@ -149,8 +152,8 @@ public void shouldStartWithMatchingDatabase() throws Throwable when( localDatabase.isEmpty() ).thenReturn( false ); ReadReplicaStartupProcess readReplicaStartupProcess = - new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), - retryStrategy, NullLogProvider.getInstance(), NullLogProvider.getInstance(), storeCopyProcess ); + new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), retryStrategy, NullLogProvider.getInstance(), + NullLogProvider.getInstance(), storeCopyProcess, topologyService ); // when readReplicaStartupProcess.start(); @@ -168,8 +171,8 @@ public void stopShouldStopTheDatabaseAndStopPolling() throws Throwable when( localDatabase.isEmpty() ).thenReturn( false ); ReadReplicaStartupProcess readReplicaStartupProcess = - new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), - retryStrategy, NullLogProvider.getInstance(), NullLogProvider.getInstance(), storeCopyProcess ); + new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, chooseFirstMember(), retryStrategy, NullLogProvider.getInstance(), + NullLogProvider.getInstance(), storeCopyProcess, topologyService ); readReplicaStartupProcess.start();