Skip to content

Commit

Permalink
Acquired shared locks for labels and relationship types as part of
Browse files Browse the repository at this point in the history
slave commit of transaction that performs data modifications to cover
cases that previously were covered by shared schema lock acquisition on master.

Shared locks will be propagated to master only in case if transaction performs
writes. For cases when transaction is readonly we do not do anything with
shared locks acquired on slaves exactly as we did before.
  • Loading branch information
MishaDemianenko committed Jul 17, 2017
1 parent 1d31e5b commit 380dd10
Show file tree
Hide file tree
Showing 22 changed files with 747 additions and 28 deletions.
Expand Up @@ -543,7 +543,7 @@ private long commit() throws TransactionFailureException
if ( hasChanges() ) if ( hasChanges() )
{ {
// grab all optimistic locks now, locks can't be deferred any further // grab all optimistic locks now, locks can't be deferred any further
statementLocks.prepareForCommit(); statementLocks.prepareForCommit( currentStatement.lockTracer() );
// use pessimistic locks for the rest of the commit process, locks can't be deferred any further // use pessimistic locks for the rest of the commit process, locks can't be deferred any further
Locks.Client commitLocks = statementLocks.pessimistic(); Locks.Client commitLocks = statementLocks.pessimistic();


Expand Down
Expand Up @@ -47,7 +47,7 @@ public Locks.Client optimistic()
} }


@Override @Override
public void prepareForCommit() public void prepareForCommit( LockTracer lockTracer )
{ {
// Locks where grabbed eagerly by client so no need to prepare // Locks where grabbed eagerly by client so no need to prepare
} }
Expand Down
Expand Up @@ -47,8 +47,9 @@ public interface StatementLocks extends AutoCloseable
/** /**
* Prepare the underlying {@link Locks.Client client}(s) for commit. This will grab all locks that have * Prepare the underlying {@link Locks.Client client}(s) for commit. This will grab all locks that have
* previously been taken {@link #optimistic() optimistically}. * previously been taken {@link #optimistic() optimistically}.
* @param lockTracer lock tracer
*/ */
void prepareForCommit(); void prepareForCommit( LockTracer lockTracer );


