diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java index f8776fa30980..a0968d067eb1 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/ActiveMQServerService.java @@ -284,9 +284,7 @@ public synchronized void start(final StartContext context) throws StartException if (jgroupFactories.containsKey(key)) { ChannelFactory channelFactory = jgroupFactories.get(key); String channelName = jgroupsChannels.get(key); - JChannel channel = (JChannel) channelFactory.createChannel(channelName); - channels.put(channelName, channel); - newConfigs.add(BroadcastGroupAdd.createBroadcastGroupConfiguration(name, config, channel, channelName)); + newConfigs.add(BroadcastGroupAdd.createBroadcastGroupConfiguration(name, config, channelFactory, channelName)); } else { final SocketBinding binding = groupBindings.get(key); if (binding == null) { @@ -304,16 +302,11 @@ public synchronized void start(final StartContext context) throws StartException for(final Map.Entry entry : discoveryGroups.entrySet()) { final String name = entry.getKey(); final String key = "discovery" + name; - DiscoveryGroupConfiguration config = null; + final DiscoveryGroupConfiguration config; if (jgroupFactories.containsKey(key)) { ChannelFactory channelFactory = jgroupFactories.get(key); String channelName = jgroupsChannels.get(key); - JChannel channel = channels.get(channelName); - if (channel == null) { - channel = (JChannel) channelFactory.createChannel(channelName); - channels.put(channelName, channel); - } - config = DiscoveryGroupAdd.createDiscoveryGroupConfiguration(name, entry.getValue(), channel, channelName); + config = DiscoveryGroupAdd.createDiscoveryGroupConfiguration(name, entry.getValue(), channelFactory, channelName, channels); } else { final SocketBinding binding = groupBindings.get(key); if (binding == null) { diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java index b1b98d14e959..062977122325 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/BroadcastGroupAdd.java @@ -34,7 +34,6 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; -import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.core.config.Configuration; import org.jboss.as.controller.AbstractAddStepHandler; @@ -52,7 +51,7 @@ import org.jboss.msc.service.ServiceName; import org.jboss.msc.service.ServiceRegistry; import org.jboss.msc.service.ServiceTarget; -import org.jgroups.JChannel; +import org.wildfly.clustering.jgroups.spi.ChannelFactory; import org.wildfly.clustering.jgroups.spi.JGroupsDefaultRequirement; import org.wildfly.extension.messaging.activemq.logging.MessagingLogger; @@ -176,12 +175,12 @@ static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final Strin .setEndpointFactory(endpointFactory); } - static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final String name, final BroadcastGroupConfiguration config, final JChannel channel, final String channelName) throws Exception { + static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final String name, final BroadcastGroupConfiguration config, final ChannelFactory channelFactory, final String channelName) throws Exception { final long broadcastPeriod = config.getBroadcastPeriod(); final List connectorRefs = config.getConnectorInfos(); - final BroadcastEndpointFactory endpointFactory = new ChannelBroadcastEndpointFactory(channel, channelName); + final BroadcastEndpointFactory endpointFactory = new JGroupsBroadcastEndpointFactory(channelName, channelFactory); return new BroadcastGroupConfiguration() .setName(name) diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java index 817abc84a4e5..0015c0fd885b 100644 --- a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/DiscoveryGroupAdd.java @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; -import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.core.config.Configuration; @@ -49,6 +48,7 @@ import org.jboss.msc.service.ServiceRegistry; import org.jboss.msc.service.ServiceTarget; import org.jgroups.JChannel; +import org.wildfly.clustering.jgroups.spi.ChannelFactory; import org.wildfly.clustering.jgroups.spi.JGroupsDefaultRequirement; /** @@ -149,11 +149,11 @@ static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final Strin } - static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final String name, final DiscoveryGroupConfiguration config, final JChannel channel, final String channelName) throws Exception { + static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final String name, final DiscoveryGroupConfiguration config, final ChannelFactory channelFactory, final String channelName, Map channels) throws Exception { final long refreshTimeout = config.getRefreshTimeout(); final long initialWaitTimeout = config.getDiscoveryInitialWaitTimeout(); - final BroadcastEndpointFactory endpointFactory = new ChannelBroadcastEndpointFactory(channel, channelName); + final BroadcastEndpointFactory endpointFactory = new JGroupsBroadcastEndpointFactory(channelName, channelFactory, channels); return new DiscoveryGroupConfiguration() .setName(name) diff --git a/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/JGroupsBroadcastEndpointFactory.java b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/JGroupsBroadcastEndpointFactory.java new file mode 100644 index 000000000000..e08cc4f75ddf --- /dev/null +++ b/messaging-activemq/src/main/java/org/wildfly/extension/messaging/activemq/JGroupsBroadcastEndpointFactory.java @@ -0,0 +1,79 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2017, Red Hat, Inc., and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.wildfly.extension.messaging.activemq; + +import java.util.Map; + +import org.apache.activemq.artemis.api.core.BroadcastEndpoint; +import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper; +import org.jgroups.JChannel; +import org.wildfly.clustering.jgroups.spi.ChannelFactory; + +/** + * @author Jeff Mesnil (c) 2017 Red Hat inc. + */ +class JGroupsBroadcastEndpointFactory implements BroadcastEndpointFactory { + private final String channelName; + private final ChannelFactory channelFactory; + // can be null + private Map channels; + + /** + * @param channels a Map of <channel names, JChannel> can will be filled with channels created from the broadcast endpoints + */ + public JGroupsBroadcastEndpointFactory(String channelName, ChannelFactory channelFactory, Map channels) { + this.channelName = channelName; + this.channelFactory = channelFactory; + this.channels = channels; + } + + public JGroupsBroadcastEndpointFactory(String channelName, ChannelFactory channelFactory) { + this(channelName, channelFactory, null); + } + + @Override + public BroadcastEndpoint createBroadcastEndpoint() throws Exception { + JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint(JChannelManager.getInstance(), channelName) { + @Override + public JChannel createChannel() throws Exception { + JChannel channel = (JChannel) channelFactory.createChannel(channelName); + if (channels != null) { + channels.put(channelName, channel); + } + return channel; + } + + @Override + protected void internalCloseChannel(JChannelWrapper channel) { + if (channels != null) { + channels.remove(channelName); + } + super.internalCloseChannel(channel); + } + }; + return endpoint.initChannel(); + } +}