Skip to content

Commit

Permalink
Chunk all replicated content
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent c66f8e6 commit b888f22
Show file tree
Hide file tree
Showing 16 changed files with 374 additions and 332 deletions.
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageContentEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageContentEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.SerializableContentEncoder;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -76,7 +75,6 @@ public void install( Channel channel ) throws Exception
.add( "raft_content_type_encoder", new ContentTypeEncoder() ) .add( "raft_content_type_encoder", new ContentTypeEncoder() )
.add( "raft_chunked_replicated_content", new ReplicatedContentChunkEncoder() ) .add( "raft_chunked_replicated_content", new ReplicatedContentChunkEncoder() )
.add( "raft_chunked_writer", new ChunkedWriteHandler( ) ) .add( "raft_chunked_writer", new ChunkedWriteHandler( ) )
.add( "raft_content_encoder", new SerializableContentEncoder() )
.add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() ) .add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() )
.add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) ) .add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) )
.install(); .install();
Expand Down
Expand Up @@ -27,10 +27,11 @@
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeProtocol;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeDispatcher; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeDispatcher;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeProtocol;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.DecodingDispatcher; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.DecodingDispatcher;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.RaftMessageComposer; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.RaftMessageComposer;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ReplicatedContentDecoder;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -78,6 +79,7 @@ public void install( Channel channel ) throws Exception
.addFraming() .addFraming()
.add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) ) .add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) )
.add( "raft_component_decoder", new DecodingDispatcher( contentTypeProtocol, logProvider ) ) .add( "raft_component_decoder", new DecodingDispatcher( contentTypeProtocol, logProvider ) )
.add( "raft_content_decoder", new ReplicatedContentDecoder( contentTypeProtocol ) )
.add( "raft_message_composer", new RaftMessageComposer( Clock.systemUTC() ) ) .add( "raft_message_composer", new RaftMessageComposer( Clock.systemUTC() ) )
.add( "raft_handler", raftMessageHandler ) .add( "raft_handler", raftMessageHandler )
.install(); .install();
Expand Down

This file was deleted.

Expand Up @@ -24,9 +24,10 @@


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;


import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;


import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent; import org.neo4j.causalclustering.messaging.marshalling.Serializer;
import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;


Expand Down Expand Up @@ -67,4 +68,50 @@ public static ReplicatedTransaction unmarshal( ByteBuf buffer )


return new ReplicatedTransaction( txBytes ); return new ReplicatedTransaction( txBytes );
} }

public static Serializer serializer( ReplicatedTransaction replicatedTransaction )
{
return new TxSerializer( replicatedTransaction );
}

private static class TxSerializer implements Serializer
{
private final ReplicatedTransaction replicatedTransaction;
private final ByteArrayInputStream inputStream;

TxSerializer( ReplicatedTransaction replicatedTransaction )
{
inputStream = new ByteArrayInputStream( replicatedTransaction.getTxBytes() );
this.replicatedTransaction = replicatedTransaction;
}

@Override
public boolean encode( ByteBuf byteBuf ) throws IOException
{
if ( inputStream.available() == replicatedTransaction.getTxBytes().length )
{
byteBuf.writeInt( replicatedTransaction.getTxBytes().length );
}
if ( !hasBytes() )
{
return false;
}
int toWrite = Math.min( inputStream.available(), byteBuf.writableBytes() );
byteBuf.writeBytes( inputStream, toWrite );
return hasBytes();
}

private boolean hasBytes()
{
return inputStream.available() > 0;
}

@Override
public void marshal( WritableChannel channel ) throws IOException
{
int length = replicatedTransaction.getTxBytes().length;
channel.putInt( length );
channel.put( replicatedTransaction.getTxBytes(), length );
}
}
} }
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging.marshalling;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;

import java.io.IOException;

import org.neo4j.storageengine.api.WritableChannel;

