Skip to content

Commit

Permalink
Skip unnecessary allocation through extra encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent 2488e9a commit a00a1f6
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 103 deletions.
Expand Up @@ -29,7 +29,6 @@
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder;
Expand Down Expand Up @@ -76,7 +75,6 @@ public void install( Channel channel ) throws Exception
.addFraming() .addFraming()
.add( "raft_message_encoder", new RaftMessageEncoder() ) .add( "raft_message_encoder", new RaftMessageEncoder() )
.add( "raft_content_type_encoder", new ContentTypeEncoder() ) .add( "raft_content_type_encoder", new ContentTypeEncoder() )
.add( "raft_chunked_replicated_content", new ReplicatedContentChunkEncoder() )
.add( "raft_chunked_writer", new ChunkedWriteHandler( ) ) .add( "raft_chunked_writer", new ChunkedWriteHandler( ) )
.add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() ) .add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() )
.add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) ) .add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) )
Expand Down
Expand Up @@ -24,9 +24,9 @@


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;


import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;


import org.neo4j.causalclustering.messaging.marshalling.ByteArraySerializer;
import org.neo4j.causalclustering.messaging.marshalling.Serializer; import org.neo4j.causalclustering.messaging.marshalling.Serializer;
import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;
Expand Down Expand Up @@ -71,47 +71,6 @@ public static ReplicatedTransaction unmarshal( ByteBuf buffer )


public static Serializer serializer( ReplicatedTransaction replicatedTransaction ) public static Serializer serializer( ReplicatedTransaction replicatedTransaction )
{ {
return new TxSerializer( replicatedTransaction ); return new ByteArraySerializer( replicatedTransaction.getTxBytes() );
}

private static class TxSerializer implements Serializer
{
private final ReplicatedTransaction replicatedTransaction;
private final ByteArrayInputStream inputStream;

TxSerializer( ReplicatedTransaction replicatedTransaction )
{
inputStream = new ByteArrayInputStream( replicatedTransaction.getTxBytes() );
this.replicatedTransaction = replicatedTransaction;
}

@Override
public boolean encode( ByteBuf byteBuf ) throws IOException
{
if ( inputStream.available() == replicatedTransaction.getTxBytes().length )
{
byteBuf.writeInt( replicatedTransaction.getTxBytes().length );
}
if ( !hasBytes() )
{
return false;
}
int toWrite = Math.min( inputStream.available(), byteBuf.writableBytes() );
byteBuf.writeBytes( inputStream, toWrite );
return hasBytes();
}

private boolean hasBytes()
{
return inputStream.available() > 0;
}

@Override
public void marshal( WritableChannel channel ) throws IOException
{
int length = replicatedTransaction.getTxBytes().length;
channel.putInt( length );
channel.put( replicatedTransaction.getTxBytes(), length );
}
} }
} }
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.messaging.marshalling;

import io.netty.buffer.ByteBuf;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.neo4j.storageengine.api.WritableChannel;

public class ByteArraySerializer implements Serializer
{
private final byte[] content;
private final ByteArrayInputStream inputStream;

public ByteArraySerializer( byte[] content )
{
inputStream = new ByteArrayInputStream( content );
this.content = content;
}

@Override
public boolean encode( ByteBuf byteBuf ) throws IOException
{
if ( inputStream.available() == content.length )
{
byteBuf.writeInt( content.length );
}
if ( !hasBytes() )
{
return false;
}
int toWrite = Math.min( inputStream.available(), byteBuf.writableBytes() );
byteBuf.writeBytes( inputStream, toWrite );
return hasBytes();
}

private boolean hasBytes()
{
return inputStream.available() > 0;
}

@Override
public void marshal( WritableChannel channel ) throws IOException
{
int length = content.length;
channel.putInt( length );
channel.put( content, length );
}
}
Expand Up @@ -31,7 +31,7 @@


import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;


public class ChunkedReplicatedContent implements Marshal, ChunkedInput<ReplicatedContentChunk> public class ChunkedReplicatedContent implements Marshal, ChunkedInput<ByteBuf>
{ {


private static final int DEFAULT_CHUNK_SIZE = 8192; private static final int DEFAULT_CHUNK_SIZE = 8192;
Expand Down Expand Up @@ -77,25 +77,41 @@ public void close()
} }


