From e2a130ea3e5f44d1d3c0ae1b5adfe076ec3793bc Mon Sep 17 00:00:00 2001 From: Jacob Hansson Date: Wed, 20 May 2015 20:02:35 -0500 Subject: [PATCH] Added network fragmentation test & fixed fragmentation bug No matter how data are fragmentated and sent to server, the server should always append them in proper order into buffer. --- .../ndp/transport/socket/ChunkedInput.java | 2 +- .../transport/socket/SocketProtocolV1.java | 17 +- .../transport/socket/ChunkedInputTest.java | 22 +-- .../neo4j/ndp/transport/socket}/Chunker.java | 2 +- .../socket/FragmentedMessageDeliveryTest.java | 157 ++++++++++++++++++ .../neo4j/ndp/docs/v1/DocSerialization.java | 4 +- .../docs/v1/NDPTransportChunkingDocTest.java | 2 + 7 files changed, 186 insertions(+), 20 deletions(-) rename community/ndp/{v1-docs/src/test/java/org/neo4j/ndp/docs/v1 => transport-socket/src/test/java/org/neo4j/ndp/transport/socket}/Chunker.java (98%) create mode 100644 community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/FragmentedMessageDeliveryTest.java diff --git a/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/ChunkedInput.java b/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/ChunkedInput.java index 195a06fa09b6e..ae9afb4789355 100644 --- a/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/ChunkedInput.java +++ b/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/ChunkedInput.java @@ -61,7 +61,7 @@ public ChunkedInput clear() return this; } - public void addChunk( ByteBuf chunk ) + public void append( ByteBuf chunk ) { if ( chunk.readableBytes() > 0 ) { diff --git a/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/SocketProtocolV1.java b/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/SocketProtocolV1.java index a603f9ca577be..a98cf8cd4ec92 100644 --- a/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/SocketProtocolV1.java +++ b/community/ndp/transport-socket/src/main/java/org/neo4j/ndp/transport/socket/SocketProtocolV1.java @@ -54,7 +54,7 @@ public class SocketProtocolV1 implements SocketProtocol private final Log log; private final AtomicInteger inFlight = new AtomicInteger( 0 ); - enum State + public enum State { AWAITING_CHUNK, IN_CHUNK, @@ -127,20 +127,22 @@ public void handle( ChannelHandlerContext channelContext, ByteBuf data ) if ( chunkSize < data.readableBytes() ) { // Current packet is larger than current chunk, slice of the chunk - input.addChunk( data.readSlice( chunkSize ) ); + input.append( data.readSlice( chunkSize ) ); state = State.AWAITING_CHUNK; } else if ( chunkSize == data.readableBytes() ) { // Current packet perfectly maps to current chunk - input.addChunk( data ); + input.append( data ); state = State.AWAITING_CHUNK; + return; } else { // Current packet is smaller than the chunk we're reading, split the current chunk itself up chunkSize -= data.readableBytes(); - input.addChunk( data ); + input.append( data ); + return; } break; } @@ -174,6 +176,11 @@ public void close() session.close(); } + public State state() + { + return state; + } + private void handleHeader( ChannelHandlerContext channelContext ) { if(chunkSize == 0) @@ -188,7 +195,7 @@ private void handleHeader( ChannelHandlerContext channelContext ) } } - public void processCollectedMessage( final ChannelHandlerContext channelContext ) + private void processCollectedMessage( final ChannelHandlerContext channelContext ) { output.setTargetChannel( channelContext ); try diff --git a/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/ChunkedInputTest.java b/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/ChunkedInputTest.java index ef2ef168b6ac3..7d353eaf840ce 100644 --- a/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/ChunkedInputTest.java +++ b/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/ChunkedInputTest.java @@ -35,9 +35,9 @@ public void shouldExposeMultipleChunksAsCohesiveStream() throws Throwable // Given ChunkedInput ch = new ChunkedInput(); - ch.addChunk( wrappedBuffer( new byte[]{1, 2} ) ); - ch.addChunk( wrappedBuffer( new byte[]{3} ) ); - ch.addChunk( wrappedBuffer( new byte[]{4, 5} ) ); + ch.append( wrappedBuffer( new byte[]{1, 2} ) ); + ch.append( wrappedBuffer( new byte[]{3} ) ); + ch.append( wrappedBuffer( new byte[]{4, 5} ) ); // When byte[] bytes = new byte[5]; @@ -54,11 +54,11 @@ public void shouldReadIntoMisalignedDestinationBuffer() throws Throwable byte[] bytes = new byte[3]; ChunkedInput ch = new ChunkedInput(); - ch.addChunk( wrappedBuffer( new byte[]{1, 2} ) ); - ch.addChunk( wrappedBuffer( new byte[]{3} ) ); - ch.addChunk( wrappedBuffer( new byte[]{4} ) ); - ch.addChunk( wrappedBuffer( new byte[]{5} ) ); - ch.addChunk( wrappedBuffer( new byte[]{6, 7} ) ); + ch.append( wrappedBuffer( new byte[]{1, 2} ) ); + ch.append( wrappedBuffer( new byte[]{3} ) ); + ch.append( wrappedBuffer( new byte[]{4} ) ); + ch.append( wrappedBuffer( new byte[]{5} ) ); + ch.append( wrappedBuffer( new byte[]{6, 7} ) ); // When I read {1,2,3} ch.get( bytes, 0, 3 ); @@ -88,9 +88,9 @@ public void shouldReadPartialChunk() throws Throwable // Given ChunkedInput ch = new ChunkedInput(); - ch.addChunk( wrappedBuffer( new byte[]{1, 2} ) ); - ch.addChunk( wrappedBuffer( new byte[]{3} ) ); - ch.addChunk( wrappedBuffer( new byte[]{4, 5} ) ); + ch.append( wrappedBuffer( new byte[]{1, 2} ) ); + ch.append( wrappedBuffer( new byte[]{3} ) ); + ch.append( wrappedBuffer( new byte[]{4, 5} ) ); // When byte[] bytes = new byte[5]; diff --git a/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/Chunker.java b/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/Chunker.java similarity index 98% rename from community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/Chunker.java rename to community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/Chunker.java index 1c9c5aabb52c6..177f6dc7d56b9 100644 --- a/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/Chunker.java +++ b/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/Chunker.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -package org.neo4j.ndp.docs.v1; +package org.neo4j.ndp.transport.socket; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; diff --git a/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/FragmentedMessageDeliveryTest.java b/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/FragmentedMessageDeliveryTest.java new file mode 100644 index 0000000000000..c7a5746d979d0 --- /dev/null +++ b/community/ndp/transport-socket/src/test/java/org/neo4j/ndp/transport/socket/FragmentedMessageDeliveryTest.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.ndp.transport.socket; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.neo4j.kernel.impl.util.HexPrinter; +import org.neo4j.logging.NullLog; +import org.neo4j.ndp.messaging.v1.PackStreamMessageFormatV1; +import org.neo4j.ndp.messaging.v1.RecordingByteChannel; +import org.neo4j.ndp.messaging.v1.message.Message; +import org.neo4j.ndp.runtime.Session; + +import static io.netty.buffer.Unpooled.wrappedBuffer; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.neo4j.ndp.messaging.v1.message.Messages.run; +import static org.neo4j.ndp.transport.socket.SocketProtocolV1.State.AWAITING_CHUNK; + +/** + * This tests network fragmentation of messages. Given a set of messages, it will serialize and chunk the message up + * to a specified chunk size. Then it will split that data into a specified number of fragments, trying every possible + * permutation of fragment sizes for the specified number. For instance, assuming an unfragmented message size of 15, + * and a fragment count of 3, it will create fragment size permutations like: + *

+ * [1,1,13] + * [1,2,12] + * [1,3,11] + * .. + * [12,1,1] + *

+ * For each permutation, it delivers the fragments to the protocol implementation, and asserts the protocol handled + * them properly. + */ +public class FragmentedMessageDeliveryTest +{ + // Only test one chunk size for now, this can be parameterized to test lots of different ones + private int chunkSize = 16; + + // Only test messages broken into three fragments for now, this can be parameterized later + private int numFragments = 3; + + // Only test one message for now. This can be parameterized later to test lots of different ones + private Message[] messages = new Message[]{run( "Mjölnir" )}; + + private PackStreamMessageFormatV1.Writer format = new PackStreamMessageFormatV1.Writer(); + + @Test + public void testFragmentedMessageDelivery() throws Throwable + { + // Given + byte[] unfragmented = serialize( chunkSize, messages ); + + // When & Then + int n = unfragmented.length; + for ( int i = 1; i < n - 1; i++ ) + { + for ( int j = 1; j < n - i; j++ ) + { + testPermutation( unfragmented, i, j, n - i - j ); + } + } + } + + private void testPermutation( byte[] unfragmented, int... sizes ) + { + int pos = 0; + ByteBuf[] fragments = new ByteBuf[sizes.length]; + for ( int i = 0; i < sizes.length; i++ ) + { + fragments[i] = wrappedBuffer( unfragmented, pos, sizes[i] ); + pos += sizes[i]; + } + testPermutation( unfragmented, fragments ); + } + + private void testPermutation( byte[] unfragmented, ByteBuf[] fragments ) + { + // Given + // System.out.println( "Testing fragmentation:" + describeFragments( fragments ) ); + Session sess = mock( Session.class ); + ChannelHandlerContext ch = mock( ChannelHandlerContext.class ); + SocketProtocolV1 protocol = new SocketProtocolV1( NullLog.getInstance(), sess ); + + // When data arrives split up according to the current permutation + for ( ByteBuf fragment : fragments ) + { + fragment.readerIndex( 0 ).retain(); + protocol.handle( ch, fragment ); + } + + // Then the session should've received the specified messages, and the protocol should be in a nice clean state + try + { + assertEquals( AWAITING_CHUNK, protocol.state() ); + verify( sess ).run( eq( "Mjölnir" ), any( Map.class ), any(), any( Session.Callback.class ) ); + } + catch ( AssertionError e ) + { + throw new AssertionError( "Failed to handle fragmented delivery.\n" + + "Messages: " + Arrays.toString( messages ) + "\n" + + "Chunk size: " + chunkSize + "\n" + + "Serialized data delivered in fragments: " + describeFragments( fragments ) + + "\n" + + "Unfragmented data: " + HexPrinter.hex( unfragmented ) + "\n", e ); + } + } + + private String describeFragments( ByteBuf[] fragments ) + { + StringBuilder sb = new StringBuilder(); + for ( int i = 0; i < fragments.length; i++ ) + { + if ( i > 0 ) { sb.append( "," ); } + sb.append( fragments[i].capacity() ); + } + return sb.toString(); + } + + private byte[] serialize( int chunkSize, Message... msgs ) throws IOException + { + byte[][] serialized = new byte[msgs.length][]; + for ( int i = 0; i < msgs.length; i++ ) + { + RecordingByteChannel channel = new RecordingByteChannel(); + format.reset( channel ).write( msgs[i] ).flush(); + serialized[i] = channel.getBytes(); + } + return Chunker.chunk( chunkSize, serialized ); + } +} \ No newline at end of file diff --git a/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/DocSerialization.java b/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/DocSerialization.java index 5aa82376bc82b..bcb00c89f490f 100644 --- a/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/DocSerialization.java +++ b/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/DocSerialization.java @@ -40,7 +40,7 @@ import static java.util.Arrays.asList; import static org.neo4j.helpers.collection.MapUtil.map; -import static org.neo4j.ndp.docs.v1.Chunker.chunk; +import static org.neo4j.ndp.transport.socket.Chunker.chunk; /** * Takes human-readable value descriptions and packs them to binary data, and vice versa. @@ -232,7 +232,7 @@ public static List unpackChunked( byte[] data ) throws Exception int chunkSize = buf.readUnsignedShort(); if ( chunkSize > 0 ) { - input.addChunk( buf.readSlice( chunkSize ) ); + input.append( buf.readSlice( chunkSize ) ); } else { diff --git a/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/NDPTransportChunkingDocTest.java b/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/NDPTransportChunkingDocTest.java index 4bd4f959a40b7..fabfcd65316dd 100644 --- a/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/NDPTransportChunkingDocTest.java +++ b/community/ndp/v1-docs/src/test/java/org/neo4j/ndp/docs/v1/NDPTransportChunkingDocTest.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.Collection; +import org.neo4j.ndp.transport.socket.Chunker; + import static java.lang.Integer.parseInt; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat;