From 2b45f1dceea3a02441693a47d4a407f24baa49d2 Mon Sep 17 00:00:00 2001 From: Pedro Ruivo Date: Thu, 13 Jan 2022 11:22:11 +0000 Subject: [PATCH] ISPN-13630 Do not send cross-site requests to local site --- .../irac/DefaultIracTombstoneManager.java | 24 +++++++++++++------ .../transport/jgroups/JGroupsTransport.java | 16 +++++++++---- .../status/DefaultTakeOfflineManager.java | 19 ++++++++------- .../xsite/AbstractTwoSitesTest.java | 13 ++++++---- 4 files changed, 48 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/infinispan/container/versioning/irac/DefaultIracTombstoneManager.java b/core/src/main/java/org/infinispan/container/versioning/irac/DefaultIracTombstoneManager.java index d21b6e4dc775..2ba1a54a48e3 100644 --- a/core/src/main/java/org/infinispan/container/versioning/irac/DefaultIracTombstoneManager.java +++ b/core/src/main/java/org/infinispan/container/versioning/irac/DefaultIracTombstoneManager.java @@ -18,11 +18,13 @@ import org.infinispan.factories.KnownComponentNames; import org.infinispan.factories.annotations.ComponentName; import org.infinispan.factories.annotations.Inject; +import org.infinispan.factories.annotations.Start; import org.infinispan.factories.impl.ComponentRef; import org.infinispan.factories.scopes.Scope; import org.infinispan.factories.scopes.Scopes; import org.infinispan.metadata.impl.IracMetadata; import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.transport.Transport; import org.infinispan.remoting.transport.impl.VoidResponseCollector; import org.infinispan.util.ExponentialBackOff; import org.infinispan.util.concurrent.AggregateCompletionStage; @@ -60,9 +62,9 @@ public class DefaultIracTombstoneManager implements IracTombstoneManager { final Collection asyncBackups; public DefaultIracTombstoneManager(Configuration configuration) { - this.iracExecutor = new IracExecutor(this::performCleanup); - this.asyncBackups = DefaultIracManager.asyncBackups(configuration); - this.tombstoneMap = new ConcurrentHashMap<>(); + iracExecutor = new IracExecutor(this::performCleanup); + asyncBackups = DefaultIracManager.asyncBackups(configuration); + tombstoneMap = new ConcurrentHashMap<>(); } @Inject @@ -75,6 +77,14 @@ public void inject(@ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR) executorService.scheduleAtFixedRate(iracExecutor::run, 30, 30, TimeUnit.SECONDS); } + @Start + public void start() { + Transport transport = rpcManager.getTransport(); + transport.checkCrossSiteAvailable(); + String localSiteName = transport.localSiteName(); + asyncBackups.removeIf(xSiteBackup -> localSiteName.equals(xSiteBackup.getSiteName())); + } + public void storeTombstone(int segment, Object key, IracMetadata metadata) { tombstoneMap.put(key, new TombstoneData(segment, metadata)); } @@ -147,11 +157,11 @@ DistributionInfo getSegmentDistribution(int segment) { return distributionManager.getCacheTopology().getSegmentDistribution(segment); } - private static class TombstoneData { + private static final class TombstoneData { private final int segment; private final IracMetadata metadata; - private TombstoneData(int segment, IracMetadata metadata) { + TombstoneData(int segment, IracMetadata metadata) { this.segment = segment; this.metadata = Objects.requireNonNull(metadata); } @@ -178,11 +188,11 @@ public int hashCode() { } } - private class CleanupTask implements Function>, Runnable { + private final class CleanupTask implements Function>, Runnable { private final Object key; private final TombstoneData tombstone; - private CleanupTask(Object key, TombstoneData tombstone) { + CleanupTask(Object key, TombstoneData tombstone) { this.key = key; this.tombstone = tombstone; } diff --git a/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java b/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java index cb154f2ed2bd..58dd3cf2a9d3 100644 --- a/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java +++ b/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java @@ -188,6 +188,7 @@ public class JGroupsTransport implements Transport { private CompletableFuture nextViewFuture = new CompletableFuture<>(); private RequestRepository requests; private final Map unreachableSites; + private String localSite; // ------------------------------------------------------------------------------------------------------------------ // Lifecycle and setup stuff @@ -324,6 +325,7 @@ public BackupResponse backupRemotely(Collection backups, XSiteRepli log.tracef("About to send to backups %s, command %s", backups, command); Map> backupCalls = new HashMap<>(backups.size()); for (XSiteBackup xsb : backups) { + assert !localSite.equals(xsb.getSiteName()) : "sending to local site"; Address recipient = JGroupsAddressCache.fromJGroupsAddress(new SiteMaster(xsb.getSiteName())); long requestId = requests.newRequestId(); logRequest(requestId, command, recipient, "backup"); @@ -349,6 +351,7 @@ public BackupResponse backupRemotely(Collection backups, XSiteRepli @Override public XSiteResponse backupRemotely(XSiteBackup backup, XSiteReplicateCommand rpcCommand) { + assert !localSite.equals(backup.getSiteName()) : "sending to local site"; if (unreachableSites.containsKey(backup.getSiteName())) { // fail fast if we have thread handling a SITE_UNREACHABLE event. return new SiteUnreachableXSiteResponse<>(backup, timeService); @@ -432,15 +435,14 @@ public boolean isMulticastCapable() { @Override public void checkCrossSiteAvailable() throws CacheConfigurationException { - if (findRelay2() == null) { + if (localSite == null) { throw CLUSTER.crossSiteUnavailable(); } } @Override public String localSiteName() { - RELAY2 relay2 = findRelay2(); - return relay2 == null ? null : relay2.site(); + return localSite; } @Start @@ -469,17 +471,21 @@ public void start() { waitForInitialNodes(); channel.getProtocolStack().getTransport().registerProbeHandler(probeHandler); + RELAY2 relay2 = findRelay2(); + if (relay2 != null) { + localSite = relay2.site(); + } running = true; } protected void initChannel() { - final TransportConfiguration transportCfg = configuration.transport(); + TransportConfiguration transportCfg = configuration.transport(); if (channel == null) { buildChannel(); if (connectChannel) { // Cannot change the name if the channelLookup already connected the channel String transportNodeName = transportCfg.nodeName(); - if (transportNodeName != null && transportNodeName.length() > 0) { + if (transportNodeName != null && !transportNodeName.isEmpty()) { channel.setName(transportNodeName); } } diff --git a/core/src/main/java/org/infinispan/xsite/status/DefaultTakeOfflineManager.java b/core/src/main/java/org/infinispan/xsite/status/DefaultTakeOfflineManager.java index 53c899af664a..1fd1b763e1f4 100644 --- a/core/src/main/java/org/infinispan/xsite/status/DefaultTakeOfflineManager.java +++ b/core/src/main/java/org/infinispan/xsite/status/DefaultTakeOfflineManager.java @@ -58,7 +58,7 @@ public class DefaultTakeOfflineManager implements TakeOfflineManager, XSiteRespo public DefaultTakeOfflineManager(String cacheName) { this.cacheName = cacheName; - this.offlineStatus = new ConcurrentHashMap<>(); + offlineStatus = new ConcurrentHashMap<>(8); } public static boolean isCommunicationError(Throwable throwable) { @@ -85,11 +85,14 @@ private static Optional findConfigurationError(Thro @Start public void start() { - config.sites().enabledBackupStream().forEach(bc -> { - final String siteName = bc.site(); - OfflineStatus offline = new OfflineStatus(bc.takeOffline(), timeService, new Listener(siteName)); - offlineStatus.put(siteName, offline); - }); + String localSiteName = rpcManager.getTransport().localSiteName(); + config.sites().enabledBackupStream() + .filter(bc -> !localSiteName.equals(bc.site())) + .forEach(bc -> { + String siteName = bc.site(); + OfflineStatus offline = new OfflineStatus(bc.takeOffline(), timeService, new Listener(siteName)); + offlineStatus.put(siteName, offline); + }); } @Override @@ -187,11 +190,11 @@ private EventLogger getEventLogger() { return eventLogManager.getEventLogger().context(cacheName).scope(rpcManager.getAddress()); } - private class Listener implements SiteStatusListener { + private final class Listener implements SiteStatusListener { private final String siteName; - private Listener(String siteName) { + Listener(String siteName) { this.siteName = siteName; } diff --git a/core/src/test/java/org/infinispan/xsite/AbstractTwoSitesTest.java b/core/src/test/java/org/infinispan/xsite/AbstractTwoSitesTest.java index 3432ef0ec50d..a842961c682f 100644 --- a/core/src/test/java/org/infinispan/xsite/AbstractTwoSitesTest.java +++ b/core/src/test/java/org/infinispan/xsite/AbstractTwoSitesTest.java @@ -45,8 +45,11 @@ protected void createSites() { ConfigurationBuilder nyc = getNycActiveConfig(); nyc.sites().addBackup() .site(LON) - .strategy(nycBackupStrategy) - .sites().addInUseBackupSite(LON); + .strategy(nycBackupStrategy); + nyc.sites().addBackup() + .site(NYC) + .strategy(nycBackupStrategy); + nyc.sites().addInUseBackupSite(LON).addInUseBackupSite(NYC); createSite(LON, initialClusterSize, globalConfigurationBuilderForSite(LON), lon); createSite(NYC, initialClusterSize, globalConfigurationBuilderForSite(NYC), nyc); @@ -73,9 +76,11 @@ protected ConfigurationBuilder lonConfigurationBuilder() { .strategy(lonBackupStrategy) .failurePolicyClass(lonCustomFailurePolicyClass) .useTwoPhaseCommit(use2Pc) - .stateTransfer().maxRetries(1).waitTime(500) - .sites().addInUseBackupSite(NYC); + .stateTransfer().maxRetries(1).waitTime(500); adaptLONConfiguration(lonBackupConfigurationBuilder); + // it shouldn't backup to itself + lon.sites().addBackup().site(LON).strategy(lonBackupStrategy); + lon.sites().addInUseBackupSite(LON).addInUseBackupSite(NYC); return lon; }