Skip to content

Commit

Permalink
[WFLY-7648] Configuration of ActiveMQ client thread pools
Browse files Browse the repository at this point in the history
add global-client-thread-pool-max-size and
global-client-scheduled-thread-pool-max-size to configure the size of
thread pools used by any ActiveMQ client running inside the server.

These values are global for the JVM and will apply to any ActiveMQ
client (it has no relation to thread pools used by ActiveMQ server or
resource adapter).

JIRA: https://issues.jboss.org/browse/WFLY-7648
  • Loading branch information
jmesnil committed Nov 22, 2016
1 parent 2e40b65 commit d5cea7a
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 12 deletions.
Expand Up @@ -22,6 +22,10 @@

package org.wildfly.extension.messaging.activemq;

import static org.wildfly.extension.messaging.activemq.MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE;
import static org.wildfly.extension.messaging.activemq.MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE;

import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.jboss.as.controller.AbstractBoottimeAddStepHandler;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
Expand Down Expand Up @@ -50,11 +54,7 @@ class MessagingSubsystemAdd extends AbstractBoottimeAddStepHandler {
public static final MessagingSubsystemAdd INSTANCE = new MessagingSubsystemAdd();

private MessagingSubsystemAdd() {
}

@Override
protected void populateModel(ModelNode operation, ModelNode model) throws OperationFailedException {
model.setEmptyObject();
super(MessagingSubsystemRootResourceDefinition.ATTRIBUTES);
}

@Override
Expand All @@ -75,6 +75,9 @@ protected void execute(DeploymentProcessorTarget processorTarget) {
processorTarget.addDeploymentProcessor(MessagingExtension.SUBSYSTEM_NAME, Phase.INSTALL, Phase.INSTALL_MESSAGING_XML_RESOURCES, new MessagingXmlInstallDeploymentUnitProcessor());
}
}, OperationContext.Stage.RUNTIME);
}

int threadPoolMaxSize = GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE.resolveModelAttribute(context, operation).asInt();
int scheduledThreadPoolMaxSize = GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE.resolveModelAttribute(context, operation).asInt();
ActiveMQClient.setGlobalThreadPoolProperties(threadPoolMaxSize, scheduledThreadPoolMaxSize);
}
}
Expand Up @@ -72,7 +72,10 @@ public class MessagingSubsystemParser_1_1 implements XMLStreamConstants, XMLElem
private static final PersistentResourceXMLDescription xmlDescription;

