From 1cfb229191aa34f951cb58f1e87e510b7b8ad942 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Tue, 21 Aug 2018 10:06:59 +0200 Subject: [PATCH] Replace Throttler The new throttler resides in the ReconnectingChannel and it uses Nettys isWritable to pause writing if necessary --- .../causalclustering/ReplicationModule.java | 3 +- .../core/CausalClusteringSettings.java | 4 - .../core/replication/RaftReplicator.java | 51 +--- .../core/replication/Throttler.java | 77 ------ .../messaging/ReconnectingChannel.java | 63 +++-- .../messaging/SenderService.java | 2 +- .../messaging/WritabilityThrottler.java | 46 ++++ .../core/replication/RaftReplicatorTest.java | 26 +- .../core/replication/ThrottlerTest.java | 229 ------------------ .../messaging/ReconnectingChannelIT.java | 2 +- .../messaging/WritabilityThrottlerTest.java | 87 +++++++ 11 files changed, 186 insertions(+), 404 deletions(-) delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Throttler.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java delete mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/ThrottlerTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index 40723b9b9564b..e8b00f2e2ac99 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -72,7 +72,6 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession ); progressTracker = new ProgressTrackerImpl( myGlobalSession ); - long replicationLimit = config.get( CausalClusteringSettings.replication_total_size_limit ); Duration initialBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_base ); Duration upperBoundBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_limit ); Duration leaderBackoff = config.get( CausalClusteringSettings.replication_leader_retry_timeout ); @@ -89,7 +88,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config progressRetryStrategy, leaderRetryStrategy, availabilityTimeoutMillis, - globalAvailabilityGuard, logProvider, replicationLimit, platformModule.monitors ); + globalAvailabilityGuard, logProvider, platformModule.monitors ); } public RaftReplicator getReplicator() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 86fb7adfcb4c0..d47ad4be53465 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -275,10 +275,6 @@ public HostnameResolver getHostnameResolver( LogProvider logProvider, LogProvide public static final Setting replicated_lock_token_state_size = setting( "causal_clustering.replicated_lock_token_state_size", INTEGER, "1000" ); - @Description( "The maximum amount of data which can be in the replication stage concurrently." ) - public static final Setting replication_total_size_limit = - setting( "causal_clustering.replication_total_size_limit", BYTES, "128M" ); - @Description( "The initial timeout until replication is retried. The timeout will increase exponentially." ) public static final Setting replication_retry_timeout_base = setting( "causal_clustering.replication_retry_timeout_base", DURATION, "10s" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 928d829f827bb..2007fa7672d03 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -56,13 +56,12 @@ public class RaftReplicator implements Replicator, LeaderListener private final LeaderLocator leaderLocator; private final TimeoutStrategy leaderTimeoutStrategy; private final Log log; - private final Throttler throttler; private final ReplicationMonitor replicationMonitor; private final long availabilityTimeoutMillis; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, long availabilityTimeoutMillis, - AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit, Monitors monitors ) + AvailabilityGuard availabilityGuard, LogProvider logProvider, Monitors monitors ) { this.me = me; this.outbound = outbound; @@ -72,7 +71,6 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound replicate( ReplicatedContent command, boolean trackResult { throw new ReplicationFailureException( "Replication aborted since no leader was available", e ); } - - if ( command.size().isPresent() ) - { - try - { - return throttler.invoke( () -> replicate0( command, trackResult, originalLeader ), command.size().get() ); - } - catch ( InterruptedException e ) - { - throw new ReplicationFailureException( "Interrupted while waiting for replication credits", e ); - } - } - else - { - return replicate0( command, trackResult, originalLeader ); - } + return replicate0( command, trackResult, originalLeader ); } private Future replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException @@ -114,8 +97,6 @@ private Future replicate0( ReplicatedContent command, boolean trackResul replicationMonitor.startReplication(); try { - assertNoLeaderSwitch( leader ); - OperationContext session = sessionPool.acquireSession(); DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() ); @@ -126,7 +107,7 @@ private Future replicate0( ReplicatedContent command, boolean trackResul int attempts = 0; try { - do + while ( true ) { attempts++; if ( attempts > 1 ) @@ -139,10 +120,11 @@ private Future replicate0( ReplicatedContent command, boolean trackResul { // blocking at least until the send has succeeded or failed before retrying outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true ); - - leaderTimeout = leaderTimeoutStrategy.newTimeout(); - progress.awaitReplication( progressTimeout.getMillis() ); + if ( progress.isReplicated() ) + { + break; + } progressTimeout.increment(); leader = leaderLocator.getLeader(); } @@ -153,7 +135,6 @@ private Future replicate0( ReplicatedContent command, boolean trackResul leaderTimeout.increment(); } } - while ( !progress.isReplicated() ); } catch ( InterruptedException e ) { @@ -182,24 +163,6 @@ private Future replicate0( ReplicatedContent command, boolean trackResul } - private void assertNoLeaderSwitch( MemberId originalLeader ) throws ReplicationFailureException - { - MemberId currentLeader; - try - { - currentLeader = leaderLocator.getLeader(); - } - catch ( NoLeaderFoundException e ) - { - throw new ReplicationFailureException( "Replication aborted since no leader was available", e ); - } - - if ( !currentLeader.equals( originalLeader ) ) - { - throw new ReplicationFailureException( "Replication aborted since a leader switch was detected" ); - } - } - @Override public void onLeaderSwitch( LeaderInfo leaderInfo ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Throttler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Throttler.java deleted file mode 100644 index 7846a6fb568bf..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Throttler.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.core.replication; - -import org.neo4j.function.ThrowingSupplier; - -/** - * Throttles calls to underlying invocation based on available credits. - */ -class Throttler -{ - private final long creditLimit; - private long currentCredit; - - Throttler( long creditLimit ) - { - this.creditLimit = creditLimit; - } - - private synchronized void acquire( long credits ) throws InterruptedException - { - while ( currentCredit >= creditLimit ) - { - wait(); - } - - currentCredit += credits; - - if ( currentCredit < creditLimit ) - { - notify(); - } - } - - private synchronized void release( long credits ) - { - currentCredit -= credits; - - if ( currentCredit < creditLimit ) - { - notify(); - } - } - - R invoke( ThrowingSupplier call, long credits ) throws E, InterruptedException - { - acquire( credits ); - try - { - return call.get(); - } - finally - { - release( credits ); - } - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java index ce75afcf20c91..3937ef948afc4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java @@ -25,8 +25,10 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.EventLoop; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.AttributeKey; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Promise; import java.time.Clock; @@ -50,9 +52,9 @@ public class ReconnectingChannel implements Channel private final Log log; private final Bootstrap bootstrap; - private final EventLoop eventLoop; private final SocketAddress destination; private final TimeoutStrategy connectionBackoffStrategy; + private final WritabilityThrottler writabilityThrottler; private volatile io.netty.channel.Channel channel; private volatile ChannelFuture fChannel; @@ -62,21 +64,21 @@ public class ReconnectingChannel implements Channel private TimeoutStrategy.Timeout connectionBackoff; private CappedLogger cappedLogger; - ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log ) + ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log ) { - this( bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); + this( bootstrap, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ), new WritabilityThrottler() ); } - private ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log, - TimeoutStrategy connectionBackoffStrategy ) + private ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log, TimeoutStrategy connectionBackoffStrategy, + WritabilityThrottler writabilityThrottler ) { this.bootstrap = bootstrap; - this.eventLoop = eventLoop; this.destination = destination; this.log = log; this.cappedLogger = new CappedLogger( log ).setTimeLimit( 20, TimeUnit.SECONDS, Clock.systemUTC() ); this.connectionBackoffStrategy = connectionBackoffStrategy; this.connectionBackoff = connectionBackoffStrategy.newTimeout(); + this.writabilityThrottler = writabilityThrottler; } void start() @@ -97,6 +99,16 @@ else if ( fChannel != null && !fChannel.isDone() ) fChannel = bootstrap.connect( destination.socketAddress() ); channel = fChannel.channel(); + writabilityThrottler.setIsWritable( channel.isWritable() ); + channel.pipeline().addFirst( new ChannelInboundHandlerAdapter() + { + @Override + public void channelWritabilityChanged( ChannelHandlerContext ctx ) throws Exception + { + writabilityThrottler.setIsWritable( ctx.channel().isWritable() ); + super.channelWritabilityChanged( ctx ); + } + } ); fChannel.addListener( ( ChannelFuture f ) -> { @@ -160,31 +172,40 @@ private Future write( Object msg, boolean flush ) if ( channel.isActive() ) { - if ( flush ) - { - return channel.writeAndFlush( msg ); - } - else - { - return channel.write( msg ); - } + return awaitAndWrite( msg, flush ); } else { - Promise promise = eventLoop.newPromise(); + Promise promise = new DefaultPromise<>( bootstrap.config().group().next() ); BiConsumer writer; + writer = ( channel, message ) -> chain( awaitAndWrite( msg, flush ), promise ); + + deferredWrite( msg, fChannel, promise, true, writer ); + return promise; + } + } + + private ChannelFuture awaitAndWrite( Object msg, boolean flush ) + { + try + { + writabilityThrottler.awaitWritable(); if ( flush ) { - writer = ( channel, message ) -> chain( channel.writeAndFlush( msg ), promise ); + return channel.writeAndFlush( msg ); } else { - writer = ( channel, message ) -> chain( channel.write( msg ), promise ); + return channel.write( msg ); } - - deferredWrite( msg, fChannel, promise, true, writer ); - return promise; + } + catch ( InterruptedException e ) + { + log.warn( "Interrupted while awaiting writability" ); + Thread.currentThread().interrupt(); + channel.close(); + return channel.voidPromise().setFailure( e ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index 1d979a0bf2772..e04c22477d595 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -107,7 +107,7 @@ private Channel channel( AdvertisedSocketAddress destination ) if ( channel == null ) { - channel = new ReconnectingChannel( bootstrap, eventLoopGroup.next(), destination, log ); + channel = new ReconnectingChannel( bootstrap, destination, log ); channel.start(); ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java new file mode 100644 index 0000000000000..67eb19790651d --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.messaging; + +import java.util.concurrent.atomic.AtomicBoolean; + +class WritabilityThrottler +{ + private AtomicBoolean isWritable = new AtomicBoolean( true ); + + synchronized void awaitWritable() throws InterruptedException + { + if ( !isWritable.get() ) + { + wait(); + } + } + + synchronized void setIsWritable( boolean isWritable ) + { + if ( this.isWritable.compareAndSet( !isWritable, isWritable ) && isWritable ) + { + notify(); + } + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index 5d64c0daca68a..98d270179d4a8 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -252,34 +252,10 @@ public void shouldFailIfNoLeaderIsAvailable() throws NoLeaderFoundException } } - @Test - public void shouldFailIfLeaderSwitches() throws NoLeaderFoundException - { - // given - when( leaderLocator.getLeader() ).thenReturn( leader ).thenReturn( anotherLeader ); - - CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); - CapturingOutbound outbound = new CapturingOutbound<>(); - - RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() ); - - // when - try - { - ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); - replicator.replicate( content, true ); - fail( "should have thrown" ); - } - catch ( ReplicationFailureException e ) - { - assertEquals( "Replication aborted since a leader switch was detected", e.getMessage() ); - } - } - private RaftReplicator getReplicator( CapturingOutbound outbound, CapturingProgressTracker capturedProgress, Monitors monitors ) { return new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, - 10, databaseAvailabilityGuard, NullLogProvider.getInstance(), REPLICATION_LIMIT, monitors ); + 10, databaseAvailabilityGuard, NullLogProvider.getInstance(), monitors ); } private ReplicatingThread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/ThrottlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/ThrottlerTest.java deleted file mode 100644 index 35a64091483b3..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/ThrottlerTest.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.core.replication; - -import org.junit.After; -import org.junit.Test; - -import java.util.HashSet; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; - -import org.neo4j.function.ThrowingSupplier; - -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItems; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.neo4j.test.assertion.Assert.assertEventually; - -public class ThrottlerTest -{ - private ExecutorService es = Executors.newCachedThreadPool(); - private ExecutorCompletionService ecs = new ExecutorCompletionService<>( es ); - - @After - public void after() throws InterruptedException - { - es.shutdown(); - es.awaitTermination( 1, MINUTES ); - } - - @Test - public void shouldAllowInvocationWhenCreditsAvailable() throws Exception - { - // given - Throttler throttler = new Throttler( 1000 ); - Counter counter = new Counter(); - - // when - int count = ecs.submit( () -> throttler.invoke( counter, 1000 ) ).get( 1, MINUTES ); - - // then - assertEquals( 1, count ); - } - - @Test - public void shouldAllowSequentialInvocations() throws Exception - { - // given - Throttler throttler = new Throttler( 1000 ); - Counter counter = new Counter(); - - // when - HashSet set = new HashSet<>(); - set.add( ecs.submit( () -> throttler.invoke( counter, 1000 ) ).get( 1, MINUTES ) ); - set.add( ecs.submit( () -> throttler.invoke( counter, 1000 ) ).get( 1, MINUTES ) ); - set.add( ecs.submit( () -> throttler.invoke( counter, 1000 ) ).get( 1, MINUTES ) ); - - // then - assertThat( set, hasItems( 1, 2, 3 ) ); - } - - @Test - public void shouldAllowOneInvocationOversteppingTheLimit() throws Exception - { - // given - Throttler throttler = new Throttler( 1000 ); - Counter counter = new Counter(); - ecs.submit( () -> throttler.invoke( counter, 500 ) ).get( 1, MINUTES ); - assertEventually( counter::count, equalTo( 1 ), 1, MINUTES ); - - // when - int count = ecs.submit( () -> throttler.invoke( counter, 800 ) ).get( 1, MINUTES ); - - // then - assertEquals( 2, count ); - } - - @Test - public void shouldBlockInvocationWhenCreditsNotAvailable() throws Exception - { - // given - Throttler throttler = new Throttler( 1000 ); - Blocker blocker = new Blocker(); - Future call1 = ecs.submit( () -> throttler.invoke( blocker, 1200 ) ); - assertEventually( blocker::count, equalTo( 1 ), 1, MINUTES ); - - // when - Future call2 = ecs.submit( () -> throttler.invoke( blocker, 800 ) ); - Thread.sleep( 10 ); - - // then - assertEquals( 1, blocker.count() ); - assertFalse( call1.isDone() ); - assertFalse( call2.isDone() ); - - // cleanup - blocker.release( 2 ); - call1.get( 1, MINUTES ); - call2.get( 1, MINUTES ); - } - - @Test - public void shouldInvokeWhenCreditsBecomeAvailable() throws Exception - { - // given - Throttler throttler = new Throttler( 1000 ); - Blocker blocker = new Blocker(); - - // when - Future call1 = ecs.submit( () -> throttler.invoke( blocker, 1200 ) ); - - // then - assertEventually( blocker::count, equalTo( 1 ), 1, MINUTES ); - - // when - blocker.release( 1 ); - Future call2 = ecs.submit( () -> throttler.invoke( blocker, 800 ) ); - - // then - call1.get( 1, MINUTES ); - assertEventually( blocker::count, equalTo( 2 ), 1, MINUTES ); - assertFalse( call2.isDone() ); - - // cleanup - blocker.release( 1 ); - call2.get( 1, MINUTES ); - } - - @Test - public void shouldInvokeMultipleWhenCreditsBecomeAvailable() throws Exception - { - // given - Throttler throttler = new Throttler( 1000 ); - Blocker blocker = new Blocker(); - - // when - Future call1 = ecs.submit( () -> throttler.invoke( blocker, 2000 ) ); - - // then - assertEventually( blocker::count, equalTo( 1 ), 1, MINUTES ); - - // when - Future call2 = ecs.submit( () -> throttler.invoke( blocker, 400 ) ); - Future call3 = ecs.submit( () -> throttler.invoke( blocker, 400 ) ); - Thread.sleep( 10 ); - - // then - assertEquals( 1, blocker.count() ); - - // when - blocker.release( 1 ); - - // then - call1.get( 1, MINUTES ); - assertEventually( blocker::count, equalTo( 3 ), 1, MINUTES ); - - // cleanup - blocker.release( 2 ); - call2.get( 1, MINUTES ); - call3.get( 1, MINUTES ); - } - - static class Counter implements ThrowingSupplier - { - private final AtomicInteger count = new AtomicInteger(); - - @Override - public Integer get() - { - return count.incrementAndGet(); - } - - public int count() - { - return count.get(); - } - } - - static class Blocker implements ThrowingSupplier - { - private final Semaphore semaphore = new Semaphore( 0 ); - private final AtomicInteger count = new AtomicInteger(); - - @Override - public Integer get() throws Exception - { - count.incrementAndGet(); - semaphore.acquire(); - return semaphore.availablePermits(); - } - - void release( int permits ) - { - semaphore.release( permits ); - } - - int count() - { - return count.get(); - } - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java index d7e1d2865bebe..83ae677a408f9 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java @@ -90,7 +90,7 @@ public void before() { elg = new NioEventLoopGroup( 0 ); Bootstrap bootstrap = new Bootstrap().channel( NioSocketChannel.class ).group( elg ).handler( childCounter ); - channel = new ReconnectingChannel( bootstrap, elg.next(), listenAddress, log ); + channel = new ReconnectingChannel( bootstrap, listenAddress, log ); } @After diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java new file mode 100644 index 0000000000000..fc571ce631b75 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.messaging; + +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertTrue; + +public class WritabilityThrottlerTest +{ + @Test + public void shouldBeWritableByDefault() throws InterruptedException + { + new WritabilityThrottler().awaitWritable(); + } + + @Test + public void shouldNotifyWhenWritabilityIsSetToTrue() throws ExecutionException, InterruptedException + { + // given + WritabilityThrottler writabilityThrottler = new WritabilityThrottler(); + writabilityThrottler.setIsWritable( false ); + Future waiter = Executors.newSingleThreadExecutor().submit( () -> + { + try + { + writabilityThrottler.awaitWritable(); + return true; + } + catch ( InterruptedException e ) + { + throw new RuntimeException( e ); + } + } ); + waitForTimeout( waiter ); + + // when + writabilityThrottler.setIsWritable( false ); + + // then + waitForTimeout( waiter ); + + // when + writabilityThrottler.setIsWritable( true ); + + // then + assertTrue( waiter.get() ); + } + + private void waitForTimeout( Future waiter ) throws InterruptedException, ExecutionException + { + try + { + waiter.get( 5, TimeUnit.MILLISECONDS ); + throw new IllegalStateException( "Not execpted to completed at this stage." ); + } + catch ( TimeoutException ignore ) + { + } + } +}