Skip to content

Commit

Permalink
Removed ACK_FAILURE
Browse files Browse the repository at this point in the history
  • Loading branch information
technige committed Jan 7, 2016
1 parent 7838f1a commit 27add8d
Show file tree
Hide file tree
Showing 17 changed files with 60 additions and 236 deletions.
7 changes: 3 additions & 4 deletions community/bolt/src/docs/dev/examples.asciidoc
Expand Up @@ -122,8 +122,7 @@ Server: SUCCESS { "type": "r" }


=== Error handling === Error handling


This illustrates how the server behaves when a request fails, and the server ignores incoming messages until an This illustrates how the server behaves when a request fails, and shows how the server ignores incoming messages until a `RESET` message is received.
`ACK_FAILURE` message is received.


.Error handling .Error handling
[source,bolt_exchange] [source,bolt_exchange]
Expand Down Expand Up @@ -180,9 +179,9 @@ Server: IGNORED
# Until the error is acknowledged # Until the error is acknowledged
Client: ACK_FAILURE Client: RESET
00 02 b0 0f 00 00 00 02 b0 02 00 00
Server: SUCCESS {} Server: SUCCESS {}
Expand Down
45 changes: 11 additions & 34 deletions community/bolt/src/docs/dev/messaging.asciidoc
Expand Up @@ -46,10 +46,11 @@ Because the protocol leverages pipelining, the client and the server need to agr
occurs, otherwise messages that were sent assuming no failure would occur might have unintended effects. occurs, otherwise messages that were sent assuming no failure would occur might have unintended effects.


When requests fail on the server, the server will send the client a `FAILURE` message. When requests fail on the server, the server will send the client a `FAILURE` message.
The client must acknowledge the `FAILURE` message by sending an `ACK_FAILURE` message to the server. The client must acknowledge the `FAILURE` message by sending an `RESET` message to the server.
Until the server receives the `ACK_FAILURE` message, it will send an `IGNORED` message in response to any other message from by the client, including messages that were sent in a pipeline. Until the server receives the `RESET` message, it will send an `IGNORED` message in response to any other message from by the client, including messages that were sent in a pipeline.
The `RESET` clears the pending failure state, disposes of any outstanding records and rolls back the current transaction (if any).


The diagram below illustrates a typical flow involving `ACK_FAILURE` messages: The diagram below illustrates a typical flow involving `RESET` messages:


image:failure-ack.png[] image:failure-ack.png[]


Expand Down Expand Up @@ -200,38 +201,14 @@ Value: PULL_ALL
B0 3F B0 3F
---- ----


==== ACK_FAILURE

The `ACK_FAILURE` message is a client message used to signal that a client has acknowledged a previous `FAILURE`
. It has the following structure:

[source,bolt_message_struct]
----
AcknowledgeFailureMessage (signature=0x0F) {
}
----

On receipt of an `ACK_FAILURE` message, the server will clear any pending failure state and respond with a single `SUCCESS` message.
If no such failure state is pending, a FAILURE message will be sent instead.

An `ACK_FAILURE` will never be ignored by the server.

.Response
- `SUCCESS {}` if a failure has been successfully acknowledged
- `FAILURE {"code": ..., "message": ...}` if there is no outstanding failure that requires acknowledgement

.Example
[source,bolt_packstream_type]
----
Value: ACK_FAILURE
B0 0F
----

==== RESET ==== RESET


The `RESET` message is a client message used to return the current session to a "clean" state by discarding any outstanding records and rolling back the current transaction, if one exists. The `RESET` message is a client message used to return the current session to a "clean" state.
`RESET` will also override any outstanding `FAILURE` and can therefore be used in place of an `ACK_FAILURE` (albeit with extra power). The following actions are performed by `RESET`:

- clear any outstanding `FAILURE` state
- dispose of any outstanding result records
- rollback the current transaction (if any)


[source,bolt_message_struct] [source,bolt_message_struct]
---- ----
Expand Down Expand Up @@ -339,7 +316,7 @@ IgnoredMessage (signature=0x7E) {
} }
---- ----


A client message will be ignored if an earlier failure has not yet been acknowledged by the client via an `ACK_FAILURE` message. A client message will be ignored if an earlier failure has not yet been acknowledged by the client via a `RESET` message.
For example, this will occur if the client optimistically sends a group of messages, one of which fails during execution: all subsequent messages in that group will then be ignored. For example, this will occur if the client optimistically sends a group of messages, one of which fails during execution: all subsequent messages in that group will then be ignored.
Note that the original `PULL_ALL` message was never processed by the server. Note that the original `PULL_ALL` message was never processed by the server.


