From 068e82d4e636c2828f96bb85077fd11c55657bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E8=BE=B0?= Date: Thu, 10 Feb 2022 13:57:47 +0800 Subject: [PATCH] [ISSUE #9210] Persist and sync revision for clients. --- .../distro/v2/DistroClientDataProcessor.java | 8 +++--- .../naming/constants/ClientConstants.java | 2 ++ .../naming/core/v2/client/AbstractClient.java | 26 +++++++++++++++++-- .../nacos/naming/core/v2/client/Client.java | 11 ++++++++ .../impl/ConnectionBasedClientFactory.java | 8 ++++-- .../impl/EphemeralIpPortClientFactory.java | 8 ++++-- .../v2/client/impl/ConnectionBasedClient.java | 4 +-- .../v2/client/impl/IpPortBasedClient.java | 5 ++++ .../impl/ConnectionBasedClientManager.java | 9 +++++-- .../impl/EphemeralIpPortClientManager.java | 14 +++++++--- .../core/v2/client/AbstractClientTest.java | 2 +- .../core/v2/client/MockAbstractClient.java | 4 +++ .../impl/ConnectionBasedClientTest.java | 2 +- .../v2/client/impl/IpPortBasedClientTest.java | 12 ++++++++- 14 files changed, 95 insertions(+), 20 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java index 4a0ff91e75d..fbeb0cf3e2f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java @@ -26,6 +26,7 @@ import com.alibaba.nacos.core.distributed.distro.entity.DistroData; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.naming.cluster.transport.Serializer; +import com.alibaba.nacos.naming.constants.ClientConstants; import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.client.Client; import com.alibaba.nacos.naming.core.v2.client.ClientSyncData; @@ -40,7 +41,6 @@ import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.utils.DistroUtils; import com.alibaba.nacos.sys.env.EnvUtil; import com.alibaba.nacos.sys.utils.ApplicationUtils; import org.apache.commons.collections.CollectionUtils; @@ -159,7 +159,9 @@ public boolean processData(DistroData distroData) { } private void handlerClientSyncData(ClientSyncData clientSyncData) { - Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId()); + Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}", + clientSyncData.getClientId(), + clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L)); clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes()); Client client = clientManager.getClient(clientSyncData.getClientId()); upgradeClient(client, clientSyncData); @@ -280,7 +282,7 @@ public List getVerifyData() { } if (clientManager.isResponsibleClient(client)) { DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), - DistroUtils.hash(client)); + client.recalculateRevision()); DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java b/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java index 1a0724ff5a8..cab996df64e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/constants/ClientConstants.java @@ -35,6 +35,8 @@ public class ClientConstants { public static final String PERSISTENT_IP_PORT = "persistentIpPort"; + public static final String REVISION = "revision"; + public static final String PERSISTENT_SUFFIX = "false"; public static final String CLIENT_EXPIRED_TIME_CONFIG_KEY = "nacos.naming.client.expired.time"; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java index a51e6ac01a9..825857cb43c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java @@ -25,12 +25,16 @@ import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.monitor.MetricsMonitor; import com.alibaba.nacos.naming.pojo.Subscriber; +import com.alibaba.nacos.naming.utils.DistroUtils; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION; /** * Abstract implementation of {@code Client}. @@ -45,8 +49,11 @@ public abstract class AbstractClient implements Client { protected volatile long lastUpdatedTime; - public AbstractClient() { + private final AtomicLong revision; + + public AbstractClient(Long revision) { lastUpdatedTime = System.currentTimeMillis(); + this.revision = new AtomicLong(revision == null ? 0 : revision); } @Override @@ -151,7 +158,9 @@ public ClientSyncData generateSyncData() { instances.add(entry.getValue()); } } - return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData); + ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData); + data.getAttributes().addClientAttribute(REVISION, recalculateRevision()); + return data; } private static BatchInstanceData buildBatchInstanceData(BatchInstanceData batchInstanceData, List batchNamespaces, @@ -178,4 +187,17 @@ public void release() { } MetricsMonitor.getIpCountMonitor().addAndGet(-1 * subscribers.size()); } + + @Override + public long recalculateRevision() { + int hash = DistroUtils.hash(this); + revision.set(hash); + return hash; + } + + @Override + public long getRevision() { + return revision.get(); + } + } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java index 58f3b2189db..38c9a55a418 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/Client.java @@ -141,4 +141,15 @@ public interface Client { * Release current client and release resources if neccessary. */ void release(); + + /** + * Recalculate client revision and get its value. + */ + long recalculateRevision(); + + /** + * Get client revision. + */ + long getRevision(); + } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java index f38db8a952e..6684359853f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/ConnectionBasedClientFactory.java @@ -21,6 +21,8 @@ import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory; import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient; +import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION; + /** * Client factory for {@link ConnectionBasedClient}. * @@ -35,11 +37,13 @@ public String getType() { @Override public ConnectionBasedClient newClient(String clientId, ClientAttributes attributes) { - return new ConnectionBasedClient(clientId, true); + Long revision = attributes.getClientAttribute(REVISION); + return new ConnectionBasedClient(clientId, true, revision); } @Override public ConnectionBasedClient newSyncedClient(String clientId, ClientAttributes attributes) { - return new ConnectionBasedClient(clientId, false); + Long revision = attributes.getClientAttribute(REVISION); + return new ConnectionBasedClient(clientId, false, revision); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/EphemeralIpPortClientFactory.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/EphemeralIpPortClientFactory.java index 620416f2307..a499fb3cee7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/EphemeralIpPortClientFactory.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/factory/impl/EphemeralIpPortClientFactory.java @@ -21,6 +21,8 @@ import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory; import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; +import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION; + /** * Client factory for ephemeral {@link IpPortBasedClient}. * @@ -35,11 +37,13 @@ public String getType() { @Override public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) { - return new IpPortBasedClient(clientId, true); + long revision = attributes.getClientAttribute(REVISION, 0); + return new IpPortBasedClient(clientId, true, revision); } @Override public IpPortBasedClient newSyncedClient(String clientId, ClientAttributes attributes) { - return new IpPortBasedClient(clientId, true); + long revision = attributes.getClientAttribute(REVISION, 0); + return new IpPortBasedClient(clientId, true, revision); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClient.java index 83070e7e820..78ede54d45c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClient.java @@ -41,8 +41,8 @@ public class ConnectionBasedClient extends AbstractClient { */ private volatile long lastRenewTime; - public ConnectionBasedClient(String connectionId, boolean isNative) { - super(); + public ConnectionBasedClient(String connectionId, boolean isNative, Long revision) { + super(revision); this.connectionId = connectionId; this.isNative = isNative; lastRenewTime = getLastUpdatedTime(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java index a855ebfaf57..2a4eb203115 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java @@ -51,6 +51,11 @@ public class IpPortBasedClient extends AbstractClient { private HealthCheckTaskV2 healthCheckTaskV2; public IpPortBasedClient(String clientId, boolean ephemeral) { + this(clientId, ephemeral, null); + } + + public IpPortBasedClient(String clientId, boolean ephemeral, Long revision) { + super(revision); this.ephemeral = ephemeral; this.clientId = clientId; this.responsibleId = getResponsibleTagFromId(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java index c25efde27f6..33bae0edcd6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/ConnectionBasedClientManager.java @@ -130,8 +130,13 @@ public boolean isResponsibleClient(Client client) { public boolean verifyClient(DistroClientVerifyInfo verifyData) { ConnectionBasedClient client = clients.get(verifyData.getClientId()); if (null != client) { - client.setLastRenewTime(); - return true; + if (client.getRevision() == verifyData.getRevision()) { + client.setLastRenewTime(); + return true; + } else { + Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] ConnectionBasedClient revision local={}, remote={}", + client.getRevision(), verifyData.getRevision()); + } } return false; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java index e7cd1a3313e..f82680dbf02 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/EphemeralIpPortClientManager.java @@ -35,6 +35,7 @@ import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher; import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.utils.DistroUtils; +import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import java.util.Collection; @@ -123,10 +124,15 @@ public boolean isResponsibleClient(Client client) { public boolean verifyClient(DistroClientVerifyInfo verifyData) { String clientId = verifyData.getClientId(); IpPortBasedClient client = clients.get(clientId); - if (null != client && DistroUtils.hash(client) == (int) verifyData.getRevision()) { - NamingExecuteTaskDispatcher.getInstance() - .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client)); - return true; + if (null != client) { + if (client.getRevision() == verifyData.getRevision()) { + NamingExecuteTaskDispatcher.getInstance() + .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client)); + return true; + } else { + Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient revision local={}, remote={}", + client.getRevision(), verifyData.getRevision()); + } } return false; } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java index a66188264e3..4544809c1c0 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/AbstractClientTest.java @@ -47,7 +47,7 @@ public class AbstractClientTest { @Before public void setUp() { - abstractClient = new MockAbstractClient(); + abstractClient = new MockAbstractClient(0L); service = Service.newService("ns1", "group1", "serviceName001"); instancePublishInfo = new InstancePublishInfo("127.0.0.1", 8890); subscriber = new Subscriber("127.0.0.1:8848", "agent1", "appName", "127.0.0.1", diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java index 7bfd557edae..e38baf9fc45 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/MockAbstractClient.java @@ -25,6 +25,10 @@ */ public class MockAbstractClient extends AbstractClient { + public MockAbstractClient(Long revision) { + super(revision); + } + @Override public String getClientId() { return "-1"; diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java index d5a61f7b7bc..3492fa3bda4 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/ConnectionBasedClientTest.java @@ -32,7 +32,7 @@ public class ConnectionBasedClientTest { @Before public void setUp() throws Exception { - connectionBasedClient = new ConnectionBasedClient(connectionId, isNative); + connectionBasedClient = new ConnectionBasedClient(connectionId, isNative, null); } @Test diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java index 28b1d9f946c..36727cb644d 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClientTest.java @@ -53,7 +53,7 @@ public static void setUpBeforeClass() { @Before public void setUp() throws Exception { - ipPortBasedClient = new IpPortBasedClient(clientId, true); + ipPortBasedClient = new IpPortBasedClient(clientId, true, 123L); ipPortBasedClient.init(); instancePublishInfo = new InstancePublishInfo(); } @@ -84,6 +84,16 @@ public void testGetAllInstancePublishInfo() { assertEquals(allInstancePublishInfo.iterator().next(), instancePublishInfo); } + @Test + public void testRecalculateRevision() { + assertEquals(123L, ipPortBasedClient.recalculateRevision()); + } + + @Test + public void testGetRevision() { + assertEquals(123L, ipPortBasedClient.getRevision()); + } + @After public void tearDown() { ipPortBasedClient.release();