Skip to content

Commit

Permalink
Cleanup and test fixes for block device migration
Browse files Browse the repository at this point in the history
  • Loading branch information
ragadeeshu committed Jan 10, 2017
1 parent a82e929 commit 32be0cd
Show file tree
Hide file tree
Showing 8 changed files with 637 additions and 675 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Exceptions;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor.Section;
Expand Down Expand Up @@ -258,6 +256,8 @@ private void migrateToIsolatedDirectory( File storeDir, File migrationDirectory,

private void cleanMigrationDirectory( File migrationDirectory )
{
// We use the page cache here to make sure that the migration directory is clean even if we are using a block
// device.
try
{
Iterable<FileHandle> fileHandles = pageCache.streamFilesRecursive( migrationDirectory )::iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.OutputStream;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
Expand All @@ -37,6 +38,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -104,7 +106,6 @@
import org.neo4j.unsafe.impl.batchimport.input.Inputs;
import org.neo4j.unsafe.impl.batchimport.staging.CoarseBoundedProgressExecutionMonitor;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;

import static java.util.Arrays.asList;
import static org.neo4j.kernel.impl.store.MetaDataStore.DEFAULT_NAME;
Expand Down Expand Up @@ -476,14 +477,15 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
}
StoreFile.fileOperation( DELETE, fileSystem, migrationDir, null, storesToDeleteFromMigratedDirectory,
true, null, StoreFileType.values() );
// When migrating on a block device there might be some files only accessible via the page cache.
try
{
Iterable<FileHandle> fileHandles = pageCache.streamFilesRecursive( migrationDir )::iterator;
for ( FileHandle fh : fileHandles )
{
if ( storesToDeleteFromMigratedDirectory
.stream().anyMatch( storeFile -> storeFile.fileName( StoreFileType.STORE )
.equals( fh.getFile().getName() ) ) )
Predicate<StoreFile> predicate =
storeFile -> storeFile.fileName( StoreFileType.STORE ).equals( fh.getFile().getName() );
if ( storesToDeleteFromMigratedDirectory.stream().anyMatch( predicate ) )
{
final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( fh.getFile() );
if ( optionalPagedFile.isPresent() )
Expand All @@ -496,7 +498,6 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
}
catch ( IOException e )
{
//TODO This might not be the entire truth
// This means that we had no files only present in the page cache, this is fine.
}
}
Expand All @@ -511,7 +512,7 @@ private NeoStores instantiateLegacyStore( RecordFormats format, File storeDir )
private void prepareBatchImportMigration( File storeDir, File migrationDir, RecordFormats oldFormat,
RecordFormats newFormat ) throws IOException
{
BatchingNeoStores.createStore( fileSystem, pageCache, migrationDir.getPath(), newFormat );
createStore( migrationDir, newFormat );

// We use the batch importer for migrating the data, and we use it in a special way where we only
// rewrite the stores that have actually changed format. We know that to be node and relationship
Expand All @@ -530,6 +531,7 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir, Reco
StoreFile.NODE_LABEL_STORE};
if ( newFormat.dynamic().equals( oldFormat.dynamic() ) )
{
// We use the page cache for copying the STORE files since these might be on a block device.
for ( StoreFile file : storesFilesToMigrate )
{
File fromPath = new File( storeDir, file.fileName( StoreFileType.STORE ) );
Expand All @@ -549,11 +551,14 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir, Reco
}
while ( fromCursor.shouldRetry() );
}

}

catch ( NoSuchFileException e )
{
// It is okay for the file to not be there.
}
}

// The id files are to be kept on the normal file system, hence we use fileOperation to copy them.
StoreFile.fileOperation( COPY, fileSystem, storeDir, migrationDir, Arrays.asList( storesFilesToMigrate ),
true, // OK if it's not there (1.9)
ExistingTargetStrategy.FAIL, StoreFileType.ID);
Expand All @@ -577,6 +582,23 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir, Reco
}
}

private void createStore( File migrationDir, RecordFormats newFormat )
{
StoreFactory storeFactory = new StoreFactory( new File( migrationDir.getPath() ), pageCache, fileSystem,
newFormat,
NullLogProvider.getInstance() );
try ( NeoStores neoStores = storeFactory.openAllNeoStores( true ) )
{
neoStores.getMetaDataStore();
neoStores.getLabelTokenStore();
neoStores.getNodeStore();
neoStores.getPropertyStore();
neoStores.getRelationshipGroupStore();
neoStores.getRelationshipStore();
neoStores.getSchemaStore();
}
}

