Skip to content

Commit

Permalink
Skip response unpacking for END_LOCK_SESSION request
Browse files Browse the repository at this point in the history
Master processes END_LOCK_SESSION request and sends back an obligation with
txId. Slave is responsible to pull all transactions up to that txId.

Stop of a SlaveLocksClient, with tx termination aware locks enabled, sends
END_LOCK_SESSION request to the master. This is done to make sure master
kills the lock session for a terminated or closed transaction.

Safe id freeing for HA slaves involves transaction termination to avoid
potentially unsafe transactions and dirty reads. Such transactions are
terminated as part of update pulling. When tx termination aware locks were
turned on, update pulling terminated transactions and tried to unpack the
END_LOCK_SESSION response. This basically caused update pulling to call itself
and hang waiting for an obligation.

This commit fixes the issue by making END_LOCK_SESSION not unpack the response.
This is safe to do because committed transaction pulls as part of the COMMIT
request, rolled back and terminated transactions do not care about pulling
because they do not change store and do not read any more.
  • Loading branch information
lutovich committed Jul 12, 2016
1 parent 4da5b40 commit 7da6d6c
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 27 deletions.
Expand Up @@ -173,7 +173,26 @@ public Response<Void> call( Master master, RequestContext context, ChannelBuffer
{ {
return master.endLockSession( context, readBoolean( input ) ); return master.endLockSession( context, readBoolean( input ) );
} }
}, VOID_SERIALIZER ), }, VOID_SERIALIZER )
{
@Override
public boolean responseShouldBeUnpacked()
{
/*
END_LOCK_SESSION request can be send in 3 cases:
1) transaction committed successfully
2) transaction rolled back successfully
3) transaction was terminated
Master's response for this call is an obligation to pull up to a specified txId.
Processing/unpacking of this response is not needed in all 3 cases:
1) committed transaction pulls transaction stream as part of COMMIT call
2) rolled back transaction does not care about reading any more
3) terminated transaction does not care about reading any more
*/
return false;
}
},


// ==== // ====
HANDSHAKE( new TargetCaller<Master, HandshakeResult>() HANDSHAKE( new TargetCaller<Master, HandshakeResult>()
Expand Down
112 changes: 107 additions & 5 deletions enterprise/ha/src/test/java/org/neo4j/ha/TestPullUpdates.java
Expand Up @@ -27,7 +27,11 @@
import java.io.File; import java.io.File;
import java.net.URI; import java.net.URI;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;


import org.neo4j.cluster.ClusterSettings; import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
Expand All @@ -38,24 +42,30 @@
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotFoundException; import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.graphdb.factory.TestHighlyAvailableGraphDatabaseFactory; import org.neo4j.graphdb.factory.TestHighlyAvailableGraphDatabaseFactory;
import org.neo4j.helpers.collection.MapUtil; import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.GraphDatabaseAPI; import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.ha.HaSettings; import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase; import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.ha.ClusterManager; import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.shell.ShellClient; import org.neo4j.shell.ShellClient;
import org.neo4j.shell.ShellException; import org.neo4j.shell.ShellException;
import org.neo4j.shell.ShellLobby; import org.neo4j.shell.ShellLobby;
import org.neo4j.shell.ShellSettings; import org.neo4j.shell.ShellSettings;
import org.neo4j.test.TargetDirectory; import org.neo4j.test.TargetDirectory;


import static java.lang.System.currentTimeMillis;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;

import static org.junit.Assert.fail;
import static java.lang.System.currentTimeMillis;

import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize; import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize;
import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable;
Expand Down Expand Up @@ -120,6 +130,67 @@ public void makeSureUpdatePullerGetsGoingAfterMasterSwitch() throws Throwable
awaitPropagation( 2, commonNodeId, cluster ); awaitPropagation( 2, commonNodeId, cluster );
} }


@Test
public void terminatedTransactionDoesNotForceUpdatePullingWithTxTerminationAwareLocks() throws Throwable
{
int testTxsOnMaster = 42;
File root = testDirectory.directory( testName.getMethodName() );
ClusterManager clusterManager = new ClusterManager.Builder( root )
.withSharedConfig( MapUtil.stringMap(
HaSettings.pull_interval.name(), "0s",
KernelTransactions.tx_termination_aware_locks.name(), Settings.TRUE ) ).build();
clusterManager.start();
cluster = clusterManager.getDefaultCluster();

HighlyAvailableGraphDatabase master = cluster.getMaster();
final HighlyAvailableGraphDatabase slave = cluster.getAnySlave();

createNodeOn( master );
cluster.sync();

long lastClosedTxIdOnMaster = lastClosedTxIdOn( master );
long lastClosedTxIdOnSlave = lastClosedTxIdOn( slave );

final CountDownLatch slaveTxStarted = new CountDownLatch( 1 );
final CountDownLatch slaveShouldCommit = new CountDownLatch( 1 );
final AtomicReference<Transaction> slaveTx = new AtomicReference<>();
Future<?> slaveCommit = Executors.newSingleThreadExecutor().submit( new Runnable()
{
@Override
public void run()
{
try ( Transaction tx = slave.beginTx() )
{
slaveTx.set( tx );
slaveTxStarted.countDown();
await( slaveShouldCommit );
tx.success();
}
}
} );

await( slaveTxStarted );
createNodesOn( master, testTxsOnMaster );

assertNotNull( slaveTx.get() );
slaveTx.get().terminate();
slaveShouldCommit.countDown();

try
{
slaveCommit.get();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( ExecutionException.class ) );
assertThat( e.getCause(), instanceOf( TransientTransactionFailureException.class ) );
}

