Skip to content

Commit

Permalink
Ensure that store copy also moves non-store files
Browse files Browse the repository at this point in the history
Store files (with the current exception of the counts store) have to be moved
via the page cache, because that is the only way to communicate with any block
device storage that might be configured on the given instance.
The rest of the files are still on kept on the normal file system, and have to
be moved in the old fashion way. However, we can't queue up these moves like we
can with the store files, because we are not going to do the move until after
we have run recovery on the store. The recovery process may delete some files
and add others – Lucene files are particularly susceptible to this – and so we
have to traverse the file system after the recovery. This is made somewhat
awkward by the MoveAfterCopy callback, and the fact that our file system
traversal should not encounter any to-be-moved-by-the-page-cache store files.
We get around both of these issues by using streams. With the streams, we make
sure that the via-page-cache moves are ordered before our file system
traversal, so no moves get queued up twice. The reason is that the streams are
iterated lazily, and they have been carefully composed to retain their lazy
nature until the very last moment.
  • Loading branch information
chrisvest committed Nov 3, 2016
1 parent 256ca22 commit c94acd0
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 20 deletions.
Expand Up @@ -22,9 +22,41 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;

@FunctionalInterface
public interface FileMoveAction
{
void move( File toDir, CopyOption... copyOptions ) throws IOException;

static FileMoveAction copyViaPageCache( File file, PageCache pageCache )
{
return ( toDir, copyOptions ) ->
{
Optional<FileHandle> handle = pageCache.streamFilesRecursive( file ).findAny();
if ( handle.isPresent() )
{
handle.get().rename( new File( toDir, file.getName() ), copyOptions );
}
};
}

static FileMoveAction copyViaFileSystem( File file, File basePath )
{
Path base = basePath.toPath();
return ( toDir, copyOptions ) ->
{
Path originalPath = file.toPath();
Path relativePath = base.relativize( originalPath );
Path resolvedPath = toDir.toPath().resolve( relativePath );
Files.createDirectories( resolvedPath.getParent() );
Files.copy( originalPath, resolvedPath, copyOptions );
};
}

}
Expand Up @@ -21,16 +21,18 @@

