Skip to content

Commit

Permalink
rename store fetcher to remote store
Browse files Browse the repository at this point in the history
The class does more than fetch stores, it also retrieves the remote
store id and pulls transactions. It is simply an entrypoint for
remote store related RPC and this name feels more appropriate today.
  • Loading branch information
martinfurmanski committed Jan 18, 2017
1 parent 92eee07 commit 3879d6d
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 64 deletions.
Expand Up @@ -46,7 +46,10 @@
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

public class StoreFetcher
/**
* Entry point for remote store related RPC.
*/
public class RemoteStore
{
private final Log log;
private final Monitors monitors;
Expand All @@ -57,7 +60,7 @@ public class StoreFetcher
private final TxPullClient txPullClient;
private final TransactionLogCatchUpFactory transactionLogFactory;

public StoreFetcher( LogProvider logProvider,
public RemoteStore( LogProvider logProvider,
FileSystemAbstraction fs, PageCache pageCache,
StoreCopyClient storeCopyClient, TxPullClient txPullClient,
TransactionLogCatchUpFactory transactionLogFactory,
Expand Down Expand Up @@ -133,7 +136,7 @@ public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File
return pullTransactions( from, expectedStoreId, storeDir, pullIndex, false );
}

public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
public void copy( MemberId from, StoreId expectedStoreId, File destDir )
throws StoreCopyFailedException, StreamingTransactionsFailedException
{
try
Expand Down Expand Up @@ -180,7 +183,7 @@ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId,
}
}

public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedException
public StoreId getStoreId( MemberId from ) throws StoreIdDownloadFailedException
{
return storeCopyClient.fetchStoreId( from );
}
Expand Down
Expand Up @@ -33,15 +33,15 @@ public class StoreCopyProcess
private final LocalDatabase localDatabase;
private final CopiedStoreRecovery copiedStoreRecovery;
private final Log log;
private final StoreFetcher storeFetcher;
private final RemoteStore remoteStore;

public StoreCopyProcess( FileSystemAbstraction fs, LocalDatabase localDatabase,
CopiedStoreRecovery copiedStoreRecovery, StoreFetcher storeFetcher, LogProvider logProvider )
CopiedStoreRecovery copiedStoreRecovery, RemoteStore remoteStore, LogProvider logProvider )
{
this.fs = fs;
this.localDatabase = localDatabase;
this.copiedStoreRecovery = copiedStoreRecovery;
this.storeFetcher = storeFetcher;
this.remoteStore = remoteStore;
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -50,7 +50,7 @@ public void replaceWithStoreFrom( MemberId source, StoreId expectedStoreId )
{
try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, localDatabase.storeDir() ) )
{
storeFetcher.copyStore( source, expectedStoreId, tempStore.storeDir() );
remoteStore.copy( source, expectedStoreId, tempStore.storeDir() );
copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() );
localDatabase.replaceWith( tempStore.storeDir() );
}
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
Expand Down Expand Up @@ -117,7 +117,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
CatchUpClient catchUpClient = life.add( new CatchUpClient( clusteringModule.topologyService(), logProvider,
Clocks.systemClock(), inactivityTimeoutMillis, monitors ) );

StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache,
RemoteStore remoteStore = new RemoteStore( logProvider, fileSystem, platformModule.pageCache,
new StoreCopyClient( catchUpClient, logProvider ), new TxPullClient( catchUpClient, platformModule.monitors ),
new TransactionLogCatchUpFactory(), platformModule.monitors );

Expand All @@ -127,11 +127,11 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
platformModule.kernelExtensions.listFactories(), platformModule.pageCache );
life.add( copiedStoreRecovery );

StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fileSystem, localDatabase, copiedStoreRecovery, storeFetcher, logProvider );
StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fileSystem, localDatabase, copiedStoreRecovery, remoteStore, logProvider );

LifeSupport servicesToStopOnStoreCopy = new LifeSupport();
CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, servicesToStopOnStoreCopy,
storeFetcher, catchUpClient, logProvider, storeCopyProcess );
remoteStore, catchUpClient, logProvider, storeCopyProcess );

