Skip to content

Commit

Permalink
Refactoring from review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent 1e869b1 commit 2d01553
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 127 deletions.
Expand Up @@ -68,7 +68,8 @@ public void install( Channel channel ) throws Exception
{
clientPipelineBuilderFactory.client( channel, log )
.modify( modifiers )
.addFraming().add( "raft_encoder", new RaftMessageEncoder( CoreReplicatedContentMarshal.marshaller() ) )
.addFraming()
.add( "raft_encoder", new RaftMessageEncoder( CoreReplicatedContentMarshal.marshaller() ) )
.install();
}

Expand Down
@@ -0,0 +1,129 @@
package org.neo4j.causalclustering.core.state.machines.tx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;

import java.util.LinkedList;
import java.util.Queue;

import org.neo4j.causalclustering.helper.ErrorHandler;
import org.neo4j.causalclustering.messaging.BoundedNetworkChannel;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;

/**
 * Streams a serialized {@link TransactionRepresentation} to the network as a sequence of
 * {@link ByteBuf} chunks of at most {@link #CHUNK_SIZE} bytes, via Netty's
 * {@link ChunkedInput} contract (used by {@code ChunkedWriteHandler}).
 * <p>
 * The transaction is written lazily into a {@link BoundedNetworkChannel} which emits
 * completed buffers into {@link #chunks}; {@link #readChunk} hands them out one at a time.
 */
class ChunkedTransaction implements ChunkedInput<ByteBuf>
{
    private static final int CHUNK_SIZE = 32 * 1024;
    private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
    // Lazily created on the first readChunk() call, because the allocator is only known then.
    private BoundedNetworkChannel channel;
    private final Queue<ByteBuf> chunks = new LinkedList<>();

    ChunkedTransaction( TransactionRepresentation tx )
    {
        txWriter = ReplicatedTransactionFactory.transactionalRepresentationWriter( tx );
    }

    /**
     * Input is exhausted once the channel has been created and closed and every
     * emitted chunk has been handed out.
     */
    @Override
    public boolean isEndOfInput()
    {
        return channel != null && channel.closed() && chunks.isEmpty();
    }

    /**
     * Closes the channel and releases any chunks that were never handed out.
     * <p>
     * Buffers are drained from the queue as they are released, so a repeated call
     * (e.g. the error path in {@link #readChunk} followed by Netty's own close of
     * this input) does not release the same buffer twice, which would throw
     * {@code IllegalReferenceCountException}. All failures are collected and
     * rethrown together by the {@link ErrorHandler}.
     */
    @Override
    public void close()
    {
        try ( ErrorHandler errorHandler = new ErrorHandler( "Closing ChunkedTransaction" ) )
        {
            if ( channel != null )
            {
                try
                {
                    channel.close();
                }
                catch ( Throwable t )
                {
                    errorHandler.add( t );
                }
            }
            // Drain while releasing so already-released buffers cannot be seen again.
            ByteBuf chunk;
            while ( (chunk = chunks.poll()) != null )
            {
                try
                {
                    ReferenceCountUtil.release( chunk );
                }
                catch ( Throwable t )
                {
                    errorHandler.add( t );
                }
            }
        }
    }

    @Override
    public ByteBuf readChunk( ChannelHandlerContext ctx ) throws Exception
    {
        return readChunk( ctx.alloc() );
    }

    /**
     * Returns the next serialized chunk, or {@code null} when the input is exhausted.
     * On any failure this input is closed (releasing pending buffers) before rethrowing.
     */
    @Override
    public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
    {
        if ( isEndOfInput() )
        {
            return null;
        }
        if ( channel == null )
        {
            // Ensure that the written buffers does not overflow the allocators chunk size.
            channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks );
            /*
            Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
            This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1.
            */
            channel.putInt( -1 );
        }
        try
        {
            // write to chunks if empty and there is more to write
            while ( txWriter.canWrite() && chunks.isEmpty() )
            {
                txWriter.write( channel );
            }
            // nothing more to write, close the channel to get the potential last buffer
            if ( chunks.isEmpty() )
            {
                channel.close();
            }
            return chunks.poll();
        }
        catch ( Throwable t )
        {
            try
            {
                close();
            }
            catch ( Exception e )
            {
                t.addSuppressed( e );
            }
            throw t;
        }
    }

    /**
     * Total length is not known up front; -1 per the {@link ChunkedInput} contract.
     */
    @Override
    public long length()
    {
        return -1;
    }

    @Override
    public long progress()
    {
        return 0;
    }
}
Expand Up @@ -23,21 +23,13 @@
package org.neo4j.causalclustering.core.state.machines.tx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import org.neo4j.causalclustering.helper.ErrorHandler;
import org.neo4j.causalclustering.messaging.BoundedNetworkChannel;
import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder;
import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

