From 7da6d6c8edc9f102c43869041eb6367a7092bf90 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 12 Jul 2016 16:01:09 +0200 Subject: [PATCH] Skip response unpacking for END_LOCK_SESSION request Master processes END_LOCK_SESSION request and sends back an obligation with txId. Slave is responsible to pull all transactions up to that txId. Stop of a SlaveLocksClient, with tx termination aware locks enabled, sends END_LOCK_SESSION request to the master. This is done to make sure master kills the lock session for a terminated or closed transaction. Safe id freeing for HA slaves involves transaction termination to avoid potentially unsafe transactions and dirty reads. Such transactions are terminated as part of update pulling. When tx termination aware locks were turned on, update pulling terminated transactions and tried to unpack the END_LOCK_SESSION response. This basically caused update pulling to call itself and hang waiting for an obligation. This commit fixes the issue by making END_LOCK_SESSION not unpack the response. This is safe to do because committed transaction pulls as part of the COMMIT request, rolled back and terminated transactions do not care about pulling because they do not change store and do not read any more. --- .../org/neo4j/kernel/ha/HaRequestType210.java | 21 +++- .../java/org/neo4j/ha/TestPullUpdates.java | 112 +++++++++++++++++- .../neo4j/ha/upgrade/MasterClientTest.java | 36 ++++++ ...minationOfSlavesDuringPullUpdatesTest.java | 68 +++++++---- 4 files changed, 210 insertions(+), 27 deletions(-) diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java index f9cc73308252d..75bfb8e4a61b7 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java @@ -173,7 +173,26 @@ public Response call( Master master, RequestContext context, ChannelBuffer { return master.endLockSession( context, readBoolean( input ) ); } - }, VOID_SERIALIZER ), + }, VOID_SERIALIZER ) + { + @Override + public boolean responseShouldBeUnpacked() + { + /* + END_LOCK_SESSION request can be send in 3 cases: + 1) transaction committed successfully + 2) transaction rolled back successfully + 3) transaction was terminated + + Master's response for this call is an obligation to pull up to a specified txId. + Processing/unpacking of this response is not needed in all 3 cases: + 1) committed transaction pulls transaction stream as part of COMMIT call + 2) rolled back transaction does not care about reading any more + 3) terminated transaction does not care about reading any more + */ + return false; + } + }, // ==== HANDSHAKE( new TargetCaller() diff --git a/enterprise/ha/src/test/java/org/neo4j/ha/TestPullUpdates.java b/enterprise/ha/src/test/java/org/neo4j/ha/TestPullUpdates.java index 7d45e69665b2f..055fecd891ed0 100644 --- a/enterprise/ha/src/test/java/org/neo4j/ha/TestPullUpdates.java +++ b/enterprise/ha/src/test/java/org/neo4j/ha/TestPullUpdates.java @@ -27,7 +27,11 @@ import java.io.File; import java.net.URI; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.neo4j.cluster.ClusterSettings; import org.neo4j.cluster.InstanceId; @@ -38,24 +42,30 @@ import org.neo4j.graphdb.Node; import org.neo4j.graphdb.NotFoundException; import org.neo4j.graphdb.Transaction; +import org.neo4j.graphdb.TransientTransactionFailureException; import org.neo4j.graphdb.factory.TestHighlyAvailableGraphDatabaseFactory; import org.neo4j.helpers.collection.MapUtil; import org.neo4j.kernel.GraphDatabaseAPI; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.ha.HaSettings; import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.ha.ClusterManager; import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.shell.ShellClient; import org.neo4j.shell.ShellException; import org.neo4j.shell.ShellLobby; import org.neo4j.shell.ShellSettings; import org.neo4j.test.TargetDirectory; +import static java.lang.System.currentTimeMillis; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; - -import static java.lang.System.currentTimeMillis; - +import static org.junit.Assert.fail; import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize; import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable; @@ -120,6 +130,67 @@ public void makeSureUpdatePullerGetsGoingAfterMasterSwitch() throws Throwable awaitPropagation( 2, commonNodeId, cluster ); } + @Test + public void terminatedTransactionDoesNotForceUpdatePullingWithTxTerminationAwareLocks() throws Throwable + { + int testTxsOnMaster = 42; + File root = testDirectory.directory( testName.getMethodName() ); + ClusterManager clusterManager = new ClusterManager.Builder( root ) + .withSharedConfig( MapUtil.stringMap( + HaSettings.pull_interval.name(), "0s", + KernelTransactions.tx_termination_aware_locks.name(), Settings.TRUE ) ).build(); + clusterManager.start(); + cluster = clusterManager.getDefaultCluster(); + + HighlyAvailableGraphDatabase master = cluster.getMaster(); + final HighlyAvailableGraphDatabase slave = cluster.getAnySlave(); + + createNodeOn( master ); + cluster.sync(); + + long lastClosedTxIdOnMaster = lastClosedTxIdOn( master ); + long lastClosedTxIdOnSlave = lastClosedTxIdOn( slave ); + + final CountDownLatch slaveTxStarted = new CountDownLatch( 1 ); + final CountDownLatch slaveShouldCommit = new CountDownLatch( 1 ); + final AtomicReference slaveTx = new AtomicReference<>(); + Future slaveCommit = Executors.newSingleThreadExecutor().submit( new Runnable() + { + @Override + public void run() + { + try ( Transaction tx = slave.beginTx() ) + { + slaveTx.set( tx ); + slaveTxStarted.countDown(); + await( slaveShouldCommit ); + tx.success(); + } + } + } ); + + await( slaveTxStarted ); + createNodesOn( master, testTxsOnMaster ); + + assertNotNull( slaveTx.get() ); + slaveTx.get().terminate(); + slaveShouldCommit.countDown(); + + try + { + slaveCommit.get(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ExecutionException.class ) ); + assertThat( e.getCause(), instanceOf( TransientTransactionFailureException.class ) ); + } + + assertEquals( lastClosedTxIdOnMaster + testTxsOnMaster, lastClosedTxIdOn( master ) ); + assertEquals( lastClosedTxIdOnSlave, lastClosedTxIdOn( slave ) ); + } + @Test public void pullUpdatesShellAppPullsUpdates() throws Throwable { @@ -239,9 +310,22 @@ public void leftCluster( InstanceId instanceId, URI member ) private long createNodeOnMaster() { - try ( Transaction tx = cluster.getMaster().beginTx() ) + return createNodeOn( cluster.getMaster() ); + } + + private static void createNodesOn( GraphDatabaseService db, int count ) + { + for ( int i = 0; i < count; i++ ) + { + createNodeOn( db ); + } + } + + private static long createNodeOn( GraphDatabaseService db ) + { + try ( Transaction tx = db.beginTx() ) { - long id = cluster.getMaster().createNode().getId(); + long id = db.createNode().getId(); tx.success(); return id; } @@ -302,4 +386,22 @@ private void setProperty( HighlyAvailableGraphDatabase db, long nodeId, int i ) tx.success(); } } + + private void await( CountDownLatch latch ) + { + try + { + assertTrue( latch.await( 1, TimeUnit.MINUTES ) ); + } + catch ( InterruptedException e ) + { + throw new AssertionError( e ); + } + } + + private long lastClosedTxIdOn( GraphDatabaseAPI db ) + { + TransactionIdStore txIdStore = db.getDependencyResolver().resolveDependency( TransactionIdStore.class ); + return txIdStore.getLastClosedTransactionId(); + } } diff --git a/enterprise/ha/src/test/java/org/neo4j/ha/upgrade/MasterClientTest.java b/enterprise/ha/src/test/java/org/neo4j/ha/upgrade/MasterClientTest.java index 075c1810dbf3c..80f07986b7018 100644 --- a/enterprise/ha/src/test/java/org/neo4j/ha/upgrade/MasterClientTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/ha/upgrade/MasterClientTest.java @@ -22,6 +22,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -46,6 +47,7 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.ha.MasterClient214; import org.neo4j.kernel.ha.com.master.ConversationManager; +import org.neo4j.kernel.ha.com.master.HandshakeResult; import org.neo4j.kernel.ha.com.master.MasterImpl; import org.neo4j.kernel.ha.com.master.MasterImpl.Monitor; import org.neo4j.kernel.ha.com.master.MasterImplTest; @@ -87,11 +89,14 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.com.storecopy.ResponseUnpacker.NO_OP_RESPONSE_UNPACKER; +import static org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; import static org.neo4j.com.storecopy.TransactionCommittingResponseUnpacker.DEFAULT_BATCH_SIZE; import static org.neo4j.helpers.collection.MapUtil.stringMap; @@ -199,6 +204,37 @@ public IndexUpdatesValidator indexUpdatesValidator() verify( txIdStore, times( TX_LOG_COUNT ) ).transactionClosed( anyLong(), anyLong(), anyLong() ); } + @Test + public void endLockSessionDoesNotUnpackResponse() throws Throwable + { + StoreId storeId = new StoreId( 1, 2, 3, 4 ); + long txChecksum = 123; + long lastAppliedTx = 5; + + ResponseUnpacker responseUnpacker = mock( ResponseUnpacker.class ); + MasterImpl.SPI masterImplSPI = MasterImplTest.mockedSpi( storeId ); + when( masterImplSPI.packTransactionObligationResponse( any( RequestContext.class ), Matchers.anyObject() ) ) + .thenReturn( Response.empty() ); + when( masterImplSPI.getTransactionChecksum( anyLong() ) ).thenReturn( txChecksum ); + + cleanupRule.add( newMasterServer( masterImplSPI ) ); + + MasterClient214 client = cleanupRule.add( newMasterClient214( storeId, responseUnpacker ) ); + + HandshakeResult handshakeResult; + try ( Response handshakeResponse = client.handshake( 1, storeId ) ) + { + handshakeResult = handshakeResponse.response(); + } + verify( responseUnpacker ).unpackResponse( any( Response.class ), any( TxHandler.class ) ); + reset( responseUnpacker ); + + RequestContext context = new RequestContext( handshakeResult.epoch(), 1, 1, lastAppliedTx, txChecksum ); + + client.endLockSession( context, false ); + verify( responseUnpacker, never() ).unpackResponse( any( Response.class ), any( TxHandler.class ) ); + } + private static MasterImpl.SPI mockMasterImplSpiWith( StoreId storeId ) { return when( mock( MasterImpl.SPI.class ).storeId() ).thenReturn( storeId ).getMock(); diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/TerminationOfSlavesDuringPullUpdatesTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/TerminationOfSlavesDuringPullUpdatesTest.java index cbf53c3c18524..f026894b41777 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/TerminationOfSlavesDuringPullUpdatesTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/TerminationOfSlavesDuringPullUpdatesTest.java @@ -54,6 +54,7 @@ import static org.junit.Assert.fail; import static org.junit.runners.Parameterized.Parameters; import static org.neo4j.graphdb.DynamicRelationshipType.withName; +import static org.neo4j.kernel.impl.api.KernelTransactions.tx_termination_aware_locks; @RunWith( Parameterized.class ) public class TerminationOfSlavesDuringPullUpdatesTest @@ -68,8 +69,10 @@ public class TerminationOfSlavesDuringPullUpdatesTest .withSharedSetting( HaSettings.tx_push_factor, "0" ); @Parameter - public ReadContestantAction action; + public ReadContestantActions action; @Parameter( 1 ) + public boolean txTerminationAwareLocks; + @Parameter( 2 ) public String name; @Parameters( name = "{1}" ) @@ -77,18 +80,35 @@ public static Iterable data() { return Arrays.asList( new Object[][] { - {new PropertyValueReadContestantAction( longString( 'a' ), longString( 'b' ), - true ), "NodeStringProperty"}, - {new PropertyValueReadContestantAction( longString( 'a' ), longString( 'b' ), - false ), "RelationshipStringProperty"}, - {new PropertyValueReadContestantAction( longArray( 'a' ), longArray( 'b' ), - true ), "NodeArrayProperty"}, - {new PropertyValueReadContestantAction( longArray( 'a' ), longArray( 'b' ), - false ), "RelationshipArrayProperty"}, - {new PropertyKeyReadContestantAction( 'a', 'b', - true ), "NodePropertyKeys"}, - {new PropertyKeyReadContestantAction( 'a', 'b', - false ), "RelationshipPropertyKeys"} + {new PropertyValueActions( longString( 'a' ), longString( 'b' ), true ), + true, "NodeStringProperty[txTerminationAwareLocks=yes]"}, + {new PropertyValueActions( longString( 'a' ), longString( 'b' ), true ), + false, "NodeStringProperty[txTerminationAwareLocks=no]"}, + + {new PropertyValueActions( longString( 'a' ), longString( 'b' ), false ), + true, "RelationshipStringProperty[txTerminationAwareLocks=yes]"}, + {new PropertyValueActions( longString( 'a' ), longString( 'b' ), false ), + false, "RelationshipStringProperty[txTerminationAwareLocks=no]"}, + + {new PropertyValueActions( longArray( 'a' ), longArray( 'b' ), true ), + true, "NodeArrayProperty[txTerminationAwareLocks=yes]"}, + {new PropertyValueActions( longArray( 'a' ), longArray( 'b' ), true ), + false, "NodeArrayProperty[txTerminationAwareLocks=no]"}, + + {new PropertyValueActions( longArray( 'a' ), longArray( 'b' ), false ), + true, "RelationshipArrayProperty[txTerminationAwareLocks=yes]"}, + {new PropertyValueActions( longArray( 'a' ), longArray( 'b' ), false ), + false, "RelationshipArrayProperty[txTerminationAwareLocks=no]"}, + + {new PropertyKeyActions( 'a', 'b', true ), + true, "NodePropertyKeys[txTerminationAwareLocks=yes]"}, + {new PropertyKeyActions( 'a', 'b', true ), + false, "NodePropertyKeys[txTerminationAwareLocks=no]"}, + + {new PropertyKeyActions( 'a', 'b', false ), + true, "RelationshipPropertyKeys[txTerminationAwareLocks=yes]"}, + {new PropertyKeyActions( 'a', 'b', false ), + false, "RelationshipPropertyKeys[txTerminationAwareLocks=no]"} } ); } @@ -99,7 +119,7 @@ public void slavesTerminateOrReadConsistentDataWhenApplyingBatchLargerThanSafeZo long safeZone = TimeUnit.MILLISECONDS.toMillis( 0 ); clusterRule.withSharedSetting( HaSettings.id_reuse_safe_zone_time, String.valueOf( safeZone ) ); // given - final ClusterManager.ManagedCluster cluster = clusterRule.startCluster(); + final ClusterManager.ManagedCluster cluster = startCluster(); HighlyAvailableGraphDatabase master = cluster.getMaster(); // when @@ -133,7 +153,7 @@ public void slavesDontTerminateAndReadConsistentDataWhenApplyingBatchSmallerThan long safeZone = TimeUnit.MINUTES.toMillis( 1 ); clusterRule.withSharedSetting( HaSettings.id_reuse_safe_zone_time, String.valueOf( safeZone ) ); // given - final ClusterManager.ManagedCluster cluster = clusterRule.startCluster(); + final ClusterManager.ManagedCluster cluster = startCluster(); HighlyAvailableGraphDatabase master = cluster.getMaster(); // when @@ -160,7 +180,7 @@ public void slavesDontTerminateAndReadConsistentDataWhenApplyingBatchSmallerThan race.go(); } - private Runnable readContestant( final ReadContestantAction action, final long entityId, + private Runnable readContestant( final ReadContestantActions action, final long entityId, final HighlyAvailableGraphDatabase slave, final AtomicBoolean end ) { return new Runnable() @@ -213,7 +233,7 @@ public void run() }; } - private interface ReadContestantAction + private interface ReadContestantActions { long createInitialEntity( HighlyAvailableGraphDatabase db ); @@ -224,14 +244,14 @@ private interface ReadContestantAction void verifyProperties( HighlyAvailableGraphDatabase db, long entityId ); } - private static class PropertyValueReadContestantAction implements ReadContestantAction + private static class PropertyValueActions implements ReadContestantActions { static final String KEY = "key"; final Object valueA; final Object valueB; final boolean node; - PropertyValueReadContestantAction( Object valueA, Object valueB, boolean node ) + PropertyValueActions( Object valueA, Object valueB, boolean node ) { this.valueA = valueA; this.valueB = valueB; @@ -306,13 +326,13 @@ PropertyContainer getEntity( HighlyAvailableGraphDatabase db, long id ) } } - private static class PropertyKeyReadContestantAction implements ReadContestantAction + private static class PropertyKeyActions implements ReadContestantActions { final char keyPrefixA; final char keyPrefixB; final boolean node; - PropertyKeyReadContestantAction( char keyPrefixA, char keyPrefixB, boolean node ) + PropertyKeyActions( char keyPrefixA, char keyPrefixB, boolean node ) { this.keyPrefixA = keyPrefixA; this.keyPrefixB = keyPrefixB; @@ -398,6 +418,12 @@ PropertyContainer getEntity( HighlyAvailableGraphDatabase db, long id ) } } + private ClusterManager.ManagedCluster startCluster() throws Exception + { + return clusterRule.withSharedSetting( tx_termination_aware_locks, String.valueOf( txTerminationAwareLocks ) ) + .startCluster(); + } + private static Object longArray( char b ) { return longString( b ).toCharArray();