Commit 1e869b1
Moved ReplicatedTransaction encode logic to ReplicatedTransactionSerializer
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent 4836ed1 commit 1e869b1
Showing 4 changed files with 138 additions and 134 deletions.
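In short, the chunked-encode logic that previously lived on the two ReplicatedTransaction implementations (an inner TxRepresentationMarshal on TransactionRepresentationReplicatedTransaction, and a direct ByteArrayChunkedEncoder for the byte-array variant) now sits behind two static ReplicatedTransactionSerializer.encode overloads, next to the serializer's existing marshal and read code. Judging by the class names in the hunks, the four files are ByteArrayReplicatedTransaction, ReplicatedTransactionFactory, ReplicatedTransactionSerializer and TransactionRepresentationReplicatedTransaction. A minimal sketch of the resulting call pattern, illustrative rather than part of the diff:

// Both ReplicatedTransaction implementations now delegate to the serializer;
// Java overload resolution picks the matching static encode method at compile time.
@Override
public ChunkedInput<ByteBuf> encode()
{
    return ReplicatedTransactionSerializer.encode( this );
}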
@@ -29,7 +29,6 @@
import java.util.Arrays;
import java.util.OptionalLong;

import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder;
import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentHandler;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.storageengine.api.WritableChannel;
@@ -84,7 +83,7 @@ public int hashCode()
@Override
public ChunkedInput<ByteBuf> encode()
{
return new ByteArrayChunkedEncoder( getTxBytes() );
return ReplicatedTransactionSerializer.encode( this );
}

@Override
@@ -134,7 +134,7 @@ private TransactionRepresentation read( NetworkReadableClosableChannelNetty4 cha

static class TransactionRepresentationWriter
{
private final Iterator<StorageCommand> iterator;
private final Iterator<StorageCommand> commands;
private ThrowingConsumer<WritableChannel,IOException> nextJob;

private TransactionRepresentationWriter( TransactionRepresentation tx )
@@ -159,15 +159,15 @@ private TransactionRepresentationWriter( TransactionRepresentation tx )
channel.putInt( 0 );
}
};
iterator = tx.iterator();
commands = tx.iterator();
}

void write( WritableChannel channel ) throws IOException
{
nextJob.accept( channel );
if ( iterator.hasNext() )
if ( commands.hasNext() )
{
StorageCommand storageCommand = iterator.next();
StorageCommand storageCommand = commands.next();
nextJob = c -> new StorageCommandSerializer( c ).visit( storageCommand );
}
else
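For context, and not part of the diff: TransactionRepresentationWriter emits a transaction incrementally. Each write( channel ) runs the pending job and, if another StorageCommand remains, queues a job that serializes it through StorageCommandSerializer; canWrite() reports whether a job is still pending. A caller therefore drains it with a loop roughly like the one below, assuming a transaction tx and a WritableChannel channel are already in hand:

ReplicatedTransactionFactory.TransactionRepresentationWriter writer =
        ReplicatedTransactionFactory.transactionalRepresentationWriter( tx );
while ( writer.canWrite() )
{
    // writes the current piece and schedules the next command, if any remain
    writer.write( channel );
}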
@@ -23,11 +23,21 @@
package org.neo4j.causalclustering.core.state.machines.tx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import org.neo4j.causalclustering.helper.ErrorHandler;
import org.neo4j.causalclustering.messaging.BoundedNetworkChannel;
import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder;
import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

@@ -84,12 +94,128 @@ public static void marshal( WritableChannel writableChannel, TransactionRepresen
writableChannel.put( outputStream.toByteArray(), length );
}

static void writeInitialMetaData( WritableChannel firstChunk, TransactionRepresentationReplicatedTransaction tx ) throws IOException
public static ChunkedInput<ByteBuf> encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction )
{
/*
Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
This way, the serialized result of this object is identical to a serialized byte array, which is the only type in Raft V1.
*/
firstChunk.putInt( -1 );
return new TxRepresentationMarshal( representationReplicatedTransaction.tx() );
}

public static ChunkedInput<ByteBuf> encode( ByteArrayReplicatedTransaction byteArrayReplicatedTransaction )
{
return new ByteArrayChunkedEncoder( byteArrayReplicatedTransaction.getTxBytes() );
}

private static class TxRepresentationMarshal implements ChunkedInput<ByteBuf>
{
private static final int CHUNK_SIZE = 32 * 1024;
private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
private BoundedNetworkChannel channel;
private Queue<ByteBuf> chunks = new LinkedList<>();

private TxRepresentationMarshal( TransactionRepresentation replicatedTransaction )
{
txWriter = ReplicatedTransactionFactory.transactionalRepresentationWriter( replicatedTransaction );
}

@Override
public boolean isEndOfInput()
{
return channel != null && channel.closed() && chunks.isEmpty();
}

@Override
public void close()
{
try ( ErrorHandler errorHandler = new ErrorHandler( "Closing TxRepresentationMarshal" ) )
{
if ( channel != null )
{
try
{
channel.close();
}
catch ( Throwable t )
{
errorHandler.add( t );
}
}
if ( !chunks.isEmpty() )
{
for ( ByteBuf byteBuf : chunks )
{
try
{
ReferenceCountUtil.release( byteBuf );
}
catch ( Throwable t )
{
errorHandler.add( t );
}
}
}
}
}

@Override
public ByteBuf readChunk( ChannelHandlerContext ctx ) throws Exception
{
return readChunk( ctx.alloc() );
}

@Override
public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
{
if ( isEndOfInput() )
{
return null;
}
if ( channel == null )
{
// Ensure that the written buffers do not overflow the allocator's chunk size.
channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks );
/*
Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
This way, the serialized result of this object is identical to a serialized byte array, which is the only type in Raft V1.
*/
channel.putInt( -1 );
}
try
{
// write to chunks if empty and there is more to write
while ( txWriter.canWrite() && chunks.isEmpty() )
{
txWriter.write( channel );
}
// nothing more to write, close the channel to get the potential last buffer
if ( chunks.isEmpty() )
{
channel.close();
}
return chunks.poll();
}
catch ( Throwable t )
{
try
{
close();
}
catch ( Exception e )
{
t.addSuppressed( e );
}
throw t;
}
}

@Override
public long length()
{
return -1;
}

@Override
public long progress()
{
return 0;
}
}
}
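For context on how TxRepresentationMarshal is consumed, not part of the diff: Netty pulls chunks from a ChunkedInput until isEndOfInput() returns true, typically via a ChunkedWriteHandler in the pipeline, and close() releases whatever is left over. A minimal manual consumer would look roughly like this, with sink standing in for a hypothetical component that forwards the buffers to the wire:

ChunkedInput<ByteBuf> input = ReplicatedTransactionSerializer.encode( replicatedTransaction );
try
{
    while ( !input.isEndOfInput() )
    {
        ByteBuf chunk = input.readChunk( ByteBufAllocator.DEFAULT );
        if ( chunk == null )
        {
            break; // no chunk currently available; a real transport would retry later
        }
        sink.accept( chunk ); // hypothetical consumer; it takes ownership and must release the buffer
    }
}
finally
{
    input.close(); // releases any chunks that were produced but never read
}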
@@ -23,24 +23,16 @@
package org.neo4j.causalclustering.core.state.machines.tx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import org.neo4j.causalclustering.helper.ErrorHandler;
import org.neo4j.causalclustering.messaging.BoundedNetworkChannel;
import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentHandler;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.storageengine.api.WritableChannel;

public class TransactionRepresentationReplicatedTransaction implements ReplicatedTransaction
{
private static final int CHUNK_SIZE = 32 * 1024;
private final TransactionRepresentation tx;

TransactionRepresentationReplicatedTransaction( TransactionRepresentation tx )
@@ -51,7 +43,7 @@ public class TransactionRepresentationReplicatedTransaction implements Replicate
@Override
public ChunkedInput<ByteBuf> encode()
{
return new TxRepresentationMarshal( this );
return ReplicatedTransactionSerializer.encode( this );
}

@Override
@@ -76,117 +68,4 @@ public void handle( ReplicatedContentHandler contentHandler ) throws IOException
{
contentHandler.handle( this );
}

public class TxRepresentationMarshal implements ChunkedInput<ByteBuf>
{
private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
private final TransactionRepresentationReplicatedTransaction thisTx;
private BoundedNetworkChannel channel;
private Queue<ByteBuf> output = new LinkedList<>();

private TxRepresentationMarshal( TransactionRepresentationReplicatedTransaction tx )
{
txWriter = ReplicatedTransactionFactory.transactionalRepresentationWriter( tx.tx );
thisTx = tx;
}

@Override
public boolean isEndOfInput()
{
return channel != null && channel.closed() && output.isEmpty();
}

@Override
public void close()
{
try ( ErrorHandler errorHandler = new ErrorHandler( "Closing TxRepresentationMarshal" ) )
{
if ( channel != null )
{
try
{
channel.close();
}
catch ( Throwable t )
{
errorHandler.add( t );
}
}
if ( !output.isEmpty() )
{
for ( ByteBuf byteBuf : output )
{
try
{
ReferenceCountUtil.release( byteBuf );
}
catch ( Throwable t )
{
errorHandler.add( t );
}
}
}
}
}

@Override
public ByteBuf readChunk( ChannelHandlerContext ctx ) throws Exception
{
return readChunk( ctx.alloc() );
}

@Override
public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
{
if ( isEndOfInput() )
{
return null;
}
if ( channel == null )
{
// Ensure that the written buffers do not overflow the allocator's chunk size.
channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, output );
// Add metadata to first chunk
ReplicatedTransactionSerializer.writeInitialMetaData( channel, thisTx );
}
try
{
// write to output if empty and there is more to write
while ( txWriter.canWrite() && output.isEmpty() )
{
txWriter.write( channel );
}
// nothing more to write, close the channel to get the potential last buffer
if ( output.isEmpty() )
{
channel.close();
}
return output.poll();
}
catch ( Throwable t )
{
try
{
close();
}
catch ( Exception e )
{
t.addSuppressed( e );
}
throw t;
}
}

@Override
public long length()
{
return -1;
}

@Override
public long progress()
{
return 0;
}
}
}
