Skip to content

Commit

Permalink
Change lifecycle of StoreCopyCheckPointMutex.
Browse files Browse the repository at this point in the history
Move StoreCopyCheckPointMutex to a datasource level instead of
platform level.
  • Loading branch information
MishaDemianenko committed Jun 18, 2018
1 parent 0917618 commit 11ea4fe
Show file tree
Hide file tree
Showing 17 changed files with 38 additions and 96 deletions.
Expand Up @@ -883,6 +883,11 @@ public void afterModeSwitch()
clearTransactions();
}

public StoreCopyCheckPointMutex getStoreCopyCheckPointMutex()
{
return storeCopyCheckPointMutex;
}

// For test purposes only, not thread safe
public LifeSupport getLife()
{
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.transaction.log.files.LogFileCreationMonitor;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.impl.util.Dependencies;
Expand All @@ -66,6 +67,8 @@ public class DataSourceModule

public final Supplier<QueryExecutionEngine> queryExecutor;

public final StoreCopyCheckPointMutex storeCopyCheckPointMutex;

public final TransactionEventHandlers transactionEventHandlers;

public final Supplier<StoreId> storeId;
Expand Down Expand Up @@ -110,6 +113,9 @@ public DataSourceModule( final PlatformModule platformModule, EditionModule edit

NonTransactionalTokenNameLookup tokenNameLookup = new NonTransactionalTokenNameLookup( editionModule.tokenHolders );

storeCopyCheckPointMutex = new StoreCopyCheckPointMutex();
deps.satisfyDependency( storeCopyCheckPointMutex );

neoStoreDataSource = deps.satisfyDependency( new NeoStoreDataSource(
storeDir,
config,
Expand Down Expand Up @@ -139,7 +145,7 @@ public DataSourceModule( final PlatformModule platformModule, EditionModule edit
platformModule.availabilityGuard,
platformModule.clock,
editionModule.accessCapability,
platformModule.storeCopyCheckPointMutex,
storeCopyCheckPointMutex,
platformModule.recoveryCleanupWorkCollector,
editionModule.idController,
platformModule.databaseInfo.operationalMode,
Expand Down
Expand Up @@ -51,7 +51,6 @@
import org.neo4j.kernel.impl.spi.SimpleKernelContext;
import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointerMonitor;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.impl.util.collection.CollectionsFactorySupplier;
import org.neo4j.kernel.info.DiagnosticsManager;
Expand Down Expand Up @@ -122,7 +121,6 @@ public class PlatformModule

public final SystemNanoClock clock;

public final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
public final VersionContextSupplier versionContextSupplier;

public final RecoveryCleanupWorkCollector recoveryCleanupWorkCollector;
Expand Down Expand Up @@ -214,9 +212,6 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa

urlAccessRule = dependencies.satisfyDependency( URLAccessRules.combined( externalDependencies.urlAccessRules() ) );

storeCopyCheckPointMutex = new StoreCopyCheckPointMutex();
dependencies.satisfyDependency( storeCopyCheckPointMutex );

connectorPortRegister = new ConnectorPortRegister();
dependencies.satisfyDependency( connectorPortRegister );

Expand Down
Expand Up @@ -100,7 +100,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
return new OnlineBackupKernelExtension( dependencies.getConfig(), dependencies.getGraphDatabaseAPI(),
dependencies.logService().getInternalLogProvider(), dependencies.monitors(), dependencies.neoStoreDataSource(), dependencies.checkPointer(),
dependencies.transactionIdStoreSupplier(), dependencies.logicalTransactionStoreSupplier(), dependencies.logFileInformationSupplier(),
dependencies.fileSystemAbstraction(), dependencies.pageCache(), dependencies.storeCopyCheckPointMutex() );
dependencies.fileSystemAbstraction() );
}
return new LifecycleAdapter();
}
Expand Down
Expand Up @@ -38,7 +38,6 @@
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.StoreCopyServer;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
Expand All @@ -47,7 +46,6 @@
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifecycle;
Expand Down Expand Up @@ -91,16 +89,14 @@ public OnlineBackupKernelExtension( Config config, final GraphDatabaseAPI graphD
final Supplier<TransactionIdStore> transactionIdStoreSupplier,
final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
final Supplier<LogFileInformation> logFileInformationSupplier,
final FileSystemAbstraction fileSystemAbstraction,
final PageCache pageCache,
final StoreCopyCheckPointMutex storeCopyCheckPointMutex )
final FileSystemAbstraction fileSystemAbstraction )
{
this( config, graphDatabaseAPI, () ->
{
TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get();
StoreCopyServer copier = new StoreCopyServer( neoStoreDataSource, checkPointerSupplier.get(),
fileSystemAbstraction, graphDatabaseAPI.getStoreDir(),
monitors.newMonitor( StoreCopyServer.Monitor.class ), storeCopyCheckPointMutex );
monitors.newMonitor( StoreCopyServer.Monitor.class ) );
LogicalTransactionStore logicalTransactionStore = logicalTransactionStoreSupplier.get();
LogFileInformation logFileInformation = logFileInformationSupplier.get();
return new BackupImpl( copier, logicalTransactionStore, transactionIdStore, logFileInformation,
Expand Down

This file was deleted.

Expand Up @@ -42,7 +42,6 @@
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

Expand All @@ -57,14 +56,13 @@ public class RegularCatchupServerHandler implements CatchupServerHandler
private final Supplier<NeoStoreDataSource> dataSourceSupplier;
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CoreSnapshotService snapshotService;
private final Supplier<CheckPointer> checkPointerSupplier;

public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, Supplier<CheckPointer> checkPointerSupplier )
CoreSnapshotService snapshotService, Supplier<CheckPointer> checkPointerSupplier )
{
this.monitors = monitors;
this.logProvider = logProvider;
Expand All @@ -74,7 +72,6 @@ public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider,
this.dataSourceSupplier = dataSourceSupplier;
this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier;
this.fs = fs;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
this.snapshotService = snapshotService;
this.checkPointerSupplier = checkPointerSupplier;
}
Expand All @@ -95,8 +92,7 @@ public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupSer
@Override
public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier,
new PrepareStoreCopyFilesProvider( fs ) );
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, dataSourceSupplier, new PrepareStoreCopyFilesProvider( fs ) );
}

