From 9a91cd4eebbdb91644b1c1362ec8093dea561295 Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Thu, 13 Oct 2016 12:28:40 +0200 Subject: [PATCH] Improve Predicates.await() and simplify code in Cluster --- .../java/org/neo4j/function/Predicates.java | 61 +++++++++---------- .../java/org/neo4j/function/Suppliers.java | 30 ++++++++- .../api/txtracking/TransactionIdTracker.java | 18 ++---- .../kernel/builtinprocs/IndexProcedures.java | 5 -- .../index/BatchingMultipleIndexPopulator.java | 4 -- .../store/countStore/InMemoryCountsStore.java | 6 -- .../core/server/BatchingMessageHandler.java | 12 +--- .../org/neo4j/coreedge/discovery/Cluster.java | 35 +++-------- 8 files changed, 73 insertions(+), 98 deletions(-) diff --git a/community/common/src/main/java/org/neo4j/function/Predicates.java b/community/common/src/main/java/org/neo4j/function/Predicates.java index 6104475c51925..ec0dd0ef50efd 100644 --- a/community/common/src/main/java/org/neo4j/function/Predicates.java +++ b/community/common/src/main/java/org/neo4j/function/Predicates.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; import java.util.function.IntPredicate; import java.util.function.Predicate; @@ -136,36 +137,41 @@ public boolean test( T item ) }; } - public static void await( Supplier supplier, Predicate predicate, long timeout, TimeUnit timeoutUnit, - long pollInterval, TimeUnit pollUnit ) - throws TimeoutException, InterruptedException + public static TYPE await( Supplier supplier, Predicate predicate, long timeout, + TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) throws TimeoutException { - await( Suppliers.compose( supplier, predicate ), timeout, timeoutUnit, pollInterval, pollUnit ); + Suppliers.CapturingSupplier composed = Suppliers.compose( supplier, predicate ); + await( composed, timeout, timeoutUnit, pollInterval, pollUnit ); + return composed.lastInput(); } - public static void await( Supplier supplier, Predicate predicate, long timeout, TimeUnit timeoutUnit ) - throws TimeoutException, InterruptedException + public static TYPE await( Supplier supplier, Predicate predicate, long timeout, + TimeUnit timeoutUnit ) throws TimeoutException { - await( Suppliers.compose( supplier, predicate ), timeout, timeoutUnit ); + Suppliers.CapturingSupplier composed = Suppliers.compose( supplier, predicate ); + await( composed, timeout, timeoutUnit ); + return composed.lastInput(); } - public static void await( Supplier condition, long timeout, TimeUnit unit ) - throws TimeoutException, InterruptedException + public static void await( Supplier condition, long timeout, TimeUnit unit ) throws TimeoutException { awaitEx( condition::get, timeout, unit ); } - public static void awaitEx( ThrowingSupplier condition, - long timeout, TimeUnit unit ) - throws TimeoutException, InterruptedException, EXCEPTION + public static void awaitEx( ThrowingSupplier condition, + long timeout, TimeUnit unit ) throws TimeoutException, EXCEPTION { awaitEx( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS ); } - public static void awaitEx( ThrowingSupplier condition, - long timeout, TimeUnit unit, long pollInterval, - TimeUnit pollUnit ) - throws TimeoutException, InterruptedException, EXCEPTION + public static void await( Supplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, + TimeUnit pollUnit ) throws TimeoutException + { + awaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit ); + } + + public static void awaitEx( ThrowingSupplier condition, + long timeout, TimeUnit unit, long pollInterval, TimeUnit pollUnit ) throws TimeoutException, EXCEPTION { if ( !tryAwaitEx( condition, timeout, unit, pollInterval, pollUnit ) ) { @@ -174,25 +180,17 @@ public static void awaitEx( ThrowingSupplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, - TimeUnit pollUnit ) throws TimeoutException, InterruptedException - { - awaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit ); - } - public static boolean tryAwait( Supplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, - TimeUnit pollUnit ) throws InterruptedException + TimeUnit pollUnit ) { return tryAwaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit ); } - public static boolean tryAwaitEx( ThrowingSupplier condition, - long timeout, TimeUnit timeoutUnit, - long pollInterval, TimeUnit pollUnit ) - throws InterruptedException, EXCEPTION + public static boolean tryAwaitEx( ThrowingSupplier condition, + long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) throws EXCEPTION { long deadlineMillis = System.currentTimeMillis() + timeoutUnit.toMillis( timeout ); - long pollIntervalMillis = pollUnit.toMillis( pollInterval ); + long pollIntervalNanos = pollUnit.toNanos( pollInterval ); do { @@ -200,23 +198,22 @@ public static boolean tryAwaitEx( ThrowingSupplier { return true; } - Thread.sleep( pollIntervalMillis ); + LockSupport.parkNanos( pollIntervalNanos ); } while ( System.currentTimeMillis() < deadlineMillis ); return false; } public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) - throws InterruptedException { - long sleep = unit.toMillis( checkInterval ); + long sleep = unit.toNanos( checkInterval ); do { if ( condition.getAsBoolean() ) { return; } - Thread.sleep( sleep ); + LockSupport.parkNanos( sleep ); } while ( true ); } diff --git a/community/common/src/main/java/org/neo4j/function/Suppliers.java b/community/common/src/main/java/org/neo4j/function/Suppliers.java index e6fc3d8fc8dad..cf51670a2089c 100644 --- a/community/common/src/main/java/org/neo4j/function/Suppliers.java +++ b/community/common/src/main/java/org/neo4j/function/Suppliers.java @@ -118,9 +118,9 @@ public T get() }; } - public static Supplier compose( final Supplier input, final Predicate predicate ) + public static CapturingSupplier compose( final Supplier input, final Predicate predicate ) { - return () -> predicate.test( input.get() ); + return new CapturingSupplier( input, predicate ); } public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) @@ -128,4 +128,30 @@ public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration ); return () -> currentTimeMillis() <= endTimeInMilliseconds; } + + static class CapturingSupplier implements Supplier + { + private final Supplier input; + private final Predicate predicate; + + private T current; + + CapturingSupplier( Supplier input, Predicate predicate ) + { + this.input = input; + this.predicate = predicate; + } + + T lastInput() + { + return current; + } + + @Override + public Boolean get() + { + current = input.get(); + return predicate.test( current ); + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java b/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java index 304bc45c3512f..e18220289677f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/txtracking/TransactionIdTracker.java @@ -74,20 +74,12 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws return; } - try + if ( !tryAwait( () -> oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId(), + timeout.toMillis(), TimeUnit.MILLISECONDS, POLL_INTERVAL, POLL_UNIT ) ) { - if ( !tryAwait( () -> oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId(), - timeout.toMillis(), TimeUnit.MILLISECONDS, POLL_INTERVAL, POLL_UNIT ) ) - { - throw new TransactionFailureException( Status.Transaction.InstanceStateChanged, - "Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId, - transactionIdStore.getLastClosedTransactionId() ); - } - } - catch ( InterruptedException e ) - { - throw new TransactionFailureException( Status.Transaction.TransactionStartFailed, e, - "Thread interrupted when starting transaction" ); + throw new TransactionFailureException( Status.Transaction.InstanceStateChanged, + "Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId, + transactionIdStore.getLastClosedTransactionId() ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/IndexProcedures.java b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/IndexProcedures.java index aaf232f591aee..620aef329698d 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/IndexProcedures.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/IndexProcedures.java @@ -120,11 +120,6 @@ private void waitUntilOnline( IndexDescriptor index, IndexSpecifier indexDescrip throw new ProcedureException( Status.Procedure.ProcedureTimedOut, "Index on %s did not come online within %s %s", indexDescription, timeout, timeoutUnits ); } - catch ( InterruptedException e ) - { - throw new ProcedureException( Status.General.DatabaseUnavailable, - "Interrupted waiting for index on %s to come online", indexDescription ); - } } private boolean isOnline( IndexSpecifier indexDescription, IndexDescriptor index ) throws ProcedureException diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java index 2b3901d88105f..2a9c68b757a5d 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java @@ -173,10 +173,6 @@ private void awaitCompletion() { handleTimeout(); } - catch ( InterruptedException e ) - { - handleInterrupt(); - } } /** diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/countStore/InMemoryCountsStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/countStore/InMemoryCountsStore.java index d840dc22d2b9d..16ed40d08f7b7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/countStore/InMemoryCountsStore.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/countStore/InMemoryCountsStore.java @@ -144,12 +144,6 @@ public CountsSnapshot snapshot( long txId ) awaitForever( () -> lastTxId.getHighestGapFreeNumber() >= snapshot.getTxId(), 100, MILLISECONDS ); return snapshot; } - catch ( InterruptedException ex ) - { - Thread.currentThread().interrupt(); - throw Exceptions - .withCause( new UnderlyingStorageException( "Construction of snapshot was interrupted." ), ex ); - } finally { snapshot = null; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java index b19b08d080fde..8a872a2cccf10 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java @@ -29,7 +29,6 @@ import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage; import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.messaging.Inbound.MessageHandler; -import org.neo4j.function.Predicates; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -73,15 +72,8 @@ public void handle( RaftMessages.ClusterIdAwareMessage message ) return; } - try - { - // keep trying to add the message into the queue, give up only if this component has been stopped - awaitForever( () -> stopped || messageQueue.offer( message ), 100, TimeUnit.MILLISECONDS ); - } - catch ( InterruptedException e ) - { - log.warn( "Not expecting to be interrupted.", e ); - } + // keep trying to add the message into the queue, give up only if this component has been stopped + awaitForever( () -> stopped || messageQueue.offer( message ), 100, TimeUnit.MILLISECONDS ); } @Override diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java index a561b5ff5dbcb..c432fbe527621 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java @@ -47,7 +47,6 @@ import org.neo4j.coreedge.core.state.machines.id.IdGenerationException; import org.neo4j.coreedge.core.state.machines.locks.LeaderOnlyLockManager; import org.neo4j.coreedge.edge.EdgeGraphDatabase; -import org.neo4j.function.Predicates; import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransactionFailureException; @@ -60,15 +59,17 @@ import org.neo4j.test.DbRepresentation; import static java.util.Collections.emptyMap; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toList; import static org.neo4j.concurrent.Futures.combine; +import static org.neo4j.function.Predicates.await; +import static org.neo4j.function.Predicates.notNull; import static org.neo4j.helpers.collection.Iterables.firstOrNull; import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionExpired; public class Cluster { - private static final int DEFAULT_TIMEOUT_MS = 15_000; + private static final int DEFAULT_TIMEOUT_MS = 25_000; private static final int DEFAULT_BACKOFF_MS = 100; private static final int DEFAULT_CLUSTER_SIZE = 3; @@ -275,19 +276,7 @@ public CoreClusterMember awaitLeader( long timeoutMillis ) throws TimeoutExcepti public CoreClusterMember awaitCoreMemberWithRole( long timeoutMillis, Role role ) throws TimeoutException { - long endTimeMillis = timeoutMillis + System.currentTimeMillis(); - - CoreClusterMember db; - while ( (db = getDbWithRole( role )) == null && (System.currentTimeMillis() < endTimeMillis) ) - { - LockSupport.parkNanos( MILLISECONDS.toNanos( 100 ) ); - } - - if ( db == null ) - { - throw new TimeoutException(); - } - return db; + return await( () -> getDbWithRole( role ), notNull(), timeoutMillis, TimeUnit.MILLISECONDS ); } public int numberOfCoreMembersReportedByTopology() @@ -394,12 +383,7 @@ private boolean isLockExpired( Throwable e ) public static List buildAddresses( Set coreServerIds ) { - List addresses = new ArrayList<>(); - for ( Integer i : coreServerIds ) - { - addresses.add( socketAddressForServer( i ) ); - } - return addresses; + return coreServerIds.stream().map( Cluster::socketAddressForServer ).collect( toList() ); } public static AdvertisedSocketAddress socketAddressForServer( int id ) @@ -497,7 +481,7 @@ public static void dataOnMemberEventuallyLooksLike( CoreClusterMember memberThat CoreClusterMember memberToLookLike ) throws TimeoutException, InterruptedException { - Predicates.await( () -> { + await( () -> { try { // We recalculate the DbRepresentation of both source and target, so changes can be picked up @@ -524,11 +508,10 @@ public static void dataMatchesEventually( DbRepresentation sourceRepresentation, { for ( CoreClusterMember targetDB : targetDBs ) { - Predicates.await( () -> { + await( () -> { DbRepresentation representation = DbRepresentation.of( targetDB.database() ); return sourceRepresentation.equals( representation ); - }, - DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); + }, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); } }