Skip to content

Commit

Permalink
[WFLY-9004] Artemis leaks JGroups channels on reload
Browse files Browse the repository at this point in the history
Refactor messaging-activemq subsystem code to create channels on demand
every time Artemis creates a JGroupsBroadcastEndpoint.

The ActiveMQServerServices track only the channels created by the discovery
groups since they can be referenced by Artemis RA (when the RA is using
JGroups to discover the Artemis servers).

JIRA: https://issues.jboss.org/browse/WFLY-9004
  • Loading branch information
jmesnil committed Jul 21, 2017
1 parent fe7be83 commit e6abeb2
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 17 deletions.
Expand Up @@ -284,9 +284,7 @@ public synchronized void start(final StartContext context) throws StartException
if (jgroupFactories.containsKey(key)) { if (jgroupFactories.containsKey(key)) {
ChannelFactory channelFactory = jgroupFactories.get(key); ChannelFactory channelFactory = jgroupFactories.get(key);
String channelName = jgroupsChannels.get(key); String channelName = jgroupsChannels.get(key);
JChannel channel = (JChannel) channelFactory.createChannel(channelName); newConfigs.add(BroadcastGroupAdd.createBroadcastGroupConfiguration(name, config, channelFactory, channelName));
channels.put(channelName, channel);
newConfigs.add(BroadcastGroupAdd.createBroadcastGroupConfiguration(name, config, channel, channelName));
} else { } else {
final SocketBinding binding = groupBindings.get(key); final SocketBinding binding = groupBindings.get(key);
if (binding == null) { if (binding == null) {
Expand All @@ -304,16 +302,11 @@ public synchronized void start(final StartContext context) throws StartException
for(final Map.Entry<String, DiscoveryGroupConfiguration> entry : discoveryGroups.entrySet()) { for(final Map.Entry<String, DiscoveryGroupConfiguration> entry : discoveryGroups.entrySet()) {
final String name = entry.getKey(); final String name = entry.getKey();
final String key = "discovery" + name; final String key = "discovery" + name;
DiscoveryGroupConfiguration config = null; final DiscoveryGroupConfiguration config;
if (jgroupFactories.containsKey(key)) { if (jgroupFactories.containsKey(key)) {
ChannelFactory channelFactory = jgroupFactories.get(key); ChannelFactory channelFactory = jgroupFactories.get(key);
String channelName = jgroupsChannels.get(key); String channelName = jgroupsChannels.get(key);
JChannel channel = channels.get(channelName); config = DiscoveryGroupAdd.createDiscoveryGroupConfiguration(name, entry.getValue(), channelFactory, channelName, channels);
if (channel == null) {
channel = (JChannel) channelFactory.createChannel(channelName);
channels.put(channelName, channel);
}
config = DiscoveryGroupAdd.createDiscoveryGroupConfiguration(name, entry.getValue(), channel, channelName);
} else { } else {
final SocketBinding binding = groupBindings.get(key); final SocketBinding binding = groupBindings.get(key);
if (binding == null) { if (binding == null) {
Expand Down
Expand Up @@ -34,7 +34,6 @@


import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.jboss.as.controller.AbstractAddStepHandler; import org.jboss.as.controller.AbstractAddStepHandler;
Expand All @@ -52,7 +51,7 @@
import org.jboss.msc.service.ServiceName; import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceRegistry; import org.jboss.msc.service.ServiceRegistry;
import org.jboss.msc.service.ServiceTarget; import org.jboss.msc.service.ServiceTarget;
import org.jgroups.JChannel; import org.wildfly.clustering.jgroups.spi.ChannelFactory;
import org.wildfly.clustering.jgroups.spi.JGroupsDefaultRequirement; import org.wildfly.clustering.jgroups.spi.JGroupsDefaultRequirement;
import org.wildfly.extension.messaging.activemq.logging.MessagingLogger; import org.wildfly.extension.messaging.activemq.logging.MessagingLogger;


Expand Down Expand Up @@ -176,12 +175,12 @@ static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final Strin
.setEndpointFactory(endpointFactory); .setEndpointFactory(endpointFactory);
} }


static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final String name, final BroadcastGroupConfiguration config, final JChannel channel, final String channelName) throws Exception { static BroadcastGroupConfiguration createBroadcastGroupConfiguration(final String name, final BroadcastGroupConfiguration config, final ChannelFactory channelFactory, final String channelName) throws Exception {


final long broadcastPeriod = config.getBroadcastPeriod(); final long broadcastPeriod = config.getBroadcastPeriod();
final List<String> connectorRefs = config.getConnectorInfos(); final List<String> connectorRefs = config.getConnectorInfos();


final BroadcastEndpointFactory endpointFactory = new ChannelBroadcastEndpointFactory(channel, channelName); final BroadcastEndpointFactory endpointFactory = new JGroupsBroadcastEndpointFactory(channelName, channelFactory);


return new BroadcastGroupConfiguration() return new BroadcastGroupConfiguration()
.setName(name) .setName(name)
Expand Down
Expand Up @@ -30,7 +30,6 @@
import java.util.Map; import java.util.Map;


import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
Expand All @@ -49,6 +48,7 @@
import org.jboss.msc.service.ServiceRegistry; import org.jboss.msc.service.ServiceRegistry;
import org.jboss.msc.service.ServiceTarget; import org.jboss.msc.service.ServiceTarget;
import org.jgroups.JChannel; import org.jgroups.JChannel;
import org.wildfly.clustering.jgroups.spi.ChannelFactory;
import org.wildfly.clustering.jgroups.spi.JGroupsDefaultRequirement; import org.wildfly.clustering.jgroups.spi.JGroupsDefaultRequirement;


/** /**
Expand Down Expand Up @@ -149,11 +149,11 @@ static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final Strin
} }




static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final String name, final DiscoveryGroupConfiguration config, final JChannel channel, final String channelName) throws Exception { static DiscoveryGroupConfiguration createDiscoveryGroupConfiguration(final String name, final DiscoveryGroupConfiguration config, final ChannelFactory channelFactory, final String channelName, Map<String, JChannel> channels) throws Exception {
final long refreshTimeout = config.getRefreshTimeout(); final long refreshTimeout = config.getRefreshTimeout();
final long initialWaitTimeout = config.getDiscoveryInitialWaitTimeout(); final long initialWaitTimeout = config.getDiscoveryInitialWaitTimeout();


final BroadcastEndpointFactory endpointFactory = new ChannelBroadcastEndpointFactory(channel, channelName); final BroadcastEndpointFactory endpointFactory = new JGroupsBroadcastEndpointFactory(channelName, channelFactory, channels);


return new DiscoveryGroupConfiguration() return new DiscoveryGroupConfiguration()
.setName(name) .setName(name)
Expand Down
@@ -0,0 +1,79 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2017, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.extension.messaging.activemq;

import java.util.Map;

import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
import org.jgroups.JChannel;
import org.wildfly.clustering.jgroups.spi.ChannelFactory;

/**
* @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2017 Red Hat inc.
*/
class JGroupsBroadcastEndpointFactory implements BroadcastEndpointFactory {
private final String channelName;
private final ChannelFactory channelFactory;
// can be null
private Map<String, JChannel> channels;

/**
* @param channels a Map of &lt;channel names, JChannel&gt; can will be filled with channels created from the broadcast endpoints
*/
public JGroupsBroadcastEndpointFactory(String channelName, ChannelFactory channelFactory, Map<String, JChannel> channels) {
this.channelName = channelName;
this.channelFactory = channelFactory;
this.channels = channels;
}

public JGroupsBroadcastEndpointFactory(String channelName, ChannelFactory channelFactory) {
this(channelName, channelFactory, null);
}

@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint(JChannelManager.getInstance(), channelName) {
@Override
public JChannel createChannel() throws Exception {
JChannel channel = (JChannel) channelFactory.createChannel(channelName);
if (channels != null) {
channels.put(channelName, channel);
}
return channel;
}

@Override
protected void internalCloseChannel(JChannelWrapper channel) {
if (channels != null) {
channels.remove(channelName);
}
super.internalCloseChannel(channel);
}
};
return endpoint.initChannel();
}
}

0 comments on commit e6abeb2

Please sign in to comment.