Skip to content

Commit

Permalink
Longer timeout for awaiting leader in CoreReplicationIT
Browse files Browse the repository at this point in the history
Add a longer timeout when waiting for the new leader after shutting
down the old one. This should make the test more reliable in CI.

Clean up test code.
Improve the `await` and `awaitEx` methods in Predicates.
  • Loading branch information
davidegrohmann committed Oct 18, 2016
1 parent 9914faa commit 091abe9
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 52 deletions.
24 changes: 19 additions & 5 deletions community/common/src/main/java/org/neo4j/function/Predicates.java
Expand Up @@ -140,16 +140,30 @@ public boolean test( T item )
/**
 * Waits for {@code supplier} to produce a value accepted by {@code predicate}, using an explicit
 * poll interval.
 * <p>
 * This is the non-throwing convenience form: it adapts the plain {@link Supplier} and
 * {@link Predicate} through method references and delegates to the six-argument {@code awaitEx}.
 *
 * @param supplier source of candidate values
 * @param predicate condition a candidate value must satisfy
 * @param timeout maximum time to wait, expressed in {@code timeoutUnit}
 * @param timeoutUnit unit of {@code timeout}
 * @param pollInterval pause between attempts, expressed in {@code pollUnit}
 * @param pollUnit unit of {@code pollInterval}
 * @return the value handed back by {@code awaitEx}
 * @throws TimeoutException propagated from {@code awaitEx}; presumably raised when no accepted
 * value shows up within the timeout
 */
public static <TYPE> TYPE await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout,
        TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) throws TimeoutException
{
    // A single delegation: composing and awaiting here as well would only repeat the wait and
    // leave this return unreachable.
    return awaitEx( supplier::get, predicate::test, timeout, timeoutUnit, pollInterval, pollUnit );
}

/**
 * Waits for {@code supplier} to produce a value accepted by {@code predicate}, leaving the poll
 * interval to the callee.
 * <p>
 * Adapts the plain {@link Supplier} and {@link Predicate} through method references and delegates
 * to the four-argument {@code awaitEx}.
 *
 * @param supplier source of candidate values
 * @param predicate condition a candidate value must satisfy
 * @param timeout maximum time to wait, expressed in {@code timeoutUnit}
 * @param timeoutUnit unit of {@code timeout}
 * @return the value handed back by {@code awaitEx}
 * @throws TimeoutException propagated from {@code awaitEx}; presumably raised when no accepted
 * value shows up within the timeout
 */
public static <TYPE> TYPE await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout,
        TimeUnit timeoutUnit ) throws TimeoutException
{
    // Wait exactly once; an extra compose-and-await before this call would double the wait.
    return awaitEx( supplier::get, predicate::test, timeout, timeoutUnit );
}

/**
 * Exception-propagating variant of {@code await} with an explicit poll interval.
 * <p>
 * The supplier and predicate are folded into one capturing boolean supplier, which is handed to
 * the boolean-supplier overload of {@code awaitEx}; once that call returns, the input recorded by
 * the capturing supplier is the result.
 *
 * @param supplier source of candidate values; may throw {@code EXCEPTION}
 * @param predicate condition a candidate value must satisfy; may throw {@code EXCEPTION}
 * @param timeout maximum time to wait, expressed in {@code timeoutUnit}
 * @param timeoutUnit unit of {@code timeout}
 * @param pollInterval pause between attempts, expressed in {@code pollUnit}
 * @param pollUnit unit of {@code pollInterval}
 * @return the input last recorded by the capturing supplier
 * @throws TimeoutException propagated from the boolean-supplier overload
 * @throws EXCEPTION whatever the supplier or predicate throws
 */
public static <TYPE, EXCEPTION extends Exception> TYPE awaitEx( ThrowingSupplier<TYPE,EXCEPTION> supplier,
        ThrowingPredicate<TYPE,EXCEPTION> predicate, long timeout, TimeUnit timeoutUnit, long pollInterval,
        TimeUnit pollUnit ) throws TimeoutException, EXCEPTION
{
    final Suppliers.ThrowingCapturingSupplier<TYPE,EXCEPTION> capturing =
            Suppliers.compose( supplier, predicate );
    awaitEx( capturing, timeout, timeoutUnit, pollInterval, pollUnit );
    return capturing.lastInput();
}

/**
 * Exception-propagating variant of {@code await} that leaves the poll interval to the callee.
 * <p>
 * The supplier and predicate are folded into one capturing boolean supplier, which is handed to
 * the boolean-supplier overload of {@code awaitEx}; once that call returns, the input recorded by
 * the capturing supplier is the result.
 *
 * @param supplier source of candidate values; may throw {@code EXCEPTION}
 * @param predicate condition a candidate value must satisfy; may throw {@code EXCEPTION}
 * @param timeout maximum time to wait, expressed in {@code timeoutUnit}
 * @param timeoutUnit unit of {@code timeout}
 * @return the input last recorded by the capturing supplier
 * @throws TimeoutException propagated from the boolean-supplier overload
 * @throws EXCEPTION whatever the supplier or predicate throws
 */
