Permalink
Browse files

[AS7-4527] rollout plan tasks cleanup

  • Loading branch information...
1 parent 69775a4 commit 01514c8215d87196e05466c35c3f38c71b196935 @emuckenhuber emuckenhuber committed with bstansberry Apr 24, 2012
Showing with 199 additions and 276 deletions.
  1. +7 −0 controller/src/main/java/org/jboss/as/controller/remote/BlockingQueueOperationListener.java
  2. +4 −2 controller/src/main/java/org/jboss/as/controller/remote/TransactionalProtocolClientImpl.java
  3. +10 −12 ...rc/main/java/org/jboss/as/domain/controller/operations/coordination/DomainRolloutStepHandler.java
  4. +7 −6 ...-controller/src/main/java/org/jboss/as/domain/controller/plan/AbstractServerGroupRolloutTask.java
  5. +5 −5 ...as/domain/controller/plan/{ConcurrentGroupUpdateTask.java → ConcurrentServerGroupUpdateTask.java}
  6. +5 −5 ...jboss/as/domain/controller/plan/{RollingGroupUpdateTask.java → RollingServerGroupUpdateTask.java}
  7. +6 −9 host-controller/src/main/java/org/jboss/as/domain/controller/plan/RolloutPlanController.java
  8. +2 −6 host-controller/src/main/java/org/jboss/as/domain/controller/plan/RunningServerUpdateTask.java
  9. +0 −23 host-controller/src/main/java/org/jboss/as/domain/controller/plan/ServerOperationExecutor.java
  10. +2 −5 host-controller/src/main/java/org/jboss/as/domain/controller/plan/ServerRestartTask.java
  11. +0 −154 host-controller/src/main/java/org/jboss/as/domain/controller/plan/ServerRolloutTaskHandler.java
  12. +0 −36 host-controller/src/main/java/org/jboss/as/domain/controller/plan/ServerTask.java
  13. +145 −8 host-controller/src/main/java/org/jboss/as/domain/controller/plan/ServerTaskExecutor.java
  14. +6 −5 .../java/org/jboss/as/domain/controller/plan/{AbstractServerUpdateTask.java → ServerUpdateTask.java}
