diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java index 355b9df5659c4..1b320698d1d7f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java @@ -26,13 +26,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; public class FileChunkEncoder extends MessageToByteEncoder { @Override protected void encode( ChannelHandlerContext ctx, FileChunk chunk, ByteBuf out ) throws Exception { - FileChunk.marshal().marshal( chunk, new NetworkFlushableByteBuf( out ) ); + FileChunk.marshal().marshal( chunk, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java index 7c5c4d7947e96..5d4c998740332 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java @@ -34,7 +34,7 @@ import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; @@ -102,7 +102,7 @@ public static class Encoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, GetIndexFilesRequest msg, ByteBuf out ) throws Exception { - new IndexSnapshotRequestMarshall().marshal( msg, new NetworkFlushableByteBuf( out ) ); + new IndexSnapshotRequestMarshall().marshal( msg, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java index c148792053a1a..53daad62c47ec 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java @@ -35,7 +35,7 @@ import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; @@ -108,7 +108,7 @@ public static class Encoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, GetStoreFileRequest msg, ByteBuf out ) throws Exception { - new GetStoreFileRequest.StoreFileRequestMarshall().marshal( msg, new NetworkFlushableByteBuf( out ) ); + new GetStoreFileRequest.StoreFileRequestMarshall().marshal( msg, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java index fab6ca371811d..155c98590674c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java @@ -28,7 +28,7 @@ import io.netty.handler.codec.MessageToByteEncoder; import org.neo4j.causalclustering.identity.StoreId; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; public class GetStoreIdResponseEncoder extends MessageToByteEncoder @@ -36,6 +36,6 @@ public class GetStoreIdResponseEncoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, StoreId storeId, ByteBuf out ) throws Exception { - StoreIdMarshal.INSTANCE.marshal( storeId, new NetworkFlushableByteBuf( out ) ); + StoreIdMarshal.INSTANCE.marshal( storeId, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java index e83775b82e7f9..32720dfe6cf3f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; public class PrepareStoreCopyRequestEncoder extends MessageToByteEncoder @@ -34,6 +34,6 @@ public class PrepareStoreCopyRequestEncoder extends MessageToByteEncoder @@ -35,6 +35,6 @@ public class TxPullRequestEncoder extends MessageToByteEncoder protected void encode( ChannelHandlerContext ctx, TxPullRequest request, ByteBuf out ) throws Exception { out.writeLong( request.previousTxId() ); - StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) ); + StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new BoundedNetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java index 77fc4f0a725c9..d120e439c75ae 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter; @@ -35,7 +35,7 @@ public class TxPullResponseEncoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, TxPullResponse response, ByteBuf out ) throws Exception { - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); + NetworkWritableChannel channel = new NetworkWritableChannel( out ); StoreIdMarshal.INSTANCE.marshal( response.storeId(), channel ); new LogEntryWriter( channel ).serialize( response.tx() ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java index 4041dfea74c12..ffeef80935327 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java @@ -31,7 +31,7 @@ import java.util.LinkedList; import java.util.List; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.StringMarshal; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory; @@ -97,7 +97,7 @@ public static ReplicatedTokenRequest unmarshal( ByteBuf buffer ) public static byte[] commandBytes( Collection commands ) { ByteBuf commandBuffer = Unpooled.buffer(); - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( commandBuffer ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( commandBuffer ); try { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java index da0cace081299..cccf27d093cdc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ package org.neo4j.causalclustering.core.state.machines.tx; import io.netty.buffer.ByteBuf; @@ -10,14 +32,14 @@ import java.util.Queue; import org.neo4j.causalclustering.helper.ErrorHandler; -import org.neo4j.causalclustering.messaging.BoundedNetworkChannel; +import org.neo4j.causalclustering.messaging.ChunkingNetworkChannel; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; class ChunkedTransaction implements ChunkedInput { private static final int CHUNK_SIZE = 32 * 1024; private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter; - private BoundedNetworkChannel channel; + private ChunkingNetworkChannel channel; private Queue chunks = new LinkedList<>(); ChunkedTransaction( TransactionRepresentation tx ) @@ -80,7 +102,7 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception if ( channel == null ) { // Ensure that the written buffers does not overflow the allocators chunk size. - channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks ); + channel = new ChunkingNetworkChannel( allocator, CHUNK_SIZE, chunks ); /* Unknown length. The reason for sending this int is to avoid conflicts with Raft V1. This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1. diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java index 3fec22cd17543..691088f6f0007 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java @@ -28,8 +28,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import org.neo4j.causalclustering.messaging.ByteBufBacked; import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder; import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -70,20 +72,43 @@ public static void marshal( WritableChannel writableChannel, ByteArrayReplicated public static void marshal( WritableChannel writableChannel, TransactionRepresentationReplicatedTransaction replicatedTransaction ) throws IOException { - /* - Unknown length. This method will never be used in production. When a ReplicatedTransaction is serialized it has already passed over the network - and a more efficient marshalling is used in ByteArrayReplicatedTransaction. - */ - ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter = transactionalRepresentationWriter( replicatedTransaction.tx() ); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 1024 ); - OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( outputStream ); + if ( writableChannel instanceof ByteBufBacked ) + { + /* + * Marshals more efficiently if Channel is going over the network. In practice, this means maintaining support for + * RaftV1 without loosing performance + */ + ByteBuf buffer = ((ByteBufBacked) writableChannel).byteBuf(); + int metaDataIndex = buffer.writerIndex(); + int txStartIndex = metaDataIndex + Integer.BYTES; + // leave room for length to be set later. + buffer.writerIndex( txStartIndex ); + writeTx( writableChannel, replicatedTransaction.tx() ); + int txLength = buffer.writerIndex() - txStartIndex; + buffer.setInt( metaDataIndex, txLength ); + } + else + { + /* + * Unknown length. This should only be reached in tests. When a ReplicatedTransaction is marshaled to file it has already passed over the network + * and is of a different type. More efficient marshalling is used in ByteArrayReplicatedTransaction. + */ + ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 1024 ); + OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( outputStream ); + writeTx( outputStreamWritableChannel, replicatedTransaction.tx() ); + int length = outputStream.size(); + writableChannel.putInt( length ); + writableChannel.put( outputStream.toByteArray(), length ); + } + } + + private static void writeTx( WritableChannel writableChannel, TransactionRepresentation tx ) throws IOException + { + ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter = transactionalRepresentationWriter( tx ); while ( txWriter.canWrite() ) { - txWriter.write( outputStreamWritableChannel ); + txWriter.write( writableChannel ); } - int length = outputStream.size(); - writableChannel.putInt( length ); - writableChannel.put( outputStream.toByteArray(), length ); } public static ChunkedInput encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java index 91d405f3b86b4..1c773f21d7050 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java @@ -26,13 +26,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; public class CoreSnapshotEncoder extends MessageToByteEncoder { @Override protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, ByteBuf out ) throws Exception { - new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkFlushableByteBuf( out ) ); + new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannel.java similarity index 76% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannel.java index 61c0400458fbb..b56bfb066e987 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannel.java @@ -24,14 +24,12 @@ import io.netty.buffer.ByteBuf; -import java.io.Flushable; - -import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.storageengine.api.WritableChannel; import static java.lang.String.format; import static org.neo4j.io.ByteUnit.mebiBytes; -public class NetworkFlushableChannelNetty4 implements FlushableChannel +public class BoundedNetworkWritableChannel implements WritableChannel, ByteBufBacked { /** * This implementation puts an upper limit to the size of the state serialized in the buffer. The default @@ -45,12 +43,12 @@ public class NetworkFlushableChannelNetty4 implements FlushableChannel private final long sizeLimit; - public NetworkFlushableChannelNetty4( ByteBuf delegate ) + public BoundedNetworkWritableChannel( ByteBuf delegate ) { this( delegate, DEFAULT_SIZE_LIMIT ); } - public NetworkFlushableChannelNetty4( ByteBuf delegate, long sizeLimit ) + public BoundedNetworkWritableChannel( ByteBuf delegate, long sizeLimit ) { this.delegate = delegate; this.initialWriterIndex = delegate.writerIndex(); @@ -58,13 +56,7 @@ public NetworkFlushableChannelNetty4( ByteBuf delegate, long sizeLimit ) } @Override - public Flushable prepareForFlush() - { - return null; - } - - @Override - public FlushableChannel put( byte value ) throws MessageTooBigException + public WritableChannel put( byte value ) throws MessageTooBigException { checkSize( Byte.BYTES ); delegate.writeByte( value ); @@ -72,7 +64,7 @@ public FlushableChannel put( byte value ) throws MessageTooBigException } @Override - public FlushableChannel putShort( short value ) throws MessageTooBigException + public WritableChannel putShort( short value ) throws MessageTooBigException { checkSize( Short.BYTES ); delegate.writeShort( value ); @@ -80,7 +72,7 @@ public FlushableChannel putShort( short value ) throws MessageTooBigException } @Override - public FlushableChannel putInt( int value ) throws MessageTooBigException + public WritableChannel putInt( int value ) throws MessageTooBigException { checkSize( Integer.BYTES ); delegate.writeInt( value ); @@ -88,7 +80,7 @@ public FlushableChannel putInt( int value ) throws MessageTooBigException } @Override - public FlushableChannel putLong( long value ) throws MessageTooBigException + public WritableChannel putLong( long value ) throws MessageTooBigException { checkSize( Long.BYTES ); delegate.writeLong( value ); @@ -96,7 +88,7 @@ public FlushableChannel putLong( long value ) throws MessageTooBigException } @Override - public FlushableChannel putFloat( float value ) throws MessageTooBigException + public WritableChannel putFloat( float value ) throws MessageTooBigException { checkSize( Float.BYTES ); delegate.writeFloat( value ); @@ -104,7 +96,7 @@ public FlushableChannel putFloat( float value ) throws MessageTooBigException } @Override - public FlushableChannel putDouble( double value ) throws MessageTooBigException + public WritableChannel putDouble( double value ) throws MessageTooBigException { checkSize( Double.BYTES ); delegate.writeDouble( value ); @@ -112,18 +104,13 @@ public FlushableChannel putDouble( double value ) throws MessageTooBigException } @Override - public FlushableChannel put( byte[] value, int length ) throws MessageTooBigException + public WritableChannel put( byte[] value, int length ) throws MessageTooBigException { checkSize( length ); delegate.writeBytes( value, 0, length ); return this; } - @Override - public void close() - { - } - private void checkSize( int additional ) throws MessageTooBigException { int writtenSoFar = delegate.writerIndex() - initialWriterIndex; @@ -135,4 +122,10 @@ private void checkSize( int additional ) throws MessageTooBigException sizeLimit, additional, delegate.writerIndex(), initialWriterIndex, writtenSoFar ) ); } } + + @Override + public ByteBuf byteBuf() + { + return delegate; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java new file mode 100644 index 0000000000000..28c422e4bf16e --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.messaging; + +import io.netty.buffer.ByteBuf; + +public interface ByteBufBacked +{ + ByteBuf byteBuf(); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannel.java similarity index 97% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannel.java index 84180217312b7..f53509c317ac7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannel.java @@ -36,7 +36,7 @@ * Uses provided allocator to create {@link ByteBuf}. The buffers will be split if maximum size is reached. The full buffer is then added * to the provided output and a new buffer is allocated. If the output queue is bounded then writing to this channel may block! */ -public class BoundedNetworkChannel implements WritableChannel, AutoCloseable +public class ChunkingNetworkChannel implements WritableChannel, AutoCloseable { private static final int DEFAULT_INIT_CHUNK_SIZE = 512; private final ByteBufAllocator allocator; @@ -51,7 +51,7 @@ public class BoundedNetworkChannel implements WritableChannel, AutoCloseable * @param maxChunkSize when reached the current buffer will be moved to the @param outputQueue and a new {@link ByteBuf} is allocated * @param outputQueue full or flushed buffers are added here. If this queue is bounded then writing to this channel may block! */ - public BoundedNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queue outputQueue ) + public ChunkingNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queue outputQueue ) { Objects.requireNonNull( allocator, "allocator cannot be null" ); Objects.requireNonNull( outputQueue, "outputQueue cannot be null" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableByteBuf.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkWritableChannel.java similarity index 74% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableByteBuf.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkWritableChannel.java index bf84b71b15e91..6144f86905f04 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableByteBuf.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkWritableChannel.java @@ -27,77 +27,68 @@ import io.netty.buffer.ByteBuf; import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.storageengine.api.WritableChannel; -public class NetworkFlushableByteBuf implements FlushableChannel +public class NetworkWritableChannel implements WritableChannel, ByteBufBacked { private final ByteBuf delegate; - public NetworkFlushableByteBuf( ByteBuf byteBuf ) + public NetworkWritableChannel( ByteBuf byteBuf ) { this.delegate = byteBuf; } @Override - public FlushableChannel put( byte value ) + public WritableChannel put( byte value ) { delegate.writeByte( value ); return this; } @Override - public FlushableChannel putShort( short value ) + public WritableChannel putShort( short value ) { delegate.writeShort( value ); return this; } @Override - public FlushableChannel putInt( int value ) + public WritableChannel putInt( int value ) { delegate.writeInt( value ); return this; } @Override - public FlushableChannel putLong( long value ) + public WritableChannel putLong( long value ) { delegate.writeLong( value ); return this; } @Override - public FlushableChannel putFloat( float value ) + public WritableChannel putFloat( float value ) { delegate.writeFloat( value ); return this; } @Override - public FlushableChannel putDouble( double value ) + public WritableChannel putDouble( double value ) { delegate.writeDouble( value ); return this; } @Override - public FlushableChannel put( byte[] value, int length ) + public WritableChannel put( byte[] value, int length ) { delegate.writeBytes( value, 0, length ); return this; } @Override - public void close() - { - } - - @Override - public Flushable prepareForFlush() - { - return null; - } - - public ByteBuf buffer() + public ByteBuf byteBuf() { return delegate; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java index a12bd1b5d467b..ab7c825975b9c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java @@ -30,7 +30,7 @@ import java.io.IOException; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.function.ThrowingConsumer; import org.neo4j.storageengine.api.WritableChannel; @@ -173,7 +173,7 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception return null; } ByteBuf buffer = allocator.buffer(); - marshaller.accept( new NetworkFlushableChannelNetty4( buffer ) ); + marshaller.accept( new BoundedNetworkWritableChannel( buffer ) ); isEndOfInput = true; offset = buffer.readableBytes(); return buffer; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java index 94f5377ea51fe..b6eb3864883e4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java @@ -29,11 +29,12 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.core.state.machines.tx.TransactionRepresentationReplicatedTransaction; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; +import org.neo4j.io.ByteUnit; public class RaftMessageEncoder extends MessageToByteEncoder { @@ -53,7 +54,7 @@ public synchronized void encode( ChannelHandlerContext ctx, ClusterId clusterId = decoratedMessage.clusterId(); MemberId.Marshal memberMarshal = new MemberId.Marshal(); - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); + NetworkWritableChannel channel = new NetworkWritableChannel( out ); ClusterId.Marshal.INSTANCE.marshal( clusterId, channel ); channel.putInt( message.type().ordinal() ); memberMarshal.marshal( message.from(), channel ); @@ -65,9 +66,9 @@ private static class Handler implements RaftMessages.Handler { private final ChannelMarshal marshal; private final MemberId.Marshal memberMarshal; - private final NetworkFlushableByteBuf channel; + private final NetworkWritableChannel channel; - Handler( ChannelMarshal marshal, MemberId.Marshal memberMarshal, NetworkFlushableByteBuf channel ) + Handler( ChannelMarshal marshal, MemberId.Marshal memberMarshal, NetworkWritableChannel channel ) { this.marshal = marshal; this.memberMarshal = memberMarshal; @@ -147,18 +148,8 @@ public Void handle( RaftMessages.AppendEntries.Response appendResponse ) @Override public Void handle( RaftMessages.NewEntry.Request newEntryRequest ) throws Exception { - ReplicatedContent content = newEntryRequest.content(); - ByteBuf buffer = channel.buffer(); - int contentStartIndex = buffer.writerIndex() + 1; - marshal.marshal( content, channel ); - if ( content instanceof TransactionRepresentationReplicatedTransaction ) - { - // TransactionRepresentationReplicatedTransaction does not support marshal because it has unknown size - int contentEndIndex = buffer.writerIndex(); - int size = contentEndIndex - contentStartIndex - Integer.BYTES; // the integer is the length integer which should be excluded - buffer.setInt( contentStartIndex, size ); - } - + BoundedNetworkWritableChannel sizeBoundChannel = new BoundedNetworkWritableChannel( channel.byteBuf(), ByteUnit.gibiBytes( 1 ) ); + marshal.marshal( newEntryRequest.content(), sizeBoundChannel ); return null; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java index 3cb661cadca07..5edb6feee3c13 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java @@ -29,7 +29,7 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; public class RaftMessageEncoder extends MessageToByteEncoder @@ -41,7 +41,7 @@ protected void encode( ChannelHandlerContext ctx, RaftMessages.ClusterIdAwareMes ClusterId clusterId = decoratedMessage.clusterId(); MemberId.Marshal memberMarshal = new MemberId.Marshal(); - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); + NetworkWritableChannel channel = new NetworkWritableChannel( out ); channel.put( ContentType.Message.get() ); ClusterId.Marshal.INSTANCE.marshal( clusterId, channel ); channel.putInt( message.type().ordinal() ); @@ -53,9 +53,9 @@ protected void encode( ChannelHandlerContext ctx, RaftMessages.ClusterIdAwareMes private static class Handler implements RaftMessages.Handler { private final MemberId.Marshal memberMarshal; - private final NetworkFlushableByteBuf channel; + private final NetworkWritableChannel channel; - Handler( MemberId.Marshal memberMarshal, NetworkFlushableByteBuf channel ) + Handler( MemberId.Marshal memberMarshal, NetworkWritableChannel channel ) { this.memberMarshal = memberMarshal; this.channel = channel; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java index 8e377b02522f5..5f1e6eb9452ce 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java @@ -28,7 +28,7 @@ import java.util.UUID; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.EndOfStreamException; import org.neo4j.causalclustering.identity.MemberId; @@ -48,7 +48,7 @@ public void shouldSerializeAndDeserialize() throws Exception // when ByteBuf buffer = Unpooled.buffer( 1_000 ); - marshal.marshal( member, new NetworkFlushableChannelNetty4( buffer ) ); + marshal.marshal( member, new BoundedNetworkWritableChannel( buffer ) ); final MemberId recovered = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ); // then @@ -66,7 +66,7 @@ public void shouldThrowExceptionForHalfWrittenInstance() throws Exception ByteBuf buffer = Unpooled.buffer( 1000 ); // and the CoreMember is serialized but for 5 bytes at the end - marshal.marshal( aRealMember, new NetworkFlushableChannelNetty4( buffer ) ); + marshal.marshal( aRealMember, new BoundedNetworkWritableChannel( buffer ) ); ByteBuf bufferWithMissingBytes = buffer.copy( 0, buffer.writerIndex() - 5 ); // when diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java index f76086b23e2d7..33cc95f75ec4b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java @@ -29,7 +29,7 @@ import java.util.Set; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import static org.hamcrest.MatcherAssert.assertThat; @@ -143,7 +143,7 @@ public void shouldMarshalCorrectly() throws Exception // when ByteBuf buffer = Unpooled.buffer( 1_000 ); - marshal.marshal( state, new NetworkFlushableChannelNetty4( buffer ) ); + marshal.marshal( state, new BoundedNetworkWritableChannel( buffer ) ); final RaftMembershipState recovered = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ); // then diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java index 1fcfda4ac9f04..5226e82a4735d 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java @@ -41,7 +41,7 @@ import org.neo4j.causalclustering.core.state.machines.tx.TransactionRepresentationReplicatedTransaction; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal; @@ -127,7 +127,7 @@ public void shouldMarshalTokenRequest() throws Exception private void assertMarshalingEquality( ByteBuf buffer, ReplicatedContent replicatedTx ) throws IOException, EndOfStreamException { - marshal.marshal( replicatedTx, new NetworkFlushableByteBuf( buffer ) ); + marshal.marshal( replicatedTx, new NetworkWritableChannel( buffer ) ); assertThat( marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ), equalTo( replicatedTx ) ); } @@ -135,7 +135,7 @@ private void assertMarshalingEquality( ByteBuf buffer, ReplicatedContent replica private void assertMarshalingEquality( ByteBuf buffer, TransactionRepresentationReplicatedTransaction replicatedTx ) throws IOException, EndOfStreamException { - marshal.marshal( replicatedTx, new NetworkFlushableByteBuf( buffer ) ); + marshal.marshal( replicatedTx, new NetworkWritableChannel( buffer ) ); ReplicatedContent unmarshal = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java index 4f327741fba87..ad3ef6d050dbe 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java @@ -23,14 +23,20 @@ package org.neo4j.causalclustering.core.state.machines.tx; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.helpers.Buffers; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; +import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; @@ -39,6 +45,9 @@ public class TransactionRepresentationReplicatedTransactionTest { + @Rule + public final Buffers buffers = new Buffers(); + @Test public void shouldDecodeAndUnmarshalSameBytes() throws IOException { @@ -47,9 +56,9 @@ public void shouldDecodeAndUnmarshalSameBytes() throws IOException expectedTx.setHeader( new byte[0], 1, 2, 3, 4, 5, 6 ); - ByteBuf buffer = Unpooled.buffer(); + ByteBuf buffer = buffers.buffer(); TransactionRepresentationReplicatedTransaction replicatedTransaction = ReplicatedTransaction.from( expectedTx ); - replicatedTransaction.marshal( new NetworkFlushableChannelNetty4( buffer ) ); + replicatedTransaction.marshal( new BoundedNetworkWritableChannel( buffer ) ); ReplicatedTransaction decoded = ReplicatedTransactionSerializer.unmarshal( buffer ); buffer.readerIndex( 0 ); @@ -59,4 +68,26 @@ public void shouldDecodeAndUnmarshalSameBytes() throws IOException buffer.release(); } + + @Test + public void shouldMarshalToSameByteIfByteBufBackedOrNot() throws IOException + { + PhysicalTransactionRepresentation expectedTx = + new PhysicalTransactionRepresentation( Collections.singleton( new Command.NodeCommand( new NodeRecord( 1 ), new NodeRecord( 2 ) ) ) ); + + expectedTx.setHeader( new byte[0], 1, 2, 3, 4, 5, 6 ); + TransactionRepresentationReplicatedTransaction replicatedTransaction = ReplicatedTransaction.from( expectedTx ); + + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + ByteBuf buffer = buffers.buffer(); + OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( stream ); + NetworkWritableChannel networkWritableChannel = new NetworkWritableChannel( buffer ); + + replicatedTransaction.marshal( outputStreamWritableChannel ); + replicatedTransaction.marshal( networkWritableChannel ); + + byte[] bufferArray = Arrays.copyOf( buffer.array(), buffer.writerIndex() ); + + Assertions.assertArrayEquals( bufferArray, stream.toByteArray() ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannelTest.java similarity index 88% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4Test.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannelTest.java index 1f2ccf4af66d1..61d57db9e1055 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4Test.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannelTest.java @@ -28,14 +28,14 @@ import static org.junit.Assert.fail; -public class NetworkFlushableChannelNetty4Test +public class BoundedNetworkWritableChannelTest { @Test public void shouldRespectSizeLimit() throws Exception { // Given int sizeLimit = 100; - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( Unpooled.buffer(), sizeLimit ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( Unpooled.buffer(), sizeLimit ); // when for ( int i = 0; i < sizeLimit; i++ ) @@ -59,7 +59,7 @@ public void sizeLimitShouldWorkWithArrays() throws Exception { // Given int sizeLimit = 100; - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( Unpooled.buffer(), sizeLimit ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( Unpooled.buffer(), sizeLimit ); // When int padding = 10; @@ -89,7 +89,7 @@ public void shouldNotCountBytesAlreadyInBuffer() throws Exception int padding = Long.BYTES; buffer.writeLong( 0 ); - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( buffer, sizeLimit ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( buffer, sizeLimit ); // When for ( int i = 0; i < sizeLimit - padding; i++ ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannelTest.java similarity index 90% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannelTest.java index fa25c8c3eceb5..a95604af6fe4a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannelTest.java @@ -26,7 +26,6 @@ import org.junit.Rule; import org.junit.Test; -import java.io.IOException; import java.util.LinkedList; import org.neo4j.causalclustering.helpers.Buffers; @@ -36,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -public class BoundedNetworkChannelTest +public class ChunkingNetworkChannelTest { @Rule public final Buffers buffers = new Buffers(); @@ -47,7 +46,7 @@ public void shouldSerializeIntoChunksOfGivenSize() // given int chunkSize = 8; LinkedList byteBufs = new LinkedList<>(); - BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, byteBufs ); + ChunkingNetworkChannel channel = new ChunkingNetworkChannel( buffers, chunkSize, byteBufs ); // and data is written byte[] array = new byte[10]; @@ -89,7 +88,7 @@ public void shouldReturnNullIfQueueIsEmpty() int chunkSize = 8; LinkedList byteBufs = new LinkedList<>(); - BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, byteBufs ); + ChunkingNetworkChannel channel = new ChunkingNetworkChannel( buffers, chunkSize, byteBufs ); // when channel.putLong( 1L ); @@ -117,7 +116,7 @@ public void shouldReturnNullIfQueueIsEmpty() public void shouldThrowIllegalStatAfterClosed() { int chunkSize = 8; - BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, new LinkedList<>() ); + ChunkingNetworkChannel channel = new ChunkingNetworkChannel( buffers, chunkSize, new LinkedList<>() ); channel.close(); channel.putInt( 1 ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java index 94506fce13a73..ac2d4d3ec5798 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java @@ -44,7 +44,7 @@ import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.helpers.Buffers; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal; @@ -79,7 +79,7 @@ public void shouldSerializeAndDeserialize() throws Exception { ChannelMarshal coreReplicatedContentMarshal = CoreReplicatedContentMarshal.marshaller(); ByteBuf buffer = buffers.buffer(); - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( buffer ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( buffer ); coreReplicatedContentMarshal.marshal( replicatedContent, channel ); NetworkReadableClosableChannelNetty4 readChannel = new NetworkReadableClosableChannelNetty4( buffer );