Skip to content

Commit

Permalink
StoreCopyServer now read from paged file is mapping exists
Browse files Browse the repository at this point in the history
When performing a store copy, StoryCopyServer will now try to get the ByteChannel from the mapped PagedFile if a mapping exist, otherwise will behave as before and get the ByteChannel from the FileSystem.

 This makes it possible to support store copy from a device external to the normal file system.
  • Loading branch information
burqen authored and chrisvest committed Sep 16, 2016
1 parent 2a07ebc commit 6466ceb
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 19 deletions.
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.KernelExtensionFactory;
Expand Down Expand Up @@ -62,6 +63,8 @@ public interface Dependencies
Supplier<LogFileInformation> logFileInformationSupplier();

FileSystemAbstraction fileSystemAbstraction();

PageCache pageCache();
}

public OnlineBackupExtensionFactory()
Expand All @@ -85,6 +88,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
dependencies.transactionIdStoreSupplier(),
dependencies.logicalTransactionStoreSupplier(),
dependencies.logFileInformationSupplier(),
dependencies.fileSystemAbstraction());
dependencies.fileSystemAbstraction(),
dependencies.pageCache() );
}
}
Expand Up @@ -34,16 +34,17 @@
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.StoreCopyServer;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.util.CustomIOConfigValidator;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.util.CustomIOConfigValidator;
import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -82,13 +83,14 @@ public OnlineBackupKernelExtension( Config config, final GraphDatabaseAPI graphD
final Supplier<TransactionIdStore> transactionIdStoreSupplier,
final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
final Supplier<LogFileInformation> logFileInformationSupplier,
final FileSystemAbstraction fileSystemAbstraction)
final FileSystemAbstraction fileSystemAbstraction,
final PageCache pageCache )
{
this( config, graphDatabaseAPI, () -> {
TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get();
StoreCopyServer copier = new StoreCopyServer( neoStoreDataSource, checkPointerSupplier.get(),
fileSystemAbstraction, new File( graphDatabaseAPI.getStoreDir() ),
monitors.newMonitor( StoreCopyServer.Monitor.class ) );
monitors.newMonitor( StoreCopyServer.Monitor.class ), pageCache );
LogicalTransactionStore logicalTransactionStore = logicalTransactionStoreSupplier.get();
LogFileInformation logFileInformation = logFileInformationSupplier.get();
return new BackupImpl( copier, monitors,
Expand Down
Expand Up @@ -22,14 +22,17 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Optional;

import org.neo4j.com.RequestContext;
import org.neo4j.com.Response;
import org.neo4j.com.ServerFailureException;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
Expand Down Expand Up @@ -114,15 +117,17 @@ public void finishStreamingTransactions( long endTxId )
private final FileSystemAbstraction fileSystem;
private final File storeDirectory;
private final Monitor monitor;
private final PageCache pageCache;

public StoreCopyServer( NeoStoreDataSource dataSource, CheckPointer checkPointer, FileSystemAbstraction fileSystem,
File storeDirectory, Monitor monitor )
File storeDirectory, Monitor monitor, PageCache pageCache )
{
this.dataSource = dataSource;
this.checkPointer = checkPointer;
this.fileSystem = fileSystem;
this.storeDirectory = getMostCanonicalFile( storeDirectory );
this.monitor = monitor;
this.pageCache = pageCache;
}

public Monitor monitor()
Expand Down Expand Up @@ -155,13 +160,25 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
while ( files.hasNext() )
{
File file = files.next().file();
try ( StoreChannel fileChannel = fileSystem.open( file, "r" ) )

// Read from paged file if mapping exists. Otherwise read through file system.
final Optional<PagedFile> optionalPagedFile = pageCache.tryMappedPagedFile( file );
try ( ReadableByteChannel fileChannel = optionalPagedFile.isPresent() ?
optionalPagedFile.get().openReadableByteChannel() :
fileSystem.open( file, "r" ) )
{
monitor.startStreamingStoreFile( file );
writer.write( relativePath( storeDirectory, file ), fileChannel,
temporaryBuffer, file.length() > 0 );
monitor.finishStreamingStoreFile( file );
}
finally
{
if ( optionalPagedFile.isPresent() )
{
optionalPagedFile.get().close();
}
}
}
}
finally
Expand Down
Expand Up @@ -300,12 +300,16 @@ public Response<?> copyStore( StoreWriter writer )
CheckPointer checkPointer =
original.getDependencyResolver().resolveDependency( CheckPointer.class );

RequestContext requestContext = new StoreCopyServer( neoStoreDataSource,
checkPointer, fs, originalDir, new Monitors().newMonitor( StoreCopyServer.Monitor.class ) )
PageCache pageCache =
original.getDependencyResolver().resolveDependency( PageCache.class );

RequestContext requestContext = new StoreCopyServer( neoStoreDataSource, checkPointer, fs,
originalDir, new Monitors().newMonitor( StoreCopyServer.Monitor.class ), pageCache )
.flushStoresAndStreamStoreFiles( "test", writer, false );

final StoreId storeId = original.getDependencyResolver().resolveDependency( RecordStorageEngine.class )
.testAccessNeoStores().getMetaDataStore().getStoreId();
final StoreId storeId =
original.getDependencyResolver().resolveDependency( RecordStorageEngine.class )
.testAccessNeoStores().getMetaDataStore().getStoreId();

ResponsePacker responsePacker = new ResponsePacker( logicalTransactionStore,
transactionIdStore, () -> storeId );
Expand Down
Expand Up @@ -28,9 +28,7 @@
import org.neo4j.com.storecopy.StoreCopyServer;
import org.neo4j.com.storecopy.StoreWriter;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.TransactionChecksumLookup;
Expand All @@ -43,11 +41,14 @@
import org.neo4j.kernel.impl.core.RelationshipTypeTokenHolder;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.TransactionApplicationMode;

Expand All @@ -67,6 +68,7 @@ public class DefaultMasterImplSPI implements MasterImpl.SPI
private final File storeDir;
private final ResponsePacker responsePacker;
private final Monitors monitors;
private final PageCache pageCache;

private final TransactionCommitProcess transactionCommitProcess;
private final CheckPointer checkPointer;
Expand All @@ -81,7 +83,8 @@ public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb,
CheckPointer checkPointer,
TransactionIdStore transactionIdStore,
LogicalTransactionStore logicalTransactionStore,
NeoStoreDataSource neoStoreDataSource)
NeoStoreDataSource neoStoreDataSource,
PageCache pageCache )
{
this.graphDb = graphDb;

Expand All @@ -99,6 +102,7 @@ public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb,
this.txChecksumLookup = new TransactionChecksumLookup( transactionIdStore, logicalTransactionStore );
this.responsePacker = new ResponsePacker( logicalTransactionStore, transactionIdStore, graphDb::storeId );
this.monitors = monitors;
this.pageCache = pageCache;
}

