Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-13630 Do not send cross-site requests to local site #9806

Merged
merged 1 commit into from Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,11 +18,13 @@
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.concurrent.AggregateCompletionStage;
Expand Down Expand Up @@ -60,9 +62,9 @@ public class DefaultIracTombstoneManager implements IracTombstoneManager {
final Collection<IracXSiteBackup> asyncBackups;

public DefaultIracTombstoneManager(Configuration configuration) {
this.iracExecutor = new IracExecutor(this::performCleanup);
this.asyncBackups = DefaultIracManager.asyncBackups(configuration);
this.tombstoneMap = new ConcurrentHashMap<>();
iracExecutor = new IracExecutor(this::performCleanup);
asyncBackups = DefaultIracManager.asyncBackups(configuration);
tombstoneMap = new ConcurrentHashMap<>();
}

@Inject
Expand All @@ -75,6 +77,14 @@ public void inject(@ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
executorService.scheduleAtFixedRate(iracExecutor::run, 30, 30, TimeUnit.SECONDS);
}

@Start
public void start() {
Transport transport = rpcManager.getTransport();
transport.checkCrossSiteAvailable();
String localSiteName = transport.localSiteName();
asyncBackups.removeIf(xSiteBackup -> localSiteName.equals(xSiteBackup.getSiteName()));
}

public void storeTombstone(int segment, Object key, IracMetadata metadata) {
tombstoneMap.put(key, new TombstoneData(segment, metadata));
}
Expand Down Expand Up @@ -147,11 +157,11 @@ DistributionInfo getSegmentDistribution(int segment) {
return distributionManager.getCacheTopology().getSegmentDistribution(segment);
}

private static class TombstoneData {
private static final class TombstoneData {
private final int segment;
private final IracMetadata metadata;

private TombstoneData(int segment, IracMetadata metadata) {
TombstoneData(int segment, IracMetadata metadata) {
this.segment = segment;
this.metadata = Objects.requireNonNull(metadata);
}
Expand All @@ -178,11 +188,11 @@ public int hashCode() {
}
}

private class CleanupTask implements Function<Boolean, CompletionStage<Void>>, Runnable {
private final class CleanupTask implements Function<Boolean, CompletionStage<Void>>, Runnable {
private final Object key;
private final TombstoneData tombstone;

private CleanupTask(Object key, TombstoneData tombstone) {
CleanupTask(Object key, TombstoneData tombstone) {
this.key = key;
this.tombstone = tombstone;
}
Expand Down
Expand Up @@ -188,6 +188,7 @@ public class JGroupsTransport implements Transport {
private CompletableFuture<Void> nextViewFuture = new CompletableFuture<>();
private RequestRepository requests;
private final Map<String, SiteUnreachableReason> unreachableSites;
private String localSite;

// ------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
Expand Down Expand Up @@ -324,6 +325,7 @@ public BackupResponse backupRemotely(Collection<XSiteBackup> backups, XSiteRepli
log.tracef("About to send to backups %s, command %s", backups, command);
Map<XSiteBackup, CompletableFuture<ValidResponse>> backupCalls = new HashMap<>(backups.size());
for (XSiteBackup xsb : backups) {
assert !localSite.equals(xsb.getSiteName()) : "sending to local site";
Address recipient = JGroupsAddressCache.fromJGroupsAddress(new SiteMaster(xsb.getSiteName()));
long requestId = requests.newRequestId();
logRequest(requestId, command, recipient, "backup");
Expand All @@ -349,6 +351,7 @@ public BackupResponse backupRemotely(Collection<XSiteBackup> backups, XSiteRepli

@Override
public <O> XSiteResponse<O> backupRemotely(XSiteBackup backup, XSiteReplicateCommand<O> rpcCommand) {
assert !localSite.equals(backup.getSiteName()) : "sending to local site";
if (unreachableSites.containsKey(backup.getSiteName())) {
// fail fast if we have thread handling a SITE_UNREACHABLE event.
return new SiteUnreachableXSiteResponse<>(backup, timeService);
Expand Down Expand Up @@ -432,15 +435,14 @@ public boolean isMulticastCapable() {

@Override
public void checkCrossSiteAvailable() throws CacheConfigurationException {
if (findRelay2() == null) {
if (localSite == null) {
throw CLUSTER.crossSiteUnavailable();
}
}

@Override
public String localSiteName() {
RELAY2 relay2 = findRelay2();
return relay2 == null ? null : relay2.site();
return localSite;
}

@Start
Expand Down Expand Up @@ -469,17 +471,21 @@ public void start() {

waitForInitialNodes();
channel.getProtocolStack().getTransport().registerProbeHandler(probeHandler);
RELAY2 relay2 = findRelay2();
if (relay2 != null) {
localSite = relay2.site();
}
running = true;
}

protected void initChannel() {
final TransportConfiguration transportCfg = configuration.transport();
TransportConfiguration transportCfg = configuration.transport();
if (channel == null) {
buildChannel();
if (connectChannel) {
// Cannot change the name if the channelLookup already connected the channel
String transportNodeName = transportCfg.nodeName();
if (transportNodeName != null && transportNodeName.length() > 0) {
if (transportNodeName != null && !transportNodeName.isEmpty()) {
channel.setName(transportNodeName);
}
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class DefaultTakeOfflineManager implements TakeOfflineManager, XSiteRespo

public DefaultTakeOfflineManager(String cacheName) {
this.cacheName = cacheName;
this.offlineStatus = new ConcurrentHashMap<>();
offlineStatus = new ConcurrentHashMap<>(8);
}

public static boolean isCommunicationError(Throwable throwable) {
Expand All @@ -85,11 +85,14 @@ private static Optional<CacheConfigurationException> findConfigurationError(Thro

@Start
public void start() {
config.sites().enabledBackupStream().forEach(bc -> {
final String siteName = bc.site();
OfflineStatus offline = new OfflineStatus(bc.takeOffline(), timeService, new Listener(siteName));
offlineStatus.put(siteName, offline);
});
String localSiteName = rpcManager.getTransport().localSiteName();
config.sites().enabledBackupStream()
.filter(bc -> !localSiteName.equals(bc.site()))
.forEach(bc -> {
String siteName = bc.site();
OfflineStatus offline = new OfflineStatus(bc.takeOffline(), timeService, new Listener(siteName));
offlineStatus.put(siteName, offline);
});
}

@Override
Expand Down Expand Up @@ -187,11 +190,11 @@ private EventLogger getEventLogger() {
return eventLogManager.getEventLogger().context(cacheName).scope(rpcManager.getAddress());
}

private class Listener implements SiteStatusListener {
private final class Listener implements SiteStatusListener {

private final String siteName;

private Listener(String siteName) {
Listener(String siteName) {
this.siteName = siteName;
}

Expand Down
Expand Up @@ -45,8 +45,11 @@ protected void createSites() {
ConfigurationBuilder nyc = getNycActiveConfig();
nyc.sites().addBackup()
.site(LON)
.strategy(nycBackupStrategy)
.sites().addInUseBackupSite(LON);
.strategy(nycBackupStrategy);
nyc.sites().addBackup()
.site(NYC)
.strategy(nycBackupStrategy);
nyc.sites().addInUseBackupSite(LON).addInUseBackupSite(NYC);

createSite(LON, initialClusterSize, globalConfigurationBuilderForSite(LON), lon);
createSite(NYC, initialClusterSize, globalConfigurationBuilderForSite(NYC), nyc);
Expand All @@ -73,9 +76,11 @@ protected ConfigurationBuilder lonConfigurationBuilder() {
.strategy(lonBackupStrategy)
.failurePolicyClass(lonCustomFailurePolicyClass)
.useTwoPhaseCommit(use2Pc)
.stateTransfer().maxRetries(1).waitTime(500)
.sites().addInUseBackupSite(NYC);
.stateTransfer().maxRetries(1).waitTime(500);
adaptLONConfiguration(lonBackupConfigurationBuilder);
// it shouldn't backup to itself
lon.sites().addBackup().site(LON).strategy(lonBackupStrategy);
lon.sites().addInUseBackupSite(LON).addInUseBackupSite(NYC);
return lon;
}

Expand Down