/** /**
* Stop the underlying {@link Locks.Client client}(s). * Stop the underlying {@link Locks.Client client}(s).
Expand Down
Expand Up @@ -108,7 +108,7 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds )
} }
} }


void acquireDeferredLocks() void acquireDeferredLocks( LockTracer lockTracer )
{ {
assertNotStopped(); assertNotStopped();


Expand All @@ -123,7 +123,7 @@ void acquireDeferredLocks()
currentExclusive != lockUnit.isExclusive()) ) currentExclusive != lockUnit.isExclusive()) )
{ {
// New type, i.e. flush the current array down to delegate in one call // New type, i.e. flush the current array down to delegate in one call
flushLocks( current, cursor, currentType, currentExclusive ); flushLocks( lockTracer, current, cursor, currentType, currentExclusive );


cursor = 0; cursor = 0;
currentType = lockUnit.resourceType(); currentType = lockUnit.resourceType();
Expand All @@ -137,21 +137,22 @@ void acquireDeferredLocks()
} }
current[cursor++] = lockUnit.resourceId(); current[cursor++] = lockUnit.resourceId();
} }
flushLocks( current, cursor, currentType, currentExclusive ); flushLocks( lockTracer, current, cursor, currentType, currentExclusive );
} }


private void flushLocks( long[] current, int cursor, ResourceType currentType, boolean exclusive ) private void flushLocks( LockTracer lockTracer, long[] current, int cursor, ResourceType currentType, boolean
exclusive )
{ {
if ( cursor > 0 ) if ( cursor > 0 )
{ {
long[] resourceIds = Arrays.copyOf( current, cursor ); long[] resourceIds = Arrays.copyOf( current, cursor );
if ( exclusive ) if ( exclusive )
{ {
clientDelegate.acquireExclusive( LockTracer.NONE, currentType, resourceIds ); clientDelegate.acquireExclusive( lockTracer, currentType, resourceIds );
} }
else else
{ {
clientDelegate.acquireShared( LockTracer.NONE, currentType, resourceIds ); clientDelegate.acquireShared( lockTracer, currentType, resourceIds );
} }
} }
} }
Expand Down
Expand Up @@ -49,9 +49,9 @@ public Locks.Client optimistic()
} }


@Override @Override
public void prepareForCommit() public void prepareForCommit( LockTracer lockTracer )
{ {
implicit.acquireDeferredLocks(); implicit.acquireDeferredLocks( lockTracer );
} }


@Override @Override
Expand Down
Expand Up @@ -116,7 +116,7 @@ public void shouldDeferAllLocks() throws Exception
expected.add( lockUnit ); expected.add( lockUnit );
} }
actualClient.assertRegisteredLocks( Collections.emptySet() ); actualClient.assertRegisteredLocks( Collections.emptySet() );
client.acquireDeferredLocks(); client.acquireDeferredLocks( LockTracer.NONE );


// THEN // THEN
actualClient.assertRegisteredLocks( expected ); actualClient.assertRegisteredLocks( expected );
Expand Down Expand Up @@ -286,7 +286,7 @@ public void exclusiveLockAcquiredMultipleTimesCanNotBeReleasedAtOnce() throws Ex
client.releaseExclusive( ResourceTypes.NODE, 1 ); client.releaseExclusive( ResourceTypes.NODE, 1 );


// WHEN // WHEN
client.acquireDeferredLocks(); client.acquireDeferredLocks( LockTracer.NONE );


// THEN // THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, true ) ) ); actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, true ) ) );
Expand All @@ -305,7 +305,7 @@ public void sharedLockAcquiredMultipleTimesCanNotBeReleasedAtOnce() throws Excep
client.releaseShared( ResourceTypes.NODE, 1 ); client.releaseShared( ResourceTypes.NODE, 1 );


// WHEN // WHEN
client.acquireDeferredLocks(); client.acquireDeferredLocks( LockTracer.NONE );


// THEN // THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, false ) ) ); actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, false ) ) );
Expand All @@ -324,7 +324,7 @@ public void acquireBothSharedAndExclusiveLockThenReleaseShared()
client.releaseShared( ResourceTypes.NODE, 1 ); client.releaseShared( ResourceTypes.NODE, 1 );


// WHEN // WHEN
client.acquireDeferredLocks(); client.acquireDeferredLocks( LockTracer.NONE );


// THEN // THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, true ) ) ); actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, true ) ) );
Expand All @@ -347,7 +347,7 @@ public void exclusiveLocksAcquiredFirst()
client.acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 42 ); client.acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 42 );


// WHEN // WHEN
client.acquireDeferredLocks(); client.acquireDeferredLocks( LockTracer.NONE );


// THEN // THEN
Set<LockUnit> expectedLocks = new LinkedHashSet<>( Set<LockUnit> expectedLocks = new LinkedHashSet<>(
Expand Down Expand Up @@ -376,7 +376,7 @@ public void acquireBothSharedAndExclusiveLockThenReleaseExclusive()
client.releaseExclusive( ResourceTypes.NODE, 1 ); client.releaseExclusive( ResourceTypes.NODE, 1 );


// WHEN // WHEN
client.acquireDeferredLocks(); client.acquireDeferredLocks( LockTracer.NONE );


// THEN // THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, false ) ) ); actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, false ) ) );
Expand Down
Expand Up @@ -56,7 +56,7 @@ public void shouldDoNothingWithClientWhenPreparingForCommitWithNoLocksAcquired()
final DeferringStatementLocks statementLocks = new DeferringStatementLocks( client ); final DeferringStatementLocks statementLocks = new DeferringStatementLocks( client );


// WHEN // WHEN
statementLocks.prepareForCommit(); statementLocks.prepareForCommit( LockTracer.NONE );


// THEN // THEN
verifyNoMoreInteractions( client ); verifyNoMoreInteractions( client );
Expand All @@ -73,7 +73,7 @@ public void shouldPrepareExplicitForCommitWhenLocksAcquire() throws Exception
statementLocks.optimistic().acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 1 ); statementLocks.optimistic().acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 1 );
statementLocks.optimistic().acquireExclusive( LockTracer.NONE, ResourceTypes.RELATIONSHIP, 42 ); statementLocks.optimistic().acquireExclusive( LockTracer.NONE, ResourceTypes.RELATIONSHIP, 42 );
verify( client, never() ).acquireExclusive( eq( LockTracer.NONE ), any( ResourceType.class ), anyLong() ); verify( client, never() ).acquireExclusive( eq( LockTracer.NONE ), any( ResourceType.class ), anyLong() );
statementLocks.prepareForCommit(); statementLocks.prepareForCommit( LockTracer.NONE );


// THEN // THEN
verify( client ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 1 ); verify( client ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 1 );
Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha.cluster.modeswitch;

import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.lock.SlaveStatementLocksFactory;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;

/**
* Statement locks factory switcher that will use original configured locks factory in case if
* instance is master, other wise slave specific locks factory will be provided that have additional
* capabilities of acquiring some shared locks on master during commit
*/
public class StatementLocksFactorySwitcher extends AbstractComponentSwitcher<StatementLocksFactory>
{
private final StatementLocksFactory configuredStatementLocksFactory;

public StatementLocksFactorySwitcher( DelegateInvocationHandler<StatementLocksFactory> delegate,
StatementLocksFactory configuredStatementLocksFactory )
{
super( delegate );
this.configuredStatementLocksFactory = configuredStatementLocksFactory;
}

@Override
protected StatementLocksFactory getMasterImpl()
{
return configuredStatementLocksFactory;
}

@Override
protected StatementLocksFactory getSlaveImpl()
{
return new SlaveStatementLocksFactory( configuredStatementLocksFactory );
}
}
Expand Up @@ -106,6 +106,7 @@
import org.neo4j.kernel.ha.cluster.modeswitch.LockManagerSwitcher; import org.neo4j.kernel.ha.cluster.modeswitch.LockManagerSwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.PropertyKeyCreatorSwitcher; import org.neo4j.kernel.ha.cluster.modeswitch.PropertyKeyCreatorSwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.RelationshipTypeCreatorSwitcher; import org.neo4j.kernel.ha.cluster.modeswitch.RelationshipTypeCreatorSwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.StatementLocksFactorySwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.UpdatePullerSwitcher; import org.neo4j.kernel.ha.cluster.modeswitch.UpdatePullerSwitcher;
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.ConversationManager; import org.neo4j.kernel.ha.com.master.ConversationManager;
Expand Down Expand Up @@ -150,6 +151,7 @@
import org.neo4j.kernel.impl.factory.StatementLocksFactorySelector; import org.neo4j.kernel.impl.factory.StatementLocksFactorySelector;
import org.neo4j.kernel.impl.index.IndexConfigStore; import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.MetaDataStore;
Expand Down Expand Up @@ -493,7 +495,7 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
createLockManager( componentSwitcherContainer, config, masterDelegateInvocationHandler, createLockManager( componentSwitcherContainer, config, masterDelegateInvocationHandler,
requestContextFactory, platformModule.availabilityGuard, platformModule.clock, logging ) ); requestContextFactory, platformModule.availabilityGuard, platformModule.clock, logging ) );