Expand Down Expand Up @@ -96,126 +88,11 @@ public static void marshal( WritableChannel writableChannel, TransactionRepresen

/**
 * Encodes a transaction-representation-backed replicated transaction as a chunked
 * stream of {@link ByteBuf}s for the network pipeline.
 */
public static ChunkedInput<ByteBuf> encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction )
{
    // Diff residue removed: only the ChunkedTransaction (post-refactoring) path remains;
    // the superseded TxRepresentationMarshal return was unreachable.
    return new ChunkedTransaction( representationReplicatedTransaction.tx() );
}

/**
 * Encodes a byte-array-backed replicated transaction by chunking its raw bytes.
 */
public static ChunkedInput<ByteBuf> encode( ByteArrayReplicatedTransaction byteArrayReplicatedTransaction )
{
    byte[] txBytes = byteArrayReplicatedTransaction.getTxBytes();
    return new ByteArrayChunkedEncoder( txBytes );
}

/**
 * Pre-refactoring chunked encoder for a {@link TransactionRepresentation}: streams the
 * serialized transaction as bounded {@link ByteBuf} chunks via Netty's {@link ChunkedInput}
 * contract. This is the removed side of the diff, superseded by the top-level
 * {@code ChunkedTransaction} class extracted in this commit.
 */
private static class TxRepresentationMarshal implements ChunkedInput<ByteBuf>
{
    private static final int CHUNK_SIZE = 32 * 1024;
    private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
    // Lazily created on the first readChunk() call, when the allocator becomes available.
    private BoundedNetworkChannel channel;
    private Queue<ByteBuf> chunks = new LinkedList<>();

    private TxRepresentationMarshal( TransactionRepresentation replicatedTransaction )
    {
        txWriter = ReplicatedTransactionFactory.transactionalRepresentationWriter( replicatedTransaction );
    }

    // End of input once the channel exists, has been closed, and all chunks were handed out.
    @Override
    public boolean isEndOfInput()
    {
        return channel != null && channel.closed() && chunks.isEmpty();
    }

    /**
     * Closes the channel and releases any buffers still queued; failures are
     * collected and rethrown together by the {@link ErrorHandler}.
     * NOTE(review): released buffers are not removed from {@code chunks}, so a
     * second close() would presumably release them again — verify whether close()
     * can be invoked more than once (e.g. readChunk's error path plus Netty's own close).
     */
    @Override
    public void close()
    {
        try ( ErrorHandler errorHandler = new ErrorHandler( "Closing TxRepresentationMarshal" ) )
        {
            if ( channel != null )
            {
                try
                {
                    channel.close();
                }
                catch ( Throwable t )
                {
                    errorHandler.add( t );
                }
            }
            if ( !chunks.isEmpty() )
            {
                for ( ByteBuf byteBuf : chunks )
                {
                    try
                    {
                        ReferenceCountUtil.release( byteBuf );
                    }
                    catch ( Throwable t )
                    {
                        errorHandler.add( t );
                    }
                }
            }
        }
    }

    @Override
    public ByteBuf readChunk( ChannelHandlerContext ctx ) throws Exception
    {
        return readChunk( ctx.alloc() );
    }

    /**
     * Returns the next serialized chunk, or {@code null} when the input is exhausted.
     * On failure this input is closed before the original throwable is rethrown.
     */
    @Override
    public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
    {
        if ( isEndOfInput() )
        {
            return null;
        }
        if ( channel == null )
        {
            // Ensure that the written buffers does not overflow the allocators chunk size.
            channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks );
            /*
            Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
            This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1.
            */
            channel.putInt( -1 );
        }
        try
        {
            // write to chunks if empty and there is more to write
            while ( txWriter.canWrite() && chunks.isEmpty() )
            {
                txWriter.write( channel );
            }
            // nothing more to write, close the channel to get the potential last buffer
            if ( chunks.isEmpty() )
            {
                channel.close();
            }
            return chunks.poll();
        }
        catch ( Throwable t )
        {
            try
            {
                close();
            }
            catch ( Exception e )
            {
                t.addSuppressed( e );
            }
            throw t;
        }
    }

    // Total length unknown up front; -1 per the ChunkedInput contract.
    @Override
    public long length()
    {
        return -1;
    }

    @Override
    public long progress()
    {
        return 0;
    }
}
}
Expand Up @@ -41,7 +41,7 @@ public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder

ReplicatedContentChunkDecoder()
{
    // Diff residue removed: only the renamed ContentChunkAccumulator (post-commit)
    // cumulator is installed; the duplicate ContentChunkCumulator call is gone.
    setCumulator( new ContentChunkAccumulator() );
}

@Override
Expand All @@ -60,7 +60,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
}
}

private class ContentChunkCumulator implements Cumulator
private class ContentChunkAccumulator implements Cumulator
{
@Override
public ByteBuf cumulate( ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in )
Expand Down

0 comments on commit 2d01553

Please sign in to comment.