Skip to content

Commit

Permalink
Improve Predicates.await() and simplify code in Cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Oct 13, 2016
1 parent 12efc19 commit 9a91cd4
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 98 deletions.
61 changes: 29 additions & 32 deletions community/common/src/main/java/org/neo4j/function/Predicates.java
Expand Up @@ -24,6 +24,7 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate; import java.util.function.IntPredicate;
import java.util.function.Predicate; import java.util.function.Predicate;
Expand Down Expand Up @@ -136,36 +137,41 @@ public boolean test( T item )
}; };
} }


public static <TYPE> void await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout, TimeUnit timeoutUnit, public static <TYPE> TYPE await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout,
long pollInterval, TimeUnit pollUnit ) TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) throws TimeoutException
throws TimeoutException, InterruptedException
{ {
await( Suppliers.compose( supplier, predicate ), timeout, timeoutUnit, pollInterval, pollUnit ); Suppliers.CapturingSupplier<TYPE> composed = Suppliers.compose( supplier, predicate );
await( composed, timeout, timeoutUnit, pollInterval, pollUnit );
return composed.lastInput();
} }


public static <TYPE> void await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout, TimeUnit timeoutUnit ) public static <TYPE> TYPE await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout,
throws TimeoutException, InterruptedException TimeUnit timeoutUnit ) throws TimeoutException
{ {
await( Suppliers.compose( supplier, predicate ), timeout, timeoutUnit ); Suppliers.CapturingSupplier<TYPE> composed = Suppliers.compose( supplier, predicate );
await( composed, timeout, timeoutUnit );
return composed.lastInput();
} }


public static void await( Supplier<Boolean> condition, long timeout, TimeUnit unit ) public static void await( Supplier<Boolean> condition, long timeout, TimeUnit unit ) throws TimeoutException
throws TimeoutException, InterruptedException
{ {
awaitEx( condition::get, timeout, unit ); awaitEx( condition::get, timeout, unit );
} }


public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boolean, EXCEPTION> condition, public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boolean,EXCEPTION> condition,
long timeout, TimeUnit unit ) long timeout, TimeUnit unit ) throws TimeoutException, EXCEPTION
throws TimeoutException, InterruptedException, EXCEPTION
{ {
awaitEx( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS ); awaitEx( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS );
} }


public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boolean, EXCEPTION> condition, public static void await( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval,
long timeout, TimeUnit unit, long pollInterval, TimeUnit pollUnit ) throws TimeoutException
TimeUnit pollUnit ) {
throws TimeoutException, InterruptedException, EXCEPTION awaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit );
}

public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boolean,EXCEPTION> condition,
long timeout, TimeUnit unit, long pollInterval, TimeUnit pollUnit ) throws TimeoutException, EXCEPTION
{ {
if ( !tryAwaitEx( condition, timeout, unit, pollInterval, pollUnit ) ) if ( !tryAwaitEx( condition, timeout, unit, pollInterval, pollUnit ) )
{ {
Expand All @@ -174,49 +180,40 @@ public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boole
} }
} }


public static void await( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval,
TimeUnit pollUnit ) throws TimeoutException, InterruptedException
{
awaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit );
}

public static boolean tryAwait( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval, public static boolean tryAwait( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval,
TimeUnit pollUnit ) throws InterruptedException TimeUnit pollUnit )
{ {
return tryAwaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit ); return tryAwaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit );
} }


public static <EXCEPTION extends Exception> boolean tryAwaitEx( ThrowingSupplier<Boolean, EXCEPTION> condition, public static <EXCEPTION extends Exception> boolean tryAwaitEx( ThrowingSupplier<Boolean,EXCEPTION> condition,
long timeout, TimeUnit timeoutUnit, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) throws EXCEPTION
long pollInterval, TimeUnit pollUnit )
throws InterruptedException, EXCEPTION
{ {
long deadlineMillis = System.currentTimeMillis() + timeoutUnit.toMillis( timeout ); long deadlineMillis = System.currentTimeMillis() + timeoutUnit.toMillis( timeout );
long pollIntervalMillis = pollUnit.toMillis( pollInterval ); long pollIntervalNanos = pollUnit.toNanos( pollInterval );


do do
{ {
if ( condition.get() ) if ( condition.get() )
{ {
return true; return true;
} }
Thread.sleep( pollIntervalMillis ); LockSupport.parkNanos( pollIntervalNanos );
} }
while ( System.currentTimeMillis() < deadlineMillis ); while ( System.currentTimeMillis() < deadlineMillis );
return false; return false;
} }


