Skip to content

Commit

Permalink
WFLY-11913 JGroups client sockets are not registered with the socket …
Browse files Browse the repository at this point in the history
…binding manager
  • Loading branch information
pferraro committed May 6, 2019
1 parent 1fbc174 commit 8ff8e60
Show file tree
Hide file tree
Showing 29 changed files with 1,001 additions and 146 deletions.
Expand Up @@ -37,9 +37,10 @@ public enum JGroupsSchema implements Schema<JGroupsSchema> {
VERSION_3_0(3, 0), // WildFly 9 VERSION_3_0(3, 0), // WildFly 9
VERSION_4_0(4, 0), // WildFly 10 VERSION_4_0(4, 0), // WildFly 10
VERSION_5_0(5, 0), // WildFly 11 VERSION_5_0(5, 0), // WildFly 11
VERSION_6_0(6, 0), // WildFly 12 VERSION_6_0(6, 0), // WildFly 12-16
VERSION_7_0(7, 0), // WildFly 17
; ;
public static final JGroupsSchema CURRENT = VERSION_6_0; public static final JGroupsSchema CURRENT = VERSION_7_0;


private final int major; private final int major;
private final int minor; private final int minor;
Expand Down
Expand Up @@ -405,6 +405,12 @@ private void parseTransport(XMLExtendedStreamReader reader, PathAddress stackAdd
break; break;
} }
} }
case CLIENT_SOCKET_BINDING: {
if (this.schema.since(JGroupsSchema.VERSION_7_0)) {
readAttribute(reader, i, operation, SocketTransportResourceDefinition.Attribute.CLIENT_SOCKET_BINDING);
break;
}
}
default: { default: {
this.parseProtocolAttribute(reader, i, operation); this.parseProtocolAttribute(reader, i, operation);
} }
Expand Down Expand Up @@ -470,10 +476,25 @@ private void parseSocketProtocol(XMLExtendedStreamReader reader, PathAddress sta
operations.put(address, operation); operations.put(address, operation);


for (int i = 0; i < reader.getAttributeCount(); i++) { for (int i = 0; i < reader.getAttributeCount(); i++) {
this.parseSocketProtocolAttribute(reader, i, operation); XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(i));
switch (attribute) {
case SOCKET_BINDING: {
readAttribute(reader, i, operation, SocketProtocolResourceDefinition.Attribute.SOCKET_BINDING);
break;
}
case CLIENT_SOCKET_BINDING: {
if (this.schema.since(JGroupsSchema.VERSION_7_0)) {
readAttribute(reader, i, operation, SocketProtocolResourceDefinition.Attribute.CLIENT_SOCKET_BINDING);
break;
}
}
default: {
parseProtocolAttribute(reader, i, operation);
}
}
} }


require(reader, operation, SocketBindingProtocolResourceDefinition.Attribute.SOCKET_BINDING); require(reader, operation, SocketProtocolResourceDefinition.Attribute.SOCKET_BINDING);


