Skip to content

Commit

Permalink
ISPN-13630 Do not send cross-site requests to local site
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and wburns committed Feb 9, 2022
1 parent 58f1b16 commit 2b45f1d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 24 deletions.
Expand Up @@ -18,11 +18,13 @@
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.concurrent.AggregateCompletionStage;
Expand Down Expand Up @@ -60,9 +62,9 @@ public class DefaultIracTombstoneManager implements IracTombstoneManager {
final Collection<IracXSiteBackup> asyncBackups;

public DefaultIracTombstoneManager(Configuration configuration) {
this.iracExecutor = new IracExecutor(this::performCleanup);
this.asyncBackups = DefaultIracManager.asyncBackups(configuration);
this.tombstoneMap = new ConcurrentHashMap<>();
iracExecutor = new IracExecutor(this::performCleanup);
asyncBackups = DefaultIracManager.asyncBackups(configuration);
tombstoneMap = new ConcurrentHashMap<>();
}

@Inject
Expand All @@ -75,6 +77,14 @@ public void inject(@ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
executorService.scheduleAtFixedRate(iracExecutor::run, 30, 30, TimeUnit.SECONDS);
}

@Start
public void start() {
Transport transport = rpcManager.getTransport();
transport.checkCrossSiteAvailable();
String localSiteName = transport.localSiteName();
asyncBackups.removeIf(xSiteBackup -> localSiteName.equals(xSiteBackup.getSiteName()));
}

public void storeTombstone(int segment, Object key, IracMetadata metadata) {
tombstoneMap.put(key, new TombstoneData(segment, metadata));
}
Expand Down Expand Up @@ -147,11 +157,11 @@ DistributionInfo getSegmentDistribution(int segment) {
return distributionManager.getCacheTopology().getSegmentDistribution(segment);
}

private static class TombstoneData {
private static final class TombstoneData {
private final int segment;
private final IracMetadata metadata;

private TombstoneData(int segment, IracMetadata metadata) {
TombstoneData(int segment, IracMetadata metadata) {
this.segment = segment;
this.metadata = Objects.requireNonNull(metadata);
}
Expand All @@ -178,11 +188,11 @@ public int hashCode() {
}
}

private class CleanupTask implements Function<Boolean, CompletionStage<Void>>, Runnable {
private final class CleanupTask implements Function<Boolean, CompletionStage<Void>>, Runnable {
private final Object key;
private final TombstoneData tombstone;

private CleanupTask(Object key, TombstoneData tombstone) {
CleanupTask(Object key, TombstoneData tombstone) {
this.key = key;
this.tombstone = tombstone;
}
Expand Down
Expand Up @@ -188,6 +188,7 @@ public class JGroupsTransport implements Transport {
private CompletableFuture<Void> nextViewFuture = new CompletableFuture<>();
private RequestRepository requests;
private final Map<String, SiteUnreachableReason> unreachableSites;
private String localSite;

// ------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
Expand Down Expand Up @@ -324,6 +325,7 @@ public BackupResponse backupRemotely(Collection<XSiteBackup> backups, XSiteRepli
log.tracef("About to send to backups %s, command %s", backups, command);
Map<XSiteBackup, CompletableFuture<ValidResponse>> backupCalls = new HashMap<>(backups.size());
for (XSiteBackup xsb : backups) {
assert !localSite.equals(xsb.getSiteName()) : "sending to local site";
Address recipient = JGroupsAddressCache.fromJGroupsAddress(new SiteMaster(xsb.getSiteName()));
long requestId = requests.newRequestId();
logRequest(requestId, command, recipient, "backup");
Expand All @@ -349,6 +351,7 @@ public BackupResponse backupRemotely(Collection<XSiteBackup> backups, XSiteRepli

@Override
public <O> XSiteResponse<O> backupRemotely(XSiteBackup backup, XSiteReplicateCommand<O> rpcCommand) {
assert !localSite.equals(backup.getSiteName()) : "sending to local site";
if (unreachableSites.containsKey(backup.getSiteName())) {
// fail fast if we have thread handling a SITE_UNREACHABLE event.
return new SiteUnreachableXSiteResponse<>(backup, timeService);
Expand Down Expand Up @@ -432,15 +435,14 @@ public boolean isMulticastCapable() {

@Override
public void checkCrossSiteAvailable() throws CacheConfigurationException {
if (findRelay2() == null) {
if (localSite == null) {
throw CLUSTER.crossSiteUnavailable();
}
}

@Override
public String localSiteName() {
RELAY2 relay2 = findRelay2();
return relay2 == null ? null : relay2.site();
return localSite;
}

@Start
Expand Down Expand Up @@ -469,17 +471,21 @@ public void start() {

waitForInitialNodes();
channel.getProtocolStack().getTransport().registerProbeHandler(probeHandler);
RELAY2 relay2 = findRelay2();
if (relay2 != null) {
localSite = relay2.site();
}
running = true;
}

protected void initChannel() {
final TransportConfiguration transportCfg = configuration.transport();
TransportConfiguration transportCfg = configuration.transport();
if (channel == null) {
buildChannel();
if (connectChannel) {
// Cannot change the name if the channelLookup already connected the channel
String transportNodeName = transportCfg.nodeName();
if (transportNodeName != null && transportNodeName.length() > 0) {
if (transportNodeName != null && !transportNodeName.isEmpty()) {
channel.setName(transportNodeName);
}
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class DefaultTakeOfflineManager implements TakeOfflineManager, XSiteRespo

public DefaultTakeOfflineManager(String cacheName) {
this.cacheName = cacheName;
this.offlineStatus = new ConcurrentHashMap<>();
offlineStatus = new ConcurrentHashMap<>(8);
}

public static boolean isCommunicationError(Throwable throwable) {
Expand All @@ -85,11 +85,14 @@ private static Optional<CacheConfigurationException> findConfigurationError(Thro

@Start
public void start() {
config.sites().enabledBackupStream().forEach(bc -> {
final String siteName = bc.site();
OfflineStatus offline = new OfflineStatus(bc.takeOffline(), timeService, new Listener(siteName));
offlineStatus.put(siteName, offline);
});
String localSiteName = rpcManager.getTransport().localSiteName();
config.sites().enabledBackupStream()
.filter(bc -> !localSiteName.equals(bc.site()))
.forEach(bc -> {
String siteName = bc.site();
OfflineStatus offline = new OfflineStatus(bc.takeOffline(), timeService, new Listener(siteName));
offlineStatus.put(siteName, offline);
});
}

@Override
Expand Down Expand Up @@ -187,11 +190,11 @@ private EventLogger getEventLogger() {
return eventLogManager.getEventLogger().context(cacheName).scope(rpcManager.getAddress());
}

private class Listener implements SiteStatusListener {
private final class Listener implements SiteStatusListener {

private final String siteName;

private Listener(String siteName) {
Listener(String siteName) {
this.siteName = siteName;
}

Expand Down
Expand Up @@ -45,8 +45,11 @@ protected void createSites() {
ConfigurationBuilder nyc = getNycActiveConfig();
nyc.sites().addBackup()
.site(LON)
.strategy(nycBackupStrategy)
.sites().addInUseBackupSite(LON);
.strategy(nycBackupStrategy);
nyc.sites().addBackup()
.site(NYC)
.strategy(nycBackupStrategy);
nyc.sites().addInUseBackupSite(LON).addInUseBackupSite(NYC);

createSite(LON, initialClusterSize, globalConfigurationBuilderForSite(LON), lon);
createSite(NYC, initialClusterSize, globalConfigurationBuilderForSite(NYC), nyc);
Expand All @@ -73,9 +76,11 @@ protected ConfigurationBuilder lonConfigurationBuilder() {
.strategy(lonBackupStrategy)
.failurePolicyClass(lonCustomFailurePolicyClass)
.useTwoPhaseCommit(use2Pc)
.stateTransfer().maxRetries(1).waitTime(500)
.sites().addInUseBackupSite(NYC);
.stateTransfer().maxRetries(1).waitTime(500);
adaptLONConfiguration(lonBackupConfigurationBuilder);
// it shouldn't backup to itself
lon.sites().addBackup().site(LON).strategy(lonBackupStrategy);
lon.sites().addInUseBackupSite(LON).addInUseBackupSite(NYC);
return lon;
}

Expand Down

0 comments on commit 2b45f1d

Please sign in to comment.