Skip to content

Commit

Permalink
wip...
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo committed Apr 21, 2023
1 parent 41337a8 commit af2d515
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 64 deletions.
2 changes: 1 addition & 1 deletion build/configuration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@
<version.jboss.shrinkwrap>1.2.6</version.jboss.shrinkwrap>
<version.jboss.shrinkwrap.resolver>3.1.3</version.jboss.shrinkwrap.resolver>
<version.jcip-annotations>1.0</version.jcip-annotations>
<version.jgroups>5.2.12.Final</version.jgroups>
<version.jgroups>5.2.15.Final-SNAPSHOT</version.jgroups>
<version.jgroups.raft>1.0.10.Final</version.jgroups.raft>
<version.jsr107>1.1.0</version.jsr107>
<version.junit>4.13.2</version.junit>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.List;
import java.util.Map;

import org.infinispan.commons.util.Util;
import org.infinispan.configuration.global.JGroupsConfiguration;
import org.infinispan.xsite.XSiteNamedCache;
import org.jgroups.ChannelListener;
Expand All @@ -17,10 +16,8 @@
import org.jgroups.conf.ProtocolStackConfigurator;
import org.jgroups.protocols.relay.RELAY2;
import org.jgroups.protocols.relay.config.RelayConfig;
import org.jgroups.stack.Configurator;
import org.jgroups.stack.Protocol;
import org.jgroups.util.SocketFactory;
import org.jgroups.util.StackType;

/**
* A JGroups {@link ProtocolStackConfigurator} which
Expand All @@ -30,8 +27,6 @@
**/
public class EmbeddedJGroupsChannelConfigurator extends AbstractJGroupsChannelConfigurator {

private static final String PROTOCOL_PREFIX = "org.jgroups.protocols.";

private final String name;
private final String parent;
private JGroupsConfiguration jgroupsConfiguration;
Expand Down Expand Up @@ -73,67 +68,48 @@ public String getName() {

@Override
public JChannel createChannel(String name) throws Exception {
StackType stackType = org.jgroups.util.Util.getIpStackType();
List<ProtocolConfiguration> actualStack = combineStack(jgroupsConfiguration.configurator(parent), stack);
List<Protocol> protocols = new ArrayList<>(actualStack.size());

boolean hasRelay2 = false;

for (ProtocolConfiguration c : actualStack) {
Protocol protocol;
try {
String className = PROTOCOL_PREFIX + c.getProtocolName();
protocol = Util.getInstanceStrict(className, getClass().getClassLoader());
} catch (ClassNotFoundException e) {
protocol = Util.getInstanceStrict(c.getProtocolName(), getClass().getClassLoader());
}
ProtocolConfiguration configuration = new ProtocolConfiguration(protocol.getName(), c.getProperties());
Configurator.initializeAttrs(protocol, configuration, stackType);
protocols.add(protocol);

if (protocol instanceof RELAY2) {
hasRelay2 = true;
// Process remote sites if any
RELAY2 relay2 = (RELAY2) protocol;
RemoteSites actualSites = getRemoteSites();
if (actualSites.remoteSites.size() == 0) {
throw CONFIG.jgroupsRelayWithoutRemoteSites(name);
}
for (Map.Entry<String, RemoteSite> remoteSite : actualSites.remoteSites.entrySet()) {
JGroupsChannelConfigurator configurator = jgroupsConfiguration.configurator(remoteSite.getValue().stack);
SocketFactory socketFactory = getSocketFactory();
String remoteCluster = remoteSite.getValue().cluster;
if (remoteCluster == null) {
remoteCluster = actualSites.defaultCluster;
}
if (socketFactory instanceof NamedSocketFactory) {
// Create a new NamedSocketFactory using the remote cluster name
socketFactory = new NamedSocketFactory((NamedSocketFactory) socketFactory, remoteCluster);
}
configurator.setSocketFactory(socketFactory);
for(ChannelListener listener : channelListeners) {
configurator.addChannelListener(listener);
}
RelayConfig.SiteConfig siteConfig = new RelayConfig.SiteConfig(remoteSite.getKey());
siteConfig.addBridge(new RelayConfig.BridgeConfig(remoteCluster) {
@Override
public JChannel createChannel() throws Exception {
// TODO The bridge channel is created lazily, and Infinispan doesn't see any errors
return configurator.createChannel(getClusterName());
}
});
relay2.addSite(remoteSite.getKey(), siteConfig);
}

}
}


if (!hasRelay2 && hasSites()) {
if (hasSites() && getProtocolStack().stream().noneMatch(EmbeddedJGroupsChannelConfigurator::isRelay2)) {
throw CONFIG.jgroupsRemoteSitesWithoutRelay(name);
}
return amendChannel(new JChannel(this));
}

return amendChannel(new JChannel(protocols));
@Override
public void afterCreation(Protocol protocol) {
if (!(protocol instanceof RELAY2)) {
return;
}
// Process remote sites if any
RemoteSites actualSites = getRemoteSites();
if (actualSites.remoteSites.size() == 0) {
throw CONFIG.jgroupsRelayWithoutRemoteSites(name);
}
RELAY2 relay2 = (RELAY2) protocol;
for (Map.Entry<String, RemoteSite> remoteSite : actualSites.remoteSites.entrySet()) {
JGroupsChannelConfigurator configurator = jgroupsConfiguration.configurator(remoteSite.getValue().stack);
SocketFactory socketFactory = getSocketFactory();
String remoteCluster = remoteSite.getValue().cluster;
if (remoteCluster == null) {
remoteCluster = actualSites.defaultCluster;
}
if (socketFactory instanceof NamedSocketFactory) {
// Create a new NamedSocketFactory using the remote cluster name
socketFactory = new NamedSocketFactory((NamedSocketFactory) socketFactory, remoteCluster);
}
configurator.setSocketFactory(socketFactory);
for(ChannelListener listener : channelListeners) {
configurator.addChannelListener(listener);
}
RelayConfig.SiteConfig siteConfig = new RelayConfig.SiteConfig(remoteSite.getKey());
siteConfig.addBridge(new RelayConfig.BridgeConfig(remoteCluster) {
@Override
public JChannel createChannel() throws Exception {
// TODO The bridge channel is created lazily, and Infinispan doesn't see any errors
return configurator.createChannel(getClusterName());
}
});
relay2.addSite(remoteSite.getKey(), siteConfig);
}
}

private static List<ProtocolConfiguration> combineStack(JGroupsChannelConfigurator baseStack, List<ProtocolConfiguration> stack) {
Expand Down Expand Up @@ -271,6 +247,15 @@ public String toString() {
'}';
}

private static boolean isRelay2(ProtocolConfiguration configuration) {
try {
return configuration.isAssignableProtocol(RELAY2.class, EmbeddedJGroupsChannelConfigurator.class);
} catch (Exception e) {
// it is ok to return false, JChannel will throw the exception
return false;
}
}

public enum StackCombine {
COMBINE,
INSERT_AFTER,
Expand Down

0 comments on commit af2d515

Please sign in to comment.