Skip to content

Commit

Permalink
Retry replication until lock session changes.
Browse files Browse the repository at this point in the history
Previously we would give up after a maximum timeout which isn't
suitable under the leader-only lock management strategy (and probably
not under any other strategy either). Since the lock session change
now exists as a safe and natural abort marker, we use that instead.
  • Loading branch information
martinfurmanski committed Jan 7, 2016
1 parent 8a09102 commit 4f9687e
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 43 deletions.
Expand Up @@ -109,6 +109,8 @@ enum Transaction implements Status
ReleaseLocksFailed( DatabaseError, "The transaction was unable to release one or more of its locks." ),
AcquireLockTimeout( TransientError, "The transaction was unable to acquire a lock, for instance due to a " +
"timeout or the transaction thread being interrupted." ),
LockSessionInvalid( TransientError, "The lock session under which this transaction was started is no longer valid." ),

DeadlockDetected( TransientError, "This transaction, and at least one more transaction, has acquired locks " +
"in a way that it will wait indefinitely, and the database has aborted it. Retrying this transaction " +
"will most likely be successful."),
Expand Down
Expand Up @@ -48,4 +48,14 @@ protected LocalSession localSession()
{
return localSession;
}


@Override
public String toString()
{
    // Identifying session fields, rendered for logs and diagnostics.
    return String.format( "OperationContext{globalSession=%s, localOperationId=%s}",
            globalSession, localOperationId );
}
}
Expand Up @@ -29,35 +29,38 @@
import org.neo4j.coreedge.raft.replication.Replicator.ReplicationFailedException;
import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
import org.neo4j.coreedge.raft.replication.session.OperationContext;
import org.neo4j.helpers.Clock;
import org.neo4j.coreedge.server.core.CurrentReplicatedLockState;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.storageengine.api.TransactionApplicationMode;

import static org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionFactory.createImmutableReplicatedTransaction;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionInvalid;

public class ReplicatedTransactionCommitProcess extends LifecycleAdapter implements TransactionCommitProcess
{
private final Replicator replicator;
private final ReplicatedTransactionStateMachine replicatedTxListener;
private final Clock clock;
private final long retryIntervalMillis;
private final long maxRetryTimeMillis;
private final CurrentReplicatedLockState currentReplicatedLockState;
private final LocalSessionPool sessionPool;
private final Log log;

public ReplicatedTransactionCommitProcess( Replicator replicator, LocalSessionPool sessionPool,
ReplicatedTransactionStateMachine replicatedTxListener, Clock clock,
long retryIntervalMillis, long maxRetryTimeMillis )
ReplicatedTransactionStateMachine replicatedTxListener,
long retryIntervalMillis, CurrentReplicatedLockState currentReplicatedLockState, LogService logging )
{
this.sessionPool = sessionPool;
this.replicatedTxListener = replicatedTxListener;
this.replicator = replicator;
this.clock = clock;
this.retryIntervalMillis = retryIntervalMillis;
this.maxRetryTimeMillis = maxRetryTimeMillis;
this.currentReplicatedLockState = currentReplicatedLockState;
this.log = logging.getInternalLog( getClass() );
replicator.subscribe( this.replicatedTxListener );
}

Expand All @@ -79,42 +82,44 @@ public long commit( final TransactionToApply tx,
throw new TransactionFailureException( "Could not create immutable object for replication", e );
}

