Skip to content

Commit

Permalink
StoreCopyClient now writes through provided PageCache if file is a st…
Browse files Browse the repository at this point in the history
…ore file

When performing a store copy, if file is a store file, StoryCopyClient will now map the target file with the PageCache and write to the file through an opened ByteChannel on the paged file.

 If file is not a store file, then behavior is same as before.

 This makes it possible to support store copy to a device external to the normal file system.
  • Loading branch information
burqen authored and chrisvest committed Sep 16, 2016
1 parent 6466ceb commit 817faf2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 18 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.impl.store; package org.neo4j.kernel.impl.store;


import java.io.IOException; import java.io.IOException;
import java.util.Optional;


import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.impl.store.counts.CountsTracker; import org.neo4j.kernel.impl.store.counts.CountsTracker;
Expand Down Expand Up @@ -226,16 +227,35 @@ void close( NeoStores me, Object object )
* @throws IllegalStateException if can't determine store type for specified file * @throws IllegalStateException if can't determine store type for specified file
*/ */
public static StoreType typeOf( String storeFileName ) public static StoreType typeOf( String storeFileName )
{
final Optional<StoreType> optional = isStoreType( storeFileName );
if ( optional.isPresent() )
{
return optional.get();
}
else
{
throw new IllegalArgumentException( "No enum constant for " + storeFileName + " file." );
}
}

/**
* Determine if a file name corresponds to any store type.
*
* @param fileName - name of the file to check for corresponding store type
* @return {@link Optional} wrapping store type, or empty optional if no store type corresponds to file name
*/
public static Optional<StoreType> isStoreType( String fileName )
{ {
StoreType[] values = StoreType.values(); StoreType[] values = StoreType.values();
for ( StoreType value : values ) for ( StoreType value : values )
{ {
if ( value.getStoreName().equals( storeFileName ) || if ( value.getStoreName().equals( fileName ) ||
storeFileName.equals( MetaDataStore.DEFAULT_NAME + value.getStoreName() ) ) fileName.equals( MetaDataStore.DEFAULT_NAME + value.getStoreName() ) )
{ {
return value; return Optional.of( value );
} }
} }
throw new IllegalArgumentException( "No enum constant for " + storeFileName + " file." ); return Optional.empty();
} }
} }
Expand Up @@ -188,7 +188,7 @@ public void copyStore( StoreCopyRequester requester, CancellationRequest cancell
// Request store files and transactions that will need recovery // Request store files and transactions that will need recovery
monitor.startReceivingStoreFiles(); monitor.startReceivingStoreFiles();
try ( Response<?> response = requester.copyStore( decorateWithProgressIndicator( try ( Response<?> response = requester.copyStore( decorateWithProgressIndicator(
new ToFileStoreWriter( tempStore, monitor ) ) ) ) new ToFileStoreWriter( tempStore, monitor, pageCache ) ) ) )
{ {
monitor.finishReceivingStoreFiles(); monitor.finishReceivingStoreFiles();
// Update highest archived log id // Update highest archived log id
Expand Down
Expand Up @@ -162,7 +162,7 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
File file = files.next().file(); File file = files.next().file();


// Read from paged file if mapping exists. Otherwise read through file system. // Read from paged file if mapping exists. Otherwise read through file system.
final Optional<PagedFile> optionalPagedFile = pageCache.tryMappedPagedFile( file ); final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( file );
try ( ReadableByteChannel fileChannel = optionalPagedFile.isPresent() ? try ( ReadableByteChannel fileChannel = optionalPagedFile.isPresent() ?
optionalPagedFile.get().openReadableByteChannel() : optionalPagedFile.get().openReadableByteChannel() :
fileSystem.open( file, "r" ) ) fileSystem.open( file, "r" ) )
Expand Down
Expand Up @@ -23,18 +23,29 @@
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;

import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.StoreType;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.neo4j.kernel.impl.store.StoreType.isStoreType;


public class ToFileStoreWriter implements StoreWriter public class ToFileStoreWriter implements StoreWriter
{ {
private final File basePath; private final File basePath;
private final StoreCopyClient.Monitor monitor; private final StoreCopyClient.Monitor monitor;
private final PageCache pageCache;


public ToFileStoreWriter( File graphDbStoreDir, StoreCopyClient.Monitor monitor ) public ToFileStoreWriter( File graphDbStoreDir, StoreCopyClient.Monitor monitor, PageCache pageCache )
{ {
this.basePath = graphDbStoreDir; this.basePath = graphDbStoreDir;
this.monitor = monitor; this.monitor = monitor;
this.pageCache = pageCache;
} }


@Override @Override
Expand All @@ -47,22 +58,31 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
File file = new File( basePath, path ); File file = new File( basePath, path );


file.getParentFile().mkdirs(); file.getParentFile().mkdirs();
String filename = file.getName();
final Optional<StoreType> storeType = isStoreType( filename );
final Optional<PagedFile> existingMapping = pageCache.getExistingMapping( file );

monitor.startReceivingStoreFile( file ); monitor.startReceivingStoreFile( file );
try ( RandomAccessFile randomAccessFile = new RandomAccessFile( file, "rw" ) ) try
{ {
long totalWritten = 0; if ( existingMapping.isPresent() )
if ( hasData ) {
try ( PagedFile pagedFile = existingMapping.get() )
{
return writeDataThroughPageCache( file, data, temporaryBuffer, hasData, pagedFile );
}
}
else if ( storeType.isPresent() && storeType.get().isRecordStore() )
{ {
FileChannel channel = randomAccessFile.getChannel(); try ( PagedFile pagedFile = pageCache.map( file, pageCache.pageSize(), CREATE, WRITE ) )
while ( data.read( temporaryBuffer ) >= 0 )
{ {
temporaryBuffer.flip(); return writeDataThroughPageCache( file, data, temporaryBuffer, hasData, pagedFile );
totalWritten += temporaryBuffer.limit();
channel.write( temporaryBuffer );
temporaryBuffer.clear();
} }
} }
return totalWritten; else
{
return writeDataThroughFileSystem( file, data, temporaryBuffer, hasData );
}
} }
finally finally
{ {
Expand All @@ -75,6 +95,41 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
} }
} }


private long writeDataThroughFileSystem( File file, ReadableByteChannel data, ByteBuffer temporaryBuffer,
boolean hasData ) throws IOException
{
try ( RandomAccessFile randomAccessFile = new RandomAccessFile( file, "rw" ) )
{
return writeData( data, temporaryBuffer, hasData, randomAccessFile.getChannel() );
}
}

private long writeDataThroughPageCache( File file, ReadableByteChannel data, ByteBuffer temporaryBuffer,
boolean hasData, PagedFile pagedFile ) throws IOException
{
try ( WritableByteChannel channel = pagedFile.openWritableByteChannel() )
{
return writeData( data, temporaryBuffer, hasData, channel );
}
}

private long writeData( ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData,
WritableByteChannel channel ) throws IOException
{
long totalWritten = 0;
if ( hasData )
{
while ( data.read( temporaryBuffer ) >= 0 )
{
temporaryBuffer.flip();
totalWritten += temporaryBuffer.limit();
channel.write( temporaryBuffer );
temporaryBuffer.clear();
}
}
return totalWritten;
}

@Override @Override
public void close() public void close()
{ {
Expand Down

0 comments on commit 817faf2

Please sign in to comment.