@@ -26,6 +26,7 @@
import org.jboss.dmr.ModelNode;
import org.jboss.threads.AsyncFuture;
+import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
@@ -84,6 +85,12 @@ public void operationComplete(T operation, ModelNode result) {
return queue.take();
}
+ protected void drainTo(final Collection<TransactionalProtocolClient.PreparedOperation<T>> collection) {
+ if(queue.size() > 0) {
+ queue.drainTo(collection);
+ }
+ }
+
/**
* Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.
*
@@ -82,7 +82,7 @@ public TransactionalProtocolClientImpl(final ManagementChannelAssociation channe
if (operationType == ModelControllerProtocol.HANDLE_REPORT_REQUEST) {
return new HandleReportRequestHandler();
} else if (operationType == ModelControllerProtocol.GET_INPUTSTREAM_REQUEST) {
- return new ReadAttachmentInputStreamRequestHandler();
+ return ReadAttachmentInputStreamRequestHandler.INSTANCE;
}
return handlers.resolveNext();
}
@@ -243,7 +243,9 @@ public void handleRequest(final DataInput input, final ActiveOperation.ResultHan
* Handles reads on the inputstreams returned by {@link org.jboss.as.controller.client.OperationAttachments#getInputStreams()}
* done in the remote target controller
*/
- private class ReadAttachmentInputStreamRequestHandler implements ManagementRequestHandler<ModelNode, ExecuteRequestContext> {
+ private static class ReadAttachmentInputStreamRequestHandler implements ManagementRequestHandler<ModelNode, ExecuteRequestContext> {
+
+ static final ReadAttachmentInputStreamRequestHandler INSTANCE = new ReadAttachmentInputStreamRequestHandler();
@Override
public void handleRequest(final DataInput input, final ActiveOperation.ResultHandler<ModelNode> resultHandler,
@@ -64,7 +64,6 @@
import org.jboss.as.controller.ProxyController;
import org.jboss.as.domain.controller.ServerIdentity;
import org.jboss.as.domain.controller.plan.RolloutPlanController;
-import org.jboss.as.domain.controller.plan.ServerRolloutTaskHandler;
import org.jboss.as.domain.controller.plan.ServerTaskExecutor;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.Property;
@@ -136,19 +135,18 @@ public void execute(final OperationContext context, final ModelNode operation) t
// We no longer roll back by default
domainOperationContext.setCompleteRollback(false);
- final Map<ServerIdentity, ServerRolloutTaskHandler.ServerExecutedRequest> submittedTasks = new HashMap<ServerIdentity, ServerRolloutTaskHandler.ServerExecutedRequest>();
- final List<ServerRolloutTaskHandler.ServerPreparedResponse> preparedResults = new ArrayList<ServerRolloutTaskHandler.ServerPreparedResponse>();
- final ServerRolloutTaskHandler rolloutHandler = new ServerRolloutTaskHandler(submittedTasks, preparedResults);
+ final Map<ServerIdentity, ServerTaskExecutor.ExecutedServerRequest> submittedTasks = new HashMap<ServerIdentity, ServerTaskExecutor.ExecutedServerRequest>();
+ final List<ServerTaskExecutor.ServerPreparedResponse> preparedResults = new ArrayList<ServerTaskExecutor.ServerPreparedResponse>();
try {
- pushToServers(context, rolloutHandler);
+ pushToServers(context, submittedTasks, preparedResults);
context.completeStep();
} finally {
// Inform the remote hosts whether to commit or roll back their updates
// Do them all before reading results so the commits/rollbacks can be executed in parallel
boolean completeRollback = domainOperationContext.isCompleteRollback();
final String localHostName = domainOperationContext.getLocalHostInfo().getLocalHostName();
- for(final ServerRolloutTaskHandler.ServerPreparedResponse preparedResult : preparedResults) {
+ for(final ServerTaskExecutor.ServerPreparedResponse preparedResult : preparedResults) {
boolean rollback = completeRollback || domainOperationContext.isServerGroupRollback(preparedResult.getServerGroupName());
// Require a server reload, in case the operation failed, but the overall state was commit
if(! preparedResult.finalizeTransaction(! rollback)) {
@@ -171,7 +169,7 @@ public void execute(final OperationContext context, final ModelNode operation) t
}
final Future<ModelNode> future = executorService.submit(new ServerRequireRestartTask(identity, proxy, result));
// replace the existing future
- submittedTasks.put(identity, new ServerRolloutTaskHandler.ServerExecutedRequest(identity, future));
+ submittedTasks.put(identity, new ServerTaskExecutor.ExecutedServerRequest(identity, future));
} catch (Exception ignore) {
// getUncommittedResult() won't fail here
}
@@ -181,7 +179,7 @@ public void execute(final OperationContext context, final ModelNode operation) t
// before we expose the servers to further requests
boolean interrupted = false;
try {
- for (Map.Entry<ServerIdentity, ServerRolloutTaskHandler.ServerExecutedRequest> entry : submittedTasks.entrySet()) {
+ for (Map.Entry<ServerIdentity, ServerTaskExecutor.ExecutedServerRequest> entry : submittedTasks.entrySet()) {
Future<ModelNode> future = entry.getValue().getFinalResult();
try {
ModelNode finalResult = future.isCancelled() ? getCancelledResult() : future.get();
@@ -213,8 +211,8 @@ private ModelNode getCancelledResult() {
return cancelled;
}
- private void pushToServers(final OperationContext context, final ServerRolloutTaskHandler rolloutHandler) throws OperationFailedException {
-
+ private void pushToServers(final OperationContext context, final Map<ServerIdentity,ServerTaskExecutor.ExecutedServerRequest> submittedTasks,
+ final List<ServerTaskExecutor.ServerPreparedResponse> preparedResults) throws OperationFailedException {
final String localHostName = domainOperationContext.getLocalHostInfo().getLocalHostName();
Map<String, ModelNode> hostResults = new HashMap<String, ModelNode>(domainOperationContext.getHostControllerResults());
@@ -229,7 +227,7 @@ private void pushToServers(final OperationContext context, final ServerRolloutTa
HOST_CONTROLLER_LOGGER.tracef("Rollout plan is %s", rolloutPlan);
}
- final ServerTaskExecutor taskExecutor = new ServerTaskExecutor(context, rolloutHandler) {
+ final ServerTaskExecutor taskExecutor = new ServerTaskExecutor(context, submittedTasks, preparedResults) {
@Override
protected boolean execute(TransactionalProtocolClient.TransactionalOperationListener<ServerTaskExecutor.ServerOperation> listener, ServerIdentity server, ModelNode original) {
@@ -252,7 +250,7 @@ protected boolean execute(TransactionalProtocolClient.TransactionalOperationList
return executeOperation(listener, client, server, transformedOperation);
}
};
- RolloutPlanController rolloutPlanController = new RolloutPlanController(opsByGroup, rolloutPlan, domainOperationContext, taskExecutor, rolloutHandler, executorService);
+ RolloutPlanController rolloutPlanController = new RolloutPlanController(opsByGroup, rolloutPlan, domainOperationContext, taskExecutor, executorService);
RolloutPlanController.Result planResult = rolloutPlanController.execute();
if (trace) {
HOST_CONTROLLER_LOGGER.tracef("Rollout plan result is %s", planResult);
@@ -31,20 +31,21 @@
import java.util.List;
/**
+ * Task responsible for updating a single server-group.
+ *
* @author Emanuel Muckenhuber
*/
+// TODO cleanup ServerGroupRolloutTask vs. ServerUpdateTask vs. Concurrent/RollingUpdateTask
abstract class AbstractServerGroupRolloutTask implements Runnable {
- protected final List<ServerTask> tasks;
+ protected final List<ServerUpdateTask> tasks;
protected final ServerUpdatePolicy updatePolicy;
- protected final ServerRolloutTaskHandler rolloutHandler;
protected final ServerTaskExecutor executor;
- protected final AbstractServerUpdateTask.ServerUpdateResultHandler resultHandler;
+ protected final ServerUpdateTask.ServerUpdateResultHandler resultHandler;
- public AbstractServerGroupRolloutTask(List<ServerTask> tasks, ServerUpdatePolicy updatePolicy, ServerRolloutTaskHandler rolloutHandler, ServerTaskExecutor executor, final AbstractServerUpdateTask.ServerUpdateResultHandler resultHandler) {
+ public AbstractServerGroupRolloutTask(List<ServerUpdateTask> tasks, ServerUpdatePolicy updatePolicy, ServerTaskExecutor executor, final ServerUpdateTask.ServerUpdateResultHandler resultHandler) {
this.tasks = tasks;
this.updatePolicy = updatePolicy;
- this.rolloutHandler = rolloutHandler;
this.executor = executor;
this.resultHandler = resultHandler;
}
@@ -67,7 +68,7 @@ public void run() {
*/
protected void recordPreparedOperation(final ServerIdentity identity, final TransactionalProtocolClient.PreparedOperation<ServerTaskExecutor.ServerOperation> prepared) {
updatePolicy.recordServerResult(identity, prepared.getPreparedResult());
- rolloutHandler.recordPreparedOperation(prepared);
+ executor.recordPreparedOperation(prepared);
resultHandler.handleServerUpdateResult(identity, prepared.getPreparedResult());
}
@@ -32,18 +32,18 @@
/**
* @author Emanuel Muckenhuber
*/
-class ConcurrentGroupUpdateTask extends AbstractServerGroupRolloutTask implements Runnable {
+class ConcurrentServerGroupUpdateTask extends AbstractServerGroupRolloutTask implements Runnable {
- public ConcurrentGroupUpdateTask(List<ServerTask> tasks, ServerUpdatePolicy updatePolicy, ServerRolloutTaskHandler rolloutHandler,
- ServerTaskExecutor executor, AbstractServerUpdateTask.ServerUpdateResultHandler resultHandler) {
- super(tasks, updatePolicy, rolloutHandler, executor, resultHandler);
+ public ConcurrentServerGroupUpdateTask(List<ServerUpdateTask> tasks, ServerUpdatePolicy updatePolicy,
+ ServerTaskExecutor executor, ServerUpdateTask.ServerUpdateResultHandler resultHandler) {
+ super(tasks, updatePolicy, executor, resultHandler);
}
@Override
public void execute() {
final Set<ServerIdentity> outstanding = new HashSet<ServerIdentity>();
final ServerTaskExecutor.ServerOperationListener listener = new ServerTaskExecutor.ServerOperationListener();
- for(final ServerTask task : tasks) {
+ for(final ServerUpdateTask task : tasks) {
final ServerIdentity identity = task.getServerIdentity();
if(updatePolicy.canUpdateServer(identity)) {
// Execute the task
@@ -30,18 +30,18 @@
/**
* @author Emanuel Muckenhuber
*/
-class RollingGroupUpdateTask extends AbstractServerGroupRolloutTask implements Runnable {
+class RollingServerGroupUpdateTask extends AbstractServerGroupRolloutTask implements Runnable {
- public RollingGroupUpdateTask(List<ServerTask> tasks, ServerUpdatePolicy updatePolicy, ServerRolloutTaskHandler rolloutHandler,
- ServerTaskExecutor executor, AbstractServerUpdateTask.ServerUpdateResultHandler resultHandler) {
- super(tasks, updatePolicy, rolloutHandler, executor, resultHandler);
+ public RollingServerGroupUpdateTask(List<ServerUpdateTask> tasks, ServerUpdatePolicy updatePolicy,
+ ServerTaskExecutor executor, ServerUpdateTask.ServerUpdateResultHandler resultHandler) {
+ super(tasks, updatePolicy, executor, resultHandler);
}
@Override
public void execute() {
boolean interrupted = false;
final ServerTaskExecutor.ServerOperationListener listener = new ServerTaskExecutor.ServerOperationListener();
- for(final ServerTask task : tasks) {
+ for(final ServerUpdateTask task : tasks) {
final ServerIdentity identity = task.getServerIdentity();
if(interrupted || ! updatePolicy.canUpdateServer(identity)) {
sendCancelledResponse(identity);
@@ -26,7 +26,7 @@
import org.jboss.as.domain.controller.ServerIdentity;
import org.jboss.as.domain.controller.operations.coordination.DomainOperationContext;
-import org.jboss.as.domain.controller.plan.AbstractServerUpdateTask.ServerUpdateResultHandler;
+import org.jboss.as.domain.controller.plan.ServerUpdateTask.ServerUpdateResultHandler;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.Property;
@@ -50,18 +50,15 @@
private final long gracefulShutdownPeriod;
private final ServerTaskExecutor taskExecutor;
private final DomainOperationContext domainOperationContext;
- private final ServerRolloutTaskHandler rolloutHandler;
private final ConcurrentMap<String, Map<ServerIdentity, ModelNode>> serverResults = new ConcurrentHashMap<String, Map<ServerIdentity, ModelNode>>();
public RolloutPlanController(final Map<String, Map<ServerIdentity, ModelNode>> opsByGroup,
final ModelNode rolloutPlan,
final DomainOperationContext domainOperationContext,
final ServerTaskExecutor taskExecutor,
- final ServerRolloutTaskHandler rolloutHandler,
final ExecutorService executor) {
this.domainOperationContext = domainOperationContext;
this.taskExecutor = taskExecutor;
- this.rolloutHandler = rolloutHandler;
this.rollbackAcrossGroups = !rolloutPlan.hasDefined(ROLLBACK_ACROSS_GROUPS) || rolloutPlan.get(ROLLBACK_ACROSS_GROUPS).asBoolean();
this.shutdown = rolloutPlan.hasDefined(SHUTDOWN) && rolloutPlan.get(SHUTDOWN).asBoolean();
@@ -100,7 +97,7 @@ public RolloutPlanController(final Map<String, Map<ServerIdentity, ModelNode>> o
continue;
}
- final List<ServerTask> groupTasks = new ArrayList<ServerTask>();
+ final List<ServerUpdateTask> groupTasks = new ArrayList<ServerUpdateTask>();
final ModelNode policyNode = prop.getValue();
final boolean rollingGroup = policyNode.hasDefined(ROLLING_TO_SERVERS) && policyNode.get(ROLLING_TO_SERVERS).asBoolean();
@@ -115,8 +112,8 @@ else if (policyNode.hasDefined(MAX_FAILED_SERVERS)) {
}
ServerUpdatePolicy policy = new ServerUpdatePolicy(parent, serverGroupName, servers, maxFailures);
- seriesTasks.add(rollingGroup ? new RollingGroupUpdateTask(groupTasks, policy, rolloutHandler, taskExecutor, this)
- : new ConcurrentGroupUpdateTask(groupTasks, policy, rolloutHandler, taskExecutor, this));
+ seriesTasks.add(rollingGroup ? new RollingServerGroupUpdateTask(groupTasks, policy, taskExecutor, this)
+ : new ConcurrentServerGroupUpdateTask(groupTasks, policy, taskExecutor, this));
updatePolicies.put(serverGroupName, policy);
@@ -165,8 +162,8 @@ public void handleServerUpdateResult(ServerIdentity serverId, ModelNode response
groupResults.put(serverId, response);
}
- private ServerTask createServerTask(final ServerIdentity serverIdentity, final ModelNode serverOp, final ServerUpdatePolicy policy) {
- ServerTask result;
+ private ServerUpdateTask createServerTask(final ServerIdentity serverIdentity, final ModelNode serverOp, final ServerUpdatePolicy policy) {
+ ServerUpdateTask result;
if (shutdown) {
result = new ServerRestartTask(serverIdentity, policy, this, gracefulShutdownPeriod);
}
@@ -24,20 +24,16 @@
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.HOST;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP_ADDR;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.RUNNING_SERVER;
-import static org.jboss.as.domain.controller.DomainControllerLogger.HOST_CONTROLLER_LOGGER;
import org.jboss.as.domain.controller.ServerIdentity;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.Property;
/**
- * {@link org.jboss.as.domain.controller.plan.AbstractServerUpdateTask} that performs the updates by applying them
+ * {@link ServerUpdateTask} that performs the updates by applying them
* to a running server.
- *
- * Thread-Safety: This class is immutable, but is intended to only have its
- * {@link #run()} method executed once.
*/
-class RunningServerUpdateTask extends AbstractServerUpdateTask {
+class RunningServerUpdateTask extends ServerUpdateTask {
private final ModelNode serverUpdate;
@@ -1,23 +0,0 @@
-/**
- *
- */
-package org.jboss.as.domain.controller.plan;
-
-import org.jboss.as.domain.controller.ServerIdentity;
-import org.jboss.dmr.ModelNode;
-
-/**
- * Callback from a task when it wants to execute an operation.
- *
- * @author Brian Stansberry (c) 2011 Red Hat Inc.
- */
-public interface ServerOperationExecutor {
-
- /**
- * Execute an operation against the given server
- * @param server the identity of the server
- * @param operation the operation
- * @return the result, or {@code null} if the server is unknown
- */
- ModelNode executeServerOperation(ServerIdentity server, ModelNode operation);
-}
@@ -29,14 +29,11 @@
import org.jboss.dmr.ModelNode;
/**
- * {@link org.jboss.as.domain.controller.plan.AbstractServerUpdateTask} that performs the update by triggering a
+ * {@link ServerUpdateTask} that performs the update by triggering a
* restart of the server. The restart results in the server getting the current
* model state.
- *
- * Thread-Safety: This class is immutable, but is intended to only have its
- * {@link #run()} method executed once.
*/
-class ServerRestartTask extends AbstractServerUpdateTask {
+class ServerRestartTask extends ServerUpdateTask {
private final long gracefulTimeout;
Oops, something went wrong.

0 comments on commit 01514c8

Please sign in to comment.