while (reader.hasNext() && (reader.nextTag() != XMLStreamConstants.END_ELEMENT)) { while (reader.hasNext() && (reader.nextTag() != XMLStreamConstants.END_ELEMENT)) {
this.parseProtocolElement(reader, address, operations); this.parseProtocolElement(reader, address, operations);
Expand All @@ -488,7 +509,16 @@ private void parseSocketDiscoveryProtocol(XMLExtendedStreamReader reader, PathAd
operations.put(address, operation); operations.put(address, operation);


for (int i = 0; i < reader.getAttributeCount(); i++) { for (int i = 0; i < reader.getAttributeCount(); i++) {
this.parseSocketDiscoveryProtocolAttribute(reader, i, operation); XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(i));
switch (attribute) {
case OUTBOUND_SOCKET_BINDINGS: {
readAttribute(reader, i, operation, SocketDiscoveryProtocolResourceDefinition.Attribute.OUTBOUND_SOCKET_BINDINGS);
break;
}
default: {
parseProtocolAttribute(reader, i, operation);
}
}
} }


require(reader, operation, SocketDiscoveryProtocolResourceDefinition.Attribute.OUTBOUND_SOCKET_BINDINGS); require(reader, operation, SocketDiscoveryProtocolResourceDefinition.Attribute.OUTBOUND_SOCKET_BINDINGS);
Expand All @@ -506,7 +536,16 @@ private void parseJDBCProtocol(XMLExtendedStreamReader reader, PathAddress stack
operations.put(address, operation); operations.put(address, operation);


for (int i = 0; i < reader.getAttributeCount(); i++) { for (int i = 0; i < reader.getAttributeCount(); i++) {
this.parseJDBCProtocolAttribute(reader, i, operation); XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(i));
switch (attribute) {
case DATA_SOURCE: {
readAttribute(reader, i, operation, JDBCProtocolResourceDefinition.Attribute.DATA_SOURCE);
break;
}
default: {
parseProtocolAttribute(reader, i, operation);
}
}
} }


require(reader, operation, JDBCProtocolResourceDefinition.Attribute.DATA_SOURCE); require(reader, operation, JDBCProtocolResourceDefinition.Attribute.DATA_SOURCE);
Expand All @@ -524,7 +563,20 @@ private void parseEncryptProtocol(XMLExtendedStreamReader reader, PathAddress st
operations.put(address, operation); operations.put(address, operation);


for (int i = 0; i < reader.getAttributeCount(); i++) { for (int i = 0; i < reader.getAttributeCount(); i++) {
this.parseEncryptProtocolAttribute(reader, i, operation); XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(i));
switch (attribute) {
case KEY_ALIAS: {
readAttribute(reader, i, operation, EncryptProtocolResourceDefinition.Attribute.KEY_ALIAS);
break;
}
case KEY_STORE: {
readAttribute(reader, i, operation, EncryptProtocolResourceDefinition.Attribute.KEY_STORE);
break;
}
default: {
parseProtocolAttribute(reader, i, operation);
}
}
} }


require(reader, operation, EncryptProtocolResourceDefinition.Attribute.KEY_ALIAS, EncryptProtocolResourceDefinition.Attribute.KEY_STORE); require(reader, operation, EncryptProtocolResourceDefinition.Attribute.KEY_ALIAS, EncryptProtocolResourceDefinition.Attribute.KEY_STORE);
Expand Down Expand Up @@ -570,62 +622,6 @@ private void parseProtocol(XMLExtendedStreamReader reader, PathAddress stackAddr
} }
} }


private void parseSocketProtocolAttribute(XMLExtendedStreamReader reader, int index, ModelNode operation) throws XMLStreamException {
XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(index));
switch (attribute) {
case SOCKET_BINDING: {
readAttribute(reader, index, operation, SocketBindingProtocolResourceDefinition.Attribute.SOCKET_BINDING);
break;
}
default: {
parseProtocolAttribute(reader, index, operation);
}
}
}

private void parseSocketDiscoveryProtocolAttribute(XMLExtendedStreamReader reader, int index, ModelNode operation) throws XMLStreamException {
XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(index));
switch (attribute) {
case OUTBOUND_SOCKET_BINDINGS: {
readAttribute(reader, index, operation, SocketDiscoveryProtocolResourceDefinition.Attribute.OUTBOUND_SOCKET_BINDINGS);
break;
}
default: {
parseProtocolAttribute(reader, index, operation);
}
}
}

private void parseJDBCProtocolAttribute(XMLExtendedStreamReader reader, int index, ModelNode operation) throws XMLStreamException {
XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(index));
switch (attribute) {
case DATA_SOURCE: {
readAttribute(reader, index, operation, JDBCProtocolResourceDefinition.Attribute.DATA_SOURCE);
break;
}
default: {
parseProtocolAttribute(reader, index, operation);
}
}
}

private void parseEncryptProtocolAttribute(XMLExtendedStreamReader reader, int index, ModelNode operation) throws XMLStreamException {
XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(index));
switch (attribute) {
case KEY_ALIAS: {
readAttribute(reader, index, operation, EncryptProtocolResourceDefinition.Attribute.KEY_ALIAS);
break;
}
case KEY_STORE: {
readAttribute(reader, index, operation, EncryptProtocolResourceDefinition.Attribute.KEY_STORE);
break;
}
default: {
parseProtocolAttribute(reader, index, operation);
}
}
}

