diff --git a/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/ConnectionFactoryReferenceFactoryService.java b/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/ConnectionFactoryReferenceFactoryService.java index ad5fcec5bac1..3f5d957b1d2f 100644 --- a/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/ConnectionFactoryReferenceFactoryService.java +++ b/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/ConnectionFactoryReferenceFactoryService.java @@ -44,6 +44,15 @@ public class ConnectionFactoryReferenceFactoryService implements Service connectionFactoryValue = new InjectedValue(); private ManagedReference reference; + private final String name; + + public ConnectionFactoryReferenceFactoryService(String name) { + this.name = name; + } + + public String getName() { + return name; + } public synchronized void start(StartContext startContext) throws StartException { reference = new ValueManagedReference(new ImmediateValue(connectionFactoryValue.getValue())); diff --git a/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/deployment/AbstractResourceAdapterDeploymentService.java b/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/deployment/AbstractResourceAdapterDeploymentService.java index 071884454f8f..0a529727c140 100644 --- a/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/deployment/AbstractResourceAdapterDeploymentService.java +++ b/connector/src/main/java/org/jboss/as/connector/services/resourceadapters/deployment/AbstractResourceAdapterDeploymentService.java @@ -426,7 +426,7 @@ public String[] bindConnectionFactory(URL url, String deployment, Object cf, fin // to distinguish CFs with same name in different application (or module). final ContextNames.BindInfo bindInfo = getBindInfo(jndi); - final ConnectionFactoryReferenceFactoryService referenceFactoryService = new ConnectionFactoryReferenceFactoryService(); + final ConnectionFactoryReferenceFactoryService referenceFactoryService = new ConnectionFactoryReferenceFactoryService(deployment); final ServiceName referenceFactoryServiceName = ConnectionFactoryReferenceFactoryService.SERVICE_NAME_BASE .append(bindInfo.getBinderServiceName()); serviceTarget.addService(referenceFactoryServiceName, referenceFactoryService) diff --git a/docs/src/main/asciidoc/_admin-guide/subsystem-configuration/Messaging_Connect_a_pooled-connection-factory_to_a_Remote_Artemis_Server.adoc b/docs/src/main/asciidoc/_admin-guide/subsystem-configuration/Messaging_Connect_a_pooled-connection-factory_to_a_Remote_Artemis_Server.adoc index 08284a22e26f..6f0508a188ed 100644 --- a/docs/src/main/asciidoc/_admin-guide/subsystem-configuration/Messaging_Connect_a_pooled-connection-factory_to_a_Remote_Artemis_Server.adoc +++ b/docs/src/main/asciidoc/_admin-guide/subsystem-configuration/Messaging_Connect_a_pooled-connection-factory_to_a_Remote_Artemis_Server.adoc @@ -114,7 +114,29 @@ process the consumed messages and it can use the JMSReplyTo destination if it is defined on the message. + If the MDB needs any other JMS destinations defined on the remote server, it must use client-side JNDI by following the -http://activemq.apache.org/artemis/docs/1.1.0/using-jms.html#jndi-configuration[Artemis +http://http://activemq.apache.org/artemis/docs/2.6.0/using-jms.html#jndi-configuration[Artemis documentation] or configure external configuration context in the naming subsystem (which allows to inject the JMS resources using the `@Resource` annotation). + +[[configuration of a remote destination using annotations]] +==== Configuration of a remote destination using annotations + +The annotation `@JMSDestinationDefinition` can be used to create a destination on a remote Artemis Server. This will work in the same way as for a local server.+ +For this it needs to be able to access Artemis management queue. If your remote Artemis Server management queue is not the default one you can pass the management queue address as a property to the `@JMSDestinationDefinition`. +Please note that the destination is created remotely but won't be removed once the deployement is undeployed/removed. + +[source, java] +---- +@JMSDestinationDefinition( + // explicitly mention a resourceAdapter corresponding to a pooled-connection-factory resource to the remote server + resourceAdapter = "activemq-ra", + name="java:global/env/myQueue2", + interfaceName="javax.jms.Queue", + destinationName="myQueue2", + properties = { + "management-address=my.management.queue", + "selector=color = 'red'" + } +) +---- diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQReloadRequiredHandlers.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQReloadRequiredHandlers.java index 7c6b91d3d983..9ac24632208f 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQReloadRequiredHandlers.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQReloadRequiredHandlers.java @@ -32,6 +32,8 @@ import org.jboss.as.controller.OperationFailedException; import org.jboss.as.controller.registry.Resource; import org.jboss.dmr.ModelNode; +import org.jboss.msc.service.ServiceController; +import org.jboss.msc.service.ServiceName; /** * Requires a reload only if the {@link ActiveMQServerService} service is up and running. @@ -53,7 +55,7 @@ public AddStepHandler(AttributeDefinition... attributes) { @Override protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model) throws OperationFailedException { - if (ActiveMQServerService.isServiceInstalled(context)) { + if (isServiceInstalled(context)) { context.reloadRequired(); reloadRequired = true; } @@ -61,7 +63,7 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod @Override protected void rollbackRuntime(OperationContext context, ModelNode operation, Resource resource) { - if (reloadRequired && ActiveMQServerService.isServiceInstalled(context)) { + if (reloadRequired && isServiceInstalled(context)) { context.revertReloadRequired(); } } @@ -73,7 +75,7 @@ final class RemoveStepHandler extends AbstractRemoveStepHandler { @Override protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model) throws OperationFailedException { - if (ActiveMQServerService.isServiceInstalled(context)) { + if (isServiceInstalled(context)) { context.reloadRequired(); reloadRequired = true; } @@ -81,7 +83,7 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod @Override protected void recoverServices(OperationContext context, ModelNode operation, ModelNode model) throws OperationFailedException { - if (reloadRequired && ActiveMQServerService.isServiceInstalled(context)) { + if (reloadRequired && isServiceInstalled(context)) { context.revertReloadRequired(); } } @@ -102,15 +104,33 @@ protected boolean applyUpdateToRuntime(OperationContext context, ModelNode opera ModelNode resolvedValue, ModelNode currentValue, org.jboss.as.controller.AbstractWriteAttributeHandler.HandbackHolder handbackHolder) throws OperationFailedException { - return ActiveMQServerService.isServiceInstalled(context); + return isServiceInstalled(context); } @Override protected void revertUpdateToRuntime(OperationContext context, ModelNode operation, String attributeName, ModelNode valueToRestore, ModelNode valueToRevert, Void handback) throws OperationFailedException { - if (ActiveMQServerService.isServiceInstalled(context)) { + if (isServiceInstalled(context)) { context.revertReloadRequired(); } } } + + /** + * Returns true if a {@link ServiceController} for this service has been {@link org.jboss.msc.service.ServiceBuilder#install() installed} + * in MSC under the + * {@link MessagingServices#getActiveMQServiceName(org.jboss.as.controller.PathAddress) service name appropriate to the given operation}. + * + * @param context the operation context + * @return {@code true} if a {@link ServiceController} is installed + */ + static boolean isServiceInstalled(final OperationContext context) { + if (context.isNormalServer()) { + final ServiceName serviceName = MessagingServices.getActiveMQServiceName(context.getCurrentAddress()); + if (serviceName != null) { + return context.getServiceRegistry(false).getService(serviceName) != null; + } + } + return false; + } } \ No newline at end of file diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java index 9459785b565f..a6d0270bc3f3 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java @@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; -import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.services.path.AbsolutePathService; import org.jboss.as.controller.services.path.PathManager; import org.jboss.as.network.ManagedBinding; @@ -59,8 +58,6 @@ import org.jboss.as.network.SocketBinding; import org.jboss.as.security.plugins.SecurityDomainContext; import org.jboss.msc.service.Service; -import org.jboss.msc.service.ServiceController; -import org.jboss.msc.service.ServiceName; import org.jboss.msc.service.StartContext; import org.jboss.msc.service.StartException; import org.jboss.msc.service.StopContext; @@ -155,6 +152,7 @@ public ActiveMQServerService(Configuration configuration, } } + @Override public synchronized void start(final StartContext context) throws StartException { ClassLoader origTCCL = org.wildfly.security.manager.WildFlySecurityManager.getCurrentContextClassLoaderPrivileged(); // Validate whether the AIO native layer can be used @@ -330,23 +328,6 @@ public synchronized ActiveMQServer getValue() throws IllegalStateException { return server; } - /** - * Returns true if a {@link ServiceController} for this service has been {@link org.jboss.msc.service.ServiceBuilder#install() installed} - * in MSC under the - * {@link MessagingServices#getActiveMQServiceName(org.jboss.as.controller.PathAddress) service name appropriate to the given operation}. - * - * @param context the operation context - * @return {@code true} if a {@link ServiceController} is installed - */ - static boolean isServiceInstalled(final OperationContext context) { - if (context.isNormalServer()) { - final ServiceName serviceName = MessagingServices.getActiveMQServiceName(context.getCurrentAddress()); - if (serviceName != null) { - return context.getServiceRegistry(false).getService(serviceName) != null; - } - } - return false; - } CommandDispatcherFactory getCommandDispatcherFactory(String key) { return commandDispatcherFactories.get(key).get(); diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java index 3205441109e2..fae9035acced 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java @@ -37,7 +37,6 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; -import org.apache.activemq.artemis.core.config.Configuration; import org.jboss.as.controller.AbstractAddStepHandler; import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.OperationFailedException; @@ -140,19 +139,15 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod } } - static void addBroadcastGroupConfigs(final OperationContext context, final Configuration configuration, final ModelNode model) throws OperationFailedException { + static void addBroadcastGroupConfigs(final OperationContext context, final List configs, final Set connectors, final ModelNode model) throws OperationFailedException { if (model.hasDefined(CommonAttributes.BROADCAST_GROUP)) { - final List configs = configuration.getBroadcastGroupConfigurations(); - final Set connectors = configuration.getConnectorConfigurations().keySet(); for (Property prop : model.get(CommonAttributes.BROADCAST_GROUP).asPropertyList()) { configs.add(createBroadcastGroupConfiguration(context, connectors, prop.getName(), prop.getValue())); - } } } static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final OperationContext context, final Set connectors, final String name, final ModelNode model) throws OperationFailedException { - final long broadcastPeriod = BroadcastGroupDefinition.BROADCAST_PERIOD.resolveModelAttribute(context, model).asLong(); final List connectorRefs = new ArrayList(); if (model.hasDefined(CommonAttributes.CONNECTORS)) { diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java index 7336e60872bb..20e7f7eabb1e 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java @@ -35,7 +35,6 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; -import org.apache.activemq.artemis.core.config.Configuration; import org.jboss.as.controller.AbstractAddStepHandler; import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.OperationFailedException; @@ -134,18 +133,14 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod } } - static void addDiscoveryGroupConfigs(final OperationContext context, final Configuration configuration, final ModelNode model) throws OperationFailedException { + static Map addDiscoveryGroupConfigs(final OperationContext context, final ModelNode model) throws OperationFailedException { + Map configs = new HashMap<>(); if (model.hasDefined(CommonAttributes.DISCOVERY_GROUP)) { - Map configs = configuration.getDiscoveryGroupConfigurations(); - if (configs == null) { - configs = new HashMap<>(); - configuration.setDiscoveryGroupConfigurations(configs); - } for (Property prop : model.get(CommonAttributes.DISCOVERY_GROUP).asPropertyList()) { configs.put(prop.getName(), createDiscoveryGroupConfiguration(context, prop.getName(), prop.getValue())); - } } + return configs; } static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final OperationContext context, final String name, final ModelNode model) throws OperationFailedException { diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ExternalBrokerConfigurationService.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ExternalBrokerConfigurationService.java new file mode 100644 index 000000000000..57608c8db9d4 --- /dev/null +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ExternalBrokerConfigurationService.java @@ -0,0 +1,99 @@ +/* + * Copyright 2018 JBoss by Red Hat. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wildfly.extension.messaging.activemq; + +import java.util.Map; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.jboss.msc.service.Service; +import org.jboss.msc.service.ServiceName; +import org.jboss.msc.service.StartContext; +import org.jboss.msc.service.StartException; +import org.jboss.msc.service.StopContext; + +/** + * + * @author Emmanuel Hugonnet (c) 2018 Red Hat, inc. + */ +public class ExternalBrokerConfigurationService implements Service { + private final Map connectors; + private final Map discoveryGroupConfigurations; + private final Map socketBindings; + private final Map outboundSocketBindings; + private final Map groupBindings; + // mapping between the {broadcast|discovery}-groups and the cluster names they use + private final Map clusterNames; + // mapping between the {broadcast|discovery}-groups and the command dispatcher factory they use + private final Map commandDispatcherFactories; + + public ExternalBrokerConfigurationService(final Map connectors, + Map discoveryGroupConfigurations, + Map socketBindings, + Map outboundSocketBindings, + Map groupBindings, + Map commandDispatcherFactories, + Map clusterNames) { + this.connectors = connectors; + this.discoveryGroupConfigurations = discoveryGroupConfigurations; + this.clusterNames = clusterNames; + this.commandDispatcherFactories = commandDispatcherFactories; + this.groupBindings = groupBindings; + this.outboundSocketBindings = outboundSocketBindings; + this.socketBindings = socketBindings; + } + + @Override + public void start(StartContext context) throws StartException { + } + + @Override + public void stop(StopContext context) { + } + + public Map getConnectors() { + return connectors; + } + + public Map getSocketBindings() { + return socketBindings; + } + + public Map getOutboundSocketBindings() { + return outboundSocketBindings; + } + + public Map getGroupBindings() { + return groupBindings; + } + + public Map getClusterNames() { + return clusterNames; + } + + public Map getCommandDispatcherFactories() { + return commandDispatcherFactories; + } + + public Map getDiscoveryGroupConfigurations() { + return discoveryGroupConfigurations; + } + + @Override + public ExternalBrokerConfigurationService getValue() throws IllegalStateException, IllegalArgumentException { + return this; + } + +} diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemAdd.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemAdd.java index 8a176d1aec2e..cf9863bfaa24 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemAdd.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemAdd.java @@ -24,23 +24,46 @@ import static org.apache.activemq.artemis.api.core.client.ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY; import static org.apache.activemq.artemis.api.core.client.ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.BROADCAST_GROUP; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.DISCOVERY_GROUP; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.JGROUPS_CLUSTER; +import static org.wildfly.extension.messaging.activemq.MessagingSubsystemRootResourceDefinition.CONFIGURATION_CAPABILITY; import static org.wildfly.extension.messaging.activemq.MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_SCHEDULED_THREAD_POOL_MAX_SIZE; import static org.wildfly.extension.messaging.activemq.MessagingSubsystemRootResourceDefinition.GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.jboss.as.controller.AbstractBoottimeAddStepHandler; import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.OperationFailedException; +import org.jboss.as.controller.OperationStepHandler; +import org.jboss.as.controller.PathAddress; +import org.jboss.as.controller.registry.Resource; +import org.jboss.as.network.OutboundSocketBinding; +import org.jboss.as.network.SocketBinding; import org.jboss.as.server.AbstractDeploymentChainStep; import org.jboss.as.server.DeploymentProcessorTarget; import org.jboss.as.server.deployment.Phase; import org.jboss.dmr.ModelNode; import org.jboss.msc.service.Service; +import org.jboss.msc.service.ServiceBuilder; +import org.jboss.msc.service.ServiceName; +import org.jboss.msc.service.ServiceTarget; import org.jboss.msc.service.StartContext; import org.jboss.msc.service.StartException; import org.jboss.msc.service.StopContext; +import org.wildfly.clustering.spi.ClusteringDefaultRequirement; +import org.wildfly.clustering.spi.ClusteringRequirement; import org.wildfly.extension.messaging.activemq.deployment.DefaultJMSConnectionFactoryBindingProcessor; import org.wildfly.extension.messaging.activemq.deployment.DefaultJMSConnectionFactoryResourceReferenceProcessor; import org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionAnnotationProcessor; @@ -60,6 +83,8 @@ */ class MessagingSubsystemAdd extends AbstractBoottimeAddStepHandler { + private static final ServiceName JBOSS_MESSAGING_ACTIVEMQ = ServiceName.JBOSS.append(MessagingExtension.SUBSYSTEM_NAME); + public static final MessagingSubsystemAdd INSTANCE = new MessagingSubsystemAdd(); private MessagingSubsystemAdd() { @@ -128,6 +153,95 @@ protected void execute(DeploymentProcessorTarget processorTarget) { } context.getServiceTarget().addService(MessagingServices.ACTIVEMQ_CLIENT_THREAD_POOL, new ThreadPoolService()) .install(); + context.addStep(new OperationStepHandler() { + @Override + public void execute(OperationContext context, ModelNode operation) throws OperationFailedException { + final ServiceTarget serviceTarget = context.getServiceTarget(); + final ServiceBuilder serviceBuilder = serviceTarget.addService(CONFIGURATION_CAPABILITY.getCapabilityServiceName()); + // Transform the configuration based on the recursive model + final ModelNode model = Resource.Tools.readModel(context.readResource(PathAddress.EMPTY_ADDRESS)); + // Process connectors + final Set connectorsSocketBindings = new HashSet(); + final Map connectors = TransportConfigOperationHandlers.processConnectors(context, "localhost", model, connectorsSocketBindings); + + Map outboundSocketBindings = new HashMap<>(); + Map outbounds = TransportConfigOperationHandlers.listOutBoundSocketBinding(context, connectorsSocketBindings); + Map socketBindings = new HashMap<>(); + for (final String connectorSocketBinding : connectorsSocketBindings) { + // find whether the connectorSocketBinding references a SocketBinding or an OutboundSocketBinding + if (outbounds.get(connectorSocketBinding)) { + final ServiceName outboundSocketName = OutboundSocketBinding.OUTBOUND_SOCKET_BINDING_BASE_SERVICE_NAME.append(connectorSocketBinding); + outboundSocketBindings.put(connectorSocketBinding, outboundSocketName); + } else { + // check if the socket binding has not already been added by the acceptors + if (!socketBindings.containsKey(connectorSocketBinding)) { + socketBindings.put(connectorSocketBinding, SocketBinding.JBOSS_BINDING_NAME.append(connectorSocketBinding)); + } + } + } + final List broadcastGroupConfigurations =new ArrayList<>(); + //this requires connectors + BroadcastGroupAdd.addBroadcastGroupConfigs(context, broadcastGroupConfigurations, connectors.keySet(), model); + final Map discoveryGroupConfigurations = DiscoveryGroupAdd.addDiscoveryGroupConfigs(context, model); + final Map clusterNames = new HashMap<>(); + final Map commandDispatcherFactories = new HashMap<>(); + final Set commandDispatcherFactoryServices = new HashSet<>(); + final Map groupBindings = new HashMap<>(); + final Set groupBindingServices = new HashSet<>(); + for (final BroadcastGroupConfiguration config : broadcastGroupConfigurations) { + final String name = config.getName(); + final String key = "broadcast" + name; + ModelNode broadcastGroupModel = model.get(BROADCAST_GROUP, name); + + if (broadcastGroupModel.hasDefined(JGROUPS_CLUSTER.getName())) { + ModelNode channel = BroadcastGroupDefinition.JGROUPS_CHANNEL.resolveModelAttribute(context, broadcastGroupModel); + ServiceName commandDispatcherFactoryServiceName = channel.isDefined() ? ClusteringRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context, channel.asString()) : ClusteringDefaultRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context); + String clusterName = JGROUPS_CLUSTER.resolveModelAttribute(context, broadcastGroupModel).asString(); + if (!commandDispatcherFactoryServices.contains(commandDispatcherFactoryServiceName)) { + commandDispatcherFactoryServices.add(commandDispatcherFactoryServiceName); + } + commandDispatcherFactories.put(key, commandDispatcherFactoryServiceName); + clusterNames.put(key, clusterName); + } else { + final ServiceName groupBindingServiceName = GroupBindingService.getBroadcastBaseServiceName(JBOSS_MESSAGING_ACTIVEMQ).append(name); + if (!groupBindingServices.contains(groupBindingServiceName)) { + groupBindingServices.add(groupBindingServiceName); + } + groupBindings.put(key, groupBindingServiceName); + } + } + for (final DiscoveryGroupConfiguration config : discoveryGroupConfigurations.values()) { + final String name = config.getName(); + final String key = "discovery" + name; + ModelNode discoveryGroupModel = model.get(DISCOVERY_GROUP, name); + if (discoveryGroupModel.hasDefined(JGROUPS_CLUSTER.getName())) { + ModelNode channel = DiscoveryGroupDefinition.JGROUPS_CHANNEL.resolveModelAttribute(context, discoveryGroupModel); + ServiceName commandDispatcherFactoryServiceName = channel.isDefined() ? ClusteringRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context, channel.asString()) : ClusteringDefaultRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context); + String clusterName = JGROUPS_CLUSTER.resolveModelAttribute(context, discoveryGroupModel).asString(); + if (!commandDispatcherFactoryServices.contains(commandDispatcherFactoryServiceName)) { + commandDispatcherFactoryServices.add(commandDispatcherFactoryServiceName); + } + commandDispatcherFactories.put(key, commandDispatcherFactoryServiceName); + clusterNames.put(key, clusterName); + } else { + final ServiceName groupBindingServiceName = GroupBindingService.getDiscoveryBaseServiceName(JBOSS_MESSAGING_ACTIVEMQ).append(name); + if (!groupBindingServices.contains(groupBindingServiceName)) { + groupBindingServices.add(groupBindingServiceName); + } + groupBindings.put(key, groupBindingServiceName); + } + } + serviceBuilder.setInstance(new ExternalBrokerConfigurationService( + connectors, + discoveryGroupConfigurations, + socketBindings, + outboundSocketBindings, + groupBindings, + commandDispatcherFactories, + clusterNames)) + .install(); + } + }, OperationContext.Stage.RUNTIME); } /** diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemRootResourceDefinition.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemRootResourceDefinition.java index 836f840f1676..f9628a3d768a 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemRootResourceDefinition.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/MessagingSubsystemRootResourceDefinition.java @@ -32,6 +32,8 @@ import org.jboss.as.controller.PersistentResourceDefinition; import org.jboss.as.controller.ReloadRequiredRemoveStepHandler; import org.jboss.as.controller.SimpleAttributeDefinition; +import org.jboss.as.controller.SimpleResourceDefinition; +import org.jboss.as.controller.capability.RuntimeCapability; /** * {@link org.jboss.as.controller.ResourceDefinition} for the messaging subsystem root resource. @@ -39,6 +41,9 @@ * @author Brian Stansberry (c) 2011 Red Hat Inc. */ public class MessagingSubsystemRootResourceDefinition extends PersistentResourceDefinition { + public static final RuntimeCapability CONFIGURATION_CAPABILITY = RuntimeCapability.Builder.of("org.wildfly.messaging.activemq.external.configuration", false) + .setServiceType(ExternalBrokerConfigurationService.class) + .build(); public static final SimpleAttributeDefinition GLOBAL_CLIENT_THREAD_POOL_MAX_SIZE = create("global-client-thread-pool-max-size", INT) .setAttributeGroup("global-client") @@ -64,10 +69,11 @@ public class MessagingSubsystemRootResourceDefinition extends PersistentResource public static final MessagingSubsystemRootResourceDefinition INSTANCE = new MessagingSubsystemRootResourceDefinition(); private MessagingSubsystemRootResourceDefinition() { - super(MessagingExtension.SUBSYSTEM_PATH, - MessagingExtension.getResourceDescriptionResolver(MessagingExtension.SUBSYSTEM_NAME), - MessagingSubsystemAdd.INSTANCE, - new ReloadRequiredRemoveStepHandler()); + super(new SimpleResourceDefinition.Parameters(MessagingExtension.SUBSYSTEM_PATH, + MessagingExtension.getResourceDescriptionResolver(MessagingExtension.SUBSYSTEM_NAME)) + .setAddHandler(MessagingSubsystemAdd.INSTANCE) + .setRemoveHandler(new ReloadRequiredRemoveStepHandler()) + .setCapabilities(CONFIGURATION_CAPABILITY)); } @Override diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ServerAdd.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ServerAdd.java index 0c79c8e7f1f7..5eb0bca84e72 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ServerAdd.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ServerAdd.java @@ -102,7 +102,6 @@ import static org.wildfly.extension.messaging.activemq.ServerDefinition.TRANSACTION_TIMEOUT; import static org.wildfly.extension.messaging.activemq.ServerDefinition.TRANSACTION_TIMEOUT_SCAN_PERIOD; import static org.wildfly.extension.messaging.activemq.ServerDefinition.WILD_CARD_ROUTING_ENABLED; -import static org.wildfly.extension.messaging.activemq.ha.HAPolicyConfigurationBuilder.addHAPolicyConfiguration; import java.util.ArrayList; import java.util.Collections; @@ -161,6 +160,7 @@ import org.wildfly.clustering.spi.ClusteringDefaultRequirement; import org.wildfly.clustering.spi.ClusteringRequirement; import org.wildfly.common.function.ExceptionSupplier; +import org.wildfly.extension.messaging.activemq.ha.HAPolicyConfigurationBuilder; import org.wildfly.extension.messaging.activemq.jms.JMSService; import org.wildfly.extension.messaging.activemq.logging.MessagingLogger; import org.wildfly.security.auth.server.SecurityDomain; @@ -255,505 +255,515 @@ private void checkNoAttributesIsDefined(String definedAttributeName, PathAddress protected void performRuntime(OperationContext context, ModelNode operation, Resource resource) throws OperationFailedException { // Add a RUNTIME step to actually install the ActiveMQ Service. This will execute after the runtime step // added by any child resources whose ADD handler executes after this one in the model stage. - context.addStep(new OperationStepHandler() { - @Override - public void execute(OperationContext context, ModelNode operation) throws OperationFailedException { - final ServiceTarget serviceTarget = context.getServiceTarget(); + context.addStep(new InstallServerHandler(resource), OperationContext.Stage.RUNTIME); + } - final String serverName = context.getCurrentAddressValue(); + private class InstallServerHandler implements OperationStepHandler { - // Transform the configuration based on the recursive model - final ModelNode model = Resource.Tools.readModel(resource); - final Configuration configuration = transformConfig(context, serverName, model); - - // Create path services - String bindingsPath = PATHS.get(BINDINGS_DIRECTORY).resolveModelAttribute(context, model.get(PATH, BINDINGS_DIRECTORY)).asString(); - String bindingsRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, BINDINGS_DIRECTORY)).asString(); - String journalPath = PATHS.get(JOURNAL_DIRECTORY).resolveModelAttribute(context, model.get(PATH, JOURNAL_DIRECTORY)).asString(); - String journalRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, JOURNAL_DIRECTORY)).asString(); - String largeMessagePath = PATHS.get(LARGE_MESSAGES_DIRECTORY).resolveModelAttribute(context, model.get(PATH, LARGE_MESSAGES_DIRECTORY)).asString(); - String largeMessageRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, LARGE_MESSAGES_DIRECTORY)).asString(); - String pagingPath = PATHS.get(PAGING_DIRECTORY).resolveModelAttribute(context, model.get(PATH, PAGING_DIRECTORY)).asString(); - String pagingRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, PAGING_DIRECTORY)).asString(); - - // Add the ActiveMQ Service - ServiceName activeMQServiceName = MessagingServices.getActiveMQServiceName(serverName); - final ServiceBuilder serviceBuilder = serviceTarget.addService(activeMQServiceName); - Supplier pathManager = serviceBuilder.requires(context.getCapabilityServiceName(PATH_MANAGER_CAPABILITY, PathManager.class)); - - Optional> dataSource = Optional.empty(); - ModelNode dataSourceModel = JOURNAL_DATASOURCE.resolveModelAttribute(context, model); - if (dataSourceModel.isDefined()) { - ServiceName dataSourceCapability = context.getCapabilityServiceName(DATA_SOURCE_CAPABILITY, dataSourceModel.asString(), DataSource.class); - dataSource = Optional.of(serviceBuilder.requires(dataSourceCapability)); - } - Optional> mbeanServer = Optional.empty(); - if (context.hasOptionalCapability(JMX_CAPABILITY, ACTIVEMQ_SERVER_CAPABILITY.getDynamicName(serverName), null)) { - ServiceName jmxCapability = context.getCapabilityServiceName(JMX_CAPABILITY, MBeanServer.class); - mbeanServer = Optional.of(serviceBuilder.requires(jmxCapability)); - } + private final Resource resource; - // Inject a reference to the Elytron security domain if one has been defined. - Optional> elytronSecurityDomain = Optional.empty(); - // legacy security - Optional> securityDomainContext = Optional.empty(); - final ModelNode elytronSecurityDomainModel = ELYTRON_DOMAIN.resolveModelAttribute(context, model); - if (elytronSecurityDomainModel.isDefined()) { - ServiceName elytronDomainCapability = context.getCapabilityServiceName(ELYTRON_DOMAIN_CAPABILITY, elytronSecurityDomainModel.asString(), SecurityDomain.class); - elytronSecurityDomain = Optional.of(serviceBuilder.requires(elytronDomainCapability)); - } else { - // Add legacy security - String domain = SECURITY_DOMAIN.resolveModelAttribute(context, model).asString(); - securityDomainContext = Optional.of(serviceBuilder.requires(SecurityDomainService.SERVICE_NAME.append(domain))); - // WFLY-6652 / WFLY-10292 this dependency ensures that Artemis will be able to destroy any queues created on behalf of a - // pooled-connection-factory client during server stop - serviceBuilder.requires(SecurityBootstrapService.SERVICE_NAME); - } + private InstallServerHandler(Resource resource) { + this.resource = resource; + } + + @Override + public void execute(OperationContext context, ModelNode operation) throws OperationFailedException { + final ServiceTarget serviceTarget = context.getServiceTarget(); + + final String serverName = context.getCurrentAddressValue(); + + // Transform the configuration based on the recursive model + final ModelNode model = Resource.Tools.readModel(resource); + final Configuration configuration = transformConfig(context, serverName, model); + + // Create path services + String bindingsPath = PATHS.get(BINDINGS_DIRECTORY).resolveModelAttribute(context, model.get(PATH, BINDINGS_DIRECTORY)).asString(); + String bindingsRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, BINDINGS_DIRECTORY)).asString(); + String journalPath = PATHS.get(JOURNAL_DIRECTORY).resolveModelAttribute(context, model.get(PATH, JOURNAL_DIRECTORY)).asString(); + String journalRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, JOURNAL_DIRECTORY)).asString(); + String largeMessagePath = PATHS.get(LARGE_MESSAGES_DIRECTORY).resolveModelAttribute(context, model.get(PATH, LARGE_MESSAGES_DIRECTORY)).asString(); + String largeMessageRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, LARGE_MESSAGES_DIRECTORY)).asString(); + String pagingPath = PATHS.get(PAGING_DIRECTORY).resolveModelAttribute(context, model.get(PATH, PAGING_DIRECTORY)).asString(); + String pagingRelativeToPath = RELATIVE_TO.resolveModelAttribute(context, model.get(PATH, PAGING_DIRECTORY)).asString(); + + // Add the ActiveMQ Service + ServiceName activeMQServiceName = MessagingServices.getActiveMQServiceName(serverName); + final ServiceBuilder serviceBuilder = serviceTarget.addService(activeMQServiceName); + Supplier pathManager = serviceBuilder.requires(context.getCapabilityServiceName(PATH_MANAGER_CAPABILITY, PathManager.class)); + + Optional> dataSource = Optional.empty(); + ModelNode dataSourceModel = JOURNAL_DATASOURCE.resolveModelAttribute(context, model); + if (dataSourceModel.isDefined()) { + ServiceName dataSourceCapability = context.getCapabilityServiceName(DATA_SOURCE_CAPABILITY, dataSourceModel.asString(), DataSource.class); + dataSource = Optional.of(serviceBuilder.requires(dataSourceCapability)); + } + Optional> mbeanServer = Optional.empty(); + if (context.hasOptionalCapability(JMX_CAPABILITY, ACTIVEMQ_SERVER_CAPABILITY.getDynamicName(serverName), null)) { + ServiceName jmxCapability = context.getCapabilityServiceName(JMX_CAPABILITY, MBeanServer.class); + mbeanServer = Optional.of(serviceBuilder.requires(jmxCapability)); + } - List incomingInterceptors = processInterceptors(INCOMING_INTERCEPTORS.resolveModelAttribute(context, operation)); - List outgoingInterceptors = processInterceptors(OUTGOING_INTERCEPTORS.resolveModelAttribute(context, operation)); + // Inject a reference to the Elytron security domain if one has been defined. + Optional> elytronSecurityDomain = Optional.empty(); + // legacy security + Optional> securityDomainContext = Optional.empty(); + final ModelNode elytronSecurityDomainModel = ELYTRON_DOMAIN.resolveModelAttribute(context, model); + if (elytronSecurityDomainModel.isDefined()) { + ServiceName elytronDomainCapability = context.getCapabilityServiceName(ELYTRON_DOMAIN_CAPABILITY, elytronSecurityDomainModel.asString(), SecurityDomain.class); + elytronSecurityDomain = Optional.of(serviceBuilder.requires(elytronDomainCapability)); + } else { + // Add legacy security + String domain = SECURITY_DOMAIN.resolveModelAttribute(context, model).asString(); + securityDomainContext = Optional.of(serviceBuilder.requires(SecurityDomainService.SERVICE_NAME.append(domain))); + // WFLY-6652 / WFLY-10292 this dependency ensures that Artemis will be able to destroy any queues created on behalf of a + // pooled-connection-factory client during server stop + serviceBuilder.requires(SecurityBootstrapService.SERVICE_NAME); + } - // Process acceptors and connectors - final Set socketBindingNames = new HashSet(); - TransportConfigOperationHandlers.processAcceptors(context, configuration, model, socketBindingNames); + List incomingInterceptors = processInterceptors(INCOMING_INTERCEPTORS.resolveModelAttribute(context, operation)); + List outgoingInterceptors = processInterceptors(OUTGOING_INTERCEPTORS.resolveModelAttribute(context, operation)); - Map> socketBindings = new HashMap<>(); - for (final String socketBindingName : socketBindingNames) { - Supplier socketBinding = serviceBuilder.requires(SocketBinding.JBOSS_BINDING_NAME.append(socketBindingName)); - socketBindings.put(socketBindingName, socketBinding); - } + // Process acceptors and connectors + final Set socketBindingNames = new HashSet(); + TransportConfigOperationHandlers.processAcceptors(context, configuration, model, socketBindingNames); - final Set connectorsSocketBindings = new HashSet(); - TransportConfigOperationHandlers.processConnectors(context, configuration, model, connectorsSocketBindings); - - Map> outboundSocketBindings = new HashMap<>(); - Map outbounds = TransportConfigOperationHandlers.listOutBoundSocketBinding(context, connectorsSocketBindings); - for (final String connectorSocketBinding : connectorsSocketBindings) { - // find whether the connectorSocketBinding references a SocketBinding or an OutboundSocketBinding - if (outbounds.get(connectorSocketBinding)) { - final ServiceName outboundSocketName = OutboundSocketBinding.OUTBOUND_SOCKET_BINDING_BASE_SERVICE_NAME.append(connectorSocketBinding); - Supplier outboundSocketBinding = serviceBuilder.requires(outboundSocketName); - outboundSocketBindings.put(connectorSocketBinding, outboundSocketBinding); - } else { - // check if the socket binding has not already been added by the acceptors - if (!socketBindings.containsKey(connectorSocketBinding)) { - Supplier socketBinding = serviceBuilder.requires(SocketBinding.JBOSS_BINDING_NAME.append(connectorSocketBinding)); - socketBindings.put(connectorSocketBinding, socketBinding); - } - } - } - // if there is any HTTP acceptor, add a dependency on the http-upgrade-registry service to - // make sure that ActiveMQ server will be stopped *after* the registry (and its underlying XNIO thread) - // is stopped. - Set httpListeners = new HashSet<>(); - if (model.hasDefined(HTTP_ACCEPTOR)) { - for (final Property property : model.get(HTTP_ACCEPTOR).asPropertyList()) { - String httpListener = HTTPAcceptorDefinition.HTTP_LISTENER.resolveModelAttribute(context, property.getValue()).asString(); - httpListeners.add(httpListener); + Map> socketBindings = new HashMap<>(); + for (final String socketBindingName : socketBindingNames) { + Supplier socketBinding = serviceBuilder.requires(SocketBinding.JBOSS_BINDING_NAME.append(socketBindingName)); + socketBindings.put(socketBindingName, socketBinding); + } + + final Set connectorsSocketBindings = new HashSet(); + configuration.setConnectorConfigurations(TransportConfigOperationHandlers.processConnectors(context, configuration.getName(), model, connectorsSocketBindings)); + + Map> outboundSocketBindings = new HashMap<>(); + Map outbounds = TransportConfigOperationHandlers.listOutBoundSocketBinding(context, connectorsSocketBindings); + for (final String connectorSocketBinding : connectorsSocketBindings) { + // find whether the connectorSocketBinding references a SocketBinding or an OutboundSocketBinding + if (outbounds.get(connectorSocketBinding)) { + final ServiceName outboundSocketName = OutboundSocketBinding.OUTBOUND_SOCKET_BINDING_BASE_SERVICE_NAME.append(connectorSocketBinding); + Supplier outboundSocketBinding = serviceBuilder.requires(outboundSocketName); + outboundSocketBindings.put(connectorSocketBinding, outboundSocketBinding); + } else { + // check if the socket binding has not already been added by the acceptors + if (!socketBindings.containsKey(connectorSocketBinding)) { + Supplier socketBinding = serviceBuilder.requires(SocketBinding.JBOSS_BINDING_NAME.append(connectorSocketBinding)); + socketBindings.put(connectorSocketBinding, socketBinding); } } - for (String httpListener : httpListeners) { - serviceBuilder.requires(MessagingServices.HTTP_UPGRADE_REGISTRY.append(httpListener)); + } + // if there is any HTTP acceptor, add a dependency on the http-upgrade-registry service to + // make sure that ActiveMQ server will be stopped *after* the registry (and its underlying XNIO thread) + // is stopped. + Set httpListeners = new HashSet<>(); + if (model.hasDefined(HTTP_ACCEPTOR)) { + for (final Property property : model.get(HTTP_ACCEPTOR).asPropertyList()) { + String httpListener = HTTPAcceptorDefinition.HTTP_LISTENER.resolveModelAttribute(context, property.getValue()).asString(); + httpListeners.add(httpListener); } + } + for (String httpListener : httpListeners) { + serviceBuilder.requires(MessagingServices.HTTP_UPGRADE_REGISTRY.append(httpListener)); + } - //this requires connectors - BroadcastGroupAdd.addBroadcastGroupConfigs(context, configuration, model); - - final List broadcastGroupConfigurations = configuration.getBroadcastGroupConfigurations(); - final Map discoveryGroupConfigurations = configuration.getDiscoveryGroupConfigurations(); - - final Map clusterNames = new HashMap<>(); - final Map> commandDispatcherFactories = new HashMap<>(); - final Map> commandDispatcherFactoryServices = new HashMap<>(); - final Map> groupBindings = new HashMap<>(); - final Map> groupBindingServices = new HashMap<>(); - - if(broadcastGroupConfigurations != null) { - for(final BroadcastGroupConfiguration config : broadcastGroupConfigurations) { - final String name = config.getName(); - final String key = "broadcast" + name; - ModelNode broadcastGroupModel = model.get(BROADCAST_GROUP, name); - - if (broadcastGroupModel.hasDefined(JGROUPS_CLUSTER.getName())) { - ModelNode channel = BroadcastGroupDefinition.JGROUPS_CHANNEL.resolveModelAttribute(context, broadcastGroupModel); - ServiceName commandDispatcherFactoryServiceName = channel.isDefined() ? ClusteringRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context, channel.asString()) : ClusteringDefaultRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context); - String clusterName = JGROUPS_CLUSTER.resolveModelAttribute(context, broadcastGroupModel).asString(); - if (!commandDispatcherFactoryServices.containsKey(commandDispatcherFactoryServiceName)) { - Supplier commandDispatcherFactory = serviceBuilder.requires(commandDispatcherFactoryServiceName); - commandDispatcherFactoryServices.put(commandDispatcherFactoryServiceName, commandDispatcherFactory); - } - commandDispatcherFactories.put(key, commandDispatcherFactoryServices.get(commandDispatcherFactoryServiceName)); - clusterNames.put(key, clusterName); - } else { - final ServiceName groupBindingServiceName = GroupBindingService.getBroadcastBaseServiceName(activeMQServiceName).append(name); - if (!groupBindingServices.containsKey(groupBindingServiceName)) { - Supplier groupBinding = serviceBuilder.requires(groupBindingServiceName); - groupBindingServices.put(groupBindingServiceName, groupBinding) ; - } - groupBindings.put(key, groupBindingServices.get(groupBindingServiceName)); + //this requires connectors + BroadcastGroupAdd.addBroadcastGroupConfigs(context, configuration.getBroadcastGroupConfigurations(), configuration.getConnectorConfigurations().keySet(), model); + + final List broadcastGroupConfigurations = configuration.getBroadcastGroupConfigurations(); + final Map discoveryGroupConfigurations = configuration.getDiscoveryGroupConfigurations(); + + final Map clusterNames = new HashMap<>(); + final Map> commandDispatcherFactories = new HashMap<>(); + final Map> commandDispatcherFactoryServices = new HashMap<>(); + final Map> groupBindings = new HashMap<>(); + final Map> groupBindingServices = new HashMap<>(); + + if (broadcastGroupConfigurations != null) { + for (final BroadcastGroupConfiguration config : broadcastGroupConfigurations) { + final String name = config.getName(); + final String key = "broadcast" + name; + ModelNode broadcastGroupModel = model.get(BROADCAST_GROUP, name); + + if (broadcastGroupModel.hasDefined(JGROUPS_CLUSTER.getName())) { + ModelNode channel = BroadcastGroupDefinition.JGROUPS_CHANNEL.resolveModelAttribute(context, broadcastGroupModel); + ServiceName commandDispatcherFactoryServiceName = channel.isDefined() ? ClusteringRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context, channel.asString()) : ClusteringDefaultRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context); + String clusterName = JGROUPS_CLUSTER.resolveModelAttribute(context, broadcastGroupModel).asString(); + if (!commandDispatcherFactoryServices.containsKey(commandDispatcherFactoryServiceName)) { + Supplier commandDispatcherFactory = serviceBuilder.requires(commandDispatcherFactoryServiceName); + commandDispatcherFactoryServices.put(commandDispatcherFactoryServiceName, commandDispatcherFactory); + } + commandDispatcherFactories.put(key, commandDispatcherFactoryServices.get(commandDispatcherFactoryServiceName)); + clusterNames.put(key, clusterName); + } else { + final ServiceName groupBindingServiceName = GroupBindingService.getBroadcastBaseServiceName(activeMQServiceName).append(name); + if (!groupBindingServices.containsKey(groupBindingServiceName)) { + Supplier groupBinding = serviceBuilder.requires(groupBindingServiceName); + groupBindingServices.put(groupBindingServiceName, groupBinding); } + groupBindings.put(key, groupBindingServices.get(groupBindingServiceName)); } } - if(discoveryGroupConfigurations != null) { - for(final DiscoveryGroupConfiguration config : discoveryGroupConfigurations.values()) { - final String name = config.getName(); - final String key = "discovery" + name; - ModelNode discoveryGroupModel = model.get(DISCOVERY_GROUP, name); - if (discoveryGroupModel.hasDefined(JGROUPS_CLUSTER.getName())) { - ModelNode channel = DiscoveryGroupDefinition.JGROUPS_CHANNEL.resolveModelAttribute(context, discoveryGroupModel); - ServiceName commandDispatcherFactoryServiceName = channel.isDefined() ? ClusteringRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context, channel.asString()) : ClusteringDefaultRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context); - String clusterName = JGROUPS_CLUSTER.resolveModelAttribute(context, discoveryGroupModel).asString(); - if (!commandDispatcherFactoryServices.containsKey(commandDispatcherFactoryServiceName)) { - Supplier commandDispatcherFactory = serviceBuilder.requires(commandDispatcherFactoryServiceName); - commandDispatcherFactoryServices.put(commandDispatcherFactoryServiceName, commandDispatcherFactory); - } - commandDispatcherFactories.put(key, commandDispatcherFactoryServices.get(commandDispatcherFactoryServiceName)); - clusterNames.put(key, clusterName); - } else { - final ServiceName groupBindingServiceName = GroupBindingService.getDiscoveryBaseServiceName(activeMQServiceName).append(name); - if (!groupBindingServices.containsKey(groupBindingServiceName)) { - Supplier groupBinding = serviceBuilder.requires(groupBindingServiceName); - groupBindingServices.put(groupBindingServiceName, groupBinding) ; - } - groupBindings.put(key, groupBindingServices.get(groupBindingServiceName)); + } + if (discoveryGroupConfigurations != null) { + for (final DiscoveryGroupConfiguration config : discoveryGroupConfigurations.values()) { + final String name = config.getName(); + final String key = "discovery" + name; + ModelNode discoveryGroupModel = model.get(DISCOVERY_GROUP, name); + if (discoveryGroupModel.hasDefined(JGROUPS_CLUSTER.getName())) { + ModelNode channel = DiscoveryGroupDefinition.JGROUPS_CHANNEL.resolveModelAttribute(context, discoveryGroupModel); + ServiceName commandDispatcherFactoryServiceName = channel.isDefined() ? ClusteringRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context, channel.asString()) : ClusteringDefaultRequirement.COMMAND_DISPATCHER_FACTORY.getServiceName(context); + String clusterName = JGROUPS_CLUSTER.resolveModelAttribute(context, discoveryGroupModel).asString(); + if (!commandDispatcherFactoryServices.containsKey(commandDispatcherFactoryServiceName)) { + Supplier commandDispatcherFactory = serviceBuilder.requires(commandDispatcherFactoryServiceName); + commandDispatcherFactoryServices.put(commandDispatcherFactoryServiceName, commandDispatcherFactory); } + commandDispatcherFactories.put(key, commandDispatcherFactoryServices.get(commandDispatcherFactoryServiceName)); + clusterNames.put(key, clusterName); + } else { + final ServiceName groupBindingServiceName = GroupBindingService.getDiscoveryBaseServiceName(activeMQServiceName).append(name); + if (!groupBindingServices.containsKey(groupBindingServiceName)) { + Supplier groupBinding = serviceBuilder.requires(groupBindingServiceName); + groupBindingServices.put(groupBindingServiceName, groupBinding); + } + groupBindings.put(key, groupBindingServices.get(groupBindingServiceName)); } } + } - // Create the ActiveMQ Service - final ActiveMQServerService serverService = new ActiveMQServerService( - configuration, - new ActiveMQServerService.PathConfig(bindingsPath, bindingsRelativeToPath, journalPath, journalRelativeToPath, largeMessagePath, largeMessageRelativeToPath, pagingPath, pagingRelativeToPath), - pathManager, - incomingInterceptors, - outgoingInterceptors, - socketBindings, - outboundSocketBindings, - groupBindings, - commandDispatcherFactories, - clusterNames, - elytronSecurityDomain, - securityDomainContext, - mbeanServer, - dataSource - ); - // inject credential-references for bridges - addBridgeCredentialStoreReference(serverService, configuration, BridgeDefinition.CREDENTIAL_REFERENCE, context, model, serviceBuilder); - addClusterCredentialStoreReference(serverService, ServerDefinition.CREDENTIAL_REFERENCE, context, model, serviceBuilder); - - // Install the ActiveMQ Service - ServiceController activeMQServerServiceController = serviceBuilder.setInstance(serverService) - .install(); - - //Add the queue services for the core queues created throught the internal broker configuration (those queues are not added as service via the QueueAdd OSH) - for (CoreQueueConfiguration queueConfiguration : configuration.getQueueConfigurations()) { - final ServiceName queueServiceName = activeMQServiceName.append(queueConfiguration.getName()); - final ServiceBuilder sb = context.getServiceTarget().addService(queueServiceName); - sb.requires(ActiveMQActivationService.getServiceName(activeMQServiceName)); - Supplier serverSupplier = sb.requires(activeMQServiceName); - final QueueService queueService = new QueueService(serverSupplier, queueConfiguration, false, false); - sb.setInitialMode(Mode.PASSIVE); - sb.setInstance(queueService); - sb.install(); - } - // Provide our custom Resource impl a ref to the ActiveMQ server so it can create child runtime resources - ((ActiveMQServerResource)resource).setActiveMQServerServiceController(activeMQServerServiceController); - - // Install the JMSService - boolean overrideInVMSecurity = OVERRIDE_IN_VM_SECURITY.resolveModelAttribute(context, operation).asBoolean(); - JMSService.addService(serviceTarget, activeMQServiceName, overrideInVMSecurity); - - context.completeStep(OperationContext.RollbackHandler.NOOP_ROLLBACK_HANDLER); + // Create the ActiveMQ Service + final ActiveMQServerService serverService = new ActiveMQServerService( + configuration, + new ActiveMQServerService.PathConfig(bindingsPath, bindingsRelativeToPath, journalPath, journalRelativeToPath, largeMessagePath, largeMessageRelativeToPath, pagingPath, pagingRelativeToPath), + pathManager, + incomingInterceptors, + outgoingInterceptors, + socketBindings, + outboundSocketBindings, + groupBindings, + commandDispatcherFactories, + clusterNames, + elytronSecurityDomain, + securityDomainContext, + mbeanServer, + dataSource + ); + + // inject credential-references for bridges + addBridgeCredentialStoreReference(serverService, configuration, BridgeDefinition.CREDENTIAL_REFERENCE, context, model, serviceBuilder); + addClusterCredentialStoreReference(serverService, ServerDefinition.CREDENTIAL_REFERENCE, context, model, serviceBuilder); + + // Install the ActiveMQ Service + ServiceController activeMQServerServiceController = serviceBuilder.setInstance(serverService) + .install(); + //Add the queue services for the core queues created throught the internal broker configuration (those queues are not added as service via the QueueAdd OSH) + for (CoreQueueConfiguration queueConfiguration : configuration.getQueueConfigurations()) { + final ServiceName queueServiceName = activeMQServiceName.append(queueConfiguration.getName()); + final ServiceBuilder sb = context.getServiceTarget().addService(queueServiceName); + sb.requires(ActiveMQActivationService.getServiceName(activeMQServiceName)); + Supplier serverSupplier = sb.requires(activeMQServiceName); + final QueueService queueService = new QueueService(serverSupplier, queueConfiguration, false, false); + sb.setInitialMode(Mode.PASSIVE); + sb.setInstance(queueService); + sb.install(); } - }, OperationContext.Stage.RUNTIME); - } + // Provide our custom Resource impl a ref to the ActiveMQ server so it can create child runtime resources + ((ActiveMQServerResource) resource).setActiveMQServerServiceController(activeMQServerServiceController); - /** - * Transform the detyped operation parameters into the ActiveMQ configuration. - * - * @param context the operation context - * @param serverName the name of the ActiveMQ instance - * @param model the subsystem root resource model - * @return the ActiveMQ configuration - */ - private Configuration transformConfig(final OperationContext context, String serverName, final ModelNode model) throws OperationFailedException { - - Configuration configuration = new ConfigurationImpl(); - - configuration.setName(serverName); - - configuration.setEnabledAsyncConnectionExecution(ASYNC_CONNECTION_EXECUTION_ENABLED.resolveModelAttribute(context, model).asBoolean()); - - configuration.setClusterPassword(CLUSTER_PASSWORD.resolveModelAttribute(context, model).asString()); - configuration.setClusterUser(CLUSTER_USER.resolveModelAttribute(context, model).asString()); - configuration.setConnectionTTLOverride(CONNECTION_TTL_OVERRIDE.resolveModelAttribute(context, model).asInt()); - configuration.setCreateBindingsDir(CREATE_BINDINGS_DIR.resolveModelAttribute(context, model).asBoolean()); - configuration.setCreateJournalDir(CREATE_JOURNAL_DIR.resolveModelAttribute(context, model).asBoolean()); - configuration.setGlobalMaxSize(GLOBAL_MAX_MEMORY_SIZE.resolveModelAttribute(context, model).asLong()); - configuration.setMaxDiskUsage(GLOBAL_MAX_DISK_USAGE.resolveModelAttribute(context, model).asInt()); - configuration.setDiskScanPeriod(DISK_SCAN_PERIOD.resolveModelAttribute(context, model).asInt()); - configuration.setIDCacheSize(ID_CACHE_SIZE.resolveModelAttribute(context, model).asInt()); - // TODO do we want to allow the jmx configuration ? - configuration.setJMXDomain(JMX_DOMAIN.resolveModelAttribute(context, model).asString()); - configuration.setJMXManagementEnabled(JMX_MANAGEMENT_ENABLED.resolveModelAttribute(context, model).asBoolean()); - // Journal - final JournalType journalType = JournalType.valueOf(JOURNAL_TYPE.resolveModelAttribute(context, model).asString()); - configuration.setJournalType(journalType); - - ModelNode value = JOURNAL_BUFFER_SIZE.resolveModelAttribute(context, model); - if(value.isDefined()) { - configuration.setJournalBufferSize_AIO(value.asInt()); - configuration.setJournalBufferSize_NIO(value.asInt()); - } - value = JOURNAL_BUFFER_TIMEOUT.resolveModelAttribute(context, model); - if(value.isDefined()) { - configuration.setJournalBufferTimeout_AIO(value.asInt()); - configuration.setJournalBufferTimeout_NIO(value.asInt()); - } - value = JOURNAL_MAX_IO.resolveModelAttribute(context, model); - if(value.isDefined()) { - configuration.setJournalMaxIO_AIO(value.asInt()); - configuration.setJournalMaxIO_NIO(value.asInt()); - } - configuration.setJournalCompactMinFiles(JOURNAL_COMPACT_MIN_FILES.resolveModelAttribute(context, model).asInt()); - configuration.setJournalCompactPercentage(JOURNAL_COMPACT_PERCENTAGE.resolveModelAttribute(context, model).asInt()); - configuration.setJournalFileSize(JOURNAL_FILE_SIZE.resolveModelAttribute(context, model).asInt()); - configuration.setJournalMinFiles(JOURNAL_MIN_FILES.resolveModelAttribute(context, model).asInt()); - configuration.setJournalPoolFiles(JOURNAL_POOL_FILES.resolveModelAttribute(context, model).asInt()); - configuration.setJournalSyncNonTransactional(JOURNAL_SYNC_NON_TRANSACTIONAL.resolveModelAttribute(context, model).asBoolean()); - configuration.setJournalSyncTransactional(JOURNAL_SYNC_TRANSACTIONAL.resolveModelAttribute(context, model).asBoolean()); - configuration.setLogJournalWriteRate(LOG_JOURNAL_WRITE_RATE.resolveModelAttribute(context, model).asBoolean()); - - configuration.setManagementAddress(SimpleString.toSimpleString(MANAGEMENT_ADDRESS.resolveModelAttribute(context, model).asString())); - configuration.setManagementNotificationAddress(SimpleString.toSimpleString(MANAGEMENT_NOTIFICATION_ADDRESS.resolveModelAttribute(context, model).asString())); - - configuration.setMemoryMeasureInterval(MEMORY_MEASURE_INTERVAL.resolveModelAttribute(context, model).asLong()); - configuration.setMemoryWarningThreshold(MEMORY_WARNING_THRESHOLD.resolveModelAttribute(context, model).asInt()); - - configuration.setMessageCounterEnabled(STATISTICS_ENABLED.resolveModelAttribute(context, model).asBoolean()); - configuration.setMessageCounterSamplePeriod(MESSAGE_COUNTER_SAMPLE_PERIOD.resolveModelAttribute(context, model).asInt()); - configuration.setMessageCounterMaxDayHistory(MESSAGE_COUNTER_MAX_DAY_HISTORY.resolveModelAttribute(context, model).asInt()); - configuration.setMessageExpiryScanPeriod(MESSAGE_EXPIRY_SCAN_PERIOD.resolveModelAttribute(context, model).asLong()); - configuration.setMessageExpiryThreadPriority(MESSAGE_EXPIRY_THREAD_PRIORITY.resolveModelAttribute(context, model).asInt()); - - configuration.setPersistDeliveryCountBeforeDelivery(PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY.resolveModelAttribute(context, model).asBoolean()); - - configuration.setPageMaxConcurrentIO(PAGE_MAX_CONCURRENT_IO.resolveModelAttribute(context, model).asInt()); - - configuration.setPersistenceEnabled(PERSISTENCE_ENABLED.resolveModelAttribute(context, model).asBoolean()); - configuration.setPersistIDCache(PERSIST_ID_CACHE.resolveModelAttribute(context, model).asBoolean()); - - configuration.setScheduledThreadPoolMaxSize(SCHEDULED_THREAD_POOL_MAX_SIZE.resolveModelAttribute(context, model).asInt()); - configuration.setSecurityEnabled(SECURITY_ENABLED.resolveModelAttribute(context, model).asBoolean()); - configuration.setSecurityInvalidationInterval(SECURITY_INVALIDATION_INTERVAL.resolveModelAttribute(context, model).asLong()); - configuration.setServerDumpInterval(SERVER_DUMP_INTERVAL.resolveModelAttribute(context, model).asLong()); - configuration.setThreadPoolMaxSize(THREAD_POOL_MAX_SIZE.resolveModelAttribute(context, model).asInt()); - configuration.setTransactionTimeout(TRANSACTION_TIMEOUT.resolveModelAttribute(context, model).asLong()); - configuration.setTransactionTimeoutScanPeriod(TRANSACTION_TIMEOUT_SCAN_PERIOD.resolveModelAttribute(context, model).asLong()); - configuration.getWildcardConfiguration().setRoutingEnabled(WILD_CARD_ROUTING_ENABLED.resolveModelAttribute(context, model).asBoolean()); - - processStorageConfiguration(context, model, configuration); - addHAPolicyConfiguration(context, configuration, model); - - processAddressSettings(context, configuration, model); - processSecuritySettings(context, configuration, model); - - // Add in items from child resources - GroupingHandlerAdd.addGroupingHandlerConfig(context,configuration, model); - DiscoveryGroupAdd.addDiscoveryGroupConfigs(context, configuration, model); - DivertAdd.addDivertConfigs(context, configuration, model); - QueueAdd.addQueueConfigs(context, configuration, model); - BridgeAdd.addBridgeConfigs(context, configuration, model); - ClusterConnectionAdd.addClusterConnectionConfigs(context, configuration, model); - ConnectorServiceDefinition.addConnectorServiceConfigs(context, configuration, model); - - return configuration; - } + // Install the JMSService + boolean overrideInVMSecurity = OVERRIDE_IN_VM_SECURITY.resolveModelAttribute(context, operation).asBoolean(); + JMSService.addService(serviceTarget, activeMQServiceName, overrideInVMSecurity); - private static void processStorageConfiguration(OperationContext context, ModelNode model, Configuration configuration) throws OperationFailedException { - ModelNode journalDataSource = JOURNAL_DATASOURCE.resolveModelAttribute(context, model); - if (!journalDataSource.isDefined()) { - return; - } - DatabaseStorageConfiguration storageConfiguration = new DatabaseStorageConfiguration(); - storageConfiguration.setBindingsTableName(JOURNAL_BINDINGS_TABLE.resolveModelAttribute(context, model).asString()); - storageConfiguration.setMessageTableName(JOURNAL_MESSAGES_TABLE.resolveModelAttribute(context, model).asString()); - storageConfiguration.setLargeMessageTableName(JOURNAL_LARGE_MESSAGES_TABLE.resolveModelAttribute(context, model).asString()); - storageConfiguration.setPageStoreTableName(JOURNAL_PAGE_STORE_TABLE.resolveModelAttribute(context, model).asString()); - long networkTimeout = SECONDS.toMillis(JOURNAL_JDBC_NETWORK_TIMEOUT.resolveModelAttribute(context, model).asInt()); - // ARTEMIS-1493: Artemis API is not correct. the value must be in millis but it requires an int instead of a long. - storageConfiguration.setJdbcNetworkTimeout((int)networkTimeout); - // WFLY-9513 - check for System properties for HA JDBC store attributes - // - // if the attribute is defined, we use its value - // otherwise, we check first for a system property - // finally we use the attribute's default value - // - // this behaviour applies to JOURNAL_NODE_MANAGER_STORE_TABLE, JOURNAL_JDBC_LOCK_EXPIRATION - // and JOURNAL_JDBC_LOCK_RENEW_PERIOD attributes. - final String nodeManagerStoreTableName; - if (model.hasDefined(JOURNAL_NODE_MANAGER_STORE_TABLE.getName())) { - nodeManagerStoreTableName = JOURNAL_NODE_MANAGER_STORE_TABLE.resolveModelAttribute(context, model).asString(); - } else if ( org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_NODEMANAGER_STORE_TABLE_NAME)) { - nodeManagerStoreTableName = org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_NODEMANAGER_STORE_TABLE_NAME); - } else { - nodeManagerStoreTableName = JOURNAL_NODE_MANAGER_STORE_TABLE.getDefaultValue().asString(); + context.completeStep(OperationContext.RollbackHandler.NOOP_ROLLBACK_HANDLER); } - // the system property is removed, otherwise Artemis will use it to override the value from the configuration - org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_NODEMANAGER_STORE_TABLE_NAME); - storageConfiguration.setNodeManagerStoreTableName(nodeManagerStoreTableName); - final long lockExpirationInMillis; - if (model.hasDefined(JOURNAL_JDBC_LOCK_EXPIRATION.getName())) { - lockExpirationInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_EXPIRATION.resolveModelAttribute(context, model).asInt()); - } else if ( org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_EXPIRATION_MILLIS)) { - lockExpirationInMillis = Long.parseLong( org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_EXPIRATION_MILLIS)); - } else { - lockExpirationInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_EXPIRATION.getDefaultValue().asInt()); - } - // the system property is removed, otherwise Artemis will use it to override the value from the configuration - org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_EXPIRATION_MILLIS); - storageConfiguration.setJdbcLockExpirationMillis(lockExpirationInMillis); - final long lockRenewPeriodInMillis; - if (model.hasDefined(JOURNAL_JDBC_LOCK_RENEW_PERIOD.getName())) { - lockRenewPeriodInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_RENEW_PERIOD.resolveModelAttribute(context, model).asInt()); - } else if (org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_RENEW_PERIOD_MILLIS)) { - lockRenewPeriodInMillis = Long.parseLong( org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_RENEW_PERIOD_MILLIS)); - } else { - lockRenewPeriodInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_RENEW_PERIOD.getDefaultValue().asInt()); - } - // the system property is removed, otherwise Artemis will use it to override the value from the configuration - org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_RENEW_PERIOD_MILLIS); - storageConfiguration.setJdbcLockRenewPeriodMillis(lockRenewPeriodInMillis); - // this property is used for testing only and has no corresponding model attribute. - // However the default value in Artemis is not correct (should be -1, not 60s) - final long jdbcLockAcquisitionTimeoutMillis; - if (org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS)) { - jdbcLockAcquisitionTimeoutMillis = Long.parseLong(org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS)); - } else { - jdbcLockAcquisitionTimeoutMillis = -1; - } - // the system property is removed, otherwise Artemis will use it to override the value from the configuration - org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS); - storageConfiguration.setJdbcLockAcquisitionTimeoutMillis(jdbcLockAcquisitionTimeoutMillis); - configuration.setStoreConfiguration(storageConfiguration); - } - /** - * Process the address settings. - * - * @param configuration the ActiveMQ configuration - * @param params the detyped operation parameters - */ - /** - * Process the address settings. - * - * @param configuration the ActiveMQ configuration - * @param params the detyped operation parameters - * @throws org.jboss.as.controller.OperationFailedException - */ - static void processAddressSettings(final OperationContext context, final Configuration configuration, final ModelNode params) throws OperationFailedException { - if (params.hasDefined(ADDRESS_SETTING)) { - for (final Property property : params.get(ADDRESS_SETTING).asPropertyList()) { - final String match = property.getName(); - final ModelNode config = property.getValue(); - final AddressSettings settings = AddressSettingAdd.createSettings(context, config); - configuration.getAddressesSettings().put(match, settings); + private void processStorageConfiguration(OperationContext context, ModelNode model, Configuration configuration) throws OperationFailedException { + ModelNode journalDataSource = JOURNAL_DATASOURCE.resolveModelAttribute(context, model); + if (!journalDataSource.isDefined()) { + return; } + DatabaseStorageConfiguration storageConfiguration = new DatabaseStorageConfiguration(); + storageConfiguration.setBindingsTableName(JOURNAL_BINDINGS_TABLE.resolveModelAttribute(context, model).asString()); + storageConfiguration.setMessageTableName(JOURNAL_MESSAGES_TABLE.resolveModelAttribute(context, model).asString()); + storageConfiguration.setLargeMessageTableName(JOURNAL_LARGE_MESSAGES_TABLE.resolveModelAttribute(context, model).asString()); + storageConfiguration.setPageStoreTableName(JOURNAL_PAGE_STORE_TABLE.resolveModelAttribute(context, model).asString()); + long networkTimeout = SECONDS.toMillis(JOURNAL_JDBC_NETWORK_TIMEOUT.resolveModelAttribute(context, model).asInt()); + // ARTEMIS-1493: Artemis API is not correct. the value must be in millis but it requires an int instead of a long. + storageConfiguration.setJdbcNetworkTimeout((int) networkTimeout); + // WFLY-9513 - check for System properties for HA JDBC store attributes + // + // if the attribute is defined, we use its value + // otherwise, we check first for a system property + // finally we use the attribute's default value + // + // this behaviour applies to JOURNAL_NODE_MANAGER_STORE_TABLE, JOURNAL_JDBC_LOCK_EXPIRATION + // and JOURNAL_JDBC_LOCK_RENEW_PERIOD attributes. + final String nodeManagerStoreTableName; + if (model.hasDefined(JOURNAL_NODE_MANAGER_STORE_TABLE.getName())) { + nodeManagerStoreTableName = JOURNAL_NODE_MANAGER_STORE_TABLE.resolveModelAttribute(context, model).asString(); + } else if (org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_NODEMANAGER_STORE_TABLE_NAME)) { + nodeManagerStoreTableName = org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_NODEMANAGER_STORE_TABLE_NAME); + } else { + nodeManagerStoreTableName = JOURNAL_NODE_MANAGER_STORE_TABLE.getDefaultValue().asString(); + } + // the system property is removed, otherwise Artemis will use it to override the value from the configuration + org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_NODEMANAGER_STORE_TABLE_NAME); + storageConfiguration.setNodeManagerStoreTableName(nodeManagerStoreTableName); + final long lockExpirationInMillis; + if (model.hasDefined(JOURNAL_JDBC_LOCK_EXPIRATION.getName())) { + lockExpirationInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_EXPIRATION.resolveModelAttribute(context, model).asInt()); + } else if (org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_EXPIRATION_MILLIS)) { + lockExpirationInMillis = Long.parseLong(org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_EXPIRATION_MILLIS)); + } else { + lockExpirationInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_EXPIRATION.getDefaultValue().asInt()); + } + // the system property is removed, otherwise Artemis will use it to override the value from the configuration + org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_EXPIRATION_MILLIS); + storageConfiguration.setJdbcLockExpirationMillis(lockExpirationInMillis); + final long lockRenewPeriodInMillis; + if (model.hasDefined(JOURNAL_JDBC_LOCK_RENEW_PERIOD.getName())) { + lockRenewPeriodInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_RENEW_PERIOD.resolveModelAttribute(context, model).asInt()); + } else if (org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_RENEW_PERIOD_MILLIS)) { + lockRenewPeriodInMillis = Long.parseLong(org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_RENEW_PERIOD_MILLIS)); + } else { + lockRenewPeriodInMillis = SECONDS.toMillis(JOURNAL_JDBC_LOCK_RENEW_PERIOD.getDefaultValue().asInt()); + } + // the system property is removed, otherwise Artemis will use it to override the value from the configuration + org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_JBDC_LOCK_RENEW_PERIOD_MILLIS); + storageConfiguration.setJdbcLockRenewPeriodMillis(lockRenewPeriodInMillis); + // this property is used for testing only and has no corresponding model attribute. + // However the default value in Artemis is not correct (should be -1, not 60s) + final long jdbcLockAcquisitionTimeoutMillis; + if (org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().containsKey(ARTEMIS_BROKER_CONFIG_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS)) { + jdbcLockAcquisitionTimeoutMillis = Long.parseLong(org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().getProperty(ARTEMIS_BROKER_CONFIG_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS)); + } else { + jdbcLockAcquisitionTimeoutMillis = -1; + } + // the system property is removed, otherwise Artemis will use it to override the value from the configuration + org.wildfly.security.manager.WildFlySecurityManager.getSystemPropertiesPrivileged().remove(ARTEMIS_BROKER_CONFIG_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS); + storageConfiguration.setJdbcLockAcquisitionTimeoutMillis(jdbcLockAcquisitionTimeoutMillis); + configuration.setStoreConfiguration(storageConfiguration); } - } - private List unwrapClasses(List classesModel) throws OperationFailedException { - List classes = new ArrayList<>(); + /** + * Transform the detyped operation parameters into the ActiveMQ configuration. + * + * @param context the operation context + * @param serverName the name of the ActiveMQ instance + * @param model the subsystem root resource model + * @return the ActiveMQ configuration + */ + private Configuration transformConfig(final OperationContext context, String serverName, final ModelNode model) throws OperationFailedException { + + Configuration configuration = new ConfigurationImpl(); + + configuration.setName(serverName); + + configuration.setEnabledAsyncConnectionExecution(ASYNC_CONNECTION_EXECUTION_ENABLED.resolveModelAttribute(context, model).asBoolean()); + + configuration.setClusterPassword(CLUSTER_PASSWORD.resolveModelAttribute(context, model).asString()); + configuration.setClusterUser(CLUSTER_USER.resolveModelAttribute(context, model).asString()); + configuration.setConnectionTTLOverride(CONNECTION_TTL_OVERRIDE.resolveModelAttribute(context, model).asInt()); + configuration.setCreateBindingsDir(CREATE_BINDINGS_DIR.resolveModelAttribute(context, model).asBoolean()); + configuration.setCreateJournalDir(CREATE_JOURNAL_DIR.resolveModelAttribute(context, model).asBoolean()); + configuration.setGlobalMaxSize(GLOBAL_MAX_MEMORY_SIZE.resolveModelAttribute(context, model).asLong()); + configuration.setMaxDiskUsage(GLOBAL_MAX_DISK_USAGE.resolveModelAttribute(context, model).asInt()); + configuration.setDiskScanPeriod(DISK_SCAN_PERIOD.resolveModelAttribute(context, model).asInt()); + configuration.setIDCacheSize(ID_CACHE_SIZE.resolveModelAttribute(context, model).asInt()); + // TODO do we want to allow the jmx configuration ? + configuration.setJMXDomain(JMX_DOMAIN.resolveModelAttribute(context, model).asString()); + configuration.setJMXManagementEnabled(JMX_MANAGEMENT_ENABLED.resolveModelAttribute(context, model).asBoolean()); + // Journal + final JournalType journalType = JournalType.valueOf(JOURNAL_TYPE.resolveModelAttribute(context, model).asString()); + configuration.setJournalType(journalType); + + ModelNode value = JOURNAL_BUFFER_SIZE.resolveModelAttribute(context, model); + if (value.isDefined()) { + configuration.setJournalBufferSize_AIO(value.asInt()); + configuration.setJournalBufferSize_NIO(value.asInt()); + } + value = JOURNAL_BUFFER_TIMEOUT.resolveModelAttribute(context, model); + if (value.isDefined()) { + configuration.setJournalBufferTimeout_AIO(value.asInt()); + configuration.setJournalBufferTimeout_NIO(value.asInt()); + } + value = JOURNAL_MAX_IO.resolveModelAttribute(context, model); + if (value.isDefined()) { + configuration.setJournalMaxIO_AIO(value.asInt()); + configuration.setJournalMaxIO_NIO(value.asInt()); + } + configuration.setJournalCompactMinFiles(JOURNAL_COMPACT_MIN_FILES.resolveModelAttribute(context, model).asInt()); + configuration.setJournalCompactPercentage(JOURNAL_COMPACT_PERCENTAGE.resolveModelAttribute(context, model).asInt()); + configuration.setJournalFileSize(JOURNAL_FILE_SIZE.resolveModelAttribute(context, model).asInt()); + configuration.setJournalMinFiles(JOURNAL_MIN_FILES.resolveModelAttribute(context, model).asInt()); + configuration.setJournalPoolFiles(JOURNAL_POOL_FILES.resolveModelAttribute(context, model).asInt()); + configuration.setJournalSyncNonTransactional(JOURNAL_SYNC_NON_TRANSACTIONAL.resolveModelAttribute(context, model).asBoolean()); + configuration.setJournalSyncTransactional(JOURNAL_SYNC_TRANSACTIONAL.resolveModelAttribute(context, model).asBoolean()); + configuration.setLogJournalWriteRate(LOG_JOURNAL_WRITE_RATE.resolveModelAttribute(context, model).asBoolean()); + + configuration.setManagementAddress(SimpleString.toSimpleString(MANAGEMENT_ADDRESS.resolveModelAttribute(context, model).asString())); + configuration.setManagementNotificationAddress(SimpleString.toSimpleString(MANAGEMENT_NOTIFICATION_ADDRESS.resolveModelAttribute(context, model).asString())); + + configuration.setMemoryMeasureInterval(MEMORY_MEASURE_INTERVAL.resolveModelAttribute(context, model).asLong()); + configuration.setMemoryWarningThreshold(MEMORY_WARNING_THRESHOLD.resolveModelAttribute(context, model).asInt()); + + configuration.setMessageCounterEnabled(STATISTICS_ENABLED.resolveModelAttribute(context, model).asBoolean()); + configuration.setMessageCounterSamplePeriod(MESSAGE_COUNTER_SAMPLE_PERIOD.resolveModelAttribute(context, model).asInt()); + configuration.setMessageCounterMaxDayHistory(MESSAGE_COUNTER_MAX_DAY_HISTORY.resolveModelAttribute(context, model).asInt()); + configuration.setMessageExpiryScanPeriod(MESSAGE_EXPIRY_SCAN_PERIOD.resolveModelAttribute(context, model).asLong()); + configuration.setMessageExpiryThreadPriority(MESSAGE_EXPIRY_THREAD_PRIORITY.resolveModelAttribute(context, model).asInt()); + + configuration.setPersistDeliveryCountBeforeDelivery(PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY.resolveModelAttribute(context, model).asBoolean()); + + configuration.setPageMaxConcurrentIO(PAGE_MAX_CONCURRENT_IO.resolveModelAttribute(context, model).asInt()); + + configuration.setPersistenceEnabled(PERSISTENCE_ENABLED.resolveModelAttribute(context, model).asBoolean()); + configuration.setPersistIDCache(PERSIST_ID_CACHE.resolveModelAttribute(context, model).asBoolean()); + + configuration.setScheduledThreadPoolMaxSize(SCHEDULED_THREAD_POOL_MAX_SIZE.resolveModelAttribute(context, model).asInt()); + configuration.setSecurityEnabled(SECURITY_ENABLED.resolveModelAttribute(context, model).asBoolean()); + configuration.setSecurityInvalidationInterval(SECURITY_INVALIDATION_INTERVAL.resolveModelAttribute(context, model).asLong()); + configuration.setServerDumpInterval(SERVER_DUMP_INTERVAL.resolveModelAttribute(context, model).asLong()); + configuration.setThreadPoolMaxSize(THREAD_POOL_MAX_SIZE.resolveModelAttribute(context, model).asInt()); + configuration.setTransactionTimeout(TRANSACTION_TIMEOUT.resolveModelAttribute(context, model).asLong()); + configuration.setTransactionTimeoutScanPeriod(TRANSACTION_TIMEOUT_SCAN_PERIOD.resolveModelAttribute(context, model).asLong()); + configuration.getWildcardConfiguration().setRoutingEnabled(WILD_CARD_ROUTING_ENABLED.resolveModelAttribute(context, model).asBoolean()); + + processStorageConfiguration(context, model, configuration); + HAPolicyConfigurationBuilder.addHAPolicyConfiguration(context, configuration, model); + + processAddressSettings(context, configuration, model); + processSecuritySettings(context, configuration, model); + + // Add in items from child resources + GroupingHandlerAdd.addGroupingHandlerConfig(context, configuration, model); + configuration.setDiscoveryGroupConfigurations(DiscoveryGroupAdd.addDiscoveryGroupConfigs(context, model)); + DivertAdd.addDivertConfigs(context, configuration, model); + QueueAdd.addQueueConfigs(context, configuration, model); + BridgeAdd.addBridgeConfigs(context, configuration, model); + ClusterConnectionAdd.addClusterConnectionConfigs(context, configuration, model); + ConnectorServiceDefinition.addConnectorServiceConfigs(context, configuration, model); + + return configuration; + } - for (ModelNode classModel : classesModel) { - Class clazz = unwrapClass(classModel); - classes.add(clazz); + /** + * Process the address settings. + * + * @param configuration the ActiveMQ configuration + * @param params the detyped operation parameters + */ + /** + * Process the address settings. + * + * @param configuration the ActiveMQ configuration + * @param params the detyped operation parameters + * @throws org.jboss.as.controller.OperationFailedException + */ + private void processAddressSettings(final OperationContext context, final Configuration configuration, final ModelNode params) throws OperationFailedException { + if (params.hasDefined(ADDRESS_SETTING)) { + for (final Property property : params.get(ADDRESS_SETTING).asPropertyList()) { + final String match = property.getName(); + final ModelNode config = property.getValue(); + final AddressSettings settings = AddressSettingAdd.createSettings(context, config); + configuration.getAddressesSettings().put(match, settings); + } + } } - return classes; - } + private List unwrapClasses(List classesModel) throws OperationFailedException { + List classes = new ArrayList<>(); - private static Class unwrapClass(ModelNode classModel) throws OperationFailedException { - String className = classModel.get(NAME).asString(); - String moduleName = classModel.get(MODULE).asString(); - try { - ModuleIdentifier moduleID = ModuleIdentifier.fromString(moduleName); - Module module = Module.getCallerModuleLoader().loadModule(moduleID); - Class clazz = module.getClassLoader().loadClass(className); - return clazz; - } catch (Exception e) { - throw MessagingLogger.ROOT_LOGGER.unableToLoadClassFromModule(className, moduleName); - } - } + for (ModelNode classModel : classesModel) { + Class clazz = unwrapClass(classModel); + classes.add(clazz); + } - private List processInterceptors(ModelNode model) throws OperationFailedException { - if (!model.isDefined()) { - return Collections.emptyList(); + return classes; } - List interceptors = new ArrayList<>(); - List interceptorModels = model.asList(); - for (Class clazz : unwrapClasses(interceptorModels)) { + + private Class unwrapClass(ModelNode classModel) throws OperationFailedException { + String className = classModel.get(NAME).asString(); + String moduleName = classModel.get(MODULE).asString(); try { - Interceptor interceptor = Interceptor.class.cast(clazz.newInstance()); - interceptors.add(interceptor); + ModuleIdentifier moduleID = ModuleIdentifier.fromString(moduleName); + Module module = Module.getCallerModuleLoader().loadModule(moduleID); + Class clazz = module.getClassLoader().loadClass(className); + return clazz; } catch (Exception e) { - throw new OperationFailedException(e); + throw MessagingLogger.ROOT_LOGGER.unableToLoadClassFromModule(className, moduleName); } } - return interceptors; - } - /** - * Process the security settings. - * - * @param configuration the ActiveMQ configuration - * @param params the detyped operation parameters - */ - static void processSecuritySettings(final OperationContext context, final Configuration configuration, final ModelNode params) throws OperationFailedException { - if (params.get(SECURITY_SETTING).isDefined()) { - for (final Property property : params.get(SECURITY_SETTING).asPropertyList()) { - final String match = property.getName(); - final ModelNode config = property.getValue(); - - if(config.hasDefined(CommonAttributes.ROLE)) { - final Set roles = new HashSet(); - for (final Property role : config.get(CommonAttributes.ROLE).asPropertyList()) { - roles.add(SecurityRoleDefinition.transform(context, role.getName(), role.getValue())); + private List processInterceptors(ModelNode model) throws OperationFailedException { + if (!model.isDefined()) { + return Collections.emptyList(); + } + List interceptors = new ArrayList<>(); + List interceptorModels = model.asList(); + for (Class clazz : unwrapClasses(interceptorModels)) { + try { + Interceptor interceptor = Interceptor.class.cast(clazz.newInstance()); + interceptors.add(interceptor); + } catch (Exception e) { + throw new OperationFailedException(e); + } + } + return interceptors; + } + + /** + * Process the security settings. + * + * @param configuration the ActiveMQ configuration + * @param params the detyped operation parameters + */ + private void processSecuritySettings(final OperationContext context, final Configuration configuration, final ModelNode params) throws OperationFailedException { + if (params.get(SECURITY_SETTING).isDefined()) { + for (final Property property : params.get(SECURITY_SETTING).asPropertyList()) { + final String match = property.getName(); + final ModelNode config = property.getValue(); + + if (config.hasDefined(CommonAttributes.ROLE)) { + final Set roles = new HashSet(); + for (final Property role : config.get(CommonAttributes.ROLE).asPropertyList()) { + roles.add(SecurityRoleDefinition.transform(context, role.getName(), role.getValue())); + } + configuration.getSecurityRoles().put(match, roles); } - configuration.getSecurityRoles().put(match, roles); } } } - } - private static void addBridgeCredentialStoreReference(ActiveMQServerService amqService, Configuration configuration, ObjectTypeAttributeDefinition credentialReferenceAttributeDefinition, OperationContext context, ModelNode model, ServiceBuilder serviceBuilder) throws OperationFailedException { - for (BridgeConfiguration bridgeConfiguration: configuration.getBridgeConfigurations()) { - String name = bridgeConfiguration.getName(); - InjectedValue> injector = amqService.getBridgeCredentialSourceSupplierInjector(name); + private void addBridgeCredentialStoreReference(ActiveMQServerService amqService, Configuration configuration, ObjectTypeAttributeDefinition credentialReferenceAttributeDefinition, OperationContext context, ModelNode model, ServiceBuilder serviceBuilder) throws OperationFailedException { + for (BridgeConfiguration bridgeConfiguration : configuration.getBridgeConfigurations()) { + String name = bridgeConfiguration.getName(); + InjectedValue> injector = amqService.getBridgeCredentialSourceSupplierInjector(name); - String[] modelFilter = { CommonAttributes.BRIDGE, name }; + String[] modelFilter = {CommonAttributes.BRIDGE, name}; - ModelNode filteredModelNode = model; - if (modelFilter != null && modelFilter.length > 0) { - for (String path : modelFilter) { - if (filteredModelNode.get(path).isDefined()) - filteredModelNode = filteredModelNode.get(path); - else - break; + ModelNode filteredModelNode = model; + if (modelFilter != null && modelFilter.length > 0) { + for (String path : modelFilter) { + if (filteredModelNode.get(path).isDefined()) { + filteredModelNode = filteredModelNode.get(path); + } else { + break; + } + } + } + ModelNode value = credentialReferenceAttributeDefinition.resolveModelAttribute(context, filteredModelNode); + if (value.isDefined()) { + injector.inject(CredentialReference.getCredentialSourceSupplier(context, credentialReferenceAttributeDefinition, filteredModelNode, serviceBuilder)); } - } - ModelNode value = credentialReferenceAttributeDefinition.resolveModelAttribute(context, filteredModelNode); - if (value.isDefined()) { - injector.inject(CredentialReference.getCredentialSourceSupplier(context, credentialReferenceAttributeDefinition, filteredModelNode, serviceBuilder)); } } - } - private static void addClusterCredentialStoreReference(ActiveMQServerService amqService, ObjectTypeAttributeDefinition credentialReferenceAttributeDefinition, OperationContext context, ModelNode model, ServiceBuilder serviceBuilder) throws OperationFailedException { - ModelNode value = credentialReferenceAttributeDefinition.resolveModelAttribute(context, model); - if (value.isDefined()) { - amqService.getClusterCredentialSourceSupplierInjector() - .inject(CredentialReference.getCredentialSourceSupplier(context, credentialReferenceAttributeDefinition, model, serviceBuilder)); + private void addClusterCredentialStoreReference(ActiveMQServerService amqService, ObjectTypeAttributeDefinition credentialReferenceAttributeDefinition, OperationContext context, ModelNode model, ServiceBuilder serviceBuilder) throws OperationFailedException { + ModelNode value = credentialReferenceAttributeDefinition.resolveModelAttribute(context, model); + if (value.isDefined()) { + amqService.getClusterCredentialSourceSupplierInjector() + .inject(CredentialReference.getCredentialSourceSupplier(context, credentialReferenceAttributeDefinition, model, serviceBuilder)); + } } } } diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/TransportConfigOperationHandlers.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/TransportConfigOperationHandlers.java index 07f96d0ed9c0..e139b0076319 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/TransportConfigOperationHandlers.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/TransportConfigOperationHandlers.java @@ -350,7 +350,7 @@ public static Map getParameters(final OperationContext context, * @param bindings the referenced socket bindings * @throws OperationFailedException */ - static void processConnectors(final OperationContext context, final Configuration configuration, final ModelNode params, final Set bindings) throws OperationFailedException { + static Map processConnectors(final OperationContext context, final String configServerName, final ModelNode params, final Set bindings) throws OperationFailedException { final Map connectors = new HashMap(); if (params.hasDefined(CONNECTOR)) { for (final Property property : params.get(CONNECTOR).asPropertyList()) { @@ -407,13 +407,13 @@ static void processConnectors(final OperationContext context, final Configuratio parameters.put(HTTPConnectorDefinition.SOCKET_BINDING.getName(), binding); ModelNode serverNameModelNode = HTTPConnectorDefinition.SERVER_NAME.resolveModelAttribute(context, config); // use the name of this server if the server-name attribute is undefined - String serverName = serverNameModelNode.isDefined() ? serverNameModelNode.asString() : configuration.getName(); + String serverName = serverNameModelNode.isDefined() ? serverNameModelNode.asString() : configServerName; parameters.put(ACTIVEMQ_SERVER_NAME, serverName); connectors.put(connectorName, new TransportConfiguration(NettyConnectorFactory.class.getName(), parameters, connectorName, extraParameters)); } } - configuration.setConnectorConfigurations(connectors); + return connectors; } public static TransportConfiguration[] processConnectors(final OperationContext context, final Collection names, Set bindings) throws OperationFailedException { diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSConnectionFactoryDefinitionInjectionSource.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSConnectionFactoryDefinitionInjectionSource.java index 33761722b627..cff2d52adc0e 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSConnectionFactoryDefinitionInjectionSource.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSConnectionFactoryDefinitionInjectionSource.java @@ -22,6 +22,8 @@ package org.wildfly.extension.messaging.activemq.deployment; +import static org.jboss.as.server.deployment.Attachments.CAPABILITY_SERVICE_SUPPORT; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.CONNECTOR; import static org.wildfly.extension.messaging.activemq.CommonAttributes.CONNECTORS; import static org.wildfly.extension.messaging.activemq.CommonAttributes.DEFAULT; import static org.wildfly.extension.messaging.activemq.CommonAttributes.JGROUPS_CLUSTER; @@ -36,19 +38,25 @@ import static org.wildfly.extension.messaging.activemq.jms.ConnectionFactoryAttributes.Pooled.MIN_POOL_SIZE; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import javax.resource.spi.TransactionSupport; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.jboss.as.connector.deployers.ra.ConnectionFactoryDefinitionInjectionSource; import org.jboss.as.connector.services.resourceadapters.ConnectionFactoryReferenceFactoryService; import org.jboss.as.controller.OperationFailedException; import org.jboss.as.controller.PathAddress; import org.jboss.as.controller.PathElement; +import org.jboss.as.controller.descriptions.ModelDescriptionConstants; +import org.jboss.as.ee.component.EEModuleDescription; import org.jboss.as.ee.component.InjectionSource; import org.jboss.as.ee.resource.definition.ResourceDefinitionInjectionSource; import org.jboss.as.naming.ManagedReferenceFactory; @@ -62,14 +70,18 @@ import org.jboss.dmr.Property; import org.jboss.msc.inject.Injector; import org.jboss.msc.service.ServiceBuilder; +import org.jboss.msc.service.ServiceController; import org.jboss.msc.service.ServiceName; import org.jboss.msc.service.ServiceRegistry; import org.jboss.msc.service.ServiceTarget; import org.wildfly.extension.messaging.activemq.CommonAttributes; +import org.wildfly.extension.messaging.activemq.ExternalBrokerConfigurationService; import org.wildfly.extension.messaging.activemq.MessagingExtension; import org.wildfly.extension.messaging.activemq.MessagingServices; +import org.wildfly.extension.messaging.activemq.MessagingSubsystemRootResourceDefinition; import org.wildfly.extension.messaging.activemq.jms.ConnectionFactoryAttribute; import org.wildfly.extension.messaging.activemq.jms.ConnectionFactoryAttributes; +import org.wildfly.extension.messaging.activemq.jms.ExternalPooledConnectionFactoryService; import org.wildfly.extension.messaging.activemq.jms.JMSServices; import org.wildfly.extension.messaging.activemq.jms.PooledConnectionFactoryConfigProperties; import org.wildfly.extension.messaging.activemq.jms.PooledConnectionFactoryConfigurationRuntimeHandler; @@ -153,10 +165,13 @@ void setMinPoolSize(int minPoolSize) { @Override public void getResourceValue(ResolutionContext context, ServiceBuilder serviceBuilder, DeploymentPhaseContext phaseContext, Injector injector) throws DeploymentUnitProcessingException { final DeploymentUnit deploymentUnit = phaseContext.getDeploymentUnit(); - - if (targetsPooledConnectionFactory(getActiveMQServerName(properties), resourceAdapter, phaseContext.getServiceRegistry())) { + if(resourceAdapter == null || resourceAdapter.isEmpty()) { + resourceAdapter = getDefaulResourceAdapter(deploymentUnit); + } + boolean external = targetsExternalPooledConnectionFactory(resourceAdapter, phaseContext.getServiceRegistry()); + if (external || targetsPooledConnectionFactory(getActiveMQServerName(properties), resourceAdapter, phaseContext.getServiceRegistry())) { try { - startedPooledConnectionFactory(context, jndiName, serviceBuilder, phaseContext.getServiceTarget(), deploymentUnit, injector); + startedPooledConnectionFactory(context, jndiName, serviceBuilder, phaseContext.getServiceTarget(), deploymentUnit, injector, external); } catch (OperationFailedException e) { throw new DeploymentUnitProcessingException(e); } @@ -183,7 +198,9 @@ public void getResourceValue(ResolutionContext context, ServiceBuilder servic } } - private void startedPooledConnectionFactory(ResolutionContext context, String name, ServiceBuilder serviceBuilder, ServiceTarget serviceTarget, DeploymentUnit deploymentUnit, Injector injector) throws DeploymentUnitProcessingException, OperationFailedException { + private void startedPooledConnectionFactory(ResolutionContext context, String name, ServiceBuilder serviceBuilder, + ServiceTarget serviceTarget, DeploymentUnit deploymentUnit, Injector injector, + boolean external) throws DeploymentUnitProcessingException, OperationFailedException { Map props = new HashMap<>(properties); List connectors = getConnectors(props); clearUnknownProperties(properties); @@ -225,13 +242,41 @@ private void startedPooledConnectionFactory(ResolutionContext context, String na List adapterParams = getAdapterParams(model); String txSupport = transactional ? XA_TX : NO_TX; - final String serverName = getActiveMQServerName(properties); + final String serverName; final String pcfName = uniqueName(context, name); final ContextNames.BindInfo bindInfo = ContextNames.bindInfoForEnvEntry(context.getApplicationName(), context.getModuleName(), context.getComponentName(), !context.isCompUsesModule(), name); - PooledConnectionFactoryService.installService(serviceTarget, pcfName, serverName, connectors, - discoveryGroupName, jgroupsChannelName, adapterParams, - bindInfo, - txSupport, minPoolSize, maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, true); + if(external) { + serverName = null; + Set connectorsSocketBindings = new HashSet<>(); + ExternalBrokerConfigurationService configuration = (ExternalBrokerConfigurationService)deploymentUnit.getServiceRegistry().getRequiredService(MessagingSubsystemRootResourceDefinition.CONFIGURATION_CAPABILITY.getCapabilityServiceName()).getService().getValue(); + TransportConfiguration[] tcs = new TransportConfiguration[connectors.size()]; + for (int i = 0; i < tcs.length; i++) { + tcs[i] = configuration.getConnectors().get(connectors.get(i)); + if (tcs[i].getParams().containsKey(ModelDescriptionConstants.SOCKET_BINDING)) { + connectorsSocketBindings.add(tcs[i].getParams().get(ModelDescriptionConstants.SOCKET_BINDING).toString()); + } + } + DiscoveryGroupConfiguration discoveryGroupConfiguration = null; + if(discoveryGroupName != null) { + discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations().get(discoveryGroupName); + } + if (connectors.isEmpty() && discoveryGroupConfiguration == null) { + tcs = getExternalPooledConnectionFactory(resourceAdapter, deploymentUnit.getServiceRegistry()).getConnectors(); + for(int i = 0 ; i < tcs.length; i++) { + if(tcs[i].getParams().containsKey(ModelDescriptionConstants.SOCKET_BINDING)) { + connectorsSocketBindings.add(tcs[i].getParams().get(ModelDescriptionConstants.SOCKET_BINDING).toString()); + } + } + } + ExternalPooledConnectionFactoryService.installService(serviceTarget, configuration, pcfName, tcs, discoveryGroupConfiguration, + connectorsSocketBindings, null, jgroupsChannelName, adapterParams, bindInfo, Collections.emptyList(), + txSupport, minPoolSize, maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, deploymentUnit.getAttachment(CAPABILITY_SERVICE_SUPPORT)); + } else { + serverName = getActiveMQServerName(properties); + PooledConnectionFactoryService.installService(serviceTarget, pcfName, serverName, connectors, + discoveryGroupName, jgroupsChannelName, adapterParams, bindInfo, txSupport, minPoolSize, + maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, true); + } final ServiceName referenceFactoryServiceName = ConnectionFactoryReferenceFactoryService.SERVICE_NAME_BASE .append(bindInfo.getBinderServiceName()); @@ -239,11 +284,17 @@ private void startedPooledConnectionFactory(ResolutionContext context, String na //create the management registration String managementName = managementName(context, name); - final PathElement serverElement = PathElement.pathElement(SERVER, serverName); final DeploymentResourceSupport deploymentResourceSupport = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_RESOURCE_SUPPORT); - deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement); final PathElement pcfPath = PathElement.pathElement(POOLED_CONNECTION_FACTORY, managementName); - PathAddress registration = PathAddress.pathAddress(serverElement, pcfPath); + PathAddress registration; + if (external) { + deploymentResourceSupport.getDeploymentSubsystemModel(MessagingExtension.SUBSYSTEM_NAME); + registration = PathAddress.pathAddress(pcfPath); + } else { + final PathElement serverElement = PathElement.pathElement(SERVER, serverName); + deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement); + registration = PathAddress.pathAddress(serverElement, pcfPath); + } MessagingXmlInstallDeploymentUnitProcessor.createDeploymentSubModel(registration, deploymentUnit); PooledConnectionFactoryConfigurationRuntimeHandler.INSTANCE.registerResource(serverName, managementName, model); } @@ -259,6 +310,12 @@ private List getConnectors(Map props) { } } } + if (props.containsKey(CONNECTOR)) { + String connector = properties.remove(CONNECTOR).trim(); + if (!connector.isEmpty()) { + connectors.add(connector); + } + } return connectors; } @@ -280,16 +337,16 @@ void clearUnknownProperties(final Map props) { private static String uniqueName(InjectionSource.ResolutionContext context, final String jndiName) { StringBuilder uniqueName = new StringBuilder(); - return uniqueName.append(context.getApplicationName() + "_") + return uniqueName.append(context.getApplicationName()).append("_") .append(managementName(context, jndiName)) .toString(); } private static String managementName(InjectionSource.ResolutionContext context, final String jndiName) { StringBuilder uniqueName = new StringBuilder(); - uniqueName.append(context.getModuleName() + "_"); + uniqueName.append(context.getModuleName()).append("_"); if (context.getComponentName() != null) { - uniqueName.append(context.getComponentName() + "_"); + uniqueName.append(context.getComponentName()).append("_"); } return uniqueName .append(jndiName.replace(':', '_')) @@ -330,12 +387,50 @@ static boolean targetsPooledConnectionFactory(String server, String resourceAdap return serviceRegistry.getServiceNames().contains(pcfName); } + /** + * Return whether the definition targets an existing external pooled connection factory. + * + * Checks the service registry for a PooledConnectionFactoryService with the ServiceName + * created by the {@code server} property (or {@code "default") and the {@code resourceAdapter} property. + */ + static boolean targetsExternalPooledConnectionFactory(String resourceAdapter, ServiceRegistry serviceRegistry) { + // if the resourceAdapter is not defined, the default behaviour is to create a pooled-connection-factory. + if (resourceAdapter == null || resourceAdapter.isEmpty()) { + return false; + } + //let's look into the external-pooled-connection-factory + ServiceName pcfName = JMSServices.getPooledConnectionFactoryBaseServiceName(MessagingServices.getActiveMQServiceName("")).append(resourceAdapter); + return serviceRegistry.getServiceNames().contains(pcfName); + } + + private static ExternalPooledConnectionFactoryService getExternalPooledConnectionFactory(String resourceAdapter, ServiceRegistry serviceRegistry) { + //let's look into the external-pooled-connection-factory + ServiceName pcfName = JMSServices.getPooledConnectionFactoryBaseServiceName(MessagingServices.getActiveMQServiceName("")).append(resourceAdapter); + return (ExternalPooledConnectionFactoryService) serviceRegistry.getService(pcfName).getValue(); + } + + static String getDefaulResourceAdapter(DeploymentUnit deploymentUnit) { + EEModuleDescription eeDescription = deploymentUnit.getAttachment(org.jboss.as.ee.component.Attachments.EE_MODULE_DESCRIPTION); + if (eeDescription != null) { + String defaultJndiName = eeDescription.getDefaultResourceJndiNames().getJmsConnectionFactory(); + ContextNames.BindInfo bindInfo = ContextNames.bindInfoFor(defaultJndiName); + ServiceController binder = deploymentUnit.getServiceRegistry().getService(bindInfo.getBinderServiceName()); + if (binder != null) { + Object pcf = binder.getService().getValue(); + if (pcf != null && pcf instanceof ConnectionFactoryReferenceFactoryService) { + return ((ConnectionFactoryReferenceFactoryService) pcf).getName(); + } + } + } + return null; + } + /** * The JMS connection factory can specify another server to deploy its destinations * by passing a property server=<name of the server>. Otherwise, "default" is used by default. */ static String getActiveMQServerName(Map properties) { - return properties.containsKey(SERVER) ? properties.get(SERVER) : DEFAULT; + return properties.getOrDefault(SERVER, DEFAULT); } @Override diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSDestinationDefinitionInjectionSource.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSDestinationDefinitionInjectionSource.java index 0214f3c812f9..1bef75e7435f 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSDestinationDefinitionInjectionSource.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/deployment/JMSDestinationDefinitionInjectionSource.java @@ -24,12 +24,17 @@ import static org.wildfly.extension.messaging.activemq.CommonAttributes.DURABLE; import static org.wildfly.extension.messaging.activemq.CommonAttributes.ENTRIES; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.EXTERNAL_JMS_QUEUE; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.EXTERNAL_JMS_TOPIC; import static org.wildfly.extension.messaging.activemq.CommonAttributes.JMS_QUEUE; import static org.wildfly.extension.messaging.activemq.CommonAttributes.JMS_TOPIC; import static org.wildfly.extension.messaging.activemq.CommonAttributes.NAME; import static org.wildfly.extension.messaging.activemq.CommonAttributes.SELECTOR; import static org.wildfly.extension.messaging.activemq.CommonAttributes.SERVER; +import static org.wildfly.extension.messaging.activemq.ServerDefinition.MANAGEMENT_ADDRESS; import static org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionInjectionSource.getActiveMQServerName; +import static org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionInjectionSource.getDefaulResourceAdapter; +import static org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionInjectionSource.targetsExternalPooledConnectionFactory; import static org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionInjectionSource.targetsPooledConnectionFactory; import static org.wildfly.extension.messaging.activemq.logging.MessagingLogger.ROOT_LOGGER; @@ -62,15 +67,19 @@ import org.jboss.msc.service.ServiceTarget; import org.wildfly.extension.messaging.activemq.MessagingExtension; import org.wildfly.extension.messaging.activemq.MessagingServices; +import org.wildfly.extension.messaging.activemq.jms.DestinationConfiguration; +import org.wildfly.extension.messaging.activemq.jms.ExternalJMSQueueService; +import org.wildfly.extension.messaging.activemq.jms.ExternalJMSTopicService; import org.wildfly.extension.messaging.activemq.jms.JMSQueueConfigurationRuntimeHandler; import org.wildfly.extension.messaging.activemq.jms.JMSQueueService; +import org.wildfly.extension.messaging.activemq.jms.JMSServices; import org.wildfly.extension.messaging.activemq.jms.JMSTopicConfigurationRuntimeHandler; import org.wildfly.extension.messaging.activemq.jms.JMSTopicService; import org.wildfly.extension.messaging.activemq.jms.WildFlyBindingRegistry; /** * A binding description for JMS Destination definitions. - *

+ * * The referenced JMS definition must be directly visible to the * component declaring the annotation. @@ -78,7 +87,6 @@ * @author Eduardo Martins */ public class JMSDestinationDefinitionInjectionSource extends ResourceDefinitionInjectionSource { - private final String interfaceName; // optional attributes @@ -114,18 +122,23 @@ private String uniqueName(InjectionSource.ResolutionContext context) { } StringBuilder uniqueName = new StringBuilder(); - uniqueName.append(context.getApplicationName() + "_"); - uniqueName.append(context.getModuleName() + "_"); + uniqueName.append(context.getApplicationName()).append("_"); + uniqueName.append(context.getModuleName()).append("_"); if (context.getComponentName() != null) { - uniqueName.append(context.getComponentName() + "_"); + uniqueName.append(context.getComponentName()).append("_"); } uniqueName.append(jndiName); return uniqueName.toString(); } + @Override public void getResourceValue(final InjectionSource.ResolutionContext context, final ServiceBuilder serviceBuilder, final DeploymentPhaseContext phaseContext, final Injector injector) throws DeploymentUnitProcessingException { - if (targetsPooledConnectionFactory(getActiveMQServerName(properties), resourceAdapter, phaseContext.getServiceRegistry())) { - startActiveMQDestination(context, serviceBuilder, phaseContext, injector); + if(resourceAdapter == null || resourceAdapter.isEmpty()) { + resourceAdapter = getDefaulResourceAdapter(phaseContext.getDeploymentUnit()); + } + boolean external = targetsExternalPooledConnectionFactory(resourceAdapter, phaseContext.getServiceRegistry()); + if (external || targetsPooledConnectionFactory(getActiveMQServerName(properties), resourceAdapter, phaseContext.getServiceRegistry())) { + startActiveMQDestination(context, serviceBuilder, phaseContext, injector, external); } else { // delegate to the resource-adapter subsystem to create a generic JCA admin object. AdministeredObjectDefinitionInjectionSource aodis = new AdministeredObjectDefinitionInjectionSource(jndiName, className, resourceAdapter); @@ -136,20 +149,23 @@ public void getResourceValue(final InjectionSource.ResolutionContext context, fi aodis.addProperty(property.getKey(), property.getValue()); } aodis.getResourceValue(context, serviceBuilder, phaseContext, injector); - } } - private void startActiveMQDestination(ResolutionContext context, ServiceBuilder serviceBuilder, DeploymentPhaseContext phaseContext, Injector injector) throws DeploymentUnitProcessingException { + private void startActiveMQDestination(ResolutionContext context, ServiceBuilder serviceBuilder, DeploymentPhaseContext phaseContext, Injector injector, boolean external) throws DeploymentUnitProcessingException { final DeploymentUnit deploymentUnit = phaseContext.getDeploymentUnit(); final String uniqueName = uniqueName(context); try { - ServiceName serviceName = MessagingServices.getActiveMQServiceName(getActiveMQServerName(properties)); - + ServiceName serviceName; + if(external) { + serviceName = MessagingServices.getActiveMQServiceName(""); + } else { + serviceName = MessagingServices.getActiveMQServiceName(getActiveMQServerName(properties)); + } if (interfaceName.equals(Queue.class.getName())) { - startQueue(uniqueName, phaseContext.getServiceTarget(), serviceName, serviceBuilder, deploymentUnit, injector); + startQueue(uniqueName, phaseContext.getServiceTarget(), serviceName, serviceBuilder, deploymentUnit, injector, external); } else { - startTopic(uniqueName, phaseContext.getServiceTarget(), serviceName, serviceBuilder, deploymentUnit, injector); + startTopic(uniqueName, phaseContext.getServiceTarget(), serviceName, serviceBuilder, deploymentUnit, injector, external); } } catch (Exception e) { throw new DeploymentUnitProcessingException(e); @@ -166,10 +182,14 @@ private void startQueue(final String queueName, final ServiceName serverServiceName, final ServiceBuilder serviceBuilder, final DeploymentUnit deploymentUnit, - final Injector injector) { + final Injector injector, + final boolean external) { final String selector = properties.containsKey(SELECTOR.getName()) ? properties.get(SELECTOR.getName()) : null; final boolean durable = properties.containsKey(DURABLE.getName()) ? Boolean.valueOf(properties.get(DURABLE.getName())) : DURABLE.getDefaultValue().asBoolean(); + final String managementAddress = properties.containsKey(MANAGEMENT_ADDRESS.getName()) ? properties.get(MANAGEMENT_ADDRESS.getName()) : MANAGEMENT_ADDRESS.getDefaultValue().asString(); + final String user = properties.containsKey("management-user") ? properties.get("management-user") : null; + final String password = properties.containsKey("management-password") ? properties.get("\"management-password") : null; ModelNode destination = new ModelNode(); destination.get(NAME).set(queueName); @@ -178,17 +198,45 @@ private void startQueue(final String queueName, destination.get(SELECTOR.getName()).set(selector); } destination.get(ENTRIES).add(jndiName); + Service queueService; + if(external) { + ServiceName pcfName= JMSServices.getPooledConnectionFactoryBaseServiceName(serverServiceName).append(resourceAdapter); + final ServiceName jmsQueueServiceName = JMSServices.getJmsQueueBaseServiceName(serverServiceName).append(queueName); + queueService = ExternalJMSQueueService.installRuntimeQueueService( + DestinationConfiguration.Builder.getInstance() + .setResourceAdapter(resourceAdapter) + .setName(queueName) + .setManagementQueueAddress(managementAddress) + .setDestinationServiceName(jmsQueueServiceName) + .setDurable(durable) + .setSelector(selector) + .setManagementUsername(user) + .setManagementPassword(password) + .build(), + serviceTarget, + pcfName); + } else { + queueService = JMSQueueService.installService(queueName, serviceTarget, serverServiceName, selector, durable); + } + - Service queueService = JMSQueueService.installService(queueName, serviceTarget, serverServiceName, selector, durable); inject(serviceBuilder, injector, queueService); //create the management registration - String serverName = getActiveMQServerName(properties); - final PathElement serverElement = PathElement.pathElement(SERVER, serverName); - final PathElement dest = PathElement.pathElement(JMS_QUEUE, queueName); + String serverName = null; final DeploymentResourceSupport deploymentResourceSupport = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_RESOURCE_SUPPORT); - deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement); - PathAddress registration = PathAddress.pathAddress(serverElement, dest); + PathAddress registration; + if (external) { + final PathElement dest = PathElement.pathElement(EXTERNAL_JMS_QUEUE, queueName); + deploymentResourceSupport.getDeploymentSubsystemModel(MessagingExtension.SUBSYSTEM_NAME); + registration = PathAddress.pathAddress(dest); + } else { + serverName = getActiveMQServerName(properties); + final PathElement dest = PathElement.pathElement(JMS_QUEUE, queueName); + final PathElement serverElement = PathElement.pathElement(SERVER, serverName); + deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement); + registration = PathAddress.pathAddress(serverElement, dest); + } MessagingXmlInstallDeploymentUnitProcessor.createDeploymentSubModel(registration, deploymentUnit); JMSQueueConfigurationRuntimeHandler.INSTANCE.registerResource(serverName, queueName, destination); } @@ -198,21 +246,50 @@ private void startTopic(String topicName, ServiceName serverServiceName, ServiceBuilder serviceBuilder, DeploymentUnit deploymentUnit, - Injector injector) { + Injector injector, + final boolean external) { + final String managementAddress = properties.containsKey(MANAGEMENT_ADDRESS.getName()) ? properties.get(MANAGEMENT_ADDRESS.getName()) : MANAGEMENT_ADDRESS.getDefaultValue().asString(); + final String user = properties.containsKey("management-user") ? properties.get("management-user") : null; + final String password = properties.containsKey("management-password") ? properties.get("\"management-password") : null; ModelNode destination = new ModelNode(); destination.get(NAME).set(topicName); destination.get(ENTRIES).add(jndiName); - Service topicService = JMSTopicService.installService(topicName, serverServiceName, serviceTarget); + Service topicService; + if(external) { + ServiceName pcfName = JMSServices.getPooledConnectionFactoryBaseServiceName(serverServiceName).append(resourceAdapter); + final ServiceName jmsTopicServiceName = JMSServices.getJmsTopicBaseServiceName(serverServiceName).append(topicName); + topicService = ExternalJMSTopicService.installRuntimeTopicService( + DestinationConfiguration.Builder.getInstance() + .setResourceAdapter(resourceAdapter) + .setName(topicName) + .setManagementQueueAddress(managementAddress) + .setManagementUsername(user) + .setManagementPassword(password) + .setDestinationServiceName(jmsTopicServiceName) + .build(), + serviceTarget, + pcfName); + } else { + topicService = JMSTopicService.installService(topicName, serverServiceName, serviceTarget); + } inject(serviceBuilder, injector, topicService); //create the management registration - String serverName = getActiveMQServerName(properties); - final PathElement serverElement = PathElement.pathElement(SERVER, serverName); - final PathElement dest = PathElement.pathElement(JMS_TOPIC, topicName); + String serverName = null; final DeploymentResourceSupport deploymentResourceSupport = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_RESOURCE_SUPPORT); - deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement); - PathAddress registration = PathAddress.pathAddress(serverElement, dest); + PathAddress registration; + if (external) { + final PathElement dest = PathElement.pathElement(EXTERNAL_JMS_TOPIC, topicName); + deploymentResourceSupport.getDeploymentSubsystemModel(MessagingExtension.SUBSYSTEM_NAME); + registration = PathAddress.pathAddress(dest); + } else { + serverName = getActiveMQServerName(properties); + final PathElement dest = PathElement.pathElement(JMS_TOPIC, topicName); + final PathElement serverElement = PathElement.pathElement(SERVER, serverName); + deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement); + registration = PathAddress.pathAddress(serverElement, dest); + } MessagingXmlInstallDeploymentUnitProcessor.createDeploymentSubModel(registration, deploymentUnit); JMSTopicConfigurationRuntimeHandler.INSTANCE.registerResource(serverName, topicName, destination); } diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/AbstractJMSRuntimeHandler.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/AbstractJMSRuntimeHandler.java index 65ea26bbe8ab..ad134fab0105 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/AbstractJMSRuntimeHandler.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/AbstractJMSRuntimeHandler.java @@ -23,15 +23,18 @@ package org.wildfly.extension.messaging.activemq.jms; import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.INCLUDE_DEFAULTS; +import static org.wildfly.extension.messaging.activemq.CommonAttributes.SERVER; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import org.jboss.as.controller.AbstractRuntimeOnlyHandler; import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.OperationFailedException; import org.jboss.as.controller.PathAddress; +import org.jboss.as.controller.PathElement; import org.jboss.as.controller.descriptions.ModelDescriptionConstants; import org.jboss.dmr.ModelNode; import org.wildfly.extension.messaging.activemq.logging.MessagingLogger; @@ -78,8 +81,13 @@ private static IllegalStateException unknownOperation(String opName) { private T getResourceConfig(final PathAddress operationAddress) throws OperationFailedException { final String name = operationAddress.getLastElement().getValue(); - final String server = operationAddress.getElement(operationAddress.size() - 2).getValue(); - + PathElement serverElt = operationAddress.getParent().getLastElement(); + final String server; + if(serverElt != null && SERVER.equals(serverElt.getKey())) { + server = serverElt.getValue(); + } else { + server = ""; + } T config = resources.get(new ResourceConfig(server, name)); if (config == null) { @@ -95,28 +103,41 @@ private static final class ResourceConfig { private ResourceConfig(final String server, final String name) { this.name = name; - this.server = server; + if (server == null) { + this.server = ""; + } else { + this.server = server; + } } @Override - public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - final ResourceConfig that = (ResourceConfig) o; - - if (!name.equals(that.name)) return false; - if (!server.equals(that.server)) return false; - - return true; + public int hashCode() { + int result = Objects.hashCode(this.server); + result = 31 * result + Objects.hashCode(this.name); + return result; } @Override - public int hashCode() { - int result = server.hashCode(); - result = 31 * result + name.hashCode(); - return result; + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ResourceConfig other = (ResourceConfig) obj; + if (!Objects.equals(this.server, other.server)) { + return false; + } + if (!Objects.equals(this.name, other.name)) { + return false; + } + return true; } + } } diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/DestinationConfiguration.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/DestinationConfiguration.java new file mode 100644 index 000000000000..73518579c208 --- /dev/null +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/DestinationConfiguration.java @@ -0,0 +1,147 @@ +/* + * Copyright 2018 JBoss by Red Hat. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wildfly.extension.messaging.activemq.jms; + +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.jboss.msc.service.ServiceName; + +/** + * + * @author Emmanuel Hugonnet (c) 2018 Red Hat, inc. + */ +public class DestinationConfiguration { + + private final boolean durable; + private final String selector; + private final String name; + private final String managementQueueAddress; + private final String managementUsername; + private final String managementPassword; + private final String resourceAdapter; + private final ServiceName destinationServiceName; + + public DestinationConfiguration(boolean durable, String selector, String name, String managementQueueAddress, + String managementUsername, String managementPassword, String resourceAdapter, ServiceName destinationServiceName) { + this.durable = durable; + this.selector = selector; + this.name = name; + this.managementQueueAddress = managementQueueAddress; + this.managementUsername = managementUsername; + this.managementPassword = managementPassword; + this.resourceAdapter = resourceAdapter; + this.destinationServiceName = destinationServiceName; + } + + public boolean isDurable() { + return durable; + } + + public String getSelector() { + return selector; + } + + public String getName() { + return name; + } + + public ServiceName getDestinationServiceName() { + return destinationServiceName; + } + + public String getResourceAdapter() { + return resourceAdapter; + } + + public Queue getManagementQueue() { + return ActiveMQJMSClient.createQueue(this.managementQueueAddress); + } + + public QueueConnection createQueueConnection(QueueConnectionFactory cf) throws JMSException { + if(this.managementUsername != null && !this.managementUsername.isEmpty()) { + return cf.createQueueConnection(managementUsername, managementPassword); + } + return cf.createQueueConnection(); + } + + public static class Builder { + + private boolean durable = false; + private String selector = null; + private String name; + private String managementQueueAddress = "activemq.management"; + private String managementUsername = null; + private String managementPassword = null; + private String resourceAdapter; + private ServiceName destinationServiceName; + + private Builder() { + } + + public static Builder getInstance() { + return new Builder(); + } + + public Builder setDurable(boolean durable) { + this.durable = durable; + return this; + } + + public Builder setSelector(String selector) { + this.selector = selector; + return this; + } + + public Builder setName(String name) { + this.name = name; + return this; + } + + public Builder setDestinationServiceName(ServiceName destinationServiceName) { + this.destinationServiceName = destinationServiceName; + return this; + } + + public Builder setResourceAdapter(String resourceAdapter) { + this.resourceAdapter = resourceAdapter; + return this; + } + + public Builder setManagementQueueAddress(String managementQueueAddress) { + this.managementQueueAddress = managementQueueAddress; + return this; + } + + public Builder setManagementUsername(String managementUsername) { + this.managementUsername = managementUsername; + return this; + } + + public Builder setManagementPassword(String managementPassword) { + this.managementPassword = managementPassword; + return this; + } + + public DestinationConfiguration build() { + return new DestinationConfiguration(durable, selector, name, managementQueueAddress, managementUsername, + managementPassword,resourceAdapter, destinationServiceName); + } + + } +} diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueDefinition.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueDefinition.java index c8937c664019..f48b8ba3ff4e 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueDefinition.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueDefinition.java @@ -34,10 +34,9 @@ public class ExternalJMSQueueDefinition extends PersistentResourceDefinition { public static final AttributeDefinition[] ATTRIBUTES = {CommonAttributes.DESTINATION_ENTRIES}; - private final boolean registerRuntimeOnly; - public ExternalJMSQueueDefinition(final boolean registerRuntimeOnly) { + public ExternalJMSQueueDefinition(boolean registerRuntimeOnly) { super(MessagingExtension.EXTERNAL_JMS_QUEUE_PATH, MessagingExtension.getResourceDescriptionResolver(CommonAttributes.EXTERNAL_JMS_QUEUE), ExternalJMSQueueAdd.INSTANCE, diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java index 0d068ba73ce5..63ddc0c530d9 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java @@ -16,8 +16,24 @@ package org.wildfly.extension.messaging.activemq.jms; +import static org.wildfly.extension.messaging.activemq.logging.MessagingLogger.ROOT_LOGGER; + +import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.QueueRequestor; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.naming.NamingException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.jboss.as.connector.util.ConnectorServices; +import org.jboss.as.naming.NamingContext; +import org.jboss.as.naming.NamingStore; +import org.jboss.as.naming.service.NamingService; import org.jboss.msc.service.Service; import org.jboss.msc.service.ServiceBuilder; @@ -26,6 +42,7 @@ import org.jboss.msc.service.StartContext; import org.jboss.msc.service.StartException; import org.jboss.msc.service.StopContext; +import org.jboss.msc.value.InjectedValue; /** * Service responsible for creating and destroying a client {@code javax.jms.Queue}. @@ -36,19 +53,90 @@ public class ExternalJMSQueueService implements Service { static final String JMS_QUEUE_PREFIX = "jms.queue."; private final String queueName; + private final DestinationConfiguration config; + private final InjectedValue namingStoreInjector = new InjectedValue(); + private final InjectedValue pcfInjector = new InjectedValue(); private Queue queue; public ExternalJMSQueueService(final String queueName) { this.queueName = queueName; + this.config = null; + } + + private ExternalJMSQueueService(final DestinationConfiguration config) { + this.queueName = config.getName(); + this.config = config; } @Override public synchronized void start(final StartContext context) throws StartException { + NamingStore namingStore = namingStoreInjector.getOptionalValue(); + if(namingStore!= null) { + Queue managementQueue = config.getManagementQueue(); + final NamingContext storeBaseContext = new NamingContext(namingStore, null); + try { + QueueConnectionFactory cf = (QueueConnectionFactory) storeBaseContext.lookup(pcfInjector.getValue().getBindInfo().getAbsoluteJndiName()); + try (QueueConnection connection = config.createQueueConnection(cf)) { + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + QueueRequestor requestor = new QueueRequestor(session, managementQueue); + Message m = session.createMessage(); + if (config.getSelector() != null && !config.getSelector().isEmpty()) { + org.apache.activemq.artemis.api.jms.management.JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, "createQueue", JMS_QUEUE_PREFIX + queueName, JMS_QUEUE_PREFIX + queueName, config.getSelector(), config.isDurable(), RoutingType.ANYCAST.name()); + } else { + org.apache.activemq.artemis.api.jms.management.JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, "createQueue", JMS_QUEUE_PREFIX + queueName, JMS_QUEUE_PREFIX + queueName, config.isDurable(), RoutingType.ANYCAST.name()); + } + Message reply = requestor.request(m); + ROOT_LOGGER.debugf("Creating queue %s returned %s", JMS_QUEUE_PREFIX + queueName, reply); + if(!reply.getBooleanProperty("_AMQ_OperationSucceeded")) { + throw ROOT_LOGGER.remoteDestinationCreationFailed(JMS_QUEUE_PREFIX + queueName, reply.getBody(String.class)); + } + ROOT_LOGGER.debugf("Queue %s has been created", JMS_QUEUE_PREFIX + queueName); + } + } catch (NamingException | JMSException ex) { + throw new StartException(ex); + } finally { + try { + storeBaseContext.close(); + } catch (NamingException ex) { + throw new StartException(ex); + } + } + } queue = ActiveMQJMSClient.createQueue(JMS_QUEUE_PREFIX + queueName); } @Override public synchronized void stop(final StopContext context) { + NamingStore namingStore = namingStoreInjector.getOptionalValue(); + if(namingStore!= null) { + Queue managementQueue = config.getManagementQueue(); + final NamingContext storeBaseContext = new NamingContext(namingStore, null); + try { + QueueConnectionFactory cf = (QueueConnectionFactory) storeBaseContext.lookup(pcfInjector.getValue().getBindInfo().getAbsoluteJndiName()); + try (QueueConnection connection = config.createQueueConnection(cf)) { + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + QueueRequestor requestor = new QueueRequestor(session, managementQueue); + Message m = session.createMessage(); + org.apache.activemq.artemis.api.jms.management.JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, "destroyQueue", JMS_QUEUE_PREFIX + queueName, true, true); + Message reply = requestor.request(m); + ROOT_LOGGER.debugf("Deleting queue %s returned %s", JMS_QUEUE_PREFIX + queueName, reply); + if(!reply.getBooleanProperty("_AMQ_OperationSucceeded")) { + throw ROOT_LOGGER.remoteDestinationDeletionFailed(JMS_QUEUE_PREFIX + queueName, reply.getBody(String.class)); + } + ROOT_LOGGER.debugf("Queue %s has been deleted", JMS_QUEUE_PREFIX + queueName); + } + } catch (NamingException | JMSException ex) { + throw new RuntimeException(ex); + } finally { + try { + storeBaseContext.close(); + } catch (NamingException ex) { + throw new RuntimeException(ex); + } + } + } queue = null; } @@ -63,4 +151,14 @@ public static Service installService(final String name, final ServiceTarg serviceBuilder.install(); return service; } + + public static Service installRuntimeQueueService(final DestinationConfiguration config, final ServiceTarget serviceTarget, final ServiceName pcf) { + final ExternalJMSQueueService service = new ExternalJMSQueueService(config); + final ServiceBuilder serviceBuilder = serviceTarget.addService(config.getDestinationServiceName(), service); + serviceBuilder.addDependency(NamingService.SERVICE_NAME, NamingStore.class, service.namingStoreInjector); + serviceBuilder.addDependency(pcf, ExternalPooledConnectionFactoryService.class, service.pcfInjector); + serviceBuilder.addDependencies(ConnectorServices.RESOURCE_ADAPTER_SERVICE_PREFIX.append(config.getResourceAdapter())); + serviceBuilder.install(); + return service; + } } diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java index 947cdaeb3b93..590c28a6a255 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java @@ -16,8 +16,27 @@ package org.wildfly.extension.messaging.activemq.jms; + +import static org.wildfly.extension.messaging.activemq.jms.ExternalJMSQueueService.JMS_QUEUE_PREFIX; +import static org.wildfly.extension.messaging.activemq.logging.MessagingLogger.ROOT_LOGGER; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.QueueRequestor; +import javax.jms.QueueSession; +import javax.jms.Session; import javax.jms.Topic; +import javax.naming.NamingException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.jboss.as.connector.util.ConnectorServices; +import org.jboss.as.naming.NamingContext; +import org.jboss.as.naming.NamingStore; +import org.jboss.as.naming.service.NamingService; import org.jboss.msc.service.Service; import org.jboss.msc.service.ServiceBuilder; @@ -26,6 +45,7 @@ import org.jboss.msc.service.StartContext; import org.jboss.msc.service.StartException; import org.jboss.msc.service.StopContext; +import org.jboss.msc.value.InjectedValue; /** * Service responsible for creating and destroying a client {@code javax.jms.Topic}. @@ -34,21 +54,88 @@ */ public class ExternalJMSTopicService implements Service { private final String name; + private final DestinationConfiguration config; static final String JMS_TOPIC_PREFIX = "jms.topic."; + private final InjectedValue namingStoreInjector = new InjectedValue(); + private final InjectedValue pcfInjector = new InjectedValue(); private Topic topic; public ExternalJMSTopicService(String name) { this.name = name; + this.config = null; + } + + private ExternalJMSTopicService(final DestinationConfiguration config) { + this.name = config.getName(); + this.config = config; } @Override public synchronized void start(final StartContext context) throws StartException { + NamingStore namingStore = namingStoreInjector.getOptionalValue(); + if (namingStore != null) { + Queue managementQueue = config.getManagementQueue(); + final NamingContext storeBaseContext = new NamingContext(namingStore, null); + try { + QueueConnectionFactory cf = (QueueConnectionFactory) storeBaseContext.lookup(pcfInjector.getValue().getBindInfo().getAbsoluteJndiName()); + try (QueueConnection connection = config.createQueueConnection(cf)) { + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + QueueRequestor requestor = new QueueRequestor(session, managementQueue); + Message m = session.createMessage(); + org.apache.activemq.artemis.api.jms.management.JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, "createQueue", JMS_TOPIC_PREFIX + name, JMS_TOPIC_PREFIX + name, config.isDurable(), RoutingType.MULTICAST.name()); + Message reply = requestor.request(m); + ROOT_LOGGER.debugf("Creating topic " + JMS_TOPIC_PREFIX + name + " returned " + reply); + if(!reply.getBooleanProperty("_AMQ_OperationSucceeded")) { + throw ROOT_LOGGER.remoteDestinationCreationFailed(JMS_TOPIC_PREFIX + name, reply.getBody(String.class)); + } + ROOT_LOGGER.debugf("Topic %s has been created", JMS_TOPIC_PREFIX + name); + } + } catch (NamingException | JMSException ex) { + throw new StartException(ex); + } finally { + try { + storeBaseContext.close(); + } catch (NamingException ex) { + throw new StartException(ex); + } + } + } topic = ActiveMQJMSClient.createTopic(JMS_TOPIC_PREFIX + name); } @Override public synchronized void stop(final StopContext context) { + NamingStore namingStore = namingStoreInjector.getOptionalValue(); + if (namingStore != null) { + Queue managementQueue = config.getManagementQueue(); + final NamingContext storeBaseContext = new NamingContext(namingStore, null); + try { + QueueConnectionFactory cf = (QueueConnectionFactory) storeBaseContext.lookup(pcfInjector.getValue().getBindInfo().getAbsoluteJndiName()); + try (QueueConnection connection = config.createQueueConnection(cf)) { + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + QueueRequestor requestor = new QueueRequestor(session, managementQueue); + Message m = session.createMessage(); + org.apache.activemq.artemis.api.jms.management.JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, "destroyQueue", JMS_TOPIC_PREFIX + name, true, true); + Message reply = requestor.request(m); + ROOT_LOGGER.debugf("Deleting topic " + JMS_TOPIC_PREFIX + name + " returned " + reply); + if(!reply.getBooleanProperty("_AMQ_OperationSucceeded")) { + throw ROOT_LOGGER.remoteDestinationDeletionFailed(JMS_QUEUE_PREFIX + name, reply.getBody(String.class)); + } + ROOT_LOGGER.debugf("Topic %s has been deleted", JMS_TOPIC_PREFIX + name); + } + } catch (NamingException | JMSException ex) { + throw new RuntimeException(ex); + } finally { + try { + storeBaseContext.close(); + } catch (NamingException ex) { + throw new RuntimeException(ex); + } + } + } topic = null; } @@ -63,4 +150,14 @@ public static ExternalJMSTopicService installService(final String name, final Se serviceBuilder.install(); return service; } + + public static ExternalJMSTopicService installRuntimeTopicService(final DestinationConfiguration config, final ServiceTarget serviceTarget, final ServiceName pcf) { + final ExternalJMSTopicService service = new ExternalJMSTopicService(config); + final ServiceBuilder serviceBuilder = serviceTarget.addService(config.getDestinationServiceName(), service); + serviceBuilder.addDependency(NamingService.SERVICE_NAME, NamingStore.class, service.namingStoreInjector); + serviceBuilder.addDependency(pcf, ExternalPooledConnectionFactoryService.class, service.pcfInjector); + serviceBuilder.addDependencies(ConnectorServices.RESOURCE_ADAPTER_SERVICE_PREFIX.append(config.getResourceAdapter())); + serviceBuilder.install(); + return service; + } } diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalPooledConnectionFactoryService.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalPooledConnectionFactoryService.java index 4f5e31a9f193..88d41240f350 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalPooledConnectionFactoryService.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalPooledConnectionFactoryService.java @@ -111,6 +111,7 @@ import org.wildfly.common.function.ExceptionSupplier; import org.wildfly.extension.messaging.activemq.ActiveMQResourceAdapter; import org.wildfly.extension.messaging.activemq.DiscoveryGroupAdd; +import org.wildfly.extension.messaging.activemq.ExternalBrokerConfigurationService; import org.wildfly.extension.messaging.activemq.GroupBindingService; import org.wildfly.extension.messaging.activemq.MessagingExtension; import org.wildfly.extension.messaging.activemq.MessagingServices; @@ -126,7 +127,7 @@ * * @author Emmanuel Hugonnet (c) 2018 Red Hat, inc. */ -public class ExternalPooledConnectionFactoryService implements Service { +public class ExternalPooledConnectionFactoryService implements Service { private static final ServiceName JBOSS_MESSAGING_ACTIVEMQ = ServiceName.JBOSS.append(MessagingExtension.SUBSYSTEM_NAME); private static final List EMPTY_LOCL = Collections.emptyList(); @@ -180,11 +181,13 @@ public class ExternalPooledConnectionFactoryService implements Service { // can be null. In that case the behaviour is depending on the IronJacamar container setting. private final Boolean enlistmentTrace; private ExceptionSupplier credentialSourceSupplier; + private final boolean createBinderService; private final CapabilityServiceSupport capabilityServiceSupport; + public ExternalPooledConnectionFactoryService(String name, TransportConfiguration[] connectors, DiscoveryGroupConfiguration groupConfiguration, String jgroupsClusterName, String jgroupsChannelName, List adapterParams, BindInfo bindInfo, List jndiAliases, String txSupport, - int minPoolSize, int maxPoolSize, String managedConnectionPoolClassName, Boolean enlistmentTrace, CapabilityServiceSupport capabilityServiceSupport) { + int minPoolSize, int maxPoolSize, String managedConnectionPoolClassName, Boolean enlistmentTrace, CapabilityServiceSupport capabilityServiceSupport, boolean createBinderService) { this.name = name; this.connectors = connectors; this.discoveryGroupConfiguration = groupConfiguration; @@ -193,6 +196,7 @@ public ExternalPooledConnectionFactoryService(String name, TransportConfiguratio this.adapterParams = adapterParams; this.bindInfo = bindInfo; this.jndiAliases = new ArrayList<>(jndiAliases); + this.createBinderService = createBinderService; this.txSupport = txSupport; this.minPoolSize = minPoolSize; this.maxPoolSize = maxPoolSize; @@ -201,11 +205,46 @@ public ExternalPooledConnectionFactoryService(String name, TransportConfiguratio this.capabilityServiceSupport = capabilityServiceSupport; } + public TransportConfiguration[] getConnectors() { + return Arrays.copyOf(connectors, connectors.length); + } + + public BindInfo getBindInfo() { + return bindInfo; + } static ServiceName getResourceAdapterActivatorsServiceName(String name) { return ConnectorServices.RESOURCE_ADAPTER_ACTIVATOR_SERVICE.append(name); } + public static ExternalPooledConnectionFactoryService installService(ServiceTarget serviceTarget, + ExternalBrokerConfigurationService configuration, + String name, + TransportConfiguration[] connectors, + DiscoveryGroupConfiguration groupConfiguration, + Set connectorsSocketBindings, + String jgroupClusterName, + String jgroupChannelName, + List adapterParams, + BindInfo bindInfo, + List jndiAliases, + String txSupport, + int minPoolSize, + int maxPoolSize, + String managedConnectionPoolClassName, + Boolean enlistmentTrace, + CapabilityServiceSupport capabilityServiceSupport) + throws OperationFailedException { + + ServiceName serviceName = JMSServices.getPooledConnectionFactoryBaseServiceName(JBOSS_MESSAGING_ACTIVEMQ).append(name); + ExternalPooledConnectionFactoryService service = new ExternalPooledConnectionFactoryService(name, + connectors, groupConfiguration, jgroupClusterName, jgroupChannelName, adapterParams, + bindInfo, jndiAliases, txSupport, minPoolSize, maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, + capabilityServiceSupport, false); + ServiceBuilder serviceBuilder = serviceTarget.addService(serviceName); + installService0(serviceBuilder, configuration, service, groupConfiguration, connectorsSocketBindings, capabilityServiceSupport); + return service; + } public static ExternalPooledConnectionFactoryService installService(OperationContext context, String name, @@ -227,9 +266,7 @@ public static ExternalPooledConnectionFactoryService installService(OperationCon ServiceName serviceName = JMSServices.getPooledConnectionFactoryBaseServiceName(JBOSS_MESSAGING_ACTIVEMQ).append(name); ExternalPooledConnectionFactoryService service = new ExternalPooledConnectionFactoryService(name, connectors, groupConfiguration, jgroupClusterName, jgroupChannelName, adapterParams, - bindInfo, jndiAliases, txSupport, minPoolSize, maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, - context.getCapabilityServiceSupport()); - + bindInfo, jndiAliases, txSupport, minPoolSize, maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, context.getCapabilityServiceSupport(), true); installService0(context, serviceName, service, groupConfiguration, connectorsSocketBindings, model); return service; } @@ -278,9 +315,44 @@ private static void installService0(OperationContext context, serviceBuilder.install(); } + private static void installService0(ServiceBuilder serviceBuilder, + ExternalBrokerConfigurationService configuration, + ExternalPooledConnectionFactoryService service, + DiscoveryGroupConfiguration groupConfiguration, + Set connectorsSocketBindings, + CapabilityServiceSupport capabilityServiceSupport) throws OperationFailedException { + serviceBuilder.requires(capabilityServiceSupport.getCapabilityServiceName(MessagingServices.LOCAL_TRANSACTION_PROVIDER_CAPABILITY)); + // ensures that Artemis client thread pools are not stopped before any deployment depending on a pooled-connection-factory + serviceBuilder.requires(MessagingServices.ACTIVEMQ_CLIENT_THREAD_POOL); + Map outbounds = configuration.getOutboundSocketBindings(); + for (final String connectorSocketBinding : connectorsSocketBindings) { + // find whether the connectorSocketBinding references a SocketBinding or an OutboundSocketBinding + if (outbounds.containsKey(connectorSocketBinding)) { + Supplier outboundSocketBindingSupplier = serviceBuilder.requires(configuration.getOutboundSocketBindings().get(connectorSocketBinding)); + service.outboundSocketBindings.put(connectorSocketBinding, outboundSocketBindingSupplier); + } else { + Supplier socketBindingSupplier = serviceBuilder.requires(configuration.getSocketBindings().get(connectorSocketBinding)); + service.socketBindings.put(connectorSocketBinding, socketBindingSupplier); + } + } + if (groupConfiguration != null) { + final String key = "discovery" + groupConfiguration.getName(); + if (service.jgroupsClusterName != null) { + Supplier commandDispatcherFactorySupplier = serviceBuilder.requires(configuration.getCommandDispatcherFactories().get(key)); + service.commandDispatcherFactories.put(key, commandDispatcherFactorySupplier); + service.clusterNames.put(key, service.jgroupsClusterName); + } else { + Supplier socketBindingSupplier = serviceBuilder.requires(configuration.getGroupBindings().get(key)); + service.groupBindings.put(key, socketBindingSupplier); + } + } + serviceBuilder.setInstance(service); + serviceBuilder.install(); + } + @Override - public Void getValue() throws IllegalStateException, IllegalArgumentException { - return null; + public ExternalPooledConnectionFactoryService getValue() throws IllegalStateException, IllegalArgumentException { + return this; } @Override @@ -415,7 +487,7 @@ private void createService(ServiceTarget serviceTarget, ServiceContainer contain ResourceAdapterActivatorService activator = new ResourceAdapterActivatorService(cmd, activation, ExternalPooledConnectionFactoryService.class.getClassLoader(), name); activator.setBindInfo(bindInfo); - activator.setCreateBinderService(true); + activator.setCreateBinderService(createBinderService); activator.addJndiAliases(jndiAliases); final ServiceBuilder sb diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/logging/MessagingLogger.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/logging/MessagingLogger.java index f8e7f9f4b09f..742aa68a2aeb 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/logging/MessagingLogger.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/logging/MessagingLogger.java @@ -858,4 +858,10 @@ public interface MessagingLogger extends BasicLogger { @Message(id = 98, value = "Unable to load module %s - the module or one of its dependencies is missing [%s]") OperationFailedException moduleNotFound(String moduleName, String missingModule, @Cause ModuleNotFoundException e); + + @Message(id = 99, value = "Creating the remote destination %s failed with error %s") + StartException remoteDestinationCreationFailed(String destinationName, String error); + + @Message(id = 100, value = "Deleting the remote destination %s failed with error %s") + RuntimeException remoteDestinationDeletionFailed(String destinationName, String error); }