From a18a4bc822673ea8942fcfe3bb0db96972c1d5a7 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Thu, 6 Sep 2018 15:44:18 +0200 Subject: [PATCH] Refactor network files and fix tx marshalling in RaftV1 The Netty dependent Channels that is used in causal clustering now implements WritableChannel. Before they implemented FlushableChannel for no apparent reason. They have also been renamed. Marshalling for RaftV1 broke throughout development. By using the now introduced ByteBufBacked interface, the tx marhalling for RaftV1 can be optimized, writing directly to ioBuffer. --- .../catchup/storecopy/FileChunkEncoder.java | 4 +- .../storecopy/GetIndexFilesRequest.java | 4 +- .../storecopy/GetStoreFileRequest.java | 4 +- .../storecopy/GetStoreIdResponseEncoder.java | 4 +- .../PrepareStoreCopyRequestEncoder.java | 4 +- .../storecopy/PrepareStoreCopyResponse.java | 4 +- .../catchup/tx/TxPullRequestEncoder.java | 4 +- .../catchup/tx/TxPullResponseEncoder.java | 4 +- .../ReplicatedTokenRequestSerializer.java | 4 +- .../state/machines/tx/ChunkedTransaction.java | 28 +++++++++-- .../tx/ReplicatedTransactionSerializer.java | 47 ++++++++++++++----- .../state/snapshot/CoreSnapshotEncoder.java | 4 +- ...ava => BoundedNetworkWritableChannel.java} | 41 +++++++--------- .../messaging/ByteBufBacked.java | 30 ++++++++++++ ...annel.java => ChunkingNetworkChannel.java} | 4 +- ...teBuf.java => NetworkWritableChannel.java} | 31 +++++------- .../marshalling/ChunkedReplicatedContent.java | 4 +- .../marshalling/v1/RaftMessageEncoder.java | 25 ++++------ .../v2/encoding/RaftMessageEncoder.java | 8 ++-- .../membership/MemberIdMarshalTest.java | 6 +-- .../membership/RaftMembershipStateTest.java | 4 +- .../CoreReplicatedContentMarshalTest.java | 6 +-- ...presentationReplicatedTransactionTest.java | 39 +++++++++++++-- ...=> BoundedNetworkWritableChannelTest.java} | 8 ++-- ...t.java => ChunkingNetworkChannelTest.java} | 9 ++-- .../CoreReplicatedContentMarshallingTest.java | 4 +- 26 files changed, 208 insertions(+), 126 deletions(-) rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/{NetworkFlushableChannelNetty4.java => BoundedNetworkWritableChannel.java} (76%) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/{BoundedNetworkChannel.java => ChunkingNetworkChannel.java} (97%) rename enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/{NetworkFlushableByteBuf.java => NetworkWritableChannel.java} (74%) rename enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/{NetworkFlushableChannelNetty4Test.java => BoundedNetworkWritableChannelTest.java} (88%) rename enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/{BoundedNetworkChannelTest.java => ChunkingNetworkChannelTest.java} (90%) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java index 355b9df5659c4..1b320698d1d7f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileChunkEncoder.java @@ -26,13 +26,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; public class FileChunkEncoder extends MessageToByteEncoder { @Override protected void encode( ChannelHandlerContext ctx, FileChunk chunk, ByteBuf out ) throws Exception { - FileChunk.marshal().marshal( chunk, new NetworkFlushableByteBuf( out ) ); + FileChunk.marshal().marshal( chunk, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java index 7c5c4d7947e96..5d4c998740332 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexFilesRequest.java @@ -34,7 +34,7 @@ import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; @@ -102,7 +102,7 @@ public static class Encoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, GetIndexFilesRequest msg, ByteBuf out ) throws Exception { - new IndexSnapshotRequestMarshall().marshal( msg, new NetworkFlushableByteBuf( out ) ); + new IndexSnapshotRequestMarshall().marshal( msg, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java index c148792053a1a..53daad62c47ec 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreFileRequest.java @@ -35,7 +35,7 @@ import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; @@ -108,7 +108,7 @@ public static class Encoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, GetStoreFileRequest msg, ByteBuf out ) throws Exception { - new GetStoreFileRequest.StoreFileRequestMarshall().marshal( msg, new NetworkFlushableByteBuf( out ) ); + new GetStoreFileRequest.StoreFileRequestMarshall().marshal( msg, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java index fab6ca371811d..155c98590674c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseEncoder.java @@ -28,7 +28,7 @@ import io.netty.handler.codec.MessageToByteEncoder; import org.neo4j.causalclustering.identity.StoreId; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; public class GetStoreIdResponseEncoder extends MessageToByteEncoder @@ -36,6 +36,6 @@ public class GetStoreIdResponseEncoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, StoreId storeId, ByteBuf out ) throws Exception { - StoreIdMarshal.INSTANCE.marshal( storeId, new NetworkFlushableByteBuf( out ) ); + StoreIdMarshal.INSTANCE.marshal( storeId, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java index e83775b82e7f9..32720dfe6cf3f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestEncoder.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; public class PrepareStoreCopyRequestEncoder extends MessageToByteEncoder @@ -34,6 +34,6 @@ public class PrepareStoreCopyRequestEncoder extends MessageToByteEncoder @@ -35,6 +35,6 @@ public class TxPullRequestEncoder extends MessageToByteEncoder protected void encode( ChannelHandlerContext ctx, TxPullRequest request, ByteBuf out ) throws Exception { out.writeLong( request.previousTxId() ); - StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) ); + StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new BoundedNetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java index 77fc4f0a725c9..d120e439c75ae 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter; @@ -35,7 +35,7 @@ public class TxPullResponseEncoder extends MessageToByteEncoder @Override protected void encode( ChannelHandlerContext ctx, TxPullResponse response, ByteBuf out ) throws Exception { - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); + NetworkWritableChannel channel = new NetworkWritableChannel( out ); StoreIdMarshal.INSTANCE.marshal( response.storeId(), channel ); new LogEntryWriter( channel ).serialize( response.tx() ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java index 4041dfea74c12..ffeef80935327 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java @@ -31,7 +31,7 @@ import java.util.LinkedList; import java.util.List; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.StringMarshal; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory; @@ -97,7 +97,7 @@ public static ReplicatedTokenRequest unmarshal( ByteBuf buffer ) public static byte[] commandBytes( Collection commands ) { ByteBuf commandBuffer = Unpooled.buffer(); - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( commandBuffer ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( commandBuffer ); try { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java index da0cace081299..cccf27d093cdc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedTransaction.java @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ package org.neo4j.causalclustering.core.state.machines.tx; import io.netty.buffer.ByteBuf; @@ -10,14 +32,14 @@ import java.util.Queue; import org.neo4j.causalclustering.helper.ErrorHandler; -import org.neo4j.causalclustering.messaging.BoundedNetworkChannel; +import org.neo4j.causalclustering.messaging.ChunkingNetworkChannel; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; class ChunkedTransaction implements ChunkedInput { private static final int CHUNK_SIZE = 32 * 1024; private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter; - private BoundedNetworkChannel channel; + private ChunkingNetworkChannel channel; private Queue chunks = new LinkedList<>(); ChunkedTransaction( TransactionRepresentation tx ) @@ -80,7 +102,7 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception if ( channel == null ) { // Ensure that the written buffers does not overflow the allocators chunk size. - channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks ); + channel = new ChunkingNetworkChannel( allocator, CHUNK_SIZE, chunks ); /* Unknown length. The reason for sending this int is to avoid conflicts with Raft V1. This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1. diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java index 3fec22cd17543..691088f6f0007 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java @@ -28,8 +28,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import org.neo4j.causalclustering.messaging.ByteBufBacked; import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder; import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -70,20 +72,43 @@ public static void marshal( WritableChannel writableChannel, ByteArrayReplicated public static void marshal( WritableChannel writableChannel, TransactionRepresentationReplicatedTransaction replicatedTransaction ) throws IOException { - /* - Unknown length. This method will never be used in production. When a ReplicatedTransaction is serialized it has already passed over the network - and a more efficient marshalling is used in ByteArrayReplicatedTransaction. - */ - ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter = transactionalRepresentationWriter( replicatedTransaction.tx() ); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 1024 ); - OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( outputStream ); + if ( writableChannel instanceof ByteBufBacked ) + { + /* + * Marshals more efficiently if Channel is going over the network. In practice, this means maintaining support for + * RaftV1 without loosing performance + */ + ByteBuf buffer = ((ByteBufBacked) writableChannel).byteBuf(); + int metaDataIndex = buffer.writerIndex(); + int txStartIndex = metaDataIndex + Integer.BYTES; + // leave room for length to be set later. + buffer.writerIndex( txStartIndex ); + writeTx( writableChannel, replicatedTransaction.tx() ); + int txLength = buffer.writerIndex() - txStartIndex; + buffer.setInt( metaDataIndex, txLength ); + } + else + { + /* + * Unknown length. This should only be reached in tests. When a ReplicatedTransaction is marshaled to file it has already passed over the network + * and is of a different type. More efficient marshalling is used in ByteArrayReplicatedTransaction. + */ + ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 1024 ); + OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( outputStream ); + writeTx( outputStreamWritableChannel, replicatedTransaction.tx() ); + int length = outputStream.size(); + writableChannel.putInt( length ); + writableChannel.put( outputStream.toByteArray(), length ); + } + } + + private static void writeTx( WritableChannel writableChannel, TransactionRepresentation tx ) throws IOException + { + ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter = transactionalRepresentationWriter( tx ); while ( txWriter.canWrite() ) { - txWriter.write( outputStreamWritableChannel ); + txWriter.write( writableChannel ); } - int length = outputStream.size(); - writableChannel.putInt( length ); - writableChannel.put( outputStream.toByteArray(), length ); } public static ChunkedInput encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java index 91d405f3b86b4..1c773f21d7050 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java @@ -26,13 +26,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; public class CoreSnapshotEncoder extends MessageToByteEncoder { @Override protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, ByteBuf out ) throws Exception { - new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkFlushableByteBuf( out ) ); + new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkWritableChannel( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannel.java similarity index 76% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannel.java index 61c0400458fbb..b56bfb066e987 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannel.java @@ -24,14 +24,12 @@ import io.netty.buffer.ByteBuf; -import java.io.Flushable; - -import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.storageengine.api.WritableChannel; import static java.lang.String.format; import static org.neo4j.io.ByteUnit.mebiBytes; -public class NetworkFlushableChannelNetty4 implements FlushableChannel +public class BoundedNetworkWritableChannel implements WritableChannel, ByteBufBacked { /** * This implementation puts an upper limit to the size of the state serialized in the buffer. The default @@ -45,12 +43,12 @@ public class NetworkFlushableChannelNetty4 implements FlushableChannel private final long sizeLimit; - public NetworkFlushableChannelNetty4( ByteBuf delegate ) + public BoundedNetworkWritableChannel( ByteBuf delegate ) { this( delegate, DEFAULT_SIZE_LIMIT ); } - public NetworkFlushableChannelNetty4( ByteBuf delegate, long sizeLimit ) + public BoundedNetworkWritableChannel( ByteBuf delegate, long sizeLimit ) { this.delegate = delegate; this.initialWriterIndex = delegate.writerIndex(); @@ -58,13 +56,7 @@ public NetworkFlushableChannelNetty4( ByteBuf delegate, long sizeLimit ) } @Override - public Flushable prepareForFlush() - { - return null; - } - - @Override - public FlushableChannel put( byte value ) throws MessageTooBigException + public WritableChannel put( byte value ) throws MessageTooBigException { checkSize( Byte.BYTES ); delegate.writeByte( value ); @@ -72,7 +64,7 @@ public FlushableChannel put( byte value ) throws MessageTooBigException } @Override - public FlushableChannel putShort( short value ) throws MessageTooBigException + public WritableChannel putShort( short value ) throws MessageTooBigException { checkSize( Short.BYTES ); delegate.writeShort( value ); @@ -80,7 +72,7 @@ public FlushableChannel putShort( short value ) throws MessageTooBigException } @Override - public FlushableChannel putInt( int value ) throws MessageTooBigException + public WritableChannel putInt( int value ) throws MessageTooBigException { checkSize( Integer.BYTES ); delegate.writeInt( value ); @@ -88,7 +80,7 @@ public FlushableChannel putInt( int value ) throws MessageTooBigException } @Override - public FlushableChannel putLong( long value ) throws MessageTooBigException + public WritableChannel putLong( long value ) throws MessageTooBigException { checkSize( Long.BYTES ); delegate.writeLong( value ); @@ -96,7 +88,7 @@ public FlushableChannel putLong( long value ) throws MessageTooBigException } @Override - public FlushableChannel putFloat( float value ) throws MessageTooBigException + public WritableChannel putFloat( float value ) throws MessageTooBigException { checkSize( Float.BYTES ); delegate.writeFloat( value ); @@ -104,7 +96,7 @@ public FlushableChannel putFloat( float value ) throws MessageTooBigException } @Override - public FlushableChannel putDouble( double value ) throws MessageTooBigException + public WritableChannel putDouble( double value ) throws MessageTooBigException { checkSize( Double.BYTES ); delegate.writeDouble( value ); @@ -112,18 +104,13 @@ public FlushableChannel putDouble( double value ) throws MessageTooBigException } @Override - public FlushableChannel put( byte[] value, int length ) throws MessageTooBigException + public WritableChannel put( byte[] value, int length ) throws MessageTooBigException { checkSize( length ); delegate.writeBytes( value, 0, length ); return this; } - @Override - public void close() - { - } - private void checkSize( int additional ) throws MessageTooBigException { int writtenSoFar = delegate.writerIndex() - initialWriterIndex; @@ -135,4 +122,10 @@ private void checkSize( int additional ) throws MessageTooBigException sizeLimit, additional, delegate.writerIndex(), initialWriterIndex, writtenSoFar ) ); } } + + @Override + public ByteBuf byteBuf() + { + return delegate; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java new file mode 100644 index 0000000000000..28c422e4bf16e --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ByteBufBacked.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.messaging; + +import io.netty.buffer.ByteBuf; + +public interface ByteBufBacked +{ + ByteBuf byteBuf(); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannel.java similarity index 97% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannel.java index 84180217312b7..f53509c317ac7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannel.java @@ -36,7 +36,7 @@ * Uses provided allocator to create {@link ByteBuf}. The buffers will be split if maximum size is reached. The full buffer is then added * to the provided output and a new buffer is allocated. If the output queue is bounded then writing to this channel may block! */ -public class BoundedNetworkChannel implements WritableChannel, AutoCloseable +public class ChunkingNetworkChannel implements WritableChannel, AutoCloseable { private static final int DEFAULT_INIT_CHUNK_SIZE = 512; private final ByteBufAllocator allocator; @@ -51,7 +51,7 @@ public class BoundedNetworkChannel implements WritableChannel, AutoCloseable * @param maxChunkSize when reached the current buffer will be moved to the @param outputQueue and a new {@link ByteBuf} is allocated * @param outputQueue full or flushed buffers are added here. If this queue is bounded then writing to this channel may block! */ - public BoundedNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queue outputQueue ) + public ChunkingNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queue outputQueue ) { Objects.requireNonNull( allocator, "allocator cannot be null" ); Objects.requireNonNull( outputQueue, "outputQueue cannot be null" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableByteBuf.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkWritableChannel.java similarity index 74% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableByteBuf.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkWritableChannel.java index bf84b71b15e91..6144f86905f04 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkFlushableByteBuf.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NetworkWritableChannel.java @@ -27,77 +27,68 @@ import io.netty.buffer.ByteBuf; import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.storageengine.api.WritableChannel; -public class NetworkFlushableByteBuf implements FlushableChannel +public class NetworkWritableChannel implements WritableChannel, ByteBufBacked { private final ByteBuf delegate; - public NetworkFlushableByteBuf( ByteBuf byteBuf ) + public NetworkWritableChannel( ByteBuf byteBuf ) { this.delegate = byteBuf; } @Override - public FlushableChannel put( byte value ) + public WritableChannel put( byte value ) { delegate.writeByte( value ); return this; } @Override - public FlushableChannel putShort( short value ) + public WritableChannel putShort( short value ) { delegate.writeShort( value ); return this; } @Override - public FlushableChannel putInt( int value ) + public WritableChannel putInt( int value ) { delegate.writeInt( value ); return this; } @Override - public FlushableChannel putLong( long value ) + public WritableChannel putLong( long value ) { delegate.writeLong( value ); return this; } @Override - public FlushableChannel putFloat( float value ) + public WritableChannel putFloat( float value ) { delegate.writeFloat( value ); return this; } @Override - public FlushableChannel putDouble( double value ) + public WritableChannel putDouble( double value ) { delegate.writeDouble( value ); return this; } @Override - public FlushableChannel put( byte[] value, int length ) + public WritableChannel put( byte[] value, int length ) { delegate.writeBytes( value, 0, length ); return this; } @Override - public void close() - { - } - - @Override - public Flushable prepareForFlush() - { - return null; - } - - public ByteBuf buffer() + public ByteBuf byteBuf() { return delegate; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java index a12bd1b5d467b..ab7c825975b9c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java @@ -30,7 +30,7 @@ import java.io.IOException; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.function.ThrowingConsumer; import org.neo4j.storageengine.api.WritableChannel; @@ -173,7 +173,7 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception return null; } ByteBuf buffer = allocator.buffer(); - marshaller.accept( new NetworkFlushableChannelNetty4( buffer ) ); + marshaller.accept( new BoundedNetworkWritableChannel( buffer ) ); isEndOfInput = true; offset = buffer.readableBytes(); return buffer; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java index 94f5377ea51fe..b6eb3864883e4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/RaftMessageEncoder.java @@ -29,11 +29,12 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.core.state.machines.tx.TransactionRepresentationReplicatedTransaction; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; +import org.neo4j.io.ByteUnit; public class RaftMessageEncoder extends MessageToByteEncoder { @@ -53,7 +54,7 @@ public synchronized void encode( ChannelHandlerContext ctx, ClusterId clusterId = decoratedMessage.clusterId(); MemberId.Marshal memberMarshal = new MemberId.Marshal(); - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); + NetworkWritableChannel channel = new NetworkWritableChannel( out ); ClusterId.Marshal.INSTANCE.marshal( clusterId, channel ); channel.putInt( message.type().ordinal() ); memberMarshal.marshal( message.from(), channel ); @@ -65,9 +66,9 @@ private static class Handler implements RaftMessages.Handler { private final ChannelMarshal marshal; private final MemberId.Marshal memberMarshal; - private final NetworkFlushableByteBuf channel; + private final NetworkWritableChannel channel; - Handler( ChannelMarshal marshal, MemberId.Marshal memberMarshal, NetworkFlushableByteBuf channel ) + Handler( ChannelMarshal marshal, MemberId.Marshal memberMarshal, NetworkWritableChannel channel ) { this.marshal = marshal; this.memberMarshal = memberMarshal; @@ -147,18 +148,8 @@ public Void handle( RaftMessages.AppendEntries.Response appendResponse ) @Override public Void handle( RaftMessages.NewEntry.Request newEntryRequest ) throws Exception { - ReplicatedContent content = newEntryRequest.content(); - ByteBuf buffer = channel.buffer(); - int contentStartIndex = buffer.writerIndex() + 1; - marshal.marshal( content, channel ); - if ( content instanceof TransactionRepresentationReplicatedTransaction ) - { - // TransactionRepresentationReplicatedTransaction does not support marshal because it has unknown size - int contentEndIndex = buffer.writerIndex(); - int size = contentEndIndex - contentStartIndex - Integer.BYTES; // the integer is the length integer which should be excluded - buffer.setInt( contentStartIndex, size ); - } - + BoundedNetworkWritableChannel sizeBoundChannel = new BoundedNetworkWritableChannel( channel.byteBuf(), ByteUnit.gibiBytes( 1 ) ); + marshal.marshal( newEntryRequest.content(), sizeBoundChannel ); return null; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java index 3cb661cadca07..5edb6feee3c13 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageEncoder.java @@ -29,7 +29,7 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; public class RaftMessageEncoder extends MessageToByteEncoder @@ -41,7 +41,7 @@ protected void encode( ChannelHandlerContext ctx, RaftMessages.ClusterIdAwareMes ClusterId clusterId = decoratedMessage.clusterId(); MemberId.Marshal memberMarshal = new MemberId.Marshal(); - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); + NetworkWritableChannel channel = new NetworkWritableChannel( out ); channel.put( ContentType.Message.get() ); ClusterId.Marshal.INSTANCE.marshal( clusterId, channel ); channel.putInt( message.type().ordinal() ); @@ -53,9 +53,9 @@ protected void encode( ChannelHandlerContext ctx, RaftMessages.ClusterIdAwareMes private static class Handler implements RaftMessages.Handler { private final MemberId.Marshal memberMarshal; - private final NetworkFlushableByteBuf channel; + private final NetworkWritableChannel channel; - Handler( MemberId.Marshal memberMarshal, NetworkFlushableByteBuf channel ) + Handler( MemberId.Marshal memberMarshal, NetworkWritableChannel channel ) { this.memberMarshal = memberMarshal; this.channel = channel; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java index 8e377b02522f5..5f1e6eb9452ce 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/MemberIdMarshalTest.java @@ -28,7 +28,7 @@ import java.util.UUID; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.EndOfStreamException; import org.neo4j.causalclustering.identity.MemberId; @@ -48,7 +48,7 @@ public void shouldSerializeAndDeserialize() throws Exception // when ByteBuf buffer = Unpooled.buffer( 1_000 ); - marshal.marshal( member, new NetworkFlushableChannelNetty4( buffer ) ); + marshal.marshal( member, new BoundedNetworkWritableChannel( buffer ) ); final MemberId recovered = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ); // then @@ -66,7 +66,7 @@ public void shouldThrowExceptionForHalfWrittenInstance() throws Exception ByteBuf buffer = Unpooled.buffer( 1000 ); // and the CoreMember is serialized but for 5 bytes at the end - marshal.marshal( aRealMember, new NetworkFlushableChannelNetty4( buffer ) ); + marshal.marshal( aRealMember, new BoundedNetworkWritableChannel( buffer ) ); ByteBuf bufferWithMissingBytes = buffer.copy( 0, buffer.writerIndex() - 5 ); // when diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java index f76086b23e2d7..33cc95f75ec4b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/membership/RaftMembershipStateTest.java @@ -29,7 +29,7 @@ import java.util.Set; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import static org.hamcrest.MatcherAssert.assertThat; @@ -143,7 +143,7 @@ public void shouldMarshalCorrectly() throws Exception // when ByteBuf buffer = Unpooled.buffer( 1_000 ); - marshal.marshal( state, new NetworkFlushableChannelNetty4( buffer ) ); + marshal.marshal( state, new BoundedNetworkWritableChannel( buffer ) ); final RaftMembershipState recovered = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ); // then diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java index 1fcfda4ac9f04..5226e82a4735d 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java @@ -41,7 +41,7 @@ import org.neo4j.causalclustering.core.state.machines.tx.TransactionRepresentationReplicatedTransaction; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal; @@ -127,7 +127,7 @@ public void shouldMarshalTokenRequest() throws Exception private void assertMarshalingEquality( ByteBuf buffer, ReplicatedContent replicatedTx ) throws IOException, EndOfStreamException { - marshal.marshal( replicatedTx, new NetworkFlushableByteBuf( buffer ) ); + marshal.marshal( replicatedTx, new NetworkWritableChannel( buffer ) ); assertThat( marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ), equalTo( replicatedTx ) ); } @@ -135,7 +135,7 @@ private void assertMarshalingEquality( ByteBuf buffer, ReplicatedContent replica private void assertMarshalingEquality( ByteBuf buffer, TransactionRepresentationReplicatedTransaction replicatedTx ) throws IOException, EndOfStreamException { - marshal.marshal( replicatedTx, new NetworkFlushableByteBuf( buffer ) ); + marshal.marshal( replicatedTx, new NetworkWritableChannel( buffer ) ); ReplicatedContent unmarshal = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java index 4f327741fba87..ad3ef6d050dbe 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransactionTest.java @@ -23,14 +23,20 @@ package org.neo4j.causalclustering.core.state.machines.tx; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.helpers.Buffers; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; +import org.neo4j.causalclustering.messaging.NetworkWritableChannel; +import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; @@ -39,6 +45,9 @@ public class TransactionRepresentationReplicatedTransactionTest { + @Rule + public final Buffers buffers = new Buffers(); + @Test public void shouldDecodeAndUnmarshalSameBytes() throws IOException { @@ -47,9 +56,9 @@ public void shouldDecodeAndUnmarshalSameBytes() throws IOException expectedTx.setHeader( new byte[0], 1, 2, 3, 4, 5, 6 ); - ByteBuf buffer = Unpooled.buffer(); + ByteBuf buffer = buffers.buffer(); TransactionRepresentationReplicatedTransaction replicatedTransaction = ReplicatedTransaction.from( expectedTx ); - replicatedTransaction.marshal( new NetworkFlushableChannelNetty4( buffer ) ); + replicatedTransaction.marshal( new BoundedNetworkWritableChannel( buffer ) ); ReplicatedTransaction decoded = ReplicatedTransactionSerializer.unmarshal( buffer ); buffer.readerIndex( 0 ); @@ -59,4 +68,26 @@ public void shouldDecodeAndUnmarshalSameBytes() throws IOException buffer.release(); } + + @Test + public void shouldMarshalToSameByteIfByteBufBackedOrNot() throws IOException + { + PhysicalTransactionRepresentation expectedTx = + new PhysicalTransactionRepresentation( Collections.singleton( new Command.NodeCommand( new NodeRecord( 1 ), new NodeRecord( 2 ) ) ) ); + + expectedTx.setHeader( new byte[0], 1, 2, 3, 4, 5, 6 ); + TransactionRepresentationReplicatedTransaction replicatedTransaction = ReplicatedTransaction.from( expectedTx ); + + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + ByteBuf buffer = buffers.buffer(); + OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( stream ); + NetworkWritableChannel networkWritableChannel = new NetworkWritableChannel( buffer ); + + replicatedTransaction.marshal( outputStreamWritableChannel ); + replicatedTransaction.marshal( networkWritableChannel ); + + byte[] bufferArray = Arrays.copyOf( buffer.array(), buffer.writerIndex() ); + + Assertions.assertArrayEquals( bufferArray, stream.toByteArray() ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4Test.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannelTest.java similarity index 88% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4Test.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannelTest.java index 1f2ccf4af66d1..61d57db9e1055 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/NetworkFlushableChannelNetty4Test.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkWritableChannelTest.java @@ -28,14 +28,14 @@ import static org.junit.Assert.fail; -public class NetworkFlushableChannelNetty4Test +public class BoundedNetworkWritableChannelTest { @Test public void shouldRespectSizeLimit() throws Exception { // Given int sizeLimit = 100; - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( Unpooled.buffer(), sizeLimit ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( Unpooled.buffer(), sizeLimit ); // when for ( int i = 0; i < sizeLimit; i++ ) @@ -59,7 +59,7 @@ public void sizeLimitShouldWorkWithArrays() throws Exception { // Given int sizeLimit = 100; - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( Unpooled.buffer(), sizeLimit ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( Unpooled.buffer(), sizeLimit ); // When int padding = 10; @@ -89,7 +89,7 @@ public void shouldNotCountBytesAlreadyInBuffer() throws Exception int padding = Long.BYTES; buffer.writeLong( 0 ); - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( buffer, sizeLimit ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( buffer, sizeLimit ); // When for ( int i = 0; i < sizeLimit - padding; i++ ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannelTest.java similarity index 90% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannelTest.java index fa25c8c3eceb5..a95604af6fe4a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChunkingNetworkChannelTest.java @@ -26,7 +26,6 @@ import org.junit.Rule; import org.junit.Test; -import java.io.IOException; import java.util.LinkedList; import org.neo4j.causalclustering.helpers.Buffers; @@ -36,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -public class BoundedNetworkChannelTest +public class ChunkingNetworkChannelTest { @Rule public final Buffers buffers = new Buffers(); @@ -47,7 +46,7 @@ public void shouldSerializeIntoChunksOfGivenSize() // given int chunkSize = 8; LinkedList byteBufs = new LinkedList<>(); - BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, byteBufs ); + ChunkingNetworkChannel channel = new ChunkingNetworkChannel( buffers, chunkSize, byteBufs ); // and data is written byte[] array = new byte[10]; @@ -89,7 +88,7 @@ public void shouldReturnNullIfQueueIsEmpty() int chunkSize = 8; LinkedList byteBufs = new LinkedList<>(); - BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, byteBufs ); + ChunkingNetworkChannel channel = new ChunkingNetworkChannel( buffers, chunkSize, byteBufs ); // when channel.putLong( 1L ); @@ -117,7 +116,7 @@ public void shouldReturnNullIfQueueIsEmpty() public void shouldThrowIllegalStatAfterClosed() { int chunkSize = 8; - BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, new LinkedList<>() ); + ChunkingNetworkChannel channel = new ChunkingNetworkChannel( buffers, chunkSize, new LinkedList<>() ); channel.close(); channel.putInt( 1 ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java index 94506fce13a73..ac2d4d3ec5798 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java @@ -44,7 +44,7 @@ import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.helpers.Buffers; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal; @@ -79,7 +79,7 @@ public void shouldSerializeAndDeserialize() throws Exception { ChannelMarshal coreReplicatedContentMarshal = CoreReplicatedContentMarshal.marshaller(); ByteBuf buffer = buffers.buffer(); - NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( buffer ); + BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( buffer ); coreReplicatedContentMarshal.marshal( replicatedContent, channel ); NetworkReadableClosableChannelNetty4 readChannel = new NetworkReadableClosableChannelNetty4( buffer );