assertEquals( lastClosedTxIdOnMaster + testTxsOnMaster, lastClosedTxIdOn( master ) );
assertEquals( lastClosedTxIdOnSlave, lastClosedTxIdOn( slave ) );
}

@Test @Test
public void pullUpdatesShellAppPullsUpdates() throws Throwable public void pullUpdatesShellAppPullsUpdates() throws Throwable
{ {
Expand Down Expand Up @@ -239,9 +310,22 @@ public void leftCluster( InstanceId instanceId, URI member )


private long createNodeOnMaster() private long createNodeOnMaster()
{ {
try ( Transaction tx = cluster.getMaster().beginTx() ) return createNodeOn( cluster.getMaster() );
}

private static void createNodesOn( GraphDatabaseService db, int count )
{
for ( int i = 0; i < count; i++ )
{
createNodeOn( db );
}
}

private static long createNodeOn( GraphDatabaseService db )
{
try ( Transaction tx = db.beginTx() )
{ {
long id = cluster.getMaster().createNode().getId(); long id = db.createNode().getId();
tx.success(); tx.success();
return id; return id;
} }
Expand Down Expand Up @@ -302,4 +386,22 @@ private void setProperty( HighlyAvailableGraphDatabase db, long nodeId, int i )
tx.success(); tx.success();
} }
} }

private void await( CountDownLatch latch )
{
try
{
assertTrue( latch.await( 1, TimeUnit.MINUTES ) );
}
catch ( InterruptedException e )
{
throw new AssertionError( e );
}
}

private long lastClosedTxIdOn( GraphDatabaseAPI db )
{
TransactionIdStore txIdStore = db.getDependencyResolver().resolveDependency( TransactionIdStore.class );
return txIdStore.getLastClosedTransactionId();
}
} }
Expand Up @@ -22,6 +22,7 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;


Expand All @@ -46,6 +47,7 @@
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.MasterClient214; import org.neo4j.kernel.ha.MasterClient214;
import org.neo4j.kernel.ha.com.master.ConversationManager; import org.neo4j.kernel.ha.com.master.ConversationManager;
import org.neo4j.kernel.ha.com.master.HandshakeResult;
import org.neo4j.kernel.ha.com.master.MasterImpl; import org.neo4j.kernel.ha.com.master.MasterImpl;
import org.neo4j.kernel.ha.com.master.MasterImpl.Monitor; import org.neo4j.kernel.ha.com.master.MasterImpl.Monitor;
import org.neo4j.kernel.ha.com.master.MasterImplTest; import org.neo4j.kernel.ha.com.master.MasterImplTest;
Expand Down Expand Up @@ -87,11 +89,14 @@
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.com.storecopy.ResponseUnpacker.NO_OP_RESPONSE_UNPACKER; import static org.neo4j.com.storecopy.ResponseUnpacker.NO_OP_RESPONSE_UNPACKER;
import static org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import static org.neo4j.com.storecopy.TransactionCommittingResponseUnpacker.DEFAULT_BATCH_SIZE; import static org.neo4j.com.storecopy.TransactionCommittingResponseUnpacker.DEFAULT_BATCH_SIZE;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;


Expand Down Expand Up @@ -199,6 +204,37 @@ public IndexUpdatesValidator indexUpdatesValidator()
verify( txIdStore, times( TX_LOG_COUNT ) ).transactionClosed( anyLong(), anyLong(), anyLong() ); verify( txIdStore, times( TX_LOG_COUNT ) ).transactionClosed( anyLong(), anyLong(), anyLong() );
} }


@Test
public void endLockSessionDoesNotUnpackResponse() throws Throwable
{
StoreId storeId = new StoreId( 1, 2, 3, 4 );
long txChecksum = 123;
long lastAppliedTx = 5;

ResponseUnpacker responseUnpacker = mock( ResponseUnpacker.class );
MasterImpl.SPI masterImplSPI = MasterImplTest.mockedSpi( storeId );
when( masterImplSPI.packTransactionObligationResponse( any( RequestContext.class ), Matchers.anyObject() ) )
.thenReturn( Response.empty() );
when( masterImplSPI.getTransactionChecksum( anyLong() ) ).thenReturn( txChecksum );

cleanupRule.add( newMasterServer( masterImplSPI ) );

MasterClient214 client = cleanupRule.add( newMasterClient214( storeId, responseUnpacker ) );

HandshakeResult handshakeResult;
try ( Response<HandshakeResult> handshakeResponse = client.handshake( 1, storeId ) )
{
handshakeResult = handshakeResponse.response();
}
verify( responseUnpacker ).unpackResponse( any( Response.class ), any( TxHandler.class ) );
reset( responseUnpacker );

RequestContext context = new RequestContext( handshakeResult.epoch(), 1, 1, lastAppliedTx, txChecksum );

client.endLockSession( context, false );
verify( responseUnpacker, never() ).unpackResponse( any( Response.class ), any( TxHandler.class ) );
}

private static MasterImpl.SPI mockMasterImplSpiWith( StoreId storeId ) private static MasterImpl.SPI mockMasterImplSpiWith( StoreId storeId )
{ {
return when( mock( MasterImpl.SPI.class ).storeId() ).thenReturn( storeId ).getMock(); return when( mock( MasterImpl.SPI.class ).storeId() ).thenReturn( storeId ).getMock();
Expand Down

0 comments on commit 7da6d6c

Please sign in to comment.