Skip to content

Commit

Permalink
Removes validation of index updates before commit
Browse files Browse the repository at this point in the history
In preparation for removing upper limit number of nodes that can exist in
any given index.

This removes a bunch of code related to probing for and reserving space in
an index before committing a transaction and otherwise throwing a specific
IndexCapacityExceededException. All that is now gone and the commit
process and most things around index modifications are simpler.
  • Loading branch information
tinwelint committed Dec 21, 2015
1 parent 691e0ac commit b14d78b
Show file tree
Hide file tree
Showing 98 changed files with 441 additions and 2,734 deletions.
Expand Up @@ -43,7 +43,6 @@
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand Down Expand Up @@ -352,8 +351,7 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai
TransactionRepresentationCommitProcess commitProcess =
new TransactionRepresentationCommitProcess(
dependencyResolver.resolveDependency( TransactionAppender.class ),
dependencyResolver.resolveDependency( StorageEngine.class ),
dependencyResolver.resolveDependency( IndexUpdatesValidator.class ) );
dependencyResolver.resolveDependency( StorageEngine.class ) );
TransactionIdStore transactionIdStore = database.getDependencyResolver().resolveDependency(
TransactionIdStore.class );
NodeStore nodes = database.getDependencyResolver().resolveDependency( NeoStores.class ).getNodeStore();
Expand Down
Expand Up @@ -55,7 +55,6 @@
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.index.IndexCapacityExceededException;
import org.neo4j.kernel.api.exceptions.schema.IllegalTokenNameException;
import org.neo4j.kernel.api.exceptions.schema.TooManyLabelsException;
import org.neo4j.kernel.api.index.IndexAccessor;
Expand Down Expand Up @@ -409,7 +408,7 @@ public void shouldReportLabelScanStoreInconsistencies() throws Exception
}

private void write( LabelScanStore labelScanStore, Iterable<NodeLabelUpdate> nodeLabelUpdates )
throws IOException, IndexCapacityExceededException
throws IOException
{
try ( LabelScanWriter writer = labelScanStore.newWriter() )
{
Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand Down Expand Up @@ -395,8 +394,7 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai
TransactionRepresentationCommitProcess commitProcess =
new TransactionRepresentationCommitProcess(
dependencyResolver.resolveDependency( TransactionAppender.class ),
dependencyResolver.resolveDependency( StorageEngine.class ),
dependencyResolver.resolveDependency( IndexUpdatesValidator.class ) );
dependencyResolver.resolveDependency( StorageEngine.class ) );
TransactionIdStore transactionIdStore = database.getDependencyResolver().resolveDependency(
TransactionIdStore.class );
NodeStore nodes = database.getDependencyResolver().resolveDependency( NeoStores.class ).getNodeStore();
Expand Down
Expand Up @@ -61,7 +61,6 @@
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.index.IndexCapacityExceededException;
import org.neo4j.kernel.api.exceptions.schema.IllegalTokenNameException;
import org.neo4j.kernel.api.exceptions.schema.TooManyLabelsException;
import org.neo4j.kernel.api.index.IndexAccessor;
Expand All @@ -74,8 +73,8 @@
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.annotations.Documented;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
Expand All @@ -91,7 +90,6 @@
import org.neo4j.kernel.impl.store.record.IndexRule;
import org.neo4j.kernel.impl.store.record.LabelTokenRecord;
import org.neo4j.kernel.impl.store.record.NeoStoreRecord;
import org.neo4j.kernel.impl.store.record.UniquePropertyConstraintRule;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
Expand All @@ -100,6 +98,7 @@
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.store.record.RelationshipTypeTokenRecord;
import org.neo4j.kernel.impl.store.record.SchemaRule;
import org.neo4j.kernel.impl.store.record.UniquePropertyConstraintRule;
import org.neo4j.kernel.impl.util.Bits;
import org.neo4j.kernel.impl.util.MutableInteger;
import org.neo4j.logging.FormattedLog;
Expand Down Expand Up @@ -444,7 +443,7 @@ public void shouldReportLabelScanStoreInconsistencies() throws Exception
}