@Override
Expand Down Expand Up @@ -158,7 +162,7 @@ public long getTransactionChecksum( long txId ) throws IOException
public RequestContext flushStoresAndStreamStoreFiles( StoreWriter writer )
{
StoreCopyServer streamer = new StoreCopyServer( neoStoreDataSource,
checkPointer, fileSystem, storeDir, monitors.newMonitor( StoreCopyServer.Monitor.class ) );
checkPointer, fileSystem, storeDir, monitors.newMonitor( StoreCopyServer.Monitor.class ), pageCache );
return streamer.flushStoresAndStreamStoreFiles( STORE_COPY_CHECKPOINT_TRIGGER, writer, false );
}

Expand Down
Expand Up @@ -435,7 +435,8 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
platformModule.dependencies.resolveDependency( CheckPointer.class ),
platformModule.dependencies.resolveDependency( TransactionIdStore.class ),
platformModule.dependencies.resolveDependency( LogicalTransactionStore.class ),
platformModule.dependencies.resolveDependency( NeoStoreDataSource.class ));
platformModule.dependencies.resolveDependency( NeoStoreDataSource.class ),
platformModule.dependencies.resolveDependency( PageCache.class ) );

final Factory<ConversationSPI> conversationSPIFactory =
() -> new DefaultConversationSPI( lockManager, platformModule.jobScheduler );
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.com.storecopy.StoreWriter;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.core.LabelTokenHolder;
Expand Down Expand Up @@ -61,7 +62,7 @@ public void flushStoreFilesWithCorrectCheckpointTriggerName() throws IOException
mock( PropertyKeyTokenHolder.class ), mock( RelationshipTypeTokenHolder.class ),
mock( IdGeneratorFactory.class ), mock( TransactionCommitProcess.class ), checkPointer,
mock( TransactionIdStore.class ), mock( LogicalTransactionStore.class ),
dataSource );
dataSource, mock( PageCache.class ) );

master.flushStoresAndStreamStoreFiles( mock( StoreWriter.class ) );

Expand Down

0 comments on commit 6466ceb

Please sign in to comment.