From 4f9687e47996b80dea1f00a301176371235116ed Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 7 Jan 2016 13:36:05 +0100 Subject: [PATCH] Retry replication until lock session changes. Previously we would give up after a maximum timeout which isn't suitable under the leader-only lock management strategy (and probably not under any other strategy either). Since the lock session change now exists as a safe and natural abort marker, we use that instead. --- .../neo4j/kernel/api/exceptions/Status.java | 2 + .../replication/session/OperationContext.java | 10 ++++ .../ReplicatedTransactionCommitProcess.java | 53 +++++++++--------- .../server/CoreEdgeClusterSettings.java | 5 -- .../core/EnterpriseCoreEditionModule.java | 19 +++---- ...eplicatedTransactionCommitProcessTest.java | 54 +++++++++++++++++-- 6 files changed, 100 insertions(+), 43 deletions(-) diff --git a/community/storage-engine-api/src/main/java/org/neo4j/kernel/api/exceptions/Status.java b/community/storage-engine-api/src/main/java/org/neo4j/kernel/api/exceptions/Status.java index d98ad3d68c4ca..c8db5406aeae4 100644 --- a/community/storage-engine-api/src/main/java/org/neo4j/kernel/api/exceptions/Status.java +++ b/community/storage-engine-api/src/main/java/org/neo4j/kernel/api/exceptions/Status.java @@ -109,6 +109,8 @@ enum Transaction implements Status ReleaseLocksFailed( DatabaseError, "The transaction was unable to release one or more of its locks." ), AcquireLockTimeout( TransientError, "The transaction was unable to acquire a lock, for instance due to a " + "timeout or the transaction thread being interrupted." ), + LockSessionInvalid( TransientError, "The lock session under which this transaction was started is no longer valid." ), + DeadlockDetected( TransientError, "This transaction, and at least one more transaction, has acquired locks " + "in a way that it will wait indefinitely, and the database has aborted it. Retrying this transaction " + "will most likely be successful."), diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OperationContext.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OperationContext.java index 7725eb8629a05..14088d47183d0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OperationContext.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OperationContext.java @@ -48,4 +48,14 @@ protected LocalSession localSession() { return localSession; } + + + @Override + public String toString() + { + return "OperationContext{" + + "globalSession=" + globalSession + + ", localOperationId=" + localOperationId + + '}'; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java index 785cbde996dc1..fff766be127d1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcess.java @@ -29,35 +29,38 @@ import org.neo4j.coreedge.raft.replication.Replicator.ReplicationFailedException; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; import org.neo4j.coreedge.raft.replication.session.OperationContext; -import org.neo4j.helpers.Clock; +import org.neo4j.coreedge.server.core.CurrentReplicatedLockState; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionToApply; +import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.logging.Log; import org.neo4j.storageengine.api.TransactionApplicationMode; import static org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionFactory.createImmutableReplicatedTransaction; +import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionInvalid; public class ReplicatedTransactionCommitProcess extends LifecycleAdapter implements TransactionCommitProcess { private final Replicator replicator; private final ReplicatedTransactionStateMachine replicatedTxListener; - private final Clock clock; private final long retryIntervalMillis; - private final long maxRetryTimeMillis; + private final CurrentReplicatedLockState currentReplicatedLockState; private final LocalSessionPool sessionPool; + private final Log log; public ReplicatedTransactionCommitProcess( Replicator replicator, LocalSessionPool sessionPool, - ReplicatedTransactionStateMachine replicatedTxListener, Clock clock, - long retryIntervalMillis, long maxRetryTimeMillis ) + ReplicatedTransactionStateMachine replicatedTxListener, + long retryIntervalMillis, CurrentReplicatedLockState currentReplicatedLockState, LogService logging ) { this.sessionPool = sessionPool; this.replicatedTxListener = replicatedTxListener; this.replicator = replicator; - this.clock = clock; this.retryIntervalMillis = retryIntervalMillis; - this.maxRetryTimeMillis = maxRetryTimeMillis; + this.currentReplicatedLockState = currentReplicatedLockState; + this.log = logging.getInternalLog( getClass() ); replicator.subscribe( this.replicatedTxListener ); } @@ -79,20 +82,29 @@ public long commit( final TransactionToApply tx, throw new TransactionFailureException( "Could not create immutable object for replication", e ); } - boolean lastRound = false; - long startTime = clock.currentTimeMillis(); while ( true ) { final Future futureTxId = replicatedTxListener.getFutureTxId( operationContext.localOperationId() ); try { - replicator.replicate( transaction ); + int currentLockSessionId = currentReplicatedLockState.currentLockSession().id(); + int txLockSessionId = tx.transactionRepresentation().getLockSessionId(); + if ( currentLockSessionId != txLockSessionId ) + { + /* It is safe and necessary to give up at this point, since the currently valid lock + session of the cluster has changed, and even if a previous replication of the + transaction content does eventually get replicated (e.g. delayed on the network), + then it will be ignored by the RTSM. So giving up and subsequently releasing + locks (in KTI) is safe. */ - /* The last round should wait for a longer time to keep issues arising from false negatives very rare - * (e.g. local actor thinks commit failed, while it was committed in the cluster). */ - long responseWaitTime = lastRound ? retryIntervalMillis : maxRetryTimeMillis/2; - Long txId = futureTxId.get( responseWaitTime, TimeUnit.MILLISECONDS ); + throw new TransactionFailureException( LockSessionInvalid, + "The lock session in the cluster has changed: " + + "[current lock session id:%d, tx lock session id:%d]", + currentLockSessionId, txLockSessionId ); + } + replicator.replicate( transaction ); + Long txId = futureTxId.get( retryIntervalMillis, TimeUnit.MILLISECONDS ); sessionPool.releaseSession( operationContext ); return txId; @@ -100,21 +112,14 @@ public long commit( final TransactionToApply tx, catch ( InterruptedException | TimeoutException e ) { futureTxId.cancel( false ); - - if ( lastRound ) - { - throw new TransactionFailureException( "Failed to commit transaction within time bound", e ); - } - else if ( (clock.currentTimeMillis() - startTime) >= maxRetryTimeMillis/2 ) - { - lastRound = true; - } } catch ( ReplicationFailedException | ExecutionException e ) { + futureTxId.cancel( false ); throw new TransactionFailureException( "Failed to replicate transaction", e ); } - System.out.println( "Retrying replication" ); + + log.info( "Retrying replication: " + operationContext ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java index 679310771ea0f..0e84f84f3cb10 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java @@ -95,11 +95,6 @@ public String toString() public static final Setting tx_replication_retry_interval = setting( "core_edge.tx_replication_retry_interval", DURATION, "1s" ); - @Description("The maximum time for trying to replicate a transaction and receive a successful response. " + - "Note that the transaction might still have been committed in the cluster.") - public static final Setting tx_replication_timeout = - setting( "core_edge.tx_replication_timeout", DURATION, "30s" ); - @Description("Expected size of core cluster") public static final Setting expected_core_cluster_size = setting( "core_edge.expected_core_cluster_size", INTEGER, "3" ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index be02bfe2921b3..9ae5e76fee2f6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -30,8 +30,6 @@ import org.neo4j.coreedge.catchup.CheckpointerSupplier; import org.neo4j.coreedge.catchup.DataSourceSupplier; import org.neo4j.coreedge.catchup.StoreIdSupplier; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.catchup.storecopy.edge.CopiedStoreRecovery; import org.neo4j.coreedge.discovery.CoreDiscoveryService; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector; @@ -81,7 +79,6 @@ import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.Clock; -import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.DatabaseAvailability; @@ -201,7 +198,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, ReplicatedLockStateMachine replicatedLockStateMachine = new ReplicatedLockStateMachine<>( myself, replicator ); commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockStateMachine, - dependencies, SYSTEM_CLOCK ); + dependencies, logging ); ReplicatedIdAllocationStateMachine idAllocationStateMachine = null; try @@ -310,11 +307,10 @@ private File createRaftLogsDirectory( File dir, FileSystemAbstraction fileSystem } public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator, - final LocalSessionPool localSessionPool, - CurrentReplicatedLockState - currentReplicatedLockState, - final Dependencies dependencies, - final Clock clock ) + final LocalSessionPool localSessionPool, + final CurrentReplicatedLockState currentReplicatedLockState, + final Dependencies dependencies, + final LogService logging ) { return ( appender, applier, config ) -> { TransactionRepresentationCommitProcess localCommit = @@ -329,9 +325,10 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator.subscribe( replicatedTxStateMachine ); return new ReplicatedTransactionCommitProcess( replicator, localSessionPool, - replicatedTxStateMachine, clock, + replicatedTxStateMachine, config.get( CoreEdgeClusterSettings.tx_replication_retry_interval ), - config.get( CoreEdgeClusterSettings.tx_replication_timeout ) ); + currentReplicatedLockState, logging + ); }; } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java index 36cb4c79a1529..5adf921025a26 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionCommitProcessTest.java @@ -29,10 +29,13 @@ import org.neo4j.coreedge.raft.replication.session.LocalOperationId; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.coreedge.server.core.CurrentReplicatedLockState; +import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.impl.api.TransactionToApply; -import org.neo4j.helpers.Clock; +import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import static junit.framework.TestCase.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; @@ -56,12 +59,18 @@ public void shouldReplicateOnlyOnceIfFirstAttemptSuccessful() throws Exception Replicator replicator = mock( Replicator.class ); ReplicatedTransactionStateMachine transactionStateMachine = mock( ReplicatedTransactionStateMachine.class ); Future future = mock( Future.class ); + + CurrentReplicatedLockState.LockSession lockSession = mock( CurrentReplicatedLockState.LockSession.class ); + when( lockSession.id() ).thenReturn( 0 ); + CurrentReplicatedLockState currentReplicatedLockState = mock( CurrentReplicatedLockState.class ); + when( currentReplicatedLockState.currentLockSession() ).thenReturn( lockSession ); + when( future.get( anyInt(), any( TimeUnit.class ) ) ).thenReturn( 23l ); when( transactionStateMachine.getFutureTxId( any( LocalOperationId.class ) ) ).thenReturn( future ); // when new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ), - transactionStateMachine, Clock.SYSTEM_CLOCK, 1, 30 ) + transactionStateMachine, 1, currentReplicatedLockState, NullLogService.getInstance() ) .commit( tx(), NULL, INTERNAL ); // then @@ -75,18 +84,57 @@ public void shouldRetryReplicationIfFirstAttemptTimesOut() throws Exception Replicator replicator = mock( Replicator.class ); ReplicatedTransactionStateMachine transactionStateMachine = mock( ReplicatedTransactionStateMachine.class ); Future future = mock( Future.class ); + + CurrentReplicatedLockState.LockSession lockSession = mock( CurrentReplicatedLockState.LockSession.class ); + when( lockSession.id() ).thenReturn( 0 ); + CurrentReplicatedLockState currentReplicatedLockState = mock( CurrentReplicatedLockState.class ); + when( currentReplicatedLockState.currentLockSession() ).thenReturn( lockSession ); + when( transactionStateMachine.getFutureTxId( any( LocalOperationId.class ) ) ).thenReturn( future ); when( future.get( anyInt(), any( TimeUnit.class ) ) ).thenThrow( TimeoutException.class ).thenReturn( 23l ); // when new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ), - transactionStateMachine, Clock.SYSTEM_CLOCK, 1, 30 ) + transactionStateMachine, 1, currentReplicatedLockState, NullLogService.getInstance() ) .commit( tx(), NULL, INTERNAL ); // then verify( replicator, times( 2 ) ).replicate( any( ReplicatedTransaction.class ) ); } + @Test + public void shouldNotRetryReplicationIfLockSessionChanges() throws Exception + { + // given + Replicator replicator = mock( Replicator.class ); + ReplicatedTransactionStateMachine transactionStateMachine = mock( ReplicatedTransactionStateMachine.class ); + Future future = mock( Future.class ); + + CurrentReplicatedLockState.LockSession lockSession = mock( CurrentReplicatedLockState.LockSession.class ); + when( lockSession.id() ).thenReturn( 0, 1 ); // Lock session id change. + CurrentReplicatedLockState currentReplicatedLockState = mock( CurrentReplicatedLockState.class ); + when( currentReplicatedLockState.currentLockSession() ).thenReturn( lockSession ); + + when( transactionStateMachine.getFutureTxId( any( LocalOperationId.class ) ) ).thenReturn( future ); + when( future.get( anyInt(), any( TimeUnit.class ) ) ).thenThrow( TimeoutException.class ); + + // when + try + { + new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ), + transactionStateMachine, 1, currentReplicatedLockState, NullLogService.getInstance() ) + .commit( tx(), NULL, INTERNAL ); + fail( "Should have thrown "); + } + catch( TransactionFailureException e ) + { + // expected + } + + // then + verify( replicator, times( 1 ) ).replicate( any( ReplicatedTransaction.class ) ); + } + private TransactionToApply tx() { TransactionRepresentation tx = mock( TransactionRepresentation.class );