Skip to content

Commit

Permalink
Sequentially consistent counts store snapshots
Browse files Browse the repository at this point in the history
Restructures the state transitioning of the key/value store backing the
counts store to ensure that the snapshots that get stored on rotation
contain all the transactions up until and including the transaction that
the rotation occurred at.
  • Loading branch information
thobe committed Feb 26, 2015
1 parent 6f0ff78 commit b94dd44
Show file tree
Hide file tree
Showing 56 changed files with 2,162 additions and 1,426 deletions.
Expand Up @@ -36,9 +36,11 @@
import org.neo4j.helpers.progress.ProgressMonitorFactory;
import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.StoreAccess;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.LabelTokenRecord;
import org.neo4j.kernel.impl.store.record.PropertyKeyTokenRecord;
Expand Down Expand Up @@ -79,8 +81,20 @@ public ConsistencySummaryStatistics execute( DirectStoreAccess stores, StringLog
DiffRecordAccess records = recordAccess( stores.nativeStores() );
execute( stores, decorator, records, report );
ownerCheck.scanForOrphanChains( progressFactory );
countsBuilder.checkCounts( stores.nativeStores().getCounts(), new ConsistencyReporter( records, report ),
progressFactory );
CountsAccessor counts = stores.nativeStores().getCounts();
if ( counts instanceof CountsTracker )
{
CountsTracker tracker = (CountsTracker) counts;
try
{
tracker.start();
}
catch ( Exception e )
{
// let's hope it was already started :)
}
}
countsBuilder.checkCounts( counts, new ConsistencyReporter( records, report ), progressFactory );

if ( !summary.isConsistent() )
{
Expand Down
Expand Up @@ -121,7 +121,7 @@ public TransactionRepresentation representation( IdGenerator idGenerator, int ma
long lastCommittedTx, CountsTracker counts )
{
TransactionWriter writer = new TransactionWriter();
try ( CountsAccessor.Updater updater = counts.apply( lastCommittedTx + 1 ) )
try ( CountsAccessor.Updater updater = counts.apply( lastCommittedTx + 1 ).get() )
{
transactionData( new TransactionDataBuilder( writer, updater ), idGenerator );
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;

import org.neo4j.function.Function;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.api.LegacyIndexApplier.ProviderLookup;
import org.neo4j.kernel.impl.api.index.IndexingService;
Expand All @@ -29,7 +30,6 @@
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.CacheInvalidationTransactionApplier;
import org.neo4j.kernel.impl.transaction.command.HighIdTransactionApplier;
Expand All @@ -38,6 +38,7 @@
import org.neo4j.kernel.impl.transaction.command.NeoStoreTransactionApplier;
import org.neo4j.kernel.impl.transaction.state.PropertyLoader;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.impl.util.function.Optional;

/**
* Holistic application of {@link TransactionRepresentation transactions} onto the store. Includes application
Expand All @@ -55,6 +56,7 @@ public class TransactionRepresentationStoreApplier
private final ProviderLookup legacyIndexProviderLookup;
private final PropertyLoader propertyLoader;
private final IdOrderingQueue legacyIndexTransactionOrdering;
private final Function<CountsAccessor.Updater, NeoCommandHandler> handlerFactory;

public TransactionRepresentationStoreApplier(
IndexingService indexingService, LabelScanStore labelScanStore, NeoStore neoStore,
Expand All @@ -70,6 +72,7 @@ public TransactionRepresentationStoreApplier(
this.indexConfigStore = indexConfigStore;
this.legacyIndexTransactionOrdering = legacyIndexTransactionOrdering;
this.propertyLoader = new PropertyLoader( neoStore );
this.handlerFactory = new CommandHandlerFactory( neoStore );
}

public void apply( TransactionRepresentation representation, LockGroup locks,
Expand Down Expand Up @@ -110,7 +113,28 @@ public void apply( TransactionRepresentation representation, LockGroup locks,

private NeoCommandHandler getCountsStoreApplier( long transactionId, TransactionApplicationMode mode )
{
CountsTracker counts = neoStore.getCounts();
return new CountsStoreApplier( counts.apply( transactionId ), neoStore.getNodeStore() );
Optional<NeoCommandHandler> handlerOption = neoStore.getCounts().apply( transactionId ).map( handlerFactory );
if ( mode == TransactionApplicationMode.RECOVERY )
{
handlerOption = handlerOption.or( NeoCommandHandler.EMPTY );
}
return handlerOption.get();
}

private static class CommandHandlerFactory implements Function<CountsAccessor.Updater, NeoCommandHandler>
{
private final NeoStore neoStore;

CommandHandlerFactory( NeoStore neoStore )
{
this.neoStore = neoStore;
}

@Override
public NeoCommandHandler apply( CountsAccessor.Updater updater )
throws RuntimeException
{
return new CountsStoreApplier( updater, neoStore.getNodeStore() );
}
}
}
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.locking;

import java.util.concurrent.locks.Lock;

public class LockWrapper implements AutoCloseable
{
public static LockWrapper readLock( java.util.concurrent.locks.ReadWriteLock lock )
Expand All @@ -31,7 +33,7 @@ public static LockWrapper writeLock( java.util.concurrent.locks.ReadWriteLock lo
return new LockWrapper( lock.writeLock() );
}

private final java.util.concurrent.locks.Lock lock;
private java.util.concurrent.locks.Lock lock;

public LockWrapper( java.util.concurrent.locks.Lock lock )
{
Expand All @@ -41,6 +43,15 @@ public LockWrapper( java.util.concurrent.locks.Lock lock )
@Override
public void close()
{
lock.unlock();
if ( lock != null )
{
lock.unlock();
lock = null;
}
}

public Lock get()
{
return lock;
}
}
Expand Up @@ -19,10 +19,8 @@
*/
package org.neo4j.kernel.impl.store;

import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.transaction.state.NeoStoreProvider;
import org.neo4j.kernel.impl.store.kvstore.DataInitializer;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.NodeCountsProcessor;
import org.neo4j.unsafe.impl.batchimport.NodeStoreProcessorStage;
Expand All @@ -32,57 +30,65 @@

import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseDynamicExecution;

public class CountsComputer
public class CountsComputer implements DataInitializer<CountsAccessor.Updater>
{
public static void computeCounts( GraphDatabaseAPI api )
public static void recomputeCounts( NeoStore stores )
{
computeCounts( api.getDependencyResolver().resolveDependency( NeoStoreProvider.class ).evaluate() );
}

public static void computeCounts( NeoStore stores )
{
computeCounts( stores.getNodeStore(), stores.getRelationshipStore(), stores.getCounts(),
(int)stores.getLabelTokenStore().getHighId(), (int)stores.getRelationshipTypeTokenStore().getHighId() );
}

public static void computeCounts( NodeStore nodeStore, RelationshipStore relationshipStore,
CountsTracker countsTracker, int highLabelId, int highRelationshipTypeId )
{
new CountsComputer( nodeStore, relationshipStore, countsTracker,
highLabelId, highRelationshipTypeId ).rebuildCounts();
try ( CountsAccessor.Updater countsUpdater = stores.getCounts().reset() )
{
new CountsComputer( stores ).initialize( countsUpdater );
}
}

private final NodeStore nodes;
private final RelationshipStore relationships;
private final CountsTracker countsTracker;
private final int highLabelId;
private final int highRelationshipTypeId;
private final long lastCommittedTransactionId;

public CountsComputer( NeoStore stores )
{
this( stores.getLastCommittedTransactionId(),
stores.getNodeStore(), stores.getRelationshipStore(),
(int) stores.getLabelTokenStore().getHighId(),
(int) stores.getRelationshipTypeTokenStore().getHighId() );
}

public CountsComputer( NodeStore nodes, RelationshipStore relationships, CountsTracker countsTracker,
int highLabelId, int highRelationshipTypeId )
public CountsComputer( long lastCommittedTransactionId, NodeStore nodes, RelationshipStore relationships,
int highLabelId,
int highRelationshipTypeId )
{
this.lastCommittedTransactionId = lastCommittedTransactionId;
this.nodes = nodes;
this.relationships = relationships;
this.countsTracker = countsTracker;
this.highLabelId = highLabelId;
this.highRelationshipTypeId = highRelationshipTypeId;
}

public void rebuildCounts()
@Override
public void initialize( CountsAccessor.Updater countsUpdater )
{
NodeLabelsCache cache = new NodeLabelsCache( NumberArrayFactory.AUTO, highLabelId );
try ( CountsAccessor.Updater countsUpdater = countsTracker.reset() )
try
{
// Count nodes
superviseDynamicExecution( new NodeStoreProcessorStage( "COUNT NODES", Configuration.DEFAULT, nodes,
new NodeCountsProcessor( nodes, cache, highLabelId, countsUpdater ) ) );
new NodeCountsProcessor( nodes, cache, highLabelId,
countsUpdater ) ) );
// Count relationships
superviseDynamicExecution( new RelationshipCountsStage( Configuration.DEFAULT, cache, relationships,
highLabelId, highRelationshipTypeId, countsUpdater ) );
highLabelId, highRelationshipTypeId,
countsUpdater ) );
}
finally
{
cache.close();
}
}

@Override
public long initialVersion()
{
return lastCommittedTransactionId;
}
}
Expand Up @@ -35,7 +35,9 @@
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.IdType;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.kvstore.DataInitializer;
import org.neo4j.kernel.impl.store.record.NeoStoreRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
Expand Down Expand Up @@ -176,6 +178,21 @@ protected void triggered( Void event )
lastCommittedTx, lastClosedTx ) );
}
};
counts.setInitializer( new DataInitializer<CountsAccessor.Updater>()
{
@Override
public void initialize( CountsAccessor.Updater updater )
{
stringLogger.warn( "Missing counts store, rebuilding it." );
new CountsComputer( NeoStore.this ).initialize( updater );
}

@Override
public long initialVersion()
{
return getLastCommittedTransactionId();
}
} );
try
{
counts.init(); // TODO: move this to LifeCycle
Expand Down Expand Up @@ -967,26 +984,5 @@ public void rebuildCountStoreIfNeeded() throws IOException
{
// TODO: move this to LifeCycle
counts.start();
if ( counts.txId() == -1 )
{
stringLogger.warn( "Missing counts store, rebuilding it." );
try
{
CountsComputer.computeCounts( this );
counts.rotate( getLastCommittedTransactionId() );
}
catch ( Throwable failure )
{
try
{
counts.shutdown();
}
catch ( Throwable err )
{
failure.addSuppressed( err );
}
throw failure;
}
}
}
}

0 comments on commit b94dd44

Please sign in to comment.