Skip to content

Commit

Permalink
[WFLY-10493]: Support JMS Java EE resources definition for remote Art…
Browse files Browse the repository at this point in the history
…emis-based broker.

* Creating queue / topic on deployments
* Deleting queue / topic in undeployments
* Looking for default ee resource adapter if none is specified.
* Making @JMSConnectionFactoryDefinition work without internal broker.
* Adding support for description of runtime external destinations.
* Adding support for management address definition in the @JMSDestinationDefinition.
* Fixing WFWIP-147: @JMSConnectionFactoryDefinition with property resourceAdapater to pooled-connection-factory defined in messaging-active subsystem does not work.
* Adding support for authentication of management queue.

JIRA: https://issues.jboss.org/browse/WFLY-10493
  • Loading branch information
ehsavoie committed Jan 23, 2019
1 parent bb04653 commit 24425c2
Show file tree
Hide file tree
Showing 21 changed files with 1,434 additions and 571 deletions.
Expand Up @@ -44,6 +44,15 @@ public class ConnectionFactoryReferenceFactoryService implements Service<Managed

private final InjectedValue<Object> connectionFactoryValue = new InjectedValue<Object>();
private ManagedReference reference;
private final String name;

public ConnectionFactoryReferenceFactoryService(String name) {
this.name = name;
}

public String getName() {
return name;
}

public synchronized void start(StartContext startContext) throws StartException {
reference = new ValueManagedReference(new ImmediateValue<Object>(connectionFactoryValue.getValue()));
Expand Down
Expand Up @@ -426,7 +426,7 @@ public String[] bindConnectionFactory(URL url, String deployment, Object cf, fin
// to distinguish CFs with same name in different application (or module).
final ContextNames.BindInfo bindInfo = getBindInfo(jndi);

final ConnectionFactoryReferenceFactoryService referenceFactoryService = new ConnectionFactoryReferenceFactoryService();
final ConnectionFactoryReferenceFactoryService referenceFactoryService = new ConnectionFactoryReferenceFactoryService(deployment);
final ServiceName referenceFactoryServiceName = ConnectionFactoryReferenceFactoryService.SERVICE_NAME_BASE
.append(bindInfo.getBinderServiceName());
serviceTarget.addService(referenceFactoryServiceName, referenceFactoryService)
Expand Down
Expand Up @@ -114,7 +114,29 @@ process the consumed messages and it can use the JMSReplyTo destination
if it is defined on the message. +
If the MDB needs any other JMS destinations defined on the remote
server, it must use client-side JNDI by following the
http://activemq.apache.org/artemis/docs/1.1.0/using-jms.html#jndi-configuration[Artemis
http://http://activemq.apache.org/artemis/docs/2.6.0/using-jms.html#jndi-configuration[Artemis
documentation] or configure external configuration context in the naming
subsystem (which allows to inject the JMS resources using the
`@Resource` annotation).

[[configuration of a remote destination using annotations]]
==== Configuration of a remote destination using annotations

The annotation `@JMSDestinationDefinition` can be used to create a destination on a remote Artemis Server. This will work in the same way as for a local server.+
For this it needs to be able to access Artemis management queue. If your remote Artemis Server management queue is not the default one you can pass the management queue address as a property to the `@JMSDestinationDefinition`.
Please note that the destination is created remotely but won't be removed once the deployement is undeployed/removed.

[source, java]
----
@JMSDestinationDefinition(
// explicitly mention a resourceAdapter corresponding to a pooled-connection-factory resource to the remote server
resourceAdapter = "activemq-ra",
name="java:global/env/myQueue2",
interfaceName="javax.jms.Queue",
destinationName="myQueue2",
properties = {
"management-address=my.management.queue",
"selector=color = 'red'"
}
)
----
Expand Up @@ -32,6 +32,8 @@
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.registry.Resource;
import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;

/**
* Requires a reload only if the {@link ActiveMQServerService} service is up and running.
Expand All @@ -53,15 +55,15 @@ public AddStepHandler(AttributeDefinition... attributes) {

@Override
protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model) throws OperationFailedException {
if (ActiveMQServerService.isServiceInstalled(context)) {
if (isServiceInstalled(context)) {
context.reloadRequired();
reloadRequired = true;
}
}

@Override
protected void rollbackRuntime(OperationContext context, ModelNode operation, Resource resource) {
if (reloadRequired && ActiveMQServerService.isServiceInstalled(context)) {
if (reloadRequired && isServiceInstalled(context)) {
context.revertReloadRequired();
}
}
Expand All @@ -73,15 +75,15 @@ final class RemoveStepHandler extends AbstractRemoveStepHandler {

@Override
protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model) throws OperationFailedException {
if (ActiveMQServerService.isServiceInstalled(context)) {
if (isServiceInstalled(context)) {
context.reloadRequired();
reloadRequired = true;
}
}

@Override
protected void recoverServices(OperationContext context, ModelNode operation, ModelNode model) throws OperationFailedException {
if (reloadRequired && ActiveMQServerService.isServiceInstalled(context)) {
if (reloadRequired && isServiceInstalled(context)) {
context.revertReloadRequired();
}
}
Expand All @@ -102,15 +104,33 @@ protected boolean applyUpdateToRuntime(OperationContext context, ModelNode opera
ModelNode resolvedValue, ModelNode currentValue,
org.jboss.as.controller.AbstractWriteAttributeHandler.HandbackHolder<Void> handbackHolder)
throws OperationFailedException {
return ActiveMQServerService.isServiceInstalled(context);
return isServiceInstalled(context);
}

@Override
protected void revertUpdateToRuntime(OperationContext context, ModelNode operation, String attributeName,
ModelNode valueToRestore, ModelNode valueToRevert, Void handback) throws OperationFailedException {
if (ActiveMQServerService.isServiceInstalled(context)) {
if (isServiceInstalled(context)) {
context.revertReloadRequired();
}
}
}

/**
* Returns true if a {@link ServiceController} for this service has been {@link org.jboss.msc.service.ServiceBuilder#install() installed}
* in MSC under the
* {@link MessagingServices#getActiveMQServiceName(org.jboss.as.controller.PathAddress) service name appropriate to the given operation}.
*
* @param context the operation context
* @return {@code true} if a {@link ServiceController} is installed
*/
static boolean isServiceInstalled(final OperationContext context) {
if (context.isNormalServer()) {
final ServiceName serviceName = MessagingServices.getActiveMQServiceName(context.getCurrentAddress());
if (serviceName != null) {
return context.getServiceRegistry(false).getService(serviceName) != null;
}
}
return false;
}
}
Expand Up @@ -51,16 +51,13 @@
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.services.path.AbsolutePathService;
import org.jboss.as.controller.services.path.PathManager;
import org.jboss.as.network.ManagedBinding;
import org.jboss.as.network.OutboundSocketBinding;
import org.jboss.as.network.SocketBinding;
import org.jboss.as.security.plugins.SecurityDomainContext;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
Expand Down Expand Up @@ -155,6 +152,7 @@ public ActiveMQServerService(Configuration configuration,
}
}