boolean lastRound = false;
long startTime = clock.currentTimeMillis();
while ( true )
{
final Future<Long> futureTxId = replicatedTxListener.getFutureTxId( operationContext.localOperationId() );
try
{
replicator.replicate( transaction );
int currentLockSessionId = currentReplicatedLockState.currentLockSession().id();
int txLockSessionId = tx.transactionRepresentation().getLockSessionId();
if ( currentLockSessionId != txLockSessionId )
{
/* It is safe and necessary to give up at this point, since the currently valid lock
session of the cluster has changed, and even if a previous replication of the
transaction content does eventually get replicated (e.g. delayed on the network),
then it will be ignored by the RTSM. So giving up and subsequently releasing
locks (in KTI) is safe. */

/* The last round should wait for a longer time to keep issues arising from false negatives very rare
* (e.g. local actor thinks commit failed, while it was committed in the cluster). */
long responseWaitTime = lastRound ? retryIntervalMillis : maxRetryTimeMillis/2;
Long txId = futureTxId.get( responseWaitTime, TimeUnit.MILLISECONDS );
throw new TransactionFailureException( LockSessionInvalid,
"The lock session in the cluster has changed: " +
"[current lock session id:%d, tx lock session id:%d]",
currentLockSessionId, txLockSessionId );
}

replicator.replicate( transaction );
Long txId = futureTxId.get( retryIntervalMillis, TimeUnit.MILLISECONDS );
sessionPool.releaseSession( operationContext );

return txId;
}
catch ( InterruptedException | TimeoutException e )
{
futureTxId.cancel( false );

if ( lastRound )
{
throw new TransactionFailureException( "Failed to commit transaction within time bound", e );
}
else if ( (clock.currentTimeMillis() - startTime) >= maxRetryTimeMillis/2 )
{
lastRound = true;
}
}
catch ( ReplicationFailedException | ExecutionException e )
{
futureTxId.cancel( false );
throw new TransactionFailureException( "Failed to replicate transaction", e );
}
System.out.println( "Retrying replication" );

log.info( "Retrying replication: " + operationContext );
}
}

Expand Down
Expand Up @@ -95,11 +95,6 @@ public String toString()
public static final Setting<Long> tx_replication_retry_interval =
setting( "core_edge.tx_replication_retry_interval", DURATION, "1s" );

@Description("The maximum time for trying to replicate a transaction and receive a successful response. " +
"Note that the transaction might still have been committed in the cluster.")
public static final Setting<Long> tx_replication_timeout =
setting( "core_edge.tx_replication_timeout", DURATION, "30s" );

@Description("Expected size of core cluster")
public static final Setting<Integer> expected_core_cluster_size =
setting( "core_edge.expected_core_cluster_size", INTEGER, "3" );
Expand Down
Expand Up @@ -30,8 +30,6 @@
import org.neo4j.coreedge.catchup.CheckpointerSupplier;
import org.neo4j.coreedge.catchup.DataSourceSupplier;
import org.neo4j.coreedge.catchup.StoreIdSupplier;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.edge.CopiedStoreRecovery;
import org.neo4j.coreedge.discovery.CoreDiscoveryService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector;
Expand Down Expand Up @@ -81,7 +79,6 @@
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Clock;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.DatabaseAvailability;
Expand Down Expand Up @@ -201,7 +198,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
ReplicatedLockStateMachine<CoreMember> replicatedLockStateMachine = new ReplicatedLockStateMachine<>( myself, replicator );

commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockStateMachine,
dependencies, SYSTEM_CLOCK );
dependencies, logging );

ReplicatedIdAllocationStateMachine idAllocationStateMachine = null;
try
Expand Down Expand Up @@ -310,11 +307,10 @@ private File createRaftLogsDirectory( File dir, FileSystemAbstraction fileSystem
}

public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator,
final LocalSessionPool localSessionPool,
CurrentReplicatedLockState
currentReplicatedLockState,
final Dependencies dependencies,
final Clock clock )
final LocalSessionPool localSessionPool,
final CurrentReplicatedLockState currentReplicatedLockState,
final Dependencies dependencies,
final LogService logging )
{
return ( appender, applier, config ) -> {
TransactionRepresentationCommitProcess localCommit =
Expand All @@ -329,9 +325,10 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator
replicator.subscribe( replicatedTxStateMachine );

return new ReplicatedTransactionCommitProcess( replicator, localSessionPool,
replicatedTxStateMachine, clock,
replicatedTxStateMachine,
config.get( CoreEdgeClusterSettings.tx_replication_retry_interval ),
config.get( CoreEdgeClusterSettings.tx_replication_timeout ) );
currentReplicatedLockState, logging
);
};
}

Expand Down
Expand Up @@ -29,10 +29,13 @@
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.CurrentReplicatedLockState;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;

import static junit.framework.TestCase.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
Expand All @@ -56,12 +59,18 @@ public void shouldReplicateOnlyOnceIfFirstAttemptSuccessful() throws Exception
Replicator replicator = mock( Replicator.class );
ReplicatedTransactionStateMachine transactionStateMachine = mock( ReplicatedTransactionStateMachine.class );
Future future = mock( Future.class );