public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit )
throws InterruptedException
{ {
long sleep = unit.toMillis( checkInterval ); long sleep = unit.toNanos( checkInterval );
do do
{ {
if ( condition.getAsBoolean() ) if ( condition.getAsBoolean() )
{ {
return; return;
} }
Thread.sleep( sleep ); LockSupport.parkNanos( sleep );
} }
while ( true ); while ( true );
} }
Expand Down
30 changes: 28 additions & 2 deletions community/common/src/main/java/org/neo4j/function/Suppliers.java
Expand Up @@ -118,14 +118,40 @@ public T get()
}; };
} }


public static <T> Supplier<Boolean> compose( final Supplier<T> input, final Predicate<T> predicate ) public static <T> CapturingSupplier<T> compose( final Supplier<T> input, final Predicate<T> predicate )
{ {
return () -> predicate.test( input.get() ); return new CapturingSupplier<T>( input, predicate );
} }


public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit )
{ {
final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration ); final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration );
return () -> currentTimeMillis() <= endTimeInMilliseconds; return () -> currentTimeMillis() <= endTimeInMilliseconds;
} }

static class CapturingSupplier<T> implements Supplier<Boolean>
{
private final Supplier<T> input;
private final Predicate<T> predicate;

private T current;

CapturingSupplier( Supplier<T> input, Predicate<T> predicate )
{
this.input = input;
this.predicate = predicate;
}

T lastInput()
{
return current;
}

@Override
public Boolean get()
{
current = input.get();
return predicate.test( current );
}
}
} }
Expand Up @@ -74,20 +74,12 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws
return; return;
} }


try if ( !tryAwait( () -> oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId(),
timeout.toMillis(), TimeUnit.MILLISECONDS, POLL_INTERVAL, POLL_UNIT ) )
{ {
if ( !tryAwait( () -> oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId(), throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
timeout.toMillis(), TimeUnit.MILLISECONDS, POLL_INTERVAL, POLL_UNIT ) ) "Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
{ transactionIdStore.getLastClosedTransactionId() );
throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore.getLastClosedTransactionId() );
}
}
catch ( InterruptedException e )
{
throw new TransactionFailureException( Status.Transaction.TransactionStartFailed, e,
"Thread interrupted when starting transaction" );
} }
} }


Expand Down
Expand Up @@ -120,11 +120,6 @@ private void waitUntilOnline( IndexDescriptor index, IndexSpecifier indexDescrip
throw new ProcedureException( Status.Procedure.ProcedureTimedOut, throw new ProcedureException( Status.Procedure.ProcedureTimedOut,
"Index on %s did not come online within %s %s", indexDescription, timeout, timeoutUnits ); "Index on %s did not come online within %s %s", indexDescription, timeout, timeoutUnits );
} }
catch ( InterruptedException e )
{
throw new ProcedureException( Status.General.DatabaseUnavailable,
"Interrupted waiting for index on %s to come online", indexDescription );
}
} }


private boolean isOnline( IndexSpecifier indexDescription, IndexDescriptor index ) throws ProcedureException private boolean isOnline( IndexSpecifier indexDescription, IndexDescriptor index ) throws ProcedureException
Expand Down
Expand Up @@ -173,10 +173,6 @@ private void awaitCompletion()
{ {
handleTimeout(); handleTimeout();
} }
catch ( InterruptedException e )
{
handleInterrupt();
}
} }


