Resolve index-out-of-bounds error and several exception hiding issues in GAP.

- The index-out-of-bounds error (IOB) was caused by a race between the IO thread
  and the worker thread on flushing. Flushing is now only ordered from the worker
  thread. The general behavior is covered by existing tests; the race condition is
  covered by larger-scale integration tests in quality tooling.
- Be more careful in several exception paths to log the exception before doing
  anything else (see the sketch after this list).
- Handle empty buffers in the input chunking.
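
A minimal, self-contained sketch of the second point, using hypothetical names (nothing
below is taken from the changed files): log the failure as the very first step of the
handler, so a secondary failure during cleanup cannot hide the original exception.

import java.io.Closeable;
import java.io.IOException;

public class LogBeforeCleanup
{
    // Hypothetical handler shape: log first, then clean up, then rethrow.
    static void runAndCleanUp( Runnable work, Closeable resource ) throws IOException
    {
        try
        {
            work.run();
        }
        catch ( RuntimeException original )
        {
            // Log before doing anything else: if resource.close() throws as well,
            // the original failure has already been recorded.
            original.printStackTrace();

            resource.close();
            throw original;
        }
    }
}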
jakewins committed Jul 28, 2015
1 parent fda66c3 commit 015e46c
Showing 10 changed files with 134 additions and 102 deletions.
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.ndp.messaging.v1;

import java.io.IOException;

public interface MessageBoundaryHook
{
    void onMessageComplete() throws IOException;
}
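
For context on why a new interface is introduced rather than reusing Runnable: the
per-message completion hook does IO, so it must be able to throw IOException directly
instead of wrapping it in a RuntimeException (the TODO removed from ChunkedOutput
further down). A hypothetical implementation, assuming it sits in the same package as
the interface above, could look like this:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class BoundaryHookExample
{
    public static void main( String[] args ) throws IOException
    {
        final OutputStream out = new ByteArrayOutputStream();

        // Writes a (made-up) zero byte as a message boundary; an IO failure propagates
        // to the caller instead of being smuggled out inside a RuntimeException.
        MessageBoundaryHook hook = new MessageBoundaryHook()
        {
            @Override
            public void onMessageComplete() throws IOException
            {
                out.write( 0 );
                out.flush();
            }
        };

        hook.onMessageComplete();
    }
}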
@@ -96,23 +96,23 @@ static String messageTypeName( int type )
 
     public static class Writer implements MessageFormat.Writer
     {
-        public static final Runnable NO_OP = new Runnable()
+        public static final MessageBoundaryHook NO_OP = new MessageBoundaryHook()
         {
             @Override
-            public void run()
+            public void onMessageComplete() throws IOException
             {
                 // no-op
             }
         };
 
         private final PackStream.Packer packer;
-        private final Runnable onMessageComplete;
+        private final MessageBoundaryHook onMessageComplete;
 
         /**
          * @param packer serializer to output channel
          * @param onMessageComplete invoked for each message, after it's done writing to the output
          */
-        public Writer( PackStream.Packer packer, Runnable onMessageComplete )
+        public Writer( PackStream.Packer packer, MessageBoundaryHook onMessageComplete )
         {
             this.packer = packer;
             this.onMessageComplete = onMessageComplete;

@@ -132,30 +132,30 @@ public void handleRunMessage( String statement, Map<String,Object> params )
             packer.packStructHeader( 2, MessageTypes.MSG_RUN );
             packer.pack( statement );
             packRawMap( params );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override
         public void handlePullAllMessage()
                 throws IOException
         {
             packer.packStructHeader( 0, MessageTypes.MSG_PULL_ALL );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override
         public void handleDiscardAllMessage()
                 throws IOException
         {
             packer.packStructHeader( 0, MessageTypes.MSG_DISCARD_ALL );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override
         public void handleAckFailureMessage() throws IOException
         {
             packer.packStructHeader( 0, MessageTypes.MSG_ACK_FAILURE );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override

@@ -169,7 +169,7 @@ public void handleRecordMessage( Record item )
             {
                 packValue( field );
             }
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override

@@ -178,7 +178,7 @@ public void handleSuccessMessage( Map<String,Object> metadata )
         {
             packer.packStructHeader( 1, MessageTypes.MSG_SUCCESS );
             packRawMap( metadata );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override

@@ -193,22 +193,23 @@ public void handleFailureMessage( Status status, String message )
 
             packer.pack( "message" );
             packer.pack( message );
-            onMessageComplete.run();
+
+            onMessageComplete.onMessageComplete();
         }
 
         @Override
         public void handleIgnoredMessage() throws IOException
         {
             packer.packStructHeader( 0, MessageTypes.MSG_IGNORED );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override
         public void handleInitializeMessage( String clientName ) throws IOException
         {
             packer.packStructHeader( 1, MessageTypes.MSG_INITIALIZE );
             packer.pack( clientName );
-            onMessageComplete.run();
+            onMessageComplete.onMessageComplete();
         }
 
         @Override
@@ -193,7 +193,7 @@ private void ensure( int numBytes ) throws IOException
 
     private void ensureChunkAvailable() throws IOException
     {
-        if ( currentChunk == null || currentChunk.readableBytes() == 0 )
+        while ( currentChunk == null || currentChunk.readableBytes() == 0 )
        {
             currentChunkIndex++;
             if ( currentChunkIndex < chunks.size() )
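
This is the "handle empty buffers in the input chunking" fix: with an if, a single
zero-length chunk left the reader positioned on a buffer with nothing to read, whereas
the while keeps advancing until a chunk with readable bytes is found. A standalone toy
model of the difference (the names below are illustrative, not taken from the commit):

import java.util.Arrays;
import java.util.List;

public class SkipEmptyChunks
{
    public static void main( String[] args )
    {
        // Toy model of an incoming chunk list that contains an empty chunk in the middle.
        List<byte[]> chunks = Arrays.asList( new byte[]{1, 2}, new byte[0], new byte[]{3} );

        int currentChunkIndex = 0;
        byte[] currentChunk = chunks.get( currentChunkIndex );
        int readPosition = currentChunk.length; // pretend the first chunk is fully consumed

        // A plain 'if' would advance once, land on the empty chunk and stop there;
        // the 'while' keeps advancing until readable bytes are available.
        while ( currentChunk == null || currentChunk.length - readPosition == 0 )
        {
            currentChunkIndex++;
            currentChunk = chunks.get( currentChunkIndex );
            readPosition = 0;
        }

        System.out.println( "next byte: " + currentChunk[readPosition] ); // prints: next byte: 3
    }
}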
@@ -25,11 +25,16 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.neo4j.ndp.messaging.v1.MessageBoundaryHook;
 import org.neo4j.packstream.PackOutput;
 
 import static java.lang.Math.max;
 
-public class ChunkedOutput implements PackOutput
+/**
+ * A target output for {@link org.neo4j.packstream.PackStream} which breaks the data into a continuous stream of chunks before pushing them into a netty
+ * channel.
+ */
+public class ChunkedOutput implements PackOutput, MessageBoundaryHook
 {
     public static final int CHUNK_HEADER_SIZE = 2;
     public static final int MESSAGE_BOUNDARY = 0;
@@ -44,36 +49,6 @@ public class ChunkedOutput implements PackOutput
     /** Are currently in the middle of writing a chunk? */
     private boolean chunkOpen = false;
 
-    private Runnable onMessageComplete = new Runnable()
-    {
-        @Override
-        public void run()
-        {
-            try
-            {
-                closeChunkIfOpen();
-
-                // Ensure there's space to write the message boundary
-                if ( buffer.writableBytes() < CHUNK_HEADER_SIZE )
-                {
-                    flush();
-                }
-
-                // Write message boundary
-                buffer.writeShort( MESSAGE_BOUNDARY );
-
-                // Mark us as not currently in a chunk
-                chunkOpen = false;
-            }
-            catch ( IOException e )
-            {
-                // TODO: Don't use runnable here then, use something that can throw this IOException
-                throw new RuntimeException( e );
-            }
-
-        }
-    };
-
     public ChunkedOutput( Channel ch, int bufferSize )
     {
         this.channel = ch;
@@ -88,7 +63,13 @@ public PackOutput flush() throws IOException
         if ( buffer.readableBytes() > 0 )
         {
             closeChunkIfOpen();
-            channel.writeAndFlush( buffer, channel.voidPromise() );
+
+            // Local copy and clear the buffer field. This ensures that the buffer is not re-released if the flush call fails
+            ByteBuf out = this.buffer;
+            this.buffer = null;
+
+            channel.writeAndFlush( out, channel.voidPromise() );
+
             newBuffer();
         }
         return this;
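
The comment in the new flush() explains the key move: ownership of the buffer is handed
to Netty and the field is cleared before the write, so the buffer cannot be released a
second time (for example from a later close()) if the flush call fails. A standalone
illustration of that double-release hazard using Netty's reference counting (not commit
code; it only needs netty-buffer on the classpath):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.IllegalReferenceCountException;

public class DoubleReleaseExample
{
    public static void main( String[] args )
    {
        ByteBuf buf = Unpooled.buffer( 16 );
        buf.writeShort( 0 );

        buf.release();                          // normally done by Netty once the write completes
        System.out.println( buf.refCnt() );     // 0: the buffer has been returned

        try
        {
            buf.release();                      // a second release, e.g. from a later close(), fails
        }
        catch ( IllegalReferenceCountException e )
        {
            System.out.println( "double release rejected: " + e.getMessage() );
        }
    }
}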
@@ -139,6 +120,7 @@ public PackOutput writeBytes( ByteBuffer data ) throws IOException
     {
         // TODO: If data is larger than our chunk size or so, we're very likely better off just passing this ByteBuffer on rather than doing the copy here
         // TODO: *however* note that we need some way to find out when the data has been written (and thus the buffer can be re-used) if we take that approach
+        // See the comment in #newBuffer for an approach that would allow that
         while ( data.remaining() > 0 )
         {
             // Ensure there is an open chunk, and that it has at least one byte of space left
@@ -196,15 +178,13 @@ private void closeChunkIfOpen()
     private void newBuffer()
     {
         // Assumption: We're using nettys buffer pooling here
+        // If we wanted to, we can optimize this further and restrict memory usage by using our own ByteBuf impl. Each Output instance would have, say, 3
+        // buffers that it rotates. Fill one up, send it to be async flushed, fill the next one up, etc. When release is called by Netty, push buffer back
+        // onto our local stack. That way there are no global data structures for managing memory, no fragmentation and a fixed amount of RAM per session used.
         buffer = channel.alloc().buffer( bufferSize, bufferSize );
         chunkOpen = false;
     }
 
-    public Runnable messageBoundaryHook()
-    {
-        return onMessageComplete;
-    }
-
     public void close()
     {
         if(buffer != null)
@@ -213,4 +193,22 @@ public void close()
             buffer = null;
         }
     }
+
+    @Override
+    public void onMessageComplete() throws IOException
+    {
+        closeChunkIfOpen();
+
+        // Ensure there's space to write the message boundary
+        if ( buffer.writableBytes() < CHUNK_HEADER_SIZE )
+        {
+            flush();
+        }
+
+        // Write message boundary
+        buffer.writeShort( MESSAGE_BOUNDARY );
+
+        // Mark us as not currently in a chunk
+        chunkOpen = false;
+    }
 }
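
Taken together, one ChunkedOutput instance now plays both roles for a session's outgoing
messages: it is the PackOutput the packer serializes into, and the MessageBoundaryHook
that closes each message with a boundary marker. A hypothetical wiring sketch follows;
the enclosing message-format class is not named in the hunks above, and the
PackStream.Packer( PackOutput ) constructor is an assumption, while Writer( Packer,
MessageBoundaryHook ), ChunkedOutput( Channel, int ), handleSuccessMessage( Map ) and
flush() all appear in the diff:

// channel:  the session's netty Channel (assumed to be in scope)
// metadata: the map to send in the SUCCESS message (assumed to be in scope)
ChunkedOutput output = new ChunkedOutput( channel, 8192 );

// Writer is the nested class from the first hunk; its enclosing class is not shown here.
MessageFormat.Writer writer = new Writer(
        new PackStream.Packer( output ),   // values are packed straight into the chunked output
        output );                          // the same object then writes each message boundary

writer.handleSuccessMessage( metadata );   // internally finishes with output.onMessageComplete()
output.flush();                            // flushing is ordered from the worker thread only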
