Skip to content

Commit

Permalink
Make multi-token schema locking coordinate with signle-token locking.
Browse files Browse the repository at this point in the history
Also introduce coordination between "any entity token" schema locking, and the creation of new token ids.
  • Loading branch information
chrisvest committed Jul 19, 2018
1 parent 32e59a5 commit fc7b36b
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 46 deletions.
Expand Up @@ -55,4 +55,16 @@ public interface Locks
void releaseSharedExplicitIndexLock( long... ids ); void releaseSharedExplicitIndexLock( long... ids );


void releaseSharedLabelLock( long... ids ); void releaseSharedLabelLock( long... ids );

/**
* Shared token locks are held when new tokens are created.
*/
void acquireSharedTokenLock();

/**
* Exclusive token locks are used to hold back the creation of new tokens.
*
* This is useful for when a lock on "all tokens" are needed.
*/
void acquireExclusiveTokenLock();
} }
Expand Up @@ -114,6 +114,18 @@ static boolean isAnyEntityTokenSchema( SchemaDescriptor schema )
return Arrays.equals(schema.getEntityTokenIds(), ANY_ENTITY_TOKEN ); return Arrays.equals(schema.getEntityTokenIds(), ANY_ENTITY_TOKEN );
} }


static long[] schemaTokenLockingIds( SchemaDescriptor schema )
{
// TODO make getEntityTokenIds produce a long array directly, and avoid this extra copying.
int[] entityTokenIds = schema.getEntityTokenIds();
long[] lockingIds = new long[entityTokenIds.length];
for ( int i = 0; i < lockingIds.length; i++ )
{
lockingIds[i] = entityTokenIds[i];
}
return lockingIds;
}

