Skip to content

Commit

Permalink
[WFLY-4478] Stop running batch jobs when a shutdown is issued for the…
Browse files Browse the repository at this point in the history
… container. This ensures the container won't wait to shutdown until the batch jobs are fully completed.
  • Loading branch information
jamezp committed Dec 19, 2015
1 parent 4161113 commit c74fdd9
Show file tree
Hide file tree
Showing 29 changed files with 846 additions and 199 deletions.
Expand Up @@ -33,7 +33,8 @@ public enum Attribute {
UNKNOWN(null), UNKNOWN(null),
JNDI_NAME("jndi-name"), JNDI_NAME("jndi-name"),
DATA_SOURCE("data-source"), DATA_SOURCE("data-source"),
NAME("name"); NAME("name"),
VALUE("value");


private final String name; private final String name;


Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wildfly.extension.batch.jberet;

import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.AttributeMarshaller;
import org.jboss.dmr.ModelNode;

/**
* @author <a href="mailto:jperkins@redhat.com">James R. Perkins</a>
*/
class AttributeMarshallers {
public static final AttributeMarshaller NAMED = new AttributeMarshaller() {

@Override
public void marshallAsElement(final AttributeDefinition attribute, final ModelNode resourceModel, final boolean marshallDefault, final XMLStreamWriter writer) throws XMLStreamException {
if (resourceModel.hasDefined(attribute.getName())) {
writer.writeStartElement(attribute.getName());
writer.writeAttribute(Attribute.NAME.getLocalName(), resourceModel.get(attribute.getName()).asString());
writer.writeEndElement();
}
}
};

public static final AttributeMarshaller VALUE = new AttributeMarshaller() {

@Override
public void marshallAsElement(final AttributeDefinition attribute, final ModelNode resourceModel, final boolean marshallDefault, final XMLStreamWriter writer) throws XMLStreamException {
if (resourceModel.hasDefined(attribute.getName())) {
writer.writeStartElement(attribute.getName());
writer.writeAttribute(Attribute.VALUE.getLocalName(), resourceModel.get(attribute.getName()).asString());
writer.writeEndElement();
}
}
};
}
@@ -0,0 +1,50 @@
/*
* Copyright 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wildfly.extension.batch.jberet;

import org.jberet.repository.JobRepository;
import org.jberet.spi.JobExecutor;

/**
* A configuration for the {@link org.jberet.spi.BatchEnvironment} behavior.
*
* @author <a href="mailto:jperkins@redhat.com">James R. Perkins</a>
*/
public interface BatchConfiguration {

/**
* Indicates whether or no batch jobs should be restarted on a resume operation if they were stopped during a
* suspend.
*
* @return {@code true} to restart jobs on resume otherwise {@code false} to leave the jobs in a stopped state
*/
boolean isRestartOnResume();

/**
* Returns the default job repository to use.
*
* @return the default job repository
*/
JobRepository getDefaultJobRepository();

/**
* Returns the default job executor to use.
*
* @return the default job executor
*/
JobExecutor getDefaultJobExecutor();
}
@@ -0,0 +1,77 @@
/*
* Copyright 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wildfly.extension.batch.jberet;

import org.jberet.repository.JobRepository;
import org.jberet.spi.JobExecutor;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;

/**
* A default batch configuration service.
*
* @author <a href="mailto:jperkins@redhat.com">James R. Perkins</a>
*/
class BatchConfigurationService implements BatchConfiguration, Service<BatchConfiguration> {

private final InjectedValue<JobRepository> jobRepositoryInjector = new InjectedValue<>();
private final InjectedValue<JobExecutor> jobExecutorInjector = new InjectedValue<>();
private volatile boolean restartOnResume;

@Override
public boolean isRestartOnResume() {
return restartOnResume;
}

protected void setRestartOnResume(final boolean restartOnResume) {
this.restartOnResume = restartOnResume;
}

@Override
public JobRepository getDefaultJobRepository() {
return jobRepositoryInjector.getValue();
}

@Override
public JobExecutor getDefaultJobExecutor() {
return jobExecutorInjector.getValue();
}

@Override
public void start(final StartContext context) throws StartException {
}

@Override
public void stop(final StopContext context) {
}

@Override
public BatchConfiguration getValue() throws IllegalStateException, IllegalArgumentException {
return this;
}

protected InjectedValue<JobRepository> getJobRepositoryInjector() {
return jobRepositoryInjector;
}

protected InjectedValue<JobExecutor> getJobExecutorInjector() {
return jobExecutorInjector;
}
}
Expand Up @@ -24,12 +24,12 @@


import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.SUBSYSTEM; import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.SUBSYSTEM;


import java.util.stream.Collectors; import java.util.Collections;
import java.util.stream.Stream;


import org.jberet.repository.JobRepository; import org.jberet.repository.JobRepository;
import org.jberet.spi.JobExecutor; import org.jberet.spi.JobExecutor;
import org.jboss.as.controller.AbstractAddStepHandler; import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.AbstractWriteAttributeHandler;
import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException; import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.OperationStepHandler; import org.jboss.as.controller.OperationStepHandler;
Expand All @@ -46,7 +46,6 @@
import org.jboss.as.server.DeploymentProcessorTarget; import org.jboss.as.server.DeploymentProcessorTarget;
import org.jboss.as.server.deployment.Phase; import org.jboss.as.server.deployment.Phase;
import org.jboss.as.server.deployment.jbossallxml.JBossAllXmlParserRegisteringProcessor; import org.jboss.as.server.deployment.jbossallxml.JBossAllXmlParserRegisteringProcessor;
import org.jboss.as.threads.ManagedJBossThreadPoolExecutorService;
import org.jboss.as.threads.ThreadFactoryResourceDefinition; import org.jboss.as.threads.ThreadFactoryResourceDefinition;
import org.jboss.dmr.ModelNode; import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType; import org.jboss.dmr.ModelType;
Expand All @@ -56,7 +55,6 @@
import org.wildfly.extension.batch.jberet.deployment.BatchDeploymentDescriptorParser_1_0; import org.wildfly.extension.batch.jberet.deployment.BatchDeploymentDescriptorParser_1_0;
import org.wildfly.extension.batch.jberet.deployment.BatchDeploymentResourceProcessor; import org.wildfly.extension.batch.jberet.deployment.BatchDeploymentResourceProcessor;
import org.wildfly.extension.batch.jberet.deployment.BatchEnvironmentProcessor; import org.wildfly.extension.batch.jberet.deployment.BatchEnvironmentProcessor;
import org.wildfly.extension.batch.jberet.impl.JobExecutorService;
import org.wildfly.extension.batch.jberet.job.repository.InMemoryJobRepositoryDefinition; import org.wildfly.extension.batch.jberet.job.repository.InMemoryJobRepositoryDefinition;
import org.wildfly.extension.batch.jberet.job.repository.JdbcJobRepositoryDefinition; import org.wildfly.extension.batch.jberet.job.repository.JdbcJobRepositoryDefinition;
import org.wildfly.extension.batch.jberet.thread.pool.BatchThreadPoolResourceDefinition; import org.wildfly.extension.batch.jberet.thread.pool.BatchThreadPoolResourceDefinition;
Expand All @@ -74,18 +72,25 @@ public class BatchSubsystemDefinition extends SimpleResourceDefinition {
static final SimpleAttributeDefinition DEFAULT_JOB_REPOSITORY = SimpleAttributeDefinitionBuilder.create("default-job-repository", ModelType.STRING, false) static final SimpleAttributeDefinition DEFAULT_JOB_REPOSITORY = SimpleAttributeDefinitionBuilder.create("default-job-repository", ModelType.STRING, false)
.setAllowExpression(false) .setAllowExpression(false)
.setAttributeGroup("environment") .setAttributeGroup("environment")
.setAttributeMarshaller(NameAttributeMarshaller.INSTANCE) .setAttributeMarshaller(AttributeMarshallers.NAMED)
.setCapabilityReference(Capabilities.JOB_REPOSITORY_CAPABILITY.getName(), Capabilities.DEFAULT_JOB_REPOSITORY_CAPABILITY) .setCapabilityReference(Capabilities.JOB_REPOSITORY_CAPABILITY.getName(), Capabilities.BATCH_CONFIGURATION_CAPABILITY)
.setRestartAllServices() .setRestartAllServices()
.build(); .build();


static final SimpleAttributeDefinition DEFAULT_THREAD_POOL = SimpleAttributeDefinitionBuilder.create("default-thread-pool", ModelType.STRING, false) static final SimpleAttributeDefinition DEFAULT_THREAD_POOL = SimpleAttributeDefinitionBuilder.create("default-thread-pool", ModelType.STRING, false)
.setAllowExpression(false) .setAllowExpression(false)
.setAttributeGroup("environment") .setAttributeGroup("environment")
.setAttributeMarshaller(NameAttributeMarshaller.INSTANCE) .setAttributeMarshaller(AttributeMarshallers.NAMED)
.setCapabilityReference(Capabilities.THREAD_POOL_CAPABILITY.getName(), Capabilities.BATCH_CONFIGURATION_CAPABILITY)
.setRestartAllServices() .setRestartAllServices()
.build(); .build();


static final SimpleAttributeDefinition RESTART_JOBS_ON_RESUME = SimpleAttributeDefinitionBuilder.create("restart-jobs-on-resume", ModelType.BOOLEAN, true)
.setAllowExpression(true)
.setDefaultValue(new ModelNode(true))
.setAttributeMarshaller(AttributeMarshallers.VALUE)
.build();

private final boolean registerRuntimeOnly; private final boolean registerRuntimeOnly;


BatchSubsystemDefinition(final boolean registerRuntimeOnly) { BatchSubsystemDefinition(final boolean registerRuntimeOnly) {
Expand Down Expand Up @@ -119,12 +124,29 @@ public void registerAttributes(final ManagementResourceRegistration resourceRegi
final OperationStepHandler writeHandler = new ReloadRequiredWriteAttributeHandler(DEFAULT_JOB_REPOSITORY, DEFAULT_THREAD_POOL); final OperationStepHandler writeHandler = new ReloadRequiredWriteAttributeHandler(DEFAULT_JOB_REPOSITORY, DEFAULT_THREAD_POOL);
resourceRegistration.registerReadWriteAttribute(DEFAULT_JOB_REPOSITORY, null, writeHandler); resourceRegistration.registerReadWriteAttribute(DEFAULT_JOB_REPOSITORY, null, writeHandler);
resourceRegistration.registerReadWriteAttribute(DEFAULT_THREAD_POOL, null, writeHandler); resourceRegistration.registerReadWriteAttribute(DEFAULT_THREAD_POOL, null, writeHandler);
resourceRegistration.registerReadWriteAttribute(RESTART_JOBS_ON_RESUME, null, new AbstractWriteAttributeHandler<Boolean>() {
@Override
protected boolean applyUpdateToRuntime(final OperationContext context, final ModelNode operation, final String attributeName, final ModelNode resolvedValue, final ModelNode currentValue, final HandbackHolder<Boolean> handbackHolder) throws OperationFailedException {
setValue(context, resolvedValue);
return false;
}

@Override
protected void revertUpdateToRuntime(final OperationContext context, final ModelNode operation, final String attributeName, final ModelNode valueToRestore, final ModelNode valueToRevert, final Boolean handback) throws OperationFailedException {
setValue(context, valueToRestore);
}

private void setValue(final OperationContext context, final ModelNode value) {
final BatchConfigurationService service = (BatchConfigurationService) context.getServiceRegistry(true)
.getService(context.getCapabilityServiceName(Capabilities.BATCH_CONFIGURATION_CAPABILITY.getName(), BatchConfiguration.class));
service.setRestartOnResume(value.asBoolean());
}
});
} }


@Override @Override
public void registerCapabilities(ManagementResourceRegistration resourceRegistration) { public void registerCapabilities(ManagementResourceRegistration resourceRegistration) {
resourceRegistration.registerCapability(Capabilities.DEFAULT_JOB_REPOSITORY_CAPABILITY); resourceRegistration.registerCapability(Capabilities.BATCH_CONFIGURATION_CAPABILITY);
resourceRegistration.registerCapability(Capabilities.DEFAULT_THREAD_POOL_CAPABILITY);
} }


/** /**
Expand All @@ -135,7 +157,7 @@ static class BatchSubsystemAdd extends AbstractAddStepHandler {
static final BatchSubsystemAdd INSTANCE = new BatchSubsystemAdd(); static final BatchSubsystemAdd INSTANCE = new BatchSubsystemAdd();


private BatchSubsystemAdd() { private BatchSubsystemAdd() {
super(Stream.of(Capabilities.DEFAULT_JOB_REPOSITORY_CAPABILITY, Capabilities.DEFAULT_THREAD_POOL_CAPABILITY).collect(Collectors.toSet()), DEFAULT_JOB_REPOSITORY, DEFAULT_THREAD_POOL); super(Collections.singleton(Capabilities.BATCH_CONFIGURATION_CAPABILITY), DEFAULT_JOB_REPOSITORY, DEFAULT_THREAD_POOL, RESTART_JOBS_ON_RESUME);
} }


@Override @Override
Expand All @@ -160,33 +182,24 @@ public void execute(DeploymentProcessorTarget processorTarget) {
} }
}, OperationContext.Stage.RUNTIME); }, OperationContext.Stage.RUNTIME);


final ServiceTarget target = context.getServiceTarget();

final ModelNode defaultJobRepository = DEFAULT_JOB_REPOSITORY.resolveModelAttribute(context, model); final ModelNode defaultJobRepository = DEFAULT_JOB_REPOSITORY.resolveModelAttribute(context, model);
if (defaultJobRepository.isDefined()) {
final String name = defaultJobRepository.asString();
final DefaultValueService<JobRepository> service = DefaultValueService.create();
target.addService(context.getCapabilityServiceName(Capabilities.DEFAULT_JOB_REPOSITORY_CAPABILITY.getName(), JobRepository.class), service)
.addDependency(
context.getCapabilityServiceName(Capabilities.JOB_REPOSITORY_CAPABILITY.getName(), name, JobRepository.class),
JobRepository.class,
service.getInjector()
)
.install();
}

final ModelNode defaultThreadPool = DEFAULT_THREAD_POOL.resolveModelAttribute(context, model); final ModelNode defaultThreadPool = DEFAULT_THREAD_POOL.resolveModelAttribute(context, model);
if (defaultThreadPool.isDefined()) { final boolean restartOnResume = RESTART_JOBS_ON_RESUME.resolveModelAttribute(context, model).asBoolean();
final String name = defaultThreadPool.asString();
final JobExecutorService service = new JobExecutorService(); final ServiceTarget target = context.getServiceTarget();
target.addService(context.getCapabilityServiceName(Capabilities.DEFAULT_THREAD_POOL_CAPABILITY.getName(), JobExecutor.class), service) final BatchConfigurationService service = new BatchConfigurationService();
.addDependency( service.setRestartOnResume(restartOnResume);
BatchServiceNames.BASE_BATCH_THREAD_POOL_NAME.append(name), target.addService(context.getCapabilityServiceName(Capabilities.BATCH_CONFIGURATION_CAPABILITY.getName(), BatchConfiguration.class), service)
ManagedJBossThreadPoolExecutorService.class, .addDependency(
service.getThreadPoolInjector() context.getCapabilityServiceName(Capabilities.JOB_REPOSITORY_CAPABILITY.getName(), defaultJobRepository.asString(), JobRepository.class),
) JobRepository.class,
.install(); service.getJobRepositoryInjector()
} )
.addDependency(
context.getCapabilityServiceName(Capabilities.THREAD_POOL_CAPABILITY.getName(), defaultThreadPool.asString(), JobExecutor.class),
JobExecutor.class,
service.getJobExecutorInjector()
).install();
} }
} }
} }
Expand Up @@ -73,6 +73,9 @@ public void readElement(final XMLExtendedStreamReader reader, final List<ModelNo
BatchSubsystemDefinition.DEFAULT_JOB_REPOSITORY.parseAndSetParameter(readNameAttribute(reader), subsystemAddOp, reader); BatchSubsystemDefinition.DEFAULT_JOB_REPOSITORY.parseAndSetParameter(readNameAttribute(reader), subsystemAddOp, reader);
ParseUtils.requireNoContent(reader); ParseUtils.requireNoContent(reader);
requiredElements.remove(Element.DEFAULT_JOB_REPOSITORY); requiredElements.remove(Element.DEFAULT_JOB_REPOSITORY);
} else if (element == Element.RESTART_JOBS_ON_RESUME) {
BatchSubsystemDefinition.RESTART_JOBS_ON_RESUME.parseAndSetParameter(readValueAttribute(reader), subsystemAddOp, reader);
ParseUtils.requireNoContent(reader);
} else if (element == Element.JOB_REPOSITORY) { } else if (element == Element.JOB_REPOSITORY) {
final String name = readNameAttribute(reader); final String name = readNameAttribute(reader);
parseJobRepository(reader, subsystemAddress, name, ops); parseJobRepository(reader, subsystemAddress, name, ops);
Expand Down Expand Up @@ -119,6 +122,10 @@ static String readNameAttribute(final XMLExtendedStreamReader reader) throws XML
return readRequiredAttributes(reader, EnumSet.of(Attribute.NAME)).get(Attribute.NAME); return readRequiredAttributes(reader, EnumSet.of(Attribute.NAME)).get(Attribute.NAME);
} }


static String readValueAttribute(final XMLExtendedStreamReader reader) throws XMLStreamException {
return readRequiredAttributes(reader, EnumSet.of(Attribute.VALUE)).get(Attribute.VALUE);
}

/** /**
* Reads the required attributes from an XML configuration. * Reads the required attributes from an XML configuration.
* <p> * <p>
Expand Down
Expand Up @@ -47,6 +47,7 @@ public void writeContent(final XMLExtendedStreamWriter writer, final SubsystemMa
final ModelNode model = context.getModelNode(); final ModelNode model = context.getModelNode();
BatchSubsystemDefinition.DEFAULT_JOB_REPOSITORY.marshallAsElement(model, writer); BatchSubsystemDefinition.DEFAULT_JOB_REPOSITORY.marshallAsElement(model, writer);
BatchSubsystemDefinition.DEFAULT_THREAD_POOL.marshallAsElement(model, writer); BatchSubsystemDefinition.DEFAULT_THREAD_POOL.marshallAsElement(model, writer);
BatchSubsystemDefinition.RESTART_JOBS_ON_RESUME.marshallAsElement(model, writer);


// Write the in-memory job repositories // Write the in-memory job repositories
if (model.hasDefined(InMemoryJobRepositoryDefinition.NAME)) { if (model.hasDefined(InMemoryJobRepositoryDefinition.NAME)) {
Expand Down

0 comments on commit c74fdd9

Please sign in to comment.