@Override
public synchronized void start(final StartContext context) throws StartException {
ClassLoader origTCCL = org.wildfly.security.manager.WildFlySecurityManager.getCurrentContextClassLoaderPrivileged();
// Validate whether the AIO native layer can be used
Expand Down Expand Up @@ -330,23 +328,6 @@ public synchronized ActiveMQServer getValue() throws IllegalStateException {
return server;
}

/**
* Returns true if a {@link ServiceController} for this service has been {@link org.jboss.msc.service.ServiceBuilder#install() installed}
* in MSC under the
* {@link MessagingServices#getActiveMQServiceName(org.jboss.as.controller.PathAddress) service name appropriate to the given operation}.
*
* @param context the operation context
* @return {@code true} if a {@link ServiceController} is installed
*/
static boolean isServiceInstalled(final OperationContext context) {
if (context.isNormalServer()) {
final ServiceName serviceName = MessagingServices.getActiveMQServiceName(context.getCurrentAddress());
if (serviceName != null) {
return context.getServiceRegistry(false).getService(serviceName) != null;
}
}
return false;
}

CommandDispatcherFactory getCommandDispatcherFactory(String key) {
return commandDispatcherFactories.get(key).get();
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.config.Configuration;
import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
Expand Down Expand Up @@ -140,19 +139,15 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod
}
}

