Skip to content

Commit

Permalink
Initial support for blockdevice store migration
Browse files Browse the repository at this point in the history
Supports migrating nodes and relationships. This is achieved by making the StoreMigrator partly use the page cache for migration. Also adds support for a BatchingNeoStores that uses an external page cache.
  • Loading branch information
ragadeeshu committed Jan 9, 2017
1 parent 72d7f47 commit 6361bfe
Show file tree
Hide file tree
Showing 17 changed files with 975 additions and 64 deletions.
Expand Up @@ -431,6 +431,12 @@ public void rename( File targetFile, CopyOption... options ) throws IOException
}
}

@Override
public String toString()
{
    // Describe this object via the file handle it wraps, so log output
    // identifies the underlying file.
    final String handleDescription = fileHandle.toString();
    return handleDescription;
}

@Override
public void delete() throws IOException
{
Expand Down
Expand Up @@ -262,7 +262,11 @@ else if ( FormatFamily.isHigherFamilyFormat( DEFAULT_FORMAT, result ) ||
}
}

private static Iterable<RecordFormats> allFormats()
/**
* Gets all {@link RecordFormats} that the selector is aware of.
* @return An iterable over all known record formats.
*/
public static Iterable<RecordFormats> allFormats()
{
Iterable<RecordFormats.Factory> loadableFormatFactories = Service.load( RecordFormats.Factory.class );
Iterable<RecordFormats> loadableFormats = map( RecordFormats.Factory::newInstance, loadableFormatFactories );
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor.Section;
import org.neo4j.kernel.impl.util.CustomIOConfigValidator;
import org.neo4j.kernel.internal.Version;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -116,7 +115,6 @@ public void migrateIfNeeded( File storeDirectory)
throw new UpgradeNotAllowedByConfigurationException();
}

CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, CUSTOM_IO_EXCEPTION_MESSAGE );

// One or more participants would like to do migration
progressMonitor.started();
Expand Down
Expand Up @@ -29,17 +29,23 @@
import java.io.OutputStream;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

import org.neo4j.helpers.collection.Iterables;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.store.StorePropertyCursor;
Expand Down Expand Up @@ -425,7 +431,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
readAdditionalIds( lastTxId, lastTxChecksum, lastTxLogVersion, lastTxLogByteOffset );

// We have to make sure to keep the token ids if we're migrating properties/labels
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem,
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem, pageCache,
importConfig, logService,
withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressMonitor,
importConfig ), importConfig ), additionalInitialIds, config, newFormat );
Expand Down Expand Up @@ -466,6 +472,29 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
}
StoreFile.fileOperation( DELETE, fileSystem, migrationDir, null, storesToDeleteFromMigratedDirectory,
true, null, StoreFileType.values() );
try
{
Iterable<FileHandle> fileHandles = pageCache.streamFilesRecursive( migrationDir )::iterator;
for ( FileHandle fh : fileHandles )
{
if ( storesToDeleteFromMigratedDirectory
.stream().anyMatch( storeFile -> storeFile.fileName( StoreFileType.STORE )
.equals( fh.getFile().getName() ) ) )
{
final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( fh.getFile() );
if ( optionalPagedFile.isPresent() )
{
optionalPagedFile.get().close();
}
fh.delete();
}
}
}
catch ( IOException e )
{
//TODO This might not be the entire truth
// This means that we had no files only present in the page cache, this is fine.
}
}
}

Expand All @@ -478,7 +507,7 @@ private NeoStores instantiateLegacyStore( RecordFormats format, File storeDir )
private void prepareBatchImportMigration( File storeDir, File migrationDir, RecordFormats oldFormat,
RecordFormats newFormat ) throws IOException
{
BatchingNeoStores.createStore( fileSystem, migrationDir.getPath(), config, newFormat );
BatchingNeoStores.createStore( fileSystem, pageCache, migrationDir.getPath(), newFormat );

// We use the batch importer for migrating the data, and we use it in a special way where we only
// rewrite the stores that have actually changed format. We know that to be node and relationship
Expand All @@ -497,9 +526,33 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir, Reco
StoreFile.NODE_LABEL_STORE};
if ( newFormat.dynamic().equals( oldFormat.dynamic() ) )
{
StoreFile.fileOperation( COPY, fileSystem, storeDir, migrationDir, Arrays.asList(storesFilesToMigrate),
for ( StoreFile file : storesFilesToMigrate )
{
File fromPath = new File( storeDir, file.fileName( StoreFileType.STORE ) );
File toPath = new File( migrationDir, file.fileName( StoreFileType.STORE ) );
int pageSize = pageCache.pageSize();
try ( PagedFile fromFile = pageCache.map( fromPath, pageSize );
PagedFile toFile = pageCache.map( toPath, pageSize, StandardOpenOption.CREATE );
PageCursor fromCursor = fromFile.io( 0L, PagedFile.PF_SHARED_READ_LOCK );
PageCursor toCursor = toFile.io( 0L, PagedFile.PF_SHARED_WRITE_LOCK ); )
{
toCursor.next();
while ( fromCursor.next() )
{
do
{
fromCursor.copyTo( 0, toCursor, 0, pageSize );
}
while ( fromCursor.shouldRetry() );
}

}

}

StoreFile.fileOperation( COPY, fileSystem, storeDir, migrationDir, Arrays.asList( storesFilesToMigrate ),
true, // OK if it's not there (1.9)
ExistingTargetStrategy.FAIL, StoreFileType.values() );
ExistingTargetStrategy.FAIL, StoreFileType.ID);
}
else
{
Expand Down Expand Up @@ -602,7 +655,7 @@ protected InputNode inputEntityOf( NodeRecord record )
};
}

