diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ByteArrayReplicatedTransaction.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ByteArrayReplicatedTransaction.java
index 292064f87592a..29729e55e63a8 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ByteArrayReplicatedTransaction.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ByteArrayReplicatedTransaction.java
@@ -29,7 +29,6 @@
 import java.util.Arrays;
 import java.util.OptionalLong;
 
-import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder;
 import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentHandler;
 import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
 import org.neo4j.storageengine.api.WritableChannel;
@@ -84,7 +83,7 @@ public int hashCode()
     @Override
     public ChunkedInput<ByteBuf> encode()
     {
-        return new ByteArrayChunkedEncoder( getTxBytes() );
+        return ReplicatedTransactionSerializer.encode( this );
     }
 
     @Override
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionFactory.java
index 26b1b6ad418f2..1f97ed388f949 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionFactory.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionFactory.java
@@ -134,7 +134,7 @@ private TransactionRepresentation read( NetworkReadableClosableChannelNetty4 cha
 
     static class TransactionRepresentationWriter
     {
-        private final Iterator<StorageCommand> iterator;
+        private final Iterator<StorageCommand> commands;
         private ThrowingConsumer<WritableChannel,IOException> nextJob;
 
         private TransactionRepresentationWriter( TransactionRepresentation tx )
@@ -159,15 +159,15 @@ private TransactionRepresentationWriter( TransactionRepresentation tx )
                     channel.putInt( 0 );
                 }
             };
-            iterator = tx.iterator();
+            commands = tx.iterator();
         }
 
         void write( WritableChannel channel ) throws IOException
         {
             nextJob.accept( channel );
-            if ( iterator.hasNext() )
+            if ( commands.hasNext() )
             {
-                StorageCommand storageCommand = iterator.next();
+                StorageCommand storageCommand = commands.next();
                 nextJob = c -> new StorageCommandSerializer( c ).visit( storageCommand );
             }
             else
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java
index db1db6935d0f4..7b2b11844109b 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionSerializer.java
@@ -23,11 +23,21 @@
 package org.neo4j.causalclustering.core.state.machines.tx;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
 
+import org.neo4j.causalclustering.helper.ErrorHandler;
+import org.neo4j.causalclustering.messaging.BoundedNetworkChannel;
+import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder;
 import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel;
+import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
 import org.neo4j.storageengine.api.ReadableChannel;
 import org.neo4j.storageengine.api.WritableChannel;
 
@@ -84,12 +94,128 @@ public static void marshal( WritableChannel writableChannel, TransactionRepresen
         writableChannel.put( outputStream.toByteArray(), length );
     }
 
-    static void writeInitialMetaData( WritableChannel firstChunk, TransactionRepresentationReplicatedTransaction tx ) throws IOException
+    public static ChunkedInput<ByteBuf> encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction )
     {
-        /*
-        Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
-        This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1.
-        */
-        firstChunk.putInt( -1 );
+        return new TxRepresentationMarshal( representationReplicatedTransaction.tx() );
+    }
+
+    public static ChunkedInput<ByteBuf> encode( ByteArrayReplicatedTransaction byteArrayReplicatedTransaction )
+    {
+        return new ByteArrayChunkedEncoder( byteArrayReplicatedTransaction.getTxBytes() );
+    }
+
+    private static class TxRepresentationMarshal implements ChunkedInput<ByteBuf>
+    {
+        private static final int CHUNK_SIZE = 32 * 1024;
+        private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
+        private BoundedNetworkChannel channel;
+        private Queue<ByteBuf> chunks = new LinkedList<>();
+
+        private TxRepresentationMarshal( TransactionRepresentation replicatedTransaction )
+        {
+            txWriter = ReplicatedTransactionFactory.transactionalRepresentationWriter( replicatedTransaction );
+        }
+
+        @Override
+        public boolean isEndOfInput()
+        {
+            return channel != null && channel.closed() && chunks.isEmpty();
+        }
+
+        @Override
+        public void close()
+        {
+            try ( ErrorHandler errorHandler = new ErrorHandler( "Closing TxRepresentationMarshal" ) )
+            {
+                if ( channel != null )
+                {
+                    try
+                    {
+                        channel.close();
+                    }
+                    catch ( Throwable t )
+                    {
+                        errorHandler.add( t );
+                    }
+                }
+                if ( !chunks.isEmpty() )
+                {
+                    for ( ByteBuf byteBuf : chunks )
+                    {
+                        try
+                        {
+                            ReferenceCountUtil.release( byteBuf );
+                        }
+                        catch ( Throwable t )
+                        {
+                            errorHandler.add( t );
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public ByteBuf readChunk( ChannelHandlerContext ctx ) throws Exception
+        {
+            return readChunk( ctx.alloc() );
+        }
+
+        @Override
+        public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
+        {
+            if ( isEndOfInput() )
+            {
+                return null;
+            }
+            if ( channel == null )
+            {
+                // Ensure that the written buffers do not overflow the allocator's chunk size.
+                channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks );
+                /*
+                Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
+                This way, the serialized result of this object is identical to a serialized byte array, which is the only type in Raft V1.
+                */
+                channel.putInt( -1 );
+            }
+            try
+            {
+                // write to chunks if empty and there is more to write
+                while ( txWriter.canWrite() && chunks.isEmpty() )
+                {
+                    txWriter.write( channel );
+                }
+                // nothing more to write, close the channel to get the potential last buffer
+                if ( chunks.isEmpty() )
+                {
+                    channel.close();
+                }
+                return chunks.poll();
+            }
+            catch ( Throwable t )
+            {
+                try
+                {
+                    close();
+                }
+                catch ( Exception e )
+                {
+                    t.addSuppressed( e );
+                }
+                throw t;
+            }
+        }
+
+        @Override
+        public long length()
+        {
+            return -1;
+        }
+
+        @Override
+        public long progress()
+        {
+            return 0;
+        }
+    }
 }
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java
index b07d5bec328c2..36cbd85af5ec2 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java
@@ -23,24 +23,16 @@
 package org.neo4j.causalclustering.core.state.machines.tx;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.stream.ChunkedInput;
-import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Queue;
 
-import org.neo4j.causalclustering.helper.ErrorHandler;
-import org.neo4j.causalclustering.messaging.BoundedNetworkChannel;
 import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentHandler;
 import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
 import org.neo4j.storageengine.api.WritableChannel;
 
 public class TransactionRepresentationReplicatedTransaction implements ReplicatedTransaction
 {
-    private static final int CHUNK_SIZE = 32 * 1024;
     private final TransactionRepresentation tx;
 
     TransactionRepresentationReplicatedTransaction( TransactionRepresentation tx )
@@ -51,7 +43,7 @@ public class TransactionRepresentationReplicatedTransaction implements Replicate
     @Override
     public ChunkedInput<ByteBuf> encode()
     {
-        return new TxRepresentationMarshal( this );
+        return ReplicatedTransactionSerializer.encode( this );
     }
 
     @Override
@@ -76,117 +68,4 @@ public void handle( ReplicatedContentHandler contentHandler ) throws IOException
     {
         contentHandler.handle( this );
     }
-
-    public class TxRepresentationMarshal implements ChunkedInput<ByteBuf>
-    {
-        private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
-        private final TransactionRepresentationReplicatedTransaction thisTx;
-        private BoundedNetworkChannel channel;
-        private Queue<ByteBuf> output = new LinkedList<>();
-
-        private TxRepresentationMarshal( TransactionRepresentationReplicatedTransaction tx )
-        {
-            txWriter = ReplicatedTransactionFactory.transactionalRepresentationWriter( tx.tx );
-            thisTx = tx;
-        }
-
-        @Override
-        public boolean isEndOfInput()
-        {
-            return channel != null && channel.closed() && output.isEmpty();
-        }
-
-        @Override
-        public void close()
-        {
-            try ( ErrorHandler errorHandler = new ErrorHandler( "Closing TxRepresentationMarshal" ) )
-            {
-                if ( channel != null )
-                {
-                    try
-                    {
-                        channel.close();
-                    }
-                    catch ( Throwable t )
-                    {
-                        errorHandler.add( t );
-                    }
-                }
-                if ( !output.isEmpty() )
-                {
-                    for ( ByteBuf byteBuf : output )
-                    {
-                        try
-                        {
-                            ReferenceCountUtil.release( byteBuf );
-                        }
-                        catch ( Throwable t )
-                        {
-                            errorHandler.add( t );
-                        }
-                    }
-                }
-            }
-        }
-
-        @Override
-        public ByteBuf readChunk( ChannelHandlerContext ctx ) throws Exception
-        {
-            return readChunk( ctx.alloc() );
-        }
-
-        @Override
-        public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
-        {
-            if ( isEndOfInput() )
-            {
-                return null;
-            }
-            if ( channel == null )
-            {
-                // Ensure that the written buffers does not overflow the allocators chunk size.
-                channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, output );
-                // Add metadata to first chunk
-                ReplicatedTransactionSerializer.writeInitialMetaData( channel, thisTx );
-            }
-            try
-            {
-                // write to output if empty and there is more to write
-                while ( txWriter.canWrite() && output.isEmpty() )
-                {
-                    txWriter.write( channel );
-                }
-                // nothing more to write, close the channel to get the potential last buffer
-                if ( output.isEmpty() )
-                {
-                    channel.close();
-                }
-                return output.poll();
-            }
-            catch ( Throwable t )
-            {
-                try
-                {
-                    close();
-                }
-                catch ( Exception e )
-                {
-                    t.addSuppressed( e );
-                }
-                throw t;
-            }
-        }
-
-        @Override
-        public long length()
-        {
-            return -1;
-        }
-
-        @Override
-        public long progress()
-        {
-            return 0;
-        }
-    }
 }
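Reviewer note (illustrative, not part of the patch): the two encode( .. ) overloads in ReplicatedTransactionSerializer return a Netty ChunkedInput<ByteBuf>, whose contract is to hand out buffers on demand via readChunk( .. ) until isEndOfInput() reports true. The sketch below shows how a consumer, for example a round-trip test, might drain such an encoder. The class name ChunkedEncodingSketch, the drain( .. ) helper and the reassembly into a single Unpooled buffer are assumptions made for demonstration only; in production Netty's ChunkedWriteHandler pulls the chunks and writes them to the wire instead of reassembling them.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;

class ChunkedEncodingSketch
{
    /**
     * Drains a ChunkedInput such as the one returned by ReplicatedTransactionSerializer.encode( .. ),
     * copying each chunk into one buffer and releasing the chunk afterwards.
     */
    static ByteBuf drain( ChunkedInput<ByteBuf> encoder, ByteBufAllocator allocator ) throws Exception
    {
        ByteBuf assembled = Unpooled.buffer();
        try
        {
            while ( !encoder.isEndOfInput() )
            {
                ByteBuf chunk = encoder.readChunk( allocator );
                if ( chunk == null )
                {
                    // This encoder only returns null once end of input has been reached.
                    break;
                }
                try
                {
                    assembled.writeBytes( chunk );
                }
                finally
                {
                    ReferenceCountUtil.release( chunk );
                }
            }
            return assembled;
        }
        finally
        {
            encoder.close();
        }
    }
}

Because TxRepresentationMarshal writes putInt( -1 ) into the first chunk before any command data, the drained stream starts with the same framing as a serialized byte array, which is what keeps the encoding compatible with Raft V1, as noted in the comment inside readChunk.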