Skip to content

Commit

Permalink
[WFCORE-3597] Allow reload handler to signal when to send the early r…
Browse files Browse the repository at this point in the history
…esponse to the client rather than always sending it when the op is prepared
  • Loading branch information
bstansberry committed Feb 23, 2018
1 parent 3485411 commit ef50575
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ public void commit() {
public void rollback() {
ref.set(ResultAction.ROLLBACK);
}
}, primaryResponse);
}, primaryResponse, this);
}
resultAction = ref.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,22 @@ public interface ModelController {
*/
interface OperationTransactionControl {

/**
* Notify that an operation is complete and may be committed or rolled back.
*
* <p><strong>It is the responsibility of the user of this {@code OperationTransactionControl} to ensure that
* {@link OperationTransaction#commit()} or {@link OperationTransaction#rollback()} is eventually called on
* the provided {@code transaction}.
* </strong></p>
*
* @param transaction the transaction to control the fate of the operation. Cannot be {@code null}
* @param result the result. Cannot be {@code null}
* @param context the {@code OperationContext} for the operation that is prepared
*/
default void operationPrepared(OperationTransaction transaction, ModelNode result, OperationContext context) {
operationPrepared(transaction, result);
}

/**
* Notify that an operation is complete and may be committed or rolled back.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ protected OperationResponse internalExecute(final ModelNode operation, final Ope
// Report the correct operation response, otherwise the preparedResult would only contain
// the result of the last active step in a composite operation
final OperationTransactionControl originalResultTxControl = control == null ? null : new OperationTransactionControl() {
@Override
public void operationPrepared(OperationTransaction transaction, ModelNode result, OperationContext context) {
control.operationPrepared(transaction, responseNode, context);
}

@Override
public void operationPrepared(OperationTransaction transaction, ModelNode result) {
control.operationPrepared(transaction, responseNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ public void close() throws IOException {

@Override
public void operationPrepared(ModelController.OperationTransaction transaction, ModelNode response) {
proxyControl.operationPrepared(transaction, transformResponse(response));
}

@Override
public void operationPrepared(ModelController.OperationTransaction transaction, ModelNode response,
OperationContext context1) {
proxyControl.operationPrepared(transaction, transformResponse(response), context1);
}

private ModelNode transformResponse(ModelNode response) {
final ModelNode transformed;
// Check if we have to reject the operation
if(result.rejectOperation(response)) {
Expand All @@ -164,7 +174,7 @@ public void operationPrepared(ModelController.OperationTransaction transaction,
} else {
transformed = response;
}
proxyControl.operationPrepared(transaction, transformed);
return transformed;
}
};
proxyController.execute(transformedOperation, messageHandler, transformingProxyControl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@

package org.jboss.as.controller.operations.common;

import static org.jboss.as.controller.AbstractControllerService.EXECUTOR_CAPABILITY;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.ControlledProcessState;
import org.jboss.as.controller.OperationContext;
Expand All @@ -35,7 +30,7 @@
import org.jboss.as.controller.RunningModeControl;
import org.jboss.as.controller.SimpleAttributeDefinitionBuilder;
import org.jboss.as.controller.descriptions.ModelDescriptionConstants;
import org.jboss.as.controller.logging.ControllerLogger;
import org.jboss.as.controller.remote.EarlyResponseSendListener;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;
import org.jboss.msc.service.AbstractServiceListener;
Expand Down Expand Up @@ -78,37 +73,49 @@ public void execute(OperationContext context, ModelNode operation) throws Operat
public void execute(final OperationContext context, final ModelNode operation) throws OperationFailedException {
final ReloadContext<T> reloadContext = initializeReloadContext(context, operation);
final ServiceController<?> service = context.getServiceRegistry(true).getRequiredService(rootService);
final ExecutorService executor = (ExecutorService) context.getServiceRegistry(false).getRequiredService(EXECUTOR_CAPABILITY.getCapabilityServiceName()).getValue();
context.completeStep(new OperationContext.ResultHandler() {
@Override
public void handleResult(OperationContext.ResultAction resultAction, OperationContext context, ModelNode operation) {
if(resultAction == OperationContext.ResultAction.KEEP) {
service.addListener(new AbstractServiceListener<Object>() {
@Override
public void listenerAdded(final ServiceController<?> controller) {
Future<?> stopping = executor.submit(() -> {
final EarlyResponseSendListener sendListener = context.getAttachment(EarlyResponseSendListener.ATTACHMENT_KEY);
try {
if (resultAction == OperationContext.ResultAction.KEEP) {
service.addListener(new AbstractServiceListener<Object>() {
@Override
public void listenerAdded(final ServiceController<?> controller) {
reloadContext.reloadInitiated(runningModeControl);
processState.setStopping();
controller.setMode(ServiceController.Mode.NEVER);
});
try {
stopping.get();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
ControllerLogger.ROOT_LOGGER.errorStoppingServer(ex);
try {
// If we were interrupted during setStopping (i.e. while calling process state listeners)
// we want to clear that so we don't disrupt the reload of MSC services.
// Once we set STOPPING state we proceed. And we don't want this thread
// to have interrupted status as that will just mess up checking for
// container stability
Thread.interrupted();
// Now that we're in STOPPING state we can send the response to the caller
if (sendListener != null) {
sendListener.sendEarlyResponse(resultAction);
}
} finally {
// If we set the process state to STOPPING, we must stop
controller.setMode(ServiceController.Mode.NEVER);
}
}
}

@Override
public void transition(final ServiceController<? extends Object> controller, final ServiceController.Transition transition) {
if (transition == ServiceController.Transition.STOPPING_to_DOWN) {
controller.removeListener(this);
reloadContext.doReload(runningModeControl);
controller.setMode(ServiceController.Mode.ACTIVE);
@Override
public void transition(final ServiceController<? extends Object> controller, final ServiceController.Transition transition) {
if (transition == ServiceController.Transition.STOPPING_to_DOWN) {
controller.removeListener(this);
reloadContext.doReload(runningModeControl);
controller.setMode(ServiceController.Mode.ACTIVE);
}
}
}
});
});
}
} finally {
if (sendListener != null) {
// even if we called this in the try block, it's ok to call it again.
sendListener.sendEarlyResponse(resultAction);
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2018 Red Hat, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package org.jboss.as.controller.remote;

import org.jboss.as.controller.OperationContext;

/**
* Callback that can be provided to operation step handlers for operations like 'reload' and 'shutdown'
* where the response needs to be sent to the caller before the operation completes. The handler will
* invoke the callback when the operation has reached the point where it is safe to send the response.
* <p>
* No callback should be attached before the operation is committed.
*
* @author Brian Stansberry
*/
public interface EarlyResponseSendListener {
/**
* Key under which a listener would be
* {@link OperationContext#attach(OperationContext.AttachmentKey, Object) attached to an operation context}
* if notification that it's safe
*/
OperationContext.AttachmentKey<EarlyResponseSendListener> ATTACHMENT_KEY = OperationContext.AttachmentKey.create(EarlyResponseSendListener.class);

/**
* Informs the management kernel that it is ok to send an early response to the operation.
* <strong>Note:</strong> It is valid for this method to be invoked multiple times for the same
* listener. It is the responsibility of the listener implementation to ensure that only one
* response is sent to the caller.
*
* @param resultAction the result of the operation for which an early response is being sent
*/
void sendEarlyResponse(OperationContext.ResultAction resultAction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import java.net.InetSocketAddress;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -58,6 +58,7 @@

import org.jboss.as.controller.AccessAuditContext;
import org.jboss.as.controller.ModelController;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.PathAddress;
import org.jboss.as.controller.client.OperationResponse;
import org.jboss.as.controller.client.impl.ModelControllerProtocol;
Expand Down Expand Up @@ -200,9 +201,31 @@ private void doExecute(final ModelNode operation, final int attachmentsLength,
// Send the prepared response for :reload operations
final boolean sendPreparedOperation = sendPreparedResponse(operation);
final ModelController.OperationTransactionControl transactionControl = sendPreparedOperation ? new ModelController.OperationTransactionControl() {

@Override
public void operationPrepared(ModelController.OperationTransaction transaction, ModelNode preparedResult) {
// Shouldn't be called but if it is, send the result immediately
operationPrepared(transaction, preparedResult, null);
}

@Override
public void operationPrepared(ModelController.OperationTransaction transaction, final ModelNode preparedResult, OperationContext context) {
transaction.commit();
if (context == null || !RELOAD.equals(operation.get(OP).asString())) { // TODO deal with shutdown as well,
// the handlers for which have some
// subtleties that need thought
sendResponse(preparedResult);
} else {
context.attach(EarlyResponseSendListener.ATTACHMENT_KEY, new EarlyResponseSendListener() {
@Override
public void sendEarlyResponse(OperationContext.ResultAction resultAction) {
sendResponse(preparedResult);
}
});
}
}

private void sendResponse(ModelNode preparedResult) {
// Fix prepared result
preparedResult.get(OUTCOME).set(SUCCESS);
preparedResult.get(RESULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OPERATION_HEADERS;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP_ADDR;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OUTCOME;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.RESULT;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.SUCCESS;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.USER;
import static org.jboss.as.domain.http.server.DomainUtil.writeResponse;
import static org.jboss.as.domain.http.server.logging.HttpServerLogger.ROOT_LOGGER;
Expand All @@ -58,6 +56,7 @@
import org.jboss.as.controller.client.Operation;
import org.jboss.as.controller.client.OperationBuilder;
import org.jboss.as.controller.client.OperationMessageHandler;
import org.jboss.as.controller.client.OperationResponse;
import org.jboss.as.core.security.AccessMechanism;
import org.jboss.dmr.ModelNode;
import org.xnio.IoUtils;
Expand Down Expand Up @@ -135,6 +134,15 @@ public void handleRequest(final HttpServerExchange exchange) throws Exception {
return;
}

// Set up the correct headers
ModelNode opheaders = operation.get(OPERATION_HEADERS);
opheaders.get(ACCESS_MECHANISM).set(AccessMechanism.HTTP.toString());
opheaders.get(CALLER_TYPE).set(USER);
// Don't allow a domain-uuid operation header from a user call
if (opheaders.hasDefined(DOMAIN_UUID)) {
opheaders.remove(DOMAIN_UUID);
}

// Process the input streams
final OperationBuilder builder = OperationBuilder.create(operation, true);
final Iterator<String> i = data.iterator();
Expand All @@ -155,7 +163,8 @@ public void handleRequest(final HttpServerExchange exchange) throws Exception {
final OperationParameter opParam = operationParameterBuilder.build();
final ResponseCallback callback = new ResponseCallback() {
@Override
void doSendResponse(final ModelNode response) {
void doSendResponse(final OperationResponse operationResponse) {
ModelNode response = operationResponse.getResponseNode();
if (response.hasDefined(OUTCOME) && FAILED.equals(response.get(OUTCOME).asString())) {
Common.sendError(exchange, opParam.isEncode(), response);
return;
Expand All @@ -165,28 +174,14 @@ void doSendResponse(final ModelNode response) {
};

final boolean sendPreparedResponse = sendPreparedResponse(operation);
final ModelController.OperationTransactionControl control = sendPreparedResponse ? new ModelController.OperationTransactionControl() {
@Override
public void operationPrepared(final ModelController.OperationTransaction transaction, final ModelNode result) {
transaction.commit();
// Fix prepared result
result.get(OUTCOME).set(SUCCESS);
result.get(RESULT);
callback.sendResponse(result);
}
} : ModelController.OperationTransactionControl.COMMIT;
final ModelController.OperationTransactionControl control = sendPreparedResponse
? new EarlyResponseTransactionControl(callback, operation)
: ModelController.OperationTransactionControl.COMMIT;

ModelNode response;
OperationResponse response;
final Operation builtOp = builder.build();
try {
ModelNode opheaders = operation.get(OPERATION_HEADERS);
opheaders.get(ACCESS_MECHANISM).set(AccessMechanism.HTTP.toString());
opheaders.get(CALLER_TYPE).set(USER);
// Don't allow a domain-uuid operation header from a user call
if (opheaders.hasDefined(DOMAIN_UUID)) {
opheaders.remove(DOMAIN_UUID);
}
response = modelController.execute(operation, OperationMessageHandler.DISCARD, control, builtOp);
response = modelController.execute(builtOp, OperationMessageHandler.DISCARD, control);
} catch (Throwable t) {
ROOT_LOGGER.modelRequestError(t);
Common.sendError(exchange, opParam.isEncode(), t.getLocalizedMessage());
Expand Down Expand Up @@ -255,21 +250,4 @@ private boolean sendPreparedResponse(final ModelNode operation) {
return false;
}

/**
* Callback to prevent the response will be sent multiple times.
*/
private abstract static class ResponseCallback {
private boolean complete;

synchronized void sendResponse(final ModelNode response) {
if (complete) {
return;
}
complete = true;
doSendResponse(response);
}

abstract void doSendResponse(ModelNode response);
}

}

0 comments on commit ef50575

Please sign in to comment.