Skip to content

Commit

Permalink
Uses import tool counts computer for rebuilding missing counts store
Browse files Browse the repository at this point in the history
This is the main purpose of this commit. However a couple of related
things have also been made in here to improve performance around counts
rebuilding and convenience in the code:

- Parallelized some aspects of counts computer from the import tool.
  Basically the node label are counted and efficiently cached, so that
  the relationship counts can be processed sequentially, using the
  node-label cache. Relationship counting is parallelized into
  reading(single) and counting(multi). Each thread keeping local counts,
  aggregated in the end.
- ProducerStep has been simplified. Less code, more generic and easier to
  extend.
- Convenience around executing and supervising stages by the introduced
  ExecutorSupervisors.

Localy measurements show 4x improvement from before and scalability should
be linear. The limitation is the size of the node cache, which "should" be
OK given that you generally run larger stores on larger machines. Looking
ahead there will be multi-pass support, where parts of the store is
processed in each pass, will be added to the staging framework to remove
problems for huge stores on not-that-huge machines memory-wise.
  • Loading branch information
tinwelint committed Feb 19, 2015
1 parent 8c1a9fa commit cc2e843
Show file tree
Hide file tree
Showing 39 changed files with 704 additions and 379 deletions.
Expand Up @@ -20,65 +20,68 @@
package org.neo4j.kernel.impl.store;

import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.impl.api.CountsRecordState;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.transaction.state.NeoStoreProvider;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.NodeCountsProcessor;
import org.neo4j.unsafe.impl.batchimport.NodeStoreProcessorStage;
import org.neo4j.unsafe.impl.batchimport.RelationshipCountsStage;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;

import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseDynamicExecution;

public class CountsComputer
{
public static CountsRecordState computeCounts( GraphDatabaseAPI api )
public static void computeCounts( GraphDatabaseAPI api )
{
return computeCounts( api.getDependencyResolver().resolveDependency( NeoStoreProvider.class ).evaluate() );
computeCounts( api.getDependencyResolver().resolveDependency( NeoStoreProvider.class ).evaluate() );
}

public static CountsRecordState computeCounts( NeoStore stores )
public static void computeCounts( NeoStore stores )
{
return computeCounts( stores.getNodeStore(), stores.getRelationshipStore() );
computeCounts( stores.getNodeStore(), stores.getRelationshipStore(), stores.getCounts(),
(int)stores.getLabelTokenStore().getHighId(), (int)stores.getRelationshipTypeTokenStore().getHighId() );
}

public static CountsRecordState computeCounts( NodeStore nodeStore, RelationshipStore relationshipStore )
public static void computeCounts( NodeStore nodeStore, RelationshipStore relationshipStore,
CountsTracker countsTracker, int highLabelId, int highRelationshipTypeId )
{
final CountsRecordState result = new CountsRecordState();
new CountsComputer( nodeStore, relationshipStore ).update( result );
return result;
new CountsComputer( nodeStore, relationshipStore, countsTracker,
highLabelId, highRelationshipTypeId ).rebuildCounts();
}

private final NodeStore nodes;
private final RelationshipStore relationships;
private final CountsTracker countsTracker;
private final int highLabelId;
private final int highRelationshipTypeId;

public CountsComputer( NodeStore nodes, RelationshipStore relationships )
public CountsComputer( NodeStore nodes, RelationshipStore relationships, CountsTracker countsTracker,
int highLabelId, int highRelationshipTypeId )
{
this.nodes = nodes;
this.relationships = relationships;
this.countsTracker = countsTracker;
this.highLabelId = highLabelId;
this.highRelationshipTypeId = highRelationshipTypeId;
}

public void update( CountsRecordState target )
public void rebuildCounts()
{
// count nodes
for ( long id = 0, highId = nodes.getHighId(); id <= highId; id++ )
NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId );
try
{
NodeRecord record = nodes.forceGetRecord( id );
if ( record.inUse() )
{
target.addNode( labels( record ) );
}
// Count nodes
superviseDynamicExecution( new NodeStoreProcessorStage( "COUNT NODES", Configuration.DEFAULT, nodes,
new NodeCountsProcessor( nodes, cache, highLabelId, countsTracker ) ) );
// Count relationships
superviseDynamicExecution( new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships,
highLabelId, highRelationshipTypeId, countsTracker ) );
}
// count relationships
for ( long id = 0, highId = relationships.getHighId(); id <= highId; id++ )
finally
{
RelationshipRecord record = relationships.forceGetRecord( id );
if ( record.inUse() )
{
long[] startLabels = labels( nodes.forceGetRecord( record.getFirstNode() ) );
long[] endLabels = labels( nodes.forceGetRecord( record.getSecondNode() ) );
target.addRelationship( startLabels, record.getType(), endLabels );
}
cache.close();
}
}

