Skip to content

Commit

Permalink
Merge pull request #7598 from apcj/election-performance-it
Browse files Browse the repository at this point in the history
Fix ElectionPerformanceIT after production code changes.
  • Loading branch information
apcj committed Jul 21, 2016
2 parents daca766 + b991f98 commit 956c4c2
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,21 @@ public class RaftInstanceBuilder
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.systemUTC(),
NullLogProvider.getInstance() );

private Inbound<RaftMessages.RaftMessage> inbound = handler -> {};
private Inbound<RaftMessages.RaftMessage> inbound = handler -> {
};
private Outbound<CoreMember, RaftMessages.RaftMessage> outbound =
new Outbound<CoreMember, RaftMessages.RaftMessage>()
{
@Override
public void send( CoreMember to, RaftMessages.RaftMessage message )
{
}

@Override
public void send( CoreMember to, Collection<RaftMessages.RaftMessage> raftMessages )
{
}
};
{
@Override
public void send( CoreMember to, RaftMessages.RaftMessage message )
{
}

@Override
public void send( CoreMember to, Collection<RaftMessages.RaftMessage> raftMessages )
{
}
};

private LogProvider logProvider = NullLogProvider.getInstance();
private Clock clock = Clock.systemUTC();
Expand All @@ -85,7 +86,7 @@ public void send( CoreMember to, Collection<RaftMessages.RaftMessage> raftMessag
new InMemoryStateStorage<>( new RaftMembershipState() );
private Monitors monitors = new Monitors();
private RaftStateMachine raftStateMachine = new EmptyStateMachine();
private final InFlightMap<Long,RaftLogEntry> inFlightMap;
private final InFlightMap<Long, RaftLogEntry> inFlightMap;

public RaftInstanceBuilder( CoreMember member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder )
{
Expand Down Expand Up @@ -117,7 +118,7 @@ public RaftInstance build()
}
else
{
raftStateMachine.notifyCommitted( outcome.getCommitIndex());
raftStateMachine.notifyCommitted( outcome.getCommitIndex() );
}
}
catch ( IOException e )
Expand Down Expand Up @@ -146,7 +147,7 @@ public RaftInstanceBuilder timeoutService( RenewableTimeoutService renewableTime
return this;
}

