From 0afc35dd2d7807d18bf9133a54c6eb642af24246 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 4 Oct 2018 11:31:49 +0200 Subject: [PATCH] Read replica copy store from single source Jumping between upstream sources during a store copy has proven to be dangerous in some circumstances, because the read replica has no concept of a "primary" (leader) through the topology service, and the transaction pull must happen from the leader, as it does on cores, to make sure that all the required transactions get pulled down. --- .../catchup/CatchupAddressProvider.java | 25 ------------------- .../catchup/tx/CatchupPollingProcess.java | 16 ++++++------ .../ReadReplicaStartupProcess.java | 17 ++++++------- 3 files changed, 15 insertions(+), 43 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java index 3023c605034e..8164d85ad17f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupAddressProvider.java @@ -75,31 +75,6 @@ public AdvertisedSocketAddress secondary() } } - /** - * Uses given strategy for both primary and secondary address. 
- */ - class UpstreamStrategyBoundAddressProvider implements CatchupAddressProvider - { - private final UpstreamStrategyAddressSupplier upstreamStrategyAddressSupplier; - - public UpstreamStrategyBoundAddressProvider( TopologyService topologyService, UpstreamDatabaseStrategySelector strategySelector ) - { - upstreamStrategyAddressSupplier = new UpstreamStrategyAddressSupplier( strategySelector, topologyService ); - } - - @Override - public AdvertisedSocketAddress primary() throws CatchupAddressResolutionException - { - return upstreamStrategyAddressSupplier.get(); - } - - @Override - public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException - { - return upstreamStrategyAddressSupplier.get(); - } - } - /** * Uses leader address as primary and given upstream strategy as secondary address. */ diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java index c6a26ff26b05..f7c1066e05f4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -30,7 +30,7 @@ import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; -import org.neo4j.causalclustering.catchup.CatchupAddressProvider; +import org.neo4j.causalclustering.catchup.CatchupAddressProvider.SingleAddressProvider; import org.neo4j.causalclustering.catchup.storecopy.DatabaseShutdownException; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException; @@ -93,7 +93,7 @@ enum State private final StoreCopyProcess storeCopyProcess; private 
final Supplier databaseHealthSupplier; private final CatchUpClient catchUpClient; - private final UpstreamDatabaseStrategySelector selectionStrategyPipeline; + private final UpstreamDatabaseStrategySelector selectionStrategy; private final TimerService timerService; private final long txPullIntervalMillis; private final BatchingTxApplier applier; @@ -116,7 +116,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba this.log = logProvider.getLog( getClass() ); this.enableDisableOnStoreCopy = enableDisableOnSoreCopy; this.catchUpClient = catchUpClient; - this.selectionStrategyPipeline = selectionStrategy; + this.selectionStrategy = selectionStrategy; this.timerService = timerService; this.txPullIntervalMillis = txPullIntervalMillis; this.applier = applier; @@ -198,7 +198,7 @@ private void pullTransactions() MemberId upstream; try { - upstream = selectionStrategyPipeline.bestUpstreamDatabase(); + upstream = selectionStrategy.bestUpstreamDatabase(); } catch ( UpstreamDatabaseSelectionException e ) { @@ -325,11 +325,11 @@ private void downloadDatabase( StoreId localStoreId ) try { - CatchupAddressProvider.UpstreamStrategyBoundAddressProvider upstreamStrategyBoundAddressProvider = - new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline ); - storeCopyProcess.replaceWithStoreFrom( upstreamStrategyBoundAddressProvider, localStoreId ); + MemberId source = selectionStrategy.bestUpstreamDatabase(); + AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) ); + storeCopyProcess.replaceWithStoreFrom( new SingleAddressProvider( fromAddress ), localStoreId ); } - catch ( IOException | StoreCopyFailedException e ) + catch ( IOException | StoreCopyFailedException | UpstreamDatabaseSelectionException | TopologyLookupException e ) { log.warn( "Error copying store. 
Will retry shortly.", e ); return; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java index b39f9b76d971..4cab26515ac3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcess.java @@ -24,7 +24,7 @@ import java.io.IOException; -import org.neo4j.causalclustering.catchup.CatchupAddressProvider; +import org.neo4j.causalclustering.catchup.CatchupAddressProvider.SingleAddressProvider; import org.neo4j.causalclustering.catchup.storecopy.DatabaseShutdownException; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.catchup.storecopy.RemoteStore; @@ -54,20 +54,20 @@ class ReadReplicaStartupProcess implements Lifecycle private final Log userLog; private final TimeoutStrategy timeoutStrategy; - private final UpstreamDatabaseStrategySelector selectionStrategyPipeline; + private final UpstreamDatabaseStrategySelector selectionStrategy; private final TopologyService topologyService; private String lastIssue; private final StoreCopyProcess storeCopyProcess; ReadReplicaStartupProcess( RemoteStore remoteStore, LocalDatabase localDatabase, Lifecycle txPulling, - UpstreamDatabaseStrategySelector selectionStrategyPipeline, TimeoutStrategy timeoutStrategy, LogProvider debugLogProvider, + UpstreamDatabaseStrategySelector selectionStrategy, TimeoutStrategy timeoutStrategy, LogProvider debugLogProvider, LogProvider userLogProvider, StoreCopyProcess storeCopyProcess, TopologyService topologyService ) { this.remoteStore = remoteStore; this.localDatabase = localDatabase; this.txPulling = txPulling; - this.selectionStrategyPipeline = selectionStrategyPipeline; + 
this.selectionStrategy = selectionStrategy; this.timeoutStrategy = timeoutStrategy; this.debugLog = debugLogProvider.getLog( getClass() ); this.userLog = userLogProvider.getLog( getClass() ); @@ -99,7 +99,7 @@ public void start() throws IOException, DatabaseShutdownException MemberId source = null; try { - source = selectionStrategyPipeline.bestUpstreamDatabase(); + source = selectionStrategy.bestUpstreamDatabase(); syncStoreWithUpstream( source ); syncedWithUpstream = true; } @@ -163,15 +163,12 @@ private void syncStoreWithUpstream( MemberId source ) throws IOException, StoreI debugLog.info( "Local database is empty, attempting to replace with copy from upstream server %s", source ); debugLog.info( "Finding store id of upstream server %s", source ); - AdvertisedSocketAddress fromAddress = - topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) ); + AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) ); StoreId storeId = remoteStore.getStoreId( fromAddress ); debugLog.info( "Copying store from upstream server %s", source ); localDatabase.delete(); - CatchupAddressProvider.UpstreamStrategyBoundAddressProvider addressProvider = - new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline ); - storeCopyProcess.replaceWithStoreFrom( addressProvider, storeId ); + storeCopyProcess.replaceWithStoreFrom( new SingleAddressProvider( fromAddress ), storeId ); debugLog.info( "Restarting local database after copy.", source ); }