diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index e8b00f2e2ac9..5f888136b744 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -88,7 +88,8 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config progressRetryStrategy, leaderRetryStrategy, availabilityTimeoutMillis, - globalAvailabilityGuard, logProvider, platformModule.monitors ); + globalAvailabilityGuard, logProvider, + platformModule.monitors ); } public RaftReplicator getReplicator() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java index cccf27d093cd..e2cab05e5421 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java @@ -109,32 +109,18 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception */ channel.putInt( -1 ); } - try + + // write to chunks if empty and there is more to write + while ( txWriter.canWrite() && chunks.isEmpty() ) { - // write to chunks if empty and there is more to write - while ( txWriter.canWrite() && chunks.isEmpty() ) - { - txWriter.write( channel ); - } - // nothing more to write, close the channel to get the potential last buffer - if ( chunks.isEmpty() ) - { - channel.close(); - } - return chunks.poll(); + txWriter.write( channel ); } - catch ( Throwable t ) + // nothing more to write, close the channel to get the potential last buffer + if ( chunks.isEmpty() ) { - try - { - close(); - } - catch ( Exception e ) - { - t.addSuppressed( e ); - } - throw t; + channel.close(); } + return chunks.poll(); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java index 5046f916f58c..ce75afcf20c9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java @@ -25,8 +25,8 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; import io.netty.util.AttributeKey; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Promise; import java.time.Clock; @@ -50,6 +50,7 @@ public class ReconnectingChannel implements Channel private final Log log; private final Bootstrap bootstrap; + private final EventLoop eventLoop; private final SocketAddress destination; private final TimeoutStrategy connectionBackoffStrategy; @@ -61,14 +62,16 @@ public class ReconnectingChannel implements Channel private TimeoutStrategy.Timeout connectionBackoff; private CappedLogger cappedLogger; - ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log ) + ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log ) { - this( bootstrap, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); + this( bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); } - private ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log, TimeoutStrategy connectionBackoffStrategy ) + private ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log, + TimeoutStrategy connectionBackoffStrategy ) { this.bootstrap = bootstrap; + this.eventLoop = eventLoop; this.destination = destination; this.log = log; this.cappedLogger = new CappedLogger( log ).setTimeLimit( 20, TimeUnit.SECONDS, Clock.systemUTC() ); @@ -157,32 +160,34 @@ private Future write( Object msg, boolean flush ) if ( channel.isActive() ) { - return doWrite( msg, flush ); + if ( flush ) + { + return channel.writeAndFlush( msg ); + } + else + { + return channel.write( msg ); + } } else { - Promise promise = new DefaultPromise<>( bootstrap.config().group().next() ); + Promise promise = eventLoop.newPromise(); BiConsumer writer; - writer = ( channel, message ) -> chain( doWrite( msg, flush ), promise ); + if ( flush ) + { + writer = ( channel, message ) -> chain( channel.writeAndFlush( msg ), promise ); + } + else + { + writer = ( channel, message ) -> chain( channel.write( msg ), promise ); + } deferredWrite( msg, fChannel, promise, true, writer ); return promise; } } - private ChannelFuture doWrite( Object msg, boolean flush ) - { - if ( flush ) - { - return channel.writeAndFlush( msg ); - } - else - { - return channel.write( msg ); - } - } - /** * Chains a channel future to a promise. Used when the returned promise * was not allocated through the channel and cannot be used as the diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index e04c22477d59..1d979a0bf277 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -107,7 +107,7 @@ private Channel channel( AdvertisedSocketAddress destination ) if ( channel == null ) { - channel = new ReconnectingChannel( bootstrap, destination, log ); + channel = new ReconnectingChannel( bootstrap, eventLoopGroup.next(), destination, log ); channel.start(); ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java deleted file mode 100644 index 4048dc4576f3..000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/WritabilityThrottler.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.messaging; - -class WritabilityThrottler -{ - private boolean isWritable = true; - - synchronized void awaitWritable() throws InterruptedException - { - while ( !isWritable ) - { - wait(); - } - } - - synchronized void setIsWritable( boolean isWritable ) - { - this.isWritable = isWritable; - if ( isWritable ) - { - notifyAll(); - } - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java index 86f20ef9bd35..38f965727236 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java @@ -41,7 +41,7 @@ public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder ReplicatedContentChunkDecoder() { - setCumulator( new ContentChunkAccumulator() ); + setCumulator( new ContentChunkCumulator() ); } @Override @@ -60,7 +60,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) } } - private class ContentChunkAccumulator implements Cumulator + private class ContentChunkCumulator implements Cumulator { @Override public ByteBuf cumulate( ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java index 83ae677a408f..d7e1d2865beb 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java @@ -90,7 +90,7 @@ public void before() { elg = new NioEventLoopGroup( 0 ); Bootstrap bootstrap = new Bootstrap().channel( NioSocketChannel.class ).group( elg ).handler( childCounter ); - channel = new ReconnectingChannel( bootstrap, listenAddress, log ); + channel = new ReconnectingChannel( bootstrap, elg.next(), listenAddress, log ); } @After diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java deleted file mode 100644 index fc571ce631b7..000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/WritabilityThrottlerTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.messaging; - -import org.junit.Test; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertTrue; - -public class WritabilityThrottlerTest -{ - @Test - public void shouldBeWritableByDefault() throws InterruptedException - { - new WritabilityThrottler().awaitWritable(); - } - - @Test - public void shouldNotifyWhenWritabilityIsSetToTrue() throws ExecutionException, InterruptedException - { - // given - WritabilityThrottler writabilityThrottler = new WritabilityThrottler(); - writabilityThrottler.setIsWritable( false ); - Future waiter = Executors.newSingleThreadExecutor().submit( () -> - { - try - { - writabilityThrottler.awaitWritable(); - return true; - } - catch ( InterruptedException e ) - { - throw new RuntimeException( e ); - } - } ); - waitForTimeout( waiter ); - - // when - writabilityThrottler.setIsWritable( false ); - - // then - waitForTimeout( waiter ); - - // when - writabilityThrottler.setIsWritable( true ); - - // then - assertTrue( waiter.get() ); - } - - private void waitForTimeout( Future waiter ) throws InterruptedException, ExecutionException - { - try - { - waiter.get( 5, TimeUnit.MILLISECONDS ); - throw new IllegalStateException( "Not execpted to completed at this stage." ); - } - catch ( TimeoutException ignore ) - { - } - } -}