@Override
Expand Down
Expand Up @@ -36,26 +36,22 @@
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;

import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.hasSameStoreId;

public class PrepareStoreCopyRequestHandler extends SimpleChannelInboundHandler<PrepareStoreCopyRequest>
{
private final CatchupServerProtocol protocol;
private final Supplier<CheckPointer> checkPointerSupplier;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider;
private final Supplier<NeoStoreDataSource> dataSourceSupplier;
private final StoreFileStreamingProtocol streamingProtocol = new StoreFileStreamingProtocol();

public PrepareStoreCopyRequestHandler( CatchupServerProtocol catchupServerProtocol, Supplier<CheckPointer> checkPointerSupplier,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, Supplier<NeoStoreDataSource> dataSourceSupplier,
PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider )
Supplier<NeoStoreDataSource> dataSourceSupplier, PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider )
{
this.protocol = catchupServerProtocol;
this.checkPointerSupplier = checkPointerSupplier;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
this.prepareStoreCopyFilesProvider = prepareStoreCopyFilesProvider;
this.dataSourceSupplier = dataSourceSupplier;
}
Expand Down Expand Up @@ -106,6 +102,6 @@ private PrepareStoreCopyResponse createSuccessfulResponse( CheckPointer checkPoi

private Resource tryCheckpointAndAcquireMutex( CheckPointer checkPointer ) throws IOException
{
return storeCopyCheckPointMutex.storeCopy( () -> checkPointer.tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) ) );
return dataSourceSupplier.get().getStoreCopyCheckPointMutex().storeCopy( () -> checkPointer.tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) ) );
}
}
Expand Up @@ -34,7 +34,6 @@
import org.neo4j.causalclustering.catchup.CatchupClientBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerHandler;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CommitStateHelper;
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
Expand Down Expand Up @@ -80,6 +79,7 @@
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -184,8 +184,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.storeCopyCheckPointMutex, snapshotService,
new CheckpointerSupplier( platformModule.dependencies ) );
fileSystem, snapshotService, platformModule.dependencies.provideDependency( CheckPointer.class ) );

