Permalink
Browse files

allow async cancellation

  • Loading branch information...
1 parent 8d22de9 commit 0ec9bb2d637dfd3b28304cfb8c37a83b32916611 @emuckenhuber emuckenhuber committed with bstansberry Apr 21, 2012
View
107 ...ient/src/main/java/org/jboss/as/controller/client/impl/AbstractDelegatingAsyncFuture.java
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2012, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.as.controller.client.impl;
+
+import org.jboss.threads.AsyncFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author Emanuel Muckenhuber
+ */
+public abstract class AbstractDelegatingAsyncFuture<T> implements AsyncFuture<T> {
+
+ private final AsyncFuture<T> delegate;
+ public AbstractDelegatingAsyncFuture(AsyncFuture<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Status await() throws InterruptedException {
+ return delegate.await();
+ }
+
+ @Override
+ public Status await(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.await(timeout, unit);
+ }
+
+ @Override
+ public T getUninterruptibly() throws CancellationException, ExecutionException {
+ return delegate.getUninterruptibly();
+ }
+
+ @Override
+ public T getUninterruptibly(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, TimeoutException {
+ return delegate.getUninterruptibly(timeout, unit);
+ }
+
+ @Override
+ public Status awaitUninterruptibly() {
+ return delegate.awaitUninterruptibly();
+ }
+
+ @Override
+ public Status awaitUninterruptibly(long timeout, TimeUnit unit) {
+ return delegate.awaitUninterruptibly(timeout, unit);
+ }
+
+ @Override
+ public Status getStatus() {
+ return delegate.getStatus();
+ }
+
+ public <A> void addListener(Listener<? super T, A> aListener, A attachment) {
+ delegate.addListener(aListener, attachment);
+ }
+
+ @Override
+ public boolean cancel(boolean interruptionDesired) {
+ // allow custom cancellation policies
+ asyncCancel(interruptionDesired);
+ return awaitUninterruptibly() == Status.CANCELLED;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return delegate.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return delegate.isDone();
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return delegate.get();
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.get(timeout, unit);
+ }
+}
View
64 ...ient/src/main/java/org/jboss/as/controller/client/impl/AbstractModelControllerClient.java
@@ -314,70 +314,14 @@ static OperationExecutionContext create(final Operation operation, final Operati
* Wraps the request execution AsyncFuture in an AsyncFuture impl that handles cancellation by sending a cancellation
* request to the remote side.
*/
- private class DelegatingCancellableAsyncFuture implements AsyncFuture<ModelNode>{
- private final int batchId;
- private final AsyncFuture<ModelNode> delegate;
+ private class DelegatingCancellableAsyncFuture extends AbstractDelegatingAsyncFuture<ModelNode> {
- public DelegatingCancellableAsyncFuture(AsyncFuture<ModelNode> delegate, int batchId) {
- this.delegate = delegate;
+ private final int batchId;
+ private DelegatingCancellableAsyncFuture(final AsyncFuture<ModelNode> delegate, final int batchId) {
+ super(delegate);
this.batchId = batchId;
}
- public org.jboss.threads.AsyncFuture.Status await() throws InterruptedException {
- return delegate.await();
- }
-
- public org.jboss.threads.AsyncFuture.Status await(long timeout, TimeUnit unit) throws InterruptedException {
- return delegate.await(timeout, unit);
- }
-
- public ModelNode getUninterruptibly() throws CancellationException, ExecutionException {
- return delegate.getUninterruptibly();
- }
-
- public ModelNode getUninterruptibly(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, TimeoutException {
- return delegate.getUninterruptibly(timeout, unit);
- }
-
- public org.jboss.threads.AsyncFuture.Status awaitUninterruptibly() {
- return delegate.awaitUninterruptibly();
- }
-
- public org.jboss.threads.AsyncFuture.Status awaitUninterruptibly(long timeout, TimeUnit unit) {
- return delegate.awaitUninterruptibly(timeout, unit);
- }
-
- public boolean isDone() {
- return delegate.isDone();
- }
-
- public org.jboss.threads.AsyncFuture.Status getStatus() {
- return delegate.getStatus();
- }
-
- public <A> void addListener(org.jboss.threads.AsyncFuture.Listener<? super ModelNode, A> listener, A attachment) {
- delegate.addListener(listener, attachment);
- }
-
- public ModelNode get() throws InterruptedException, ExecutionException {
- return delegate.get();
- }
-
- public ModelNode get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return delegate.get(timeout, unit);
- }
-
- @Override
- public boolean isCancelled() {
- return delegate.getStatus() == Status.CANCELLED;
- }
-
- @Override
- public boolean cancel(boolean interruptionDesired) {
- asyncCancel(interruptionDesired);
- return awaitUninterruptibly() == Status.CANCELLED;
- }
-
@Override
public void asyncCancel(boolean interruptionDesired) {
try {
View
54 controller/src/main/java/org/jboss/as/controller/remote/BlockingQueueOperationListener.java
@@ -24,12 +24,16 @@
import org.jboss.as.controller.descriptions.ModelDescriptionConstants;
import org.jboss.dmr.ModelNode;
+import org.jboss.threads.AsyncFuture;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Basic operation listener backed by a blocking queue. If the limit of the queue is reached prepared operations
@@ -151,8 +155,9 @@ public boolean isFailed() {
}
@Override
- public Future<ModelNode> getFinalResult() {
- return new Future<ModelNode>() {
+ public AsyncFuture<ModelNode> getFinalResult() {
+ return new AsyncFuture<ModelNode>() {
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
@@ -177,6 +182,51 @@ public ModelNode get() {
public ModelNode get(long timeout, TimeUnit unit) {
return finalResult;
}
+
+ @Override
+ public Status await() throws InterruptedException {
+ return Status.COMPLETE;
+ }
+
+ @Override
+ public Status await(long timeout, TimeUnit unit) throws InterruptedException {
+ return Status.COMPLETE;
+ }
+
+ @Override
+ public ModelNode getUninterruptibly() throws CancellationException, ExecutionException {
+ return finalResult;
+ }
+
+ @Override
+ public ModelNode getUninterruptibly(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, TimeoutException {
+ return finalResult;
+ }
+
+ @Override
+ public Status awaitUninterruptibly() {
+ return Status.COMPLETE;
+ }
+
+ @Override
+ public Status awaitUninterruptibly(long timeout, TimeUnit unit) {
+ return Status.COMPLETE;
+ }
+
+ @Override
+ public Status getStatus() {
+ return Status.COMPLETE;
+ }
+
+ @Override
+ public <A> void addListener(Listener<? super ModelNode, A> aListener, A attachment) {
+ aListener.handleComplete(this, attachment);
+ }
+
+ @Override
+ public void asyncCancel(boolean interruptionDesired) {
+ //
+ }
};
}
View
24 controller/src/main/java/org/jboss/as/controller/remote/RemoteProxyController.java
@@ -33,6 +33,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
/**
@@ -101,6 +102,7 @@ public PathAddress getProxyNodeAddress() {
@Override
public void execute(final ModelNode original, final OperationMessageHandler messageHandler, final ProxyOperationControl control, final OperationAttachments attachments) {
// Add blocking support to adhere to the proxy controller API contracts
+ final CountDownLatch completed = new CountDownLatch(1);
final BlockingQueue<TransactionalProtocolClient.PreparedOperation<TransactionalProtocolClient.Operation>> queue = new ArrayBlockingQueue<TransactionalProtocolClient.PreparedOperation<TransactionalProtocolClient.Operation>>(1, true);
final TransactionalProtocolClient.TransactionalOperationListener<TransactionalProtocolClient.Operation> operationListener = new TransactionalProtocolClient.TransactionalOperationListener<TransactionalProtocolClient.Operation>() {
@Override
@@ -112,12 +114,22 @@ public void operationPrepared(TransactionalProtocolClient.PreparedOperation<Tran
@Override
public void operationFailed(TransactionalProtocolClient.Operation operation, ModelNode result) {
- queue.offer(new BlockingQueueOperationListener.FailedOperation<TransactionalProtocolClient.Operation>(operation, result));
+ try {
+ queue.offer(new BlockingQueueOperationListener.FailedOperation<TransactionalProtocolClient.Operation>(operation, result));
+ } finally {
+ // This might not be needed?
+ completed.countDown();
+ }
}
@Override
public void operationComplete(TransactionalProtocolClient.Operation operation, ModelNode result) {
- control.operationCompleted(result);
+ try {
+ control.operationCompleted(result);
+ } finally {
+ // Make sure the handler is called before commit/rollback returns
+ completed.countDown();
+ }
}
};
Future<ModelNode> futureResult = null;
@@ -139,8 +151,8 @@ public void operationComplete(TransactionalProtocolClient.Operation operation, M
public void commit() {
prepared.commit();
try {
- // Await the result
- prepared.getFinalResult().get();
+ // Await the completed notification
+ completed.await();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
@@ -152,8 +164,8 @@ public void commit() {
public void rollback() {
prepared.rollback();
try {
- // Await the result
- prepared.getFinalResult().get();
+ // Await the completed notification
+ completed.await();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
View
111 controller/src/main/java/org/jboss/as/controller/remote/SimpleFuture.java
@@ -1,111 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2012, Red Hat, Inc., and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.as.controller.remote;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-class SimpleFuture<V> implements Future<V> {
-
- private V value;
- private volatile boolean done;
- private final Lock lock = new ReentrantLock();
- private final Condition hasValue = lock.newCondition();
-
- /**
- * Always returns <code>false</code>
- *
- * @return <code>false</code>
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public V get() throws InterruptedException, ExecutionException {
-
- lock.lock();
- try {
- while (!done) {
- hasValue.await();
- }
- return value;
- }
- finally {
- lock.unlock();
- }
- }
-
- @Override
- public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
- long deadline = unit.toMillis(timeout) + System.currentTimeMillis();
- lock.lock();
- try {
- while (!done) {
- long remaining = deadline - System.currentTimeMillis();
- if (remaining <= 0) {
- throw new TimeoutException();
- }
- hasValue.await(remaining, TimeUnit.MILLISECONDS);
- }
- return value;
- }
- finally {
- lock.unlock();
- }
- }
-
- /**
- * Always returns <code>false</code>
- *
- * @return <code>false</code>
- */
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return done;
- }
-
- void set(V value) {
- lock.lock();
- try {
- this.value = value;
- done = true;
- hasValue.signalAll();
- }
- finally {
- lock.unlock();
- }
- }
-}
View
18 controller/src/main/java/org/jboss/as/controller/remote/TransactionalProtocolClient.java
@@ -26,6 +26,7 @@
import org.jboss.as.controller.client.OperationAttachments;
import org.jboss.as.controller.client.OperationMessageHandler;
import org.jboss.dmr.ModelNode;
+import org.jboss.threads.AsyncFuture;
import java.io.IOException;
import java.util.concurrent.Future;
@@ -48,7 +49,7 @@
* @return the future result
* @throws IOException
*/
- Future<ModelNode> execute(TransactionalOperationListener<Operation> listener, ModelNode operation, OperationMessageHandler messageHandler, OperationAttachments attachments) throws IOException;
+ AsyncFuture<ModelNode> execute(TransactionalOperationListener<Operation> listener, ModelNode operation, OperationMessageHandler messageHandler, OperationAttachments attachments) throws IOException;
/**
* Execute an operation. This returns a future for the final result, which will only available after the prepared
@@ -60,7 +61,7 @@
* @return the future result
* @throws IOException
*/
- <T extends Operation> Future<ModelNode> execute(TransactionalOperationListener<T> listener, T operation) throws IOException;
+ <T extends Operation> AsyncFuture<ModelNode> execute(TransactionalOperationListener<T> listener, T operation) throws IOException;
/**
* The transactional operation listener.
@@ -137,15 +138,16 @@
T getOperation();
/**
- * Get the prepared result
- * @return
+ * Get the prepared result.
+ *
+ * @return the prepared result
*/
ModelNode getPreparedResult();
/**
- * Check if prepare failed
+ * Check if prepare failed.
*
- * @return
+ * @return whether the operation failed
*/
boolean isFailed();
@@ -159,9 +161,9 @@
/**
* Get the final result.
*
- * @return
+ * @return the final result
*/
- Future<ModelNode> getFinalResult();
+ AsyncFuture<ModelNode> getFinalResult();
}
View
75 controller/src/main/java/org/jboss/as/controller/remote/TransactionalProtocolClientImpl.java
@@ -26,10 +26,12 @@
import org.jboss.as.controller.client.MessageSeverity;
import org.jboss.as.controller.client.OperationAttachments;
import org.jboss.as.controller.client.OperationMessageHandler;
+import org.jboss.as.controller.client.impl.AbstractDelegatingAsyncFuture;
import org.jboss.as.controller.client.impl.ModelControllerProtocol;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.CANCELLED;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.FAILED;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.FAILURE_DESCRIPTION;
+import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OUTCOME;
import org.jboss.as.protocol.StreamUtils;
import org.jboss.as.protocol.mgmt.AbstractManagementRequest;
@@ -44,6 +46,8 @@
import org.jboss.as.protocol.mgmt.ManagementResponseHeader;
import static org.jboss.as.protocol.mgmt.ProtocolUtils.expectHeader;
import org.jboss.dmr.ModelNode;
+import org.jboss.threads.AsyncFuture;
+import org.jboss.threads.AsyncFutureTask;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -52,6 +56,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -83,54 +88,29 @@ public TransactionalProtocolClientImpl(final ManagementChannelAssociation channe
}
@Override
- public Future<ModelNode> execute(TransactionalOperationListener<Operation> listener, ModelNode operation, OperationMessageHandler messageHandler, OperationAttachments attachments) throws IOException {
+ public AsyncFuture<ModelNode> execute(TransactionalOperationListener<Operation> listener, ModelNode operation, OperationMessageHandler messageHandler, OperationAttachments attachments) throws IOException {
final Operation wrapper = new TransactionalOperationImpl(operation, messageHandler, attachments);
return execute(listener, wrapper);
}
@Override
- public <T extends Operation> Future<ModelNode> execute(TransactionalOperationListener<T> listener, T operation) throws IOException {
+ public <T extends Operation> AsyncFuture<ModelNode> execute(TransactionalOperationListener<T> listener, T operation) throws IOException {
final ExecuteRequestContext context = new ExecuteRequestContext(new OperationWrapper<T>(listener, operation));
- final ActiveOperation<ModelNode, ExecuteRequestContext> op = channelAssociation.executeRequest(new ExecuteRequest(), context, context);
- final Future<ModelNode> result = context.getResult();
- // Propagate cancellation to the operation rather to the result
- return new Future<ModelNode>() {
+ final ActiveOperation<ModelNode, ExecuteRequestContext> op = channelAssociation.initializeOperation(context, context);
+ final AsyncFuture<ModelNode> result = new AbstractDelegatingAsyncFuture<ModelNode>(op.getResult()) {
@Override
- public boolean cancel(boolean mayInterruptIfRunning) {
+ public void asyncCancel(boolean interruptionDesired) {
try {
// Execute
channelAssociation.executeRequest(op, new CompleteTxRequest(ModelControllerProtocol.PARAM_ROLLBACK));
- // Block until done...
- op.getResult().await();
- } catch (InterruptedException e ){
- Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException(e);
}
- return isCancelled();
- }
-
- @Override
- public boolean isCancelled() {
- // TODO also check the outcome?
- return op.getResult().isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return result.isDone();
- }
-
- @Override
- public ModelNode get() throws InterruptedException, ExecutionException {
- return result.get();
- }
-
- @Override
- public ModelNode get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return result.get(timeout, unit);
}
};
+ context.initialize(result);
+ channelAssociation.executeRequest(op, new ExecuteRequest());
+ return result;
}
/**
@@ -318,6 +298,10 @@ protected ByteArrayOutputStream copyStream(final InputStream is) throws IOExcept
this.wrapper = operationWrapper;
}
+ void initialize(final AsyncFuture<ModelNode> result) {
+ wrapper.future = result;
+ }
+
OperationMessageHandler getMessageHandler() {
return wrapper.getMessageHandler();
}
@@ -338,10 +322,6 @@ OperationAttachments getAttachments() {
return attachments.getInputStreams();
}
- Future<ModelNode> getResult() {
- return wrapper.future;
- }
-
@Override
public synchronized void completed(final ModelNode result) {
if(completed.compareAndSet(false, true)) {
@@ -375,7 +355,7 @@ synchronized void operationPrepared(final ModelController.OperationTransaction t
private final T operation;
private final TransactionalOperationListener<T> listener;
- private SimpleFuture<ModelNode> future = new SimpleFuture<ModelNode>();
+ private AsyncFuture<ModelNode> future;
OperationWrapper(TransactionalOperationListener<T> listener, T operation) {
this.listener = listener;
@@ -400,19 +380,11 @@ void prepared(final ModelController.OperationTransaction transaction, final Mode
}
void completed(final ModelNode response) {
- try {
- listener.operationComplete(operation, response);
- } finally {
- future.set(response);
- }
+ listener.operationComplete(operation, response);
}
void failed(final ModelNode response) {
- try {
- listener.operationFailed(operation, response);
- } finally {
- future.set(response);
- }
+ listener.operationFailed(operation, response);
}
}
@@ -421,10 +393,11 @@ void failed(final ModelNode response) {
private final T operation;
private final ModelNode preparedResult;
- private final Future<ModelNode> finalResult;
+ private final AsyncFuture<ModelNode> finalResult;
private final ModelController.OperationTransaction transaction;
- protected PreparedOperationImpl(T operation, ModelNode preparedResult, Future<ModelNode> finalResult, ModelController.OperationTransaction transaction) {
+ protected PreparedOperationImpl(T operation, ModelNode preparedResult, AsyncFuture<ModelNode> finalResult, ModelController.OperationTransaction transaction) {
+ assert finalResult != null : "null result";
this.operation = operation;
this.preparedResult = preparedResult;
this.finalResult = finalResult;
@@ -452,7 +425,7 @@ public boolean isDone() {
}
@Override
- public Future<ModelNode> getFinalResult() {
+ public AsyncFuture<ModelNode> getFinalResult() {
return finalResult;
}
View
8 ...oller/src/test/java/org/jboss/as/controller/test/TransactionalProtocolClientTestCase.java
@@ -55,6 +55,7 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -158,11 +159,15 @@ public void testSimpleRequest() throws Exception {
public void testCancelBeforePrepared() throws Exception {
final BlockingOperationListener listener = new BlockingOperationListener();
+ final CountDownLatch latch = new CountDownLatch(1);
final TestOperationHandler handler = new TestOperationHandler() {
@Override
public void execute(ModelNode operation, OperationMessageHandler handler, OperationAttachments attachments) throws Exception {
try {
- wait();
+ synchronized (this) {
+ latch.countDown();
+ wait();
+ }
} catch (InterruptedException e) {
//
}
@@ -171,6 +176,7 @@ public void execute(ModelNode operation, OperationMessageHandler handler, Operat
//
final TestUpdateWrapper wrapper = createTestClient(0, handler);
final Future<ModelNode> futureResult = wrapper.execute(listener);
+ latch.await();
// Now the server side should for latch to countDown
futureResult.cancel(false);
//
View
9 protocol/src/main/java/org/jboss/as/protocol/mgmt/ActiveOperationSupport.java
@@ -289,10 +289,13 @@ public boolean done(T result) {
@Override
public boolean failed(Exception e) {
try {
- return ActiveOperationImpl.this.setFailed(e);
+ boolean failed = ActiveOperationImpl.this.setFailed(e);
+ if(failed) {
+ ProtocolLogger.ROOT_LOGGER.debugf(e, "active-op (%d) failed %s", operationId, attachment);
+ }
+ return failed;
} finally {
removeActiveOperation(operationId);
- ProtocolLogger.ROOT_LOGGER.debugf(e, "active-op (%d) failed %s", operationId, attachment);
}
}
@@ -378,7 +381,7 @@ public void asyncCancel(boolean interruptionDesired) {
@Override
public void addCancellable(final Cancellable cancellable) {
// Perhaps just use the IOFuture from XNIO...
- synchronized (lock) {
+ synchronized (this) {
switch (getStatus()) {
case CANCELLED:
break;
View
2 protocol/src/main/java/org/jboss/as/protocol/mgmt/ManagementChannelAssociation.java
@@ -83,6 +83,8 @@
*/
<T, A> AsyncFuture<T> executeRequest(final ActiveOperation<T, A> operation, final ManagementRequest<T, A> request) throws IOException;
+ <T, A> ActiveOperation<T, A> initializeOperation(A attachment, ActiveOperation.CompletedCallback<T> callback) throws IOException;
+
/**
* Get the underlying remoting channel associated with this context.
*
View
5 protocol/src/main/java/org/jboss/as/protocol/mgmt/ManagementChannelHandler.java
@@ -74,6 +74,11 @@ public Channel getChannel() throws IOException {
return strategy.getChannel();
}
+ @Override
+ public <T, A> ActiveOperation<T, A> initializeOperation(A attachment, ActiveOperation.CompletedCallback<T> callback) throws IOException {
+ return super.registerActiveOperation(attachment, callback);
+ }
+
/** {@inheritDoc} */
@Override
public <T, A> ActiveOperation<T, A> executeRequest(ManagementRequest<T, A> request, A attachment, ActiveOperation.CompletedCallback<T> callback) throws IOException {

0 comments on commit 0ec9bb2

Please sign in to comment.