Expand Down
Expand Up @@ -32,8 +32,6 @@ public interface MessageHandler<E extends Exception>


void handleDiscardAllMessage() throws E; void handleDiscardAllMessage() throws E;


void handleAckFailureMessage() throws E;

void handleRecordMessage( Record item ) throws E; void handleRecordMessage( Record item ) throws E;


void handleSuccessMessage( Map<String,Object> metadata ) throws E; void handleSuccessMessage( Map<String,Object> metadata ) throws E;
Expand Down Expand Up @@ -66,12 +64,6 @@ public void handleDiscardAllMessage() throws E


} }


@Override
public void handleAckFailureMessage() throws E
{

}

@Override @Override
public void handleRecordMessage( Record item ) throws E public void handleRecordMessage( Record item ) throws E
{ {
Expand Down
Expand Up @@ -129,13 +129,6 @@ public void handleDiscardAllMessage()
onMessageComplete.onMessageComplete(); onMessageComplete.onMessageComplete();
} }


@Override
public void handleAckFailureMessage() throws IOException
{
packer.packStructHeader( 0, MessageTypes.MSG_ACK_FAILURE );
onMessageComplete.onMessageComplete();
}

@Override @Override
public void handleRecordMessage( Record item ) public void handleRecordMessage( Record item )
throws IOException throws IOException
Expand Down Expand Up @@ -253,9 +246,6 @@ public <E extends Exception> void read( MessageHandler<E> output ) throws IOExce
case MessageTypes.MSG_FAILURE: case MessageTypes.MSG_FAILURE:
unpackFailureMessage( output ); unpackFailureMessage( output );
break; break;
case MessageTypes.MSG_ACK_FAILURE:
unpackAckFailureMessage( output );
break;
case MessageTypes.MSG_IGNORED: case MessageTypes.MSG_IGNORED:
unpackIgnoredMessage( output ); unpackIgnoredMessage( output );
break; break;
Expand Down Expand Up @@ -284,12 +274,6 @@ public <E extends Exception> void read( MessageHandler<E> output ) throws IOExce
} }
} }


private <E extends Exception> void unpackAckFailureMessage( MessageHandler<E> output )
throws E
{
output.handleAckFailureMessage();
}

