Read replica copy store from single source
Jumping around between sources has proven to be dangerous in some
circumstances, because the read replica has no concept of a "primary"
(leader) through the topology service, and the transaction pull must
happen from the leader, as it does on cores, to make sure that all the
required transactions get pulled down.
martinfurmanski committed Oct 4, 2018
1 parent 0f2c818 commit 0afc35d
Showing 3 changed files with 15 additions and 43 deletions.
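
The hazard the commit message describes is easiest to see in the provider this commit removes. As a hedged illustration (not code from the commit), the removed UpstreamStrategyBoundAddressProvider re-runs the upstream selection strategy on every call, so its two addresses can resolve to different members in the middle of a single store copy:

    // Illustration only; types and constructor are taken from the removed class below.
    CatchupAddressProvider provider =
            new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, strategySelector );
    AdvertisedSocketAddress primary = provider.primary();     // strategy may pick member A
    AdvertisedSocketAddress secondary = provider.secondary(); // strategy may now pick member B
    // Nothing ties the two calls together, so one copy can "jump around" between sources.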
CatchupAddressProvider.java
@@ -75,31 +75,6 @@ public AdvertisedSocketAddress secondary()
         }
     }
 
-    /**
-     * Uses given strategy for both primary and secondary address.
-     */
-    class UpstreamStrategyBoundAddressProvider implements CatchupAddressProvider
-    {
-        private final UpstreamStrategyAddressSupplier upstreamStrategyAddressSupplier;
-
-        public UpstreamStrategyBoundAddressProvider( TopologyService topologyService, UpstreamDatabaseStrategySelector strategySelector )
-        {
-            upstreamStrategyAddressSupplier = new UpstreamStrategyAddressSupplier( strategySelector, topologyService );
-        }
-
-        @Override
-        public AdvertisedSocketAddress primary() throws CatchupAddressResolutionException
-        {
-            return upstreamStrategyAddressSupplier.get();
-        }
-
-        @Override
-        public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException
-        {
-            return upstreamStrategyAddressSupplier.get();
-        }
-    }
-
     /**
      * Uses leader address as primary and given upstream strategy as secondary address.
      */
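
The replacement type, SingleAddressProvider, is imported and used in the hunks below but its body is not part of this diff. A minimal sketch of the shape its usage implies, assuming it simply pins one pre-resolved address for both roles:

    // Hedged sketch; the real implementation in CatchupAddressProvider may differ in detail.
    class SingleAddressProvider implements CatchupAddressProvider
    {
        private final AdvertisedSocketAddress address;

        SingleAddressProvider( AdvertisedSocketAddress address )
        {
            this.address = address;
        }

        @Override
        public AdvertisedSocketAddress primary()
        {
            return address; // the same member every time
        }

        @Override
        public AdvertisedSocketAddress secondary()
        {
            return address; // no strategy re-evaluation, so no jumping around
        }
    }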
CatchupPollingProcess.java
@@ -30,7 +30,7 @@
 import org.neo4j.causalclustering.catchup.CatchUpClient;
 import org.neo4j.causalclustering.catchup.CatchUpClientException;
 import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
-import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
+import org.neo4j.causalclustering.catchup.CatchupAddressProvider.SingleAddressProvider;
 import org.neo4j.causalclustering.catchup.storecopy.DatabaseShutdownException;
 import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
 import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
@@ -93,7 +93,7 @@ enum State
     private final StoreCopyProcess storeCopyProcess;
     private final Supplier<DatabaseHealth> databaseHealthSupplier;
     private final CatchUpClient catchUpClient;
-    private final UpstreamDatabaseStrategySelector selectionStrategyPipeline;
+    private final UpstreamDatabaseStrategySelector selectionStrategy;
     private final TimerService timerService;
     private final long txPullIntervalMillis;
     private final BatchingTxApplier applier;
@@ -116,7 +116,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba
         this.log = logProvider.getLog( getClass() );
         this.enableDisableOnStoreCopy = enableDisableOnSoreCopy;
         this.catchUpClient = catchUpClient;
-        this.selectionStrategyPipeline = selectionStrategy;
+        this.selectionStrategy = selectionStrategy;
         this.timerService = timerService;
         this.txPullIntervalMillis = txPullIntervalMillis;
         this.applier = applier;
@@ -198,7 +198,7 @@ private void pullTransactions()
         MemberId upstream;
         try
         {
-            upstream = selectionStrategyPipeline.bestUpstreamDatabase();
+            upstream = selectionStrategy.bestUpstreamDatabase();
         }
         catch ( UpstreamDatabaseSelectionException e )
         {
@@ -325,11 +325,11 @@ private void downloadDatabase( StoreId localStoreId )
 
         try
         {
-            CatchupAddressProvider.UpstreamStrategyBoundAddressProvider upstreamStrategyBoundAddressProvider =
-                    new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline );
-            storeCopyProcess.replaceWithStoreFrom( upstreamStrategyBoundAddressProvider, localStoreId );
+            MemberId source = selectionStrategy.bestUpstreamDatabase();
+            AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) );
+            storeCopyProcess.replaceWithStoreFrom( new SingleAddressProvider( fromAddress ), localStoreId );
         }
-        catch ( IOException | StoreCopyFailedException e )
+        catch ( IOException | StoreCopyFailedException | UpstreamDatabaseSelectionException | TopologyLookupException e )
         {
             log.warn( "Error copying store. Will retry shortly.", e );
             return;
ReadReplicaStartupProcess.java
@@ -24,7 +24,7 @@
 
 import java.io.IOException;
 
-import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
+import org.neo4j.causalclustering.catchup.CatchupAddressProvider.SingleAddressProvider;
 import org.neo4j.causalclustering.catchup.storecopy.DatabaseShutdownException;
 import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
 import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
@@ -54,20 +54,20 @@ class ReadReplicaStartupProcess implements Lifecycle
     private final Log userLog;
 
     private final TimeoutStrategy timeoutStrategy;
-    private final UpstreamDatabaseStrategySelector selectionStrategyPipeline;
+    private final UpstreamDatabaseStrategySelector selectionStrategy;
     private final TopologyService topologyService;
 
     private String lastIssue;
     private final StoreCopyProcess storeCopyProcess;
 
     ReadReplicaStartupProcess( RemoteStore remoteStore, LocalDatabase localDatabase, Lifecycle txPulling,
-            UpstreamDatabaseStrategySelector selectionStrategyPipeline, TimeoutStrategy timeoutStrategy, LogProvider debugLogProvider,
+            UpstreamDatabaseStrategySelector selectionStrategy, TimeoutStrategy timeoutStrategy, LogProvider debugLogProvider,
             LogProvider userLogProvider, StoreCopyProcess storeCopyProcess, TopologyService topologyService )
     {
         this.remoteStore = remoteStore;
         this.localDatabase = localDatabase;
         this.txPulling = txPulling;
-        this.selectionStrategyPipeline = selectionStrategyPipeline;
+        this.selectionStrategy = selectionStrategy;
         this.timeoutStrategy = timeoutStrategy;
         this.debugLog = debugLogProvider.getLog( getClass() );
         this.userLog = userLogProvider.getLog( getClass() );
@@ -99,7 +99,7 @@ public void start() throws IOException, DatabaseShutdownException
         MemberId source = null;
         try
         {
-            source = selectionStrategyPipeline.bestUpstreamDatabase();
+            source = selectionStrategy.bestUpstreamDatabase();
             syncStoreWithUpstream( source );
             syncedWithUpstream = true;
         }
@@ -163,15 +163,12 @@ private void syncStoreWithUpstream( MemberId source ) throws IOException, StoreI
             debugLog.info( "Local database is empty, attempting to replace with copy from upstream server %s", source );
 
             debugLog.info( "Finding store id of upstream server %s", source );
-            AdvertisedSocketAddress fromAddress =
-                    topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) );
+            AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( source ).orElseThrow( () -> new TopologyLookupException( source ) );
             StoreId storeId = remoteStore.getStoreId( fromAddress );
 
             debugLog.info( "Copying store from upstream server %s", source );
             localDatabase.delete();
-            CatchupAddressProvider.UpstreamStrategyBoundAddressProvider addressProvider =
-                    new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline );
-            storeCopyProcess.replaceWithStoreFrom( addressProvider, storeId );
+            storeCopyProcess.replaceWithStoreFrom( new SingleAddressProvider( fromAddress ), storeId );
 
             debugLog.info( "Restarting local database after copy.", source );
         }
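
After this change, both call sites, CatchupPollingProcess.downloadDatabase and ReadReplicaStartupProcess.syncStoreWithUpstream, follow the same resolve-once pattern. Sketched here with names taken from the hunks above and error handling elided:

    // 1. Pick an upstream member once.
    MemberId source = selectionStrategy.bestUpstreamDatabase();
    // 2. Resolve its catchup address once, failing fast if it is unknown.
    AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( source )
            .orElseThrow( () -> new TopologyLookupException( source ) );
    // 3. Copy the whole store from that single, fixed address.
    storeCopyProcess.replaceWithStoreFrom( new SingleAddressProvider( fromAddress ), storeId );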