@Override @Override
public ReplicatedContentChunk readChunk( ChannelHandlerContext ctx ) throws IOException public ByteBuf readChunk( ChannelHandlerContext ctx ) throws IOException
{ {
return readChunk( ctx.alloc() ); return readChunk( ctx.alloc() );
} }


@Override @Override
public ReplicatedContentChunk readChunk( ByteBufAllocator allocator ) throws IOException public ByteBuf readChunk( ByteBufAllocator allocator ) throws IOException
{ {
if ( isEndOfInput() ) boolean endOfInput = isEndOfInput();
if ( endOfInput )
{ {
return null; return null;
} }
ByteBuf buffer = allocator.buffer( chunkSize ); ByteBuf buffer = allocator.buffer( chunkSize );
if ( !serializer.encode( buffer ) ) try
{ {
lastByteWasWritten = true; // transfer to buffer
buffer.writeByte( contentType );
buffer.writeBoolean( endOfInput );
if ( !serializer.encode( buffer ) )
{
lastByteWasWritten = true;
}
if ( isEndOfInput() != endOfInput )
{
buffer.setBoolean( 1, isEndOfInput() );
}
progress += buffer.readableBytes();
return buffer;
}
catch ( IOException e )
{
buffer.release();
throw e;
} }
progress += buffer.readableBytes();
return new ReplicatedContentChunk( contentType, isEndOfInput(), buffer );
} }


@Override @Override
Expand Down
Expand Up @@ -30,7 +30,7 @@ public class ReplicatedContentChunk extends DefaultByteBufHolder
private final byte contentType; private final byte contentType;
private final boolean isLast; private final boolean isLast;


ReplicatedContentChunk( byte contentType, boolean isLast, ByteBuf data ) private ReplicatedContentChunk( byte contentType, boolean isLast, ByteBuf data )
{ {
super( data ); super( data );
this.contentType = contentType; this.contentType = contentType;
Expand All @@ -47,13 +47,6 @@ public byte contentType()
return contentType; return contentType;
} }


public void encode( ByteBuf out )
{
out.writeByte( contentType() );
out.writeBoolean( isLast() );
out.writeBytes( content() );
}

public static ReplicatedContentChunk deSerialize( ByteBuf in ) public static ReplicatedContentChunk deSerialize( ByteBuf in )
{ {
byte txContentType = in.readByte(); byte txContentType = in.readByte();
Expand Down

This file was deleted.

Expand Up @@ -49,15 +49,12 @@ public void shouldEncodeAndDecode() throws Exception
ByteBuf composedDeserialized = Unpooled.buffer(); ByteBuf composedDeserialized = Unpooled.buffer();
while ( !chunkedReplicatedTransactionInput.isEndOfInput() ) while ( !chunkedReplicatedTransactionInput.isEndOfInput() )
{ {
ReplicatedContentChunk chunk = chunkedReplicatedTransactionInput.readChunk( allocator ); ByteBuf chunk = chunkedReplicatedTransactionInput.readChunk( allocator );


ByteBuf buffer = Unpooled.buffer(); ReplicatedContentChunk deserializedChunk = ReplicatedContentChunk.deSerialize( chunk );
chunk.encode( buffer );

ReplicatedContentChunk deserializedChunk = ReplicatedContentChunk.deSerialize( buffer );


composedDeserialized.writeBytes( deserializedChunk.content() ); composedDeserialized.writeBytes( deserializedChunk.content() );
buffer.release(); chunk.release();
} }
byte[] array = Arrays.copyOf( composedDeserialized.array(), composedDeserialized.readableBytes() ); byte[] array = Arrays.copyOf( composedDeserialized.array(), composedDeserialized.readableBytes() );
assertEquals( replicatedTransaction, new ReplicatedTransaction( array ) ); assertEquals( replicatedTransaction, new ReplicatedTransaction( array ) );
Expand Down

0 comments on commit a00a1f6

Please sign in to comment.