private <ENTITY extends InputEntity,RECORD extends PrimitiveRecord> BiConsumer<ENTITY,RECORD> propertyDecorator(
private <ENTITY extends InputEntity, RECORD extends PrimitiveRecord> BiConsumer<ENTITY,RECORD> propertyDecorator(
boolean requiresPropertyMigration, RecordCursors cursors )
{
if ( !requiresPropertyMigration )
Expand All @@ -612,7 +665,8 @@ private <ENTITY extends InputEntity,RECORD extends PrimitiveRecord> BiConsumer<E

final StorePropertyCursor cursor = new StorePropertyCursor( cursors, ignored -> {} );
final List<Object> scratch = new ArrayList<>();
return (ENTITY entity, RECORD record) -> {
return ( ENTITY entity, RECORD record ) ->
{
cursor.init( record.getNextProp(), LockService.NO_LOCK );
scratch.clear();
while ( cursor.next() )
Expand All @@ -634,6 +688,29 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT
true, // allow to skip non existent source files
ExistingTargetStrategy.OVERWRITE, // allow to overwrite target files
StoreFileType.values() );
try
{
Iterable<FileHandle> fileHandles = pageCache.streamFilesRecursive( migrationDir )::iterator;
for ( FileHandle fh : fileHandles )
{
if ( StreamSupport.stream( StoreFile.currentStoreFiles().spliterator(), false )
.anyMatch( storeFile -> storeFile.fileName( StoreFileType.STORE ).equals( fh.getFile()
.getName() )
) )
{
final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( fh.getFile() );
if ( optionalPagedFile.isPresent() )
{
optionalPagedFile.get().close();
}
fh.rename( new File( storeDir, fh.getFile().getName() ), StandardCopyOption.REPLACE_EXISTING );
}
}
}
catch ( IOException e )
{
//This means that we had no files only present in the page cache, this is fine.
}

RecordFormats oldFormat = selectForVersion( versionToUpgradeFrom );
RecordFormats newFormat = selectForVersion( versionToUpgradeTo );
Expand Down
Expand Up @@ -28,6 +28,8 @@
import org.neo4j.helpers.Format;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.logging.LogService;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class ParallelBatchImporter implements BatchImporter
private final AdditionalInitialIds additionalInitialIds;
private final Config dbConfig;
private final RecordFormats recordFormats;
private final PageCache pageCache;

/**
* Advanced usage of the parallel batch importer, for special and very specific cases. Please use
Expand All @@ -100,6 +103,27 @@ public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, C
AdditionalInitialIds additionalInitialIds,
Config dbConfig, RecordFormats recordFormats )
{
this.storeDir = storeDir;
this.fileSystem = fileSystem;
this.pageCache = null;
this.config = config;
this.logService = logService;
this.dbConfig = dbConfig;
this.recordFormats = recordFormats;
this.log = logService.getInternalLogProvider().getLog( getClass() );
this.executionMonitor = executionMonitor;
this.additionalInitialIds = additionalInitialIds;
}

/**
* Advanced usage of the parallel batch importer, for special and very specific cases. Please use
* a constructor with fewer arguments instead.
*/
public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, PageCache pageCache,
Configuration config, LogService logService, ExecutionMonitor executionMonitor,
AdditionalInitialIds additionalInitialIds, Config dbConfig, RecordFormats recordFormats )
{
this.pageCache = pageCache;
this.storeDir = storeDir;
this.fileSystem = fileSystem;
this.config = config;
Expand All @@ -121,7 +145,7 @@ public ParallelBatchImporter( File storeDir, Configuration config, LogService lo
{
this( storeDir, new DefaultFileSystemAbstraction(), config, logService,
withDynamicProcessorAssignment( executionMonitor, config ), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, NullLogProvider.getInstance() ));
RecordFormatSelector.selectForConfig( dbConfig, NullLogProvider.getInstance() ) );
}

@Override
Expand All @@ -135,10 +159,9 @@ public void doImport( Input input ) throws IOException
NodeLabelsCache nodeLabelsCache = null;
long startTime = currentTimeMillis();
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
try ( BatchingNeoStores neoStore = new BatchingNeoStores( fileSystem, storeDir, recordFormats, config, logService,
additionalInitialIds, dbConfig );
try ( BatchingNeoStores neoStore = getBatchingNeoStores();
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
neoStore.getLastCommittedTransactionId() );
neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats, config ) )
{
Collector badCollector = input.badCollector();
Expand Down Expand Up @@ -231,6 +254,15 @@ public void doImport( Input input ) throws IOException
}
}

private BatchingNeoStores getBatchingNeoStores()
{
    // An external page cache is only present when this importer was built
    // through the PageCache-accepting constructor; prefer it when available,
    // otherwise let BatchingNeoStores manage its own page cache.
    if ( pageCache != null )
    {
        return BatchingNeoStores.batchingNeoStoresWithExternalPageCache( fileSystem, pageCache, PageCacheTracer.NULL,
                storeDir, recordFormats, config, logService, additionalInitialIds, dbConfig );
    }
    return BatchingNeoStores.batchingNeoStores( fileSystem, storeDir, recordFormats, config, logService,
            additionalInitialIds, dbConfig );
}

private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users )
{
GatheringMemoryStatsVisitor total = new GatheringMemoryStatsVisitor();
Expand Down Expand Up @@ -279,7 +311,7 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,

InputIterator<InputRelationship> perType = perTypeIterator.next();
String topic = " [:" + currentType + "] (" +
(i+1) + "/" + allRelationshipTypes.length + ")";
(i + 1) + "/" + allRelationshipTypes.length + ")";
final RelationshipStage relationshipStage = new RelationshipStage( topic, config,
writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache,
storeUpdateMonitor, nextRelationshipId );
Expand Down

0 comments on commit 6361bfe

Please sign in to comment.