From a71e204aa2c40c49f4836b1e31c08a8306008c37 Mon Sep 17 00:00:00 2001 From: Ragnar Mellbin Date: Thu, 15 Jun 2017 16:02:45 +0200 Subject: [PATCH] Count rebuilding as separate migration participant --- .../kernel/impl/store/CountsComputer.java | 18 +- .../impl/storemigration/DatabaseMigrator.java | 3 + .../impl/storemigration/MigrationStatus.java | 1 - .../StoreMigrationParticipant.java | 11 -- .../impl/storemigration/StoreUpgrader.java | 29 +-- .../monitoring/MigrationProgressMonitor.java | 2 +- .../VisibleMigrationProgressMonitor.java | 4 - .../AbstractStoreMigrationParticipant.java | 6 - .../participant/CountsMigrator.java | 169 ++++++++++++++++++ .../participant/StoreMigrator.java | 126 ++++--------- .../impl/batchimport/NodeCountsProcessor.java | 13 +- .../impl/batchimport/NodeCountsStage.java | 12 +- .../participant/StoreMigratorIT.java | 10 +- .../test/java/upgrade/StoreUpgraderTest.java | 7 +- .../participant/StoreMigratorTest.java | 28 ++- 15 files changed, 256 insertions(+), 183 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/CountsMigrator.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java index 55e2c75e9b7d8..1b23cb74cdd46 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CountsComputer.java @@ -50,7 +50,7 @@ public static void recomputeCounts( NeoStores stores ) private final int highLabelId; private final int highRelationshipTypeId; private final long lastCommittedTransactionId; - private final MigrationProgressMonitor progressMonitor; + private final MigrationProgressMonitor.Section progressMonitor; public CountsComputer( NeoStores stores ) { @@ -64,11 +64,11 @@ public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, Relatio int highLabelId, int highRelationshipTypeId ) { this( lastCommittedTransactionId, nodes, relationships, highLabelId, highRelationshipTypeId, - new SilentMigrationProgressMonitor() ); + new SilentMigrationProgressMonitor().startSection( "Counts" ) ); } public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships, - int highLabelId, int highRelationshipTypeId, MigrationProgressMonitor progressMonitor ) + int highLabelId, int highRelationshipTypeId, MigrationProgressMonitor.Section progressMonitor ) { this.lastCommittedTransactionId = lastCommittedTransactionId; this.nodes = nodes; @@ -81,27 +81,23 @@ public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, Relatio @Override public void initialize( CountsAccessor.Updater countsUpdater ) { + progressMonitor.start( nodes.getHighestPossibleIdInUse() + relationships.getHighestPossibleIdInUse() ); NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId ); try { // Count nodes - MigrationProgressMonitor.Section nodeSection = progressMonitor.startSection( "node counting" ); - nodeSection.start( nodes.getHighestPossibleIdInUse() ); superviseDynamicExecution( new NodeCountsStage( Configuration.DEFAULT, cache, nodes, highLabelId, countsUpdater, - nodeSection ) ); - nodeSection.completed(); + progressMonitor ) ); // Count relationships - MigrationProgressMonitor.Section relationshipSection = progressMonitor - .startSection( "relationship counting" ); - relationshipSection.start( relationships.getHighestPossibleIdInUse() ); superviseDynamicExecution( new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships, highLabelId, - highRelationshipTypeId, countsUpdater, AUTO, relationshipSection ) ); + highRelationshipTypeId, countsUpdater, AUTO, progressMonitor ) ); } finally { cache.close(); + progressMonitor.completed(); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/DatabaseMigrator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/DatabaseMigrator.java index b0b9cc94e8b64..f9123f1beb62a 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/DatabaseMigrator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/DatabaseMigrator.java @@ -30,6 +30,7 @@ import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.store.format.RecordFormats; import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor; +import org.neo4j.kernel.impl.storemigration.participant.CountsMigrator; import org.neo4j.kernel.impl.storemigration.participant.LegacyIndexMigrator; import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator; import org.neo4j.kernel.spi.legacyindex.IndexImplementation; @@ -90,10 +91,12 @@ public void migrate( File storeDir ) labelScanStoreProvider ); LegacyIndexMigrator legacyIndexMigrator = new LegacyIndexMigrator( fs, indexProviders, logProvider ); StoreMigrator storeMigrator = new StoreMigrator( fs, pageCache, config, logService ); + CountsMigrator countsMigrator = new CountsMigrator( fs, pageCache, config, logService ); storeUpgrader.addParticipant( schemaMigrator ); storeUpgrader.addParticipant( legacyIndexMigrator ); storeUpgrader.addParticipant( storeMigrator ); + storeUpgrader.addParticipant( countsMigrator ); storeUpgrader.migrateIfNeeded( storeDir ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/MigrationStatus.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/MigrationStatus.java index e25330ba5c097..05f7530b7dc0e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/MigrationStatus.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/MigrationStatus.java @@ -33,7 +33,6 @@ enum MigrationStatus { migrating, moving, - countsRebuilding, completed; public boolean isNeededFor( MigrationStatus current ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrationParticipant.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrationParticipant.java index 4170844710dfc..7de8e941a7ef0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrationParticipant.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrationParticipant.java @@ -64,17 +64,6 @@ void migrate( File storeDir, File migrationDir, MigrationProgressMonitor.Section void moveMigratedFiles( File migrationDir, File storeDir, String versionToMigrateFrom, String versionToMigrateTo ) throws IOException; - /** - * After a successful migration, and having moved all affected files from {@code upgradeDirectory} over to - * the {@code workingDirectory}, this will rebuild counts if needed. - * @param storeDir directory the store directory of the to move the migrated files to. - * @param progressMonitor - * @param versionToMigrateFrom the version we have migrated from - * @param versionToMigrateTo @throws IOException if unable to move one or more files. - * */ - void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom, - String versionToMigrateTo ) throws IOException; - /** * Delete any file from {@code migrationDir} produced during migration. * @param migrationDir the directory where migrated files end up. diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreUpgrader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreUpgrader.java index a22561d007b85..4ef85f41a0178 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreUpgrader.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreUpgrader.java @@ -69,7 +69,6 @@ public class StoreUpgrader public static final String MIGRATION_DIRECTORY = "upgrade"; public static final String MIGRATION_LEFT_OVERS_DIRECTORY = "upgrade_backup"; private static final String MIGRATION_STATUS_FILE = "_status"; - public static final int COUNT_STORE_REBUILD_STEPS = 2; private final UpgradableDatabase upgradableDatabase; private final MigrationProgressMonitor progressMonitor; @@ -121,7 +120,7 @@ public void migrateIfNeeded( File storeDirectory ) } // One or more participants would like to do migration - progressMonitor.started( participants.size() + COUNT_STORE_REBUILD_STEPS ); + progressMonitor.started( participants.size() ); MigrationStatus migrationStatus = MigrationStatus.readMigrationStatus( fileSystem, migrationStateFile ); String versionToMigrateFrom = null; @@ -142,15 +141,6 @@ public void migrateIfNeeded( File storeDirectory ) MigrationStatus.moving.maybeReadInfo( fileSystem, migrationStateFile, versionToMigrateFrom ); moveMigratedFilesToStoreDirectory( participants, migrationDirectory, storeDirectory, versionToMigrateFrom, upgradableDatabase.currentVersion() ); - MigrationStatus.countsRebuilding.setMigrationStatus( fileSystem, migrationStateFile, versionToMigrateFrom ); - } - - if ( MigrationStatus.countsRebuilding.isNeededFor( migrationStatus ) ) - { - versionToMigrateFrom = MigrationStatus.countsRebuilding.maybeReadInfo( - fileSystem, migrationStateFile, versionToMigrateFrom ); - rebuildCountsInStoreDirectory( participants, storeDirectory, versionToMigrateFrom ); - MigrationStatus.completed.setMigrationStatus( fileSystem, migrationStateFile, versionToMigrateFrom ); } cleanup( participants, migrationDirectory ); @@ -209,23 +199,6 @@ private void moveMigratedFilesToStoreDirectory( Iterable participants, File storeDirectory, - String versionToMigrateFrom ) - { - try - { - for ( StoreMigrationParticipant participant : participants ) - { - participant.rebuildCounts( storeDirectory, progressMonitor, versionToMigrateFrom, - upgradableDatabase.currentVersion() ); - } - } - catch ( IOException e ) - { - throw new UnableToUpgradeException( "Unable to move migrated files into place", e ); - } - } - private void migrateToIsolatedDirectory( File storeDir, File migrationDirectory, String versionToMigrateFrom ) { try diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/MigrationProgressMonitor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/MigrationProgressMonitor.java index b62d29b25f8bb..0fda911086942 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/MigrationProgressMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/MigrationProgressMonitor.java @@ -23,7 +23,7 @@ public interface MigrationProgressMonitor { /** * Signals that the migration process has started. - * @param numStages + * @param numStages The number of migration stages is the migration process that we are monitoring. */ void started( int numStages ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/VisibleMigrationProgressMonitor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/VisibleMigrationProgressMonitor.java index 3aaf3a4318f98..238318fec5f75 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/VisibleMigrationProgressMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/monitoring/VisibleMigrationProgressMonitor.java @@ -55,10 +55,6 @@ public Section startSection( String name ) @Override public void completed() { - if ( currentStage < numStages ) - { - log.info( format( "%d stages were not required and have been skipped.", numStages - currentStage ) ); - } log.info( MESSAGE_COMPLETED ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/AbstractStoreMigrationParticipant.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/AbstractStoreMigrationParticipant.java index f086dae3ef15c..d665f779954a8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/AbstractStoreMigrationParticipant.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/AbstractStoreMigrationParticipant.java @@ -52,12 +52,6 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT { } - @Override - public void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom, - String versionToMigrateTo ) throws IOException - { - } - @Override public void cleanup( File migrationDir ) throws IOException { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/CountsMigrator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/CountsMigrator.java new file mode 100644 index 0000000000000..b4b8daed6e310 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/CountsMigrator.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.storemigration.participant; + +import java.io.File; +import java.io.IOException; + +import org.neo4j.helpers.collection.Iterables; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.kernel.impl.store.CountsComputer; +import org.neo4j.kernel.impl.store.MetaDataStore; +import org.neo4j.kernel.impl.store.MetaDataStore.Position; +import org.neo4j.kernel.impl.store.NeoStores; +import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.StoreFactory; +import org.neo4j.kernel.impl.store.StoreFailureException; +import org.neo4j.kernel.impl.store.StoreFile; +import org.neo4j.kernel.impl.store.StoreType; +import org.neo4j.kernel.impl.store.counts.CountsTracker; +import org.neo4j.kernel.impl.store.format.RecordFormats; +import org.neo4j.kernel.impl.store.format.StoreVersion; +import org.neo4j.kernel.impl.store.format.standard.StandardV2_3; +import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; +import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy; +import org.neo4j.kernel.impl.storemigration.StoreFileType; +import org.neo4j.kernel.impl.storemigration.StoreUpgrader; +import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor; +import org.neo4j.kernel.lifecycle.Lifespan; +import org.neo4j.logging.NullLogProvider; + +import static org.neo4j.kernel.impl.store.MetaDataStore.DEFAULT_NAME; +import static org.neo4j.kernel.impl.store.format.RecordFormatSelector.selectForVersion; +import static org.neo4j.kernel.impl.storemigration.FileOperation.DELETE; +import static org.neo4j.kernel.impl.storemigration.FileOperation.MOVE; + +/** + * Rebuilds the count store during migration. + *

+ * Since the database may or may not reside in the upgrade directory, depending on whether the new format has + * different capabilities or not, we rebuild the count store in the store directory if we fail to open the store in + * the upgrade directory + *

+ * Just one out of many potential participants in a {@link StoreUpgrader migration}. + * + * @see StoreUpgrader + */ +public class CountsMigrator extends AbstractStoreMigrationParticipant +{ + + public static final Iterable COUNTS_STORE_FILES = Iterables + .iterable( StoreFile.COUNTS_STORE_LEFT, StoreFile.COUNTS_STORE_RIGHT ); + private final Config config; + private final LogService logService; + private final FileSystemAbstraction fileSystem; + private final PageCache pageCache; + + public CountsMigrator( FileSystemAbstraction fileSystem, PageCache pageCache, Config config, LogService logService ) + { + super( "Count rebuilding" ); + this.fileSystem = fileSystem; + this.pageCache = pageCache; + this.config = config; + this.logService = logService; + } + + @Override + public void migrate( File storeDir, File migrationDir, MigrationProgressMonitor.Section progressMonitor, + String versionToMigrateFrom, String versionToMigrateTo ) throws IOException + { + if ( countStoreRebuildRequired( versionToMigrateFrom ) ) + { + // create counters from scratch + StoreFile.fileOperation( DELETE, fileSystem, migrationDir, migrationDir, COUNTS_STORE_FILES, true, null, + StoreFileType.STORE ); + File neoStore = new File( storeDir, DEFAULT_NAME ); + long lastTxId = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_ID ); + try + { + rebuildCountsFromScratch( migrationDir, lastTxId, progressMonitor, versionToMigrateTo, pageCache ); + } + catch ( StoreFailureException e ) + { + //This means that we did not perform a full migration, as the formats had the same capabilities. Thus + // we should use the store directory for information when rebuilding the count store. + rebuildCountsFromScratch( storeDir, lastTxId, progressMonitor, versionToMigrateFrom, pageCache ); + } + } + } + + @Override + public void moveMigratedFiles( File migrationDir, File storeDir, String versionToUpgradeFrom, + String versionToUpgradeTo ) throws IOException + { + // Move the migrated ones into the store directory + StoreFile.fileOperation( MOVE, fileSystem, migrationDir, storeDir, COUNTS_STORE_FILES, true, + // allow to skip non existent source files + ExistingTargetStrategy.OVERWRITE, // allow to overwrite target files + StoreFileType.values() ); + // We do not need to move files with the page cache, as the count files always reside on the normal file system. + } + + @Override + public void cleanup( File migrationDir ) throws IOException + { + fileSystem.deleteRecursively( migrationDir ); + } + + @Override + public String toString() + { + return "Kernel Node Count Rebuilder"; + } + + boolean countStoreRebuildRequired( String versionToMigrateFrom ) + { + return StandardV2_3.STORE_VERSION.equals( versionToMigrateFrom ) || + StandardV3_0.STORE_VERSION.equals( versionToMigrateFrom ) || + StoreVersion.HIGH_LIMIT_V3_0_0.versionString().equals( versionToMigrateFrom ) || + StoreVersion.HIGH_LIMIT_V3_0_6.versionString().equals( versionToMigrateFrom ) || + StoreVersion.HIGH_LIMIT_V3_1_0.versionString().equals( versionToMigrateFrom ); + } + + private void rebuildCountsFromScratch( File migrationDir, long lastTxId, + MigrationProgressMonitor.Section progressMonitor, String expectedStoreVersion, PageCache pageCache ) + { + final File storeFileBase = new File( migrationDir, MetaDataStore.DEFAULT_NAME + StoreFactory.COUNTS_STORE ); + + RecordFormats recordFormats = selectForVersion( expectedStoreVersion ); + StoreFactory storeFactory = new StoreFactory( migrationDir, pageCache, fileSystem, recordFormats, + NullLogProvider.getInstance() ); + try ( NeoStores neoStores = storeFactory + .openNeoStores( StoreType.NODE, StoreType.RELATIONSHIP, StoreType.LABEL_TOKEN, + StoreType.RELATIONSHIP_TYPE_TOKEN ) ) + { + NodeStore nodeStore = neoStores.getNodeStore(); + RelationshipStore relationshipStore = neoStores.getRelationshipStore(); + try ( Lifespan life = new Lifespan() ) + { + int highLabelId = (int) neoStores.getLabelTokenStore().getHighId(); + int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId(); + CountsComputer initializer = new CountsComputer( lastTxId, nodeStore, relationshipStore, highLabelId, + highRelationshipTypeId, progressMonitor ); + life.add( new CountsTracker( logService.getInternalLogProvider(), fileSystem, pageCache, config, + storeFileBase ).setInitializer( initializer ) ); + } + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java index 8109185eba425..b571234cc30be 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java @@ -42,7 +42,6 @@ import java.util.function.Supplier; import java.util.stream.StreamSupport; -import org.neo4j.helpers.collection.Iterables; import org.neo4j.io.fs.FileHandle; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileUtils; @@ -54,7 +53,6 @@ import org.neo4j.kernel.impl.api.store.StorePropertyCursor; import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.kernel.impl.store.CountsComputer; import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.MetaDataStore.Position; import org.neo4j.kernel.impl.store.NeoStores; @@ -65,16 +63,13 @@ import org.neo4j.kernel.impl.store.StoreFile; import org.neo4j.kernel.impl.store.StoreType; import org.neo4j.kernel.impl.store.TransactionId; -import org.neo4j.kernel.impl.store.counts.CountsTracker; import org.neo4j.kernel.impl.store.format.CapabilityType; import org.neo4j.kernel.impl.store.format.FormatFamily; import org.neo4j.kernel.impl.store.format.RecordFormats; -import org.neo4j.kernel.impl.store.format.StoreVersion; import org.neo4j.kernel.impl.store.format.standard.MetaDataRecordFormat; import org.neo4j.kernel.impl.store.format.standard.NodeRecordFormat; import org.neo4j.kernel.impl.store.format.standard.RelationshipRecordFormat; import org.neo4j.kernel.impl.store.format.standard.StandardV2_3; -import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.kernel.impl.store.id.ReadOnlyIdGeneratorFactory; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.PrimitiveRecord; @@ -90,7 +85,6 @@ import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.util.CustomIOConfigValidator; -import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.logging.NullLogProvider; import org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds; import org.neo4j.unsafe.impl.batchimport.BatchImporter; @@ -217,6 +211,9 @@ public void migrate( File storeDir, File migrationDir, MigrationProgressMonitor. lastTxId, lastTxInfo.checksum(), lastTxLogPosition.getLogVersion(), lastTxLogPosition.getByteOffset(), progressMonitor, oldFormat, newFormat ); } + // update necessary neostore records + LogPosition logPosition = readLastTxLogPosition( migrationDir ); + updateOrAddNeoStoreFieldsAsPartOfMigration( migrationDir, storeDir, versionToMigrateTo, logPosition ); } private boolean isDifferentCapabilities( RecordFormats oldFormat, RecordFormats newFormat ) @@ -475,26 +472,7 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir, Reco { File fromPath = new File( storeDir, file.fileName( StoreFileType.STORE ) ); File toPath = new File( migrationDir, file.fileName( StoreFileType.STORE ) ); - int pageSize = pageCache.pageSize(); - try ( PagedFile fromFile = pageCache.map( fromPath, pageSize ); - PagedFile toFile = pageCache.map( toPath, pageSize, StandardOpenOption.CREATE ); - PageCursor fromCursor = fromFile.io( 0L, PagedFile.PF_SHARED_READ_LOCK ); - PageCursor toCursor = toFile.io( 0L, PagedFile.PF_SHARED_WRITE_LOCK ) ) - { - while ( fromCursor.next() ) - { - toCursor.next(); - do - { - fromCursor.copyTo( 0, toCursor, 0, pageSize ); - } - while ( fromCursor.shouldRetry() ); - } - } - catch ( NoSuchFileException e ) - { - // It is okay for the file to not be there. - } + copyWithPageCache( fromPath, toPath ); } // The ID files are to be kept on the normal file system, hence we use fileOperation to copy them. @@ -680,71 +658,20 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT //This means that we had no files only present in the page cache, this is fine. } - // update necessary neostore records - LogPosition logPosition = readLastTxLogPosition( migrationDir ); - updateOrAddNeoStoreFieldsAsPartOfMigration( migrationDir, storeDir, versionToUpgradeTo, logPosition ); - // delete old logs legacyLogs.deleteUnusedLogFiles( storeDir ); } - @Override - public void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom, - String versionToMigrateTo ) throws IOException - { - if ( countStoreRebuildRequired( versionToMigrateFrom ) ) - { - // create counters from scratch - Iterable countsStoreFiles = - Iterables.iterable( StoreFile.COUNTS_STORE_LEFT, StoreFile.COUNTS_STORE_RIGHT ); - StoreFile.fileOperation( DELETE, fileSystem, storeDir, storeDir, - countsStoreFiles, true, null, StoreFileType.STORE ); - File neoStore = new File( storeDir, DEFAULT_NAME ); - long lastTxId = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_ID ); - rebuildCountsFromScratch( storeDir, lastTxId, progressMonitor, versionToMigrateTo, pageCache ); - } - } - - boolean countStoreRebuildRequired( String versionToMigrateFrom ) - { - return StandardV2_3.STORE_VERSION.equals( versionToMigrateFrom ) || - StandardV3_0.STORE_VERSION.equals( versionToMigrateFrom ) || - StoreVersion.HIGH_LIMIT_V3_0_0.versionString().equals( versionToMigrateFrom ) || - StoreVersion.HIGH_LIMIT_V3_0_6.versionString().equals( versionToMigrateFrom ) || - StoreVersion.HIGH_LIMIT_V3_1_0.versionString().equals( versionToMigrateFrom ); - } - - private void rebuildCountsFromScratch( File storeDir, long lastTxId, MigrationProgressMonitor progressMonitor, - String versionToMigrateTo, PageCache pageCache ) - { - final File storeFileBase = new File( storeDir, MetaDataStore.DEFAULT_NAME + StoreFactory.COUNTS_STORE ); - - RecordFormats recordFormats = selectForVersion( versionToMigrateTo ); - StoreFactory storeFactory = new StoreFactory( storeDir, pageCache, fileSystem, recordFormats, - NullLogProvider.getInstance() ); - try ( NeoStores neoStores = storeFactory.openAllNeoStores() ) - { - NodeStore nodeStore = neoStores.getNodeStore(); - RelationshipStore relationshipStore = neoStores.getRelationshipStore(); - try ( Lifespan life = new Lifespan() ) - { - int highLabelId = (int) neoStores.getLabelTokenStore().getHighId(); - int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId(); - CountsComputer initializer = new CountsComputer( lastTxId, nodeStore, relationshipStore, highLabelId, - highRelationshipTypeId, progressMonitor ); - life.add( new CountsTracker( logService.getInternalLogProvider(), fileSystem, pageCache, config, - storeFileBase ).setInitializer( initializer ) ); - } - } - } - private void updateOrAddNeoStoreFieldsAsPartOfMigration( File migrationDir, File storeDir, String versionToMigrateTo, LogPosition lastClosedTxLogPosition ) throws IOException { final File storeDirNeoStore = new File( storeDir, DEFAULT_NAME ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.UPGRADE_TRANSACTION_ID, + final File migrationDirNeoStore = new File( migrationDir, DEFAULT_NAME ); + copyWithPageCache( storeDirNeoStore, migrationDirNeoStore ); + + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.UPGRADE_TRANSACTION_ID, MetaDataStore.getRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_ID ) ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.UPGRADE_TIME, System.currentTimeMillis() ); + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.UPGRADE_TIME, System.currentTimeMillis() ); // Store the checksum of the transaction id the upgrade is at right now. Store it both as // LAST_TRANSACTION_CHECKSUM and UPGRADE_TRANSACTION_CHECKSUM. Initially the last transaction and the @@ -760,24 +687,24 @@ private void updateOrAddNeoStoreFieldsAsPartOfMigration( File migrationDir, File // problematic as long as we don't migrate and translate old logs. TransactionId lastTxInfo = readLastTxInformation( migrationDir ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_CHECKSUM, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.LAST_TRANSACTION_CHECKSUM, lastTxInfo.checksum() ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.UPGRADE_TRANSACTION_CHECKSUM, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.UPGRADE_TRANSACTION_CHECKSUM, lastTxInfo.checksum() ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_COMMIT_TIMESTAMP, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.LAST_TRANSACTION_COMMIT_TIMESTAMP, lastTxInfo.commitTimestamp() ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.UPGRADE_TRANSACTION_COMMIT_TIMESTAMP, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.UPGRADE_TRANSACTION_COMMIT_TIMESTAMP, lastTxInfo.commitTimestamp() ); // add LAST_CLOSED_TRANSACTION_LOG_VERSION and LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET to the migrated // NeoStore - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, lastClosedTxLogPosition.getLogVersion() ); - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, lastClosedTxLogPosition.getByteOffset() ); // Upgrade version in NeoStore - MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.STORE_VERSION, + MetaDataStore.setRecord( pageCache, migrationDirNeoStore, Position.STORE_VERSION, MetaDataStore.versionStringToLong( versionToMigrateTo ) ); } @@ -793,6 +720,27 @@ public String toString() return "Kernel StoreMigrator"; } + private void copyWithPageCache( File sourceFile, File targetFile ) throws IOException + { + // We use the page cache for copying the neostore since it might be on a block device. + int pageSize = pageCache.pageSize(); + try ( PagedFile fromFile = pageCache.map( sourceFile, pageSize ); + PagedFile toFile = pageCache.map( targetFile, pageSize, StandardOpenOption.CREATE ); + PageCursor fromCursor = fromFile.io( 0L, PagedFile.PF_SHARED_READ_LOCK ); + PageCursor toCursor = toFile.io( 0L, PagedFile.PF_SHARED_WRITE_LOCK ) ) + { + while ( fromCursor.next() ) + { + toCursor.next(); + do + { + fromCursor.copyTo( 0, toCursor, 0, pageSize ); + } + while ( fromCursor.shouldRetry() ); + } + } + } + private class BatchImporterProgressMonitor extends CoarseBoundedProgressExecutionMonitor { private final MigrationProgressMonitor.Section progressMonitor; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsProcessor.java index d9db906aa4dc8..30cd87b9acc42 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsProcessor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsProcessor.java @@ -40,18 +40,7 @@ public class NodeCountsProcessor implements RecordProcessor private final CountsAccessor.Updater counts; private final int anyLabel; - public NodeCountsProcessor( NodeStore nodeStore, NodeLabelsCache cache, int highLabelId, - CountsAccessor.Updater counts ) - { - this.nodeStore = nodeStore; - this.cache = cache; - this.anyLabel = highLabelId; - this.counts = counts; - // Instantiate with high id + 1 since we need that extra slot for the ANY count - this.labelCounts = new long[highLabelId + 1]; - } - - public NodeCountsProcessor( NodeStore nodeStore, NodeLabelsCache cache, int highLabelId, + NodeCountsProcessor( NodeStore nodeStore, NodeLabelsCache cache, int highLabelId, CountsAccessor.Updater counts, MigrationProgressMonitor.Section progressMonitor ) { this.nodeStore = nodeStore; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java index a5cf787ecf73c..cc755cd1a9544 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java @@ -36,15 +36,15 @@ */ public class NodeCountsStage extends Stage { - public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore nodeStore, - int highLabelId, CountsAccessor.Updater countsUpdater, MigrationProgressMonitor.Section progressMonitor, - StatsProvider... - additionalStatsProviders ) + public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore nodeStore, int highLabelId, + CountsAccessor.Updater countsUpdater, MigrationProgressMonitor.Section progressMonitor, + StatsProvider... additionalStatsProviders ) { super( "Node counts", config ); add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) ); add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) ); - add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor( - nodeStore, cache, highLabelId, countsUpdater, progressMonitor ), true, additionalStatsProviders ) ); + add( new RecordProcessorStep<>( control(), "COUNT", config, + new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater, progressMonitor ), true, + additionalStatsProviders ) ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorIT.java index a85168d9d72a0..ac13d57a47b17 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorIT.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorIT.java @@ -151,13 +151,15 @@ public void shouldBeAbleToResumeMigrationOnRebuildingCounts() throws Exception fs.mkdirs( migrationDir ); migrator.migrate( storeDirectory, migrationDir, progressMonitor.startSection( "section" ), versionToMigrateFrom, upgradableDatabase.currentVersion() ); - migrator.moveMigratedFiles( migrationDir, storeDirectory, versionToMigrateFrom, - upgradableDatabase.currentVersion() ); // WHEN simulating resuming the migration progressMonitor = new SilentMigrationProgressMonitor(); - migrator = new StoreMigrator( fs, pageCache, CONFIG, logService ); - migrator.rebuildCounts( storeDirectory, progressMonitor, versionToMigrateFrom, + CountsMigrator countsMigrator = new CountsMigrator( fs, pageCache, CONFIG, logService ); + countsMigrator.migrate( storeDirectory, migrationDir, progressMonitor.startSection( "section" ), + versionToMigrateFrom, upgradableDatabase.currentVersion() ); + migrator.moveMigratedFiles( migrationDir, storeDirectory, versionToMigrateFrom, + upgradableDatabase.currentVersion() ); + countsMigrator.moveMigratedFiles( migrationDir, storeDirectory, versionToMigrateFrom, upgradableDatabase.currentVersion() ); // THEN starting the new store should be successful diff --git a/community/neo4j/src/test/java/upgrade/StoreUpgraderTest.java b/community/neo4j/src/test/java/upgrade/StoreUpgraderTest.java index bfafcd1566af0..b981a9c570495 100644 --- a/community/neo4j/src/test/java/upgrade/StoreUpgraderTest.java +++ b/community/neo4j/src/test/java/upgrade/StoreUpgraderTest.java @@ -63,6 +63,7 @@ import org.neo4j.kernel.impl.storemigration.monitoring.SilentMigrationProgressMonitor; import org.neo4j.kernel.impl.storemigration.monitoring.VisibleMigrationProgressMonitor; import org.neo4j.kernel.impl.storemigration.participant.AbstractStoreMigrationParticipant; +import org.neo4j.kernel.impl.storemigration.participant.CountsMigrator; import org.neo4j.kernel.impl.storemigration.participant.SchemaIndexMigrator; import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator; import org.neo4j.kernel.monitoring.Monitors; @@ -364,7 +365,6 @@ public void upgradeShouldNotLeaveLeftoverAndMigrationDirs() throws Exception public void upgradeShouldGiveProgressMonitorProgressMessages() throws Exception { // Given - fileSystem.deleteFile( new File( dbDirectory, INTERNAL_LOG_FILE ) ); PageCache pageCache = pageCacheRule.getPageCache( fileSystem ); UpgradableDatabase upgradableDatabase = new UpgradableDatabase( fileSystem, new StoreVersionCheck( pageCache ), getRecordFormats() ); @@ -377,8 +377,7 @@ public void upgradeShouldGiveProgressMonitorProgressMessages() throws Exception // Then logProvider.assertContainsLogCallContaining( "Store files" ); logProvider.assertContainsLogCallContaining( "Indexes" ); - logProvider.assertContainsLogCallContaining( "node count" ); - logProvider.assertContainsLogCallContaining( "relationship count" ); + logProvider.assertContainsLogCallContaining( "Count rebuilding" ); logProvider.assertContainsLogCallContaining( "Successfully finished" ); } @@ -447,12 +446,14 @@ private StoreUpgrader newUpgrader( UpgradableDatabase upgradableDatabase, PageCa { NullLogService instance = NullLogService.getInstance(); StoreMigrator defaultMigrator = new StoreMigrator( fileSystem, pageCache, getTuningConfig(), instance ); + CountsMigrator countsMigrator = new CountsMigrator( fileSystem, pageCache, getTuningConfig(), instance ); SchemaIndexMigrator indexMigrator = new SchemaIndexMigrator( fileSystem, schemaIndexProvider ); StoreUpgrader upgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, config, fileSystem, pageCache, NullLogProvider.getInstance() ); upgrader.addParticipant( indexMigrator ); upgrader.addParticipant( defaultMigrator ); + upgrader.addParticipant( countsMigrator ); return upgrader; } diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorTest.java index a973b187c1d26..bfadcdb162d8f 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorTest.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigratorTest.java @@ -24,13 +24,16 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier; -import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.NullLogService; @@ -39,12 +42,12 @@ import org.neo4j.kernel.impl.store.format.highlimit.v300.HighLimitV3_0_0; import org.neo4j.kernel.impl.storemigration.StoreVersionCheck; import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result; -import org.neo4j.kernel.impl.storemigration.legacylogs.LegacyLogs; import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor; import org.neo4j.logging.NullLog; import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.rule.TestDirectory; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -101,23 +104,34 @@ public void shouldNotDoActualStoreMigrationBetween3_0_5_and_next() throws Except public void detectObsoleteCountStoresToRebuildDuringMigration() throws IOException { TestStoreMigrator storeMigrator = new TestStoreMigrator( new DefaultFileSystemAbstraction(), - mock( PageCache.class ), Config.empty(), NullLogService.getInstance(), mock( LegacyLogs.class ) ); + mock( PageCache.class ), Config.empty(), NullLogService.getInstance() ); + Set actualVersions = new TreeSet<>(); + Set expectedVersions = new TreeSet<>( + Arrays.stream( StoreVersion.values() ).map( StoreVersion::versionString ) + .collect( Collectors.toSet() ) ); assertTrue( storeMigrator.countStoreRebuildRequired( StoreVersion.STANDARD_V2_3.versionString() ) ); + actualVersions.add( StoreVersion.STANDARD_V2_3.versionString() ); assertTrue( storeMigrator.countStoreRebuildRequired( StoreVersion.STANDARD_V3_0.versionString() ) ); + actualVersions.add( StoreVersion.STANDARD_V3_0.versionString() ); assertFalse( storeMigrator.countStoreRebuildRequired( StoreVersion.STANDARD_V3_2.versionString() ) ); + actualVersions.add( StoreVersion.STANDARD_V3_2.versionString() ); assertTrue( storeMigrator.countStoreRebuildRequired( StoreVersion.HIGH_LIMIT_V3_0_0.versionString() ) ); + actualVersions.add( StoreVersion.HIGH_LIMIT_V3_0_0.versionString() ); assertTrue( storeMigrator.countStoreRebuildRequired( StoreVersion.HIGH_LIMIT_V3_0_6.versionString() ) ); + actualVersions.add( StoreVersion.HIGH_LIMIT_V3_0_6.versionString() ); assertTrue( storeMigrator.countStoreRebuildRequired( StoreVersion.HIGH_LIMIT_V3_1_0.versionString() ) ); + actualVersions.add( StoreVersion.HIGH_LIMIT_V3_1_0.versionString() ); assertFalse( storeMigrator.countStoreRebuildRequired( StoreVersion.HIGH_LIMIT_V3_2_0.versionString() ) ); + actualVersions.add( StoreVersion.HIGH_LIMIT_V3_2_0.versionString() ); + assertEquals( expectedVersions, actualVersions ); } - private class TestStoreMigrator extends StoreMigrator + private class TestStoreMigrator extends CountsMigrator { - TestStoreMigrator( FileSystemAbstraction fileSystem, PageCache pageCache, Config config, LogService logService, - LegacyLogs legacyLogs ) + TestStoreMigrator( FileSystemAbstraction fileSystem, PageCache pageCache, Config config, LogService logService ) { - super( fileSystem, pageCache, config, logService, legacyLogs ); + super( fileSystem, pageCache, config, logService ); } @Override