From 817faf2ba3072ab67db98cc842a24c45e8c794c1 Mon Sep 17 00:00:00 2001 From: Anton Persson Date: Tue, 23 Aug 2016 12:18:02 +0200 Subject: [PATCH] StoreCopyClient now writes through provided PageCache if file is a store file When performing a store copy, if file is a store file, StoryCopyClient will now map the target file with the PageCache and write to the file through an opened ByteChannel on the paged file. If file is not a store file, then behavior is same as before. This makes it possible to support store copy to a device external to the normal file system. --- .../neo4j/kernel/impl/store/StoreType.java | 28 ++++++- .../neo4j/com/storecopy/StoreCopyClient.java | 2 +- .../neo4j/com/storecopy/StoreCopyServer.java | 2 +- .../com/storecopy/ToFileStoreWriter.java | 79 ++++++++++++++++--- 4 files changed, 93 insertions(+), 18 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/StoreType.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/StoreType.java index fde9813fdbb8b..dd824452c18c3 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/StoreType.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/StoreType.java @@ -20,6 +20,7 @@ package org.neo4j.kernel.impl.store; import java.io.IOException; +import java.util.Optional; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.kernel.impl.store.counts.CountsTracker; @@ -226,16 +227,35 @@ void close( NeoStores me, Object object ) * @throws IllegalStateException if can't determine store type for specified file */ public static StoreType typeOf( String storeFileName ) + { + final Optional optional = isStoreType( storeFileName ); + if ( optional.isPresent() ) + { + return optional.get(); + } + else + { + throw new IllegalArgumentException( "No enum constant for " + storeFileName + " file." ); + } + } + + /** + * Determine if a file name corresponds to any store type. + * + * @param fileName - name of the file to check for corresponding store type + * @return {@link Optional} wrapping store type, or empty optional if no store type corresponds to file name + */ + public static Optional isStoreType( String fileName ) { StoreType[] values = StoreType.values(); for ( StoreType value : values ) { - if ( value.getStoreName().equals( storeFileName ) || - storeFileName.equals( MetaDataStore.DEFAULT_NAME + value.getStoreName() ) ) + if ( value.getStoreName().equals( fileName ) || + fileName.equals( MetaDataStore.DEFAULT_NAME + value.getStoreName() ) ) { - return value; + return Optional.of( value ); } } - throw new IllegalArgumentException( "No enum constant for " + storeFileName + " file." ); + return Optional.empty(); } } diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java index b695fed5100db..9b4223a926d07 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java @@ -188,7 +188,7 @@ public void copyStore( StoreCopyRequester requester, CancellationRequest cancell // Request store files and transactions that will need recovery monitor.startReceivingStoreFiles(); try ( Response response = requester.copyStore( decorateWithProgressIndicator( - new ToFileStoreWriter( tempStore, monitor ) ) ) ) + new ToFileStoreWriter( tempStore, monitor, pageCache ) ) ) ) { monitor.finishReceivingStoreFiles(); // Update highest archived log id diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java index fd78597c660fe..a9a18859cb6ba 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java @@ -162,7 +162,7 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW File file = files.next().file(); // Read from paged file if mapping exists. Otherwise read through file system. - final Optional optionalPagedFile = pageCache.tryMappedPagedFile( file ); + final Optional optionalPagedFile = pageCache.getExistingMapping( file ); try ( ReadableByteChannel fileChannel = optionalPagedFile.isPresent() ? optionalPagedFile.get().openReadableByteChannel() : fileSystem.open( file, "r" ) ) diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java index ff19178758b21..55ac74f5362a0 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java @@ -23,18 +23,29 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Optional; + +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; +import org.neo4j.kernel.impl.store.StoreType; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.WRITE; +import static org.neo4j.kernel.impl.store.StoreType.isStoreType; public class ToFileStoreWriter implements StoreWriter { private final File basePath; private final StoreCopyClient.Monitor monitor; + private final PageCache pageCache; - public ToFileStoreWriter( File graphDbStoreDir, StoreCopyClient.Monitor monitor ) + public ToFileStoreWriter( File graphDbStoreDir, StoreCopyClient.Monitor monitor, PageCache pageCache ) { this.basePath = graphDbStoreDir; this.monitor = monitor; + this.pageCache = pageCache; } @Override @@ -47,22 +58,31 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu File file = new File( basePath, path ); file.getParentFile().mkdirs(); + String filename = file.getName(); + final Optional storeType = isStoreType( filename ); + final Optional existingMapping = pageCache.getExistingMapping( file ); + monitor.startReceivingStoreFile( file ); - try ( RandomAccessFile randomAccessFile = new RandomAccessFile( file, "rw" ) ) + try { - long totalWritten = 0; - if ( hasData ) + if ( existingMapping.isPresent() ) + { + try ( PagedFile pagedFile = existingMapping.get() ) + { + return writeDataThroughPageCache( file, data, temporaryBuffer, hasData, pagedFile ); + } + } + else if ( storeType.isPresent() && storeType.get().isRecordStore() ) { - FileChannel channel = randomAccessFile.getChannel(); - while ( data.read( temporaryBuffer ) >= 0 ) + try ( PagedFile pagedFile = pageCache.map( file, pageCache.pageSize(), CREATE, WRITE ) ) { - temporaryBuffer.flip(); - totalWritten += temporaryBuffer.limit(); - channel.write( temporaryBuffer ); - temporaryBuffer.clear(); + return writeDataThroughPageCache( file, data, temporaryBuffer, hasData, pagedFile ); } } - return totalWritten; + else + { + return writeDataThroughFileSystem( file, data, temporaryBuffer, hasData ); + } } finally { @@ -75,6 +95,41 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu } } + private long writeDataThroughFileSystem( File file, ReadableByteChannel data, ByteBuffer temporaryBuffer, + boolean hasData ) throws IOException + { + try ( RandomAccessFile randomAccessFile = new RandomAccessFile( file, "rw" ) ) + { + return writeData( data, temporaryBuffer, hasData, randomAccessFile.getChannel() ); + } + } + + private long writeDataThroughPageCache( File file, ReadableByteChannel data, ByteBuffer temporaryBuffer, + boolean hasData, PagedFile pagedFile ) throws IOException + { + try ( WritableByteChannel channel = pagedFile.openWritableByteChannel() ) + { + return writeData( data, temporaryBuffer, hasData, channel ); + } + } + + private long writeData( ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData, + WritableByteChannel channel ) throws IOException + { + long totalWritten = 0; + if ( hasData ) + { + while ( data.read( temporaryBuffer ) >= 0 ) + { + temporaryBuffer.flip(); + totalWritten += temporaryBuffer.limit(); + channel.write( temporaryBuffer ); + temporaryBuffer.clear(); + } + } + return totalWritten; + } + @Override public void close() {