CurrentReplicatedLockState.LockSession lockSession = mock( CurrentReplicatedLockState.LockSession.class );
when( lockSession.id() ).thenReturn( 0 );
CurrentReplicatedLockState currentReplicatedLockState = mock( CurrentReplicatedLockState.class );
when( currentReplicatedLockState.currentLockSession() ).thenReturn( lockSession );

when( future.get( anyInt(), any( TimeUnit.class ) ) ).thenReturn( 23l );
when( transactionStateMachine.getFutureTxId( any( LocalOperationId.class ) ) ).thenReturn( future );

// when
new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ),
transactionStateMachine, Clock.SYSTEM_CLOCK, 1, 30 )
transactionStateMachine, 1, currentReplicatedLockState, NullLogService.getInstance() )
.commit( tx(), NULL, INTERNAL );

// then
Expand All @@ -75,18 +84,57 @@ public void shouldRetryReplicationIfFirstAttemptTimesOut() throws Exception
Replicator replicator = mock( Replicator.class );
ReplicatedTransactionStateMachine transactionStateMachine = mock( ReplicatedTransactionStateMachine.class );
Future future = mock( Future.class );

CurrentReplicatedLockState.LockSession lockSession = mock( CurrentReplicatedLockState.LockSession.class );
when( lockSession.id() ).thenReturn( 0 );
CurrentReplicatedLockState currentReplicatedLockState = mock( CurrentReplicatedLockState.class );
when( currentReplicatedLockState.currentLockSession() ).thenReturn( lockSession );

when( transactionStateMachine.getFutureTxId( any( LocalOperationId.class ) ) ).thenReturn( future );
when( future.get( anyInt(), any( TimeUnit.class ) ) ).thenThrow( TimeoutException.class ).thenReturn( 23l );

// when
new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ),
transactionStateMachine, Clock.SYSTEM_CLOCK, 1, 30 )
transactionStateMachine, 1, currentReplicatedLockState, NullLogService.getInstance() )
.commit( tx(), NULL, INTERNAL );

// then
verify( replicator, times( 2 ) ).replicate( any( ReplicatedTransaction.class ) );
}

@Test
public void shouldNotRetryReplicationIfLockSessionChanges() throws Exception
{
    // given: a commit process whose replication future always times out, and whose
    // cluster-wide lock session id changes between the first and second check.
    Replicator replicator = mock( Replicator.class );
    ReplicatedTransactionStateMachine transactionStateMachine = mock( ReplicatedTransactionStateMachine.class );
    Future future = mock( Future.class );

    CurrentReplicatedLockState.LockSession lockSession = mock( CurrentReplicatedLockState.LockSession.class );
    when( lockSession.id() ).thenReturn( 0, 1 ); // consecutive stubbing: lock session id changes on the second call
    CurrentReplicatedLockState currentReplicatedLockState = mock( CurrentReplicatedLockState.class );
    when( currentReplicatedLockState.currentLockSession() ).thenReturn( lockSession );

    when( transactionStateMachine.getFutureTxId( any( LocalOperationId.class ) ) ).thenReturn( future );
    when( future.get( anyInt(), any( TimeUnit.class ) ) ).thenThrow( TimeoutException.class );

    // when: committing must abort once the lock session no longer matches the tx's session
    try
    {
        new ReplicatedTransactionCommitProcess( replicator, new LocalSessionPool( coreMember ),
                transactionStateMachine, 1, currentReplicatedLockState, NullLogService.getInstance() )
                .commit( tx(), NULL, INTERNAL );
        fail( "Should have thrown TransactionFailureException" );
    }
    catch ( TransactionFailureException e )
    {
        // expected: the changed lock session is a safe abort marker, so no retry is attempted
    }

    // then: exactly one replication attempt — the retry loop must not run again
    verify( replicator, times( 1 ) ).replicate( any( ReplicatedTransaction.class ) );
}

private TransactionToApply tx()
{
TransactionRepresentation tx = mock( TransactionRepresentation.class );
Expand Down

0 comments on commit 4f9687e

Please sign in to comment.