/** /**
Expand Down
Expand Up @@ -144,12 +144,6 @@ public CountsSnapshot snapshot( long txId )
awaitForever( () -> lastTxId.getHighestGapFreeNumber() >= snapshot.getTxId(), 100, MILLISECONDS ); awaitForever( () -> lastTxId.getHighestGapFreeNumber() >= snapshot.getTxId(), 100, MILLISECONDS );
return snapshot; return snapshot;
} }
catch ( InterruptedException ex )
{
Thread.currentThread().interrupt();
throw Exceptions
.withCause( new UnderlyingStorageException( "Construction of snapshot was interrupted." ), ex );
}
finally finally
{ {
snapshot = null; snapshot = null;
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage; import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage;
import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.messaging.Inbound.MessageHandler; import org.neo4j.coreedge.messaging.Inbound.MessageHandler;
import org.neo4j.function.Predicates;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -73,15 +72,8 @@ public void handle( RaftMessages.ClusterIdAwareMessage message )
return; return;
} }


try // keep trying to add the message into the queue, give up only if this component has been stopped
{ awaitForever( () -> stopped || messageQueue.offer( message ), 100, TimeUnit.MILLISECONDS );
// keep trying to add the message into the queue, give up only if this component has been stopped
awaitForever( () -> stopped || messageQueue.offer( message ), 100, TimeUnit.MILLISECONDS );
}
catch ( InterruptedException e )
{
log.warn( "Not expecting to be interrupted.", e );
}
} }


@Override @Override
Expand Down
Expand Up @@ -47,7 +47,6 @@
import org.neo4j.coreedge.core.state.machines.id.IdGenerationException; import org.neo4j.coreedge.core.state.machines.id.IdGenerationException;
import org.neo4j.coreedge.core.state.machines.locks.LeaderOnlyLockManager; import org.neo4j.coreedge.core.state.machines.locks.LeaderOnlyLockManager;
import org.neo4j.coreedge.edge.EdgeGraphDatabase; import org.neo4j.coreedge.edge.EdgeGraphDatabase;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException; import org.neo4j.graphdb.TransactionFailureException;
Expand All @@ -60,15 +59,17 @@
import org.neo4j.test.DbRepresentation; import org.neo4j.test.DbRepresentation;


import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.toList;
import static org.neo4j.concurrent.Futures.combine; import static org.neo4j.concurrent.Futures.combine;
import static org.neo4j.function.Predicates.await;
import static org.neo4j.function.Predicates.notNull;
import static org.neo4j.helpers.collection.Iterables.firstOrNull; import static org.neo4j.helpers.collection.Iterables.firstOrNull;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionExpired; import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionExpired;


public class Cluster public class Cluster
{ {
private static final int DEFAULT_TIMEOUT_MS = 15_000; private static final int DEFAULT_TIMEOUT_MS = 25_000;
private static final int DEFAULT_BACKOFF_MS = 100; private static final int DEFAULT_BACKOFF_MS = 100;
private static final int DEFAULT_CLUSTER_SIZE = 3; private static final int DEFAULT_CLUSTER_SIZE = 3;


Expand Down Expand Up @@ -275,19 +276,7 @@ public CoreClusterMember awaitLeader( long timeoutMillis ) throws TimeoutExcepti


public CoreClusterMember awaitCoreMemberWithRole( long timeoutMillis, Role role ) throws TimeoutException public CoreClusterMember awaitCoreMemberWithRole( long timeoutMillis, Role role ) throws TimeoutException
{ {
long endTimeMillis = timeoutMillis + System.currentTimeMillis(); return await( () -> getDbWithRole( role ), notNull(), timeoutMillis, TimeUnit.MILLISECONDS );

CoreClusterMember db;
while ( (db = getDbWithRole( role )) == null && (System.currentTimeMillis() < endTimeMillis) )
{
LockSupport.parkNanos( MILLISECONDS.toNanos( 100 ) );
}

if ( db == null )
{
throw new TimeoutException();
}
return db;
} }


public int numberOfCoreMembersReportedByTopology() public int numberOfCoreMembersReportedByTopology()
Expand Down Expand Up @@ -394,12 +383,7 @@ private boolean isLockExpired( Throwable e )


public static List<AdvertisedSocketAddress> buildAddresses( Set<Integer> coreServerIds ) public static List<AdvertisedSocketAddress> buildAddresses( Set<Integer> coreServerIds )
{ {
List<AdvertisedSocketAddress> addresses = new ArrayList<>(); return coreServerIds.stream().map( Cluster::socketAddressForServer ).collect( toList() );
for ( Integer i : coreServerIds )
{
addresses.add( socketAddressForServer( i ) );
}
return addresses;
} }


public static AdvertisedSocketAddress socketAddressForServer( int id ) public static AdvertisedSocketAddress socketAddressForServer( int id )
Expand Down Expand Up @@ -497,7 +481,7 @@ public static void dataOnMemberEventuallyLooksLike( CoreClusterMember memberThat
CoreClusterMember memberToLookLike ) CoreClusterMember memberToLookLike )
throws TimeoutException, InterruptedException throws TimeoutException, InterruptedException
{ {
Predicates.await( () -> { await( () -> {
try try
{ {
// We recalculate the DbRepresentation of both source and target, so changes can be picked up // We recalculate the DbRepresentation of both source and target, so changes can be picked up
Expand All @@ -524,11 +508,10 @@ public static void dataMatchesEventually( DbRepresentation sourceRepresentation,
{ {
for ( CoreClusterMember targetDB : targetDBs ) for ( CoreClusterMember targetDB : targetDBs )
{ {
Predicates.await( () -> { await( () -> {
DbRepresentation representation = DbRepresentation.of( targetDB.database() ); DbRepresentation representation = DbRepresentation.of( targetDB.database() );
return sourceRepresentation.equals( representation ); return sourceRepresentation.equals( representation );
}, }, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
} }
} }


Expand Down

0 comments on commit 9a91cd4

Please sign in to comment.