public RaftInstanceBuilder outbound( Outbound<CoreMember,RaftMessages.RaftMessage> outbound )
public RaftInstanceBuilder outbound( Outbound<CoreMember, RaftMessages.RaftMessage> outbound )
{
this.outbound = outbound;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@
import org.junit.Test;

import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.coreedge.raft.RaftStateMachine;
import org.neo4j.coreedge.raft.RaftTestNetwork;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.function.Predicates;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;
Expand All @@ -50,42 +44,9 @@
* ability to perform an election at all should be caught by this test. Very
* rare false positives should not be used as an indication for increasing the
* limits.
*
* Notice the class name: this is _not_ going to be run as part of the main build.
*/
public class ElectionPerformanceTesting
public class ElectionPerformanceIT
{
/**
* This class simply waits for a single entry to have been committed for each member,
* which should be the initial member set entry, making it possible for every member
* to perform elections. We need this before we start disconnecting members.
*/
private class BootstrapWaiter implements RaftStateMachine
{
private AtomicLong count = new AtomicLong();

@Override
public void notifyCommitted( long commitIndex )
{
count.incrementAndGet();
}

@Override
public void notifyNeedFreshSnapshot()
{
}

@Override
public void downloadSnapshot( CoreMember from )
{
}

private void await( long awaitedCount ) throws InterruptedException, TimeoutException
{
Predicates.await( () -> count.get() >= awaitedCount, 30, SECONDS, 100, MILLISECONDS );
}
}

@Test
public void electionPerformance_NormalConditions() throws Throwable
{
Expand All @@ -102,20 +63,18 @@ public void electionPerformance_NormalConditions() throws Throwable

RaftTestNetwork net = new RaftTestNetwork<>( ( i, o ) -> networkLatency );
Set<CoreMember> members = asSet( member( 0 ), member( 1 ), member( 2 ) );
BootstrapWaiter bootstrapWaiter = new BootstrapWaiter();
Fixture fixture = new Fixture( members, net, electionTimeout, heartbeatInterval, bootstrapWaiter );
Fixture fixture = new Fixture( members, net, electionTimeout, heartbeatInterval );
DisconnectLeaderScenario scenario = new DisconnectLeaderScenario( fixture, electionTimeout );

try
{
// when running scenario
fixture.boot();
bootstrapWaiter.await( members.size() );
scenario.run( iterations, 10 * electionTimeout );
}
finally
{
fixture.teardown();
fixture.tearDown();
}

DisconnectLeaderScenario.Result result = scenario.result();
Expand Down Expand Up @@ -145,20 +104,18 @@ public void electionPerformance_RapidConditions() throws Throwable

RaftTestNetwork net = new RaftTestNetwork<>( ( i, o ) -> networkLatency );
Set<CoreMember> members = asSet( member( 0 ), member( 1 ), member( 2 ) );
BootstrapWaiter bootstrapWaiter = new BootstrapWaiter();
Fixture fixture = new Fixture( members, net, electionTimeout, heartbeatInterval, bootstrapWaiter );
Fixture fixture = new Fixture( members, net, electionTimeout, heartbeatInterval );
DisconnectLeaderScenario scenario = new DisconnectLeaderScenario( fixture, electionTimeout );

try
{
// when running scenario
fixture.boot();
bootstrapWaiter.await( members.size() );
scenario.run( iterations, 10 * electionTimeout );
}
finally
{
fixture.teardown();
fixture.tearDown();
}

DisconnectLeaderScenario.Result result = scenario.result();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.RaftInstance;
Expand All @@ -35,18 +37,22 @@
import org.neo4j.coreedge.raft.membership.RaftTestGroup;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.RaftTestMemberSetBuilder;
import org.neo4j.function.Predicates;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.logging.NullLogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class Fixture
{
private final Set members = new HashSet<>();
private final Set<CoreMember> members = new HashSet<>();
private final Set<BootstrapWaiter> bootstrapWaiters = new HashSet<>();
private final List<DelayedRenewableTimeoutService> timeoutServices = new ArrayList<>();
final Set<RaftInstance> rafts = new HashSet<>();
final RaftTestNetwork net;
private final List<DelayedRenewableTimeoutService> timeoutServices = new ArrayList<>();

Fixture( Set<CoreMember> memberIds, RaftTestNetwork net, long electionTimeout, long heartbeatInterval,
RaftStateMachine stateMachine ) throws Throwable
Fixture( Set<CoreMember> memberIds, RaftTestNetwork net, long electionTimeout, long heartbeatInterval )
{
this.net = net;

Expand All @@ -59,6 +65,9 @@ public class Fixture

DelayedRenewableTimeoutService timeoutService = createTimeoutService();

BootstrapWaiter waiter = new BootstrapWaiter();
bootstrapWaiters.add( waiter );

RaftInstance raftInstance =
new RaftInstanceBuilder( member, memberIds.size(), RaftTestMemberSetBuilder.INSTANCE )
.electionTimeout( electionTimeout )
Expand All @@ -67,14 +76,14 @@ public class Fixture
.outbound( outbound )
.timeoutService( timeoutService )
.raftLog( new InMemoryRaftLog() )
.stateMachine( stateMachine )
.stateMachine( waiter )
.build();

rafts.add( raftInstance );
}
}

private DelayedRenewableTimeoutService createTimeoutService() throws Throwable
private DelayedRenewableTimeoutService createTimeoutService()
{
DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService(
Clock.systemUTC(), NullLogProvider.getInstance() );
Expand All @@ -87,13 +96,14 @@ private DelayedRenewableTimeoutService createTimeoutService() throws Throwable
return timeoutService;
}

void boot() throws BootstrapException
void boot() throws BootstrapException, TimeoutException, InterruptedException
{
net.start();
Iterables.first( rafts ).bootstrapWithInitialMembers( new RaftTestGroup( members ) );
awaitBootstrapped();
}

public void teardown() throws InterruptedException
public void tearDown()
{
net.stop();
for ( DelayedRenewableTimeoutService timeoutService : timeoutServices )
Expand All @@ -112,4 +122,50 @@ public void teardown() throws InterruptedException
raft.logShippingManager().stop();
}
}

/**
* This class simply waits for a single entry to have been committed,
* which should be the initial member set entry.
*
* If all members of the cluster have committed such an entry, it's possible for any member
* to perform elections. We need to meet this condition before we start disconnecting members.
*/
private static class BootstrapWaiter implements RaftStateMachine
{
private AtomicBoolean bootstrapped = new AtomicBoolean( false );

@Override
public void notifyCommitted( long commitIndex )
{
if ( commitIndex >= 0 )
{
bootstrapped.set( true );
}
}

@Override
public void notifyNeedFreshSnapshot()
{
}

@Override
public void downloadSnapshot( CoreMember from )
{
}

}

private void awaitBootstrapped() throws InterruptedException, TimeoutException
{
Predicates.await( () -> {
for ( BootstrapWaiter bootstrapWaiter : bootstrapWaiters )
{
if ( !bootstrapWaiter.bootstrapped.get() )
{
return false;
}
}
return true;
}, 30, SECONDS, 100, MILLISECONDS );
}
}

0 comments on commit 956c4c2

Please sign in to comment.