Skip to content

Commit

Permalink
Merge pull request #6184 from nigelsmall/3.0-bolt-reset
Browse files Browse the repository at this point in the history
3.0 bolt reset
  • Loading branch information
jakewins committed Jan 7, 2016
2 parents 307af28 + d6504bf commit 298d531
Show file tree
Hide file tree
Showing 22 changed files with 359 additions and 165 deletions.
47 changes: 44 additions & 3 deletions community/bolt/src/docs/dev/examples.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ Server: SUCCESS { "type": "r" }

=== Error handling

This illustrates how the server behaves when a request fails, and the server ignores incoming messages until an
`ACK_FAILURE` message is received.
This illustrates how the server behaves when a request fails, and shows how the server ignores incoming messages until a `RESET` message is received.

.Error handling
[source,bolt_exchange]
Expand Down Expand Up @@ -180,7 +179,7 @@ Server: IGNORED
# Until the error is acknowledged
Client: ACK_FAILURE
Client: RESET
00 02 b0 0f 00 00
Expand Down Expand Up @@ -587,3 +586,45 @@ Server: SUCCESS {
6E A3 86 6F 66 66 73 65 74 00 86 63 6F 6C 75 6D
6E 01 84 6C 69 6E 65 01 00 00
----

=== Resetting the session

This illustrates how to reset the session to a "clean" state.

.Resetting
[source,bolt_exchange]
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Client: INIT "MyClient/1.0"
00 0F B1 01 8C 4D 79 43 6C 69 65 6E 74 2F 31 2E 30 00 00
Server: SUCCESS {}
00 03 b1 70 a0 00 00
# Batch of messages
Client: RUN "RETURN 1 AS num" {}
00 13 b2 10 8f 52 45 54 55 52 4e 20 31 20 41 53
20 6e 75 6d a0 00 00
# Server responses
Server: SUCCESS { "fields": ["num"] }
00 0f b1 70 a1 86 66 69 65 6c 64 73 91 83 6e 75
6d 00 00
Client: RESET {}
00 02 b0 0f 00 00
Server: SUCCESS {}
00 03 b1 70 a0 00 00
----
Binary file removed community/bolt/src/docs/dev/images/failure-ack.png
Binary file not shown.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
67 changes: 34 additions & 33 deletions community/bolt/src/docs/dev/messaging.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ Because the protocol leverages pipelining, the client and the server need to agr
occurs, otherwise messages that were sent assuming no failure would occur might have unintended effects.

When requests fail on the server, the server will send the client a `FAILURE` message.
The client must acknowledge the `FAILURE` message by sending an `ACK_FAILURE` message to the server.
Until the server receives the `ACK_FAILURE` message, it will send an `IGNORED` message in response to any other message from by the client, including messages that were sent in a pipeline.
The client must acknowledge the `FAILURE` message by sending an `RESET` message to the server.
Until the server receives the `RESET` message, it will send an `IGNORED` message in response to any other message from by the client, including messages that were sent in a pipeline.
The `RESET` clears the pending failure state, disposes of any outstanding records and rolls back the current transaction (if any).

The diagram below illustrates a typical flow involving `ACK_FAILURE` messages:
The diagram below illustrates a typical flow involving `RESET` messages:

image:failure-ack.png[]
image:failure-reset.png[]

Here, the original failure is acknowledged immediately by the client, allowing the subsequent RUN to be actioned as expected.

Expand Down Expand Up @@ -104,6 +105,34 @@ B1 01 8C 4D 79 43 6C 69 65 6E 74 2F 31 2E 30
|clientName |A name and version for the client, if the user has allowed usage data collection, this is used to track popular clients. Example: "MyClient/1.0"
|=======================

[[bolt-message-structs-reset]]
==== RESET

The `RESET` message is a client message used to return the current session to a "clean" state.
The following actions are performed by `RESET`:

- clear any outstanding `FAILURE` state
- dispose of any outstanding result records
- rollback the current transaction (if any)

[source,bolt_message_struct]
----
ResetMessage (signature=0x0F) {
}
----

.Response
- `SUCCESS {}` if the session was successfully reset
- `FAILURE {"code": ..., "message": ...}` if a reset is not currently possible

.Example
[source,bolt_packstream_type]
----
Value: RESET
B0 0F
----

[[bolt-message-structs-run]]
==== RUN

Expand Down Expand Up @@ -200,34 +229,6 @@ Value: PULL_ALL
B0 3F
----

==== ACK_FAILURE

The `ACK_FAILURE` message is a client message used to signal that a client has acknowledged a previous `FAILURE`
. It has the following structure:

[source,bolt_message_struct]
----
AcknowledgeFailureMessage (signature=0x0F) {
}
----

On receipt of an `ACK_FAILURE` message, the server will clear any pending failure state and respond with a single `SUCCESS` message.
If no such failure state is pending, a FAILURE message will be sent instead.

An `ACK_FAILURE` will never be ignored by the server.

.Response
- `SUCCESS {}` if a failure has been successfully acknowledged
- `FAILURE {"code": ..., "message": ...}` if there is no outstanding failure that requires acknowledgement

.Example
[source,bolt_packstream_type]
----
Value: ACK_FAILURE
B0 0F
----

==== RECORD

The `RECORD` message is a server detail message used to deliver data from the server to the client.
Expand Down Expand Up @@ -316,7 +317,7 @@ IgnoredMessage (signature=0x7E) {
}
----

A client message will be ignored if an earlier failure has not yet been acknowledged by the client via an `ACK_FAILURE` message.
A client message will be ignored if an earlier failure has not yet been acknowledged by the client via a `RESET` message.
For example, this will occur if the client optimistically sends a group of messages, one of which fails during execution: all subsequent messages in that group will then be ignored.
Note that the original `PULL_ALL` message was never processed by the server.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public interface MessageHandler<E extends Exception>

void handleDiscardAllMessage() throws E;

void handleAckFailureMessage() throws E;

void handleRecordMessage( Record item ) throws E;

void handleSuccessMessage( Map<String,Object> metadata ) throws E;
Expand All @@ -44,6 +42,8 @@ public interface MessageHandler<E extends Exception>

void handleInitMessage( String clientName ) throws E;

void handleResetMessage() throws E;

class Adapter<E extends Exception> implements MessageHandler<E>
{
@Override
Expand All @@ -64,12 +64,6 @@ public void handleDiscardAllMessage() throws E

}

@Override
public void handleAckFailureMessage() throws E
{

}

@Override
public void handleRecordMessage( Record item ) throws E
{
Expand Down Expand Up @@ -99,5 +93,11 @@ public void handleInitMessage( String clientName ) throws E
{

}

@Override
public void handleResetMessage() throws E
{

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public int version()
public interface MessageTypes
{
byte MSG_INIT = 0x01;
byte MSG_ACK_FAILURE = 0x0F;
byte MSG_RESET = 0x0F;
byte MSG_RUN = 0x10;
byte MSG_DISCARD_ALL = 0x2F;
byte MSG_PULL_ALL = 0x3F;
Expand All @@ -58,7 +58,8 @@ static String messageTypeName( int type )
{
switch( type )
{
case MessageTypes.MSG_ACK_FAILURE: return "MSG_ACK_FAILURE";
case MessageTypes.MSG_INIT: return "MSG_INIT";
case MessageTypes.MSG_RESET: return "MSG_RESET";
case MessageTypes.MSG_RUN: return "MSG_RUN";
case MessageTypes.MSG_DISCARD_ALL: return "MSG_DISCARD_ALL";
case MessageTypes.MSG_PULL_ALL: return "MSG_PULL_ALL";
Expand Down Expand Up @@ -127,13 +128,6 @@ public void handleDiscardAllMessage()
onMessageComplete.onMessageComplete();
}

@Override
public void handleAckFailureMessage() throws IOException
{
packer.packStructHeader( 0, MessageTypes.MSG_ACK_FAILURE );
onMessageComplete.onMessageComplete();
}

@Override
public void handleRecordMessage( Record item )
throws IOException
Expand Down Expand Up @@ -188,6 +182,13 @@ public void handleInitMessage( String clientName ) throws IOException
onMessageComplete.onMessageComplete();
}

@Override
public void handleResetMessage() throws IOException
{
packer.packStructHeader( 0, MessageTypes.MSG_RESET );
onMessageComplete.onMessageComplete();
}

@Override
public void flush() throws IOException
{
Expand Down Expand Up @@ -244,15 +245,15 @@ public <E extends Exception> void read( MessageHandler<E> output ) throws IOExce
case MessageTypes.MSG_FAILURE:
unpackFailureMessage( output );
break;
case MessageTypes.MSG_ACK_FAILURE:
unpackAckFailureMessage( output );
break;
case MessageTypes.MSG_IGNORED:
unpackIgnoredMessage( output );
break;
case MessageTypes.MSG_INIT:
unpackInitMessage( output );
break;
case MessageTypes.MSG_RESET:
unpackResetMessage( output );
break;
default:
throw new BoltIOException( Status.Request.Invalid,
"0x" + Integer.toHexString(type) + " is not a valid message type." );
Expand All @@ -272,12 +273,6 @@ public <E extends Exception> void read( MessageHandler<E> output ) throws IOExce
}
}

private <E extends Exception> void unpackAckFailureMessage( MessageHandler<E> output )
throws E
{
output.handleAckFailureMessage();
}

private <E extends Exception> void unpackSuccessMessage( MessageHandler<E> output )
throws E, IOException
{
Expand Down Expand Up @@ -345,5 +340,10 @@ private <E extends Exception> void unpackInitMessage( MessageHandler<E> output )
output.handleInitMessage( clientName );
}

private <E extends Exception> void unpackResetMessage( MessageHandler<E> output ) throws IOException, E
{
output.handleResetMessage();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ public class Messages
{
private static final PullAllMessage PULL_ALL = new PullAllMessage();
private static final DiscardAllMessage DISCARD_ALL = new DiscardAllMessage();
private static final AcknowledgeFailureMessage ACK_FAILURE = new AcknowledgeFailureMessage();
private static final SuccessMessage SUCCESS = new SuccessMessage( Collections.EMPTY_MAP );

public static Message reset()
{
return new ResetMessage();
}

public static Message run( String statement )
{
return new RunMessage( statement );
Expand All @@ -56,11 +60,6 @@ public static Message discardAll()
return DISCARD_ALL;
}

public static Message ackFailure()
{
return ACK_FAILURE;
}

public static Message record( Record value )
{
return new RecordMessage( value );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,22 @@

import org.neo4j.bolt.v1.messaging.MessageHandler;

public class AcknowledgeFailureMessage implements Message
public class ResetMessage implements Message
{
public ResetMessage()
{
}

@Override
public <E extends Exception> void dispatch( MessageHandler<E> consumer ) throws E
{
consumer.handleAckFailureMessage();
consumer.handleResetMessage();
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
return !(o == null || getClass() != o.getClass());

return this == o || !(o == null || getClass() != o.getClass());
}

@Override
Expand All @@ -49,6 +48,6 @@ public int hashCode()
@Override
public String toString()
{
return "AcknowledgeFailureMessage{}";
return "ResetMessage{}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public void handleDiscardAllMessage()
}

@Override
public void handleAckFailureMessage() throws RuntimeException
public void handleResetMessage() throws RuntimeException
{
session.acknowledgeFailure( null, simpleCallback );
session.reset( null, simpleCallback );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,10 @@ public static <V, A> Callback<V,A> noop()
<A> void discardAll( A attachment, Callback<Void,A> callback );

/**
* Whenever an error has occurred, all incoming requests will be ignored until the error is acknowledged through
* this method. The point of this is that we can do pipelining, sending multiple requests in one go and
* optimistically assuming they will succeed. If any of them fail all subsequent requests are declined until the
* client has acknowledged it has seen the error and has taken it into account for upcoming requests.
* <p/>
* Whenever an error has been acknowledged, the session will revert back to its intial state. Any ongoing
* statements
* or transactions will have been rolled back and/or disposed of.
* Reset the session to an IDLE state. This clears any outstanding failure condition, disposes
* of any outstanding result records and rolls back the current transaction (if any).
*/
<A> void acknowledgeFailure( A attachment, Callback<Void,A> callback );
<A> void reset( A attachment, Callback<Void,A> callback );

@Override
void close();
Expand Down

0 comments on commit 298d531

Please sign in to comment.