From 6466cebfda3611b4eddb7fa9f3e6cbad3bf2e56d Mon Sep 17 00:00:00 2001 From: Anton Persson Date: Fri, 19 Aug 2016 11:50:55 +0200 Subject: [PATCH] StoreCopyServer now read from paged file is mapping exists When performing a store copy, StoryCopyServer will now try to get the ByteChannel from the mapped PagedFile if a mapping exist, otherwise will behave as before and get the ByteChannel from the FileSystem. This makes it possible to support store copy from a device external to the normal file system. --- .../backup/OnlineBackupExtensionFactory.java | 6 ++++- .../backup/OnlineBackupKernelExtension.java | 10 ++++---- .../neo4j/com/storecopy/StoreCopyServer.java | 23 ++++++++++++++++--- .../com/storecopy/StoreCopyClientTest.java | 12 ++++++---- .../ha/cluster/DefaultMasterImplSPI.java | 14 +++++++---- .../factory/HighlyAvailableEditionModule.java | 3 ++- .../ha/cluster/DefaultMasterImplSPITest.java | 3 ++- 7 files changed, 52 insertions(+), 19 deletions(-) diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupExtensionFactory.java b/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupExtensionFactory.java index 75870e8ad29d..ea1e48c81bfd 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupExtensionFactory.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupExtensionFactory.java @@ -23,6 +23,7 @@ import org.neo4j.helpers.Service; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.extension.KernelExtensionFactory; @@ -62,6 +63,8 @@ public interface Dependencies Supplier logFileInformationSupplier(); FileSystemAbstraction fileSystemAbstraction(); + + PageCache pageCache(); } public OnlineBackupExtensionFactory() @@ -85,6 +88,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) dependencies.transactionIdStoreSupplier(), dependencies.logicalTransactionStoreSupplier(), dependencies.logFileInformationSupplier(), - dependencies.fileSystemAbstraction()); + dependencies.fileSystemAbstraction(), + dependencies.pageCache() ); } } diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupKernelExtension.java b/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupKernelExtension.java index 4cbdc23347fc..3a651b2d10f6 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupKernelExtension.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/OnlineBackupKernelExtension.java @@ -34,8 +34,7 @@ import org.neo4j.com.monitor.RequestMonitor; import org.neo4j.com.storecopy.StoreCopyServer; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.impl.util.CustomIOConfigValidator; -import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.store.StoreId; @@ -43,7 +42,9 @@ import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; +import org.neo4j.kernel.impl.util.CustomIOConfigValidator; import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException; +import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.monitoring.ByteCounterMonitor; import org.neo4j.kernel.monitoring.Monitors; @@ -82,13 +83,14 @@ public OnlineBackupKernelExtension( Config config, final GraphDatabaseAPI graphD final Supplier transactionIdStoreSupplier, final Supplier logicalTransactionStoreSupplier, final Supplier logFileInformationSupplier, - final FileSystemAbstraction fileSystemAbstraction) + final FileSystemAbstraction fileSystemAbstraction, + final PageCache pageCache ) { this( config, graphDatabaseAPI, () -> { TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get(); StoreCopyServer copier = new StoreCopyServer( neoStoreDataSource, checkPointerSupplier.get(), fileSystemAbstraction, new File( graphDatabaseAPI.getStoreDir() ), - monitors.newMonitor( StoreCopyServer.Monitor.class ) ); + monitors.newMonitor( StoreCopyServer.Monitor.class ), pageCache ); LogicalTransactionStore logicalTransactionStore = logicalTransactionStoreSupplier.get(); LogFileInformation logFileInformation = logFileInformationSupplier.get(); return new BackupImpl( copier, monitors, diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java index 21c9a9dc481d..fd78597c660f 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.Optional; import org.neo4j.com.RequestContext; import org.neo4j.com.Response; @@ -29,7 +31,8 @@ import org.neo4j.graphdb.ResourceIterator; import org.neo4j.io.ByteUnit; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.fs.StoreChannel; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo; @@ -114,15 +117,17 @@ public void finishStreamingTransactions( long endTxId ) private final FileSystemAbstraction fileSystem; private final File storeDirectory; private final Monitor monitor; + private final PageCache pageCache; public StoreCopyServer( NeoStoreDataSource dataSource, CheckPointer checkPointer, FileSystemAbstraction fileSystem, - File storeDirectory, Monitor monitor ) + File storeDirectory, Monitor monitor, PageCache pageCache ) { this.dataSource = dataSource; this.checkPointer = checkPointer; this.fileSystem = fileSystem; this.storeDirectory = getMostCanonicalFile( storeDirectory ); this.monitor = monitor; + this.pageCache = pageCache; } public Monitor monitor() @@ -155,13 +160,25 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW while ( files.hasNext() ) { File file = files.next().file(); - try ( StoreChannel fileChannel = fileSystem.open( file, "r" ) ) + + // Read from paged file if mapping exists. Otherwise read through file system. + final Optional optionalPagedFile = pageCache.tryMappedPagedFile( file ); + try ( ReadableByteChannel fileChannel = optionalPagedFile.isPresent() ? + optionalPagedFile.get().openReadableByteChannel() : + fileSystem.open( file, "r" ) ) { monitor.startStreamingStoreFile( file ); writer.write( relativePath( storeDirectory, file ), fileChannel, temporaryBuffer, file.length() > 0 ); monitor.finishStreamingStoreFile( file ); } + finally + { + if ( optionalPagedFile.isPresent() ) + { + optionalPagedFile.get().close(); + } + } } } finally diff --git a/enterprise/com/src/test/java/org/neo4j/com/storecopy/StoreCopyClientTest.java b/enterprise/com/src/test/java/org/neo4j/com/storecopy/StoreCopyClientTest.java index bbc0fc83ad35..9da556962301 100644 --- a/enterprise/com/src/test/java/org/neo4j/com/storecopy/StoreCopyClientTest.java +++ b/enterprise/com/src/test/java/org/neo4j/com/storecopy/StoreCopyClientTest.java @@ -300,12 +300,16 @@ public Response copyStore( StoreWriter writer ) CheckPointer checkPointer = original.getDependencyResolver().resolveDependency( CheckPointer.class ); - RequestContext requestContext = new StoreCopyServer( neoStoreDataSource, - checkPointer, fs, originalDir, new Monitors().newMonitor( StoreCopyServer.Monitor.class ) ) + PageCache pageCache = + original.getDependencyResolver().resolveDependency( PageCache.class ); + + RequestContext requestContext = new StoreCopyServer( neoStoreDataSource, checkPointer, fs, + originalDir, new Monitors().newMonitor( StoreCopyServer.Monitor.class ), pageCache ) .flushStoresAndStreamStoreFiles( "test", writer, false ); - final StoreId storeId = original.getDependencyResolver().resolveDependency( RecordStorageEngine.class ) - .testAccessNeoStores().getMetaDataStore().getStoreId(); + final StoreId storeId = + original.getDependencyResolver().resolveDependency( RecordStorageEngine.class ) + .testAccessNeoStores().getMetaDataStore().getStoreId(); ResponsePacker responsePacker = new ResponsePacker( logicalTransactionStore, transactionIdStore, () -> storeId ); diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPI.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPI.java index 2c00338968b5..437be6cdf935 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPI.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPI.java @@ -28,9 +28,7 @@ import org.neo4j.com.storecopy.StoreCopyServer; import org.neo4j.com.storecopy.StoreWriter; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.internal.GraphDatabaseAPI; -import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; -import org.neo4j.kernel.impl.store.id.IdType; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.ha.TransactionChecksumLookup; @@ -43,11 +41,14 @@ import org.neo4j.kernel.impl.core.RelationshipTypeTokenHolder; import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.id.IdGenerator; +import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; +import org.neo4j.kernel.impl.store.id.IdType; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; +import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.storageengine.api.TransactionApplicationMode; @@ -67,6 +68,7 @@ public class DefaultMasterImplSPI implements MasterImpl.SPI private final File storeDir; private final ResponsePacker responsePacker; private final Monitors monitors; + private final PageCache pageCache; private final TransactionCommitProcess transactionCommitProcess; private final CheckPointer checkPointer; @@ -81,7 +83,8 @@ public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb, CheckPointer checkPointer, TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore, - NeoStoreDataSource neoStoreDataSource) + NeoStoreDataSource neoStoreDataSource, + PageCache pageCache ) { this.graphDb = graphDb; @@ -99,6 +102,7 @@ public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb, this.txChecksumLookup = new TransactionChecksumLookup( transactionIdStore, logicalTransactionStore ); this.responsePacker = new ResponsePacker( logicalTransactionStore, transactionIdStore, graphDb::storeId ); this.monitors = monitors; + this.pageCache = pageCache; } @Override @@ -158,7 +162,7 @@ public long getTransactionChecksum( long txId ) throws IOException public RequestContext flushStoresAndStreamStoreFiles( StoreWriter writer ) { StoreCopyServer streamer = new StoreCopyServer( neoStoreDataSource, - checkPointer, fileSystem, storeDir, monitors.newMonitor( StoreCopyServer.Monitor.class ) ); + checkPointer, fileSystem, storeDir, monitors.newMonitor( StoreCopyServer.Monitor.class ), pageCache ); return streamer.flushStoresAndStreamStoreFiles( STORE_COPY_CHECKPOINT_TRIGGER, writer, false ); } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java index d2d394fafa65..d8635e893f5c 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java @@ -435,7 +435,8 @@ public void elected( String role, InstanceId instanceId, URI electedMember ) platformModule.dependencies.resolveDependency( CheckPointer.class ), platformModule.dependencies.resolveDependency( TransactionIdStore.class ), platformModule.dependencies.resolveDependency( LogicalTransactionStore.class ), - platformModule.dependencies.resolveDependency( NeoStoreDataSource.class )); + platformModule.dependencies.resolveDependency( NeoStoreDataSource.class ), + platformModule.dependencies.resolveDependency( PageCache.class ) ); final Factory conversationSPIFactory = () -> new DefaultConversationSPI( lockManager, platformModule.jobScheduler ); diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPITest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPITest.java index df2aa5d14dfd..3ecf4997b465 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPITest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultMasterImplSPITest.java @@ -26,6 +26,7 @@ import org.neo4j.com.storecopy.StoreWriter; import org.neo4j.helpers.collection.Iterators; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.core.LabelTokenHolder; @@ -61,7 +62,7 @@ public void flushStoreFilesWithCorrectCheckpointTriggerName() throws IOException mock( PropertyKeyTokenHolder.class ), mock( RelationshipTypeTokenHolder.class ), mock( IdGeneratorFactory.class ), mock( TransactionCommitProcess.class ), checkPointer, mock( TransactionIdStore.class ), mock( LogicalTransactionStore.class ), - dataSource ); + dataSource, mock( PageCache.class ) ); master.flushStoresAndStreamStoreFiles( mock( StoreWriter.class ) );