Skip to content

Commit

Permalink
Merge pull request #8297 from chrisvest/3.1-fix-blockdevice-storecopy
Browse files Browse the repository at this point in the history
Fix store copy on block device
  • Loading branch information
burqen committed Nov 3, 2016
2 parents cc45c98 + c94acd0 commit ce6b3ff
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.com.Response;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.ExternallyManagedPageCache;
import org.neo4j.com.storecopy.MoveToDir;
import org.neo4j.com.storecopy.MoveAfterCopy;
import org.neo4j.com.storecopy.ResponseUnpacker;
import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import org.neo4j.com.storecopy.StoreCopyClient;
Expand Down Expand Up @@ -149,9 +149,10 @@ BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePor
monitors.newMonitor( StoreCopyClient.Monitor.class, getClass() ), forensics );
FullBackupStoreCopyRequester storeCopyRequester =
new FullBackupStoreCopyRequester( sourceHostNameOrIp, sourcePort, timeout, forensics, monitors );
File copyOfStore = storeCopier.copyStore( storeCopyRequester, CancellationRequest.NEVER_CANCELLED );

new MoveToDir().move( copyOfStore, targetDirectory );
storeCopier.copyStore(
storeCopyRequester,
CancellationRequest.NEVER_CANCELLED,
MoveAfterCopy.moveReplaceExisting() );

bumpDebugDotLogFileVersion( targetDirectory, timestamp );
boolean consistent = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,41 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;