public static <TYPE, EXCEPTION extends Exception> TYPE awaitEx( ThrowingSupplier<TYPE,EXCEPTION> supplier,
        ThrowingPredicate<TYPE,EXCEPTION> predicate, long timeout, TimeUnit timeoutUnit )
        throws TimeoutException, EXCEPTION
{
    final Suppliers.ThrowingCapturingSupplier<TYPE,EXCEPTION> capturing =
            Suppliers.compose( supplier, predicate );
    awaitEx( capturing, timeout, timeoutUnit );
    return capturing.lastInput();
}

Expand Down
15 changes: 8 additions & 7 deletions community/common/src/main/java/org/neo4j/function/Suppliers.java
Expand Up @@ -118,9 +118,10 @@ public T get()
};
}

public static <T> CapturingSupplier<T> compose( final Supplier<T> input, final Predicate<T> predicate )
public static <T, E extends Exception> ThrowingCapturingSupplier<T,E> compose( final ThrowingSupplier<T,E> input,
final ThrowingPredicate<T,E> predicate )
{
return new CapturingSupplier<T>( input, predicate );
return new ThrowingCapturingSupplier<>( input, predicate );
}

public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit )
Expand All @@ -129,14 +130,14 @@ public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit )
return () -> currentTimeMillis() <= endTimeInMilliseconds;
}