private void parseProtocolAttribute(XMLExtendedStreamReader reader, int index, ModelNode operation) throws XMLStreamException { private void parseProtocolAttribute(XMLExtendedStreamReader reader, int index, ModelNode operation) throws XMLStreamException {
XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(index)); XMLAttribute attribute = XMLAttribute.forName(reader.getAttributeLocalName(index));
switch (attribute) { switch (attribute) {
Expand Down
Expand Up @@ -134,9 +134,9 @@ private static void writeProtocolAttributes(XMLExtendedStreamWriter writer, Prop


String protocol = property.getName(); String protocol = property.getName();
if (containsName(ProtocolRegistration.MulticastProtocol.class, protocol)) { if (containsName(ProtocolRegistration.MulticastProtocol.class, protocol)) {
writeAttributes(writer, property.getValue(), SocketBindingProtocolResourceDefinition.Attribute.class); writeAttributes(writer, property.getValue(), MulticastProtocolResourceDefinition.Attribute.class);
} else if (containsName(ProtocolRegistration.SocketProtocol.class, protocol)) { } else if (containsName(ProtocolRegistration.SocketProtocol.class, protocol)) {
writeAttributes(writer, property.getValue(), OptionalSocketBindingProtocolResourceDefinition.Attribute.class); writeAttributes(writer, property.getValue(), SocketProtocolResourceDefinition.Attribute.class);
} else if (containsName(ProtocolRegistration.JdbcProtocol.class, protocol)) { } else if (containsName(ProtocolRegistration.JdbcProtocol.class, protocol)) {
writeAttributes(writer, property.getValue(), JDBCProtocolResourceDefinition.Attribute.class); writeAttributes(writer, property.getValue(), JDBCProtocolResourceDefinition.Attribute.class);
} else if (containsName(ProtocolRegistration.EncryptProtocol.class, protocol)) { } else if (containsName(ProtocolRegistration.EncryptProtocol.class, protocol)) {
Expand Down
Expand Up @@ -41,7 +41,7 @@
* Resource definition override for protocols that require a socket-binding. * Resource definition override for protocols that require a socket-binding.
* @author Paul Ferraro * @author Paul Ferraro
*/ */
public class SocketBindingProtocolResourceDefinition extends ProtocolResourceDefinition { public class MulticastProtocolResourceDefinition extends ProtocolResourceDefinition {


enum Attribute implements org.jboss.as.clustering.controller.Attribute, UnaryOperator<SimpleAttributeDefinitionBuilder> { enum Attribute implements org.jboss.as.clustering.controller.Attribute, UnaryOperator<SimpleAttributeDefinitionBuilder> {
SOCKET_BINDING(ModelDescriptionConstants.SOCKET_BINDING, ModelType.STRING) { SOCKET_BINDING(ModelDescriptionConstants.SOCKET_BINDING, ModelType.STRING) {
Expand Down Expand Up @@ -86,7 +86,7 @@ public ResourceDescriptor apply(ResourceDescriptor descriptor) {
} }
} }


SocketBindingProtocolResourceDefinition(String name, UnaryOperator<ResourceDescriptor> configurator, ResourceServiceConfiguratorFactory serviceConfiguratorFactory, ResourceServiceConfiguratorFactory parentServiceConfiguratorFactory) { MulticastProtocolResourceDefinition(String name, UnaryOperator<ResourceDescriptor> configurator, ResourceServiceConfiguratorFactory parentServiceConfiguratorFactory) {
super(pathElement(name), new ResourceDescriptorConfigurator(configurator), serviceConfiguratorFactory, parentServiceConfiguratorFactory); super(pathElement(name), new ResourceDescriptorConfigurator(configurator), MulticastSocketProtocolConfigurationServiceConfigurator::new, parentServiceConfiguratorFactory);
} }
} }
Expand Up @@ -22,7 +22,7 @@


package org.jboss.as.clustering.jgroups.subsystem; package org.jboss.as.clustering.jgroups.subsystem;


import static org.jboss.as.clustering.jgroups.subsystem.SocketBindingProtocolResourceDefinition.Attribute.SOCKET_BINDING; import static org.jboss.as.clustering.jgroups.subsystem.MulticastProtocolResourceDefinition.Attribute.SOCKET_BINDING;


import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
Expand Down
Expand Up @@ -113,16 +113,12 @@ static void buildTransformation(ModelVersion version, ResourceTransformationDesc


for (SocketProtocol protocol : EnumSet.allOf(SocketProtocol.class)) { for (SocketProtocol protocol : EnumSet.allOf(SocketProtocol.class)) {
PathElement path = ProtocolResourceDefinition.pathElement(protocol.name()); PathElement path = ProtocolResourceDefinition.pathElement(protocol.name());
if (!JGroupsModel.VERSION_7_0_0.requiresTransformation(version)) { SocketProtocolResourceDefinition.addTransformations(version, parent.addChildResource(path));
OptionalSocketBindingProtocolResourceDefinition.addTransformations(version, parent.addChildResource(path));
}
} }


for (MulticastProtocol protocol : EnumSet.allOf(MulticastProtocol.class)) { for (MulticastProtocol protocol : EnumSet.allOf(MulticastProtocol.class)) {
PathElement path = ProtocolResourceDefinition.pathElement(protocol.name()); PathElement path = ProtocolResourceDefinition.pathElement(protocol.name());
if (!JGroupsModel.VERSION_5_0_0.requiresTransformation(version)) { MulticastProtocolResourceDefinition.addTransformations(version, parent.addChildResource(path));
SocketBindingProtocolResourceDefinition.addTransformations(version, parent.addChildResource(path));
}
} }


for (JdbcProtocol protocol : EnumSet.allOf(JdbcProtocol.class)) { for (JdbcProtocol protocol : EnumSet.allOf(JdbcProtocol.class)) {
Expand Down Expand Up @@ -194,10 +190,10 @@ public void register(ManagementResourceRegistration registration) {


// Override definitions for protocol types // Override definitions for protocol types
for (SocketProtocol protocol : EnumSet.allOf(SocketProtocol.class)) { for (SocketProtocol protocol : EnumSet.allOf(SocketProtocol.class)) {
new OptionalSocketBindingProtocolResourceDefinition(protocol.name(), this.configurator, SocketProtocolConfigurationServiceConfigurator::new, this.parentServiceConfiguratorFactory).register(registration); new SocketProtocolResourceDefinition(protocol.name(), this.configurator, this.parentServiceConfiguratorFactory).register(registration);
} }
for (MulticastProtocol protocol : EnumSet.allOf(MulticastProtocol.class)) { for (MulticastProtocol protocol : EnumSet.allOf(MulticastProtocol.class)) {
new SocketBindingProtocolResourceDefinition(protocol.name(), this.configurator, MulticastSocketProtocolConfigurationServiceConfigurator::new, this.parentServiceConfiguratorFactory).register(registration); new MulticastProtocolResourceDefinition(protocol.name(), this.configurator, this.parentServiceConfiguratorFactory).register(registration);
} }


for (JdbcProtocol protocol : EnumSet.allOf(JdbcProtocol.class)) { for (JdbcProtocol protocol : EnumSet.allOf(JdbcProtocol.class)) {
Expand Down
Expand Up @@ -22,14 +22,17 @@


package org.jboss.as.clustering.jgroups.subsystem; package org.jboss.as.clustering.jgroups.subsystem;


import static org.jboss.as.clustering.jgroups.subsystem.OptionalSocketBindingProtocolResourceDefinition.Attribute.SOCKET_BINDING; import static org.jboss.as.clustering.jgroups.subsystem.SocketProtocolResourceDefinition.Attribute.CLIENT_SOCKET_BINDING;
import static org.jboss.as.clustering.jgroups.subsystem.SocketProtocolResourceDefinition.Attribute.SOCKET_BINDING;


import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Collections; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;


import org.jboss.as.clustering.controller.Attribute;
import org.jboss.as.clustering.controller.CommonUnaryRequirement; import org.jboss.as.clustering.controller.CommonUnaryRequirement;
import org.jboss.as.controller.OperationContext; import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException; import org.jboss.as.controller.OperationFailedException;
Expand All @@ -39,6 +42,7 @@
import org.jboss.dmr.ModelNode; import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.ServiceBuilder; import org.jboss.msc.service.ServiceBuilder;
import org.jgroups.protocols.FD_SOCK; import org.jgroups.protocols.FD_SOCK;
import org.wildfly.clustering.service.CompositeDependency;
import org.wildfly.clustering.service.ServiceConfigurator; import org.wildfly.clustering.service.ServiceConfigurator;
import org.wildfly.clustering.service.ServiceSupplierDependency; import org.wildfly.clustering.service.ServiceSupplierDependency;
import org.wildfly.clustering.service.SimpleSupplierDependency; import org.wildfly.clustering.service.SimpleSupplierDependency;
Expand All @@ -51,26 +55,35 @@
public class SocketProtocolConfigurationServiceConfigurator extends ProtocolConfigurationServiceConfigurator<FD_SOCK> { public class SocketProtocolConfigurationServiceConfigurator extends ProtocolConfigurationServiceConfigurator<FD_SOCK> {


private volatile SupplierDependency<SocketBinding> binding; private volatile SupplierDependency<SocketBinding> binding;
private volatile SupplierDependency<SocketBinding> clientBinding;


public SocketProtocolConfigurationServiceConfigurator(PathAddress address) { public SocketProtocolConfigurationServiceConfigurator(PathAddress address) {
super(address); super(address);
} }


@Override @Override
public <T> ServiceBuilder<T> register(ServiceBuilder<T> builder) { public <T> ServiceBuilder<T> register(ServiceBuilder<T> builder) {
return super.register(this.binding.register(builder)); return super.register(new CompositeDependency(this.binding, this.clientBinding).register(builder));
} }


@Override @Override
public ServiceConfigurator configure(OperationContext context, ModelNode model) throws OperationFailedException { public ServiceConfigurator configure(OperationContext context, ModelNode model) throws OperationFailedException {
String bindingName = SOCKET_BINDING.resolveModelAttribute(context, model).asString(null); this.binding = createDependency(context, model, SOCKET_BINDING);
this.binding = (bindingName != null) ? new ServiceSupplierDependency<>(CommonUnaryRequirement.SOCKET_BINDING.getServiceName(context, bindingName)) : new SimpleSupplierDependency<>(null); this.clientBinding = createDependency(context, model, CLIENT_SOCKET_BINDING);
return super.configure(context, model); return super.configure(context, model);
} }


private static SupplierDependency<SocketBinding> createDependency(OperationContext context, ModelNode model, Attribute attribute) throws OperationFailedException {
String bindingName = attribute.resolveModelAttribute(context, model).asStringOrNull();
return (bindingName != null) ? new ServiceSupplierDependency<>(CommonUnaryRequirement.SOCKET_BINDING.getServiceName(context, bindingName)) : new SimpleSupplierDependency<>(null);
}

@Override @Override
public Map<String, SocketBinding> getSocketBindings() { public Map<String, SocketBinding> getSocketBindings() {
return Collections.singletonMap("jgroups.fd_sock.srv_sock", this.binding.get()); Map<String, SocketBinding> bindings = new HashMap<>();
bindings.put("jgroups.fd_sock.srv_sock", this.binding.get());
bindings.put("jgroups.fd.ping_sock", this.clientBinding.get());
return bindings;
} }


@Override @Override
Expand All @@ -93,5 +106,10 @@ public void accept(FD_SOCK protocol) {
} }
} }
} }
SocketBinding clientBinding = this.clientBinding.get();
if (clientBinding != null) {
InetSocketAddress socketAddress = clientBinding.getSocketAddress();
protocol.setValue("client_bind_port", socketAddress.getPort());
}
} }
} }

0 comments on commit 8ff8e60

Please sign in to comment.