Skip to content

Commit

Permalink
Count rebuilding as separate migration participant
Browse files Browse the repository at this point in the history
  • Loading branch information
ragadeeshu committed Jul 5, 2017
1 parent a086d27 commit a71e204
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 183 deletions.
Expand Up @@ -50,7 +50,7 @@ public static void recomputeCounts( NeoStores stores )
private final int highLabelId; private final int highLabelId;
private final int highRelationshipTypeId; private final int highRelationshipTypeId;
private final long lastCommittedTransactionId; private final long lastCommittedTransactionId;
private final MigrationProgressMonitor progressMonitor; private final MigrationProgressMonitor.Section progressMonitor;


public CountsComputer( NeoStores stores ) public CountsComputer( NeoStores stores )
{ {
Expand All @@ -64,11 +64,11 @@ public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, Relatio
int highLabelId, int highRelationshipTypeId ) int highLabelId, int highRelationshipTypeId )
{ {
this( lastCommittedTransactionId, nodes, relationships, highLabelId, highRelationshipTypeId, this( lastCommittedTransactionId, nodes, relationships, highLabelId, highRelationshipTypeId,
new SilentMigrationProgressMonitor() ); new SilentMigrationProgressMonitor().startSection( "Counts" ) );
} }


public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships, public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships,
int highLabelId, int highRelationshipTypeId, MigrationProgressMonitor progressMonitor ) int highLabelId, int highRelationshipTypeId, MigrationProgressMonitor.Section progressMonitor )
{ {
this.lastCommittedTransactionId = lastCommittedTransactionId; this.lastCommittedTransactionId = lastCommittedTransactionId;
this.nodes = nodes; this.nodes = nodes;
Expand All @@ -81,27 +81,23 @@ public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, Relatio
@Override @Override
public void initialize( CountsAccessor.Updater countsUpdater ) public void initialize( CountsAccessor.Updater countsUpdater )
{ {
progressMonitor.start( nodes.getHighestPossibleIdInUse() + relationships.getHighestPossibleIdInUse() );
NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId ); NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId );
try try
{ {
// Count nodes // Count nodes
MigrationProgressMonitor.Section nodeSection = progressMonitor.startSection( "node counting" );
nodeSection.start( nodes.getHighestPossibleIdInUse() );
superviseDynamicExecution( superviseDynamicExecution(
new NodeCountsStage( Configuration.DEFAULT, cache, nodes, highLabelId, countsUpdater, new NodeCountsStage( Configuration.DEFAULT, cache, nodes, highLabelId, countsUpdater,
nodeSection ) ); progressMonitor ) );
nodeSection.completed();
// Count relationships // Count relationships
MigrationProgressMonitor.Section relationshipSection = progressMonitor
.startSection( "relationship counting" );
relationshipSection.start( relationships.getHighestPossibleIdInUse() );
superviseDynamicExecution( superviseDynamicExecution(
new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships, highLabelId, new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships, highLabelId,
highRelationshipTypeId, countsUpdater, AUTO, relationshipSection ) ); highRelationshipTypeId, countsUpdater, AUTO, progressMonitor ) );
} }
finally finally
{ {
cache.close(); cache.close();
progressMonitor.completed();
} }
} }


Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.format.RecordFormats; import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor; import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.participant.CountsMigrator;
import org.neo4j.kernel.impl.storemigration.participant.LegacyIndexMigrator; import org.neo4j.kernel.impl.storemigration.participant.LegacyIndexMigrator;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator; import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.kernel.spi.legacyindex.IndexImplementation; import org.neo4j.kernel.spi.legacyindex.IndexImplementation;
Expand Down Expand Up @@ -90,10 +91,12 @@ public void migrate( File storeDir )
labelScanStoreProvider ); labelScanStoreProvider );
LegacyIndexMigrator legacyIndexMigrator = new LegacyIndexMigrator( fs, indexProviders, logProvider ); LegacyIndexMigrator legacyIndexMigrator = new LegacyIndexMigrator( fs, indexProviders, logProvider );
StoreMigrator storeMigrator = new StoreMigrator( fs, pageCache, config, logService ); StoreMigrator storeMigrator = new StoreMigrator( fs, pageCache, config, logService );
CountsMigrator countsMigrator = new CountsMigrator( fs, pageCache, config, logService );


