Skip to content

Commit

Permalink
Fix TxPushStrategyConfigIT.twoRoundRobin flakiness
Browse files Browse the repository at this point in the history
The test would fail whenever a replication attempt failed.  Because the
push factor works on a best-effort basis, the slaves can in reality lag
further behind the master than the test assumes.  This change takes the
failed replications into account when asserting, which should make the
test more reliable.
  • Loading branch information
davidegrohmann committed Aug 26, 2016
1 parent 8d0d00f commit d3a483e
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 90 deletions.
Expand Up @@ -33,29 +33,40 @@
*/ */
public class MasterTransactionCommitProcess implements TransactionCommitProcess public class MasterTransactionCommitProcess implements TransactionCommitProcess
{ {
private final TransactionCommitProcess inner;
private final TransactionPropagator txPropagator; private final TransactionPropagator txPropagator;
private final IntegrityValidator validator; private final IntegrityValidator validator;
private final TransactionCommitProcess inner; private final Monitor monitor;

public interface Monitor
{
void missedReplicas( int number );
}


public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, TransactionPropagator txPropagator,
TransactionPropagator txPropagator, IntegrityValidator validator, Monitor monitor )
IntegrityValidator validator )
{ {
this.inner = commitProcess; this.inner = commitProcess;
this.txPropagator = txPropagator; this.txPropagator = txPropagator;
this.validator = validator; this.validator = validator;
this.monitor = monitor;
} }


@Override @Override
public long commit( TransactionToApply batch, CommitEvent commitEvent, public long commit( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode )
TransactionApplicationMode mode ) throws TransactionFailureException throws TransactionFailureException
{ {
validate( batch ); validate( batch );


long result = inner.commit( batch, commitEvent, mode ); long result = inner.commit( batch, commitEvent, mode );


// Assuming all the transactions come from the same author // Assuming all the transactions come from the same author
txPropagator.committed( result, batch.transactionRepresentation().getAuthorId() ); int missedReplicas = txPropagator.committed( result, batch.transactionRepresentation().getAuthorId() );

if ( missedReplicas > 0 )
{
monitor.missedReplicas( missedReplicas );
}


return result; return result;
} }
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator; import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageEngine;


public class CommitProcessSwitcher extends AbstractComponentSwitcher<TransactionCommitProcess> public class CommitProcessSwitcher extends AbstractComponentSwitcher<TransactionCommitProcess>
Expand All @@ -38,16 +39,18 @@ public class CommitProcessSwitcher extends AbstractComponentSwitcher<Transaction
private final Master master; private final Master master;
private final RequestContextFactory requestContextFactory; private final RequestContextFactory requestContextFactory;
private final DependencyResolver dependencyResolver; private final DependencyResolver dependencyResolver;
private final MasterTransactionCommitProcess.Monitor monitor;


public CommitProcessSwitcher( TransactionPropagator txPropagator, Master master, public CommitProcessSwitcher( TransactionPropagator txPropagator, Master master,
DelegateInvocationHandler<TransactionCommitProcess> delegate, RequestContextFactory requestContextFactory, DelegateInvocationHandler<TransactionCommitProcess> delegate, RequestContextFactory requestContextFactory,
DependencyResolver dependencyResolver ) Monitors monitors, DependencyResolver dependencyResolver )
{ {
super( delegate ); super( delegate );
this.txPropagator = txPropagator; this.txPropagator = txPropagator;
this.master = master; this.master = master;
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
this.monitor = monitors.newMonitor( MasterTransactionCommitProcess.Monitor.class );
} }


@Override @Override
Expand All @@ -63,9 +66,7 @@ protected TransactionCommitProcess getMasterImpl()
dependencyResolver.resolveDependency( TransactionAppender.class ), dependencyResolver.resolveDependency( TransactionAppender.class ),
dependencyResolver.resolveDependency( StorageEngine.class ) ); dependencyResolver.resolveDependency( StorageEngine.class ) );


return new MasterTransactionCommitProcess( IntegrityValidator validator = dependencyResolver.resolveDependency( IntegrityValidator.class );
commitProcess, return new MasterTransactionCommitProcess( commitProcess, txPropagator, validator, monitor );
txPropagator,
dependencyResolver.resolveDependency( IntegrityValidator.class ) );
} }
} }
Expand Up @@ -572,7 +572,7 @@ private CommitProcessFactory createCommitProcessFactory( Dependencies dependenci
TransactionCommitProcess.class ); TransactionCommitProcess.class );


CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator, CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator,
master, commitProcessDelegate, requestContextFactory, dependencies ); master, commitProcessDelegate, requestContextFactory, monitors, dependencies );
componentSwitcherContainer.add( commitProcessSwitcher ); componentSwitcherContainer.add( commitProcessSwitcher );


return new HighlyAvailableCommitProcessFactory( commitProcessDelegate ); return new HighlyAvailableCommitProcessFactory( commitProcessDelegate );
Expand Down
Expand Up @@ -162,30 +162,36 @@ public TransactionPropagator( Configuration config, Log log, Slaves slaves, Comm
} }


