diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/master/MasterImpl.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/master/MasterImpl.java index 8c824058c24ba..7c963fad3e21e 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/master/MasterImpl.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/master/MasterImpl.java @@ -296,6 +296,10 @@ public Response endLockSession( RequestContext context, boolean success ) { assertCorrectEpoch( context ); conversationManager.end( context ); + if ( !success ) + { + conversationManager.stop( context ); + } return spi.packTransactionObligationResponse( context, null ); } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java index d9b2c69baecfc..f9f099c5f6cae 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java @@ -631,7 +631,7 @@ public Locks newInstance() { return CommunityEditionModule.createLockManager( config, logging ); } - } ) ); + }, config ) ); return lockManager; } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockManagerModeSwitcher.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockManagerModeSwitcher.java index d14f21f903698..aed0eec278c08 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockManagerModeSwitcher.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockManagerModeSwitcher.java @@ -21,6 +21,7 @@ import org.neo4j.function.Factory; import org.neo4j.kernel.AvailabilityGuard; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.ha.DelegateInvocationHandler; import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher; import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier; @@ -35,17 +36,19 @@ public class LockManagerModeSwitcher extends AbstractModeSwitcher private final RequestContextFactory requestContextFactory; private final AvailabilityGuard availabilityGuard; private final Factory locksFactory; + private final Config config; public LockManagerModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier, DelegateInvocationHandler delegate, DelegateInvocationHandler master, RequestContextFactory requestContextFactory, AvailabilityGuard availabilityGuard, - Factory locksFactory ) + Factory locksFactory, Config config ) { super( modeSwitcherNotifier, delegate ); this.master = master; this.requestContextFactory = requestContextFactory; this.availabilityGuard = availabilityGuard; this.locksFactory = locksFactory; + this.config = config; } @Override @@ -58,6 +61,6 @@ protected Locks getMasterImpl( LifeSupport life ) protected Locks getSlaveImpl( LifeSupport life ) { return life.add( new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(), - availabilityGuard ) ); + availabilityGuard, config ) ); } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLockManager.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLockManager.java index 67da7b590f432..43866c8c3f72d 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLockManager.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLockManager.java @@ -20,8 +20,10 @@ package org.neo4j.kernel.ha.lock; import org.neo4j.kernel.AvailabilityGuard; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.Master; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -31,21 +33,24 @@ public class SlaveLockManager extends LifecycleAdapter implements Locks private final Locks local; private final Master master; private final AvailabilityGuard availabilityGuard; + private final boolean txTerminationAwareLocks; public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextFactory, Master master, - AvailabilityGuard availabilityGuard ) + AvailabilityGuard availabilityGuard, Config config ) { this.requestContextFactory = requestContextFactory; this.availabilityGuard = availabilityGuard; this.local = localLocks; this.master = master; + this.txTerminationAwareLocks = config.get( KernelTransactions.tx_termination_aware_locks ); } @Override public Client newClient() { Client client = local.newClient(); - return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard ); + return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard, + txTerminationAwareLocks ); } @Override diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java index e976702ba53b0..2d33f1ba254f2 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java @@ -58,6 +58,7 @@ class SlaveLocksClient implements Locks.Client // Using atomic ints to avoid creating garbage through boxing. private final Map> sharedLocks; private final Map> exclusiveLocks; + private final boolean txTerminationAwareLocks; private boolean initialized = false; public SlaveLocksClient( @@ -65,13 +66,15 @@ public SlaveLocksClient( Locks.Client local, Locks localLockManager, RequestContextFactory requestContextFactory, - AvailabilityGuard availabilityGuard ) + AvailabilityGuard availabilityGuard, + boolean txTerminationAwareLocks ) { this.master = master; this.client = local; this.localLockManager = localLockManager; this.requestContextFactory = requestContextFactory; this.availabilityGuard = availabilityGuard; + this.txTerminationAwareLocks = txTerminationAwareLocks; sharedLocks = new HashMap<>(); exclusiveLocks = new HashMap<>(); } @@ -209,7 +212,17 @@ public void releaseAll() @Override public void stop() { - throw new UnsupportedOperationException( "Lock client stop is unsupported on slave side." ); + if ( txTerminationAwareLocks ) + { + try ( Response ignore = master.endLockSession( newRequestContextFor( client ), false ) ) + { + client.stop(); + } + } + else + { + throw new UnsupportedOperationException( "Lock client stop is unsupported on slave side." ); + } } @Override diff --git a/enterprise/ha/src/test/java/org/neo4j/ha/ClusterTransactionIT.java b/enterprise/ha/src/test/java/org/neo4j/ha/ClusterTransactionIT.java index 8d32f7a57f65b..f04a18e3af9ab 100644 --- a/enterprise/ha/src/test/java/org/neo4j/ha/ClusterTransactionIT.java +++ b/enterprise/ha/src/test/java/org/neo4j/ha/ClusterTransactionIT.java @@ -19,18 +19,28 @@ */ package org.neo4j.ha; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.neo4j.graphdb.DynamicLabel; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; +import org.neo4j.graphdb.TransactionTerminatedException; import org.neo4j.helpers.collection.IteratorUtil; import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.ha.HaSettings; import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.ha.ClusterManager; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifecycleListener; @@ -38,32 +48,34 @@ import org.neo4j.test.ha.ClusterRule; import org.neo4j.tooling.GlobalGraphOperations; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.neo4j.helpers.Exceptions.contains; +import static org.neo4j.helpers.NamedThreadFactory.named; +import static org.neo4j.helpers.collection.IteratorUtil.single; import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize; public class ClusterTransactionIT { @Rule - public final ClusterRule clusterRule = new ClusterRule( getClass() ); - - private ClusterManager.ManagedCluster cluster; - - @Before - public void setUp() throws Exception - { - cluster = clusterRule.withProvider( clusterOfSize( 3 ) ) - .withSharedSetting( HaSettings.ha_server, ":6001-6005" ) - .withSharedSetting( HaSettings.tx_push_factor, "2" ).startCluster(); - - cluster.await( ClusterManager.allSeesAllAsAvailable() ); - } + public final ClusterRule clusterRule = new ClusterRule( getClass() ) + .withProvider( clusterOfSize( 3 ) ) + .withSharedSetting( HaSettings.ha_server, ":6001-6005" ) + .withSharedSetting( HaSettings.tx_push_factor, "2" ); @Test public void givenClusterWhenShutdownMasterThenCannotStartTransactionOnSlave() throws Throwable { + ClusterManager.ManagedCluster cluster = startCluster(); + final HighlyAvailableGraphDatabase master = cluster.getMaster(); final HighlyAvailableGraphDatabase slave = cluster.getAnySlave(); @@ -119,6 +131,8 @@ public void notifyStatusChanged( Object instance, LifecycleStatus from, Lifecycl @Test public void slaveMustConnectLockManagerToNewMasterAfterTwoOtherClusterMembersRoleSwitch() throws Throwable { + ClusterManager.ManagedCluster cluster = startCluster(); + final HighlyAvailableGraphDatabase initialMaster = cluster.getMaster(); HighlyAvailableGraphDatabase firstSlave = cluster.getAnySlave(); HighlyAvailableGraphDatabase secondSlave = cluster.getAnySlave( firstSlave ); @@ -161,4 +175,130 @@ public void slaveMustConnectLockManagerToNewMasterAfterTwoOtherClusterMembersRol assertThat( IteratorUtil.count( gops.getAllNodes() ), is( 3 ) ); } } + + @Test + public void terminateSlaveTransactionThatWaitsForLockOnMaster() throws Exception + { + clusterRule.withSharedSetting( HaSettings.lock_read_timeout, "1m" ); + clusterRule.withSharedSetting( KernelTransactions.tx_termination_aware_locks, Settings.TRUE ); + + ClusterManager.ManagedCluster cluster = startCluster(); + + final Label label = DynamicLabel.label( "foo" ); + final String property = "bar"; + final String masterValue = "master"; + final String slaveValue = "slave"; + + final HighlyAvailableGraphDatabase master = cluster.getMaster(); + final HighlyAvailableGraphDatabase slave = cluster.getAnySlave(); + + createNodeWithLabel( cluster, label ); + + final CountDownLatch masterTxCommit = new CountDownLatch( 1 ); + Future masterTx = newSingleThreadExecutor( named( "masterTx" ) ).submit( new Runnable() + { + @Override + public void run() + { + try ( Transaction tx = master.beginTx() ) + { + Node node = single( master.findNodes( label ) ); + node.setProperty( property, masterValue ); + await( masterTxCommit ); + tx.success(); + } + } + } ); + + final AtomicReference slaveTxReference = new AtomicReference<>(); + final CountDownLatch slaveTxStarted = new CountDownLatch( 1 ); + Future slaveTx = newSingleThreadExecutor( named( "slaveTx" ) ).submit( new Runnable() + { + @Override + public void run() + { + try ( Transaction tx = slave.beginTx() ) + { + slaveTxReference.set( tx ); + Node node = single( slave.findNodes( label ) ); + slaveTxStarted.countDown(); + node.setProperty( property, slaveValue ); + tx.success(); + } + } + } ); + + slaveTxStarted.await(); + Thread.sleep( 2000 ); + + terminate( slaveTxReference ); + assertTxWasTerminated( slaveTx ); + + masterTxCommit.countDown(); + assertNull( masterTx.get() ); + assertSingleNodeExists( master, label, property, masterValue ); + } + + private void createNodeWithLabel( ClusterManager.ManagedCluster cluster, Label label ) throws InterruptedException + { + HighlyAvailableGraphDatabase master = cluster.getMaster(); + try ( Transaction tx = master.beginTx() ) + { + master.createNode( label ); + tx.success(); + } + + cluster.sync(); + } + + private void assertSingleNodeExists( HighlyAvailableGraphDatabase db, Label label, String property, String value ) + { + try ( Transaction tx = db.beginTx() ) + { + Node node = single( db.findNodes( label ) ); + assertTrue( node.hasProperty( property ) ); + assertEquals( value, node.getProperty( property ) ); + tx.success(); + } + } + + private void terminate( AtomicReference txReference ) + { + Transaction tx = txReference.get(); + assertNotNull( tx ); + tx.terminate(); + } + + private void assertTxWasTerminated( Future txFuture ) throws InterruptedException + { + try + { + txFuture.get(); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + e.printStackTrace(); + assertThat( e.getCause(), instanceOf( TransactionTerminatedException.class ) ); + } + } + + private static void await( CountDownLatch latch ) + { + try + { + assertTrue( latch.await( 2, TimeUnit.MINUTES ) ); + } + catch ( InterruptedException e ) + { + throw new RuntimeException( e ); + } + } + + private ClusterManager.ManagedCluster startCluster() throws Exception + { + ClusterManager.ManagedCluster cluster = clusterRule.startCluster(); + cluster.await( ClusterManager.allSeesAllAsAvailable() ); + return cluster; + } } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplConversationStopFuzzIT.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplConversationStopFuzzIT.java index ef3015240e94e..9f95fbec78aba 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplConversationStopFuzzIT.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplConversationStopFuzzIT.java @@ -225,7 +225,7 @@ else if ( i >= 1 ) } else { - worker.master.endLockSession( worker.requestContext, true ); + endLockSession( worker ); return IDLE; } } @@ -242,7 +242,7 @@ State next( SlaveEmulatorWorker worker ) throws Exception } else { - worker.master.endLockSession( worker.requestContext, true ); + endLockSession( worker ); return IDLE; } } @@ -303,15 +303,16 @@ private int newLockSessionId() { return random.nextInt(); } - } - - static class ConversationTestMasterSPI implements MasterImpl.SPI - { - public ConversationTestMasterSPI() + private static void endLockSession( SlaveEmulatorWorker worker ) { + boolean successfulSession = worker.random.nextBoolean(); + worker.master.endLockSession( worker.requestContext, successfulSession ); } + } + static class ConversationTestMasterSPI implements MasterImpl.SPI + { @Override public boolean isAccessible() { diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplTest.java index 114ec7b98e1dd..41c523316575f 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/master/MasterImplTest.java @@ -61,7 +61,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -172,30 +172,53 @@ public void shouldNotEndLockSessionWhereThereIsAnActiveLockAcquisition() throws final CountDownLatch latch = new CountDownLatch( 1 ); try { - MasterImpl.SPI spi = mockedSpi(); - DefaultConversationSPI conversationSpi = mockedConversationSpi(); - when( spi.isAccessible() ).thenReturn( true ); - Client client = mock( Client.class ); - doAnswer( new Answer() + Client client = newWaitingLocksClient( latch ); + final MasterImpl master = newMasterWithLocksClient( client ); + HandshakeResult handshake = master.handshake( 1, new StoreId() ).response(); + + // WHEN + final RequestContext context = new RequestContext( handshake.epoch(), 1, 2, 0, 0 ); + master.newLockSession( context ); + Future acquireFuture = otherThread.execute( new WorkerCommand() { @Override - public Void answer( InvocationOnMock invocation ) throws Throwable + public Void doWork( Void state ) throws Exception { - latch.await(); + master.acquireExclusiveLock( context, ResourceTypes.NODE, 1L ); return null; } - } ).when( client ).acquireExclusive( any( ResourceType.class ), anyLong() ); - when( conversationSpi.acquireClient() ).thenReturn( client ); - Config config = config( 20 ); - ConversationManager conversationManager = new ConversationManager( conversationSpi, config ); - final MasterImpl master = new MasterImpl( spi, conversationManager, mock( Monitor.class ), config ); - master.start(); + } ); + otherThread.get().waitUntilWaiting(); + master.endLockSession( context, true ); + verify( client, never() ).stop(); + verify( client, never() ).close(); + latch.countDown(); + acquireFuture.get(); + + // THEN + verify( client ).close(); + } + finally + { + latch.countDown(); + } + } + + @Test + public void shouldStopLockSessionOnFailureWhereThereIsAnActiveLockAcquisition() throws Throwable + { + // GIVEN + final CountDownLatch latch = new CountDownLatch( 1 ); + try + { + Client client = newWaitingLocksClient( latch ); + final MasterImpl master = newMasterWithLocksClient( client ); HandshakeResult handshake = master.handshake( 1, new StoreId() ).response(); // WHEN final RequestContext context = new RequestContext( handshake.epoch(), 1, 2, 0, 0 ); master.newLockSession( context ); - Future acquireFuture = otherThread.execute( new WorkerCommand() + Future acquireFuture = otherThread.execute( new WorkerCommand() { @Override public Void doWork( Void state ) throws Exception @@ -206,12 +229,13 @@ public Void doWork( Void state ) throws Exception } ); otherThread.get().waitUntilWaiting(); master.endLockSession( context, false ); - verify( client, times( 0 ) ).close(); + verify( client ).stop(); + verify( client, never() ).close(); latch.countDown(); acquireFuture.get(); // THEN - verify( client, times( 1 ) ).close(); + verify( client ).close(); } finally { @@ -219,6 +243,37 @@ public Void doWork( Void state ) throws Exception } } + private MasterImpl newMasterWithLocksClient( Client client ) throws Throwable + { + SPI spi = mockedSpi(); + DefaultConversationSPI conversationSpi = mockedConversationSpi(); + when( spi.isAccessible() ).thenReturn( true ); + when( conversationSpi.acquireClient() ).thenReturn( client ); + Config config = config( 20 ); + ConversationManager conversationManager = new ConversationManager( conversationSpi, config ); + + MasterImpl master = new MasterImpl( spi, conversationManager, mock( Monitor.class ), config ); + master.start(); + return master; + } + + private Client newWaitingLocksClient( final CountDownLatch latch ) + { + Client client = mock( Client.class ); + + doAnswer( new Answer() + { + @Override + public Void answer( InvocationOnMock invocation ) throws Throwable + { + latch.await(); + return null; + } + } ).when( client ).acquireExclusive( any( ResourceType.class ), anyLong() ); + + return client; + } + @Test public void shouldNotAllowCommitIfThereIsNoMatchingLockSession() throws Throwable { diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java index b846e38b2f912..3a784af4bf43b 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java @@ -24,6 +24,7 @@ import org.neo4j.helpers.Clock; import org.neo4j.kernel.AvailabilityGuard; +import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.impl.locking.Locks; @@ -86,6 +87,6 @@ public void doesNotCreateClientsAfterShutdown() throws Throwable private SlaveLockManager newSlaveLockManager( Locks localLocks ) { - return new SlaveLockManager( localLocks, requestContextFactory, master, availabilityGuard ); + return new SlaveLockManager( localLocks, requestContextFactory, master, availabilityGuard, new Config() ); } } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java index e9a4b94bbc0f4..0bdc0de90cf2e 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java @@ -110,7 +110,7 @@ public void readersCanAcquireLockAsSoonAsItReleasedOnMaster() throws Interrupted private SlaveLocksClient createClient() { return new SlaveLocksClient( master, lockManager.newClient(), lockManager, - requestContextFactory, availabilityGuard ); + requestContextFactory, availabilityGuard, false ); } private static class LockedOnMasterAnswer implements Answer diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java index 09106b731abe5..8968a99833883 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java @@ -80,7 +80,7 @@ public void setUp() throws Exception whenMasterAcquireExclusive().thenReturn( responseOk ); - client = new SlaveLocksClient( master, local, lockManager, requestContextFactory, availabilityGuard ); + client = new SlaveLocksClient( master, local, lockManager, requestContextFactory, availabilityGuard, false ); } private OngoingStubbing> whenMasterAcquireShared()