private void write( LabelScanStore labelScanStore, Iterable<NodeLabelUpdate> nodeLabelUpdates )
throws IOException, IndexCapacityExceededException
throws IOException
{
try ( LabelScanWriter writer = labelScanStore.newWriter() )
{
Expand Down
Expand Up @@ -65,7 +65,6 @@
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.UpdateableSchemaState;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
Expand Down Expand Up @@ -467,7 +466,6 @@ public void start() throws IOException
KernelModule kernelModule = buildKernel(
transactionLogModule.transactionAppender(),
storageEngine.indexingService(),
storageEngine.indexUpdatesValidator(),
storageEngine.storeReadLayer(),
updateableSchemaState, storageEngine.labelScanStore(),
storageEngine );
Expand All @@ -482,7 +480,6 @@ public void start() throws IOException
dependencies.satisfyDependency( updateableSchemaState );
dependencies.satisfyDependency( storageEngine.cacheAccess() );
dependencies.satisfyDependency( storageEngine.indexingService() );
dependencies.satisfyDependency( storageEngine.indexUpdatesValidator() );
dependencies.satisfyDependency( storageEngine.integrityValidator() );
dependencies.satisfyDependency( storageEngine.labelScanStore() );
dependencies.satisfyDependency( storageEngine.metaDataStore() );
Expand Down Expand Up @@ -762,12 +759,12 @@ public void init() throws Throwable

private KernelModule buildKernel( TransactionAppender appender,
IndexingService indexingService,
IndexUpdatesValidator indexUpdatesValidator, StoreReadLayer storeLayer,
StoreReadLayer storeLayer,
UpdateableSchemaState updateableSchemaState, LabelScanStore labelScanStore,
StorageEngine storageEngine )
{
TransactionCommitProcess transactionCommitProcess = commitProcessFactory.create( appender, storageEngine,
indexUpdatesValidator, config );
config );

/*
* This is used by legacy indexes and constraint indexes whenever a transaction is to be spawned
Expand Down

This file was deleted.

Expand Up @@ -21,11 +21,9 @@

import java.io.IOException;

import org.neo4j.kernel.api.exceptions.index.IndexCapacityExceededException;
import org.neo4j.kernel.impl.api.index.SwallowingIndexUpdater;
import org.neo4j.kernel.impl.api.index.UpdateMode;

import static org.neo4j.register.Register.DoubleLong;
import org.neo4j.register.Register.DoubleLong;

/**
* Used for initial population of an index.
Expand Down Expand Up @@ -53,7 +51,7 @@ public interface IndexPopulator
* @param propertyValue property value for the entry to index.
*/
void add( long nodeId, Object propertyValue )
throws IndexEntryConflictException, IOException, IndexCapacityExceededException;
throws IndexEntryConflictException, IOException;

/**
* Verify constraints for all entries added so far.
Expand Down Expand Up @@ -95,7 +93,7 @@ void add( long nodeId, Object propertyValue )
* as {@link InternalIndexState#ONLINE} so that future invocations of its parent
* {@link SchemaIndexProvider#getInitialState(long)} also returns {@link InternalIndexState#ONLINE}.
*/
void close( boolean populationCompletedSuccessfully ) throws IOException, IndexCapacityExceededException;
void close( boolean populationCompletedSuccessfully ) throws IOException;

/**
* Called then a population failed. The failure string should be stored for future retrieval by
Expand Down
Expand Up @@ -22,27 +22,21 @@
import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.kernel.api.exceptions.index.IndexCapacityExceededException;

/**
* IndexUpdaters are responsible for updating indexes during the commit process. There is one new instance handling
* each commit, created from {@link org.neo4j.kernel.api.index.IndexAccessor}.
* IndexUpdaters are responsible for updating indexes during the commit process. There is one new instance handling
* each commit, created from {@link org.neo4j.kernel.api.index.IndexAccessor}.
*
* First {@link #validate(Iterable)} is called with the tentative changes, in order to allow the index to check if
* there is enough space left (Lucene has a limitation on nr of entries). Then {@link #process(NodePropertyUpdate)} is
* called for each entry, wherein the actual insert is performed.
* {@link #process(NodePropertyUpdate)} is called for each entry, wherein the actual updates are applied.
*
* Each IndexUpdater is not thread-safe, and is assumed to be instantiated per transaction.
* Each IndexUpdater is not thread-safe, and is assumed to be instantiated per transaction.
*/
public interface IndexUpdater extends AutoCloseable
{
Reservation validate( Iterable<NodePropertyUpdate> updates ) throws IOException, IndexCapacityExceededException;

void process( NodePropertyUpdate update )
throws IOException, IndexEntryConflictException, IndexCapacityExceededException;
void process( NodePropertyUpdate update ) throws IOException, IndexEntryConflictException;

@Override
void close() throws IOException, IndexEntryConflictException, IndexCapacityExceededException;
void close() throws IOException, IndexEntryConflictException;

void remove( PrimitiveLongSet nodeIds ) throws IOException;
}
Expand Up @@ -24,7 +24,6 @@

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.kernel.api.direct.AllEntriesLabelScanReader;
import org.neo4j.kernel.api.exceptions.index.IndexCapacityExceededException;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.unsafe.batchinsert.LabelScanWriter;
Expand Down Expand Up @@ -71,7 +70,7 @@ public interface LabelScanStore extends Lifecycle
* Starts the store. After this has been called updates can be processed.
*/
@Override
void start() throws IOException, IndexCapacityExceededException;
void start() throws IOException;

@Override
void stop() throws IOException;
Expand Down
Expand Up @@ -20,12 +20,10 @@
package org.neo4j.kernel.impl.api;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;

public interface CommitProcessFactory
{
TransactionCommitProcess create( TransactionAppender appender, StorageEngine storageEngine,
IndexUpdatesValidator indexUpdatesValidator, Config config );
TransactionCommitProcess create( TransactionAppender appender, StorageEngine storageEngine, Config config );
}
Expand Up @@ -20,39 +20,30 @@
package org.neo4j.kernel.impl.api;

import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.StoreApplyEvent;

import static org.neo4j.kernel.api.exceptions.Status.Transaction.CouldNotCommit;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.CouldNotWriteToLog;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.ValidationFailed;
import static org.neo4j.kernel.impl.api.TransactionToApply.TRANSACTION_ID_NOT_SPECIFIED;

public class TransactionRepresentationCommitProcess implements TransactionCommitProcess
{
private final TransactionAppender appender;
private final StorageEngine storageEngine;
private final IndexUpdatesValidator indexUpdatesValidator;

public TransactionRepresentationCommitProcess( TransactionAppender appender,
StorageEngine storageEngine, IndexUpdatesValidator indexUpdatesValidator )
public TransactionRepresentationCommitProcess( TransactionAppender appender, StorageEngine storageEngine )
{
this.appender = appender;
this.storageEngine = storageEngine;
this.indexUpdatesValidator = indexUpdatesValidator;
}

@Override
public long commit( TransactionToApply batch, CommitEvent commitEvent,
TransactionApplicationMode mode ) throws TransactionFailureException
{
validateIndexUpdatesBeforeCommit( batch );
long lastTxId = appendToLog( batch, commitEvent );
try
{
Expand All @@ -65,57 +56,6 @@ public long commit( TransactionToApply batch, CommitEvent commitEvent,
}
}

private void validateIndexUpdatesBeforeCommit( TransactionToApply batch ) throws TransactionFailureException
{
if ( batch.transactionId() == TRANSACTION_ID_NOT_SPECIFIED )
{
// A normal commit, i.e. new transaction data that is to be committed in db/cluster for the first time
// For the moment this only supports a single transaction, since:
// - we must validate the index updates before each transaction in order to know whether or not
// it can be committed. This will change when busting the lucene limits.
// - each validation depends upon the fact that all previous transactions have been applied
// to the store. This will change when we write index commands to the log directly instead of inferring.
if ( batch.next() != null )
{
throw new UnsupportedOperationException(
"For the time being we only support a single previously uncommitted transaction " +
"to be committed at a time. Batching is fine when replicating transactions currently. " +
"This problem will go away when we bust the id limits" );
}

while ( batch != null )
{
batch.validatedIndexUpdates( validateIndexUpdates( batch.transactionRepresentation() ) );
batch = batch.next();
}
}
else
{
// We will do it as part of applying transactions. Reason is that the batching of transactions
// arose from HA environment where slaves applies transactions in batches. In that environment
// those transactions had already been committed and didn't need to be validated before committing
// on the slave (i.e. here). The second reason is that validating index updates means translating
// physical record updates to logical index commands. In that process there's some amount of reading
// from store and that reading might fail or read stale data if we would validate all transactions
// in this batch before any of them would have been applied. Weird, huh?
// This will change when ValidatedIndexUpdates goes away (busting the lucene id limits)
// so don't worry too much about this.
}
}

private ValidatedIndexUpdates validateIndexUpdates( TransactionRepresentation transactionRepresentation )
throws TransactionFailureException
{
try
{
return indexUpdatesValidator.validate( transactionRepresentation );
}
catch ( Throwable e )
{
throw new TransactionFailureException( ValidationFailed, e, "Validation of index updates failed" );
}
}

private long appendToLog( TransactionToApply batch, CommitEvent commitEvent ) throws TransactionFailureException
{
try ( LogAppendEvent logAppendEvent = commitEvent.beginLogAppend() )
Expand Down

0 comments on commit b14d78b

Please sign in to comment.