@Override @Override
public void init() throws Throwable public void init()
{ {
} }


@Override @Override
public void start() throws Throwable public void start()
{ {
this.slaveCommitters = Executors.newCachedThreadPool( new NamedThreadFactory( "slave-committer" ) ); this.slaveCommitters = Executors.newCachedThreadPool( new NamedThreadFactory( "slave-committer" ) );
desiredReplicationFactor = config.getTxPushFactor(); desiredReplicationFactor = config.getTxPushFactor();
replicationStrategy = config.getReplicationStrategy(); replicationStrategy = config.getReplicationStrategy();
} }


@Override @Override
public void stop() throws Throwable public void stop()
{ {
this.slaveCommitters.shutdown(); this.slaveCommitters.shutdown();
} }


@Override @Override
public void shutdown() throws Throwable public void shutdown()
{ {
} }


public void committed( long txId, int authorId ) /**
*
* @param txId transaction id to replicate
* @param authorId author id for such transaction id
* @return the number of missed replicas (e.g., desired replication factor - number of successful replications)
*/
public int committed( long txId, int authorId )
{ {
int replicationFactor = desiredReplicationFactor; int replicationFactor = desiredReplicationFactor;
// If the author is not this instance, then we need to push to one less - the committer already has it // If the author is not this instance, then we need to push to one less - the committer already has it
Expand All @@ -197,94 +203,100 @@ public void committed( long txId, int authorId )


if ( replicationFactor == 0 ) if ( replicationFactor == 0 )
{ {
return; return replicationFactor;
} }
Collection<ReplicationContext> committers = new HashSet<>(); Collection<ReplicationContext> committers = new HashSet<>();
try
// TODO: Move this logic into {@link CommitPusher}
// Commit at the configured amount of slaves in parallel.
int successfulReplications = 0;
Iterator<Slave> slaveList = filter( replicationStrategy.prioritize( slaves.getSlaves() ).iterator(), authorId );
CompletionNotifier notifier = new CompletionNotifier();

// Start as many initial committers as needed
for ( int i = 0; i < replicationFactor && slaveList.hasNext(); i++ )
{ {
// TODO: Move this logic into {@link CommitPusher} Slave slave = slaveList.next();
// Commit at the configured amount of slaves in parallel. Callable<Void> slaveCommitter = slaveCommitter( slave, txId, notifier );
int successfulReplications = 0; committers.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), slave ) );
Iterator<Slave> slaveList = filter( replicationStrategy.prioritize( slaves.getSlaves() ).iterator(), }
authorId );
CompletionNotifier notifier = new CompletionNotifier();

// Start as many initial committers as needed
for ( int i = 0; i < replicationFactor && slaveList.hasNext(); i++ )
{
Slave slave = slaveList.next();
Callable<Void> slaveCommitter = slaveCommitter( slave, txId, notifier );
committers.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), slave ) );
}


// Wait for them and perhaps spawn new ones for failing committers until we're done // Wait for them and perhaps spawn new ones for failing committers until we're done
// or until we have no more slaves to try out. // or until we have no more slaves to try out.
Collection<ReplicationContext> toAdd = new ArrayList<>(); Collection<ReplicationContext> toAdd = new ArrayList<>();
Collection<ReplicationContext> toRemove = new ArrayList<>(); Collection<ReplicationContext> toRemove = new ArrayList<>();
while ( !committers.isEmpty() && successfulReplications < replicationFactor ) while ( !committers.isEmpty() && successfulReplications < replicationFactor )
{
toAdd.clear();
toRemove.clear();
for ( ReplicationContext context : committers )
{ {
toAdd.clear(); if ( !context.future.isDone() )
toRemove.clear(); {
for ( ReplicationContext context : committers ) continue;
}

if ( isSuccessful( context ) )
// This committer was successful, increment counter
{
successfulReplications++;
}
else if ( slaveList.hasNext() )
// This committer failed, spawn another one
{ {
if ( !context.future.isDone() ) Slave newSlave = slaveList.next();
Callable<Void> slaveCommitter;
try
{ {
continue; slaveCommitter = slaveCommitter( newSlave, txId, notifier );
} }

catch ( Throwable t )
if ( isSuccessful( context ) )
// This committer was successful, increment counter
{ {
successfulReplications++; log.error( "Unknown error commit master transaction at slave", t );
return desiredReplicationFactor /* missed them all :( */;
} }
else if ( slaveList.hasNext() ) finally
// This committer failed, spawn another one
{ {
Slave newSlave = slaveList.next(); // Cancel all ongoing committers in the executor
Callable<Void> slaveCommitter = slaveCommitter( newSlave, txId, notifier ); for ( ReplicationContext committer : committers )
toAdd.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), newSlave ) ); {
committer.future.cancel( false );
}
} }
toRemove.add( context );
}


// Incorporate the results into committers collection toAdd.add( new ReplicationContext( slaveCommitters.submit( slaveCommitter ), newSlave ) );
if ( !toAdd.isEmpty() )
{
committers.addAll( toAdd );
}
if ( !toRemove.isEmpty() )
{
committers.removeAll( toRemove );
} }
toRemove.add( context );
}


