Skip to content

Commit

Permalink
AS7-4415 Default up handler of Channel created by jgroups subsystem i…
Browse files Browse the repository at this point in the history
…nterferes with custom receivers.

Some minor code cleanup of ProtocolStackAdd
  • Loading branch information
pferraro authored and stuartwdouglas committed Apr 8, 2012
1 parent c611cf5 commit ae32d2c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 53 deletions.
Expand Up @@ -22,6 +22,7 @@
package org.jboss.as.clustering.jgroups;

import org.jgroups.JChannel;
import org.jgroups.Receiver;
import org.jgroups.UpHandler;
import org.jgroups.blocks.mux.MuxUpHandler;
import org.jgroups.blocks.mux.Muxer;
Expand All @@ -37,6 +38,15 @@ public MuxChannel(ProtocolStackConfigurator configurator) throws Exception {
this.setUpHandler(new MuxUpHandler());
}

@Override
public void setReceiver(Receiver receiver) {
super.setReceiver(receiver);
// If we're using a receiver, we're not interested in using an up handler
if (receiver != null) {
super.setUpHandler(null);
}
}

@Override
public void setUpHandler(UpHandler handler) {
UpHandler existingHandler = this.getUpHandler();
Expand Down
Expand Up @@ -26,9 +26,11 @@

import javax.management.MBeanServer;
import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -98,7 +100,7 @@ protected void populateModel(final OperationContext context, final ModelNode ope
// add a step to initialize an *optional* TRANSPORT parameter
if (operation.hasDefined(ModelKeys.TRANSPORT)) {
// create an ADD operation to add the transport=TRANSPORT child
ModelNode addTransport = operation.get(ModelKeys.TRANSPORT).clone() ;
ModelNode addTransport = operation.get(ModelKeys.TRANSPORT).clone();

addTransport.get(OPERATION_NAME).set(ModelDescriptionConstants.ADD);
ModelNode transportAddress = operation.get(OP_ADDR).clone();
Expand All @@ -122,7 +124,7 @@ protected void populateModel(final OperationContext context, final ModelNode ope
ModelNode protocol = protocols.get(i);

// create an ADD operation to add the protocol=* child
ModelNode addProtocol = protocol.clone() ;
ModelNode addProtocol = protocol.clone();
addProtocol.get(OPERATION_NAME).set(ModelKeys.ADD_PROTOCOL);
// add-protocol is a stack operation
ModelNode protocolAddress = operation.get(OP_ADDR).clone();
Expand Down Expand Up @@ -155,28 +157,22 @@ protected void installRuntimeServices(OperationContext context, ModelNode operat

// we need to preserve the order of the protocols as maintained by PROTOCOLS
// pick up the ordered protocols here as a List<Property> where property is <name, ModelNode>
List<Property> orderedProtocols = getOrderedProtocolPropertyList(model) ;
List<Property> orderedProtocols = getOrderedProtocolPropertyList(model);

// pick up the transport here and its values
ModelNode transport = model.get(ModelKeys.TRANSPORT, ModelKeys.TRANSPORT_NAME);
ModelNode resolvedValue = null ;
final String type = ((resolvedValue = CommonAttributes.TYPE.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
ModelNode resolvedValue = null;
final String type = (resolvedValue = CommonAttributes.TYPE.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final boolean shared = CommonAttributes.SHARED.resolveModelAttribute(context, transport).asBoolean();
final String machine = ((resolvedValue = CommonAttributes.MACHINE.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String rack = ((resolvedValue = CommonAttributes.RACK.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String site = ((resolvedValue = CommonAttributes.SITE.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String timerExecutor = ((resolvedValue = CommonAttributes.TIMER_EXECUTOR.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String threadFactory = ((resolvedValue = CommonAttributes.THREAD_FACTORY.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String diagnosticsSocketBinding = ((resolvedValue = CommonAttributes.DIAGNOSTICS_SOCKET_BINDING.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String defaultExecutor = ((resolvedValue = CommonAttributes.DEFAULT_EXECUTOR.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String oobExecutor = ((resolvedValue = CommonAttributes.OOB_EXECUTOR.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
final String transportSocketBinding = ((resolvedValue = CommonAttributes.SOCKET_BINDING.resolveModelAttribute(context, transport)).isDefined()) ? resolvedValue.asString() : null ;
List<String> protocolSocketBindings = new ArrayList<String>();
for (Property protocolProperty : orderedProtocols) {
ModelNode protocol = protocolProperty.getValue();
final String protocolSocketBinding = ((resolvedValue = CommonAttributes.SOCKET_BINDING.resolveModelAttribute(context, protocol)).isDefined()) ? resolvedValue.asString() : null ;
protocolSocketBindings.add(protocolSocketBinding) ;
}
final String machine = (resolvedValue = CommonAttributes.MACHINE.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String rack = (resolvedValue = CommonAttributes.RACK.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String site = (resolvedValue = CommonAttributes.SITE.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String timerExecutor = (resolvedValue = CommonAttributes.TIMER_EXECUTOR.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String threadFactory = (resolvedValue = CommonAttributes.THREAD_FACTORY.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String diagnosticsSocketBinding = (resolvedValue = CommonAttributes.DIAGNOSTICS_SOCKET_BINDING.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String defaultExecutor = (resolvedValue = CommonAttributes.DEFAULT_EXECUTOR.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String oobExecutor = (resolvedValue = CommonAttributes.OOB_EXECUTOR.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;
final String transportSocketBinding = (resolvedValue = CommonAttributes.SOCKET_BINDING.resolveModelAttribute(context, transport)).isDefined() ? resolvedValue.asString() : null;

// set up the transport
Transport transportConfig = new Transport(type);
Expand All @@ -186,12 +182,15 @@ protected void installRuntimeServices(OperationContext context, ModelNode operat

// set up the protocol stack Protocol objects
ProtocolStack stackConfig = new ProtocolStack(name, transportConfig);
List<Map.Entry<Protocol, String>> protocolSocketBindings = new ArrayList<Map.Entry<Protocol, String>>(orderedProtocols.size());
for (Property protocolProperty : orderedProtocols) {
ModelNode tmp = null ;
final String protocolType = ((tmp = CommonAttributes.TYPE.resolveModelAttribute(context, protocolProperty.getValue())).isDefined()) ? tmp.asString() : null ;
ModelNode protocol = protocolProperty.getValue();
final String protocolType = (resolvedValue = CommonAttributes.TYPE.resolveModelAttribute(context, protocol)).isDefined() ? resolvedValue.asString() : null;
Protocol protocolConfig = new Protocol(protocolType);
initProtocolProperties(protocolProperty.getValue(), protocolConfig);
initProtocolProperties(protocol, protocolConfig);
stackConfig.getProtocols().add(protocolConfig);
final String protocolSocketBinding = (resolvedValue = CommonAttributes.SOCKET_BINDING.resolveModelAttribute(context, protocol)).isDefined() ? resolvedValue.asString() : null;
protocolSocketBindings.add(new AbstractMap.SimpleImmutableEntry<Protocol, String>(protocolConfig, protocolSocketBinding));
}

ServiceTarget target = context.getServiceTarget();
Expand Down Expand Up @@ -224,7 +223,7 @@ protected ServiceController<ChannelFactory> installChannelFactoryService(Service
String timerExecutor,
String threadFactory,
String transportSocketBinding,
List<String> protocolSocketBindings,
List<Map.Entry<Protocol, String>> protocolSocketBindings,
Transport transportConfig,
ProtocolStack stackConfig,
ServiceVerificationHandler verificationHandler) {
Expand All @@ -235,19 +234,13 @@ protected ServiceController<ChannelFactory> installChannelFactoryService(Service
.addDependency(ProtocolDefaultsService.SERVICE_NAME, ProtocolDefaults.class, stackConfig.getDefaultsInjector())
.addDependency(MBeanServerService.SERVICE_NAME, MBeanServer.class, stackConfig.getMBeanServerInjector())
.addDependency(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class, stackConfig.getEnvironmentInjector())
;

.setInitialMode(ServiceController.Mode.ON_DEMAND)
;
// add transport dependencies
addSocketBindingDependency(builder, transportSocketBinding, transportConfig.getSocketBindingInjector());

// add protocol dependencies, iterating over two lists in lock step
Iterator sbIterator = protocolSocketBindings.iterator();
Iterator pcIterator = stackConfig.getProtocols().iterator();
while (sbIterator.hasNext() && pcIterator.hasNext()) {
String socketBinding = (String) sbIterator.next();
Protocol protocolConfig = (Protocol) pcIterator.next();

addSocketBindingDependency(builder, socketBinding, protocolConfig.getSocketBindingInjector());
for (Map.Entry<Protocol, String> entry: protocolSocketBindings) {
addSocketBindingDependency(builder, entry.getValue(), entry.getKey().getSocketBindingInjector());
}

// add remaining dependencies
Expand All @@ -260,9 +253,7 @@ protected ServiceController<ChannelFactory> installChannelFactoryService(Service
if (threadFactory != null) {
builder.addDependency(ThreadsServices.threadFactoryName(threadFactory), ThreadFactory.class, transportConfig.getThreadFactoryInjector());
}
builder.setInitialMode(ServiceController.Mode.ON_DEMAND);

return builder.install() ;
return builder.install();
}

private void initProtocolProperties(ModelNode protocol, Protocol protocolConfig) {
Expand All @@ -288,16 +279,16 @@ public static List<Property> getOrderedProtocolPropertyList(ModelNode stack) {

// check for the empty ordering list
if (!stack.hasDefined(ModelKeys.PROTOCOLS)) {
return null ;
return null;
}
// PROTOCOLS is a list of protocol names only, reflecting the order in which protocols were added to the stack
List<ModelNode> protocolOrdering = stack.get(ModelKeys.PROTOCOLS).asList();

// now construct an ordered list of the full protocol model nodes
ModelNode unorderedProtocols = stack.get(ModelKeys.PROTOCOL) ;
ModelNode unorderedProtocols = stack.get(ModelKeys.PROTOCOL);
for (ModelNode protocolName : protocolOrdering) {
ModelNode protocolModel = unorderedProtocols.get(protocolName.asString());
orderedProtocols.add(protocolName.asString(), protocolModel) ;
orderedProtocols.add(protocolName.asString(), protocolModel);
}
return orderedProtocols.asPropertyList();
}
Expand All @@ -321,12 +312,12 @@ private void protocolStackSanityCheck(String stackName, ModelNode model) throws

ModelNode transport = model.get(ModelKeys.TRANSPORT, ModelKeys.TRANSPORT_NAME);
if (!transport.isDefined()) {
throw JGroupsMessages.MESSAGES.transportNotDefined(stackName) ;
throw JGroupsMessages.MESSAGES.transportNotDefined(stackName);
}

List<Property> protocols = getOrderedProtocolPropertyList(model);
if ( protocols == null || !(protocols.size() > 0)) {
throw JGroupsMessages.MESSAGES.protocolListNotDefined(stackName) ;
throw JGroupsMessages.MESSAGES.protocolListNotDefined(stackName);
}
}

Expand Down Expand Up @@ -504,15 +495,21 @@ static class Protocol implements ProtocolConfiguration {
private final String name;
private final InjectedValue<SocketBinding> socketBinding = new InjectedValue<SocketBinding>();
private final Map<String, String> properties = new HashMap<String, String>();
private final Class<?> protocolClass;
final Class<?> protocolClass;

Protocol(String name) {
Protocol(final String name) {
this.name = name;
try {
this.protocolClass = this.getClass().getClassLoader().loadClass(org.jgroups.conf.ProtocolConfiguration.protocol_prefix + '.' + name);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}
PrivilegedAction<Class<?>> action = new PrivilegedAction<Class<?>>() {
@Override
public Class<?> run() {
try {
return Protocol.this.getClass().getClassLoader().loadClass(org.jgroups.conf.ProtocolConfiguration.protocol_prefix + '.' + name);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}
}
};
this.protocolClass = AccessController.doPrivileged(action);
}

@Override
Expand All @@ -521,11 +518,17 @@ public String getName() {
}

@Override
public boolean hasProperty(String property) {
return getField(this.protocolClass, property) != null;
public boolean hasProperty(final String property) {
PrivilegedAction<Field> action = new PrivilegedAction<Field>() {
@Override
public Field run() {
return getField(Protocol.this.protocolClass, property);
}
};
return AccessController.doPrivileged(action) != null;
}

private static Field getField(Class<?> targetClass, String property) {
static Field getField(Class<?> targetClass, String property) {
try {
return targetClass.getDeclaredField(property);
} catch (NoSuchFieldException e) {
Expand Down

0 comments on commit ae32d2c

Please sign in to comment.