Skip to content

Commit

Permalink
Pack full messages before sending in Bolt server
Browse files Browse the repository at this point in the history
Previously Bolt server wrote messages (like RECORD) directly into the
outgoing buffer and flushed it when size limit was reached. This could
result in half-written messages be send back to the client when packing
failed. Observer failure scenario: transaction terminated while node is
being streamed back to the client. This caused Bolt server to fail
while reading node's labels/properties but after RECORD and NODE struct
headers have already been written. Next, server tried to stream FAILURE
back while resulting buffer already contained rouge struct headers.
This caused client to misinterpret the response and fail with protocol
violation errors.

This commit fixes the problem by making Bolt server write full messages
to the buffer before flushing it. Buffer can also be truncated when
message serialization fails half-way. So that all rogue bytes are
removed. Buffer will only be written to the network channel when it
contains a complete message.

`BoltProceduresIT` is the integration test that reproduces the described
failure scenario.
  • Loading branch information
lutovich committed Mar 22, 2018
1 parent 354805b commit 202a8a9
Show file tree
Hide file tree
Showing 21 changed files with 1,380 additions and 398 deletions.

This file was deleted.

Expand Up @@ -22,8 +22,12 @@
import java.io.IOException; import java.io.IOException;


import org.neo4j.bolt.logging.BoltMessageLogger; import org.neo4j.bolt.logging.BoltMessageLogger;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.cypher.result.QueryResult; import org.neo4j.cypher.result.QueryResult;
import org.neo4j.function.ThrowingAction;
import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
import org.neo4j.values.AnyValue; import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.MapValue; import org.neo4j.values.virtual.MapValue;


Expand All @@ -37,80 +41,73 @@
*/ */
public class BoltResponseMessageWriter implements BoltResponseMessageHandler<IOException> public class BoltResponseMessageWriter implements BoltResponseMessageHandler<IOException>
{ {
public static final BoltResponseMessageBoundaryHook NO_BOUNDARY_HOOK = () -> private final PackOutput output;
{
};

private final Neo4jPack.Packer packer; private final Neo4jPack.Packer packer;
private final BoltResponseMessageBoundaryHook onMessageComplete;
private final BoltMessageLogger messageLogger; private final BoltMessageLogger messageLogger;
private final Log log;


/** public BoltResponseMessageWriter( Neo4jPack neo4jPack, PackOutput output, LogService logService, BoltMessageLogger messageLogger )
* @param packer serializer to output channel
* @param onMessageComplete invoked for each message, after it's done writing to the output
* @param messageLogger logger for Bolt messages
*/
public BoltResponseMessageWriter( Neo4jPack.Packer packer, BoltResponseMessageBoundaryHook onMessageComplete,
BoltMessageLogger messageLogger )
{ {
this.packer = packer; this.output = output;
this.onMessageComplete = onMessageComplete; this.packer = neo4jPack.newPacker( output );
this.messageLogger = messageLogger; this.messageLogger = messageLogger;
this.log = logService.getInternalLog( getClass() );
} }


@Override @Override
public void onRecord( QueryResult.Record item ) throws IOException public void onRecord( QueryResult.Record item ) throws IOException
{ {
AnyValue[] fields = item.fields(); packCompleteMessageOrFail( RECORD, () ->
packer.packStructHeader( 1, RECORD.signature() );
packer.packListHeader( fields.length );
for ( AnyValue field : fields )
{
packer.pack( field );
}
onMessageComplete.onMessageComplete();

//The record might contain unpackable values,
//hence we must consume any errors that might
//have occurred.
IOException error = packer.consumeError();
if ( error != null )
{ {
throw error; AnyValue[] fields = item.fields();
} packer.packStructHeader( 1, RECORD.signature() );
packer.packListHeader( fields.length );
for ( AnyValue field : fields )
{
packer.pack( field );
}
} );
} }


@Override @Override
public void onSuccess( MapValue metadata ) throws IOException public void onSuccess( MapValue metadata ) throws IOException
{ {
packCompleteMessageOrFail( SUCCESS, () ->
{
packer.packStructHeader( 1, SUCCESS.signature() );
packer.pack( metadata );
} );

messageLogger.logSuccess( () -> metadata ); messageLogger.logSuccess( () -> metadata );
packer.packStructHeader( 1, SUCCESS.signature() );
packer.pack( metadata );
onMessageComplete.onMessageComplete();
} }


@Override @Override
public void onIgnored() throws IOException public void onIgnored() throws IOException
{ {
packCompleteMessageOrFail( IGNORED, () ->
{
packer.packStructHeader( 0, IGNORED.signature() );
} );

messageLogger.logIgnored(); messageLogger.logIgnored();
packer.packStructHeader( 0, IGNORED.signature() );
onMessageComplete.onMessageComplete();
} }


@Override @Override
public void onFailure( Status status, String errorMessage ) throws IOException public void onFailure( Status status, String errorMessage ) throws IOException
{ {
messageLogger.logFailure( status ); packCompleteMessageOrFail( FAILURE, () ->
packer.packStructHeader( 1, FAILURE.signature() ); {
packer.packMapHeader( 2 ); packer.packStructHeader( 1, FAILURE.signature() );
packer.packMapHeader( 2 );


packer.pack( "code" ); packer.pack( "code" );
packer.pack( status.code().serialize() ); packer.pack( status.code().serialize() );


packer.pack( "message" ); packer.pack( "message" );
packer.pack( errorMessage ); packer.pack( errorMessage );
} );


onMessageComplete.onMessageComplete(); messageLogger.logFailure( status );
} }


@Override @Override
Expand All @@ -125,4 +122,27 @@ public void flush() throws IOException
{ {
packer.flush(); packer.flush();
} }

private void packCompleteMessageOrFail( BoltResponseMessage message, ThrowingAction<IOException> action ) throws IOException
{
boolean packingFailed = true;
output.beginMessage();
try
{
action.apply();
packingFailed = false;
output.messageSucceeded();
}
catch ( Throwable error )
{
if ( packingFailed )
{
// packing failed, there might be some half-written data in the output buffer right now
// notify output about the failure so that it cleans up the buffer
output.messageFailed();
log.error( "Failed to write full %s message because: %s", message, error.getMessage() );
}
throw error;
}
}
} }
Expand Up @@ -45,8 +45,6 @@ interface Packer


