Skip to content

Commit

Permalink
Add monitoring of count rebuilding process
Browse files Browse the repository at this point in the history
  • Loading branch information
ragadeeshu committed Jul 5, 2017
1 parent 482eb3b commit b28a467
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.kvstore.DataInitializer;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.monitoring.SilentMigrationProgressMonitor;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.NodeCountsStage;
import org.neo4j.unsafe.impl.batchimport.RelationshipCountsStage;
Expand All @@ -48,6 +50,7 @@ public static void recomputeCounts( NeoStores stores )
private final int highLabelId;
private final int highRelationshipTypeId;
private final long lastCommittedTransactionId;
private final MigrationProgressMonitor progressMonitor;

public CountsComputer( NeoStores stores )
{
Expand All @@ -58,14 +61,21 @@ public CountsComputer( NeoStores stores )
}

public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships,
int highLabelId,
int highRelationshipTypeId )
int highLabelId, int highRelationshipTypeId )
{
this( lastCommittedTransactionId, nodes, relationships, highLabelId, highRelationshipTypeId,
new SilentMigrationProgressMonitor() );
}

public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships,
int highLabelId, int highRelationshipTypeId, MigrationProgressMonitor progressMonitor )
{
this.lastCommittedTransactionId = lastCommittedTransactionId;
this.nodes = nodes;
this.relationships = relationships;
this.highLabelId = highLabelId;
this.highRelationshipTypeId = highRelationshipTypeId;
this.progressMonitor = progressMonitor;
}

@Override
Expand All @@ -75,12 +85,19 @@ public void initialize( CountsAccessor.Updater countsUpdater )
try
{
// Count nodes
MigrationProgressMonitor.Section nodeSection = progressMonitor.startSection( "node counting" );
nodeSection.start( nodes.getHighestPossibleIdInUse() );
superviseDynamicExecution(
new NodeCountsStage( Configuration.DEFAULT, cache, nodes, highLabelId, countsUpdater ) );
new NodeCountsStage( Configuration.DEFAULT, cache, nodes, highLabelId, countsUpdater,
nodeSection ) );
nodeSection.completed();
// Count relationships
MigrationProgressMonitor.Section relationshipSection = progressMonitor
.startSection( "relationship counting" );
relationshipSection.start( relationships.getHighestPossibleIdInUse() );
superviseDynamicExecution(
new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships, highLabelId,
highRelationshipTypeId, countsUpdater, AUTO ) );
highRelationshipTypeId, countsUpdater, AUTO, relationshipSection ) );
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,12 @@ void moveMigratedFiles( File migrationDir, File storeDir, String versionToMigrat
/**
* After a successful migration, and having moved all affected files from {@code upgradeDirectory} over to
* the {@code workingDirectory}, this will rebuild counts if needed.
* @param storeDir directory the store directory of the to move the migrated files to.
* @param progressMonitor
* @param versionToMigrateFrom the version we have migrated from
* @param versionToMigrateTo
* @throws IOException if unable to move one or more files.
*/
void rebuildCounts( File storeDir, String versionToMigrateFrom, String versionToMigrateTo ) throws IOException;
* @param versionToMigrateTo @throws IOException if unable to move one or more files.
* */
void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom, String versionToMigrateTo ) throws IOException;

