From a00a1f64f3ec112cae875c92c2929d8b5b7f8b96 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Mon, 21 May 2018 15:12:54 +0200 Subject: [PATCH] Skip unnecessary allocation through extra encoding --- .../v2/RaftProtocolClientInstaller.java | 2 - .../tx/ReplicatedTransactionSerializer.java | 45 +----------- .../marshalling/ByteArraySerializer.java | 71 +++++++++++++++++++ .../marshalling/ChunkedReplicatedContent.java | 32 ++++++--- .../marshalling/ReplicatedContentChunk.java | 9 +-- .../ReplicatedContentChunkEncoder.java | 36 ---------- ...ChunkedReplicatedTransactionInputTest.java | 9 +-- 7 files changed, 101 insertions(+), 103 deletions(-) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ByteArraySerializer.java delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java index 8dae7b2210ec8..00b1b3e0b3180 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.stream.Collectors; -import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; @@ -76,7 +75,6 @@ public void install( Channel channel ) throws Exception .addFraming() .add( "raft_message_encoder", new RaftMessageEncoder() ) .add( "raft_content_type_encoder", new ContentTypeEncoder() ) - .add( "raft_chunked_replicated_content", new ReplicatedContentChunkEncoder() ) .add( "raft_chunked_writer", new ChunkedWriteHandler( ) ) .add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() ) .add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java index 8cfbb280052e0..1cff684d1ad5d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java @@ -24,9 +24,9 @@ import io.netty.buffer.ByteBuf; -import java.io.ByteArrayInputStream; import java.io.IOException; +import org.neo4j.causalclustering.messaging.marshalling.ByteArraySerializer; import org.neo4j.causalclustering.messaging.marshalling.Serializer; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -71,47 +71,6 @@ public static ReplicatedTransaction unmarshal( ByteBuf buffer ) public static Serializer serializer( ReplicatedTransaction replicatedTransaction ) { - return new TxSerializer( replicatedTransaction ); - } - - private static class TxSerializer implements Serializer - { - private final ReplicatedTransaction replicatedTransaction; - private final ByteArrayInputStream inputStream; - - TxSerializer( ReplicatedTransaction replicatedTransaction ) - { - inputStream = new ByteArrayInputStream( replicatedTransaction.getTxBytes() ); - this.replicatedTransaction = replicatedTransaction; - } - - @Override - public boolean encode( ByteBuf byteBuf ) throws IOException - { - if ( inputStream.available() == replicatedTransaction.getTxBytes().length ) - { - byteBuf.writeInt( replicatedTransaction.getTxBytes().length ); - } - if ( !hasBytes() ) - { - return false; - } - int toWrite = Math.min( inputStream.available(), byteBuf.writableBytes() ); - byteBuf.writeBytes( inputStream, toWrite ); - return hasBytes(); - } - - private boolean hasBytes() - { - return inputStream.available() > 0; - } - - @Override - public void marshal( WritableChannel channel ) throws IOException - { - int length = replicatedTransaction.getTxBytes().length; - channel.putInt( length ); - channel.put( replicatedTransaction.getTxBytes(), length ); - } + return new ByteArraySerializer( replicatedTransaction.getTxBytes() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ByteArraySerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ByteArraySerializer.java new file mode 100644 index 0000000000000..d9d375d45faaf --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ByteArraySerializer.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.messaging.marshalling; + +import io.netty.buffer.ByteBuf; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.neo4j.storageengine.api.WritableChannel; + +public class ByteArraySerializer implements Serializer +{ + private final byte[] content; + private final ByteArrayInputStream inputStream; + + public ByteArraySerializer( byte[] content ) + { + inputStream = new ByteArrayInputStream( content ); + this.content = content; + } + + @Override + public boolean encode( ByteBuf byteBuf ) throws IOException + { + if ( inputStream.available() == content.length ) + { + byteBuf.writeInt( content.length ); + } + if ( !hasBytes() ) + { + return false; + } + int toWrite = Math.min( inputStream.available(), byteBuf.writableBytes() ); + byteBuf.writeBytes( inputStream, toWrite ); + return hasBytes(); + } + + private boolean hasBytes() + { + return inputStream.available() > 0; + } + + @Override + public void marshal( WritableChannel channel ) throws IOException + { + int length = content.length; + channel.putInt( length ); + channel.put( content, length ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java index ab6657bb51c96..33722e831ed0d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java @@ -31,7 +31,7 @@ import org.neo4j.storageengine.api.WritableChannel; -public class ChunkedReplicatedContent implements Marshal, ChunkedInput +public class ChunkedReplicatedContent implements Marshal, ChunkedInput { private static final int DEFAULT_CHUNK_SIZE = 8192; @@ -77,25 +77,41 @@ public void close() } @Override - public ReplicatedContentChunk readChunk( ChannelHandlerContext ctx ) throws IOException + public ByteBuf readChunk( ChannelHandlerContext ctx ) throws IOException { return readChunk( ctx.alloc() ); } @Override - public ReplicatedContentChunk readChunk( ByteBufAllocator allocator ) throws IOException + public ByteBuf readChunk( ByteBufAllocator allocator ) throws IOException { - if ( isEndOfInput() ) + boolean endOfInput = isEndOfInput(); + if ( endOfInput ) { return null; } ByteBuf buffer = allocator.buffer( chunkSize ); - if ( !serializer.encode( buffer ) ) + try { - lastByteWasWritten = true; + // transfer to buffer + buffer.writeByte( contentType ); + buffer.writeBoolean( endOfInput ); + if ( !serializer.encode( buffer ) ) + { + lastByteWasWritten = true; + } + if ( isEndOfInput() != endOfInput ) + { + buffer.setBoolean( 1, isEndOfInput() ); + } + progress += buffer.readableBytes(); + return buffer; + } + catch ( IOException e ) + { + buffer.release(); + throw e; } - progress += buffer.readableBytes(); - return new ReplicatedContentChunk( contentType, isEndOfInput(), buffer ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java index 9f98759c546a6..45658051d330b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java @@ -30,7 +30,7 @@ public class ReplicatedContentChunk extends DefaultByteBufHolder private final byte contentType; private final boolean isLast; - ReplicatedContentChunk( byte contentType, boolean isLast, ByteBuf data ) + private ReplicatedContentChunk( byte contentType, boolean isLast, ByteBuf data ) { super( data ); this.contentType = contentType; @@ -47,13 +47,6 @@ public byte contentType() return contentType; } - public void encode( ByteBuf out ) - { - out.writeByte( contentType() ); - out.writeBoolean( isLast() ); - out.writeBytes( content() ); - } - public static ReplicatedContentChunk deSerialize( ByteBuf in ) { byte txContentType = in.readByte(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java deleted file mode 100644 index 45199c5791c78..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.messaging.marshalling; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; - -public class ReplicatedContentChunkEncoder extends MessageToByteEncoder -{ - @Override - protected void encode( ChannelHandlerContext ctx, ReplicatedContentChunk msg, ByteBuf out ) - { - msg.encode( out ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java index bd9de904b2045..5dd929bf32753 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java @@ -49,15 +49,12 @@ public void shouldEncodeAndDecode() throws Exception ByteBuf composedDeserialized = Unpooled.buffer(); while ( !chunkedReplicatedTransactionInput.isEndOfInput() ) { - ReplicatedContentChunk chunk = chunkedReplicatedTransactionInput.readChunk( allocator ); + ByteBuf chunk = chunkedReplicatedTransactionInput.readChunk( allocator ); - ByteBuf buffer = Unpooled.buffer(); - chunk.encode( buffer ); - - ReplicatedContentChunk deserializedChunk = ReplicatedContentChunk.deSerialize( buffer ); + ReplicatedContentChunk deserializedChunk = ReplicatedContentChunk.deSerialize( chunk ); composedDeserialized.writeBytes( deserializedChunk.content() ); - buffer.release(); + chunk.release(); } byte[] array = Arrays.copyOf( composedDeserialized.array(), composedDeserialized.readableBytes() ); assertEquals( replicatedTransaction, new ReplicatedTransaction( array ) );