Skip to content

Commit

Permalink
WFLY-10748 Custom JGroups SocketFactory needs to invoke SocketBinding…
Browse files Browse the repository at this point in the history
…Manager methods using the name of the originating socket-binding.
  • Loading branch information
pferraro committed Jul 27, 2018
1 parent c80c2b1 commit 305c640
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 66 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.jboss.as.controller.services.path.PathManager;
import org.jboss.as.naming.NamingStore;
import org.jboss.as.naming.service.NamingService;
import org.jboss.as.network.SocketBindingManager;
import org.wildfly.clustering.service.Requirement;

/**
Expand All @@ -38,6 +39,7 @@ public enum CommonRequirement implements Requirement, ServiceNameFactoryProvider
MBEAN_SERVER("org.wildfly.management.jmx", MBeanServer.class),
NAMING_STORE(NamingService.CAPABILITY_NAME, NamingStore.class),
PATH_MANAGER("org.wildfly.management.path-manager", PathManager.class),
SOCKET_BINDING_MANAGER("org.wildfly.management.socket-binding-manager", SocketBindingManager.class),
;
private final String name;
private final Class<?> type;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Stream;

import org.jboss.as.clustering.jgroups.logging.JGroupsLogger;
import org.jboss.as.network.SocketBindingManager;
import org.jgroups.JChannel;
import org.jgroups.fork.ForkChannel;
import org.jgroups.protocols.TP;
Expand Down Expand Up @@ -119,5 +120,10 @@ public String getNodeName() {
public Optional<RelayConfiguration> getRelay() {
return this.parentStack.getRelay();
}

@Override
public SocketBindingManager getSocketBindingManager() {
return this.parentStack.getSocketBindingManager();
}
}
}
Expand Up @@ -25,8 +25,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jboss.as.network.SocketBinding;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.RequestCorrelator;
Expand Down Expand Up @@ -92,6 +95,7 @@ private Object handle(Message message) {
}
});

Map<String, SocketBinding> bindings = new HashMap<>();
// Transport always resides at the bottom of the stack
List<ProtocolConfiguration<? extends Protocol>> transports = Collections.singletonList(this.configuration.getTransport());
// Add RELAY2 to the top of the stack, if defined
Expand All @@ -100,11 +104,15 @@ private Object handle(Message message) {
for (List<ProtocolConfiguration<? extends Protocol>> protocolConfigs : Arrays.asList(transports, this.configuration.getProtocols(), relays)) {
for (ProtocolConfiguration<? extends Protocol> protocolConfig : protocolConfigs) {
protocols.add(protocolConfig.createProtocol(this.configuration));
bindings.putAll(protocolConfig.getSocketBindings());
}
}
// Add implicit FORK to the top of the stack
protocols.add(fork);

// Override the SocketFactory of the transport
protocols.get(0).setSocketFactory(new ManagedSocketFactory(this.configuration.getSocketBindingManager(), bindings));

JChannel channel = new JChannel(protocols);

channel.setName(this.configuration.getNodeName());
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.net.SocketException;
import java.util.Map;

import org.jboss.as.network.SocketBinding;
import org.jboss.as.network.SocketBindingManager;
import org.jgroups.util.SocketFactory;

Expand All @@ -42,64 +43,84 @@
public class ManagedSocketFactory implements SocketFactory {

private final SocketBindingManager manager;
// Maps a JGroups service name its associated SocketBinding
private final Map<String, SocketBinding> socketBindings;

public ManagedSocketFactory(SocketBindingManager manager) {
public ManagedSocketFactory(SocketBindingManager manager, Map<String, SocketBinding> socketBindings) {
this.manager = manager;
this.socketBindings = socketBindings;
}

private String getSocketBindingName(String name) {
SocketBinding socketBinding = this.socketBindings.get(name);
return (socketBinding != null) ? socketBinding.getName() : name;
}

@Override
public Socket createSocket(String name) throws IOException {
return this.manager.getSocketFactory().createSocket(name);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getSocketFactory().createSocket(socketBindingName);
}

@Override
public Socket createSocket(String name, String host, int port) throws IOException {
return this.manager.getSocketFactory().createSocket(name, host, port);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getSocketFactory().createSocket(socketBindingName, host, port);
}

@Override
public Socket createSocket(String name, InetAddress address, int port) throws IOException {
return this.manager.getSocketFactory().createSocket(name, address, port);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getSocketFactory().createSocket(socketBindingName, address, port);
}

@Override
public Socket createSocket(String name, String host, int port, InetAddress localHost, int localPort) throws IOException {
return this.manager.getSocketFactory().createSocket(name, host, port, localHost, localPort);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getSocketFactory().createSocket(socketBindingName, host, port, localHost, localPort);
}

@Override
public Socket createSocket(String name, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
return this.manager.getSocketFactory().createSocket(name, address, port, localAddress, localPort);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getSocketFactory().createSocket(socketBindingName, address, port, localAddress, localPort);
}

@Override
public ServerSocket createServerSocket(String name) throws IOException {
return this.manager.getServerSocketFactory().createServerSocket(name);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getServerSocketFactory().createServerSocket(socketBindingName);
}

@Override
public ServerSocket createServerSocket(String name, int port) throws IOException {
return this.manager.getServerSocketFactory().createServerSocket(name, port);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getServerSocketFactory().createServerSocket(socketBindingName, port);
}

@Override
public ServerSocket createServerSocket(String name, int port, int backlog) throws IOException {
return this.manager.getServerSocketFactory().createServerSocket(name, port, backlog);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getServerSocketFactory().createServerSocket(socketBindingName, port, backlog);
}

@Override
public ServerSocket createServerSocket(String name, int port, int backlog, InetAddress ifAddress) throws IOException {
return this.manager.getServerSocketFactory().createServerSocket(name, port, backlog, ifAddress);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.getServerSocketFactory().createServerSocket(socketBindingName, port, backlog, ifAddress);
}

@Override
public DatagramSocket createDatagramSocket(String name) throws SocketException {
return this.createDatagramSocket(name, 0);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.createDatagramSocket(socketBindingName);
}

@Override
public DatagramSocket createDatagramSocket(String name, SocketAddress address) throws SocketException {
return (address != null) ? this.manager.createDatagramSocket(name, address) : this.manager.createDatagramSocket(name);
if (address == null) return this.createDatagramSocket(name);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.createDatagramSocket(socketBindingName, address);
}

@Override
Expand All @@ -114,7 +135,8 @@ public DatagramSocket createDatagramSocket(String name, int port, InetAddress ad

@Override
public MulticastSocket createMulticastSocket(String name) throws IOException {
return this.createMulticastSocket(name, 0);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.createMulticastSocket(socketBindingName);
}

@Override
Expand All @@ -124,7 +146,9 @@ public MulticastSocket createMulticastSocket(String name, int port) throws IOExc

@Override
public MulticastSocket createMulticastSocket(String name, SocketAddress address) throws IOException {
return (address != null) ? this.manager.createMulticastSocket(name, address) : this.manager.createMulticastSocket(name);
if (address == null) return this.createMulticastSocket(name);
String socketBindingName = this.getSocketBindingName(name);
return this.manager.createMulticastSocket(socketBindingName, address);
}

@Override
Expand All @@ -150,6 +174,6 @@ public void close(DatagramSocket socket) {

@Override
public Map<Object, String> getSockets() {
return null;
throw new UnsupportedOperationException();
}
}
Expand Up @@ -31,13 +31,15 @@
import java.util.function.Supplier;

import org.jboss.as.clustering.controller.CapabilityServiceNameProvider;
import org.jboss.as.clustering.controller.CommonRequirement;
import org.jboss.as.clustering.controller.ResourceServiceConfigurator;
import org.jboss.as.clustering.jgroups.JChannelFactory;
import org.jboss.as.clustering.jgroups.logging.JGroupsLogger;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.PathAddress;
import org.jboss.as.controller.registry.Resource;
import org.jboss.as.network.SocketBindingManager;
import org.jboss.as.server.ServerEnvironment;
import org.jboss.as.server.ServerEnvironmentService;
import org.jboss.dmr.ModelNode;
Expand Down Expand Up @@ -71,6 +73,7 @@ public class JChannelFactoryServiceConfigurator extends CapabilityServiceNamePro
private volatile SupplierDependency<TransportConfiguration<? extends TP>> transport = null;
private volatile List<SupplierDependency<ProtocolConfiguration<? extends Protocol>>> protocols = null;
private volatile SupplierDependency<RelayConfiguration> relay = null;
private volatile SupplierDependency<SocketBindingManager> socketBindingManager;
private volatile Supplier<ServerEnvironment> environment;

public JChannelFactoryServiceConfigurator(PathAddress address) {
Expand All @@ -81,7 +84,7 @@ public JChannelFactoryServiceConfigurator(PathAddress address) {
@Override
public ServiceBuilder<?> build(ServiceTarget target) {
ServiceBuilder<?> builder = target.addService(this.getServiceName());
Consumer<ChannelFactory> factory = new CompositeDependency(this.transport, this.relay).register(builder).provides(this.getServiceName());
Consumer<ChannelFactory> factory = new CompositeDependency(this.transport, this.relay, this.socketBindingManager).register(builder).provides(this.getServiceName());
this.environment = builder.requires(ServerEnvironmentService.SERVICE_NAME);
for (Dependency dependency : this.protocols) {
dependency.register(builder);
Expand All @@ -108,6 +111,8 @@ public ServiceConfigurator configure(OperationContext context, ModelNode model)
}
this.relay = resource.hasChild(RelayResourceDefinition.PATH) ? new ServiceSupplierDependency<>(new SingletonProtocolServiceNameProvider(this.address, RelayResourceDefinition.PATH)) : null;

this.socketBindingManager = new ServiceSupplierDependency<>(CommonRequirement.SOCKET_BINDING_MANAGER.getServiceName(context));

return this;
}

Expand Down Expand Up @@ -145,4 +150,9 @@ public Optional<RelayConfiguration> getRelay() {
public boolean isStatisticsEnabled() {
return this.statisticsEnabled;
}

@Override
public SocketBindingManager getSocketBindingManager() {
return this.socketBindingManager.get();
}
}
Expand Up @@ -22,6 +22,9 @@

package org.jboss.as.clustering.jgroups.subsystem;

import java.util.Collections;
import java.util.Map;

import org.jboss.as.clustering.controller.CommonUnaryRequirement;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
Expand Down Expand Up @@ -58,6 +61,11 @@ public ServiceConfigurator configure(OperationContext context, ModelNode model)
return super.configure(context, model);
}

@Override
public Map<String, SocketBinding> getSocketBindings() {
return Collections.singletonMap("jgroups.mping.mcast_sock", this.binding.get());
}

@Override
public void accept(MPING protocol) {
SocketBinding binding = this.binding.get();
Expand Down
Expand Up @@ -29,6 +29,9 @@
import static org.jboss.as.clustering.jgroups.subsystem.TransportResourceDefinition.Attribute.SOCKET_BINDING;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.jboss.as.clustering.controller.CommonUnaryRequirement;
import org.jboss.as.clustering.jgroups.ClassLoaderThreadFactory;
Expand Down Expand Up @@ -113,6 +116,19 @@ public String getSite() {
return super.configure(context, model);
}

@Override
public Map<String, SocketBinding> getSocketBindings() {
Map<String, SocketBinding> bindings = new HashMap<>();
SocketBinding binding = this.getSocketBinding();
for (String serviceName : Arrays.asList("jgroups.udp.mcast_sock", "jgroups.udp.sock", "jgroups.tcp.server", "jgroups.nio.server", "jgroups.tunnel.ucast_sock")) {
bindings.put(serviceName, binding);
}
if (this.diagnosticsSocketBinding != null) {
bindings.put("jgroups.tp.diag.mcast_sock", this.diagnosticsSocketBinding.get());
}
return bindings;
}

@Override
public void accept(T protocol) {
InetSocketAddress socketAddress = this.getSocketBinding().getSocketAddress();
Expand All @@ -139,13 +155,12 @@ public void accept(T protocol) {
}
}

@Override
public SocketBinding getSocketBinding() {
return this.socketBinding.get();
}

@Override
public Topology getTopology() {
return this.topology;
}

SocketBinding getSocketBinding() {
return this.socketBinding.get();
}
}

0 comments on commit 305c640

Please sign in to comment.