Skip to content

Commit

Permalink
AS7-1768 1st channel not using shared transport
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro authored and n1hility committed Sep 22, 2011
1 parent 8a5ccb0 commit 1a1eeb9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public JChannelFactory(ProtocolStackConfiguration configuration) {

@Override
public Channel createChannel(String id) throws Exception {

JChannel channel = new JChannel(this);

// We need to synchronize on shared transport,
Expand All @@ -79,7 +78,10 @@ public Channel createChannel(String id) throws Exception {
this.init(transport);
}

channel.setName(id);
// Get hostname without triggering reverse dns lookup
String address = transport.getBindAddressAsInetAddress().toString();
int index = address.indexOf("/");
channel.setName(InetSocketAddress.createUnresolved((index > 0) ? address.substring(0, index) : address.substring(1), transport.getBindPort()).toString());

MBeanServer server = this.configuration.getMBeanServer();
if (server != null) {
Expand Down Expand Up @@ -147,10 +149,10 @@ public String getProtocolStackString() {
public List<org.jgroups.conf.ProtocolConfiguration> getProtocolStack() {
List<org.jgroups.conf.ProtocolConfiguration> configs = new ArrayList<org.jgroups.conf.ProtocolConfiguration>(this.configuration.getProtocols().size() + 1);
TransportConfiguration transport = this.configuration.getTransport();
Map<String, String> properties = transport.getProperties();
org.jgroups.conf.ProtocolConfiguration config = this.createProtocol(this.configuration.getTransport());

if (transport.isShared() && !transport.getProperties().containsKey(Global.SINGLETON_NAME)) {
org.jgroups.conf.ProtocolConfiguration config = this.createProtocol(transport);
Map<String, String> properties = config.getProperties();
if (transport.isShared()) {
properties.put(Global.SINGLETON_NAME, this.configuration.getName());
}

Expand Down Expand Up @@ -280,26 +282,61 @@ public static class JChannel extends org.jgroups.JChannel {

@Override
public void connect(final String clusterName, final boolean useFlushIfPresent) throws ChannelException {
TP transport = this.getProtocolStack().getTransport();
if (transport.isSingleton()) {
synchronized (transport) {
super.connect(clusterName, useFlushIfPresent);
ConnectTask connectTask = new ConnectTask() {
@Override
public void connect() throws ChannelException {
JChannel.super.connect(clusterName, useFlushIfPresent);
}
} else {
super.connect(clusterName, useFlushIfPresent);
}
};
this.connect(connectTask);
}

@Override
public void connect(String clusterName, Address target, String stateId, long timeout, boolean useFlushIfPresent) throws ChannelException {
public void connect(final String clusterName) throws ChannelException {
ConnectTask connectTask = new ConnectTask() {
@Override
public void connect() throws ChannelException {
JChannel.super.connect(clusterName);
}
};
this.connect(connectTask);
}

@Override
public void connect(final String clusterName, final Address target, final String stateId, final long timeout) throws ChannelException {
ConnectTask connectTask = new ConnectTask() {
@Override
public void connect() throws ChannelException {
JChannel.super.connect(clusterName, target, stateId, timeout);
}
};
this.connect(connectTask);
}

@Override
public void connect(final String clusterName, final Address target, final String stateId, final long timeout, final boolean useFlushIfPresent) throws ChannelException {
ConnectTask connectTask = new ConnectTask() {
@Override
public void connect() throws ChannelException {
JChannel.super.connect(clusterName, target, stateId, timeout, useFlushIfPresent);
}
};
this.connect(connectTask);
}

private void connect(ConnectTask connectTask) throws ChannelException {
TP transport = this.getProtocolStack().getTransport();
if (transport.isSingleton()) {
synchronized (transport) {
super.connect(clusterName, target, stateId, timeout, useFlushIfPresent);
connectTask.connect();
}
} else {
super.connect(clusterName, target, stateId, timeout, useFlushIfPresent);
connectTask.connect();
}
}

private interface ConnectTask {
void connect() throws ChannelException;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ public interface ProtocolStackConfiguration {

String getName();

// ServerEnvironment getEnvironment();

ProtocolDefaults getDefaults();

MBeanServer getMBeanServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ThreadFactory;
import javax.management.MBeanServer;
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jboss.as.clustering.jgroups.JChannelFactory;
import org.jboss.as.clustering.jgroups.ProtocolConfiguration;
import org.jboss.as.clustering.jgroups.ProtocolDefaults;
import org.jboss.as.clustering.jgroups.ProtocolStackConfiguration;
Expand All @@ -55,8 +54,6 @@
import org.jboss.msc.service.ServiceBuilder.DependencyType;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ValueService;
import org.jboss.msc.value.ImmediateValue;
import org.jboss.msc.value.InjectedValue;
import org.jboss.threads.JBossExecutors;

Expand Down Expand Up @@ -98,9 +95,12 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod
ProtocolStack stackConfig = new ProtocolStack(name, transportConfig);

ServiceBuilder<ChannelFactory> builder = context.getServiceTarget()
.addService(ChannelFactoryService.getServiceName(name), new ValueService<ChannelFactory>(new ImmediateValue<ChannelFactory>(new JChannelFactory(stackConfig))))
.addService(ChannelFactoryService.getServiceName(name), new ChannelFactoryService(stackConfig))
.addDependency(ProtocolDefaultsService.SERVICE_NAME, ProtocolDefaults.class, stackConfig.getDefaultsInjector())
.addDependency(DependencyType.OPTIONAL, ServiceName.JBOSS.append("mbean", "server"), MBeanServer.class, stackConfig.getMBeanServerInjector());
if (transport.hasDefined(ModelKeys.SHARED)) {
transportConfig.setShared(transport.asBoolean());
}
build(builder, transport, transportConfig);
addSocketBindingDependency(builder, transport, ModelKeys.DIAGNOSTICS_SOCKET_BINDING, transportConfig.getDiagnosticsSocketBindingInjector());
addExecutorDependency(builder, transport, ModelKeys.DEFAULT_EXECUTOR, transportConfig.getDefaultExecutorInjector());
Expand All @@ -119,7 +119,6 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod
builder.setInitialMode(ServiceController.Mode.ON_DEMAND);

newControllers.add(builder.install());

}

private void build(ServiceBuilder<ChannelFactory> builder, ModelNode protocol, Protocol protocolConfig) {
Expand Down

0 comments on commit 1a1eeb9

Please sign in to comment.