private long[] labels( NodeRecord node )
{
return NodeLabelsField.get( node, nodes );
}
}
Expand Up @@ -43,7 +43,7 @@ public static abstract class Configuration
}

public static final String TYPE_DESCRIPTOR = "LabelTokenStore";
private static final int RECORD_SIZE = 1/*inUse*/ + 4/*nameId*/;
public static final int RECORD_SIZE = 1/*inUse*/ + 4/*nameId*/;

public LabelTokenStore(
File fileName,
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.IdType;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.record.NeoStoreRecord;
import org.neo4j.kernel.impl.store.record.Record;
Expand Down Expand Up @@ -973,11 +972,7 @@ public void rebuildCountStoreIfNeeded() throws IOException
stringLogger.warn( "Missing counts store, rebuilding it." );
try
{
try ( CountsAccessor.Updater updater = counts.updater() )
{
CountsComputer.computeCounts( nodeStore, relStore )
.accept( new CountsAccessor.Initializer( updater ) );
}
CountsComputer.computeCounts( this );
counts.rotate( getLastCommittedTransactionId() );
}
catch ( Throwable failure )
Expand Down
Expand Up @@ -45,7 +45,7 @@ public static abstract class Configuration
// Historical type descriptor, should be called PropertyKeyTokenStore
public static final String TYPE_DESCRIPTOR = "PropertyIndexStore";

private static final int RECORD_SIZE = 1/*inUse*/ + 4/*prop count*/ + 4/*nameId*/;
public static final int RECORD_SIZE = 1/*inUse*/ + 4/*prop count*/ + 4/*nameId*/;

public PropertyKeyTokenStore(
File fileName,
Expand Down
Expand Up @@ -45,7 +45,7 @@ public static abstract class Configuration
}

public static final String TYPE_DESCRIPTOR = "RelationshipTypeStore";
private static final int RECORD_SIZE = 1/*inUse*/ + 4/*nameId*/;
public static final int RECORD_SIZE = 1/*inUse*/ + 4/*nameId*/;

public RelationshipTypeTokenStore(
File fileName,
Expand Down
Expand Up @@ -35,6 +35,9 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.storemigration.StoreFile;
import org.neo4j.kernel.impl.storemigration.StoreFileType;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.monitoring.Monitors;

Expand Down Expand Up @@ -139,6 +142,11 @@ public File storeFileName( String toAppend )
return new File( neoStoreFileName.getPath() + toAppend );
}

public File storeFileName( StoreFile file, StoreFileType type )
{
return new File( neoStoreFileName.getParentFile(), file.fileName( type ) );
}

public NeoStore newNeoStore( boolean allowCreateEmpty )
{
boolean storeExists = storeExists();
Expand Down Expand Up @@ -621,6 +629,17 @@ private void createEmptyStore( File fileName, String typeAndVersionDescriptor, B
}
}

/**
* I.e. total number of used/unused records + 1
* @param storeFile {@link StoreFile} to get the name from.
* @param recordSize record size of that store.
* @return highId, i.e. an id one greater than the highest id in the store.
*/
public long getHighId( StoreFile storeFile, int recordSize ) throws IOException
{
return IdGeneratorImpl.readHighId( fileSystemAbstraction, storeFileName( storeFile, StoreFileType.ID ) );
}

public abstract static class Configuration
{
public static final Setting<Integer> string_block_size = GraphDatabaseSettings.string_block_size;
Expand Down
Expand Up @@ -197,6 +197,20 @@ public String idFileName()
return fileName( StoreFileType.ID );
}

/**
* @return the last part of the neostore filename, f.ex:
*
* <pre>
* neostore.nodestore.db
* | |
* <-this part-> (yes, including the leading dot)
* </pre>
*/
public String storeFileNamePart()
{
return storeFileNamePart;
}

public static Iterable<StoreFile> legacyStoreFilesForVersion( final String version )
{
Predicate<StoreFile> predicate = new Predicate<StoreFile>()
Expand Down
Expand Up @@ -43,15 +43,16 @@
import org.neo4j.kernel.DefaultIdGeneratorFactory;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.core.Token;
import org.neo4j.kernel.impl.store.CountsComputer;
import org.neo4j.kernel.impl.store.LabelTokenStore;
import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.store.NeoStore.Position;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.PropertyKeyTokenStore;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.RelationshipTypeTokenStore;
import org.neo4j.kernel.impl.store.StoreFactory;
import org.neo4j.kernel.impl.store.StoreVersionMismatchHandler;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
Expand Down Expand Up @@ -316,11 +317,11 @@ private void rebuildCountsFromScratch(
{
CountsTracker tracker = life.add( new CountsTracker( logging.getMessagesLog( CountsTracker.class ),
fileSystem, pageCache, storeFileBase ) );
try ( CountsAccessor.Updater updater = tracker.updater() )
{
CountsComputer.computeCounts( nodeStore, relationshipStore )
.accept( new CountsAccessor.Initializer( updater ) );
}
CountsComputer.computeCounts( nodeStore, relationshipStore, tracker,
(int)storeFactory.getHighId( StoreFile.LABEL_TOKEN_STORE,
LabelTokenStore.RECORD_SIZE ),
(int)storeFactory.getHighId( StoreFile.RELATIONSHIP_TYPE_TOKEN_STORE,
RelationshipTypeTokenStore.RECORD_SIZE ) );
tracker.rotate( lastTxId );
}
}
Expand Down
Expand Up @@ -20,6 +20,10 @@

package org.neo4j.kernel.lifecycle;

/**
* Convenient use of a {@link LifeSupport}, effectively making one or more {@link Lifecycle} look and feel
* like one {@link AutoCloseable}.
*/
public class Lifespan implements AutoCloseable
{
private final LifeSupport life = new LifeSupport();
Expand Down
Expand Up @@ -74,7 +74,6 @@
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.extension.KernelExtensions;
import org.neo4j.kernel.extension.UnsatisfiedDependencyStrategies;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap;
import org.neo4j.kernel.impl.api.index.StoreScan;
Expand Down Expand Up @@ -136,6 +135,7 @@
import org.neo4j.kernel.monitoring.Monitors;

import static java.lang.Boolean.parseBoolean;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.map;
import static org.neo4j.graphdb.DynamicLabel.label;
import static org.neo4j.helpers.collection.Iterables.map;
Expand Down Expand Up @@ -478,10 +478,8 @@ private void rebuildCounts()
{
throw new UnderlyingStorageException( e );
}
try ( CountsAccessor.Updater updater = counts.updater() )
{
CountsComputer.computeCounts( neoStore ).accept( new CountsAccessor.Initializer( updater ) );
}

CountsComputer.computeCounts( neoStore );
}

private class InitialNodeLabelCreationVisitor implements Visitor<NodeLabelUpdate, IOException>
Expand Down
Expand Up @@ -33,7 +33,7 @@ public class NodeStoreProcessorStep extends StoreProcessorStep<NodeRecord>
protected NodeStoreProcessorStep( StageControl control, String name, Configuration config, NodeStore nodeStore,
StoreProcessor<NodeRecord> processor )
{
super( control, name, config.batchSize(), config.movingAverageSize(), nodeStore, processor );
super( control, name, config.batchSize(), config.movingAverageSize(), nodeStore, processor, false );
this.nodeStore = nodeStore;
}

Expand Down

0 comments on commit cc2e843

Please sign in to comment.