Better support for block device store migration
This is achieved by routing the relevant parts of the migration process through the page cache as well, so that files which are only reachable via the page cache can still be found and handled.
ragadeeshu committed Jan 9, 2017
1 parent 6361bfe commit d654039
Showing 6 changed files with 87 additions and 27 deletions.
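The core of the change is visible in the StoreUpgrader hunks below: the upgrader now takes a PageCache and uses PageCache#streamFilesRecursive together with FileHandle#delete to clean up migration leftovers, because on a block device those files may not exist in the ordinary file system view at all. A minimal sketch of that pattern, using only the calls that appear in this commit (the wrapping helper class and method name are illustrative, not part of the commit):

import java.io.File;
import java.io.IOException;

import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;

class PageCacheCleanupSketch
{
    // Illustrative helper: delete every file under `directory` that the page cache
    // knows about, even when the underlying storage is a block device with no
    // regular file system view. Mirrors the cleanMigrationDirectory change below.
    static void deleteAllThroughPageCache( PageCache pageCache, File directory )
    {
        try
        {
            Iterable<FileHandle> handles = pageCache.streamFilesRecursive( directory )::iterator;
            for ( FileHandle handle : handles )
            {
                handle.delete();
            }
        }
        catch ( IOException e )
        {
            // As in the commit: an IOException here just means there was nothing to clean.
        }
    }
}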
@@ -84,7 +84,7 @@ public void migrate(File storeDir)
UpgradableDatabase upgradableDatabase =
new UpgradableDatabase( fs, new StoreVersionCheck( pageCache ), new LegacyStoreVersionCheck( fs ),
format );
StoreUpgrader storeUpgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, config, fs,
StoreUpgrader storeUpgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, config, fs, pageCache,
logProvider );

StoreMigrationParticipant schemaMigrator = schemaIndexProvider.storeMigrationParticipant( fs, pageCache,
@@ -24,11 +24,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Exceptions;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor.Section;
@@ -66,22 +70,23 @@ public class StoreUpgrader
public static final String MIGRATION_DIRECTORY = "upgrade";
public static final String MIGRATION_LEFT_OVERS_DIRECTORY = "upgrade_backup";
private static final String MIGRATION_STATUS_FILE = "_status";
public static final String CUSTOM_IO_EXCEPTION_MESSAGE = "Store upgrade not allowed with custom IO integrations";

private final UpgradableDatabase upgradableDatabase;
private final MigrationProgressMonitor progressMonitor;
private final List<StoreMigrationParticipant> participants = new ArrayList<>();
private final Config config;
private final FileSystemAbstraction fileSystem;
private final PageCache pageCache;
private final Log log;

public StoreUpgrader( UpgradableDatabase upgradableDatabase, MigrationProgressMonitor progressMonitor, Config
config, FileSystemAbstraction fileSystem, LogProvider logProvider )
config, FileSystemAbstraction fileSystem, PageCache pageCache, LogProvider logProvider )
{
this.upgradableDatabase = upgradableDatabase;
this.progressMonitor = progressMonitor;
this.fileSystem = fileSystem;
this.config = config;
this.pageCache = pageCache;
this.log = logProvider.getLog( getClass() );
}

@@ -115,7 +120,6 @@ public void migrateIfNeeded( File storeDirectory)
throw new UpgradeNotAllowedByConfigurationException();
}


// One or more participants would like to do migration
progressMonitor.started();

@@ -254,6 +258,18 @@ private void migrateToIsolatedDirectory( File storeDir, File migrationDirectory,

private void cleanMigrationDirectory( File migrationDirectory )
{
try
{
Iterable<FileHandle> fileHandles = pageCache.streamFilesRecursive( migrationDirectory )::iterator;
for ( FileHandle fh : fileHandles )
{
fh.delete();
}
}
catch ( IOException e )
{
// This means that we had no files to clean, this is fine.
}
if ( migrationDirectory.exists() )
{
try
@@ -87,6 +87,7 @@
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.CustomIOConfigValidator;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds;
@@ -138,6 +139,8 @@ public class StoreMigrator extends AbstractStoreMigrationParticipant
// complete upgrades in a reasonable time period.

private static final char TX_LOG_COUNTERS_SEPARATOR = 'A';
public static final String CUSTOM_IO_EXCEPTION_MESSAGE =
"Migrating this version is not supported for custom IO configurations.";

private final Config config;
private final LogService logService;
@@ -363,6 +366,7 @@ private void removeDuplicateEntityProperties( File storeDir, File migrationDir,
SchemaIndexProvider schemaIndexProvider, RecordFormats oldFormat )
throws IOException
{
CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, CUSTOM_IO_EXCEPTION_MESSAGE );
StoreFile.fileOperation( COPY, fileSystem, storeDir, migrationDir, Iterables.iterable(
StoreFile.PROPERTY_STORE,
StoreFile.PROPERTY_KEY_TOKEN_NAMES_STORE,
@@ -492,7 +496,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
}
catch ( IOException e )
{
//TODO This might not be the entire thruth
//TODO This might not be the entire truth
// This means that we had no files only present in the page cache, this is fine.
}
}
@@ -536,9 +540,9 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir, Reco
PageCursor fromCursor = fromFile.io( 0L, PagedFile.PF_SHARED_READ_LOCK );
PageCursor toCursor = toFile.io( 0L, PagedFile.PF_SHARED_WRITE_LOCK ); )
{
toCursor.next();
while ( fromCursor.next() )
{
toCursor.next();
do
{
fromCursor.copyTo( 0, toCursor, 0, pageSize );
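The hunk above also fixes a subtle copy bug: toCursor.next() used to be called once before the loop, so every source page was copied onto the same first target page. Moving the call inside the loop advances the write cursor in lock-step with the read cursor. A sketch of the corrected loop as a stand-alone method (the wrapping class and the closing shouldRetry() retry idiom are assumptions, since the end of the hunk is truncated in this view):

import java.io.IOException;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

class PageCopySketch
{
    // Copy `fromFile` into `toFile` one page at a time.
    static void copyPages( PagedFile fromFile, PagedFile toFile, int pageSize ) throws IOException
    {
        try ( PageCursor fromCursor = fromFile.io( 0L, PagedFile.PF_SHARED_READ_LOCK );
              PageCursor toCursor = toFile.io( 0L, PagedFile.PF_SHARED_WRITE_LOCK ) )
        {
            while ( fromCursor.next() )
            {
                toCursor.next(); // advance the target page together with the source page (the fix)
                do
                {
                    // Copy the whole page; repeat if a concurrent change invalidated the read.
                    fromCursor.copyTo( 0, toCursor, 0, pageSize );
                }
                while ( fromCursor.shouldRetry() );
            }
        }
    }
}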
@@ -746,6 +750,7 @@ public void rebuildCounts( File storeDir, String versionToMigrateFrom, String ve
if ( StandardV2_1.STORE_VERSION.equals( versionToMigrateFrom ) ||
StandardV2_2.STORE_VERSION.equals( versionToMigrateFrom ) )
{
CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, CUSTOM_IO_EXCEPTION_MESSAGE );
// create counters from scratch
Iterable<StoreFile> countsStoreFiles =
Iterables.iterable( StoreFile.COUNTS_STORE_LEFT, StoreFile.COUNTS_STORE_RIGHT );
@@ -32,6 +32,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.store.format.standard.StandardV2_2;
import org.neo4j.kernel.impl.storemigration.StoreUpgrader;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.TestDirectory;

@@ -68,7 +69,7 @@ public void shouldFailToStartWithCustomIOConfigurationTest() throws IOException
}
catch ( RuntimeException ex )
{
assertEquals( StoreUpgrader.CUSTOM_IO_EXCEPTION_MESSAGE, ex.getCause().getCause().getMessage() );
assertEquals( StoreMigrator.CUSTOM_IO_EXCEPTION_MESSAGE, ex.getCause().getCause().getMessage() );
}
}

@@ -131,7 +131,7 @@ public void migrate( File sourceStoreDir, File targetStoreDir,

try
{
newUpgrader( upgradableDatabase, progressMonitor, createIndexMigrator(), failingStoreMigrator )
newUpgrader( upgradableDatabase, pageCache, progressMonitor, createIndexMigrator(), failingStoreMigrator )
.migrateIfNeeded( workingDirectory );
fail( "Should throw exception" );
}
@@ -146,7 +146,8 @@ public void migrate( File sourceStoreDir, File targetStoreDir,
progressMonitor = new SilentMigrationProgressMonitor();
StoreMigrator migrator = new StoreMigrator( fs, pageCache, CONFIG, logService, schemaIndexProvider );
SchemaIndexMigrator indexMigrator = createIndexMigrator();
newUpgrader(upgradableDatabase, progressMonitor, indexMigrator, migrator ).migrateIfNeeded( workingDirectory );
newUpgrader(upgradableDatabase, pageCache, progressMonitor, indexMigrator, migrator ).migrateIfNeeded(
workingDirectory );

assertTrue( checkNeoStoreHasDefaultFormatVersion( check, workingDirectory ) );
assertTrue( allStoreFilesHaveNoTrailer( fs, workingDirectory ) );
@@ -190,7 +191,7 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT

try
{
newUpgrader( upgradableDatabase, progressMonitor, createIndexMigrator(), failingStoreMigrator )
newUpgrader( upgradableDatabase, pageCache, progressMonitor, createIndexMigrator(), failingStoreMigrator )
.migrateIfNeeded( workingDirectory );
fail( "Should throw exception" );
}
@@ -204,7 +205,7 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT

progressMonitor = new SilentMigrationProgressMonitor();
StoreMigrator migrator = new StoreMigrator( fs, pageCache, CONFIG, logService, schemaIndexProvider );
newUpgrader( upgradableDatabase, progressMonitor, createIndexMigrator(), migrator )
newUpgrader( upgradableDatabase, pageCache, progressMonitor, createIndexMigrator(), migrator )
.migrateIfNeeded( workingDirectory );

assertTrue( checkNeoStoreHasDefaultFormatVersion( check, workingDirectory ) );
@@ -217,15 +218,13 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT
assertConsistentStore( workingDirectory );
}

private StoreUpgrader newUpgrader(UpgradableDatabase upgradableDatabase, MigrationProgressMonitor progressMonitor,
SchemaIndexMigrator indexMigrator,
StoreMigrator migrator )
private StoreUpgrader newUpgrader( UpgradableDatabase upgradableDatabase, PageCache pageCache,
MigrationProgressMonitor progressMonitor, SchemaIndexMigrator indexMigrator, StoreMigrator migrator )
{
Config allowUpgrade = new Config( stringMap( GraphDatabaseSettings
.allow_store_upgrade.name(), "true" ) );
Config allowUpgrade = new Config( stringMap( GraphDatabaseSettings.allow_store_upgrade.name(), "true" ) );

StoreUpgrader upgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, allowUpgrade, fs,
pageCacheRule.getPageCache( fs ), NullLogProvider.getInstance() );
StoreUpgrader upgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, allowUpgrade, fs, pageCache,
NullLogProvider.getInstance() );
upgrader.addParticipant( indexMigrator );
upgrader.addParticipant( migrator );
return upgrader;
@@ -29,11 +29,16 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.EnterpriseGraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
@@ -710,8 +715,7 @@ private static void createDb( RecordFormats recordFormat, PageCache pageCache, F
{
GraphDatabaseService database = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder( storeDir )
.setConfig( GraphDatabaseSettings.allow_store_upgrade, Settings.TRUE )
.setConfig( GraphDatabaseSettings.record_format, recordFormat.storeVersion() )
.newGraphDatabase();
.setConfig( GraphDatabaseSettings.record_format, recordFormat.storeVersion() ).newGraphDatabase();
database.shutdown();
}

@@ -727,39 +731,74 @@ public void shouldMigrate() throws Exception
File db = new File( from.toString() + to.toString() );
FileSystemAbstraction fs = fileSystemRule.get();
fs.deleteRecursively( db );

GraphDatabaseService database = getGraphDatabaseService( db, from.storeVersion() );

database.execute( "CREATE INDEX ON :Person(name)" );
database.execute( "CREATE INDEX ON :Person(born)" );
database.execute( "CREATE CONSTRAINT ON (person:Person) ASSERT exists(person.name)" );
database.execute( CREATE_QUERY );
int beforeNodes;
int beforeLabels;
int beforeKeys;
int beforeRels;
int beforeRelTypes;
int beforeIndexes;
int beforeConstraints;
String[] beforeIndexNames;
try ( Transaction tx = database.beginTx() )
{
beforeNodes = database.getAllNodes().stream().mapToInt( ( n ) -> 1 ).sum();
beforeLabels = database.getAllLabels().stream().mapToInt( ( n ) -> 1 ).sum();
beforeKeys = database.getAllPropertyKeys().stream().mapToInt( ( n ) -> 1 ).sum();
beforeRels = database.getAllRelationships().stream().mapToInt( ( n ) -> 1 ).sum();
beforeRelTypes = database.getAllRelationshipTypes().stream().mapToInt( ( n ) -> 1 ).sum();
Stream<IndexDefinition> indexDefinitionStream = StreamSupport
.stream( database.schema().getIndexes().spliterator(), false );
beforeIndexes = indexDefinitionStream.mapToInt( ( n ) -> 1 ).sum();
Stream<ConstraintDefinition> constraintStream = StreamSupport
.stream( database.schema().getConstraints().spliterator(), false );
beforeConstraints = constraintStream.mapToInt( ( n ) -> 1 ).sum();
}
database.shutdown();

database = getGraphDatabaseService( db, to.storeVersion() );
int afterNodes;
int afterLabels;
int afterKeys;
int afterRels;
int afterRelTypes;
int afterIndexes;
int afterConstraints;
try ( Transaction tx = database.beginTx() )
{
afterNodes = database.getAllNodes().stream().mapToInt( ( n ) -> 1 ).sum();
afterLabels = database.getAllLabels().stream().mapToInt( ( n ) -> 1 ).sum();
afterKeys = database.getAllPropertyKeys().stream().mapToInt( ( n ) -> 1 ).sum();
afterRels = database.getAllRelationships().stream().mapToInt( ( n ) -> 1 ).sum();
afterRelTypes = database.getAllRelationshipTypes().stream().mapToInt( ( n ) -> 1 ).sum();
Stream<IndexDefinition> indexDefinitionStream = StreamSupport
.stream( database.schema().getIndexes().spliterator(), false );
afterIndexes = indexDefinitionStream.mapToInt( ( n ) -> 1 ).sum();
Stream<ConstraintDefinition> constraintStream = StreamSupport
.stream( database.schema().getConstraints().spliterator(), false );
afterConstraints = constraintStream.mapToInt( ( n ) -> 1 ).sum();
}
database.shutdown();

assertEquals( beforeNodes, afterNodes );
assertEquals( beforeRels, afterRels );
assertEquals( beforeNodes, afterNodes ); //171
assertEquals( beforeLabels, afterLabels ); //2
assertEquals( beforeKeys, afterKeys ); //8
assertEquals( beforeRels, afterRels ); //253
assertEquals( beforeRelTypes, afterRelTypes ); //6
assertEquals( beforeIndexes, afterIndexes ); //2
assertEquals( beforeConstraints, afterConstraints ); //2
}
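
The before/after bookkeeping above counts nodes, labels, property keys, relationships, relationship types, indexes and constraints by mapping each element to 1 and summing. A hypothetical helper (not part of the commit) could take the same counts with Stream#count() from the standard library:

import java.util.stream.StreamSupport;

final class CountSketch
{
    // Hypothetical convenience for the counting done in shouldMigrate above.
    static long countAll( Iterable<?> iterable )
    {
        return StreamSupport.stream( iterable.spliterator(), false ).count();
    }
}

For example, beforeIndexes = (int) CountSketch.countAll( database.schema().getIndexes() ); taken inside the same transaction would replace one of the StreamSupport blocks above.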

// This method is overridden by a block device test.

protected GraphDatabaseService getGraphDatabaseService( File db, String storeVersion )
{
return new GraphDatabaseFactory().newEmbeddedDatabaseBuilder( db )
return new EnterpriseGraphDatabaseFactory().newEmbeddedDatabaseBuilder( db )
.setConfig( GraphDatabaseSettings.allow_store_upgrade, Settings.TRUE )
.setConfig( GraphDatabaseSettings.record_format, storeVersion )
.newGraphDatabase();
.setConfig( GraphDatabaseSettings.record_format, storeVersion ).newGraphDatabase();
}
}