private <E extends Exception> void unpackSuccessMessage( MessageHandler<E> output ) private <E extends Exception> void unpackSuccessMessage( MessageHandler<E> output )
throws E, IOException throws E, IOException
{ {
Expand Down

This file was deleted.

Expand Up @@ -28,7 +28,6 @@ public class Messages
{ {
private static final PullAllMessage PULL_ALL = new PullAllMessage(); private static final PullAllMessage PULL_ALL = new PullAllMessage();
private static final DiscardAllMessage DISCARD_ALL = new DiscardAllMessage(); private static final DiscardAllMessage DISCARD_ALL = new DiscardAllMessage();
private static final AcknowledgeFailureMessage ACK_FAILURE = new AcknowledgeFailureMessage();
private static final SuccessMessage SUCCESS = new SuccessMessage( Collections.EMPTY_MAP ); private static final SuccessMessage SUCCESS = new SuccessMessage( Collections.EMPTY_MAP );


public static Message reset() public static Message reset()
Expand Down Expand Up @@ -61,11 +60,6 @@ public static Message discardAll()
return DISCARD_ALL; return DISCARD_ALL;
} }


public static Message ackFailure()
{
return ACK_FAILURE;
}

public static Message record( Record value ) public static Message record( Record value )
{ {
return new RecordMessage( value ); return new RecordMessage( value );
Expand Down
Expand Up @@ -84,12 +84,6 @@ public void handleDiscardAllMessage()
session.discardAll( null, simpleCallback ); session.discardAll( null, simpleCallback );
} }


@Override
public void handleAckFailureMessage() throws RuntimeException
{
session.acknowledgeFailure( null, simpleCallback );
}

@Override @Override
public void handleResetMessage() throws RuntimeException public void handleResetMessage() throws RuntimeException
{ {
Expand Down
Expand Up @@ -137,19 +137,8 @@ public static <V, A> Callback<V,A> noop()
<A> void discardAll( A attachment, Callback<Void,A> callback ); <A> void discardAll( A attachment, Callback<Void,A> callback );


/** /**
* Whenever an error has occurred, all incoming requests will be ignored until the error is acknowledged through * Reset the session to an IDLE state. This clears any outstanding failure condition, disposes
* this method. The point of this is that we can do pipelining, sending multiple requests in one go and * of any outstanding result records and rolls back the current transaction (if any).
* optimistically assuming they will succeed. If any of them fail all subsequent requests are declined until the
* client has acknowledged it has seen the error and has taken it into account for upcoming requests.
* <p/>
* Whenever an error has been acknowledged, the session will revert back to its intial state. Any ongoing
* statements
* or transactions will have been rolled back and/or disposed of.
*/
<A> void acknowledgeFailure( A attachment, Callback<Void,A> callback );

/**
* Reset the session to an IDLE state.
*/ */
<A> void reset( A attachment, Callback<Void,A> callback ); <A> void reset( A attachment, Callback<Void,A> callback );


Expand Down
Expand Up @@ -76,12 +76,6 @@ public <A> void discardAll( A attachment, Callback<Void,A> callback )
reportError( attachment, callback ); reportError( attachment, callback );
} }


@Override
public <A> void acknowledgeFailure( A attachment, Callback<Void,A> callback )
{
reportError( attachment, callback );
}

@Override @Override
public <A> void reset( A attachment, Callback<Void,A> callback ) public <A> void reset( A attachment, Callback<Void,A> callback )
{ {
Expand Down
Expand Up @@ -236,16 +236,10 @@ public State reset( SessionStateMachine ctx )
/** An error has occurred, client must acknowledge it before anything else is allowed. */ /** An error has occurred, client must acknowledge it before anything else is allowed. */
ERROR ERROR
{ {
@Override
public State acknowledgeError( SessionStateMachine ctx )
{
return IDLE;
}

@Override @Override
public State reset( SessionStateMachine ctx ) public State reset( SessionStateMachine ctx )
{ {
return acknowledgeError( ctx ); return IDLE;
} }


@Override @Override
Expand All @@ -263,7 +257,7 @@ protected State onNoImplementation( SessionStateMachine ctx, String command )
RECOVERABLE_ERROR RECOVERABLE_ERROR
{ {
@Override @Override
public State acknowledgeError (SessionStateMachine ctx) public State reset (SessionStateMachine ctx)
{ {
return IN_TRANSACTION; return IN_TRANSACTION;
} }
Expand Down Expand Up @@ -336,11 +330,6 @@ public State beginTransaction( SessionStateMachine ctx )
return onNoImplementation( ctx, "beginning implicit transaction" ); return onNoImplementation( ctx, "beginning implicit transaction" );
} }


public State acknowledgeError( SessionStateMachine ctx )
{
return onNoImplementation( ctx, "acknowledging an error" );
}

public State reset( SessionStateMachine ctx ) public State reset( SessionStateMachine ctx )
{ {
return onNoImplementation( ctx, "resetting the current session" ); return onNoImplementation( ctx, "resetting the current session" );
Expand Down Expand Up @@ -521,17 +510,6 @@ public <A> void discardAll( A attachment, Callback<Void,A> callback )
finally { after(); } finally { after(); }
} }


@Override
public <A> void acknowledgeFailure( A attachment, Callback<Void,A> callback )
{
before( attachment, callback );
try
{
state = state.acknowledgeError( this );
}
finally { after(); }
}

@Override @Override
public <A> void reset( A attachment, Callback<Void, A> callback ) public <A> void reset( A attachment, Callback<Void, A> callback )
{ {
Expand Down
Expand Up @@ -71,12 +71,6 @@ public <A> void discardAll( final A attachment, final Callback<Void,A> callback
queue( session -> session.discardAll( attachment, callback ) ); queue( session -> session.discardAll( attachment, callback ) );
} }


@Override
public <A> void acknowledgeFailure( final A attachment, final Callback<Void,A> callback )
{
queue( session -> session.acknowledgeFailure( attachment, callback ) );
}

@Override @Override
public <A> void reset( final A attachment, final Callback<Void,A> callback ) public <A> void reset( final A attachment, final Callback<Void,A> callback )
{ {
Expand Down

0 comments on commit 27add8d

Please sign in to comment.