static class CapturingSupplier<T> implements Supplier<Boolean>
static class ThrowingCapturingSupplier<T, E extends Exception> implements ThrowingSupplier<Boolean,E>
{
private final Supplier<T> input;
private final Predicate<T> predicate;
private final ThrowingSupplier<T,E> input;
private final ThrowingPredicate<T,E> predicate;

private T current;

CapturingSupplier( Supplier<T> input, Predicate<T> predicate )
ThrowingCapturingSupplier( ThrowingSupplier<T,E> input, ThrowingPredicate<T,E> predicate )
{
this.input = input;
this.predicate = predicate;
Expand All @@ -148,7 +149,7 @@ T lastInput()
}

@Override
public Boolean get()
public Boolean get() throws E
{
current = input.get();
return predicate.test( current );
Expand Down
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
Expand All @@ -47,6 +46,7 @@
import org.neo4j.coreedge.core.state.machines.id.IdGenerationException;
import org.neo4j.coreedge.core.state.machines.locks.LeaderOnlyLockManager;
import org.neo4j.coreedge.edge.EdgeGraphDatabase;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
Expand All @@ -62,6 +62,7 @@
import static java.util.stream.Collectors.toList;
import static org.neo4j.concurrent.Futures.combine;
import static org.neo4j.function.Predicates.await;
import static org.neo4j.function.Predicates.awaitEx;
import static org.neo4j.function.Predicates.notNull;
import static org.neo4j.helpers.collection.Iterables.firstOrNull;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand All @@ -70,7 +71,6 @@
public class Cluster
{
private static final int DEFAULT_TIMEOUT_MS = 25_000;
private static final int DEFAULT_BACKOFF_MS = 100;
private static final int DEFAULT_CLUSTER_SIZE = 3;

private final File parentDir;
Expand Down Expand Up @@ -266,17 +266,17 @@ public CoreClusterMember getDbWithRole( Role role )

public CoreClusterMember awaitLeader() throws TimeoutException
{
return awaitCoreMemberWithRole( DEFAULT_TIMEOUT_MS, Role.LEADER );
return awaitCoreMemberWithRole( Role.LEADER, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
}

public CoreClusterMember awaitLeader( long timeoutMillis ) throws TimeoutException
public CoreClusterMember awaitLeader( long timeout, TimeUnit timeUnit ) throws TimeoutException
{
return awaitCoreMemberWithRole( timeoutMillis, Role.LEADER );
return awaitCoreMemberWithRole( Role.LEADER, timeout, timeUnit );
}

public CoreClusterMember awaitCoreMemberWithRole( long timeoutMillis, Role role ) throws TimeoutException
public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeUnit timeUnit ) throws TimeoutException
{
return await( () -> getDbWithRole( role ), notNull(), timeoutMillis, TimeUnit.MILLISECONDS );
return await( () -> getDbWithRole( role ), notNull(), timeout, timeUnit );
}

public int numberOfCoreMembersReportedByTopology()
Expand All @@ -295,7 +295,7 @@ public CoreClusterMember coreTx( BiConsumer<CoreGraphDatabase, Transaction> op )
throws TimeoutException, InterruptedException
{
// this currently wraps the leader-only strategy, since it is the recommended and only approach
return leaderTx( op );
return leaderTx( op, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
}

private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String> extraParams, Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
Expand All @@ -311,23 +311,23 @@ private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String>
/**
* Perform a transaction against the leader of the core cluster, retrying as necessary.
*/
private CoreClusterMember leaderTx( BiConsumer<CoreGraphDatabase, Transaction> op )
throws TimeoutException, InterruptedException
private CoreClusterMember leaderTx( BiConsumer<CoreGraphDatabase,Transaction> op, int timeout, TimeUnit timeUnit )
throws TimeoutException
{
long endTime = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS;

do
{
CoreClusterMember member = awaitCoreMemberWithRole( DEFAULT_TIMEOUT_MS, Role.LEADER );
CoreGraphDatabase db = member.database();
if ( db == null )
ThrowingSupplier<CoreClusterMember,RuntimeException> supplier = () -> {
try
{
throw new DatabaseShutdownException();
}
CoreClusterMember member = awaitLeader( timeout, timeUnit );
CoreGraphDatabase db = member.database();
if ( db == null )
{
throw new DatabaseShutdownException();
}

try ( Transaction tx = db.beginTx() )
{
op.accept( db, tx );
try ( Transaction tx = db.beginTx() )
{
op.accept( db, tx );
}
return member;
}
catch ( Throwable e )
Expand All @@ -337,18 +337,15 @@ private CoreClusterMember leaderTx( BiConsumer<CoreGraphDatabase, Transaction> o
// this is not the best, but it helps in debugging
System.err.println( "Transient failure in leader transaction, trying again." );
e.printStackTrace();
// sleep and retry
Thread.sleep( DEFAULT_BACKOFF_MS );
return null;
}
else
{
throw e;
throw new RuntimeException( e );
}
}
}
while ( System.currentTimeMillis() < endTime );

throw new TimeoutException( "Transaction did not succeed in time" );
};
return awaitEx( supplier, notNull()::test, timeout, timeUnit );
}

private boolean isTransientFailure( Throwable e )
Expand Down
Expand Up @@ -193,6 +193,7 @@ public void shouldReplicateTransactionAfterLeaderWasRemovedFromCluster() throws

// when
cluster.removeCoreMember( cluster.awaitLeader() );
cluster.awaitLeader( 1, TimeUnit.MINUTES ); // <- let's give a bit more time for the leader to show up
CoreClusterMember last = cluster.coreTx( ( db, tx ) ->
{
Node node = db.createNode();
Expand Down
Expand Up @@ -19,6 +19,9 @@
*/
package org.neo4j.coreedge.scenarios;

import org.junit.Rule;
import org.junit.Test;

import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -35,9 +38,6 @@
import org.neo4j.test.coreedge.ClusterRule;
import org.neo4j.time.Clocks;

import org.junit.Rule;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_frequency;
Expand All @@ -50,8 +50,6 @@

public class CoreToCoreCopySnapshotIT
{
private static final int TIMEOUT_MS = 5000;

@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withNumberOfCoreMembers( 3 )
Expand All @@ -69,12 +67,11 @@ public void shouldBeAbleToDownloadLargerFreshSnapshot() throws Exception
} );

// when
CoreClusterMember follower = cluster.awaitCoreMemberWithRole( TIMEOUT_MS, Role.FOLLOWER );
CoreClusterMember follower = cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 5, TimeUnit.SECONDS );

// shutdown the follower, remove the store, restart
follower.shutdown();
FileUtils.deleteRecursively( follower.storeDir() );
follower.storeDir().mkdir();
follower.start();

// then
Expand Down
Expand Up @@ -183,7 +183,7 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreI

executeOnLeaderWithRetry( this::createData, cluster );

CoreClusterMember follower = cluster.awaitCoreMemberWithRole( 2000, Role.FOLLOWER );
CoreClusterMember follower = cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 2, TimeUnit.SECONDS );
// Shutdown server before copying its data, because Windows can't copy open files.
follower.shutdown();

Expand Down Expand Up @@ -493,7 +493,7 @@ private void executeOnLeaderWithRetry( Workload workload, Cluster cluster ) thro
{
try
{
CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ).database();
CoreGraphDatabase coreDB = cluster.awaitLeader( 5L, SECONDS ).database();
try ( Transaction tx = coreDB.beginTx() )
{
workload.doWork( coreDB );
Expand Down
Expand Up @@ -24,6 +24,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.consistency.ConsistencyCheckService;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void edgeTest() throws Exception
Cluster cluster = clusterRule.withNumberOfCoreMembers( 2 ).withNumberOfEdgeMembers( 1 ).startCluster();

// when
final GraphDatabaseService coreDB = cluster.awaitLeader( 5000 ).database();
final GraphDatabaseService coreDB = cluster.awaitLeader( 5, TimeUnit.SECONDS ).database();

try ( Transaction tx = coreDB.beginTx() )
{
Expand Down
Expand Up @@ -90,7 +90,7 @@ public void shouldMonitorCoreEdge() throws Exception
cluster = clusterRule.startCluster();

// when
CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ).database();
CoreGraphDatabase coreDB = cluster.awaitLeader( 5, TimeUnit.SECONDS ).database();

try ( Transaction tx = coreDB.beginTx() )
{
Expand Down

0 comments on commit 091abe9

Please sign in to comment.