@FunctionalInterface
public interface FileMoveAction
{
void move( File toDir, CopyOption... copyOptions ) throws IOException;

static FileMoveAction copyViaPageCache( File file, PageCache pageCache )
{
return ( toDir, copyOptions ) ->
{
Optional<FileHandle> handle = pageCache.streamFilesRecursive( file ).findAny();
if ( handle.isPresent() )
{
handle.get().rename( new File( toDir, file.getName() ), copyOptions );
}
};
}

static FileMoveAction copyViaFileSystem( File file, File basePath )
{
Path base = basePath.toPath();
return ( toDir, copyOptions ) ->
{
Path originalPath = file.toPath();
Path relativePath = base.relativize( originalPath );
Path resolvedPath = toDir.toPath().resolve( relativePath );
Files.createDirectories( resolvedPath.getParent() );
Files.copy( originalPath, resolvedPath, copyOptions );
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@
package org.neo4j.com.storecopy;

import java.io.File;
import java.io.IOException;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

import org.neo4j.io.fs.FileUtils;

import static org.neo4j.com.storecopy.StoreUtil.relevantDbFiles;

public class MoveToDir implements PostStoreCopyOperation
public interface MoveAfterCopy
{
@Override
public void move( File from, File to ) throws IOException
void move( Stream<FileMoveAction> moves, File fromDirectory, File toDirectory ) throws Exception;

static MoveAfterCopy moveReplaceExisting()
{
for ( File candidate : relevantDbFiles( from ) )
return (moves, fromDirectory, toDirectory) ->
{
FileUtils.moveFileToDirectory( candidate, to );
}

FileUtils.deleteRecursively( from );
Iterable<FileMoveAction> itr = moves::iterator;
for ( FileMoveAction move : itr )
{
move.move( toDirectory, StandardCopyOption.REPLACE_EXISTING );
}
};
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.neo4j.com.Response;
import org.neo4j.graphdb.GraphDatabaseService;
Expand Down Expand Up @@ -168,38 +170,81 @@ public StoreCopyClient( File storeDir, Config config, Iterable<KernelExtensionFa
this.forensics = forensics;
}

public File copyStore( StoreCopyRequester requester, CancellationRequest cancellationRequest ) throws Exception
public void copyStore( StoreCopyRequester requester, CancellationRequest cancellationRequest,
MoveAfterCopy moveAfterCopy ) throws Exception
{
// Create a temp directory (or clean if present)
File tempStore = new File( storeDir, StoreUtil.TEMP_COPY_DIRECTORY_NAME );
List<FileMoveAction> fileMoveActions = new ArrayList<>();
cleanDirectory( tempStore );

// Request store files and transactions that will need recovery
monitor.startReceivingStoreFiles();
try ( Response<?> response = requester.copyStore( decorateWithProgressIndicator(
new ToFileStoreWriter( tempStore, monitor, pageCache, fileMoveActions ) ) ) )
try
{
monitor.finishReceivingStoreFiles();
// Update highest archived log id
// Write transactions that happened during the copy to the currently active logical log
writeTransactionsToActiveLogFile( tempStore, response );
// The ToFileStoreWriter will add FileMoveActions for *RecordStores* that have to be
// *moves via the PageCache*!
// We have to move these files via the page cache, because that is the *only way* that we can communicate
// with any block storage that might have been configured for this instance.
List<FileMoveAction> storeFileMoveActions = new ArrayList<>();
cleanDirectory( tempStore );

// Request store files and transactions that will need recovery
monitor.startReceivingStoreFiles();
try ( Response<?> response = requester.copyStore( decorateWithProgressIndicator(
new ToFileStoreWriter( tempStore, monitor, pageCache, storeFileMoveActions ) ) ) )
{
monitor.finishReceivingStoreFiles();
// Update highest archived log id
// Write transactions that happened during the copy to the currently active logical log
writeTransactionsToActiveLogFile( tempStore, response );
}
finally
{
requester.done();
}

// This is a good place to check if the switch has been cancelled
checkCancellation( cancellationRequest, tempStore );

// Run recovery, so that the transactions we just wrote into the active log will be applied.
monitor.startRecoveringStore();
GraphDatabaseService graphDatabaseService = newTempDatabase( tempStore );
graphDatabaseService.shutdown();
monitor.finishRecoveringStore();

// All is well, move the streamed files to the real store directory.
// Start with the files written through the page cache. Should only be record store files.
// Note that the stream is lazy, so the file system traversal won't happen until *after* the store files
// have been moved. Thus we ensure that we only attempt to move them once.
Stream<FileMoveAction> moveActionStream = Stream.concat(
storeFileMoveActions.stream(), traverseGenerateMoveActions( tempStore, tempStore ) );
moveAfterCopy.move( moveActionStream, tempStore, storeDir );
}
finally
{
requester.done();
// All done, delete temp directory
FileUtils.deleteRecursively( tempStore );
}
}

// This is a good place to check if the switch has been cancelled
checkCancellation( cancellationRequest, tempStore );

// Run recovery, so that the transactions we just wrote into the active log will be applied.
monitor.startRecoveringStore();
GraphDatabaseService graphDatabaseService = newTempDatabase( tempStore );
graphDatabaseService.shutdown();
monitor.finishRecoveringStore();
private static Stream<FileMoveAction> traverseGenerateMoveActions( File dir, File basePath )
{
// Note that flatMap is an *intermediate operation* and therefor always lazy.
// It is very important that the stream we return only *lazily* calls out to expandTraverseFiles!
return Stream.of( dir ).flatMap( d -> expandTraverseFiles( d, basePath ) );
}

return tempStore;
private static Stream<FileMoveAction> expandTraverseFiles( File dir, File basePath )
{
File[] listing = dir.listFiles();
if ( listing == null )
{
// Weird, we somehow listed files for something that is no longer a directory. It's either a file,
// or doesn't exists. If the pathname no longer exists, then we are safe to return null here,
// because the flatMap in traverseGenerateMoveActions will just ignore it.
return dir.isFile() ? Stream.of( FileMoveAction.copyViaFileSystem( dir, basePath ) ) : null;
}
Stream<File> files = Arrays.stream( listing ).filter( File::isFile );
Stream<File> dirs = Arrays.stream( listing ).filter( File::isDirectory );
Stream<FileMoveAction> moveFiles = files.map( f -> FileMoveAction.copyViaFileSystem( f, basePath ) );
Stream<FileMoveAction> traverseDirectories = dirs.flatMap( d -> traverseGenerateMoveActions( d, basePath ) );
return Stream.concat( moveFiles, traverseDirectories );
}

private void writeTransactionsToActiveLogFile( File tempStoreDir, Response<?> response ) throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Optional;

import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.StoreType;
Expand Down Expand Up @@ -82,6 +81,8 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
return written;
}
}
// We don't add file move actions for these files. The reason is that we will perform the file moves
// *after* we have done recovery on the store, and this may delete some files, and add other files.
return writeDataThroughFileSystem( file, data, temporaryBuffer, hasData );
}
finally
Expand All @@ -99,14 +100,7 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
// the page cache later on when we want to move the files written through the page cache.
private void addPageCacheMoveAction( File file )
{
fileMoveActions.add( ( toDir, copyOptions ) ->
{
Optional<FileHandle> handle = pageCache.streamFilesRecursive( file ).findAny();
if ( handle.isPresent() )
{
handle.get().rename( new File( toDir, file.getName() ), copyOptions );
}
} );
fileMoveActions.add( FileMoveAction.copyViaPageCache( file, pageCache ) );
}

private int filePageSize( int alignment )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.com.storecopy;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
Expand Down Expand Up @@ -123,8 +122,7 @@ public void finishRecoveringStore()
spy( new LocalStoreCopyRequester( original, originalDir, fs ) );

// when
File copyOfStore = copier.copyStore( storeCopyRequest, cancelStoreCopy::get );
new MoveToDir().move( copyOfStore, copyDir );
copier.copyStore( storeCopyRequest, cancelStoreCopy::get, MoveAfterCopy.moveReplaceExisting() );

// Then
GraphDatabaseService copy = startDatabase( copyDir );
Expand Down Expand Up @@ -169,8 +167,7 @@ copyDir, config, loadKernelExtensions(), NullLogProvider.getInstance(), fs, page
final GraphDatabaseAPI original = (GraphDatabaseAPI) startDatabase( originalDir, recordFormatsName );
StoreCopyClient.StoreCopyRequester storeCopyRequest = new LocalStoreCopyRequester( original, originalDir, fs );

File copyOfStore = copier.copyStore( storeCopyRequest, CancellationRequest.NEVER_CANCELLED );
new MoveToDir().move( copyOfStore, copyDir );
copier.copyStore( storeCopyRequest, CancellationRequest.NEVER_CANCELLED, MoveAfterCopy.moveReplaceExisting() );

assertFalse( new File( copyDir, TEMP_COPY_DIRECTORY_NAME ).exists() );

Expand Down Expand Up @@ -214,8 +211,7 @@ public void finishReceivingStoreFiles()
spy( new LocalStoreCopyRequester( original, originalDir, fs ) );

// when
File copyOfStore = copier.copyStore( storeCopyRequest, cancelStoreCopy::get );
new MoveToDir().move( copyOfStore, copyDir );
copier.copyStore( storeCopyRequest, cancelStoreCopy::get, MoveAfterCopy.moveReplaceExisting() );

// Then
GraphDatabaseService copy = startDatabase( copyDir );
Expand All @@ -240,11 +236,11 @@ public void shouldResetNeoStoreLastTransactionOffsetForNonForensicCopy() throws
File backupStore = testDir.directory( "backupStore" );

PageCache pageCache = pageCacheRule.getPageCache( fs );
GraphDatabaseService initialDatabase = createInitialDatabase( initialStore );
createInitialDatabase( initialStore );
long originalTransactionOffset =
MetaDataStore.getRecord( pageCache, new File( initialStore, MetaDataStore.DEFAULT_NAME ),
MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET );
initialDatabase = startDatabase( initialStore );
GraphDatabaseService initialDatabase = startDatabase( initialStore );

StoreCopyClient copier =
new StoreCopyClient( backupStore, Config.empty(), loadKernelExtensions(), NullLogProvider
Expand All @@ -254,8 +250,7 @@ public void shouldResetNeoStoreLastTransactionOffsetForNonForensicCopy() throws
new LocalStoreCopyRequester( (GraphDatabaseAPI) initialDatabase, initialStore, fs );

// WHEN
File copyOfStore = copier.copyStore( storeCopyRequest, falseCancellationRequest );
new MoveToDir().move( copyOfStore, backupStore );
copier.copyStore( storeCopyRequest, falseCancellationRequest, MoveAfterCopy.moveReplaceExisting() );

// THEN
long updatedTransactionOffset =
Expand All @@ -267,7 +262,6 @@ public void shouldResetNeoStoreLastTransactionOffsetForNonForensicCopy() throws
}

@Test
@Ignore
public void shouldDeleteTempCopyFolderOnFailures() throws Exception
{
// GIVEN
Expand Down Expand Up @@ -295,7 +289,7 @@ public void done()
// WHEN
try
{
copier.copyStore( storeCopyRequest, falseCancellationRequest );
copier.copyStore( storeCopyRequest, falseCancellationRequest, MoveAfterCopy.moveReplaceExisting() );
fail( "should have thrown " );
}
catch ( RuntimeException ex )
Expand Down

0 comments on commit ce6b3ff

Please sign in to comment.