storeUpgrader.addParticipant( schemaMigrator ); storeUpgrader.addParticipant( schemaMigrator );
storeUpgrader.addParticipant( legacyIndexMigrator ); storeUpgrader.addParticipant( legacyIndexMigrator );
storeUpgrader.addParticipant( storeMigrator ); storeUpgrader.addParticipant( storeMigrator );
storeUpgrader.addParticipant( countsMigrator );
storeUpgrader.migrateIfNeeded( storeDir ); storeUpgrader.migrateIfNeeded( storeDir );
} }
} }
Expand Up @@ -33,7 +33,6 @@ enum MigrationStatus
{ {
migrating, migrating,
moving, moving,
countsRebuilding,
completed; completed;


public boolean isNeededFor( MigrationStatus current ) public boolean isNeededFor( MigrationStatus current )
Expand Down
Expand Up @@ -64,17 +64,6 @@ void migrate( File storeDir, File migrationDir, MigrationProgressMonitor.Section
void moveMigratedFiles( File migrationDir, File storeDir, String versionToMigrateFrom, String versionToMigrateTo ) void moveMigratedFiles( File migrationDir, File storeDir, String versionToMigrateFrom, String versionToMigrateTo )
throws IOException; throws IOException;


/**
* After a successful migration, and having moved all affected files from {@code upgradeDirectory} over to
* the {@code workingDirectory}, this will rebuild counts if needed.
* @param storeDir directory the store directory of the to move the migrated files to.
* @param progressMonitor
* @param versionToMigrateFrom the version we have migrated from
* @param versionToMigrateTo @throws IOException if unable to move one or more files.
* */
void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom,
String versionToMigrateTo ) throws IOException;

/** /**
* Delete any file from {@code migrationDir} produced during migration. * Delete any file from {@code migrationDir} produced during migration.
* @param migrationDir the directory where migrated files end up. * @param migrationDir the directory where migrated files end up.
Expand Down
Expand Up @@ -69,7 +69,6 @@ public class StoreUpgrader
public static final String MIGRATION_DIRECTORY = "upgrade"; public static final String MIGRATION_DIRECTORY = "upgrade";
public static final String MIGRATION_LEFT_OVERS_DIRECTORY = "upgrade_backup"; public static final String MIGRATION_LEFT_OVERS_DIRECTORY = "upgrade_backup";
private static final String MIGRATION_STATUS_FILE = "_status"; private static final String MIGRATION_STATUS_FILE = "_status";
public static final int COUNT_STORE_REBUILD_STEPS = 2;


private final UpgradableDatabase upgradableDatabase; private final UpgradableDatabase upgradableDatabase;
private final MigrationProgressMonitor progressMonitor; private final MigrationProgressMonitor progressMonitor;
Expand Down Expand Up @@ -121,7 +120,7 @@ public void migrateIfNeeded( File storeDirectory )
} }


// One or more participants would like to do migration // One or more participants would like to do migration
progressMonitor.started( participants.size() + COUNT_STORE_REBUILD_STEPS ); progressMonitor.started( participants.size() );


MigrationStatus migrationStatus = MigrationStatus.readMigrationStatus( fileSystem, migrationStateFile ); MigrationStatus migrationStatus = MigrationStatus.readMigrationStatus( fileSystem, migrationStateFile );
String versionToMigrateFrom = null; String versionToMigrateFrom = null;
Expand All @@ -142,15 +141,6 @@ public void migrateIfNeeded( File storeDirectory )
MigrationStatus.moving.maybeReadInfo( fileSystem, migrationStateFile, versionToMigrateFrom ); MigrationStatus.moving.maybeReadInfo( fileSystem, migrationStateFile, versionToMigrateFrom );
moveMigratedFilesToStoreDirectory( participants, migrationDirectory, storeDirectory, moveMigratedFilesToStoreDirectory( participants, migrationDirectory, storeDirectory,
versionToMigrateFrom, upgradableDatabase.currentVersion() ); versionToMigrateFrom, upgradableDatabase.currentVersion() );
MigrationStatus.countsRebuilding.setMigrationStatus( fileSystem, migrationStateFile, versionToMigrateFrom );
}

if ( MigrationStatus.countsRebuilding.isNeededFor( migrationStatus ) )
{
versionToMigrateFrom = MigrationStatus.countsRebuilding.maybeReadInfo(
fileSystem, migrationStateFile, versionToMigrateFrom );
rebuildCountsInStoreDirectory( participants, storeDirectory, versionToMigrateFrom );
MigrationStatus.completed.setMigrationStatus( fileSystem, migrationStateFile, versionToMigrateFrom );
} }


cleanup( participants, migrationDirectory ); cleanup( participants, migrationDirectory );
Expand Down Expand Up @@ -209,23 +199,6 @@ private void moveMigratedFilesToStoreDirectory( Iterable<StoreMigrationParticipa
} }
} }


private void rebuildCountsInStoreDirectory( List<StoreMigrationParticipant> participants, File storeDirectory,
String versionToMigrateFrom )
{
try
{
for ( StoreMigrationParticipant participant : participants )
{
participant.rebuildCounts( storeDirectory, progressMonitor, versionToMigrateFrom,
upgradableDatabase.currentVersion() );
}
}
catch ( IOException e )
{
throw new UnableToUpgradeException( "Unable to move migrated files into place", e );
}
}

private void migrateToIsolatedDirectory( File storeDir, File migrationDirectory, String versionToMigrateFrom ) private void migrateToIsolatedDirectory( File storeDir, File migrationDirectory, String versionToMigrateFrom )
{ {
try try
Expand Down
Expand Up @@ -23,7 +23,7 @@ public interface MigrationProgressMonitor
{ {
/** /**
* Signals that the migration process has started. * Signals that the migration process has started.
* @param numStages * @param numStages The number of migration stages is the migration process that we are monitoring.
*/ */
void started( int numStages ); void started( int numStages );


Expand Down
Expand Up @@ -55,10 +55,6 @@ public Section startSection( String name )
@Override @Override
public void completed() public void completed()
{ {
if ( currentStage < numStages )
{
log.info( format( "%d stages were not required and have been skipped.", numStages - currentStage ) );
}
log.info( MESSAGE_COMPLETED ); log.info( MESSAGE_COMPLETED );
} }


Expand Down
Expand Up @@ -52,12 +52,6 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT
{ {
} }


@Override
public void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom,
String versionToMigrateTo ) throws IOException
{
}

@Override @Override
public void cleanup( File migrationDir ) throws IOException public void cleanup( File migrationDir ) throws IOException
{ {
Expand Down
@@ -0,0 +1,169 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.storemigration.participant;

import java.io.File;
import java.io.IOException;

import org.neo4j.helpers.collection.Iterables;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.CountsComputer;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.MetaDataStore.Position;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.StoreFactory;
import org.neo4j.kernel.impl.store.StoreFailureException;
import org.neo4j.kernel.impl.store.StoreFile;
import org.neo4j.kernel.impl.store.StoreType;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.format.StoreVersion;
import org.neo4j.kernel.impl.store.format.standard.StandardV2_3;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy;
import org.neo4j.kernel.impl.storemigration.StoreFileType;
import org.neo4j.kernel.impl.storemigration.StoreUpgrader;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.logging.NullLogProvider;

import static org.neo4j.kernel.impl.store.MetaDataStore.DEFAULT_NAME;
import static org.neo4j.kernel.impl.store.format.RecordFormatSelector.selectForVersion;
import static org.neo4j.kernel.impl.storemigration.FileOperation.DELETE;
import static org.neo4j.kernel.impl.storemigration.FileOperation.MOVE;

/**
* Rebuilds the count store during migration.
* <p>
* Since the database may or may not reside in the upgrade directory, depending on whether the new format has
* different capabilities or not, we rebuild the count store in the store directory if we fail to open the store in
* the upgrade directory
* <p>
* Just one out of many potential participants in a {@link StoreUpgrader migration}.
*
* @see StoreUpgrader
*/
public class CountsMigrator extends AbstractStoreMigrationParticipant
{

public static final Iterable<StoreFile> COUNTS_STORE_FILES = Iterables
.iterable( StoreFile.COUNTS_STORE_LEFT, StoreFile.COUNTS_STORE_RIGHT );
private final Config config;
private final LogService logService;
private final FileSystemAbstraction fileSystem;
private final PageCache pageCache;

public CountsMigrator( FileSystemAbstraction fileSystem, PageCache pageCache, Config config, LogService logService )
{
super( "Count rebuilding" );
this.fileSystem = fileSystem;
this.pageCache = pageCache;
this.config = config;
this.logService = logService;
}

@Override
public void migrate( File storeDir, File migrationDir, MigrationProgressMonitor.Section progressMonitor,
String versionToMigrateFrom, String versionToMigrateTo ) throws IOException
{
if ( countStoreRebuildRequired( versionToMigrateFrom ) )
{
// create counters from scratch
StoreFile.fileOperation( DELETE, fileSystem, migrationDir, migrationDir, COUNTS_STORE_FILES, true, null,
StoreFileType.STORE );
File neoStore = new File( storeDir, DEFAULT_NAME );
long lastTxId = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_ID );
try
{
rebuildCountsFromScratch( migrationDir, lastTxId, progressMonitor, versionToMigrateTo, pageCache );
}
catch ( StoreFailureException e )
{
//This means that we did not perform a full migration, as the formats had the same capabilities. Thus
// we should use the store directory for information when rebuilding the count store.
rebuildCountsFromScratch( storeDir, lastTxId, progressMonitor, versionToMigrateFrom, pageCache );
}
}
}

@Override
public void moveMigratedFiles( File migrationDir, File storeDir, String versionToUpgradeFrom,
String versionToUpgradeTo ) throws IOException
{
// Move the migrated ones into the store directory
StoreFile.fileOperation( MOVE, fileSystem, migrationDir, storeDir, COUNTS_STORE_FILES, true,
// allow to skip non existent source files
ExistingTargetStrategy.OVERWRITE, // allow to overwrite target files
StoreFileType.values() );
// We do not need to move files with the page cache, as the count files always reside on the normal file system.
}

@Override
public void cleanup( File migrationDir ) throws IOException
{
fileSystem.deleteRecursively( migrationDir );
}

@Override
public String toString()
{
return "Kernel Node Count Rebuilder";
}

boolean countStoreRebuildRequired( String versionToMigrateFrom )
{
return StandardV2_3.STORE_VERSION.equals( versionToMigrateFrom ) ||
StandardV3_0.STORE_VERSION.equals( versionToMigrateFrom ) ||
StoreVersion.HIGH_LIMIT_V3_0_0.versionString().equals( versionToMigrateFrom ) ||
StoreVersion.HIGH_LIMIT_V3_0_6.versionString().equals( versionToMigrateFrom ) ||
StoreVersion.HIGH_LIMIT_V3_1_0.versionString().equals( versionToMigrateFrom );
}

private void rebuildCountsFromScratch( File migrationDir, long lastTxId,
MigrationProgressMonitor.Section progressMonitor, String expectedStoreVersion, PageCache pageCache )
{
final File storeFileBase = new File( migrationDir, MetaDataStore.DEFAULT_NAME + StoreFactory.COUNTS_STORE );

RecordFormats recordFormats = selectForVersion( expectedStoreVersion );
StoreFactory storeFactory = new StoreFactory( migrationDir, pageCache, fileSystem, recordFormats,
NullLogProvider.getInstance() );
try ( NeoStores neoStores = storeFactory
.openNeoStores( StoreType.NODE, StoreType.RELATIONSHIP, StoreType.LABEL_TOKEN,
StoreType.RELATIONSHIP_TYPE_TOKEN ) )
{
NodeStore nodeStore = neoStores.getNodeStore();
RelationshipStore relationshipStore = neoStores.getRelationshipStore();
try ( Lifespan life = new Lifespan() )
{
int highLabelId = (int) neoStores.getLabelTokenStore().getHighId();
int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId();
CountsComputer initializer = new CountsComputer( lastTxId, nodeStore, relationshipStore, highLabelId,
highRelationshipTypeId, progressMonitor );
life.add( new CountsTracker( logService.getInternalLogProvider(), fileSystem, pageCache, config,
storeFileBase ).setInitializer( initializer ) );
}
}
}
}

0 comments on commit a71e204

Please sign in to comment.