import java.io.File;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public interface MoveAfterCopy
{
void move( Iterable<FileMoveAction> moves, File fromDirectory, File toDirectory ) throws Exception;
void move( Stream<FileMoveAction> moves, File fromDirectory, File toDirectory ) throws Exception;

static MoveAfterCopy moveReplaceExisting()
{
return (moves, fromDirectory, toDirectory) ->
{
for ( FileMoveAction move : moves )
Iterable<FileMoveAction> itr = moves::iterator;
for ( FileMoveAction move : itr )
{
move.move( toDirectory, StandardCopyOption.REPLACE_EXISTING );
}
Expand Down
Expand Up @@ -24,8 +24,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.neo4j.com.Response;
import org.neo4j.graphdb.GraphDatabaseService;
Expand Down Expand Up @@ -175,13 +177,17 @@ public void copyStore( StoreCopyRequester requester, CancellationRequest cancell
File tempStore = new File( storeDir, StoreUtil.TEMP_COPY_DIRECTORY_NAME );
try
{
List<FileMoveAction> fileMoveActions = new ArrayList<>();
// The ToFileStoreWriter will add FileMoveActions for *RecordStores* that have to be
// *moves via the PageCache*!
// We have to move these files via the page cache, because that is the *only way* that we can communicate
// with any block storage that might have been configured for this instance.
List<FileMoveAction> storeFileMoveActions = new ArrayList<>();
cleanDirectory( tempStore );

// Request store files and transactions that will need recovery
monitor.startReceivingStoreFiles();
try ( Response<?> response = requester.copyStore( decorateWithProgressIndicator(
new ToFileStoreWriter( tempStore, monitor, pageCache, fileMoveActions ) ) ) )
new ToFileStoreWriter( tempStore, monitor, pageCache, storeFileMoveActions ) ) ) )
{
monitor.finishReceivingStoreFiles();
// Update highest archived log id
Expand All @@ -202,9 +208,13 @@ public void copyStore( StoreCopyRequester requester, CancellationRequest cancell
graphDatabaseService.shutdown();
monitor.finishRecoveringStore();

// All is well, move the streamed files to the real store directory
// Start with the files written through the page cache. Should only be record store files
moveAfterCopy.move( fileMoveActions, tempStore, storeDir );
// All is well, move the streamed files to the real store directory.
// Start with the files written through the page cache. Should only be record store files.
// Note that the stream is lazy, so the file system traversal won't happen until *after* the store files
// have been moved. Thus we ensure that we only attempt to move them once.
Stream<FileMoveAction> moveActionStream = Stream.concat(
storeFileMoveActions.stream(), traverseGenerateMoveActions( tempStore, tempStore ) );
moveAfterCopy.move( moveActionStream, tempStore, storeDir );
}
finally
{
Expand All @@ -213,6 +223,30 @@ public void copyStore( StoreCopyRequester requester, CancellationRequest cancell
}
}

private static Stream<FileMoveAction> traverseGenerateMoveActions( File dir, File basePath )
{
// Note that flatMap is an *intermediate operation* and therefor always lazy.
// It is very important that the stream we return only *lazily* calls out to expandTraverseFiles!
return Stream.of( dir ).flatMap( d -> expandTraverseFiles( d, basePath ) );
}

private static Stream<FileMoveAction> expandTraverseFiles( File dir, File basePath )
{
File[] listing = dir.listFiles();
if ( listing == null )
{
// Weird, we somehow listed files for something that is no longer a directory. It's either a file,
// or doesn't exists. If the pathname no longer exists, then we are safe to return null here,
// because the flatMap in traverseGenerateMoveActions will just ignore it.
return dir.isFile() ? Stream.of( FileMoveAction.copyViaFileSystem( dir, basePath ) ) : null;
}
Stream<File> files = Arrays.stream( listing ).filter( File::isFile );
Stream<File> dirs = Arrays.stream( listing ).filter( File::isDirectory );
Stream<FileMoveAction> moveFiles = files.map( f -> FileMoveAction.copyViaFileSystem( f, basePath ) );
Stream<FileMoveAction> traverseDirectories = dirs.flatMap( d -> traverseGenerateMoveActions( d, basePath ) );
return Stream.concat( moveFiles, traverseDirectories );
}

private void writeTransactionsToActiveLogFile( File tempStoreDir, Response<?> response ) throws Exception
{
LifeSupport life = new LifeSupport();
Expand Down
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Optional;

import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.StoreType;
Expand Down Expand Up @@ -82,6 +81,8 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
return written;
}
}
// We don't add file move actions for these files. The reason is that we will perform the file moves
// *after* we have done recovery on the store, and this may delete some files, and add other files.
return writeDataThroughFileSystem( file, data, temporaryBuffer, hasData );
}
finally
Expand All @@ -99,14 +100,7 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
// the page cache later on when we want to move the files written through the page cache.
private void addPageCacheMoveAction( File file )
{
fileMoveActions.add( ( toDir, copyOptions ) ->
{
Optional<FileHandle> handle = pageCache.streamFilesRecursive( file ).findAny();
if ( handle.isPresent() )
{
handle.get().rename( new File( toDir, file.getName() ), copyOptions );
}
} );
fileMoveActions.add( FileMoveAction.copyViaPageCache( file, pageCache ) );
}

private int filePageSize( int alignment )
Expand Down
Expand Up @@ -236,11 +236,11 @@ public void shouldResetNeoStoreLastTransactionOffsetForNonForensicCopy() throws
File backupStore = testDir.directory( "backupStore" );

PageCache pageCache = pageCacheRule.getPageCache( fs );
GraphDatabaseService initialDatabase = createInitialDatabase( initialStore );
createInitialDatabase( initialStore );
long originalTransactionOffset =
MetaDataStore.getRecord( pageCache, new File( initialStore, MetaDataStore.DEFAULT_NAME ),
MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET );
initialDatabase = startDatabase( initialStore );
GraphDatabaseService initialDatabase = startDatabase( initialStore );

StoreCopyClient copier =
new StoreCopyClient( backupStore, Config.empty(), loadKernelExtensions(), NullLogProvider
Expand Down
Expand Up @@ -30,7 +30,6 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -230,7 +229,7 @@ public void shouldNotBranchStoreUnlessWeHaveCopiedDownAReplacement() throws Thro
doAnswer( invocation ->
{
MoveAfterCopy moveAfterCopy = invocation.getArgumentAt( 2, MoveAfterCopy.class );
moveAfterCopy.move( Collections.emptyList(), new File( "" ), new File( "" ) );
moveAfterCopy.move( Stream.empty(), new File( "" ), new File( "" ) );
return null;
} ).when( storeCopyClient ).copyStore(
any( StoreCopyClient.StoreCopyRequester.class ),
Expand Down

0 comments on commit c94acd0

Please sign in to comment.