private AdditionalInitialIds readAdditionalIds( final long lastTxId, final long lastTxChecksum,
final long lastTxLogVersion, final long lastTxLogByteOffset ) throws IOException
{
Expand Down Expand Up @@ -692,15 +714,16 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT
true, // allow to skip non existent source files
ExistingTargetStrategy.OVERWRITE, // allow to overwrite target files
StoreFileType.values() );
// Since some of the files might only be accessible through the page cache (i.e. block devices), we also try to
// move the files with the page cache.
try
{
Iterable<FileHandle> fileHandles = pageCache.streamFilesRecursive( migrationDir )::iterator;
for ( FileHandle fh : fileHandles )
{
if ( StreamSupport.stream( StoreFile.currentStoreFiles().spliterator(), false )
.anyMatch( storeFile -> storeFile.fileName( StoreFileType.STORE ).equals( fh.getFile()
.getName() )
) )
Predicate<StoreFile> predicate =
storeFile -> storeFile.fileName( StoreFileType.STORE ).equals( fh.getFile().getName() );
if ( StreamSupport.stream( StoreFile.currentStoreFiles().spliterator(), false ).anyMatch( predicate ) )
{
final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( fh.getFile() );
if ( optionalPagedFile.isPresent() )
Expand Down Expand Up @@ -750,6 +773,7 @@ public void rebuildCounts( File storeDir, String versionToMigrateFrom, String ve
if ( StandardV2_1.STORE_VERSION.equals( versionToMigrateFrom ) ||
StandardV2_2.STORE_VERSION.equals( versionToMigrateFrom ) )
{
// These versions are not supported for block devices.
CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, CUSTOM_IO_EXCEPTION_MESSAGE );
// create counters from scratch
Iterable<StoreFile> countsStoreFiles =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void doImport( Input input ) throws IOException
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
try ( BatchingNeoStores neoStore = getBatchingNeoStores();
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
neoStore.getLastCommittedTransactionId() );
neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats, config ) )
{
Collector badCollector = input.badCollector();
Expand Down Expand Up @@ -256,11 +256,16 @@ public void doImport( Input input ) throws IOException

private BatchingNeoStores getBatchingNeoStores()
{
return (pageCache == null)
? BatchingNeoStores.batchingNeoStores( fileSystem, storeDir, recordFormats, config, logService,
additionalInitialIds, dbConfig )
: BatchingNeoStores.batchingNeoStoresWithExternalPageCache( fileSystem, pageCache, PageCacheTracer.NULL,
storeDir, recordFormats, config, logService, additionalInitialIds, dbConfig );
if ( pageCache == null )
{
return BatchingNeoStores.batchingNeoStores( fileSystem, storeDir, recordFormats, config, logService,
additionalInitialIds, dbConfig );
}
else
{
return BatchingNeoStores.batchingNeoStoresWithExternalPageCache( fileSystem, pageCache,
PageCacheTracer.NULL, storeDir, recordFormats, config, logService, additionalInitialIds, dbConfig );
}
}

private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,34 +176,37 @@ public static BatchingNeoStores batchingNeoStores( FileSystemAbstraction fileSys
RecordFormats recordFormats, Configuration config, LogService logService, AdditionalInitialIds initialIds,
Config dbConfig )
{
Config neo4jConfig = new Config( stringMap( dbConfig.getParams(),
dense_node_threshold.name(), valueOf( config.denseNodeThreshold() ),
pagecache_memory.name(), valueOf( config.pageCacheMemory() ),
mapped_memory_page_size.name(), valueOf( config.pageSize() ) ),
GraphDatabaseSettings.class );
Config neo4jConfig = getNeo4jConfig( config, dbConfig );
final PageCacheTracer tracer = new DefaultPageCacheTracer();
PageCache pageCache = createPageCache( fileSystem, neo4jConfig, logService.getInternalLogProvider(), tracer );

BatchingNeoStores batchingNeoStores =
new BatchingNeoStores( fileSystem, pageCache, storeDir, recordFormats, neo4jConfig, logService,
initialIds, false, tracer::bytesWritten );
return batchingNeoStores;
}

public static BatchingNeoStores batchingNeoStoresWithExternalPageCache( FileSystemAbstraction fileSystem,
PageCache pageCache, PageCacheTracer tracer, File storeDir, RecordFormats recordFormats, Configuration config,
LogService logService, AdditionalInitialIds initialIds, Config dbConfig )
PageCache pageCache, PageCacheTracer tracer, File storeDir, RecordFormats recordFormats,
Configuration config, LogService logService, AdditionalInitialIds initialIds, Config dbConfig )
{
Config neo4jConfig = new Config( stringMap( dbConfig.getParams(),
dense_node_threshold.name(), valueOf( config.denseNodeThreshold() ),
pagecache_memory.name(), valueOf( config.pageCacheMemory() ),
mapped_memory_page_size.name(), valueOf( config.pageSize() ) ),
GraphDatabaseSettings.class );
Config neo4jConfig = getNeo4jConfig( config, dbConfig );

BatchingNeoStores batchingNeoStores =
new BatchingNeoStores( fileSystem, pageCache, storeDir, recordFormats, neo4jConfig, logService,
initialIds, true, tracer::bytesWritten );
return batchingNeoStores;
}

protected static Config getNeo4jConfig( Configuration config, Config dbConfig )
{
return new Config( stringMap( dbConfig.getParams(),
dense_node_threshold.name(), valueOf( config.denseNodeThreshold() ),
pagecache_memory.name(), valueOf( config.pageCacheMemory() ),
mapped_memory_page_size.name(), valueOf( config.pageSize() ) ),
GraphDatabaseSettings.class );
}

private static PageCache createPageCache( FileSystemAbstraction fileSystem, Config config, LogProvider log,
PageCacheTracer tracer )
{
Expand All @@ -216,30 +219,6 @@ private boolean alreadyContainsData( NeoStores neoStores )
return neoStores.getNodeStore().getHighId() > 0 || neoStores.getRelationshipStore().getHighId() > 0;
}

/**
* A way to create the underlying {@link NeoStores} files in the {@link FileSystemAbstraction file system}
* before instantiating the real one. This allows some store contents to be populated before an import.
* Useful for store migration where the {@link ParallelBatchImporter} is used as migrator and some of
* its data need to be communicated by copying a store file.
*/
public static void createStore( FileSystemAbstraction fileSystem, PageCache pageCache, String storeDir,
RecordFormats newFormat )
throws IOException
{
StoreFactory storeFactory = new StoreFactory( new File( storeDir ), pageCache, fileSystem, newFormat,
NullLogProvider.getInstance() );
try ( NeoStores neoStores = storeFactory.openAllNeoStores( true ) )
{
neoStores.getMetaDataStore();
neoStores.getLabelTokenStore();
neoStores.getNodeStore();
neoStores.getPropertyStore();
neoStores.getRelationshipGroupStore();
neoStores.getRelationshipStore();
neoStores.getSchemaStore();
}
}

private StoreFactory newStoreFactory( String name, OpenOption... openOptions )
{
return new StoreFactory( storeDir, name, neo4jConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public void shouldNotOpenStoreWithNodesOrRelationshipsInIt() throws Exception
{
RecordFormats recordFormats = RecordFormatSelector.selectForConfig( Config.empty(),
NullLogProvider.getInstance() );
BatchingNeoStores.batchingNeoStores( fsr.get(), storeDir, recordFormats, DEFAULT, NullLogService.getInstance(), EMPTY,
Config.empty() );
BatchingNeoStores.batchingNeoStores( fsr.get(), storeDir, recordFormats, DEFAULT,
NullLogService.getInstance(), EMPTY, Config.empty() );
fail( "Should fail on existing data" );
}
catch ( IllegalStateException e )
Expand All @@ -93,8 +93,8 @@ public void shouldRespectDbConfig() throws Exception
// WHEN
RecordFormats recordFormats = StandardV3_0.RECORD_FORMATS;
int headerSize = recordFormats.dynamic().getRecordHeaderSize();
try ( BatchingNeoStores store = BatchingNeoStores.batchingNeoStores( fsr.get(), storeDir, recordFormats, DEFAULT,
NullLogService.getInstance(), EMPTY, config ) )
try ( BatchingNeoStores store = BatchingNeoStores.batchingNeoStores( fsr.get(), storeDir, recordFormats,
DEFAULT, NullLogService.getInstance(), EMPTY, config ) )
{
// THEN
assertEquals( size + headerSize, store.getPropertyStore().getArrayStore().getRecordSize() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.store.format.standard.StandardV2_2;
import org.neo4j.kernel.impl.storemigration.StoreUpgrader;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.TestDirectory;
Expand Down

0 comments on commit 32be0cd

Please sign in to comment.