Skip to content

Commit

Permalink
Don't fail if multiple cancel messages come in for the same op, but a…
Browse files Browse the repository at this point in the history
…lso avoid sending them
  • Loading branch information
bstansberry committed Oct 21, 2015
1 parent 86f4599 commit 21a0d68
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ public <T extends Operation> AsyncFuture<OperationResponse> execute(Transactiona
final Subject subject = SecurityActions.getSubject();
final ExecuteRequestContext context = new ExecuteRequestContext(new OperationWrapper<T>(listener, operation), subject, tempDir);
final ActiveOperation<OperationResponse, ExecuteRequestContext> op = channelAssociation.initializeOperation(context, context);
final AtomicBoolean cancelSent = new AtomicBoolean();
final AsyncFuture<OperationResponse> result = new AbstractDelegatingAsyncFuture<OperationResponse>(op.getResult()) {
@Override
public void asyncCancel(boolean interruptionDesired) {
try {
// Execute
channelAssociation.executeRequest(op, new CompleteTxRequest(ModelControllerProtocol.PARAM_ROLLBACK, channelAssociation));
} catch (IOException e) {
throw new RuntimeException(e);
public synchronized void asyncCancel(boolean interruptionDesired) {
if (!cancelSent.get()) {
try {
// Execute
channelAssociation.executeRequest(op, new CompleteTxRequest(ModelControllerProtocol.PARAM_ROLLBACK, channelAssociation));
cancelSent.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,47 @@ public void operationPrepared(final ModelController.OperationTransaction transac
*/
private static class ExecuteRequestContext implements ActiveOperation.CompletedCallback<Void> {

/** The operation being executed */
private ActiveOperation<Void, ExecuteRequestContext> operation;
/**
* Whether the prepare method has been invoked. Once the initial response to the remote side goes out,
* then {@code responseChannel} will be set to null
*/
private boolean prepared;
/**
* True if we get a compleTx call before the prepare method is invoked.
* This would mean the op was cancelled on the remote side.
*/
private boolean rollbackOnPrepare;
/**
* The tx provided to prepare.
*/
private ModelController.OperationTransaction activeTx;
private ActiveOperation<Void, ExecuteRequestContext> operation;
/**
* Not null if we owe the remote side a response; null if the currently expected response has gone out.
* Use this to track whether a response has been sent to the remote side.
*/
private ManagementRequestContext<ExecuteRequestContext> responseChannel;
/**
* The thread that calls the prepare method will block on this so it eventually must be tripped once prepare is called
*/
private final CountDownLatch txCompletedLatch = new CountDownLatch(1);
/**
* Set to true once the prepare method has been invoked and a completeTx call is received.
* Used to detect a second completeTx call coming in to cancel the op the final response goes out.
*/
private boolean txCompleted;
private PrivilegedAction<Void> action;
/**
* A final response to the op that was passed to complete after the op had sent a prepare message
* to the remote side but before any completeTx message was received. This indicates a failure or
* local cancellation while waiting for completeTx. The final response is cached in this field
* so it can be sent to the client once the completeTx call comes in. Until that happens there
* is no responseChannel available to send the response, as the prepare message has completed the
* request/response pair from the initial request
*/
private OperationResponse postPrepareRaceResponse;

/** Support object for managing any streams associated with the response */
final ResponseAttachmentInputStreamSupport streamSupport;

ExecuteRequestContext(final ResponseAttachmentInputStreamSupport streamSupport) {
Expand All @@ -347,14 +378,6 @@ ActiveOperation.ResultHandler<Void> getResultHandler() {
return operation.getResultHandler();
}

public PrivilegedAction<Void> getAction() {
return action;
}

public void setAction(PrivilegedAction<Void> action) {
this.action = action;
}

@Override
public void completed(Void result) {
//
Expand All @@ -364,9 +387,13 @@ public void completed(Void result) {
public synchronized void failed(Exception e) {
if(prepared) {
final ModelController.OperationTransaction transaction = activeTx;
activeTx = null;
if(transaction != null) {
transaction.rollback();
txCompletedLatch.countDown();
try {
transaction.rollback();
} finally {
txCompletedLatch.countDown();
}
}
} else if (responseChannel != null) {
rollbackOnPrepare = true;
Expand All @@ -378,6 +405,7 @@ public synchronized void failed(Exception e) {
ControllerLogger.MGMT_OP_LOGGER.tracef("sending pre-prepare failed response for %d --- interrupted: %s", getOperationId(), (Object) Thread.currentThread().isInterrupted());
try {
sendResponse(responseChannel, ModelControllerProtocol.PARAM_OPERATION_FAILED, response);
responseChannel = null;
} catch (IOException ignored) {
ControllerLogger.MGMT_OP_LOGGER.debugf(ignored, "failed to process message");
}
Expand Down Expand Up @@ -405,10 +433,8 @@ synchronized void initialize(final ManagementRequestContext<ExecuteRequestContex
/** Signal from ProxyOperationTransactionControl that the operation is prepared */
synchronized void prepare(final ModelController.OperationTransaction tx, final ModelNode result) {
assert !prepared;
prepared = true;
if(rollbackOnPrepare) {
prepared = true; // we count as prepared no matter what. Prepare was invoked
// and the other side has told us it's not expecting a prepare message,
// so the 'prepare' communication work this object tracks is done
try {
tx.rollback();
ControllerLogger.MGMT_OP_LOGGER.tracef("rolled back on prepare for %d --- interrupted: %s", getOperationId(), (Object) Thread.currentThread().isInterrupted());
Expand All @@ -422,14 +448,13 @@ synchronized void prepare(final ModelController.OperationTransaction tx, final M
} else {
assert activeTx == null;
assert responseChannel != null;
activeTx = tx;
ControllerLogger.MGMT_OP_LOGGER.tracef("sending prepared response for %d --- interrupted: %s", getOperationId(), (Object) Thread.currentThread().isInterrupted());
try {
sendResponse(responseChannel, ModelControllerProtocol.PARAM_OPERATION_PREPARED, result);
responseChannel = null; // we've now sent a response to the original request, so we can't use this one further
activeTx = tx;
prepared = true; // Here we only mark the op prepared once the message the other side expects has gone out
} catch (IOException e) {
getResultHandler().failed(e);
getResultHandler().failed(e); // this will eventually call back into failed(e) above and roll back the tx
}
}
}
Expand All @@ -451,17 +476,23 @@ synchronized void completeTx(final ManagementRequestContext<ExecuteRequestContex
ControllerLogger.MGMT_OP_LOGGER.tracef("completeTx (post-commit cancel) for %d", getOperationId());
cancel(context);
} else if (postPrepareRaceResponse == null) {
assert activeTx != null;
assert responseChannel == null;
responseChannel = context;
ControllerLogger.MGMT_OP_LOGGER.tracef("completeTx (%s) for %d", commit, getOperationId());
if (commit) {
activeTx.commit();
} else {
activeTx.rollback();
}
txCompleted = true;
txCompletedLatch.countDown();
if (activeTx != null) {
try {
assert responseChannel == null;
responseChannel = context;
ControllerLogger.MGMT_OP_LOGGER.tracef("completeTx (%s) for %d", commit, getOperationId());
if (commit) {
activeTx.commit();
} else {
activeTx.rollback();
}
} finally {
txCompletedLatch.countDown();
}
} // else when the prepare call came in rollbackOnPrepare was true. That means this was
// a 2nd cancellation request. We already cancelled in the if (!prepared) block above
// when the first request came in and doing it again will do nothing, so ignore this.
} else {
assert responseChannel == null;
responseChannel = context;
Expand Down

0 comments on commit 21a0d68

Please sign in to comment.