catchupServer = new CatchupServerBuilder( catchupServerHandler )
.serverHandler( installedProtocolsHandler )
Expand Down
Expand Up @@ -39,7 +39,6 @@
import org.neo4j.causalclustering.catchup.CatchUpResponseHandler;
import org.neo4j.causalclustering.catchup.CatchupProtocolClientInstaller;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
Expand Down Expand Up @@ -128,6 +127,7 @@
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder;
import org.neo4j.kernel.impl.transaction.log.files.TransactionLogFiles;
Expand Down Expand Up @@ -321,7 +321,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.storeCopyCheckPointMutex, null, new CheckpointerSupplier( platformModule.dependencies ) );
fileSystem, null, platformModule.dependencies.provideDependency( CheckPointer.class ) );

InstalledProtocolHandler installedProtocolHandler = new InstalledProtocolHandler(); // TODO: hook into a procedure
Server catchupServer = new CatchupServerBuilder( catchupServerHandler )
Expand Down
Expand Up @@ -64,11 +64,12 @@ public class PrepareStoreCopyRequestHandlerTest
public void setup()
{
StoreCopyCheckPointMutex storeCopyCheckPointMutex = new StoreCopyCheckPointMutex();
PrepareStoreCopyRequestHandler subject = createHandler( storeCopyCheckPointMutex );
when( neoStoreDataSource.getStoreCopyCheckPointMutex() ).thenReturn( storeCopyCheckPointMutex );
PrepareStoreCopyRequestHandler subject = createHandler();
embeddedChannel = new EmbeddedChannel( subject );
}

private PrepareStoreCopyRequestHandler createHandler( StoreCopyCheckPointMutex storeCopyCheckPointMutex )
private PrepareStoreCopyRequestHandler createHandler()
{
catchupServerProtocol = new CatchupServerProtocol();
catchupServerProtocol.expect( CatchupServerProtocol.State.PREPARE_STORE_COPY );
Expand All @@ -79,7 +80,7 @@ private PrepareStoreCopyRequestHandler createHandler( StoreCopyCheckPointMutex s
PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider = mock( PrepareStoreCopyFilesProvider.class );
when( prepareStoreCopyFilesProvider.prepareStoreCopyFiles( any() ) ).thenReturn( prepareStoreCopyFiles );

return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier,
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, dataSourceSupplier,
prepareStoreCopyFilesProvider );
}

Expand Down Expand Up @@ -131,7 +132,8 @@ public void shouldRetainLockWhileStreaming() throws Exception
when( channelHandlerContext.writeAndFlush( any( PrepareStoreCopyResponse.class ) ) ).thenReturn( channelPromise );

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
PrepareStoreCopyRequestHandler subjectHandler = createHandler( new StoreCopyCheckPointMutex( lock ) );
when( neoStoreDataSource.getStoreCopyCheckPointMutex() ).thenReturn( new StoreCopyCheckPointMutex( lock ) );
PrepareStoreCopyRequestHandler subjectHandler = createHandler();

// and
LongSet indexIds = LongSets.immutable.of( 42 );
Expand Down
Expand Up @@ -95,7 +95,7 @@ private static ChildInitializer childInitializer( FileSystemAbstraction fileSyst

RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( new Monitors(), logProvider,
() -> storeId, dependencies.provideDependency( TransactionIdStore.class ), dependencies.provideDependency( LogicalTransactionStore.class ),
dataSource, availability, fileSystem, storeCopyCheckPointMutex, null, checkPointer );
dataSource, availability, fileSystem, null, checkPointer );

NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VoidPipelineWrapperFactory.VOID_WRAPPER );
CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, logProvider,
Expand Down

0 comments on commit 11ea4fe

Please sign in to comment.