static void addBroadcastGroupConfigs(final OperationContext context, final Configuration configuration, final ModelNode model) throws OperationFailedException {
static void addBroadcastGroupConfigs(final OperationContext context, final List<BroadcastGroupConfiguration> configs, final Set<String> connectors, final ModelNode model) throws OperationFailedException {
if (model.hasDefined(CommonAttributes.BROADCAST_GROUP)) {
final List<BroadcastGroupConfiguration> configs = configuration.getBroadcastGroupConfigurations();
final Set<String> connectors = configuration.getConnectorConfigurations().keySet();
for (Property prop : model.get(CommonAttributes.BROADCAST_GROUP).asPropertyList()) {
configs.add(createBroadcastGroupConfiguration(context, connectors, prop.getName(), prop.getValue()));

}
}
}

static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final OperationContext context, final Set<String> connectors, final String name, final ModelNode model) throws OperationFailedException {

final long broadcastPeriod = BroadcastGroupDefinition.BROADCAST_PERIOD.resolveModelAttribute(context, model).asLong();
final List<String> connectorRefs = new ArrayList<String>();
if (model.hasDefined(CommonAttributes.CONNECTORS)) {
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.config.Configuration;
import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
Expand Down Expand Up @@ -134,18 +133,14 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod
}
}

static void addDiscoveryGroupConfigs(final OperationContext context, final Configuration configuration, final ModelNode model) throws OperationFailedException {
static Map<String, DiscoveryGroupConfiguration> addDiscoveryGroupConfigs(final OperationContext context, final ModelNode model) throws OperationFailedException {
Map<String, DiscoveryGroupConfiguration> configs = new HashMap<>();
if (model.hasDefined(CommonAttributes.DISCOVERY_GROUP)) {
Map<String, DiscoveryGroupConfiguration> configs = configuration.getDiscoveryGroupConfigurations();
if (configs == null) {
configs = new HashMap<>();
configuration.setDiscoveryGroupConfigurations(configs);
}
for (Property prop : model.get(CommonAttributes.DISCOVERY_GROUP).asPropertyList()) {
configs.put(prop.getName(), createDiscoveryGroupConfiguration(context, prop.getName(), prop.getValue()));

}
}
return configs;
}

static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final OperationContext context, final String name, final ModelNode model) throws OperationFailedException {
Expand Down
@@ -0,0 +1,99 @@
/*
* Copyright 2018 JBoss by Red Hat.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wildfly.extension.messaging.activemq;

import java.util.Map;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;

/**
*
* @author Emmanuel Hugonnet (c) 2018 Red Hat, inc.
*/
public class ExternalBrokerConfigurationService implements Service<ExternalBrokerConfigurationService> {
private final Map<String, TransportConfiguration> connectors;
private final Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations;
private final Map<String, ServiceName> socketBindings;
private final Map<String, ServiceName> outboundSocketBindings;
private final Map<String, ServiceName> groupBindings;
// mapping between the {broadcast|discovery}-groups and the cluster names they use
private final Map<String, String> clusterNames;
// mapping between the {broadcast|discovery}-groups and the command dispatcher factory they use
private final Map<String, ServiceName> commandDispatcherFactories;

public ExternalBrokerConfigurationService(final Map<String, TransportConfiguration> connectors,
Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations,
Map<String, ServiceName> socketBindings,
Map<String, ServiceName> outboundSocketBindings,
Map<String, ServiceName> groupBindings,
Map<String, ServiceName> commandDispatcherFactories,
Map<String, String> clusterNames) {
this.connectors = connectors;
this.discoveryGroupConfigurations = discoveryGroupConfigurations;
this.clusterNames = clusterNames;
this.commandDispatcherFactories = commandDispatcherFactories;
this.groupBindings = groupBindings;
this.outboundSocketBindings = outboundSocketBindings;
this.socketBindings = socketBindings;
}

@Override
public void start(StartContext context) throws StartException {
}

@Override
public void stop(StopContext context) {
}

public Map<String, TransportConfiguration> getConnectors() {
return connectors;
}

public Map<String, ServiceName> getSocketBindings() {
return socketBindings;
}

public Map<String, ServiceName> getOutboundSocketBindings() {
return outboundSocketBindings;
}

public Map<String, ServiceName> getGroupBindings() {
return groupBindings;
}

public Map<String, String> getClusterNames() {
return clusterNames;
}

public Map<String, ServiceName> getCommandDispatcherFactories() {
return commandDispatcherFactories;
}

public Map<String, DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations() {
return discoveryGroupConfigurations;
}

@Override
public ExternalBrokerConfigurationService getValue() throws IllegalStateException, IllegalArgumentException {
return this;
}

}

0 comments on commit 24425c2

Please sign in to comment.