if ( !committers.isEmpty() ) // Incorporate the results into committers collection
// There are committers doing work right now, so go and wait for if ( !toAdd.isEmpty() )
// any of the committers to be done so that we can reevaluate {
// the situation again. committers.addAll( toAdd );
{ }
notifier.waitForAnyCompletion(); if ( !toRemove.isEmpty() )
} {
committers.removeAll( toRemove );
} }


// We did the best we could, have we committed successfully on enough slaves? if ( !committers.isEmpty() )
if ( !(successfulReplications >= replicationFactor) ) // There are committers doing work right now, so go and wait for
// any of the committers to be done so that we can reevaluate
// the situation again.
{ {
pushedToTooFewSlaveLogger.info( "Transaction " + txId + " couldn't commit on enough slaves, desired " + notifier.waitForAnyCompletion();
replicationFactor + ", but could only commit at " + successfulReplications );
} }
} }
catch ( Throwable t )
{ // We did the best we could, have we committed successfully on enough slaves?
log.error( "Unknown error commit master transaction at slave", t ); if ( successfulReplications < replicationFactor )
}
finally
{ {
// Cancel all ongoing committers in the executor pushedToTooFewSlaveLogger
for ( ReplicationContext context : committers ) .info( "Transaction " + txId + " couldn't commit on enough slaves, desired " + replicationFactor +
{ ", but could only commit at " + successfulReplications );
context.future.cancel( false );
}
} }

return replicationFactor - successfulReplications;
} }


private Iterator<Slave> filter( Iterator<Slave> slaves, final Integer externalAuthorServerId ) private Iterator<Slave> filter( Iterator<Slave> slaves, final Integer externalAuthorServerId )
Expand Down
Expand Up @@ -27,12 +27,14 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.helpers.TransactionTemplate; import org.neo4j.helpers.TransactionTemplate;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.impl.ha.ClusterManager.ManagedCluster; import org.neo4j.kernel.impl.ha.ClusterManager.ManagedCluster;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.SuppressOutput; import org.neo4j.test.SuppressOutput;
import org.neo4j.test.ha.ClusterRule; import org.neo4j.test.ha.ClusterRule;


Expand Down Expand Up @@ -99,7 +101,11 @@ public void twoRoundRobin() throws Exception
{ {
ManagedCluster cluster = startCluster( 4, 2, HaSettings.TxPushStrategy.round_robin ); ManagedCluster cluster = startCluster( 4, 2, HaSettings.TxPushStrategy.round_robin );


long txId = getLastTx( cluster.getMaster() ); HighlyAvailableGraphDatabase master = cluster.getMaster();
Monitors monitors = master.getDependencyResolver().resolveDependency( Monitors.class );
AtomicInteger totalMissedReplicas = new AtomicInteger();
monitors.addMonitorListener( (MasterTransactionCommitProcess.Monitor) totalMissedReplicas::addAndGet );
long txId = getLastTx( master );
int count = 15; int count = 15;
for ( int i = 0; i < count; i++ ) for ( int i = 0; i < count; i++ )
{ {
Expand All @@ -115,11 +121,13 @@ public void twoRoundRobin() throws Exception
} }


assertEquals( txId + count, max ); assertEquals( txId + count, max );
assertTrue( "There should be members with transactions in the cluster", min != -1 ); assertTrue( "There should be members with transactions in the cluster", min != -1 && max != -1 );
assertTrue( "There should be members with transactions in the cluster", max != -1 );
int minLaggingBehindThreshold = 1 /* this is the value without errors */ +
totalMissedReplicas.get() /* let's consider the missed replications */;
assertThat( "There should at most be a txId gap of 1 among the cluster members since the transaction pushing " + assertThat( "There should at most be a txId gap of 1 among the cluster members since the transaction pushing " +
"goes in a round robin fashion. min:" + min + ", max:" + max, "goes in a round robin fashion. min:" + min + ", max:" + max, (int) (max - min),
(int) (max - min), lessThanOrEqualTo( 1 ) ); lessThanOrEqualTo( minLaggingBehindThreshold ) );
} }


@Test @Test
Expand Down Expand Up @@ -194,7 +202,7 @@ private ManagedCluster startCluster( int memberCount, final int pushFactor, fina
return cluster; return cluster;
} }


private void mapMachineIds(final ManagedCluster cluster ) private void mapMachineIds( ManagedCluster cluster )
{ {
machineIds = new InstanceId[cluster.size()]; machineIds = new InstanceId[cluster.size()];
machineIds[0] = cluster.getServerId( cluster.getMaster() ); machineIds[0] = cluster.getServerId( cluster.getMaster() );
Expand Down

0 comments on commit d3a483e

Please sign in to comment.