Skip to content

Commit

Permalink
Creating uniqueness constraint no longer holds SCHEMA lock
Browse files Browse the repository at this point in the history
while populating the index. From POV of creating the uniqueness constraint
the transaction will still block and potentially fail if the constraint
couldn't be validated on existing data and concurrent updates. The big difference
is that the schema write lock isn't held while populating the constraint index.
This allows other transactions modifying these properties and labels to
progress and complete normally.
  • Loading branch information
tinwelint committed Feb 17, 2017
1 parent 5b326e0 commit 4c1fd59
Show file tree
Hide file tree
Showing 39 changed files with 273 additions and 156 deletions.
Expand Up @@ -29,6 +29,7 @@


import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexConfiguration; import org.neo4j.kernel.api.index.IndexConfiguration;
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.InternalIndexState; import org.neo4j.kernel.api.index.InternalIndexState;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
Expand Down Expand Up @@ -78,7 +79,9 @@ public IndexAccessors( SchemaIndexProvider provider,
{ {
long indexId = indexRule.getId(); long indexId = indexRule.getId();
IndexConfiguration indexConfig = IndexConfiguration.of( indexRule ); IndexConfiguration indexConfig = IndexConfiguration.of( indexRule );
accessors.put( indexId, provider.getOnlineAccessor( indexId, indexConfig, samplingConfig ) ); accessors.put( indexId, provider.getOnlineAccessor( indexId,
new IndexDescriptor( indexRule.getLabel(), indexRule.getPropertyKey() ),
indexConfig, samplingConfig ) );
} }
} }


Expand Down
Expand Up @@ -592,7 +592,8 @@ public void shouldReportNodesThatAreNotIndexed() throws Exception
for ( IndexRule indexRule : loadAllIndexRules( fixture.directStoreAccess().nativeStores().getSchemaStore() ) ) for ( IndexRule indexRule : loadAllIndexRules( fixture.directStoreAccess().nativeStores().getSchemaStore() ) )
{ {
IndexAccessor accessor = fixture.directStoreAccess().indexes().getOnlineAccessor( IndexAccessor accessor = fixture.directStoreAccess().indexes().getOnlineAccessor(
indexRule.getId(), IndexConfiguration.of( indexRule ), samplingConfig ); indexRule.getId(), new IndexDescriptor( indexRule.getLabel(), indexRule.getPropertyKey() ),
IndexConfiguration.of( indexRule ), samplingConfig );
IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE ); IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE );
updater.remove( asPrimitiveLongSet( indexedNodes ) ); updater.remove( asPrimitiveLongSet( indexedNodes ) );
updater.close(); updater.close();
Expand All @@ -615,9 +616,9 @@ public void shouldReportNodesWithDuplicatePropertyValueInUniqueIndex() throws Ex
IndexSamplingConfig samplingConfig = new IndexSamplingConfig( Config.empty() ); IndexSamplingConfig samplingConfig = new IndexSamplingConfig( Config.empty() );
for ( IndexRule indexRule : loadAllIndexRules( fixture.directStoreAccess().nativeStores().getSchemaStore() ) ) for ( IndexRule indexRule : loadAllIndexRules( fixture.directStoreAccess().nativeStores().getSchemaStore() ) )
{ {
IndexAccessor accessor = fixture.directStoreAccess() IndexAccessor accessor = fixture.directStoreAccess().indexes().getOnlineAccessor( indexRule.getId(),
.indexes() new IndexDescriptor( indexRule.getLabel(), indexRule.getPropertyKey() ),
.getOnlineAccessor( indexRule.getId(), indexConfig, samplingConfig ); indexConfig, samplingConfig );
IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE ); IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE );
updater.process( NodePropertyUpdate.add( 42, 0, "value", new long[]{3} ) ); updater.process( NodePropertyUpdate.add( 42, 0, "value", new long[]{3} ) );
updater.close(); updater.close();
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.junit.Test; import org.junit.Test;


import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -197,7 +196,7 @@ private NodeRecord nodeWithLabels( long... labelIds )
return engine; return engine;
} }


private static class IndexAccessorStub implements IndexAccessor private static class IndexAccessorStub extends IndexAccessor.Adapter
{ {
private final Map<Object, long[]> entries; private final Map<Object, long[]> entries;


Expand Down Expand Up @@ -291,19 +290,15 @@ public IndexSampler createSampler()
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


@Override
public void close() public void close()
{ {
} }
}; };
} }


