From d3a483e29073e000306366670d21d75c64e1a8f4 Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Fri, 26 Aug 2016 14:45:27 +0200 Subject: [PATCH] Fix TxPushStrategyConfigIT.twoRoundRobin flakiness The test would fail whenever there was a failure in a replication attempt. Indeed, since the push factor works in best-effort mode, in reality the slaves could lag further behind than the test claims. This change takes the failed replications into account when asserting. Such a change should make the test more reliable. --- .../ha/MasterTransactionCommitProcess.java | 25 ++- .../modeswitch/CommitProcessSwitcher.java | 11 +- .../factory/HighlyAvailableEditionModule.java | 2 +- .../ha/transaction/TransactionPropagator.java | 154 ++++++++++-------- .../kernel/ha/TxPushStrategyConfigIT.java | 20 ++- 5 files changed, 122 insertions(+), 90 deletions(-) diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterTransactionCommitProcess.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterTransactionCommitProcess.java index 4c25ad493db9a..baa095e947f03 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterTransactionCommitProcess.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterTransactionCommitProcess.java @@ -33,29 +33,40 @@ */ public class MasterTransactionCommitProcess implements TransactionCommitProcess { + private final TransactionCommitProcess inner; private final TransactionPropagator txPropagator; private final IntegrityValidator validator; - private final TransactionCommitProcess inner; + private final Monitor monitor; + + public interface Monitor + { + void missedReplicas( int number ); + } - public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, - TransactionPropagator txPropagator, - IntegrityValidator validator ) + public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, TransactionPropagator txPropagator, + IntegrityValidator validator, Monitor monitor ) { this.inner 
= commitProcess; this.txPropagator = txPropagator; this.validator = validator; + this.monitor = monitor; } @Override - public long commit( TransactionToApply batch, CommitEvent commitEvent, - TransactionApplicationMode mode ) throws TransactionFailureException + public long commit( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode ) + throws TransactionFailureException { validate( batch ); long result = inner.commit( batch, commitEvent, mode ); // Assuming all the transactions come from the same author - txPropagator.committed( result, batch.transactionRepresentation().getAuthorId() ); + int missedReplicas = txPropagator.committed( result, batch.transactionRepresentation().getAuthorId() ); + + if ( missedReplicas > 0 ) + { + monitor.missedReplicas( missedReplicas ); + } return result; } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/modeswitch/CommitProcessSwitcher.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/modeswitch/CommitProcessSwitcher.java index 70880e9512cf5..2b4edbc310e11 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/modeswitch/CommitProcessSwitcher.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/modeswitch/CommitProcessSwitcher.java @@ -30,6 +30,7 @@ import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.state.IntegrityValidator; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.storageengine.api.StorageEngine; public class CommitProcessSwitcher extends AbstractComponentSwitcher @@ -38,16 +39,18 @@ public class CommitProcessSwitcher extends AbstractComponentSwitcher delegate, RequestContextFactory requestContextFactory, - DependencyResolver dependencyResolver ) + Monitors monitors, DependencyResolver dependencyResolver ) { super( delegate ); this.txPropagator = txPropagator; this.master = master; 
this.requestContextFactory = requestContextFactory; this.dependencyResolver = dependencyResolver; + this.monitor = monitors.newMonitor( MasterTransactionCommitProcess.Monitor.class ); } @Override @@ -63,9 +66,7 @@ protected TransactionCommitProcess getMasterImpl() dependencyResolver.resolveDependency( TransactionAppender.class ), dependencyResolver.resolveDependency( StorageEngine.class ) ); - return new MasterTransactionCommitProcess( - commitProcess, - txPropagator, - dependencyResolver.resolveDependency( IntegrityValidator.class ) ); + IntegrityValidator validator = dependencyResolver.resolveDependency( IntegrityValidator.class ); + return new MasterTransactionCommitProcess( commitProcess, txPropagator, validator, monitor ); } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java index 7ab9899672837..fa338fa3bb449 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java @@ -572,7 +572,7 @@ private CommitProcessFactory createCommitProcessFactory( Dependencies dependenci TransactionCommitProcess.class ); CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator, - master, commitProcessDelegate, requestContextFactory, dependencies ); + master, commitProcessDelegate, requestContextFactory, monitors, dependencies ); componentSwitcherContainer.add( commitProcessSwitcher ); return new HighlyAvailableCommitProcessFactory( commitProcessDelegate ); diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/transaction/TransactionPropagator.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/transaction/TransactionPropagator.java index b2868d49d87a7..415cae01e05de 100644 --- 
a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/transaction/TransactionPropagator.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/transaction/TransactionPropagator.java @@ -162,12 +162,12 @@ public TransactionPropagator( Configuration config, Log log, Slaves slaves, Comm } @Override - public void init() throws Throwable + public void init() { } @Override - public void start() throws Throwable + public void start() { this.slaveCommitters = Executors.newCachedThreadPool( new NamedThreadFactory( "slave-committer" ) ); desiredReplicationFactor = config.getTxPushFactor(); @@ -175,17 +175,23 @@ public void start() throws Throwable } @Override - public void stop() throws Throwable + public void stop() { this.slaveCommitters.shutdown(); } @Override - public void shutdown() throws Throwable + public void shutdown() { } - public void committed( long txId, int authorId ) + /** + * + * @param txId transaction id to replicate + * @param authorId author id for such transaction id + * @return the number of missed replicas (e.g., desired replication factor - number of successful replications) + */ + public int committed( long txId, int authorId ) { int replicationFactor = desiredReplicationFactor; // If the author is not this instance, then we need to push to one less - the committer already has it @@ -197,94 +203,100 @@ public void committed( long txId, int authorId ) if ( replicationFactor == 0 ) { - return; + return replicationFactor; } Collection committers = new HashSet<>(); - try + + // TODO: Move this logic into {@link CommitPusher} + // Commit at the configured amount of slaves in parallel. 
+ int successfulReplications = 0; + Iterator slaveList = filter( replicationStrategy.prioritize( slaves.getSlaves() ).iterator(), authorId ); + CompletionNotifier notifier = new CompletionNotifier(); + + // Start as many initial committers as needed + for ( int i = 0; i < replicationFactor && slaveList.hasNext(); i++ ) { - // TODO: Move this logic into {@link CommitPusher} - // Commit at the configured amount of slaves in parallel. - int successfulReplications = 0; - Iterator slaveList = filter( replicationStrategy.prioritize( slaves.getSlaves() ).iterator(), - authorId ); - CompletionNotifier notifier = new CompletionNotifier(); - - // Start as many initial committers as needed - for ( int i = 0; i < replicationFactor && slaveList.hasNext(); i++ ) - { - Slave slave = slaveList.next(); - Callable slaveCommitter = slaveCommitter( slave, txId, notifier ); - committers.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), slave ) ); - } + Slave slave = slaveList.next(); + Callable slaveCommitter = slaveCommitter( slave, txId, notifier ); + committers.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), slave ) ); + } - // Wait for them and perhaps spawn new ones for failing committers until we're done - // or until we have no more slaves to try out. - Collection toAdd = new ArrayList<>(); - Collection toRemove = new ArrayList<>(); - while ( !committers.isEmpty() && successfulReplications < replicationFactor ) + // Wait for them and perhaps spawn new ones for failing committers until we're done + // or until we have no more slaves to try out. 
+ Collection toAdd = new ArrayList<>(); + Collection toRemove = new ArrayList<>(); + while ( !committers.isEmpty() && successfulReplications < replicationFactor ) + { + toAdd.clear(); + toRemove.clear(); + for ( ReplicationContext context : committers ) { - toAdd.clear(); - toRemove.clear(); - for ( ReplicationContext context : committers ) + if ( !context.future.isDone() ) + { + continue; + } + + if ( isSuccessful( context ) ) + // This committer was successful, increment counter + { + successfulReplications++; + } + else if ( slaveList.hasNext() ) + // This committer failed, spawn another one { - if ( !context.future.isDone() ) + Slave newSlave = slaveList.next(); + Callable slaveCommitter; + try { - continue; + slaveCommitter = slaveCommitter( newSlave, txId, notifier ); } - - if ( isSuccessful( context ) ) - // This committer was successful, increment counter + catch ( Throwable t ) { - successfulReplications++; + log.error( "Unknown error commit master transaction at slave", t ); + return desiredReplicationFactor /* missed them all :( */; } - else if ( slaveList.hasNext() ) - // This committer failed, spawn another one + finally { - Slave newSlave = slaveList.next(); - Callable slaveCommitter = slaveCommitter( newSlave, txId, notifier ); - toAdd.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), newSlave ) ); + // Cancel all ongoing committers in the executor + for ( ReplicationContext committer : committers ) + { + committer.future.cancel( false ); + } } - toRemove.add( context ); - } - // Incorporate the results into committers collection - if ( !toAdd.isEmpty() ) - { - committers.addAll( toAdd ); - } - if ( !toRemove.isEmpty() ) - { - committers.removeAll( toRemove ); + toAdd.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), newSlave ) ); } + toRemove.add( context ); + } - if ( !committers.isEmpty() ) - // There are committers doing work right now, so go and wait for - // any of the committers to be done so that we 
can reevaluate - // the situation again. - { - notifier.waitForAnyCompletion(); - } + // Incorporate the results into committers collection + if ( !toAdd.isEmpty() ) + { + committers.addAll( toAdd ); + } + if ( !toRemove.isEmpty() ) + { + committers.removeAll( toRemove ); } - // We did the best we could, have we committed successfully on enough slaves? - if ( !(successfulReplications >= replicationFactor) ) + if ( !committers.isEmpty() ) + // There are committers doing work right now, so go and wait for + // any of the committers to be done so that we can reevaluate + // the situation again. { - pushedToTooFewSlaveLogger.info( "Transaction " + txId + " couldn't commit on enough slaves, desired " + - replicationFactor + ", but could only commit at " + successfulReplications ); + notifier.waitForAnyCompletion(); } } - catch ( Throwable t ) - { - log.error( "Unknown error commit master transaction at slave", t ); - } - finally + + // We did the best we could, have we committed successfully on enough slaves? 
+ if ( successfulReplications < replicationFactor ) { - // Cancel all ongoing committers in the executor - for ( ReplicationContext context : committers ) - { - context.future.cancel( false ); - } + pushedToTooFewSlaveLogger + .info( "Transaction " + txId + " couldn't commit on enough slaves, desired " + replicationFactor + + ", but could only commit at " + successfulReplications ); } + + return replicationFactor - successfulReplications; } private Iterator filter( Iterator slaves, final Integer externalAuthorServerId ) diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/TxPushStrategyConfigIT.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/TxPushStrategyConfigIT.java index 676b9140f13a3..9514021b6d2f7 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/TxPushStrategyConfigIT.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/TxPushStrategyConfigIT.java @@ -27,12 +27,14 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.cluster.InstanceId; import org.neo4j.helpers.TransactionTemplate; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.impl.ha.ClusterManager.ManagedCluster; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.test.SuppressOutput; import org.neo4j.test.ha.ClusterRule; @@ -99,7 +101,11 @@ public void twoRoundRobin() throws Exception { ManagedCluster cluster = startCluster( 4, 2, HaSettings.TxPushStrategy.round_robin ); - long txId = getLastTx( cluster.getMaster() ); + HighlyAvailableGraphDatabase master = cluster.getMaster(); + Monitors monitors = master.getDependencyResolver().resolveDependency( Monitors.class ); + AtomicInteger totalMissedReplicas = new AtomicInteger(); + monitors.addMonitorListener( (MasterTransactionCommitProcess.Monitor) totalMissedReplicas::addAndGet ); + long txId = getLastTx( master ); int 
count = 15; for ( int i = 0; i < count; i++ ) { @@ -115,11 +121,13 @@ public void twoRoundRobin() throws Exception } assertEquals( txId + count, max ); - assertTrue( "There should be members with transactions in the cluster", min != -1 ); - assertTrue( "There should be members with transactions in the cluster", max != -1 ); + assertTrue( "There should be members with transactions in the cluster", min != -1 && max != -1 ); + + int minLaggingBehindThreshold = 1 /* this is the value without errors */ + + totalMissedReplicas.get() /* let's consider the missed replications */; assertThat( "There should at most be a txId gap of 1 among the cluster members since the transaction pushing " + - "goes in a round robin fashion. min:" + min + ", max:" + max, - (int) (max - min), lessThanOrEqualTo( 1 ) ); + "goes in a round robin fashion. min:" + min + ", max:" + max, (int) (max - min), + lessThanOrEqualTo( minLaggingBehindThreshold ) ); } @Test @@ -194,7 +202,7 @@ private ManagedCluster startCluster( int memberCount, final int pushFactor, fina return cluster; } - private void mapMachineIds(final ManagedCluster cluster ) + private void mapMachineIds( ManagedCluster cluster ) { machineIds = new InstanceId[cluster.size()]; machineIds[0] = cluster.getServerId( cluster.getMaster() );