if ( config.get( OnlineBackupSettings.online_backup_enabled ) )
{
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.core.state.CoreState;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -42,18 +42,18 @@ public class CoreStateDownloader
{
private final LocalDatabase localDatabase;
private final Lifecycle startStopOnStoreCopy;
private final StoreFetcher storeFetcher;
private final RemoteStore remoteStore;
private final CatchUpClient catchUpClient;
private final Log log;
private final StoreCopyProcess storeCopyProcess;

public CoreStateDownloader( LocalDatabase localDatabase, Lifecycle startStopOnStoreCopy,
StoreFetcher storeFetcher, CatchUpClient catchUpClient, LogProvider logProvider,
RemoteStore remoteStore, CatchUpClient catchUpClient, LogProvider logProvider,
StoreCopyProcess storeCopyProcess )
{
this.localDatabase = localDatabase;
this.startStopOnStoreCopy = startStopOnStoreCopy;
this.storeFetcher = storeFetcher;
this.remoteStore = remoteStore;
this.catchUpClient = catchUpClient;
this.log = logProvider.getLog( getClass() );
this.storeCopyProcess = storeCopyProcess;
Expand All @@ -68,7 +68,7 @@ public synchronized void downloadSnapshot( MemberId source, CoreState coreState
/* Extract some key properties before shutting it down. */
boolean isEmptyStore = localDatabase.isEmpty();

StoreId remoteStoreId = storeFetcher.getStoreIdOf( source );
StoreId remoteStoreId = remoteStore.getStoreId( source );
if ( !isEmptyStore && !remoteStoreId.equals( localDatabase.storeId() ) )
{
throw new StoreCopyFailedException( "StoreId mismatch and not empty" );
Expand Down Expand Up @@ -104,7 +104,7 @@ public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot
{
StoreId localStoreId = localDatabase.storeId();
CatchupResult catchupResult =
storeFetcher.tryCatchingUp( source, localStoreId, localDatabase.storeDir() );
remoteStore.tryCatchingUp( source, localStoreId, localDatabase.storeDir() );

if ( catchupResult == E_TRANSACTION_PRUNED )
{
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
Expand Down Expand Up @@ -202,7 +202,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
databaseHealthSupplier,
logProvider );

StoreFetcher storeFetcher = new StoreFetcher( platformModule.logging.getInternalLogProvider(),
RemoteStore remoteStore = new RemoteStore( platformModule.logging.getInternalLogProvider(),
fileSystem, platformModule.pageCache,
new StoreCopyClient( catchUpClient, logProvider ),
new TxPullClient( catchUpClient, platformModule.monitors ),
Expand Down Expand Up @@ -238,7 +238,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
}

StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fileSystem, localDatabase,
copiedStoreRecovery, storeFetcher, logProvider );
copiedStoreRecovery, remoteStore, logProvider );

CatchupPollingProcess catchupProcess =
new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy,
Expand All @@ -254,7 +254,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
txPulling.add( new WaitForUpToDateStore( catchupProcess, logProvider ) );

ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS );
life.add( new ReadReplicaStartupProcess( storeFetcher, localDatabase, txPulling,
life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling,
new ConnectToRandomCoreMember( discoveryService ),
retryStrategy, logProvider,
platformModule.logging.getUserLogProvider(), storeCopyProcess ) );
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.helper.RetryStrategy;
Expand All @@ -40,7 +40,7 @@

class ReadReplicaStartupProcess implements Lifecycle
{
private final StoreFetcher storeFetcher;
private final RemoteStore remoteStore;
private final LocalDatabase localDatabase;
private final Lifecycle txPulling;
private final CoreMemberSelectionStrategy connectionStrategy;
Expand All @@ -51,11 +51,11 @@ class ReadReplicaStartupProcess implements Lifecycle
private String lastIssue;
private final StoreCopyProcess storeCopyProcess;

ReadReplicaStartupProcess( StoreFetcher storeFetcher, LocalDatabase localDatabase,
ReadReplicaStartupProcess( RemoteStore remoteStore, LocalDatabase localDatabase,
Lifecycle txPulling, CoreMemberSelectionStrategy connectionStrategy, RetryStrategy retryStrategy,
LogProvider debugLogProvider, LogProvider userLogProvider, StoreCopyProcess storeCopyProcess )
{
this.storeFetcher = storeFetcher;
this.remoteStore = remoteStore;
this.localDatabase = localDatabase;
this.txPulling = txPulling;
this.connectionStrategy = connectionStrategy;
Expand Down Expand Up @@ -153,7 +153,7 @@ private void syncStoreWithCore( MemberId source ) throws IOException, StoreIdDow
debugLog.info( "Local database is empty, attempting to replace with copy from core server %s", source );

debugLog.info( "Finding store id of core server %s", source );
StoreId storeId = storeFetcher.getStoreIdOf( source );
StoreId storeId = remoteStore.getStoreId( source );

debugLog.info( "Copying store from core server %s", source );
localDatabase.delete();
Expand All @@ -170,7 +170,7 @@ private void syncStoreWithCore( MemberId source ) throws IOException, StoreIdDow
private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFailedException
{
StoreId localStoreId = localDatabase.storeId();
StoreId remoteStoreId = storeFetcher.getStoreIdOf( remoteCore );
StoreId remoteStoreId = remoteStore.getStoreId( remoteCore );
if ( !localStoreId.equals( remoteStoreId ) )
{
throw new IllegalStateException( format( "This read replica cannot join the cluster. " +
Expand Down
Expand Up @@ -48,7 +48,7 @@
import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;

public class StoreFetcherTest
public class RemoteStoreTest
{
@Test
public void shouldCopyStoreFilesAndPullTransactions() throws Exception
Expand All @@ -60,12 +60,12 @@ public void shouldCopyStoreFilesAndPullTransactions() throws Exception
when( txPullClient.pullTransactions( any(), any(), anyLong(), any() ) ).thenReturn( new TxPullRequestResult( SUCCESS_END_OF_STREAM, 13) );
TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );

StoreFetcher fetcher = new StoreFetcher( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
RemoteStore remoteStore = new RemoteStore( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
null, storeCopyClient, txPullClient, factory( writer ), new Monitors() );

// when
MemberId localhost = new MemberId( UUID.randomUUID() );
fetcher.copyStore( localhost, storeId, new File( "destination" ) );
remoteStore.copy( localhost, storeId, new File( "destination" ) );

// then
verify( storeCopyClient ).copyStoreFiles( eq( localhost ), eq( storeId ), any( StoreFileStreams.class ) );
Expand All @@ -90,11 +90,11 @@ public void shouldSetLastPulledTransactionId() throws Exception

TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );

StoreFetcher fetcher = new StoreFetcher( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
RemoteStore remoteStore = new RemoteStore( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
null, storeCopyClient, txPullClient, factory( writer ), new Monitors() );

// when
fetcher.copyStore( localhost, wantedStoreId, new File( "destination" ) );
remoteStore.copy( localhost, wantedStoreId, new File( "destination" ) );

// then
long previousTxId = lastFlushedTxId - 1; // the interface is defined as asking for the one preceding
Expand All @@ -110,7 +110,7 @@ public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception
TxPullClient txPullClient = mock( TxPullClient.class );
TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );

StoreFetcher fetcher = new StoreFetcher( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
RemoteStore remoteStore = new RemoteStore( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
null,
storeCopyClient, txPullClient, factory( writer ), new Monitors() );

Expand All @@ -120,7 +120,7 @@ public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception
// when
try
{
fetcher.copyStore( null, storeId, null );
remoteStore.copy( null, storeId, null );
}
catch ( StoreCopyFailedException e )
{
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.util.concurrent.CompletableFuture;

import java.util.concurrent.Future;
Expand All @@ -32,7 +31,6 @@
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
Expand Down

0 comments on commit 3879d6d

Please sign in to comment.