Skip to content

Commit

Permalink
[WFLY-6134] JMS resource definitions backed by messaging-activemq
Browse files Browse the repository at this point in the history
Allows JMS resource definitions to specify a pooled-connection-factory
from the messaging-activemq subsystem (using the resourceAdapter
property) in addition to regular resource adapters (configured in the
resource-adapters subsystem).

JIRA: https://issues.jboss.org/browse/WFLY-6134
  • Loading branch information
jmesnil committed Feb 5, 2016
1 parent d23b226 commit 726d590
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 22 deletions.
Expand Up @@ -63,11 +63,14 @@
import org.jboss.msc.inject.Injector;
import org.jboss.msc.service.ServiceBuilder;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceRegistry;
import org.jboss.msc.service.ServiceTarget;
import org.wildfly.extension.messaging.activemq.CommonAttributes;
import org.wildfly.extension.messaging.activemq.MessagingExtension;
import org.wildfly.extension.messaging.activemq.MessagingServices;
import org.wildfly.extension.messaging.activemq.jms.ConnectionFactoryAttribute;
import org.wildfly.extension.messaging.activemq.jms.ConnectionFactoryAttributes;
import org.wildfly.extension.messaging.activemq.jms.JMSServices;
import org.wildfly.extension.messaging.activemq.jms.PooledConnectionFactoryConfigProperties;
import org.wildfly.extension.messaging.activemq.jms.PooledConnectionFactoryConfigurationRuntimeHandler;
import org.wildfly.extension.messaging.activemq.jms.PooledConnectionFactoryDefinition;
Expand Down Expand Up @@ -151,8 +154,7 @@ void setMinPoolSize(int minPoolSize) {
public void getResourceValue(ResolutionContext context, ServiceBuilder<?> serviceBuilder, DeploymentPhaseContext phaseContext, Injector<ManagedReferenceFactory> injector) throws DeploymentUnitProcessingException {
final DeploymentUnit deploymentUnit = phaseContext.getDeploymentUnit();

// if the resourceAdapter is empty, we use the embedded Artemis server to create a pooled-connection-factory
if (resourceAdapter == null || resourceAdapter.isEmpty()) {
if (targetsPooledConnectionFactory(getActiveMQServerName(properties), resourceAdapter, phaseContext.getServiceRegistry())) {
try {
startedPooledConnectionFactory(context, jndiName, serviceBuilder, phaseContext.getServiceTarget(), deploymentUnit, injector);
} catch (OperationFailedException e) {
Expand Down Expand Up @@ -222,9 +224,10 @@ private void startedPooledConnectionFactory(ResolutionContext context, String na
List<PooledConnectionFactoryConfigProperties> adapterParams = getAdapterParams(model);
String txSupport = transactional ? XA_TX : NO_TX;

final String serverName = getActiveMQServerName(properties);
final String pcfName = uniqueName(context, name);
final ContextNames.BindInfo bindInfo = ContextNames.bindInfoForEnvEntry(context.getApplicationName(), context.getModuleName(), context.getComponentName(), !context.isCompUsesModule(), name);
PooledConnectionFactoryService.installService(serviceTarget, pcfName, getActiveMQServerName(), connectors,
PooledConnectionFactoryService.installService(serviceTarget, pcfName, serverName, connectors,
discoveryGroupName, jgroupsChannelName, adapterParams,
bindInfo,
txSupport, minPoolSize, maxPoolSize, managedConnectionPoolClassName, enlistmentTrace, true);
Expand All @@ -235,13 +238,13 @@ private void startedPooledConnectionFactory(ResolutionContext context, String na

//create the management registration
String managementName = managementName(context, name);
final PathElement serverElement = PathElement.pathElement(SERVER, getActiveMQServerName());
final PathElement serverElement = PathElement.pathElement(SERVER, serverName);
final DeploymentResourceSupport deploymentResourceSupport = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_RESOURCE_SUPPORT);
deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement);
final PathElement pcfPath = PathElement.pathElement(POOLED_CONNECTION_FACTORY, managementName);
PathAddress registration = PathAddress.pathAddress(serverElement, pcfPath);
MessagingXmlInstallDeploymentUnitProcessor.createDeploymentSubModel(registration, deploymentUnit);
PooledConnectionFactoryConfigurationRuntimeHandler.INSTANCE.registerResource(getActiveMQServerName(), managementName, model);
PooledConnectionFactoryConfigurationRuntimeHandler.INSTANCE.registerResource(serverName, managementName, model);
}

private List<String> getConnectors(Map<String, String> props) {
Expand Down Expand Up @@ -309,11 +312,28 @@ private List<PooledConnectionFactoryConfigProperties> getAdapterParams(ModelNode
return props;
}


/**
* Return whether the definition targets an existing pooled connection factory or use a JCA-based ConnectionFactory.
*
* Checks the service registry for a PooledConnectionFactoryService with the ServiceName
* created by the {@code server} property (or {@code "default") and the {@code resourceAdapter} property.
*/
static boolean targetsPooledConnectionFactory(String server, String resourceAdapter, ServiceRegistry serviceRegistry) {
// if the resourceAdapter is not defined, the default behaviour is to create a pooled-connection-factory.
if (resourceAdapter == null || resourceAdapter.isEmpty()) {
return true;
}
ServiceName activeMQServiceName = MessagingServices.getActiveMQServiceName(server);
ServiceName pcfName = JMSServices.getPooledConnectionFactoryBaseServiceName(activeMQServiceName).append(resourceAdapter);
return serviceRegistry.getServiceNames().contains(pcfName);
}

/**
* The JMS connection factory can specify another server to deploy its destinations
* by passing a property server=&lt;name of the server>. Otherwise, "default" is used by default.
*/
private String getActiveMQServerName() {
static String getActiveMQServerName(Map<String, String> properties) {
return properties.containsKey(SERVER) ? properties.get(SERVER) : DEFAULT;
}

Expand Down
Expand Up @@ -22,14 +22,15 @@

package org.wildfly.extension.messaging.activemq.deployment;

import static org.wildfly.extension.messaging.activemq.CommonAttributes.DEFAULT;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.DURABLE;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.ENTRIES;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.JMS_QUEUE;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.JMS_TOPIC;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.NAME;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.SELECTOR;
import static org.wildfly.extension.messaging.activemq.CommonAttributes.SERVER;
import static org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionInjectionSource.getActiveMQServerName;
import static org.wildfly.extension.messaging.activemq.deployment.JMSConnectionFactoryDefinitionInjectionSource.targetsPooledConnectionFactory;
import static org.wildfly.extension.messaging.activemq.logging.MessagingLogger.ROOT_LOGGER;

import java.util.Map;
Expand Down Expand Up @@ -122,8 +123,7 @@ private String uniqueName(InjectionSource.ResolutionContext context) {
}

public void getResourceValue(final InjectionSource.ResolutionContext context, final ServiceBuilder<?> serviceBuilder, final DeploymentPhaseContext phaseContext, final Injector<ManagedReferenceFactory> injector) throws DeploymentUnitProcessingException {
// if the resourceAdapter is empty, we use the embedded Artemis server to create the JMS destination
if (resourceAdapter == null || resourceAdapter.isEmpty()) {
if (targetsPooledConnectionFactory(getActiveMQServerName(properties), resourceAdapter, phaseContext.getServiceRegistry())) {
startActiveMQDestination(context, serviceBuilder, phaseContext, injector);
} else {
// delegate to the resource-adapter subsystem to create a generic JCA admin object.
Expand All @@ -143,7 +143,7 @@ private void startActiveMQDestination(ResolutionContext context, ServiceBuilder<
final DeploymentUnit deploymentUnit = phaseContext.getDeploymentUnit();
final String uniqueName = uniqueName(context);
try {
ServiceName serviceName = MessagingServices.getActiveMQServiceName(getActiveMQServerName());
ServiceName serviceName = MessagingServices.getActiveMQServiceName(getActiveMQServerName(properties));

if (interfaceName.equals(Queue.class.getName())) {
startQueue(uniqueName, phaseContext.getServiceTarget(), serviceName, serviceBuilder, deploymentUnit, injector);
Expand Down Expand Up @@ -182,13 +182,14 @@ private void startQueue(final String queueName,
inject(serviceBuilder, injector, queueService);

//create the management registration
final PathElement serverElement = PathElement.pathElement(SERVER, getActiveMQServerName());
String serverName = getActiveMQServerName(properties);
final PathElement serverElement = PathElement.pathElement(SERVER, serverName);
final PathElement dest = PathElement.pathElement(JMS_QUEUE, queueName);
final DeploymentResourceSupport deploymentResourceSupport = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_RESOURCE_SUPPORT);
deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement);
PathAddress registration = PathAddress.pathAddress(serverElement, dest);
MessagingXmlInstallDeploymentUnitProcessor.createDeploymentSubModel(registration, deploymentUnit);
JMSQueueConfigurationRuntimeHandler.INSTANCE.registerResource(getActiveMQServerName(), queueName, destination);
JMSQueueConfigurationRuntimeHandler.INSTANCE.registerResource(serverName, queueName, destination);
}

private void startTopic(String topicName,
Expand All @@ -205,13 +206,14 @@ private void startTopic(String topicName,
inject(serviceBuilder, injector, topicService);

//create the management registration
final PathElement serverElement = PathElement.pathElement(SERVER, getActiveMQServerName());
String serverName = getActiveMQServerName(properties);
final PathElement serverElement = PathElement.pathElement(SERVER, serverName);
final PathElement dest = PathElement.pathElement(JMS_TOPIC, topicName);
final DeploymentResourceSupport deploymentResourceSupport = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_RESOURCE_SUPPORT);
deploymentResourceSupport.getDeploymentSubModel(MessagingExtension.SUBSYSTEM_NAME, serverElement);
PathAddress registration = PathAddress.pathAddress(serverElement, dest);
MessagingXmlInstallDeploymentUnitProcessor.createDeploymentSubModel(registration, deploymentUnit);
JMSTopicConfigurationRuntimeHandler.INSTANCE.registerResource(getActiveMQServerName(), topicName, destination);
JMSTopicConfigurationRuntimeHandler.INSTANCE.registerResource(serverName, topicName, destination);
}

private <D extends Destination> void inject(ServiceBuilder<?> serviceBuilder, Injector<ManagedReferenceFactory> injector, Service<D> destinationService) {
Expand All @@ -237,14 +239,6 @@ public void transition(final ServiceController<? extends Object> controller, fin
});
}

/**
* The JMS destination can specify another server to deploy its destinations
* by passing a property server=&lt;name of the server>. Otherwise, "default" is used by default.
*/
private String getActiveMQServerName() {
return properties.containsKey(SERVER) ? properties.get(SERVER) : DEFAULT;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Expand Up @@ -60,6 +60,8 @@
destinationName="myTopic1"
),
@JMSDestinationDefinition(
// explicitly mention a resourceAdapter corresponding to a pooled-connection-factory resource
resourceAdapter = "activemq-ra",
name="java:global/env/myQueue2",
interfaceName="javax.jms.Queue",
destinationName="myQueue2",
Expand Down Expand Up @@ -96,6 +98,8 @@
}
)
@JMSConnectionFactoryDefinition(
// explicitly mention a resourceAdapter corresponding to a pooled-connection-factory resource
resourceAdapter = "activemq-ra",
name="java:global/myFactory3",
interfaceName = "javax.jms.QueueConnectionFactory",
properties = {
Expand Down

0 comments on commit 726d590

Please sign in to comment.