/** /**
* Returns true if any of the given entity token ids are part of this schema unit. * Returns true if any of the given entity token ids are part of this schema unit.
* @param entityTokenIds entity token ids to check against. * @param entityTokenIds entity token ids to check against.
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects; import java.util.Objects;


import org.neo4j.hashing.HashFunction;
import org.neo4j.internal.kernel.api.TokenNameLookup; import org.neo4j.internal.kernel.api.TokenNameLookup;
import org.neo4j.internal.kernel.api.schema.SchemaComputer; import org.neo4j.internal.kernel.api.schema.SchemaComputer;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptor; import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
Expand All @@ -39,32 +38,15 @@


public class MultiTokenSchemaDescriptor implements SchemaDescriptor public class MultiTokenSchemaDescriptor implements SchemaDescriptor
{ {
private static final HashFunction HASH_FUNCTION = HashFunction.incrementalXXH64();
private final int[] entityTokens; private final int[] entityTokens;
private final EntityType entityType; private final EntityType entityType;
private final int[] propertyIds; private final int[] propertyIds;
private final int key;


MultiTokenSchemaDescriptor( int[] entityTokens, EntityType entityType, int[] propertyIds ) MultiTokenSchemaDescriptor( int[] entityTokens, EntityType entityType, int[] propertyIds )
{ {
this.entityTokens = entityTokens; this.entityTokens = entityTokens;
Arrays.sort( this.entityTokens );
this.entityType = entityType; this.entityType = entityType;
this.propertyIds = propertyIds; this.propertyIds = propertyIds;
Arrays.sort( this.propertyIds );

long hash = HASH_FUNCTION.initialise( 0x0123456789abcdefL + entityType.ordinal() );
for ( int entityToken : this.entityTokens )
{
hash = HASH_FUNCTION.update( hash, entityToken );
}
for ( int propertyId : this.propertyIds )
{
hash = HASH_FUNCTION.update( hash, propertyId );
}

hash = HASH_FUNCTION.finalise( hash );
key = HASH_FUNCTION.toInt( hash );
} }


@Override @Override
Expand Down Expand Up @@ -126,13 +108,13 @@ public int[] getEntityTokenIds()
@Override @Override
public int keyId() public int keyId()
{ {
return key; throw new UnsupportedOperationException( this + " does not have a single keyId." );
} }


@Override @Override
public ResourceType keyType() public ResourceType keyType()
{ {
return ResourceTypes.SCHEMA; return entityType == EntityType.NODE ? ResourceTypes.LABEL : ResourceTypes.RELATIONSHIP_TYPE;
} }


@Override @Override
Expand Down
Expand Up @@ -310,7 +310,7 @@ else if ( rule instanceof StoreIndexDescriptor )
for ( int entityTokenId : schemaDescriptor.getEntityTokenIds() ) for ( int entityTokenId : schemaDescriptor.getEntityTokenIds() )
{ {
Set<CapableIndexDescriptor> forLabel = Set<CapableIndexDescriptor> forLabel =
indexDescriptorsByLabel.getIfAbsent( entityTokenId, HashSet::new ); indexDescriptorsByLabel.getIfAbsentPut( entityTokenId, HashSet::new );
forLabel.add( index ); forLabel.add( index );
} }


Expand Down
Expand Up @@ -48,10 +48,11 @@ abstract class IsolatedTransactionTokenCreator implements TokenCreator
public synchronized int createToken( String name ) throws KernelException public synchronized int createToken( String name ) throws KernelException
{ {
Kernel kernel = kernelSupplier.get(); Kernel kernel = kernelSupplier.get();
try ( Transaction transaction = kernel.beginTransaction( Type.implicit, LoginContext.AUTH_DISABLED ) ) try ( Transaction tx = kernel.beginTransaction( Type.implicit, LoginContext.AUTH_DISABLED ) )
{ {
int id = createKey( transaction, name ); tx.locks().acquireSharedTokenLock();
transaction.success(); int id = createKey( tx, name );
tx.success();
return id; return id;
} }
} }
Expand All @@ -62,6 +63,7 @@ public synchronized void createTokens( String[] names, int[] ids, IntPredicate f
Kernel kernel = kernelSupplier.get(); Kernel kernel = kernelSupplier.get();
try ( Transaction tx = kernel.beginTransaction( Type.implicit, LoginContext.AUTH_DISABLED ) ) try ( Transaction tx = kernel.beginTransaction( Type.implicit, LoginContext.AUTH_DISABLED ) )
{ {
tx.locks().acquireSharedTokenLock();
for ( int i = 0; i < ids.length; i++ ) for ( int i = 0; i < ids.length; i++ )
{ {
if ( filter.test( i ) ) if ( filter.test( i ) )
Expand Down
Expand Up @@ -42,7 +42,7 @@ public enum ResourceTypes implements ResourceType
EXPLICIT_INDEX( 5, LockWaitStrategies.INCREMENTAL_BACKOFF ), EXPLICIT_INDEX( 5, LockWaitStrategies.INCREMENTAL_BACKOFF ),
LABEL( 6, LockWaitStrategies.INCREMENTAL_BACKOFF ), LABEL( 6, LockWaitStrategies.INCREMENTAL_BACKOFF ),
RELATIONSHIP_TYPE( 7, LockWaitStrategies.INCREMENTAL_BACKOFF ), RELATIONSHIP_TYPE( 7, LockWaitStrategies.INCREMENTAL_BACKOFF ),
SCHEMA( 8, LockWaitStrategies.INCREMENTAL_BACKOFF ); TOKEN_CREATE( 8, LockWaitStrategies.INCREMENTAL_BACKOFF );


private static final boolean useStrongHashing = private static final boolean useStrongHashing =
FeatureToggles.flag( ResourceTypes.class, "useStrongHashing", false ); FeatureToggles.flag( ResourceTypes.class, "useStrongHashing", false );
Expand Down
Expand Up @@ -410,7 +410,8 @@ public Iterator<IndexReference> indexesGetForLabel( int labelId )
{ {
iterator = ktx.txState().indexDiffSetsByLabel( labelId ).apply( iterator ); iterator = ktx.txState().indexDiffSetsByLabel( labelId ).apply( iterator );
} }
return (Iterator)iterator; //noinspection unchecked
return (Iterator) iterator;
} }


@Override @Override
Expand Down Expand Up @@ -440,7 +441,7 @@ public IndexReference indexGetForName( String name )
{ {
return IndexReference.NO_INDEX; return IndexReference.NO_INDEX;
} }
sharedOptimisticLock( index.schema().keyType(), index.schema().keyId() ); acquireSharedSchemaLock( index.schema() );
return index; return index;
} }


Expand All @@ -457,7 +458,7 @@ public Iterator<IndexReference> indexesGetAll()


return Iterators.map( indexDescriptor -> return Iterators.map( indexDescriptor ->
{ {
sharedOptimisticLock( indexDescriptor.schema().keyType(), indexDescriptor.schema().keyId() ); acquireSharedSchemaLock( indexDescriptor.schema() );
return indexDescriptor; return indexDescriptor;
}, iterator ); }, iterator );
} }
Expand All @@ -466,7 +467,7 @@ public Iterator<IndexReference> indexesGetAll()
public InternalIndexState indexGetState( IndexReference index ) throws IndexNotFoundKernelException public InternalIndexState indexGetState( IndexReference index ) throws IndexNotFoundKernelException
{ {
assertValidIndex( index ); assertValidIndex( index );
sharedOptimisticLock( index.schema().keyType(), index.schema().keyId() ); acquireSharedSchemaLock( index.schema() );
ktx.assertOpen(); ktx.assertOpen();
return indexGetState( (IndexDescriptor) index ); return indexGetState( (IndexDescriptor) index );
} }
Expand All @@ -476,7 +477,7 @@ public PopulationProgress indexGetPopulationProgress( IndexReference index )
throws IndexNotFoundKernelException throws IndexNotFoundKernelException
{ {
assertValidIndex( index ); assertValidIndex( index );
sharedOptimisticLock( index.schema().keyType(), index.schema().keyId() ); acquireSharedSchemaLock( index.schema() );
ktx.assertOpen(); ktx.assertOpen();


if ( ktx.hasTxStateWithChanges() ) if ( ktx.hasTxStateWithChanges() )
Expand All @@ -493,7 +494,7 @@ public PopulationProgress indexGetPopulationProgress( IndexReference index )
@Override @Override
public Long indexGetOwningUniquenessConstraintId( IndexReference index ) public Long indexGetOwningUniquenessConstraintId( IndexReference index )
{ {
sharedOptimisticLock( index.schema().keyType(), index.schema().keyId() ); acquireSharedSchemaLock( index.schema() );
ktx.assertOpen(); ktx.assertOpen();
if ( index instanceof StoreIndexDescriptor ) if ( index instanceof StoreIndexDescriptor )
{ {
Expand All @@ -508,7 +509,7 @@ public Long indexGetOwningUniquenessConstraintId( IndexReference index )
@Override @Override
public long indexGetCommittedId( IndexReference index ) throws SchemaRuleNotFoundException public long indexGetCommittedId( IndexReference index ) throws SchemaRuleNotFoundException
{ {
sharedOptimisticLock( index.schema().keyType(), index.schema().keyId() ); acquireSharedSchemaLock( index.schema() );
ktx.assertOpen(); ktx.assertOpen();
if ( index instanceof StoreIndexDescriptor ) if ( index instanceof StoreIndexDescriptor )
{ {
Expand All @@ -532,7 +533,7 @@ public double indexUniqueValuesSelectivity( IndexReference index ) throws IndexN
{ {
assertValidIndex( index ); assertValidIndex( index );
SchemaDescriptor schema = index.schema(); SchemaDescriptor schema = index.schema();
sharedOptimisticLock( schema.keyType(), schema.keyId() ); acquireSharedSchemaLock( schema );
ktx.assertOpen(); ktx.assertOpen();
return storageReader.indexUniqueValuesPercentage( schema ); return storageReader.indexUniqueValuesPercentage( schema );
} }
Expand All @@ -542,7 +543,7 @@ public long indexSize( IndexReference index ) throws IndexNotFoundKernelExceptio
{ {
assertValidIndex( index ); assertValidIndex( index );
SchemaDescriptor schema = index.schema(); SchemaDescriptor schema = index.schema();
sharedOptimisticLock( schema.keyType(), schema.keyId() ); acquireSharedSchemaLock( schema );
ktx.assertOpen(); ktx.assertOpen();
return storageReader.indexSize( schema ); return storageReader.indexSize( schema );
} }
Expand Down Expand Up @@ -631,7 +632,7 @@ IndexDescriptor indexGetForSchema( SchemaDescriptor descriptor )
{ {
indexes = filter( indexes = filter(
SchemaDescriptor.equalTo( descriptor ), SchemaDescriptor.equalTo( descriptor ),
ktx.txState().indexDiffSetsByLabel( descriptor.keyId() ).apply( indexes ) ); ktx.txState().indexDiffSetsBySchema( descriptor ).apply( indexes ) );
} }
return singleOrNull( indexes ); return singleOrNull( indexes );
} }
Expand All @@ -654,7 +655,7 @@ private boolean checkIndexState( IndexDescriptor index, ReadableDiffSets<IndexDe
@Override @Override
public Iterator<ConstraintDescriptor> constraintsGetForSchema( SchemaDescriptor descriptor ) public Iterator<ConstraintDescriptor> constraintsGetForSchema( SchemaDescriptor descriptor )
{ {
sharedOptimisticLock( descriptor.keyType(), descriptor.keyId() ); acquireSharedSchemaLock( descriptor );
ktx.assertOpen(); ktx.assertOpen();
Iterator<ConstraintDescriptor> constraints = storageReader.constraintsGetForSchema( descriptor ); Iterator<ConstraintDescriptor> constraints = storageReader.constraintsGetForSchema( descriptor );
if ( ktx.hasTxStateWithChanges() ) if ( ktx.hasTxStateWithChanges() )
Expand All @@ -668,7 +669,7 @@ public Iterator<ConstraintDescriptor> constraintsGetForSchema( SchemaDescriptor
public boolean constraintExists( ConstraintDescriptor descriptor ) public boolean constraintExists( ConstraintDescriptor descriptor )
{ {
SchemaDescriptor schema = descriptor.schema(); SchemaDescriptor schema = descriptor.schema();
sharedOptimisticLock( schema.keyType(), schema.keyId() ); acquireSharedSchemaLock( schema );
ktx.assertOpen(); ktx.assertOpen();
boolean inStore = storageReader.constraintExists( descriptor ); boolean inStore = storageReader.constraintExists( descriptor );
if ( ktx.hasTxStateWithChanges() ) if ( ktx.hasTxStateWithChanges() )
Expand Down
Expand Up @@ -72,7 +72,7 @@ static <SUPPLIER extends SchemaDescriptorSupplier, EXCEPTION extends Exception>
{ {
SUPPLIER schemaSupplier = schemaSuppliers.next(); SUPPLIER schemaSupplier = schemaSuppliers.next();
SchemaDescriptor schema = schemaSupplier.schema(); SchemaDescriptor schema = schemaSupplier.schema();
if ( node.hasLabel( schema.keyId() ) ) if ( schema.isAffected( node.labels().all() ) )
{ {
if ( nodePropertyIds == null ) if ( nodePropertyIds == null )
{ {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.internal.kernel.api.IndexQuery; import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.IndexReference; import org.neo4j.internal.kernel.api.IndexReference;
import org.neo4j.internal.kernel.api.Locks; import org.neo4j.internal.kernel.api.Locks;
import org.neo4j.internal.kernel.api.NamedToken;
import org.neo4j.internal.kernel.api.NodeLabelIndexCursor; import org.neo4j.internal.kernel.api.NodeLabelIndexCursor;
import org.neo4j.internal.kernel.api.Procedures; import org.neo4j.internal.kernel.api.Procedures;
import org.neo4j.internal.kernel.api.Read; import org.neo4j.internal.kernel.api.Read;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.index.IndexEntityType; import org.neo4j.kernel.impl.index.IndexEntityType;
import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.ResourceTypes; import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.storageengine.api.EntityType; import org.neo4j.storageengine.api.EntityType;
import org.neo4j.storageengine.api.StorageReader; import org.neo4j.storageengine.api.StorageReader;
Expand All @@ -96,6 +98,8 @@
import static java.lang.Math.min; import static java.lang.Math.min;
import static org.neo4j.internal.kernel.api.exceptions.schema.ConstraintValidationException.Phase.VALIDATION; import static org.neo4j.internal.kernel.api.exceptions.schema.ConstraintValidationException.Phase.VALIDATION;
import static org.neo4j.internal.kernel.api.exceptions.schema.SchemaKernelException.OperationContext.CONSTRAINT_CREATION; import static org.neo4j.internal.kernel.api.exceptions.schema.SchemaKernelException.OperationContext.CONSTRAINT_CREATION;

import static org.neo4j.internal.kernel.api.schema.SchemaDescriptor.schemaTokenLockingIds;
import static org.neo4j.internal.kernel.api.schema.SchemaDescriptorPredicates.hasProperty; import static org.neo4j.internal.kernel.api.schema.SchemaDescriptorPredicates.hasProperty;
import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_NODE; import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_NODE;
import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_PROPERTY_KEY; import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_PROPERTY_KEY;
Expand Down Expand Up @@ -886,7 +890,7 @@ public IndexReference indexCreate( SchemaDescriptor descriptor,
Optional<String> provider, Optional<String> provider,
Optional<String> name ) throws SchemaKernelException Optional<String> name ) throws SchemaKernelException
{ {
exclusiveSchemaLock( descriptor.keyType(), descriptor.keyId() ); exclusiveSchemaLock( descriptor );
ktx.assertOpen(); ktx.assertOpen();
assertValidDescriptor( descriptor, SchemaKernelException.OperationContext.INDEX_CREATION ); assertValidDescriptor( descriptor, SchemaKernelException.OperationContext.INDEX_CREATION );
assertIndexDoesNotExist( SchemaKernelException.OperationContext.INDEX_CREATION, descriptor, name ); assertIndexDoesNotExist( SchemaKernelException.OperationContext.INDEX_CREATION, descriptor, name );
Expand Down Expand Up @@ -916,7 +920,7 @@ public void indexDrop( IndexReference indexReference ) throws SchemaKernelExcept
IndexDescriptor index = (IndexDescriptor) indexReference; IndexDescriptor index = (IndexDescriptor) indexReference;
SchemaDescriptor schema = index.schema(); SchemaDescriptor schema = index.schema();


exclusiveSchemaLock( schema.keyType(), schema.keyId() ); exclusiveSchemaLock( schema );
ktx.assertOpen(); ktx.assertOpen();
try try
{ {
Expand Down Expand Up @@ -954,7 +958,7 @@ public ConstraintDescriptor uniquePropertyConstraintCreate( SchemaDescriptor des
throws SchemaKernelException throws SchemaKernelException
{ {
//Lock //Lock
exclusiveSchemaLock( descriptor.keyType(), descriptor.keyId() ); exclusiveSchemaLock( descriptor );
ktx.assertOpen(); ktx.assertOpen();


//Check data integrity //Check data integrity
Expand All @@ -979,7 +983,7 @@ public ConstraintDescriptor nodeKeyConstraintCreate( LabelSchemaDescriptor descr
public ConstraintDescriptor nodeKeyConstraintCreate( LabelSchemaDescriptor descriptor, Optional<String> provider ) throws SchemaKernelException public ConstraintDescriptor nodeKeyConstraintCreate( LabelSchemaDescriptor descriptor, Optional<String> provider ) throws SchemaKernelException
{ {
//Lock //Lock
exclusiveSchemaLock( ResourceTypes.LABEL, descriptor.getLabelId() ); exclusiveSchemaLock( descriptor );
ktx.assertOpen(); ktx.assertOpen();


//Check data integrity //Check data integrity
Expand All @@ -1006,7 +1010,7 @@ public ConstraintDescriptor nodePropertyExistenceConstraintCreate( LabelSchemaDe
throws SchemaKernelException throws SchemaKernelException
{ {
//Lock //Lock
exclusiveSchemaLock( ResourceTypes.LABEL, descriptor.getLabelId() ); exclusiveSchemaLock( descriptor );
ktx.assertOpen(); ktx.assertOpen();


//verify data integrity //verify data integrity
Expand All @@ -1032,7 +1036,7 @@ public ConstraintDescriptor relationshipPropertyExistenceConstraintCreate( Relat
throws SchemaKernelException throws SchemaKernelException
{ {
//Lock //Lock
exclusiveSchemaLock( ResourceTypes.RELATIONSHIP_TYPE, descriptor.getRelTypeId() ); exclusiveSchemaLock( descriptor );
ktx.assertOpen(); ktx.assertOpen();


//verify data integrity //verify data integrity
Expand Down Expand Up @@ -1148,9 +1152,46 @@ private void sharedSchemaLock( ResourceType type, int tokenId )
ktx.statementLocks().optimistic().acquireShared( ktx.lockTracer(), type, tokenId ); ktx.statementLocks().optimistic().acquireShared( ktx.lockTracer(), type, tokenId );
} }


private void exclusiveSchemaLock( ResourceType type, int tokenId ) private void exclusiveSchemaLock( SchemaDescriptor schema )
{
long[] lockingIds = schemaTokenLockingIds( schema );
ktx.statementLocks().optimistic().acquireExclusive( ktx.lockTracer(), schema.keyType(), lockingIds );

if ( SchemaDescriptor.isAnyEntityTokenSchema( schema ) )
{
exclusiveAnyEntityTokenSchema( schema );
}
}

private void exclusiveAnyEntityTokenSchema( SchemaDescriptor schema )
{ {
ktx.statementLocks().optimistic().acquireExclusive( ktx.lockTracer(), type, tokenId ); // After we get the exclusive token lock, no new tokens can be created. This allows us to grab a lock on all
// the existing tokens, and be sure that we won't miss any updates.
allStoreHolder.acquireExclusiveTokenLock();
ResourceType resourceType;
long[] tokens;
Iterator<NamedToken> itr;
if ( schema.entityType() == EntityType.NODE )
{
resourceType = ResourceTypes.LABEL;
tokens = new long[token.labelCount()];
itr = token.labelsGetAllTokens();
}
else
{
resourceType = ResourceTypes.RELATIONSHIP_TYPE;
tokens = new long[token.relationshipTypeCount()];
itr = token.relationshipTypesGetAllTokens();
}

int i = 0;
while ( itr.hasNext() )
{
tokens[i] = itr.next().id();
}

LockTracer lockTracer = ktx.lockTracer();
ktx.statementLocks().optimistic().acquireExclusive( lockTracer, resourceType, tokens );
} }


private void lockRelationshipNodes( long startNodeId, long endNodeId ) private void lockRelationshipNodes( long startNodeId, long endNodeId )
Expand Down

0 comments on commit fc7b36b

Please sign in to comment.