statementLocksFactory = new StatementLocksFactorySelector( lockManager, config, logging ).select(); statementLocksFactory = createStatementLocksFactory( componentSwitcherContainer, config, logging );


propertyKeyTokenHolder = dependencies.satisfyDependency( new DelegatingPropertyKeyTokenHolder( propertyKeyTokenHolder = dependencies.satisfyDependency( new DelegatingPropertyKeyTokenHolder(
createPropertyKeyCreator( config, componentSwitcherContainer, createPropertyKeyCreator( config, componentSwitcherContainer,
Expand Down Expand Up @@ -551,6 +553,24 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true ); procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true );
} }


private StatementLocksFactory createStatementLocksFactory( ComponentSwitcherContainer componentSwitcherContainer,
Config config, LogService logging )
{
StatementLocksFactory configuredStatementLocks = new StatementLocksFactorySelector( lockManager, config, logging ).select();

DelegateInvocationHandler<StatementLocksFactory> locksFactoryDelegate =
new DelegateInvocationHandler<>( StatementLocksFactory.class );
StatementLocksFactory locksFactory =
(StatementLocksFactory) newProxyInstance( StatementLocksFactory.class.getClassLoader(),
new Class[]{StatementLocksFactory.class}, locksFactoryDelegate );

StatementLocksFactorySwitcher
locksSwitcher = new StatementLocksFactorySwitcher( locksFactoryDelegate, configuredStatementLocks );
componentSwitcherContainer.add( locksSwitcher );

return locksFactory;
}

static Predicate<String> fileWatcherFileNameFilter() static Predicate<String> fileWatcherFileNameFilter()
{ {
return Predicates.any( return Predicates.any(
Expand Down
Expand Up @@ -20,6 +20,8 @@
package org.neo4j.kernel.ha.lock; package org.neo4j.kernel.ha.lock;


import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;


import org.neo4j.collection.primitive.PrimitiveLongCollections; import org.neo4j.collection.primitive.PrimitiveLongCollections;
Expand All @@ -43,6 +45,9 @@
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException; import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.storageengine.api.lock.ResourceType; import org.neo4j.storageengine.api.lock.ResourceType;


import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.neo4j.kernel.impl.locking.LockType.READ; import static org.neo4j.kernel.impl.locking.LockType.READ;
import static org.neo4j.kernel.impl.locking.LockType.WRITE; import static org.neo4j.kernel.impl.locking.LockType.WRITE;


Expand Down Expand Up @@ -86,9 +91,9 @@ public void acquireShared( LockTracer tracer, ResourceType resourceType, long...
long[] newResourceIds = firstTimeSharedLocks( resourceType, resourceIds ); long[] newResourceIds = firstTimeSharedLocks( resourceType, resourceIds );
if ( newResourceIds.length > 0 ) if ( newResourceIds.length > 0 )
{ {
try ( LockWaitEvent event = tracer.waitForLock( false, resourceType, newResourceIds ) ) try
{ {
acquireSharedOnMaster( resourceType, newResourceIds ); acquireSharedOnMasterFiltered( tracer, resourceType, newResourceIds );
} }
catch ( Throwable failure ) catch ( Throwable failure )
{ {
Expand Down Expand Up @@ -223,6 +228,32 @@ public long activeLockCount()
return client.activeLockCount(); return client.activeLockCount();
} }


void acquireDeferredSharedLocks( LockTracer tracer )
{
assertNotStopped();
Map<ResourceType,List<Long>> deferredLocksMap =
client.activeLocks().filter( activeLock -> ActiveLock.SHARED_MODE.equals( activeLock.mode() ) )
.filter( this::isLabelOrRelationshipType )
.collect( groupingBy( ActiveLock::resourceType, mapping( ActiveLock::resourceId, toList() ) ) );

deferredLocksMap.forEach( ( type, ids ) -> lockResourcesOnMaster( tracer, type, ids ) );
}

private void lockResourcesOnMaster( LockTracer tracer, ResourceType type, List<Long> ids )
{
long[] resourceIds = PrimitiveLongCollections.asArray( ids.iterator() );
try ( LockWaitEvent event = tracer.waitForLock( false, type, resourceIds ) )
{
acquireSharedOnMaster( type, resourceIds );
}
}

private boolean isLabelOrRelationshipType( ActiveLock activeLock )
{
return (activeLock.resourceType() == ResourceTypes.LABEL) ||
(activeLock.resourceType() == ResourceTypes.RELATIONSHIP_TYPE);
}

private void stopLockSessionOnMaster() private void stopLockSessionOnMaster()
{ {
try try
Expand Down Expand Up @@ -318,14 +349,23 @@ private void releaseExclusive( ResourceType resourceType, long[] resourceIds, lo
} }
} }


private void acquireSharedOnMaster( ResourceType resourceType, long... resourceIds ) private void acquireSharedOnMasterFiltered( LockTracer lockTracer, ResourceType resourceType, long... resourceIds )
{ {
if ( resourceType == ResourceTypes.INDEX_ENTRY ) if ( (resourceType == ResourceTypes.INDEX_ENTRY) ||
(resourceType == ResourceTypes.LABEL) ||
(resourceType == ResourceTypes.RELATIONSHIP_TYPE) )
{ {
return; return;
} }
makeSureTxHasBeenInitialized(); try ( LockWaitEvent event = lockTracer.waitForLock( false, resourceType, resourceIds ) )
{
acquireSharedOnMaster( resourceType, resourceIds );
}
}


private void acquireSharedOnMaster( ResourceType resourceType, long[] resourceIds )
{
makeSureTxHasBeenInitialized();
RequestContext requestContext = newRequestContextFor( this ); RequestContext requestContext = newRequestContextFor( this );
try ( Response<LockResult> response = master.acquireSharedLock( requestContext, resourceType, resourceIds ) ) try ( Response<LockResult> response = master.acquireSharedLock( requestContext, resourceType, resourceIds ) )
{ {
Expand Down

0 comments on commit 380dd10

Please sign in to comment.