void packListHeader( int size ) throws IOException; void packListHeader( int size ) throws IOException;


IOException consumeError();

void flush() throws IOException; void flush() throws IOException;
} }


Expand Down
Expand Up @@ -86,7 +86,6 @@ public String toString()


protected static class PackerV1 extends PackStream.Packer implements AnyValueWriter<IOException>, Neo4jPack.Packer protected static class PackerV1 extends PackStream.Packer implements AnyValueWriter<IOException>, Neo4jPack.Packer
{ {
private Error error;
private static final int INITIAL_PATH_CAPACITY = 500; private static final int INITIAL_PATH_CAPACITY = 500;
private static final int NO_SUCH_ID = -1; private static final int NO_SUCH_ID = -1;
private final PrimitiveLongIntKeyValueArray nodeIndexes = private final PrimitiveLongIntKeyValueArray nodeIndexes =
Expand All @@ -105,18 +104,6 @@ public void pack( AnyValue value ) throws IOException
value.writeTo( this ); value.writeTo( this );
} }


@Override
public IOException consumeError()
{
if ( error != null )
{
IOException exception = new BoltIOException( error.status(), error.msg() );
error = null;
return exception;
}
return null;
}

@Override @Override
public void writeNodeReference( long nodeId ) public void writeNodeReference( long nodeId )
{ {
Expand Down Expand Up @@ -302,65 +289,49 @@ private void writeRelationshipsForPath( RelationshipValue[] relationships ) thro
@Override @Override
public void writePoint( CoordinateReferenceSystem crs, double[] coordinate ) throws IOException public void writePoint( CoordinateReferenceSystem crs, double[] coordinate ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "Point is not yet supported as a return type in Bolt" );
"Point is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeDuration( long months, long days, long seconds, int nanos ) throws IOException public void writeDuration( long months, long days, long seconds, int nanos ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "Duration is not yet supported as a return type in Bolt" );
"Duration is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeDate( long epochDay ) throws IOException public void writeDate( long epochDay ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "Date is not yet supported as a return type in Bolt" );
"Date is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeLocalTime( long nanoOfDay ) throws IOException public void writeLocalTime( long nanoOfDay ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "LocalTime is not yet supported as a return type in Bolt" );
"LocalTime is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeTime( long nanosOfDayLocal, int offsetSeconds ) throws IOException public void writeTime( long nanosOfDayLocal, int offsetSeconds ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "Time is not yet supported as a return type in Bolt" );
"Time is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeLocalDateTime( long epochSecond, int nano ) throws IOException public void writeLocalDateTime( long epochSecond, int nano ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "LocalDateTime is not yet supported as a return type in Bolt" );
"LocalDateTime is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeDateTime( long epochSecondUTC, int nano, int offsetSeconds ) throws IOException public void writeDateTime( long epochSecondUTC, int nano, int offsetSeconds ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "DateTime is not yet supported as a return type in Bolt" );
"DateTime is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
public void writeDateTime( long epochSecondUTC, int nano, String zoneId ) throws IOException public void writeDateTime( long epochSecondUTC, int nano, String zoneId ) throws IOException
{ {
error = new Error( Status.Request.Invalid, throw new BoltIOException( Status.Request.Invalid, "DateTime is not yet supported as a return type in Bolt" );
"DateTime is not yet supported as a return type in Bolt" );
packNull();
} }


@Override @Override
Expand Down Expand Up @@ -658,26 +629,4 @@ public Neo4jError consumeError()
return error; return error;
} }
} }