/**
* Delete any file from {@code migrationDir} produced during migration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.kernel.impl.storemigration;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -216,7 +215,8 @@ private void rebuildCountsInStoreDirectory( List<StoreMigrationParticipant> part
{
for ( StoreMigrationParticipant participant : participants )
{
participant.rebuildCounts( storeDirectory, versionToMigrateFrom, upgradableDatabase.currentVersion() );
participant.rebuildCounts( storeDirectory, progressMonitor, versionToMigrateFrom,
upgradableDatabase.currentVersion() );
}
}
catch ( IOException e )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT
}

@Override
public void rebuildCounts( File storeDir, String versionToMigrateFrom, String versionToMigrateTo ) throws IOException
public void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom, String versionToMigrateTo ) throws IOException
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public void moveMigratedFiles( File migrationDir, File storeDir, String versionT
}

@Override
public void rebuildCounts( File storeDir, String versionToMigrateFrom, String versionToMigrateTo ) throws
public void rebuildCounts( File storeDir, MigrationProgressMonitor progressMonitor, String versionToMigrateFrom, String versionToMigrateTo ) throws
IOException
{
if ( countStoreRebuildRequired( versionToMigrateFrom ) )
Expand All @@ -700,7 +700,7 @@ public void rebuildCounts( File storeDir, String versionToMigrateFrom, String ve
countsStoreFiles, true, null, StoreFileType.STORE );
File neoStore = new File( storeDir, DEFAULT_NAME );
long lastTxId = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_ID );
rebuildCountsFromScratch( storeDir, lastTxId, versionToMigrateTo, pageCache );
rebuildCountsFromScratch( storeDir, lastTxId, progressMonitor, versionToMigrateTo, pageCache );
}
}

Expand All @@ -713,8 +713,8 @@ boolean countStoreRebuildRequired( String versionToMigrateFrom )
StoreVersion.HIGH_LIMIT_V3_1_0.versionString().equals( versionToMigrateFrom );
}

private void rebuildCountsFromScratch( File storeDir, long lastTxId, String versionToMigrateTo,
PageCache pageCache )
private void rebuildCountsFromScratch( File storeDir, long lastTxId, MigrationProgressMonitor progressMonitor,
String versionToMigrateTo, PageCache pageCache )
{
final File storeFileBase = new File( storeDir, MetaDataStore.DEFAULT_NAME + StoreFactory.COUNTS_STORE );

Expand All @@ -730,7 +730,7 @@ private void rebuildCountsFromScratch( File storeDir, long lastTxId, String vers
int highLabelId = (int) neoStores.getLabelTokenStore().getHighId();
int highRelationshipTypeId = (int) neoStores.getRelationshipTypeTokenStore().getHighId();
CountsComputer initializer = new CountsComputer( lastTxId, nodeStore, relationshipStore, highLabelId,
highRelationshipTypeId );
highRelationshipTypeId, progressMonitor );
life.add( new CountsTracker( logService.getInternalLogProvider(), fileSystem, pageCache, config,
storeFileBase ).setInitializer( initializer ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import javax.swing.*;

import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.NodeLabelsField;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;

/**
Expand All @@ -34,6 +37,7 @@ public class NodeCountsProcessor implements RecordProcessor<NodeRecord>
{
private final NodeStore nodeStore;
private final long[] labelCounts;
private MigrationProgressMonitor.Section progressMonitor;
private final NodeLabelsCache cache;
private final CountsAccessor.Updater counts;
private final int anyLabel;
Expand All @@ -49,6 +53,18 @@ public NodeCountsProcessor( NodeStore nodeStore, NodeLabelsCache cache, int high
this.labelCounts = new long[highLabelId + 1];
}

public NodeCountsProcessor( NodeStore nodeStore, NodeLabelsCache cache, int highLabelId,
CountsAccessor.Updater counts, MigrationProgressMonitor.Section progressMonitor )
{
this.nodeStore = nodeStore;
this.cache = cache;
this.anyLabel = highLabelId;
this.counts = counts;
// Instantiate with high id + 1 since we need that extra slot for the ANY count
this.labelCounts = new long[highLabelId + 1];
this.progressMonitor = progressMonitor;
}

@Override
public boolean process( NodeRecord node )
{
Expand All @@ -62,6 +78,7 @@ public boolean process( NodeRecord node )
cache.put( node.getId(), labels );
}
labelCounts[anyLabel]++;
progressMonitor.progress( 1 );

// No need to update the store, we're just reading things here
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
Expand All @@ -36,12 +37,14 @@
public class NodeCountsStage extends Stage
{
public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore nodeStore,
int highLabelId, CountsAccessor.Updater countsUpdater, StatsProvider... additionalStatsProviders )
int highLabelId, CountsAccessor.Updater countsUpdater, MigrationProgressMonitor.Section progressMonitor,
StatsProvider...
additionalStatsProviders )
{
super( "Node counts", config );
add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) );
add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor(
nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) );
nodeStore, cache, highLabelId, countsUpdater, progressMonitor ), true, additionalStatsProviders ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.monitoring.SilentMigrationProgressMonitor;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.unsafe.impl.batchimport.cache.GatheringMemoryStatsVisitor;
Expand All @@ -66,7 +68,6 @@
import static java.lang.Long.max;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure;
Expand Down Expand Up @@ -229,14 +230,16 @@ public void doImport( Input input ) throws IOException
neoStore, highNodeId );

// Count nodes per label and labels per node
MigrationProgressMonitor progressMonitor = new SilentMigrationProgressMonitor();
nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() );
memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache );
executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(),
neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) );
neoStore.getLabelRepository().getHighId(), countsUpdater, progressMonitor.startSection( "Nodes" ),
memoryUsageStats ) );
// Count label-[type]->label
executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore,
neoStore.getLabelRepository().getHighId(),
neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) );
neoStore.getLabelRepository().getHighId(), neoStore.getRelationshipTypeRepository().getHighId(),
countsUpdater, AUTO, progressMonitor.startSection( "Relationships" ) ) );

// We're done, do some final logging about it
long totalTimeMillis = currentTimeMillis() - startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
Expand All @@ -42,17 +43,20 @@ public class ProcessRelationshipCountsDataStep extends ProcessorStep<Relationshi
private final int highRelationshipTypeId;
private final CountsAccessor.Updater countsUpdater;
private final NumberArrayFactory cacheFactory;
private final MigrationProgressMonitor.Section progressMonitor;

public ProcessRelationshipCountsDataStep( StageControl control, NodeLabelsCache cache,
Configuration config, int highLabelId, int highRelationshipTypeId,
CountsAccessor.Updater countsUpdater, NumberArrayFactory cacheFactory )
public ProcessRelationshipCountsDataStep( StageControl control, NodeLabelsCache cache, Configuration config, int
highLabelId, int highRelationshipTypeId,
CountsAccessor.Updater countsUpdater, NumberArrayFactory cacheFactory,
MigrationProgressMonitor.Section progressMonitor )
{
super( control, "COUNT", config, 0 );
this.cache = cache;
this.highLabelId = highLabelId;
this.highRelationshipTypeId = highRelationshipTypeId;
this.countsUpdater = countsUpdater;
this.cacheFactory = cacheFactory;
this.progressMonitor = progressMonitor;
}

@Override
Expand All @@ -64,6 +68,7 @@ protected void process( RelationshipRecord[] batch, BatchSender sender )
RelationshipRecord relationship = batch[i];
processor.process( relationship.getFirstNode(), relationship.getType(), relationship.getSecondNode() );
}
progressMonitor.progress( batch.length );
}

private RelationshipCountsProcessor processor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
Expand All @@ -35,16 +36,15 @@
*/
public class RelationshipCountsStage extends Stage
{
public RelationshipCountsStage( Configuration config, NodeLabelsCache cache,
RelationshipStore relationshipStore,
public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, RelationshipStore relationshipStore,
int highLabelId, int highRelationshipTypeId, CountsAccessor.Updater countsUpdater,
NumberArrayFactory cacheFactory )
NumberArrayFactory cacheFactory, MigrationProgressMonitor.Section progressMonitor )
{
super( "Relationship counts", config );
add( new BatchFeedStep( control(), config, allIn( relationshipStore, config ),
relationshipStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, relationshipStore, null ) );
add( new ProcessRelationshipCountsDataStep( control(), cache, config,
highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory ) );
highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory, progressMonitor ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public void shouldBeAbleToResumeMigrationOnRebuildingCounts() throws Exception
// WHEN simulating resuming the migration
progressMonitor = new SilentMigrationProgressMonitor();
migrator = new StoreMigrator( fs, pageCache, CONFIG, logService );
migrator.rebuildCounts( storeDirectory, versionToMigrateFrom, upgradableDatabase.currentVersion() );
migrator.rebuildCounts( storeDirectory, progressMonitor, versionToMigrateFrom,
upgradableDatabase.currentVersion() );

// THEN starting the new store should be successful
StoreFactory storeFactory =
Expand Down

0 comments on commit b28a467

Please sign in to comment.