@Override @Override
public void close() throws IOException public void drop()
{
}

@Override
public void drop() throws IOException
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
Expand All @@ -315,13 +310,13 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
} }


@Override @Override
public void force() throws IOException public void force()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


@Override @Override
public void flush() throws IOException public void flush()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
Expand All @@ -333,7 +328,7 @@ public BoundedIterable<Long> newAllEntriesReader()
} }


@Override @Override
public ResourceIterator<File> snapshotFiles() throws IOException public ResourceIterator<File> snapshotFiles()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.neo4j.kernel.api.KernelAPI; import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.TokenNameLookup; import org.neo4j.kernel.api.TokenNameLookup;
import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.legacyindex.AutoIndexing; import org.neo4j.kernel.api.legacyindex.AutoIndexing;
Expand Down Expand Up @@ -464,12 +465,16 @@ public void start() throws IOException
transactionLogModule.logFiles(), startupStatistics, transactionLogModule.logFiles(), startupStatistics,
storageEngine, logEntryReader, transactionLogModule.logicalTransactionStore() ); storageEngine, logEntryReader, transactionLogModule.logicalTransactionStore() );


// At the time of writing this comes from the storage engine (IndexStoreView)
PropertyAccessor propertyAccessor = dependencies.resolveDependency( PropertyAccessor.class );

final KernelModule kernelModule = buildKernel( final KernelModule kernelModule = buildKernel(
transactionLogModule.transactionAppender(), transactionLogModule.transactionAppender(),
dependencies.resolveDependency( IndexingService.class ), dependencies.resolveDependency( IndexingService.class ),
storageEngine.storeReadLayer(), storageEngine.storeReadLayer(),
updateableSchemaState, dependencies.resolveDependency( LabelScanStore.class ), updateableSchemaState, dependencies.resolveDependency( LabelScanStore.class ),
storageEngine, indexConfigStore, transactionIdStore, clock ); storageEngine, indexConfigStore, transactionIdStore, clock,
propertyAccessor );


// Do these assignments last so that we can ensure no cyclical dependencies exist // Do these assignments last so that we can ensure no cyclical dependencies exist
this.storageEngine = storageEngine; this.storageEngine = storageEngine;
Expand Down Expand Up @@ -762,7 +767,8 @@ private KernelModule buildKernel( TransactionAppender appender,
UpdateableSchemaState updateableSchemaState, LabelScanStore labelScanStore, UpdateableSchemaState updateableSchemaState, LabelScanStore labelScanStore,
StorageEngine storageEngine, StorageEngine storageEngine,
IndexConfigStore indexConfigStore, IndexConfigStore indexConfigStore,
TransactionIdStore transactionIdStore, Clock clock ) throws KernelException, IOException TransactionIdStore transactionIdStore, Clock clock,
PropertyAccessor propertyAccessor ) throws KernelException, IOException
{ {
TransactionCommitProcess transactionCommitProcess = commitProcessFactory.create( appender, storageEngine, TransactionCommitProcess transactionCommitProcess = commitProcessFactory.create( appender, storageEngine,
config ); config );
Expand All @@ -774,7 +780,7 @@ private KernelModule buildKernel( TransactionAppender appender,
Supplier<KernelAPI> kernelProvider = () -> kernelModule.kernelAPI(); Supplier<KernelAPI> kernelProvider = () -> kernelModule.kernelAPI();


ConstraintIndexCreator constraintIndexCreator = ConstraintIndexCreator constraintIndexCreator =
new ConstraintIndexCreator( kernelProvider, indexingService ); new ConstraintIndexCreator( kernelProvider, indexingService, propertyAccessor );


LegacyIndexStore legacyIndexStore = new LegacyIndexStore( config, LegacyIndexStore legacyIndexStore = new LegacyIndexStore( config,
indexConfigStore, kernelProvider, legacyIndexProviderLookup ); indexConfigStore, kernelProvider, legacyIndexProviderLookup );
Expand Down
Expand Up @@ -26,6 +26,7 @@


import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.BoundedIterable; import org.neo4j.helpers.collection.BoundedIterable;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode; import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.updater.SwallowingIndexUpdater; import org.neo4j.kernel.impl.api.index.updater.SwallowingIndexUpdater;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
Expand Down Expand Up @@ -90,6 +91,17 @@ public interface IndexAccessor extends Closeable
*/ */
ResourceIterator<File> snapshotFiles() throws IOException; ResourceIterator<File> snapshotFiles() throws IOException;


/**
* Verifies that each value in this index is unique.
* Index is guaranteed to not change while this call executes.
*
* @param propertyAccessor {@link PropertyAccessor} for accessing properties from database storage
* in the event of conflicting values.
* @throws IndexEntryConflictException for first detected uniqueness conflict, if any.
* @throws IOException on error reading from source files.
*/
void verifyDeferredConstraints( PropertyAccessor propertyAccessor ) throws IndexEntryConflictException, IOException;

class Adapter implements IndexAccessor class Adapter implements IndexAccessor
{ {
@Override @Override
Expand Down Expand Up @@ -153,6 +165,12 @@ public ResourceIterator<File> snapshotFiles()
{ {
return emptyIterator(); return emptyIterator();
} }

@Override
public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throws IndexEntryConflictException, IOException
{
}
} }


class Delegator implements IndexAccessor class Delegator implements IndexAccessor
Expand Down Expand Up @@ -217,5 +235,12 @@ public String toString()
{ {
return delegate.toString(); return delegate.toString();
} }

@Override
public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throws IndexEntryConflictException, IOException
{
delegate.verifyDeferredConstraints( propertyAccessor );
}
} }
} }
Expand Up @@ -57,10 +57,16 @@ void add( Collection<NodePropertyUpdate> updates )
throws IndexEntryConflictException, IOException; throws IndexEntryConflictException, IOException;