static {
xmlDescription = builder(MessagingSubsystemRootResourceDefinition.INSTANCE)
xmlDescription = builder(MessagingExtension.SUBSYSTEM_PATH)
.addAttributes(
MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE,
MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE)
.addChild(
builder(ServerDefinition.INSTANCE)
.addAttributes(
Expand Down
Expand Up @@ -22,20 +22,25 @@

package org.wildfly.extension.messaging.activemq;

import static org.jboss.as.controller.SimpleAttributeDefinitionBuilder.create;
import static org.jboss.as.controller.transform.description.RejectAttributeChecker.DEFINED;
import static org.jboss.dmr.ModelType.INT;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.PersistentResourceDefinition;
import org.jboss.as.controller.ReloadRequiredRemoveStepHandler;
import org.jboss.as.controller.SimpleAttributeDefinition;
import org.jboss.as.controller.SubsystemRegistration;
import org.jboss.as.controller.transform.description.AttributeConverter.DefaultValueAttributeConverter;
import org.jboss.as.controller.transform.description.DiscardAttributeChecker;
import org.jboss.as.controller.transform.description.ResourceTransformationDescriptionBuilder;
import org.jboss.as.controller.transform.description.TransformationDescription;
import org.jboss.as.controller.transform.description.TransformationDescriptionBuilder;
import org.jboss.dmr.ModelNode;
import org.wildfly.extension.messaging.activemq.jms.ConnectionFactoryAttributes;

/**
Expand All @@ -45,6 +50,29 @@
*/
public class MessagingSubsystemRootResourceDefinition extends PersistentResourceDefinition {

public static final SimpleAttributeDefinition GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE = create("global-client-thread-pool-max-size", INT)
.setAttributeGroup("global-client")
.setXmlName("thread-pool-max-size")
.setAllowNull(true)
.setDefaultValue(new ModelNode(ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE))
.setAllowExpression(true)
.setRestartAllServices()
.build();

public static final SimpleAttributeDefinition GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE = create("global-client-scheduled-thread-pool-max-size", INT)
.setAttributeGroup("global-client")
.setXmlName("scheduled-thread-pool-max-size")
.setAllowNull(true)
.setDefaultValue(new ModelNode(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE))
.setAllowExpression(true)
.setRestartAllServices()
.build();

public static final AttributeDefinition[] ATTRIBUTES = {
GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE,
GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE
};

public static final MessagingSubsystemRootResourceDefinition INSTANCE = new MessagingSubsystemRootResourceDefinition();

private MessagingSubsystemRootResourceDefinition() {
Expand All @@ -56,17 +84,20 @@ private MessagingSubsystemRootResourceDefinition() {

@Override
public Collection<AttributeDefinition> getAttributes() {
return Collections.emptyList();
return Arrays.asList(ATTRIBUTES);
}

public static void registerTransformers(SubsystemRegistration subsystemRegistration) {
registerTransformers_EAP_7_0_0(subsystemRegistration);
}

private static void registerTransformers_EAP_7_0_0(SubsystemRegistration subsystemRegistration) {
final ResourceTransformationDescriptionBuilder builder = TransformationDescriptionBuilder.Factory.createSubsystemInstance();
final ResourceTransformationDescriptionBuilder subsystem = TransformationDescriptionBuilder.Factory.createSubsystemInstance();

rejectDefinedAttributeWithDefaultValue(subsystem, MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE,
MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE);

ResourceTransformationDescriptionBuilder server = builder.addChildResource(MessagingExtension.SERVER_PATH);
ResourceTransformationDescriptionBuilder server = subsystem.addChildResource(MessagingExtension.SERVER_PATH);
// reject journal-datasource, journal-bindings-table introduced in management version 2.0.0 if it is defined and different from the default value.
rejectDefinedAttributeWithDefaultValue(server, ServerDefinition.JOURNAL_DATASOURCE,
ServerDefinition.JOURNAL_MESSAGES_TABLE,
Expand All @@ -89,7 +120,7 @@ private static void registerTransformers_EAP_7_0_0(SubsystemRegistration subsyst
// reject min-pool-size whose default value has been changed in management version 2.0.0
defaultValueAttributeConverter(pooledConnectionFactory, ConnectionFactoryAttributes.Pooled.MIN_POOL_SIZE);

TransformationDescription.Tools.register(builder.build(), subsystemRegistration, MessagingExtension.VERSION_1_0_0);
TransformationDescription.Tools.register(subsystem.build(), subsystemRegistration, MessagingExtension.VERSION_1_0_0);
}

/**
Expand Down
Expand Up @@ -260,6 +260,8 @@ ha-policy.scale-down-group-name=Name of the group used to scale down.
ha-policy.scale-down=Configure whether this server send its messages to another live server in the scale-down cluster when it is shutdown cleanly.
ha-policy=A messaging resource that allows you to configure High Availability for the ActiveMQ server (the value of ha-policy can be live-only, replication-master, replication-slave, or replication-colocated).
legacy-connection-factory=Connection Factory resource used by legacy HornetQ clients to connect to the messaging-activemq subsystem.
messaging-activemq.global-client-scheduled-thread-pool-max-size=Maximum size of the pool of threads used by all ActiveMQ clients running inside this server.
messaging-activemq.global-client-thread-pool-max-size=Maximum size of the pool of scheduled threads used by all ActiveMQ clients running inside this server.
server.active=Whether the server is active (and accepting connections) or passive (in backup mode, waiting for failover).
server.add=Operation adding a ActiveMQ server.
server.allow-failback=Whether this server will automatically shutdown if the original live server comes back up.
Expand Down
Expand Up @@ -32,6 +32,12 @@
<xs:element name="subsystem">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="1" minOccurs="0" name="global-client">
<xs:complexType>
<xs:attribute name="thread-pool-max-size" type="xs:int" />
<xs:attribute name="scheduled-thread-pool-max-size" type="xs:int" />
</xs:complexType>
</xs:element>
<xs:element maxOccurs="unbounded" minOccurs="0" name="server" type="serverType" />
<xs:element maxOccurs="unbounded" minOccurs="0" name="jms-bridge" type="jms-bridgeType" />
</xs:sequence>
Expand Down
Expand Up @@ -141,6 +141,10 @@ private void testRejectingTransformers(ModelTestControllerVersion controllerVers
System.out.println("ops = " + ops);
PathAddress subsystemAddress = PathAddress.pathAddress(SUBSYSTEM_PATH);
ModelTestUtils.checkFailedTransformedBootOperations(mainServices, messagingVersion, ops, new FailedOperationTransformationConfig()
.addFailedAttribute(subsystemAddress,
new FailedOperationTransformationConfig.NewAttributesConfig(
MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE,
MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE))
.addFailedAttribute(subsystemAddress.append(SERVER_PATH),
new FailedOperationTransformationConfig.NewAttributesConfig(
ServerDefinition.JOURNAL_DATASOURCE,
Expand Down
@@ -1,4 +1,6 @@
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.1">
<global-client thread-pool-max-size="${global.client.thread-pool-max-size:32}"
scheduled-thread-pool-max-size="${global.client.scheduled.thread-pool-max-size:54}" />
<server name="default"
persistence-enabled="${persistence.enabled:false}"
persist-id-cache="${persist.id.cache:false}"
Expand Down
@@ -1,4 +1,6 @@
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.1">
<global-client thread-pool-max-size="${global.client.thread-pool-max-size:32}"
scheduled-thread-pool-max-size="${global.client.scheduled.thread-pool-max-size:54}" />
<server name="default">
<journal datasource="fooDS"
messages-table="MY_MESSAGES"
Expand Down

0 comments on commit d5cea7a

Please sign in to comment.