
Commit

Added network fragmentation test & fixed fragmentation bug
No matter how the data is fragmented and sent to the server, the server should always append it to the buffer in the proper order.
jakewins authored and Zhen committed May 21, 2015
1 parent 72c7977 commit e2a130e
Showing 7 changed files with 186 additions and 20 deletions.
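The property described in the commit message can be stated as a small standalone check. The sketch below is illustrative only (the class and variable names are not part of this commit) and assumes nothing beyond standard Java: however a byte stream is split into network packets, appending the fragments in arrival order must reproduce the original stream.

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Illustrative sketch, not part of the commit: reassembling fragments in
// arrival order must always reproduce the original byte stream.
public class FragmentationInvariantSketch
{
    public static void main( String[] args ) throws Exception
    {
        byte[] original = {1, 2, 3, 4, 5, 6, 7};

        // try every way of splitting the stream into two non-empty fragments
        for ( int split = 1; split < original.length; split++ )
        {
            ByteArrayOutputStream reassembled = new ByteArrayOutputStream();
            reassembled.write( Arrays.copyOfRange( original, 0, split ) );
            reassembled.write( Arrays.copyOfRange( original, split, original.length ) );

            if ( !Arrays.equals( original, reassembled.toByteArray() ) )
            {
                throw new AssertionError( "Reassembly failed at split " + split );
            }
        }
        System.out.println( "All two-fragment splits reassemble correctly" );
    }
}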
@@ -61,7 +61,7 @@ public ChunkedInput clear()
return this;
}

public void addChunk( ByteBuf chunk )
public void append( ByteBuf chunk )
{
if ( chunk.readableBytes() > 0 )
{
@@ -54,7 +54,7 @@ public class SocketProtocolV1 implements SocketProtocol
private final Log log;
private final AtomicInteger inFlight = new AtomicInteger( 0 );

enum State
public enum State
{
AWAITING_CHUNK,
IN_CHUNK,
@@ -127,20 +127,22 @@ public void handle( ChannelHandlerContext channelContext, ByteBuf data )
if ( chunkSize < data.readableBytes() )
{
// Current packet is larger than current chunk, slice off the chunk
input.addChunk( data.readSlice( chunkSize ) );
input.append( data.readSlice( chunkSize ) );
state = State.AWAITING_CHUNK;
}
else if ( chunkSize == data.readableBytes() )
{
// Current packet perfectly maps to current chunk
input.addChunk( data );
input.append( data );
state = State.AWAITING_CHUNK;
return;
}
else
{
// Current packet is smaller than the chunk we're reading, split the current chunk itself up
chunkSize -= data.readableBytes();
input.addChunk( data );
input.append( data );
return;
}
break;
}
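To make the three branches above easier to follow, here is a minimal standalone simulation of the IN_CHUNK handling. The packet sizes, class name, and output are invented for illustration, and the real handler additionally re-parses the leftover bytes of a packet after slicing off a complete chunk.

// Illustrative sketch with assumed sizes, not taken from the commit.
public class ChunkReaderSketch
{
    enum State { AWAITING_CHUNK, IN_CHUNK }

    public static void main( String[] args )
    {
        int chunkSize = 5;          // bytes still expected for the current chunk
        State state = State.IN_CHUNK;
        int[] packets = {2, 3};     // two network packets delivering that chunk

        for ( int readable : packets )
        {
            if ( chunkSize < readable )
            {
                // packet carries more than the current chunk: consume chunkSize bytes,
                // then keep parsing the remainder as the next chunk header
                state = State.AWAITING_CHUNK;
            }
            else if ( chunkSize == readable )
            {
                // packet ends exactly on the chunk boundary
                chunkSize = 0;
                state = State.AWAITING_CHUNK;
            }
            else
            {
                // packet is smaller than the chunk: append it and wait for more data
                chunkSize -= readable;
            }
            System.out.println( "after packet of " + readable + " bytes: " + state
                    + ", " + chunkSize + " chunk bytes outstanding" );
        }
    }
}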
@@ -174,6 +176,11 @@ public void close()
session.close();
}

public State state()
{
return state;
}

private void handleHeader( ChannelHandlerContext channelContext )
{
if(chunkSize == 0)
@@ -188,7 +195,7 @@ private void handleHeader( ChannelHandlerContext channelContext )
}
}

public void processCollectedMessage( final ChannelHandlerContext channelContext )
private void processCollectedMessage( final ChannelHandlerContext channelContext )
{
output.setTargetChannel( channelContext );
try
@@ -35,9 +35,9 @@ public void shouldExposeMultipleChunksAsCohesiveStream() throws Throwable
// Given
ChunkedInput ch = new ChunkedInput();

ch.addChunk( wrappedBuffer( new byte[]{1, 2} ) );
ch.addChunk( wrappedBuffer( new byte[]{3} ) );
ch.addChunk( wrappedBuffer( new byte[]{4, 5} ) );
ch.append( wrappedBuffer( new byte[]{1, 2} ) );
ch.append( wrappedBuffer( new byte[]{3} ) );
ch.append( wrappedBuffer( new byte[]{4, 5} ) );

// When
byte[] bytes = new byte[5];
@@ -54,11 +54,11 @@ public void shouldReadIntoMisalignedDestinationBuffer() throws Throwable
byte[] bytes = new byte[3];
ChunkedInput ch = new ChunkedInput();

ch.addChunk( wrappedBuffer( new byte[]{1, 2} ) );
ch.addChunk( wrappedBuffer( new byte[]{3} ) );
ch.addChunk( wrappedBuffer( new byte[]{4} ) );
ch.addChunk( wrappedBuffer( new byte[]{5} ) );
ch.addChunk( wrappedBuffer( new byte[]{6, 7} ) );
ch.append( wrappedBuffer( new byte[]{1, 2} ) );
ch.append( wrappedBuffer( new byte[]{3} ) );
ch.append( wrappedBuffer( new byte[]{4} ) );
ch.append( wrappedBuffer( new byte[]{5} ) );
ch.append( wrappedBuffer( new byte[]{6, 7} ) );

// When I read {1,2,3}
ch.get( bytes, 0, 3 );
@@ -88,9 +88,9 @@ public void shouldReadPartialChunk() throws Throwable
// Given
ChunkedInput ch = new ChunkedInput();

ch.addChunk( wrappedBuffer( new byte[]{1, 2} ) );
ch.addChunk( wrappedBuffer( new byte[]{3} ) );
ch.addChunk( wrappedBuffer( new byte[]{4, 5} ) );
ch.append( wrappedBuffer( new byte[]{1, 2} ) );
ch.append( wrappedBuffer( new byte[]{3} ) );
ch.append( wrappedBuffer( new byte[]{4, 5} ) );

// When
byte[] bytes = new byte[5];
@@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.ndp.docs.v1;
package org.neo4j.ndp.transport.socket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.ndp.transport.socket;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.neo4j.kernel.impl.util.HexPrinter;
import org.neo4j.logging.NullLog;
import org.neo4j.ndp.messaging.v1.PackStreamMessageFormatV1;
import org.neo4j.ndp.messaging.v1.RecordingByteChannel;
import org.neo4j.ndp.messaging.v1.message.Message;
import org.neo4j.ndp.runtime.Session;

import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.neo4j.ndp.messaging.v1.message.Messages.run;
import static org.neo4j.ndp.transport.socket.SocketProtocolV1.State.AWAITING_CHUNK;

/**
* This tests network fragmentation of messages. Given a set of messages, it will serialize and chunk the message up
* to a specified chunk size. Then it will split that data into a specified number of fragments, trying every possible
* permutation of fragment sizes for the specified number. For instance, assuming an unfragmented message size of 15,
* and a fragment count of 3, it will create fragment size permutations like:
* <p/>
* [1,1,13]
* [1,2,12]
* [1,3,11]
* ..
* [12,1,1]
* <p/>
* For each permutation, it delivers the fragments to the protocol implementation, and asserts the protocol handled
* them properly.
*/
public class FragmentedMessageDeliveryTest
{
// Only test one chunk size for now, this can be parameterized to test lots of different ones
private int chunkSize = 16;

// Only test messages broken into three fragments for now, this can be parameterized later
private int numFragments = 3;

// Only test one message for now. This can be parameterized later to test lots of different ones
private Message[] messages = new Message[]{run( "Mjölnir" )};

private PackStreamMessageFormatV1.Writer format = new PackStreamMessageFormatV1.Writer();

@Test
public void testFragmentedMessageDelivery() throws Throwable
{
// Given
byte[] unfragmented = serialize( chunkSize, messages );

// When & Then
int n = unfragmented.length;
for ( int i = 1; i < n - 1; i++ )
{
for ( int j = 1; j < n - i; j++ )
{
testPermutation( unfragmented, i, j, n - i - j );
}
}
}

private void testPermutation( byte[] unfragmented, int... sizes )
{
int pos = 0;
ByteBuf[] fragments = new ByteBuf[sizes.length];
for ( int i = 0; i < sizes.length; i++ )
{
fragments[i] = wrappedBuffer( unfragmented, pos, sizes[i] );
pos += sizes[i];
}
testPermutation( unfragmented, fragments );
}

private void testPermutation( byte[] unfragmented, ByteBuf[] fragments )
{
// Given
// System.out.println( "Testing fragmentation:" + describeFragments( fragments ) );
Session sess = mock( Session.class );
ChannelHandlerContext ch = mock( ChannelHandlerContext.class );
SocketProtocolV1 protocol = new SocketProtocolV1( NullLog.getInstance(), sess );

// When data arrives split up according to the current permutation
for ( ByteBuf fragment : fragments )
{
fragment.readerIndex( 0 ).retain();
protocol.handle( ch, fragment );
}

// Then the session should've received the specified messages, and the protocol should be in a nice clean state
try
{
assertEquals( AWAITING_CHUNK, protocol.state() );
verify( sess ).run( eq( "Mjölnir" ), any( Map.class ), any(), any( Session.Callback.class ) );
}
catch ( AssertionError e )
{
throw new AssertionError( "Failed to handle fragmented delivery.\n" +
"Messages: " + Arrays.toString( messages ) + "\n" +
"Chunk size: " + chunkSize + "\n" +
"Serialized data delivered in fragments: " + describeFragments( fragments ) +
"\n" +
"Unfragmented data: " + HexPrinter.hex( unfragmented ) + "\n", e );
}
}

private String describeFragments( ByteBuf[] fragments )
{
StringBuilder sb = new StringBuilder();
for ( int i = 0; i < fragments.length; i++ )
{
if ( i > 0 ) { sb.append( "," ); }
sb.append( fragments[i].capacity() );
}
return sb.toString();
}

private byte[] serialize( int chunkSize, Message... msgs ) throws IOException
{
byte[][] serialized = new byte[msgs.length][];
for ( int i = 0; i < msgs.length; i++ )
{
RecordingByteChannel channel = new RecordingByteChannel();
format.reset( channel ).write( msgs[i] ).flush();
serialized[i] = channel.getBytes();
}
return Chunker.chunk( chunkSize, serialized );
}
}
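The nested loops in testFragmentedMessageDelivery enumerate every split of the serialized bytes into three non-empty fragments, exactly as the class Javadoc describes. A tiny standalone sketch (the length 6 is an arbitrary example, not taken from the test) makes that enumeration concrete:

// Illustrative sketch of the fragment-size enumeration used by the test above.
public class FragmentPermutationSketch
{
    public static void main( String[] args )
    {
        int n = 6; // pretend the chunked message is 6 bytes long
        for ( int i = 1; i < n - 1; i++ )
        {
            for ( int j = 1; j < n - i; j++ )
            {
                System.out.println( "[" + i + "," + j + "," + ( n - i - j ) + "]" );
            }
        }
        // prints [1,1,4] [1,2,3] [1,3,2] [1,4,1] [2,1,3] ... [4,1,1]
    }
}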
@@ -40,7 +40,7 @@

import static java.util.Arrays.asList;
import static org.neo4j.helpers.collection.MapUtil.map;
import static org.neo4j.ndp.docs.v1.Chunker.chunk;
import static org.neo4j.ndp.transport.socket.Chunker.chunk;

/**
* Takes human-readable value descriptions and packs them to binary data, and vice versa.
@@ -232,7 +232,7 @@ public static List<Message> unpackChunked( byte[] data ) throws Exception
int chunkSize = buf.readUnsignedShort();
if ( chunkSize > 0 )
{
input.addChunk( buf.readSlice( chunkSize ) );
input.append( buf.readSlice( chunkSize ) );
}
else
{
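For context on the framing that unpackChunked consumes: judging from the readUnsignedShort() call above, each chunk travels behind a two-byte big-endian size header, and a zero-sized chunk appears to mark a message boundary (the else branch is truncated in this view). The sketch below writes that framing for a small payload; the class and method names are invented, and the terminator assumption should be checked against Chunker.

import java.io.ByteArrayOutputStream;

// Rough, assumed sketch of the chunked framing; the real logic lives in Chunker.
public class ChunkFramingSketch
{
    static byte[] frame( byte[] message, int maxChunkSize )
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int offset = 0;
        while ( offset < message.length )
        {
            int size = Math.min( maxChunkSize, message.length - offset );
            out.write( ( size >> 8 ) & 0xFF );  // chunk header, high byte
            out.write( size & 0xFF );           // chunk header, low byte
            out.write( message, offset, size ); // chunk payload
            offset += size;
        }
        out.write( 0 );                         // zero-sized chunk: assumed
        out.write( 0 );                         // end-of-message marker
        return out.toByteArray();
    }

    public static void main( String[] args )
    {
        byte[] framed = frame( new byte[]{1, 2, 3, 4, 5}, 2 );
        System.out.println( framed.length + " bytes on the wire" ); // 5 + 3*2 + 2 = 13
    }
}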
@@ -27,6 +27,8 @@
import java.util.ArrayList;
import java.util.Collection;

import org.neo4j.ndp.transport.socket.Chunker;

import static java.lang.Integer.parseInt;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
