diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java index b01a0ca56b11..46b0a3d15abd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java @@ -32,7 +32,6 @@ import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageContentEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageEncoder; -import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.SerializableContentEncoder; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol; @@ -76,7 +75,6 @@ public void install( Channel channel ) throws Exception .add( "raft_content_type_encoder", new ContentTypeEncoder() ) .add( "raft_chunked_replicated_content", new ReplicatedContentChunkEncoder() ) .add( "raft_chunked_writer", new ChunkedWriteHandler( ) ) - .add( "raft_content_encoder", new SerializableContentEncoder() ) .add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() ) .add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) ) .install(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java index a782de59b75f..f6026993224e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java @@ -27,10 +27,11 @@ import java.util.List; import java.util.stream.Collectors; -import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeProtocol; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeDispatcher; +import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeProtocol; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.DecodingDispatcher; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.RaftMessageComposer; +import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ReplicatedContentDecoder; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol; @@ -78,6 +79,7 @@ public void install( Channel channel ) throws Exception .addFraming() .add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) ) .add( "raft_component_decoder", new DecodingDispatcher( contentTypeProtocol, logProvider ) ) + .add( "raft_content_decoder", new ReplicatedContentDecoder( contentTypeProtocol ) ) .add( "raft_message_composer", new RaftMessageComposer( Clock.systemUTC() ) ) .add( "raft_handler", raftMessageHandler ) .install(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInput.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInput.java deleted file mode 100644 index 506400b19d76..000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInput.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.core.state.machines.tx; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.EmptyByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.stream.ChunkedInput; -import io.netty.handler.stream.ChunkedStream; - -import java.io.ByteArrayInputStream; -import java.io.IOException; - -import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; -import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent; -import org.neo4j.storageengine.api.WritableChannel; - -public class ChunkedReplicatedTransactionInput implements SerializableContent, ChunkedInput -{ - - private final ChunkedStream chunkedStream; - private final byte txContentType; - private final ReplicatedTransaction replicatedTransaction; - - public ChunkedReplicatedTransactionInput( byte txContentType, ReplicatedTransaction replicatedTransaction, int chunkSize ) - { - if ( chunkSize < 4 ) - { - throw new IllegalArgumentException( "Chunk size must be at least 4 bytes" ); - } - this.txContentType = txContentType; - this.replicatedTransaction = replicatedTransaction; - chunkedStream = new ChunkedStream( new ByteArrayInputStream( replicatedTransaction.getTxBytes() ), chunkSize - 3 ); - } - - public ChunkedReplicatedTransactionInput( byte txContentType, ReplicatedTransaction replicatedTransaction ) - { - this.txContentType = txContentType; - this.replicatedTransaction = replicatedTransaction; - chunkedStream = new ChunkedStream( new ByteArrayInputStream( replicatedTransaction.getTxBytes() ) ); - } - - @Override - public void serialize( WritableChannel channel ) throws IOException - { - channel.put( txContentType ); - ReplicatedTransactionSerializer.marshal( replicatedTransaction, channel ); - } - - @Override - public boolean isEndOfInput() throws Exception - { - return chunkedStream.isEndOfInput(); - } - - @Override - public void close() throws Exception - { - chunkedStream.close(); - } - - @Override - public ReplicatedContentChunk readChunk( ChannelHandlerContext ctx ) throws Exception - { - return readChunk( ctx.alloc() ); - } - - @Override - public ReplicatedContentChunk readChunk( ByteBufAllocator allocator ) throws Exception - { - boolean isFirst = progress() == 0; - ByteBuf byteBuf = chunkedStream.readChunk( allocator ); - if ( byteBuf == null ) - { - byteBuf = new EmptyByteBuf( allocator ); - } - return new ReplicatedContentChunk( txContentType, isFirst, isEndOfInput(), byteBuf ); - } - - @Override - public long length() - { - return -1; - } - - @Override - public long progress() - { - return chunkedStream.progress(); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java index 08357916fa65..8cfbb280052e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java @@ -24,9 +24,10 @@ import io.netty.buffer.ByteBuf; +import java.io.ByteArrayInputStream; import java.io.IOException; -import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent; +import org.neo4j.causalclustering.messaging.marshalling.Serializer; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -67,4 +68,50 @@ public static ReplicatedTransaction unmarshal( ByteBuf buffer ) return new ReplicatedTransaction( txBytes ); } + + public static Serializer serializer( ReplicatedTransaction replicatedTransaction ) + { + return new TxSerializer( replicatedTransaction ); + } + + private static class TxSerializer implements Serializer + { + private final ReplicatedTransaction replicatedTransaction; + private final ByteArrayInputStream inputStream; + + TxSerializer( ReplicatedTransaction replicatedTransaction ) + { + inputStream = new ByteArrayInputStream( replicatedTransaction.getTxBytes() ); + this.replicatedTransaction = replicatedTransaction; + } + + @Override + public boolean encode( ByteBuf byteBuf ) throws IOException + { + if ( inputStream.available() == replicatedTransaction.getTxBytes().length ) + { + byteBuf.writeInt( replicatedTransaction.getTxBytes().length ); + } + if ( !hasBytes() ) + { + return false; + } + int toWrite = Math.min( inputStream.available(), byteBuf.writableBytes() ); + byteBuf.writeBytes( inputStream, toWrite ); + return hasBytes(); + } + + private boolean hasBytes() + { + return inputStream.available() > 0; + } + + @Override + public void marshal( WritableChannel channel ) throws IOException + { + int length = replicatedTransaction.getTxBytes().length; + channel.putInt( length ); + channel.put( replicatedTransaction.getTxBytes(), length ); + } + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java new file mode 100644 index 000000000000..8dcd1548b7af --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging.marshalling; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.stream.ChunkedInput; + +import java.io.IOException; + +import org.neo4j.storageengine.api.WritableChannel; + +public class ChunkedReplicatedContent implements SerializableContent, ChunkedInput +{ + + private static final int DEFAULT_CHUNK_SIZE = 8192; + private final byte contentType; + private final Serializer serializer; + private final int chunkSize; + private boolean lastByteWasWritten; + private int progress; + + public ChunkedReplicatedContent( byte contentType, Serializer serializer, int chunkSize ) + { + this.serializer = serializer; + this.chunkSize = chunkSize; + if ( chunkSize < 4 ) + { + throw new IllegalArgumentException( "Chunk size must be at least 4 bytes" ); + } + this.contentType = contentType; + } + + public ChunkedReplicatedContent( byte contentType, Serializer serializer ) + { + this( contentType, serializer, DEFAULT_CHUNK_SIZE ); + } + + @Override + public void serialize( WritableChannel channel ) throws IOException + { + channel.put( contentType ); + serializer.marshal( channel ); + } + + @Override + public boolean isEndOfInput() + { + return lastByteWasWritten; + } + + @Override + public void close() + { + // do nothing + } + + @Override + public ReplicatedContentChunk readChunk( ChannelHandlerContext ctx ) throws IOException + { + return readChunk( ctx.alloc() ); + } + + @Override + public ReplicatedContentChunk readChunk( ByteBufAllocator allocator ) throws IOException + { + if ( isEndOfInput() ) + { + return null; + } + ByteBuf buffer = allocator.buffer( chunkSize ); + if ( !serializer.encode( buffer ) ) + { + lastByteWasWritten = true; + } + progress += buffer.readableBytes(); + return new ReplicatedContentChunk( contentType, isEndOfInput(), buffer ); + } + + @Override + public long length() + { + return -1; + } + + @Override + public long progress() + { + return progress; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java index 914006b7cb81..c8358e82e3b3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunk.java @@ -24,56 +24,37 @@ public class ReplicatedContentChunk extends DefaultByteBufHolder { - private final byte txContentType; - private final boolean isFirst; + private final byte contentType; private final boolean isLast; - public ReplicatedContentChunk( byte txContentType, boolean isFirst, boolean isLast, ByteBuf data ) + ReplicatedContentChunk( byte contentType, boolean isLast, ByteBuf data ) { super( data ); - this.txContentType = txContentType; - this.isFirst = isFirst; + this.contentType = contentType; this.isLast = isLast; } - public boolean isFirst() - { - return isFirst; - } - public boolean isLast() { return isLast; } - private byte txContentType() + public byte contentType() { - return txContentType; + return contentType; } - public void serialize( ByteBuf out ) + public void encode( ByteBuf out ) { - out.writeByte( txContentType() ); - out.writeByte( isFirstByte() ); - out.writeByte( isLastByte() ); + out.writeByte( contentType() ); + out.writeBoolean( isLast() ); out.writeBytes( content() ); } public static ReplicatedContentChunk deSerialize( ByteBuf in ) { byte txContentType = in.readByte(); - boolean isFirst = in.readByte() == (byte) 1; - boolean isLast = in.readByte() == (byte) 1; - return new ReplicatedContentChunk( txContentType, isFirst, isLast, in.readRetainedSlice( in.readableBytes() ) ); - } - - private byte isLastByte() - { - return isLast ? (byte) 1 : (byte) 0; - } - - private byte isFirstByte() - { - return isFirst ? (byte) 1 : (byte) 0; + boolean isLast = in.readBoolean(); + return new ReplicatedContentChunk( txContentType, isLast, in.readSlice( in.readableBytes() ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java index 8bf570876cac..0f1d3eabfba5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java @@ -28,6 +28,6 @@ public class ReplicatedContentChunkEncoder extends MessageToByteEncoder. */ -package org.neo4j.causalclustering.messaging.marshalling.v2; +package org.neo4j.causalclustering.messaging.marshalling; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import org.neo4j.function.ThrowingConsumer; import org.neo4j.storageengine.api.WritableChannel; public interface SerializableContent { void serialize( WritableChannel channel ) throws IOException; - - static SimpleSerializableContent simple( byte contentType, ThrowingConsumer serializer ) - { - return new SimpleSerializableContent( contentType, serializer ); - } - - class SimpleSerializableContent implements SerializableContent - { - private final byte contentType; - private final ThrowingConsumer serializer; - - private SimpleSerializableContent( byte contentType, ThrowingConsumer serializer ) - { - this.contentType = contentType; - this.serializer = serializer; - } - - public void serialize( WritableChannel channel ) throws IOException - { - channel.put( contentType ); - serializer.accept( channel ); - } - } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java new file mode 100644 index 000000000000..fea8c3c9e4c8 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging.marshalling; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; + +import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.function.ThrowingConsumer; +import org.neo4j.storageengine.api.WritableChannel; + +public interface Serializer +{ + /** May override buffer allocation size. + * @param channelConsumer + * @return a simple serializer that encodes all the content at once. + */ + static Serializer simple( ThrowingConsumer channelConsumer ) + { + return new Serializer() + { + private boolean consumed; + + @Override + public boolean encode( ByteBuf byteBuf ) throws IOException + { + if ( consumed ) + { + return false; + } + marshal( new NetworkFlushableChannelNetty4( byteBuf ) ); + consumed = true; + return false; + } + + @Override + public void marshal( WritableChannel channel ) throws IOException + { + channelConsumer.accept( channel ); + } + }; + } + + /** + * Writes to byteBuf until full. + * + * @param byteBuf where data will be written + * @return false if there is no more data left to write after this call. + */ + boolean encode( ByteBuf byteBuf ) throws IOException; + + /** + * Writes all content to the channel + * + * @param channel to where data is written. + */ + void marshal( WritableChannel channel ) throws IOException; +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java index be0e0d8d930a..17bd2b4dccd4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java @@ -19,12 +19,9 @@ */ package org.neo4j.causalclustering.messaging.marshalling.v2; -import io.netty.buffer.ByteBuf; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; import java.util.UUID; import org.neo4j.causalclustering.core.consensus.NewLeaderBarrier; @@ -41,19 +38,18 @@ import org.neo4j.causalclustering.core.state.machines.locks.ReplicatedLockTokenSerializer; import org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequest; import org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequestSerializer; -import org.neo4j.causalclustering.core.state.machines.tx.ChunkedReplicatedTransactionInput; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionSerializer; import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; -import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; +import org.neo4j.causalclustering.messaging.marshalling.ChunkedReplicatedContent; +import org.neo4j.causalclustering.messaging.marshalling.SerializableContent; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; import static java.util.Collections.singleton; -import static org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent.simple; +import static org.neo4j.causalclustering.messaging.marshalling.Serializer.simple; public class CoreReplicatedContentSerializer extends SafeChannelMarshal { @@ -70,35 +66,38 @@ public Collection toSerializable( ReplicatedContent content { if ( content instanceof ReplicatedTransaction ) { - return singleton( new ChunkedReplicatedTransactionInput( TX_CONTENT_TYPE, (ReplicatedTransaction) content ) ); + return singleton( new ChunkedReplicatedContent( TX_CONTENT_TYPE, ReplicatedTransactionSerializer.serializer( (ReplicatedTransaction) content ) ) ); } else if ( content instanceof MemberIdSet ) { - return singleton( simple( RAFT_MEMBER_SET_TYPE, channel -> MemberIdSetSerializer.marshal( (MemberIdSet) content, channel ) ) ); + return singleton( new ChunkedReplicatedContent( RAFT_MEMBER_SET_TYPE, + simple( channel -> MemberIdSetSerializer.marshal( (MemberIdSet) content, channel ) ) ) ); } else if ( content instanceof ReplicatedIdAllocationRequest ) { - return singleton( simple( ID_RANGE_REQUEST_TYPE, - channel -> ReplicatedIdAllocationRequestSerializer.marshal( (ReplicatedIdAllocationRequest) content, channel ) ) ); + return singleton( new ChunkedReplicatedContent( ID_RANGE_REQUEST_TYPE, + simple( channel -> ReplicatedIdAllocationRequestSerializer.marshal( (ReplicatedIdAllocationRequest) content, channel ) ) ) ); } else if ( content instanceof ReplicatedTokenRequest ) { - return singleton( simple( TOKEN_REQUEST_TYPE, channel -> ReplicatedTokenRequestSerializer.marshal( (ReplicatedTokenRequest) content, channel ) ) ); + return singleton( new ChunkedReplicatedContent( TOKEN_REQUEST_TYPE, + simple( channel -> ReplicatedTokenRequestSerializer.marshal( (ReplicatedTokenRequest) content, channel ) ) ) ); } else if ( content instanceof NewLeaderBarrier ) { - return singleton( simple( NEW_LEADER_BARRIER_TYPE, channel -> + return singleton( new ChunkedReplicatedContent( NEW_LEADER_BARRIER_TYPE, simple( channel -> { - } ) ); + } ) ) ); } else if ( content instanceof ReplicatedLockTokenRequest ) { - return singleton( simple( LOCK_TOKEN_REQUEST, channel -> ReplicatedLockTokenSerializer.marshal( (ReplicatedLockTokenRequest) content, channel ) ) ); + return singleton( new ChunkedReplicatedContent( LOCK_TOKEN_REQUEST, + simple( channel -> ReplicatedLockTokenSerializer.marshal( (ReplicatedLockTokenRequest) content, channel ) ) ) ); } else if ( content instanceof DistributedOperation ) { - ArrayList list = new ArrayList<>( toSerializable( ((DistributedOperation) content).content() ) ); - list.add( 0, simple( DISTRIBUTED_OPERATION, channel -> + LinkedList list = new LinkedList<>( toSerializable( ((DistributedOperation) content).content() ) ); + list.add( 0, new ChunkedReplicatedContent( DISTRIBUTED_OPERATION, simple( channel -> { channel.putLong( ((DistributedOperation) content).globalSession().sessionId().getMostSignificantBits() ); channel.putLong( ((DistributedOperation) content).globalSession().sessionId().getLeastSignificantBits() ); @@ -106,12 +105,13 @@ else if ( content instanceof DistributedOperation ) channel.putLong( ((DistributedOperation) content).operationId().localSessionId() ); channel.putLong( ((DistributedOperation) content).operationId().sequenceNumber() ); - } ) ); + } ) ) ); return list; } else if ( content instanceof DummyRequest ) { - return singleton( simple( DUMMY_REQUEST, channel -> DummyRequest.Marshal.INSTANCE.marshal( (DummyRequest) content, channel ) ) ); + return singleton( new ChunkedReplicatedContent( DUMMY_REQUEST, + simple( channel -> DummyRequest.Marshal.INSTANCE.marshal( (DummyRequest) content, channel ) ) ) ); } else { @@ -119,50 +119,9 @@ else if ( content instanceof DummyRequest ) } } - public ContentBuilder decode( ByteBuf byteBuf ) throws IOException, EndOfStreamException - { - byte type = byteBuf.readByte(); - byteBuf.readerIndex( byteBuf.readerIndex() - 1 ); - switch ( type ) - { - case TX_CONTENT_TYPE: - ReplicatedContentChunk replicatedContentChunk = ReplicatedContentChunk.deSerialize( byteBuf ); - UnfinishedChunk unfinishedChunk = UnfinishedChunk.create( replicatedContentChunk.content() ); - return new ContentBuilder<>( replicatedContent -> - { - UnfinishedChunk chunk; - if ( replicatedContent != null ) - { - chunk = unfinishedChunk.consume( (UnfinishedChunk) replicatedContent ); - // chunk = ( (UnfinishedChunk) replicatedContent ).consume( unfinishedChunk ); - } - else - { - chunk = unfinishedChunk; - } - if ( !replicatedContentChunk.isFirst() ) - { - return chunk; - } - else - { - int length = chunk.byteBuf.readableBytes(); - byte[] bytes = new byte[length]; - chunk.byteBuf.readBytes( bytes ); - chunk.byteBuf.release(); - return new ReplicatedTransaction( bytes ); - } - }, replicatedContentChunk.isLast() ); - - default: - return read( new NetworkReadableClosableChannelNetty4( byteBuf ) ); - } - } - - public ContentBuilder read( ReadableChannel channel ) throws IOException, EndOfStreamException + public ContentBuilder read( byte contentType, ReadableChannel channel ) throws IOException, EndOfStreamException { - byte type = channel.get(); - switch ( type ) + switch ( contentType ) { case TX_CONTENT_TYPE: return new ContentBuilder<>( ReplicatedTransactionSerializer.unmarshal( channel ) ); @@ -187,46 +146,12 @@ public ContentBuilder read( ReadableChannel channel ) throws long sequenceNumber = channel.getLong(); LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber ); - return new ContentBuilder<>( replicatedContent -> - { - if ( replicatedContent instanceof UnfinishedChunk ) - { - throw new IllegalStateException( "Cannot combine with unfinished chunk" ); - } - return new DistributedOperation( replicatedContent, globalSession, localOperationId ); - }, false ); + return new ContentBuilder<>( replicatedContent -> new DistributedOperation( replicatedContent, globalSession, localOperationId ), false ); } case DUMMY_REQUEST: return new ContentBuilder<>( DummyRequest.Marshal.INSTANCE.unmarshal( channel ) ); default: - throw new IllegalStateException( "Not a recognized content type: " + type ); - } - } - - private static class UnfinishedChunk implements ReplicatedContent - { - ByteBuf byteBuf; - - private UnfinishedChunk( ByteBuf content ) - { - byteBuf = content.copy(); - } - - public static UnfinishedChunk create( ByteBuf content ) - { - return new UnfinishedChunk( content ); - } - - public UnfinishedChunk consume( UnfinishedChunk chunk ) - { - byteBuf.writeBytes( chunk.byteBuf ); - chunk.byteBuf.release(); - return this; - } - - public byte[] array() - { - return Arrays.copyOf( byteBuf.array(), byteBuf.writerIndex() ); + throw new IllegalStateException( "Not a recognized content type: " + contentType ); } } @@ -242,10 +167,12 @@ public void marshal( ReplicatedContent coreReplicatedContent, WritableChannel ch @Override protected ReplicatedContent unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException { - ContentBuilder contentBuilder = read( channel ); + byte type = channel.get(); + ContentBuilder contentBuilder = read( type, channel ); while ( !contentBuilder.isComplete() ) { - contentBuilder = contentBuilder.combine( read( channel ) ); + type = channel.get(); + contentBuilder = contentBuilder.combine( read( type, channel ) ); } return contentBuilder.build(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java index 5ff5995ad40e..653b11d8dea3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java @@ -47,7 +47,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) } } ); register( ContentType.RaftLogEntries, new RaftLogEntryTermDecoder( protocol ) ); - register( ContentType.ReplicatedContent, new ReplicatedContentDecoder( protocol ) ); + register( ContentType.ReplicatedContent, new ReplicatedContentChunkDecoder( ) ); register( ContentType.Message, new RaftMessageDecoder( protocol ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java new file mode 100644 index 000000000000..bd719687341d --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging.marshalling.v2.decoding; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DefaultByteBufHolder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; +import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; +import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; + +public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder +{ + private UnfinishedChunk unfinishedChunk; + private final CoreReplicatedContentSerializer coreReplicatedContentSerializer = new CoreReplicatedContentSerializer(); + + @Override + protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) throws Exception + { + ReplicatedContentChunk replicatedContentChunk = ReplicatedContentChunk.deSerialize( in ); + if ( unfinishedChunk == null ) + { + if ( replicatedContentChunk.isLast() ) + { + out.add( coreReplicatedContentSerializer.read( replicatedContentChunk.contentType(), + new NetworkReadableClosableChannelNetty4( replicatedContentChunk.content() ) ) ); + } + else + { + unfinishedChunk = new UnfinishedChunk( replicatedContentChunk ); + } + } + else + { + unfinishedChunk.consume( replicatedContentChunk ); + + if ( replicatedContentChunk.isLast() ) + { + out.add( coreReplicatedContentSerializer.read( unfinishedChunk.contentType, + new NetworkReadableClosableChannelNetty4( unfinishedChunk.content() ) ) ); + unfinishedChunk.content().release(); + unfinishedChunk = null; + } + } + } + + private static class UnfinishedChunk extends DefaultByteBufHolder + { + private final byte contentType; + + UnfinishedChunk( ReplicatedContentChunk replicatedContentChunk ) + { + super( replicatedContentChunk.content().copy() ); + contentType = replicatedContentChunk.contentType(); + } + + void consume( ReplicatedContentChunk replicatedContentChunk ) + { + if ( replicatedContentChunk.contentType() != contentType ) + { + throw new IllegalArgumentException( "Wrong content type. Expected " + contentType + " got " + replicatedContentChunk.contentType() ); + } + content().writeBytes( replicatedContentChunk.content() ); + } + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java index e42fbab47965..bcd3045d9abf 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java @@ -19,9 +19,8 @@ */ package org.neo4j.causalclustering.messaging.marshalling.v2.decoding; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; @@ -29,23 +28,21 @@ import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentBuilder; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; -import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; -public class ReplicatedContentDecoder extends ByteToMessageDecoder +public class ReplicatedContentDecoder extends MessageToMessageDecoder> { - private final CoreReplicatedContentSerializer coreReplicatedContentSerializer = new CoreReplicatedContentSerializer(); private final Protocol protocol; private ContentBuilder contentBuilder = ContentBuilder.emptyUnfinished(); - ReplicatedContentDecoder( Protocol protocol ) + public ReplicatedContentDecoder( Protocol protocol ) { this.protocol = protocol; } @Override - protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) throws Exception + protected void decode( ChannelHandlerContext ctx, ContentBuilder msg, List out ) { - contentBuilder.combine( coreReplicatedContentSerializer.decode( in ) ); + contentBuilder.combine( msg ); if ( contentBuilder.isComplete() ) { out.add( contentBuilder.build() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java index 67bfdeed90cf..ac93eff03c12 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java @@ -30,7 +30,6 @@ import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; -import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent; public class RaftMessageContentEncoder extends MessageToMessageEncoder { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/SerializableContentEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/SerializableContentEncoder.java deleted file mode 100644 index 330b39b06d55..000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/SerializableContentEncoder.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging.marshalling.v2.encoding; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; - -import java.io.IOException; - -import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; -import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent; -import org.neo4j.storageengine.api.WritableChannel; - -public class SerializableContentEncoder extends MessageToByteEncoder -{ - @Override - protected void encode( ChannelHandlerContext ctx, SerializableContent.SimpleSerializableContent msg, ByteBuf out ) throws Exception - { - sendToNetwork( ctx, msg, out ); - } - - public void marshal( SerializableContent serializableContent, WritableChannel channel ) throws IOException - { - serializableContent.serialize( channel ); - } - - private void sendToNetwork( ChannelHandlerContext ctx, SerializableContent msg, ByteBuf out ) throws IOException - { - msg.serialize( new NetworkFlushableChannelNetty4( out ) ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java index 2a49d7440c2b..358488a955da 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/tx/ChunkedReplicatedTransactionInputTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; +import org.neo4j.causalclustering.messaging.marshalling.ChunkedReplicatedContent; import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; import static org.junit.Assert.assertEquals; @@ -37,7 +38,9 @@ public class ChunkedReplicatedTransactionInputTest public void shouldEncodeAndDecode() throws Exception { ReplicatedTransaction replicatedTransaction = new ReplicatedTransaction( new byte[]{1, 2, 3, 4} ); - ChunkedReplicatedTransactionInput chunkedReplicatedTransactionInput = new ChunkedReplicatedTransactionInput( (byte) 0, replicatedTransaction, 3 ); + byte contentType = (byte) 1; + ChunkedReplicatedContent chunkedReplicatedTransactionInput = + new ChunkedReplicatedContent( contentType, ReplicatedTransactionSerializer.serializer( replicatedTransaction ), 4 ); UnpooledByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT; ByteBuf composedDeserialized = Unpooled.buffer(); @@ -46,7 +49,7 @@ public void shouldEncodeAndDecode() throws Exception ReplicatedContentChunk chunk = chunkedReplicatedTransactionInput.readChunk( allocator ); ByteBuf buffer = Unpooled.buffer(); - chunk.serialize( buffer ); + chunk.encode( buffer ); ReplicatedContentChunk deserializedChunk = ReplicatedContentChunk.deSerialize( buffer );