public class ChunkedReplicatedContent implements SerializableContent, ChunkedInput<ReplicatedContentChunk>
{

private static final int DEFAULT_CHUNK_SIZE = 8192;
private final byte contentType;
private final Serializer serializer;
private final int chunkSize;
private boolean lastByteWasWritten;
private int progress;

public ChunkedReplicatedContent( byte contentType, Serializer serializer, int chunkSize )
{
this.serializer = serializer;
this.chunkSize = chunkSize;
if ( chunkSize < 4 )
{
throw new IllegalArgumentException( "Chunk size must be at least 4 bytes" );
}
this.contentType = contentType;
}

public ChunkedReplicatedContent( byte contentType, Serializer serializer )
{
this( contentType, serializer, DEFAULT_CHUNK_SIZE );
}

@Override
public void serialize( WritableChannel channel ) throws IOException
{
channel.put( contentType );
serializer.marshal( channel );
}

@Override
public boolean isEndOfInput()
{
return lastByteWasWritten;
}

@Override
public void close()
{
// do nothing
}

@Override
public ReplicatedContentChunk readChunk( ChannelHandlerContext ctx ) throws IOException
{
return readChunk( ctx.alloc() );
}

@Override
public ReplicatedContentChunk readChunk( ByteBufAllocator allocator ) throws IOException
{
if ( isEndOfInput() )
{
return null;
}
ByteBuf buffer = allocator.buffer( chunkSize );
if ( !serializer.encode( buffer ) )
{
lastByteWasWritten = true;
}
progress += buffer.readableBytes();
return new ReplicatedContentChunk( contentType, isEndOfInput(), buffer );
}

@Override
public long length()
{
return -1;
}

@Override
public long progress()
{
return progress;
}
}
Expand Up @@ -24,56 +24,37 @@


public class ReplicatedContentChunk extends DefaultByteBufHolder public class ReplicatedContentChunk extends DefaultByteBufHolder
{ {
private final byte txContentType; private final byte contentType;
private final boolean isFirst;
private final boolean isLast; private final boolean isLast;


public ReplicatedContentChunk( byte txContentType, boolean isFirst, boolean isLast, ByteBuf data ) ReplicatedContentChunk( byte contentType, boolean isLast, ByteBuf data )
{ {
super( data ); super( data );
this.txContentType = txContentType; this.contentType = contentType;
this.isFirst = isFirst;
this.isLast = isLast; this.isLast = isLast;
} }


public boolean isFirst()
{
return isFirst;
}

public boolean isLast() public boolean isLast()
{ {
return isLast; return isLast;
} }


private byte txContentType() public byte contentType()
{ {
return txContentType; return contentType;
} }


public void serialize( ByteBuf out ) public void encode( ByteBuf out )
{ {
out.writeByte( txContentType() ); out.writeByte( contentType() );
out.writeByte( isFirstByte() ); out.writeBoolean( isLast() );
out.writeByte( isLastByte() );
out.writeBytes( content() ); out.writeBytes( content() );
} }


public static ReplicatedContentChunk deSerialize( ByteBuf in ) public static ReplicatedContentChunk deSerialize( ByteBuf in )
{ {
byte txContentType = in.readByte(); byte txContentType = in.readByte();
boolean isFirst = in.readByte() == (byte) 1; boolean isLast = in.readBoolean();
boolean isLast = in.readByte() == (byte) 1; return new ReplicatedContentChunk( txContentType, isLast, in.readSlice( in.readableBytes() ) );
return new ReplicatedContentChunk( txContentType, isFirst, isLast, in.readRetainedSlice( in.readableBytes() ) );
}

private byte isLastByte()
{
return isLast ? (byte) 1 : (byte) 0;
}

private byte isFirstByte()
{
return isFirst ? (byte) 1 : (byte) 0;
} }
} }
Expand Up @@ -28,6 +28,6 @@ public class ReplicatedContentChunkEncoder extends MessageToByteEncoder<Replicat
@Override @Override
protected void encode( ChannelHandlerContext ctx, ReplicatedContentChunk msg, ByteBuf out ) protected void encode( ChannelHandlerContext ctx, ReplicatedContentChunk msg, ByteBuf out )
{ {
msg.serialize( out ); msg.encode( out );
} }
} }

0 comments on commit b888f22

Please sign in to comment.