/** /**
* Verify constraints for all entries added so far. * Verifies that each value in this index is unique.
* This method is called after the index has been fully populated and is guaranteed to not have
* concurrent changes while executing.
*
* @param propertyAccessor {@link PropertyAccessor} for accessing properties from database storage
* in the event of conflicting values.
* @throws IndexEntryConflictException for first detected uniqueness conflict, if any.
* @throws IOException on error reading from source files.
*/ */
@Deprecated // TODO we want to remove this in 2.1, and properly prevent value collisions. void verifyDeferredConstraints( PropertyAccessor propertyAccessor ) throws IndexEntryConflictException, IOException;
void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException;


/** /**
* Return an updater for applying a set of changes to this index, generally this will be a set of changes from a * Return an updater for applying a set of changes to this index, generally this will be a set of changes from a
Expand Down Expand Up @@ -136,11 +142,6 @@ public void add( Collection<NodePropertyUpdate> updates ) throws IndexEntryConfl
{ {
} }


@Override
public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{
}

@Override @Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
{ {
Expand Down Expand Up @@ -172,5 +173,11 @@ public IndexSample sampleResult()
{ {
return new IndexSample(); return new IndexSample();
} }

@Override
public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throws IndexEntryConflictException, IOException
{
}
} }
} }
Expand Up @@ -87,7 +87,7 @@
* <h3>Online operation</h3> * <h3>Online operation</h3>
* *
* Once the index is online, the database will move to using the * Once the index is online, the database will move to using the
* {@link #getOnlineAccessor(long, IndexConfiguration, IndexSamplingConfig) online accessor} to * {@link #getOnlineAccessor(long, IndexDescriptor, IndexConfiguration, IndexSamplingConfig) online accessor} to
* write to the index. * write to the index.
*/ */
public abstract class SchemaIndexProvider extends LifecycleAdapter implements Comparable<SchemaIndexProvider> public abstract class SchemaIndexProvider extends LifecycleAdapter implements Comparable<SchemaIndexProvider>
Expand All @@ -99,8 +99,8 @@ public abstract class SchemaIndexProvider extends LifecycleAdapter implements Co
private final IndexPopulator singlePopulator = new IndexPopulator.Adapter(); private final IndexPopulator singlePopulator = new IndexPopulator.Adapter();


