Skip to content

Commit

Permalink
WFLY-9940 Reduce metaspace usage in wildfly-clustering-singleton-exte…
Browse files Browse the repository at this point in the history
…nsion module.
  • Loading branch information
pferraro committed Mar 2, 2018
1 parent bf49295 commit 41b0c80
Show file tree
Hide file tree
Showing 39 changed files with 266 additions and 199 deletions.
Expand Up @@ -79,11 +79,11 @@
* all of which will share the same {@link MessageDispatcher} instance. * all of which will share the same {@link MessageDispatcher} instance.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class ChannelCommandDispatcherFactory implements AutoCloseableCommandDispatcherFactory, RequestHandler, org.wildfly.clustering.server.group.Group<Address>, MembershipListener { public class ChannelCommandDispatcherFactory implements AutoCloseableCommandDispatcherFactory, RequestHandler, org.wildfly.clustering.server.group.Group<Address>, MembershipListener, Runnable {


private static ThreadFactory createThreadFactory(Class<?> targetClass) { private static ThreadFactory createThreadFactory(Class<?> targetClass) {
PrivilegedAction<ThreadFactory> action = () -> new JBossThreadFactory(new ThreadGroup(targetClass.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null); PrivilegedAction<ThreadFactory> action = () -> new ClassLoaderThreadFactory(new JBossThreadFactory(new ThreadGroup(targetClass.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null), targetClass.getClassLoader());
return new ClassLoaderThreadFactory(WildFlySecurityManager.doUnchecked(action), targetClass.getClassLoader()); return WildFlySecurityManager.doUnchecked(action);
} }


private final ConcurrentMap<Address, Node> members = new ConcurrentHashMap<>(); private final ConcurrentMap<Address, Node> members = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -114,19 +114,22 @@ public ChannelCommandDispatcherFactory(ChannelCommandDispatcherFactoryConfigurat
this.view.compareAndSet(null, channel.getView()); this.view.compareAndSet(null, channel.getView());
} }


@Override
public void run() {
this.dispatcher.stop();
this.dispatcher.getChannel().setUpHandler(null);
// Cleanup any stray listeners
for (ExecutorService executor : this.listeners.values()) {
PrivilegedAction<List<Runnable>> action = () -> executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action);
}
this.listeners.clear();
this.executorService.shutdownNow();
}

@Override @Override
public void close() { public void close() {
this.executor.close(() -> { this.executor.close(this);
this.dispatcher.stop();
this.dispatcher.getChannel().setUpHandler(null);
// Cleanup any stray listeners
this.listeners.values().forEach(executor -> {
PrivilegedAction<List<Runnable>> action = () -> executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action);
});
this.listeners.clear();
this.executorService.shutdownNow();
});
} }


@Override @Override
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream;


import org.jboss.as.clustering.controller.CapabilityServiceBuilder; import org.jboss.as.clustering.controller.CapabilityServiceBuilder;
import org.jboss.as.clustering.function.Consumers; import org.jboss.as.clustering.function.Consumers;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.wildfly.clustering.marshalling.jboss.SimpleMarshallingContextFactory; import org.wildfly.clustering.marshalling.jboss.SimpleMarshallingContextFactory;
import org.wildfly.clustering.service.AsynchronousServiceBuilder; import org.wildfly.clustering.service.AsynchronousServiceBuilder;
import org.wildfly.clustering.service.Builder; import org.wildfly.clustering.service.Builder;
import org.wildfly.clustering.service.CompositeDependency;
import org.wildfly.clustering.service.InjectedValueDependency; import org.wildfly.clustering.service.InjectedValueDependency;
import org.wildfly.clustering.service.SuppliedValueService; import org.wildfly.clustering.service.SuppliedValueService;
import org.wildfly.clustering.service.ValueDependency; import org.wildfly.clustering.service.ValueDependency;
Expand All @@ -60,7 +60,7 @@
* Builds a channel-based {@link org.wildfly.clustering.dispatcher.CommandDispatcherFactory} service. * Builds a channel-based {@link org.wildfly.clustering.dispatcher.CommandDispatcherFactory} service.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class ChannelCommandDispatcherFactoryBuilder implements CapabilityServiceBuilder<CommandDispatcherFactory>, ChannelCommandDispatcherFactoryConfiguration, MarshallingConfigurationContext { public class ChannelCommandDispatcherFactoryBuilder implements CapabilityServiceBuilder<CommandDispatcherFactory>, ChannelCommandDispatcherFactoryConfiguration, MarshallingConfigurationContext, Supplier<AutoCloseableCommandDispatcherFactory> {


enum MarshallingVersion implements Function<MarshallingConfigurationContext, MarshallingConfiguration> { enum MarshallingVersion implements Function<MarshallingConfigurationContext, MarshallingConfiguration> {
VERSION_1() { VERSION_1() {
Expand Down Expand Up @@ -100,6 +100,11 @@ public ChannelCommandDispatcherFactoryBuilder(ServiceName name, String group) {
this.group = group; this.group = group;
} }


@Override
public AutoCloseableCommandDispatcherFactory get() {
return new ManagedCommandDispatcherFactory(new ChannelCommandDispatcherFactory(this));
}

@Override @Override
public ServiceName getServiceName() { public ServiceName getServiceName() {
return this.name; return this.name;
Expand All @@ -115,14 +120,12 @@ public Builder<CommandDispatcherFactory> configure(CapabilityServiceSupport supp


@Override @Override
public ServiceBuilder<CommandDispatcherFactory> build(ServiceTarget target) { public ServiceBuilder<CommandDispatcherFactory> build(ServiceTarget target) {
Supplier<AutoCloseableCommandDispatcherFactory> supplier = () -> new ManagedCommandDispatcherFactory(new ChannelCommandDispatcherFactory(this)); Service<CommandDispatcherFactory> service = new SuppliedValueService<>(Functions.identity(), this, Consumers.close());
Service<CommandDispatcherFactory> service = new SuppliedValueService<>(Functions.identity(), supplier, Consumers.close());
ServiceBuilder<CommandDispatcherFactory> builder = new AsynchronousServiceBuilder<>(this.name, service).build(target) ServiceBuilder<CommandDispatcherFactory> builder = new AsynchronousServiceBuilder<>(this.name, service).build(target)
.addDependency(Services.JBOSS_SERVICE_MODULE_LOADER, ModuleLoader.class, this.loader) .addDependency(Services.JBOSS_SERVICE_MODULE_LOADER, ModuleLoader.class, this.loader)
.setInitialMode(ServiceController.Mode.PASSIVE) .setInitialMode(ServiceController.Mode.PASSIVE)
; ;
Stream.of(this.channel, this.channelFactory, this.module).forEach(dependency -> dependency.register(builder)); return new CompositeDependency(this.channel, this.channelFactory, this.module).register(builder);
return builder;
} }


public ChannelCommandDispatcherFactoryBuilder timeout(long value, TimeUnit unit) { public ChannelCommandDispatcherFactoryBuilder timeout(long value, TimeUnit unit) {
Expand Down
Expand Up @@ -32,6 +32,6 @@
public class ChannelCommandDispatcherFactoryBuilderProvider extends CommandDispatcherFactoryBuilderProvider implements DistributedGroupBuilderProvider { public class ChannelCommandDispatcherFactoryBuilderProvider extends CommandDispatcherFactoryBuilderProvider implements DistributedGroupBuilderProvider {


public ChannelCommandDispatcherFactoryBuilderProvider() { public ChannelCommandDispatcherFactoryBuilderProvider() {
super((name, group) -> new ChannelCommandDispatcherFactoryBuilder(name, group)); super(ChannelCommandDispatcherFactoryBuilder::new);
} }
} }
Expand Up @@ -44,7 +44,7 @@
* Builds a non-clustered {@link org.wildfly.clustering.dispatcher.CommandDispatcherFactory} service. * Builds a non-clustered {@link org.wildfly.clustering.dispatcher.CommandDispatcherFactory} service.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class LocalCommandDispatcherFactoryBuilder implements CapabilityServiceBuilder<CommandDispatcherFactory> { public class LocalCommandDispatcherFactoryBuilder implements CapabilityServiceBuilder<CommandDispatcherFactory>, Supplier<AutoCloseableCommandDispatcherFactory> {


private final ServiceName name; private final ServiceName name;
private final String groupName; private final String groupName;
Expand All @@ -56,6 +56,11 @@ public LocalCommandDispatcherFactoryBuilder(ServiceName name, String groupName)
this.groupName = groupName; this.groupName = groupName;
} }


@Override
public AutoCloseableCommandDispatcherFactory get() {
return new ManagedCommandDispatcherFactory(new LocalCommandDispatcherFactory(this.group.getValue()));
}

@Override @Override
public ServiceName getServiceName() { public ServiceName getServiceName() {
return this.name; return this.name;
Expand All @@ -69,8 +74,7 @@ public Builder<CommandDispatcherFactory> configure(CapabilityServiceSupport supp


@Override @Override
public ServiceBuilder<CommandDispatcherFactory> build(ServiceTarget target) { public ServiceBuilder<CommandDispatcherFactory> build(ServiceTarget target) {
Supplier<AutoCloseableCommandDispatcherFactory> supplier = () -> new ManagedCommandDispatcherFactory(new LocalCommandDispatcherFactory(this.group.getValue())); Service<CommandDispatcherFactory> service = new SuppliedValueService<>(Functions.identity(), this, Consumers.close());
Service<CommandDispatcherFactory> service = new SuppliedValueService<>(Functions.identity(), supplier, Consumers.close());
return this.group.register(target.addService(this.name, service).setInitialMode(ServiceController.Mode.ON_DEMAND)); return this.group.register(target.addService(this.name, service).setInitialMode(ServiceController.Mode.ON_DEMAND));
} }
} }
Expand Up @@ -32,6 +32,6 @@
public class LocalCommandDispatcherFactoryBuilderProvider extends CommandDispatcherFactoryBuilderProvider implements LocalGroupBuilderProvider { public class LocalCommandDispatcherFactoryBuilderProvider extends CommandDispatcherFactoryBuilderProvider implements LocalGroupBuilderProvider {


public LocalCommandDispatcherFactoryBuilderProvider() { public LocalCommandDispatcherFactoryBuilderProvider() {
super((name, group) -> new LocalCommandDispatcherFactoryBuilder(name, group)); super(LocalCommandDispatcherFactoryBuilder::new);
} }
} }
Expand Up @@ -85,10 +85,10 @@ public void close() {
this.cache.removeListener(this); this.cache.removeListener(this);
this.cache.getCacheManager().removeListener(this); this.cache.getCacheManager().removeListener(this);
// Cleanup any unregistered listeners // Cleanup any unregistered listeners
this.listeners.values().forEach(executor -> { for (ExecutorService executor : this.listeners.values()) {
PrivilegedAction<List<Runnable>> action = () -> executor.shutdownNow(); PrivilegedAction<List<Runnable>> action = () -> executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action); WildFlySecurityManager.doUnchecked(action);
}); }
this.listeners.clear(); this.listeners.clear();
} }


Expand Down
Expand Up @@ -22,7 +22,6 @@
package org.wildfly.clustering.server.group; package org.wildfly.clustering.server.group;


import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.jboss.as.clustering.controller.CapabilityServiceBuilder; import org.jboss.as.clustering.controller.CapabilityServiceBuilder;
Expand All @@ -34,9 +33,11 @@
import org.jboss.msc.service.ServiceController; import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName; import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceTarget; import org.jboss.msc.service.ServiceTarget;
import org.jgroups.Address;
import org.wildfly.clustering.group.Group; import org.wildfly.clustering.group.Group;
import org.wildfly.clustering.infinispan.spi.InfinispanCacheRequirement; import org.wildfly.clustering.infinispan.spi.InfinispanCacheRequirement;
import org.wildfly.clustering.service.Builder; import org.wildfly.clustering.service.Builder;
import org.wildfly.clustering.service.CompositeDependency;
import org.wildfly.clustering.service.InjectedValueDependency; import org.wildfly.clustering.service.InjectedValueDependency;
import org.wildfly.clustering.service.SuppliedValueService; import org.wildfly.clustering.service.SuppliedValueService;
import org.wildfly.clustering.service.ValueDependency; import org.wildfly.clustering.service.ValueDependency;
Expand All @@ -47,42 +48,44 @@
* Builds a {@link Group} service for a cache. * Builds a {@link Group} service for a cache.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class CacheGroupBuilder implements CapabilityServiceBuilder<Group>, CacheGroupConfiguration { public class CacheGroupBuilder implements CapabilityServiceBuilder<Group>, CacheGroupConfiguration, Supplier<CacheGroup> {


private final ServiceName name; private final ServiceName name;
private final String containerName; private final String containerName;
private final String cacheName; private final String cacheName;


@SuppressWarnings("rawtypes") private volatile ValueDependency<Cache<?, ?>> cache;
private volatile ValueDependency<Cache> cache; private volatile ValueDependency<NodeFactory<Address>> factory;
@SuppressWarnings("rawtypes")
private volatile ValueDependency<NodeFactory> factory;


public CacheGroupBuilder(ServiceName name, String containerName, String cacheName) { public CacheGroupBuilder(ServiceName name, String containerName, String cacheName) {
this.name = name; this.name = name;
this.containerName = containerName; this.containerName = containerName;
this.cacheName = cacheName; this.cacheName = cacheName;
} }


@Override
public CacheGroup get() {
return new CacheGroup(this);
}

@Override @Override
public ServiceName getServiceName() { public ServiceName getServiceName() {
return this.name; return this.name;
} }


@SuppressWarnings("unchecked")
@Override @Override
public Builder<Group> configure(CapabilityServiceSupport support) { public Builder<Group> configure(CapabilityServiceSupport support) {
this.cache = new InjectedValueDependency<>(InfinispanCacheRequirement.CACHE.getServiceName(support, this.containerName, this.cacheName), Cache.class); this.cache = new InjectedValueDependency<>(InfinispanCacheRequirement.CACHE.getServiceName(support, this.containerName, this.cacheName), (Class<Cache<?, ?>>) (Class<?>) Cache.class);
this.factory = new InjectedValueDependency<>(ClusteringRequirement.GROUP.getServiceName(support, this.containerName), NodeFactory.class); this.factory = new InjectedValueDependency<>(ClusteringRequirement.GROUP.getServiceName(support, this.containerName), (Class<NodeFactory<Address>>) (Class<?>) NodeFactory.class);
return this; return this;
} }


@Override @Override
public ServiceBuilder<Group> build(ServiceTarget target) { public ServiceBuilder<Group> build(ServiceTarget target) {
Supplier<CacheGroup> supplier = () -> new CacheGroup(this); Service<Group> service = new SuppliedValueService<>(Functions.identity(), this, Consumers.close());
Service<Group> service = new SuppliedValueService<>(Functions.identity(), supplier, Consumers.close());
ServiceBuilder<Group> builder = target.addService(this.name, service).setInitialMode(ServiceController.Mode.ON_DEMAND); ServiceBuilder<Group> builder = target.addService(this.name, service).setInitialMode(ServiceController.Mode.ON_DEMAND);
Stream.of(this.cache, this.factory).forEach(dependency -> dependency.register(builder)); return new CompositeDependency(this.cache, this.factory).register(builder);
return builder;
} }


@Override @Override
Expand All @@ -91,7 +94,7 @@ public ServiceBuilder<Group> build(ServiceTarget target) {
} }


@Override @Override
public NodeFactory<org.jgroups.Address> getMemberFactory() { public NodeFactory<Address> getMemberFactory() {
return this.factory.getValue(); return this.factory.getValue();
} }
} }
Expand Up @@ -40,7 +40,7 @@
* Builds a channel-based {@link Group} service. * Builds a channel-based {@link Group} service.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class ChannelGroupBuilder implements CapabilityServiceBuilder<Group> { public class ChannelGroupBuilder implements CapabilityServiceBuilder<Group>, Value<Group> {


private final ServiceName name; private final ServiceName name;
private final String group; private final String group;
Expand All @@ -52,6 +52,11 @@ public ChannelGroupBuilder(ServiceName name, String group) {
this.group = group; this.group = group;
} }


@Override
public Group getValue() {
return this.factory.getValue().getGroup();
}

@Override @Override
public ServiceName getServiceName() { public ServiceName getServiceName() {
return this.name; return this.name;
Expand All @@ -65,7 +70,6 @@ public Builder<Group> configure(CapabilityServiceSupport support) {


@Override @Override
public ServiceBuilder<Group> build(ServiceTarget target) { public ServiceBuilder<Group> build(ServiceTarget target) {
Value<Group> value = () -> this.factory.getValue().getGroup(); return this.factory.register(target.addService(this.getServiceName(), new ValueService<>(this)).setInitialMode(ServiceController.Mode.ON_DEMAND));
return this.factory.register(target.addService(this.getServiceName(), new ValueService<>(value)).setInitialMode(ServiceController.Mode.ON_DEMAND));
} }
} }
Expand Up @@ -34,6 +34,6 @@
public class DistributedCacheGroupBuilderProvider extends CacheGroupBuilderProvider implements org.wildfly.clustering.spi.DistributedCacheBuilderProvider { public class DistributedCacheGroupBuilderProvider extends CacheGroupBuilderProvider implements org.wildfly.clustering.spi.DistributedCacheBuilderProvider {


public DistributedCacheGroupBuilderProvider() { public DistributedCacheGroupBuilderProvider() {
super((name, containerName, cacheName) -> new CacheGroupBuilder(name, containerName, cacheName)); super(CacheGroupBuilder::new);
} }
} }
Expand Up @@ -21,44 +21,44 @@
*/ */
package org.wildfly.clustering.server.group; package org.wildfly.clustering.server.group;


import java.util.function.Function;

import org.jboss.as.clustering.controller.CapabilityServiceBuilder; import org.jboss.as.clustering.controller.CapabilityServiceBuilder;
import org.jboss.as.server.ServerEnvironment; import org.jboss.as.server.ServerEnvironment;
import org.jboss.as.server.ServerEnvironmentService; import org.jboss.as.server.ServerEnvironmentService;
import org.jboss.msc.service.ServiceBuilder; import org.jboss.msc.service.ServiceBuilder;
import org.jboss.msc.service.ServiceController; import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName; import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceTarget; import org.jboss.msc.service.ServiceTarget;
import org.jboss.msc.value.InjectedValue; import org.jboss.msc.service.ValueService;
import org.jboss.msc.value.Value;
import org.wildfly.clustering.group.Group; import org.wildfly.clustering.group.Group;
import org.wildfly.clustering.service.MappedValueService; import org.wildfly.clustering.service.InjectedValueDependency;
import org.wildfly.clustering.service.ValueDependency;


/** /**
* Builds a non-clustered {@link Group} service. * Builds a non-clustered {@link Group} service.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class LocalGroupBuilder implements CapabilityServiceBuilder<Group> { public class LocalGroupBuilder implements CapabilityServiceBuilder<Group>, Value<Group> {


private final ServiceName name; private final ServiceName name;

private final ValueDependency<ServerEnvironment> environment = new InjectedValueDependency<>(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class);
private final InjectedValue<ServerEnvironment> environment = new InjectedValue<>();


public LocalGroupBuilder(ServiceName name) { public LocalGroupBuilder(ServiceName name) {
this.name = name; this.name = name;
} }


@Override
public Group getValue() {
return new LocalGroup(new LocalNode(this.environment.getValue().getNodeName()));
}

@Override @Override
public ServiceName getServiceName() { public ServiceName getServiceName() {
return this.name; return this.name;
} }


@Override @Override
public ServiceBuilder<Group> build(ServiceTarget target) { public ServiceBuilder<Group> build(ServiceTarget target) {
Function<ServerEnvironment, Group> mapper = environment -> new LocalGroup(new LocalNode(environment.getNodeName())); return this.environment.register(target.addService(this.name, new ValueService<>(this)).setInitialMode(ServiceController.Mode.ON_DEMAND));
return target.addService(this.name, new MappedValueService<>(mapper, this.environment))
.addDependency(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class, this.environment)
.setInitialMode(ServiceController.Mode.ON_DEMAND)
;
} }
} }
Expand Up @@ -21,7 +21,6 @@
*/ */
package org.wildfly.clustering.server.provider; package org.wildfly.clustering.server.provider;


import java.security.AccessController;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Collection; import java.util.Collection;
Expand Down Expand Up @@ -71,9 +70,8 @@
public class CacheServiceProviderRegistry<T> implements ServiceProviderRegistry<T>, GroupListener, AutoCloseable { public class CacheServiceProviderRegistry<T> implements ServiceProviderRegistry<T>, GroupListener, AutoCloseable {


private static ThreadFactory createThreadFactory(Class<?> targetClass) { private static ThreadFactory createThreadFactory(Class<?> targetClass) {
PrivilegedAction<ThreadFactory> action = () -> new JBossThreadFactory(new ThreadGroup(targetClass.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null); PrivilegedAction<ThreadFactory> action = () -> new ClassLoaderThreadFactory(new JBossThreadFactory(new ThreadGroup(targetClass.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null), targetClass.getClassLoader());
return new ClassLoaderThreadFactory(WildFlySecurityManager.doUnchecked(action), return WildFlySecurityManager.doUnchecked(action);
AccessController.doPrivileged((PrivilegedAction<ClassLoader>) () -> targetClass.getClassLoader()));
} }


private final ConcurrentMap<T, Map.Entry<Listener, ExecutorService>> listeners = new ConcurrentHashMap<>(); private final ConcurrentMap<T, Map.Entry<Listener, ExecutorService>> listeners = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -210,7 +208,9 @@ public void membershipChanged(Membership previousMembership, Membership membersh
try { try {
Collection<T> services = this.dispatcher.executeOnNode(new GetLocalServicesCommand<>(), joinedMember).get(); Collection<T> services = this.dispatcher.executeOnNode(new GetLocalServicesCommand<>(), joinedMember).get();
try (Batch batch = this.batcher.createBatch()) { try (Batch batch = this.batcher.createBatch()) {
services.forEach(service -> this.register(joinedMember, service)); for (T service : services) {
this.register(joinedMember, service);
}
} }
} catch (Exception e) { } catch (Exception e) {
ClusteringServerLogger.ROOT_LOGGER.warn(e.getLocalizedMessage(), e); ClusteringServerLogger.ROOT_LOGGER.warn(e.getLocalizedMessage(), e);
Expand Down

0 comments on commit 41b0c80

Please sign in to comment.