Slave-master lock session termination
This commit makes it possible to terminate lock sessions that slaves hold on the
master. This is useful when terminating write transactions on slaves, because such
transactions have corresponding lock clients on the master.

When a SlaveLocksClient is stopped, it ends the corresponding lock session on the
master and then stops the local locks client.

The feature is guarded by the `experimental.tx_termination_aware_locks` setting,
which is off by default.
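
For illustration, here is a minimal sketch of enabling the feature on an embedded HA member. Only the `experimental.tx_termination_aware_locks` key comes from this commit; the factory and builder calls, the store path, and the other HA settings are assumptions about the embedded HA API of this era rather than part of the change. The new test terminateSlaveTransactionThatWaitsForLockOnMaster in ClusterTransactionIT (further down) exercises the end-to-end flow: a slave write transaction blocked on a master-held lock is terminated and its lock session on the master is ended.

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.HighlyAvailableGraphDatabaseFactory;

public class TxTerminationAwareLocksExample
{
    public static void main( String[] args )
    {
        // Sketch only: the store path, server id and initial hosts are placeholders,
        // and the builder-based setup is an assumption, not something this commit adds.
        GraphDatabaseService slave = new HighlyAvailableGraphDatabaseFactory()
                .newEmbeddedDatabaseBuilder( "/data/slave" )
                .setConfig( "ha.server_id", "2" )
                .setConfig( "ha.initial_hosts", "host1:5001,host2:5001,host3:5001" )
                .setConfig( "experimental.tx_termination_aware_locks", "true" )
                .newGraphDatabase();

        // With the setting enabled, calling Transaction#terminate() on a slave write
        // transaction stops its SlaveLocksClient, which ends the matching lock session
        // on the master instead of throwing UnsupportedOperationException.

        slave.shutdown();
    }
}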
lutovich committed Jun 8, 2016
1 parent 3a64a6f commit 6470ca2
Showing 11 changed files with 273 additions and 51 deletions.
@@ -296,6 +296,10 @@ public Response<Void> endLockSession( RequestContext context, boolean success )
{
assertCorrectEpoch( context );
conversationManager.end( context );
if ( !success )
{
conversationManager.stop( context );
}
return spi.packTransactionObligationResponse( context, null );
}

@@ -631,7 +631,7 @@ public Locks newInstance()
{
return CommunityEditionModule.createLockManager( config, logging );
}
} ) );
}, config ) );
return lockManager;
}

@@ -21,6 +21,7 @@

import org.neo4j.function.Factory;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher;
import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier;
@@ -35,17 +36,19 @@ public class LockManagerModeSwitcher extends AbstractModeSwitcher<Locks>
private final RequestContextFactory requestContextFactory;
private final AvailabilityGuard availabilityGuard;
private final Factory<Locks> locksFactory;
private final Config config;

public LockManagerModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier,
DelegateInvocationHandler<Locks> delegate, DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory, AvailabilityGuard availabilityGuard,
Factory<Locks> locksFactory )
Factory<Locks> locksFactory, Config config )
{
super( modeSwitcherNotifier, delegate );
this.master = master;
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.locksFactory = locksFactory;
this.config = config;
}

@Override
@@ -58,6 +61,6 @@ protected Locks getMasterImpl( LifeSupport life )
protected Locks getSlaveImpl( LifeSupport life )
{
return life.add( new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(),
availabilityGuard ) );
availabilityGuard, config ) );
}
}
@@ -20,8 +20,10 @@
package org.neo4j.kernel.ha.lock;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

@@ -31,21 +33,24 @@ public class SlaveLockManager extends LifecycleAdapter implements Locks
private final Locks local;
private final Master master;
private final AvailabilityGuard availabilityGuard;
private final boolean txTerminationAwareLocks;

public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextFactory, Master master,
AvailabilityGuard availabilityGuard )
AvailabilityGuard availabilityGuard, Config config )
{
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.local = localLocks;
this.master = master;
this.txTerminationAwareLocks = config.get( KernelTransactions.tx_termination_aware_locks );
}

@Override
public Client newClient()
{
Client client = local.newClient();
return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard );
return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard,
txTerminationAwareLocks );
}

@Override
@@ -58,20 +58,23 @@ class SlaveLocksClient implements Locks.Client
// Using atomic ints to avoid creating garbage through boxing.
private final Map<Locks.ResourceType, Map<Long, AtomicInteger>> sharedLocks;
private final Map<Locks.ResourceType, Map<Long, AtomicInteger>> exclusiveLocks;
private final boolean txTerminationAwareLocks;
private boolean initialized = false;

public SlaveLocksClient(
Master master,
Locks.Client local,
Locks localLockManager,
RequestContextFactory requestContextFactory,
AvailabilityGuard availabilityGuard )
AvailabilityGuard availabilityGuard,
boolean txTerminationAwareLocks )
{
this.master = master;
this.client = local;
this.localLockManager = localLockManager;
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.txTerminationAwareLocks = txTerminationAwareLocks;
sharedLocks = new HashMap<>();
exclusiveLocks = new HashMap<>();
}
@@ -209,7 +212,17 @@ public void releaseAll()
@Override
public void stop()
{
throw new UnsupportedOperationException( "Lock client stop is unsupported on slave side." );
if ( txTerminationAwareLocks )
{
try ( Response<Void> ignore = master.endLockSession( newRequestContextFor( client ), false ) )
{
client.stop();
}
}
else
{
throw new UnsupportedOperationException( "Lock client stop is unsupported on slave side." );
}
}

@Override
enterprise/ha/src/test/java/org/neo4j/ha/ClusterTransactionIT.java (174 changes: 157 additions & 17 deletions)
@@ -19,51 +19,63 @@
*/
package org.neo4j.ha;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleListener;
import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.test.ha.ClusterRule;
import org.neo4j.tooling.GlobalGraphOperations;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.helpers.Exceptions.contains;
import static org.neo4j.helpers.NamedThreadFactory.named;
import static org.neo4j.helpers.collection.IteratorUtil.single;
import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize;

public class ClusterTransactionIT
{
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() );

private ClusterManager.ManagedCluster cluster;

@Before
public void setUp() throws Exception
{
cluster = clusterRule.withProvider( clusterOfSize( 3 ) )
.withSharedSetting( HaSettings.ha_server, ":6001-6005" )
.withSharedSetting( HaSettings.tx_push_factor, "2" ).startCluster();

cluster.await( ClusterManager.allSeesAllAsAvailable() );
}
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withProvider( clusterOfSize( 3 ) )
.withSharedSetting( HaSettings.ha_server, ":6001-6005" )
.withSharedSetting( HaSettings.tx_push_factor, "2" );

@Test
public void givenClusterWhenShutdownMasterThenCannotStartTransactionOnSlave() throws Throwable
{
ClusterManager.ManagedCluster cluster = startCluster();

final HighlyAvailableGraphDatabase master = cluster.getMaster();
final HighlyAvailableGraphDatabase slave = cluster.getAnySlave();

@@ -119,6 +131,8 @@ public void notifyStatusChanged( Object instance, LifecycleStatus from, Lifecycl
@Test
public void slaveMustConnectLockManagerToNewMasterAfterTwoOtherClusterMembersRoleSwitch() throws Throwable
{
ClusterManager.ManagedCluster cluster = startCluster();

final HighlyAvailableGraphDatabase initialMaster = cluster.getMaster();
HighlyAvailableGraphDatabase firstSlave = cluster.getAnySlave();
HighlyAvailableGraphDatabase secondSlave = cluster.getAnySlave( firstSlave );
@@ -161,4 +175,130 @@ public void slaveMustConnectLockManagerToNewMasterAfterTwoOtherClusterMembersRol
assertThat( IteratorUtil.count( gops.getAllNodes() ), is( 3 ) );
}
}

@Test
public void terminateSlaveTransactionThatWaitsForLockOnMaster() throws Exception
{
clusterRule.withSharedSetting( HaSettings.lock_read_timeout, "1m" );
clusterRule.withSharedSetting( KernelTransactions.tx_termination_aware_locks, Settings.TRUE );

ClusterManager.ManagedCluster cluster = startCluster();

final Label label = DynamicLabel.label( "foo" );
final String property = "bar";
final String masterValue = "master";
final String slaveValue = "slave";

final HighlyAvailableGraphDatabase master = cluster.getMaster();
final HighlyAvailableGraphDatabase slave = cluster.getAnySlave();

createNodeWithLabel( cluster, label );

final CountDownLatch masterTxCommit = new CountDownLatch( 1 );
Future<?> masterTx = newSingleThreadExecutor( named( "masterTx" ) ).submit( new Runnable()
{
@Override
public void run()
{
try ( Transaction tx = master.beginTx() )
{
Node node = single( master.findNodes( label ) );
node.setProperty( property, masterValue );
await( masterTxCommit );
tx.success();
}
}
} );

final AtomicReference<Transaction> slaveTxReference = new AtomicReference<>();
final CountDownLatch slaveTxStarted = new CountDownLatch( 1 );
Future<?> slaveTx = newSingleThreadExecutor( named( "slaveTx" ) ).submit( new Runnable()
{
@Override
public void run()
{
try ( Transaction tx = slave.beginTx() )
{
slaveTxReference.set( tx );
Node node = single( slave.findNodes( label ) );
slaveTxStarted.countDown();
node.setProperty( property, slaveValue );
tx.success();
}
}
} );

slaveTxStarted.await();
Thread.sleep( 2000 );

terminate( slaveTxReference );
assertTxWasTerminated( slaveTx );

masterTxCommit.countDown();
assertNull( masterTx.get() );
assertSingleNodeExists( master, label, property, masterValue );
}

private void createNodeWithLabel( ClusterManager.ManagedCluster cluster, Label label ) throws InterruptedException
{
HighlyAvailableGraphDatabase master = cluster.getMaster();
try ( Transaction tx = master.beginTx() )
{
master.createNode( label );
tx.success();
}

cluster.sync();
}

private void assertSingleNodeExists( HighlyAvailableGraphDatabase db, Label label, String property, String value )
{
try ( Transaction tx = db.beginTx() )
{
Node node = single( db.findNodes( label ) );
assertTrue( node.hasProperty( property ) );
assertEquals( value, node.getProperty( property ) );
tx.success();
}
}

private void terminate( AtomicReference<Transaction> txReference )
{
Transaction tx = txReference.get();
assertNotNull( tx );
tx.terminate();
}

private void assertTxWasTerminated( Future<?> txFuture ) throws InterruptedException
{
try
{
txFuture.get();
fail( "Exception expected" );
}
catch ( ExecutionException e )
{
e.printStackTrace();
assertThat( e.getCause(), instanceOf( TransactionTerminatedException.class ) );
}
}

private static void await( CountDownLatch latch )
{
try
{
assertTrue( latch.await( 2, TimeUnit.MINUTES ) );
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
}

private ClusterManager.ManagedCluster startCluster() throws Exception
{
ClusterManager.ManagedCluster cluster = clusterRule.startCluster();
cluster.await( ClusterManager.allSeesAllAsAvailable() );
return cluster;
}
}
@@ -225,7 +225,7 @@ else if ( i >= 1 )
}
else
{
worker.master.endLockSession( worker.requestContext, true );
endLockSession( worker );
return IDLE;
}
}
@@ -242,7 +242,7 @@ State next( SlaveEmulatorWorker worker ) throws Exception
}
else
{
worker.master.endLockSession( worker.requestContext, true );
endLockSession( worker );
return IDLE;
}
}
@@ -303,15 +303,16 @@ private int newLockSessionId()
{
return random.nextInt();
}
}

static class ConversationTestMasterSPI implements MasterImpl.SPI
{

public ConversationTestMasterSPI()
private static void endLockSession( SlaveEmulatorWorker worker )
{
boolean successfulSession = worker.random.nextBoolean();
worker.master.endLockSession( worker.requestContext, successfulSession );
}
}

static class ConversationTestMasterSPI implements MasterImpl.SPI
{
@Override
public boolean isAccessible()
{