@Override @Override
public IndexAccessor getOnlineAccessor( long indexId, IndexConfiguration config, public IndexAccessor getOnlineAccessor( long indexId, IndexDescriptor descriptor,
IndexSamplingConfig samplingConfig ) IndexConfiguration config, IndexSamplingConfig samplingConfig )
{ {
return singleWriter; return singleWriter;
} }
Expand Down Expand Up @@ -151,8 +151,8 @@ public abstract IndexPopulator getPopulator( long indexId, IndexDescriptor descr
/** /**
* Used for updating an index once initial population has completed. * Used for updating an index once initial population has completed.
*/ */
public abstract IndexAccessor getOnlineAccessor( long indexId, IndexConfiguration config, public abstract IndexAccessor getOnlineAccessor( long indexId, IndexDescriptor descriptor,
IndexSamplingConfig samplingConfig ) throws IOException; IndexConfiguration config, IndexSamplingConfig samplingConfig ) throws IOException;


/** /**
* Returns a failure previously gotten from {@link IndexPopulator#markAsFailed(String)} * Returns a failure previously gotten from {@link IndexPopulator#markAsFailed(String)}
Expand Down
Expand Up @@ -25,13 +25,15 @@


import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.kernel.api.exceptions.index.IndexActivationFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexActivationFailedKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException; import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.exceptions.schema.ConstraintVerificationFailedKernelException; import org.neo4j.kernel.api.exceptions.schema.ConstraintVerificationFailedKernelException;
import org.neo4j.kernel.api.index.IndexConfiguration; import org.neo4j.kernel.api.index.IndexConfiguration;
import org.neo4j.kernel.api.index.IndexDescriptor; import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.InternalIndexState; import org.neo4j.kernel.api.index.InternalIndexState;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.PopulationProgress; import org.neo4j.storageengine.api.schema.PopulationProgress;
Expand Down Expand Up @@ -147,4 +149,10 @@ public IndexConfiguration config()
{ {
return getDelegate().config(); return getDelegate().config();
} }

@Override
public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{
getDelegate().verifyDeferredConstraints( accessor );
}
} }
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.kernel.api.index.IndexDescriptor; import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.InternalIndexState; import org.neo4j.kernel.api.index.InternalIndexState;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.impl.api.index.updater.DelegatingIndexUpdater; import org.neo4j.kernel.impl.api.index.updater.DelegatingIndexUpdater;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
Expand Down Expand Up @@ -442,6 +443,20 @@ private void assertOpen() throws IndexProxyAlreadyClosedKernelException
} }
} }


@Override
public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{
lock.readLock().lock();
try
{
delegate.verifyDeferredConstraints( accessor );
}
finally
{
lock.readLock().unlock();
}
}

private class LockingIndexUpdater extends DelegatingIndexUpdater private class LockingIndexUpdater extends DelegatingIndexUpdater
{ {
private LockingIndexUpdater( IndexUpdater delegate ) private LockingIndexUpdater( IndexUpdater delegate )
Expand Down
Expand Up @@ -109,7 +109,7 @@ public void run()
multiPopulator.replaceIndexCounts( 0, 0, 0 ); multiPopulator.replaceIndexCounts( 0, 0, 0 );


indexAllNodes(); indexAllNodes();
verifyDeferredConstraints(); monitor.indexPopulationScanComplete();
if ( cancelled ) if ( cancelled )
{ {
multiPopulator.cancel(); multiPopulator.cancel();
Expand Down Expand Up @@ -149,12 +149,6 @@ public PopulationProgress getPopulationProgress()
return storeScan.getProgress(); return storeScan.getProgress();
} }


private void verifyDeferredConstraints()
{
monitor.verifyDeferredConstraints();
multiPopulator.verifyAllDeferredConstraints( storeView );
}

public Future<Void> cancel() public Future<Void> cancel()
{ {
// Stop the population // Stop the population
Expand Down
Expand Up @@ -25,6 +25,7 @@


import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.kernel.api.exceptions.index.IndexActivationFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexActivationFailedKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException; import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.exceptions.schema.ConstraintVerificationFailedKernelException; import org.neo4j.kernel.api.exceptions.schema.ConstraintVerificationFailedKernelException;
Expand All @@ -34,6 +35,7 @@
import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.InternalIndexState; import org.neo4j.kernel.api.index.InternalIndexState;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.PopulationProgress; import org.neo4j.storageengine.api.schema.PopulationProgress;
Expand Down Expand Up @@ -109,4 +111,9 @@ public interface IndexProxy
ResourceIterator<File> snapshotFiles() throws IOException; ResourceIterator<File> snapshotFiles() throws IOException;


IndexConfiguration config(); IndexConfiguration config();

default void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{
throw new IllegalStateException( this.toString() );
}
} }

0 comments on commit 4c1fd59

Please sign in to comment.