private static class Error
{
private final Status status;
private final String msg;

private Error( Status status, String msg )
{
this.status = status;
this.msg = msg;
}

Status status()
{
return status;
}

String msg()
{
return msg;
}
}
} }
Expand Up @@ -27,6 +27,26 @@
*/ */
public interface PackOutput public interface PackOutput
{ {
/**
* Prepare this output to write a message. Later successful message should be signaled by {@link #messageSucceeded()}
* and failed message by {@link #messageFailed()};
*/
void beginMessage();

/**
* Finalize previously started message.
*
* @throws IOException when message can't be written to the network channel.
*/
void messageSucceeded() throws IOException;

/**
* Discard previously started message.
*
* @throws IOException when message can't be written to the network channel.
*/
void messageFailed() throws IOException;

/** If implementation has been buffering data, it should flush those buffers now. */ /** If implementation has been buffering data, it should flush those buffers now. */
PackOutput flush() throws IOException; PackOutput flush() throws IOException;


Expand Down
Expand Up @@ -61,8 +61,7 @@ public BoltMessagingProtocolHandlerImpl( BoltChannel boltChannel, BoltConnection
{ {
this.neo4jPack = neo4jPack; this.neo4jPack = neo4jPack;
this.chunkedOutput = new ChunkedOutput( boltChannel.rawChannel(), DEFAULT_OUTPUT_BUFFER_SIZE, throttleGroup ); this.chunkedOutput = new ChunkedOutput( boltChannel.rawChannel(), DEFAULT_OUTPUT_BUFFER_SIZE, throttleGroup );
this.packer = new BoltResponseMessageWriter( this.packer = new BoltResponseMessageWriter( neo4jPack, chunkedOutput, logging, boltChannel.log() );
neo4jPack.newPacker( chunkedOutput ), chunkedOutput, boltChannel.log() );
this.connection = connection; this.connection = connection;
this.internalLog = logging.getInternalLog( getClass() ); this.internalLog = logging.getInternalLog( getClass() );
this.dechunker = new BoltV1Dechunker( this.dechunker = new BoltV1Dechunker(
Expand Down

0 comments on commit 202a8a9

Please sign in to comment.