Skip to content

Commit

Permalink
[ISSUE alibaba#9210] Persist and sync revision for clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
pixystone committed Sep 26, 2022
1 parent 9f6b505 commit 068e82d
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 20 deletions.
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientSyncData;
Expand All @@ -40,7 +41,6 @@
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.utils.DistroUtils;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -159,7 +159,9 @@ public boolean processData(DistroData distroData) {
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}",
clientSyncData.getClientId(),
clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
upgradeClient(client, clientSyncData);
Expand Down Expand Up @@ -280,7 +282,7 @@ public List<DistroData> getVerifyData() {
}
if (clientManager.isResponsibleClient(client)) {
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
DistroUtils.hash(client));
client.recalculateRevision());
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
Expand Down
Expand Up @@ -35,6 +35,8 @@ public class ClientConstants {

public static final String PERSISTENT_IP_PORT = "persistentIpPort";

public static final String REVISION = "revision";

public static final String PERSISTENT_SUFFIX = "false";

public static final String CLIENT_EXPIRED_TIME_CONFIG_KEY = "nacos.naming.client.expired.time";
Expand Down
Expand Up @@ -25,12 +25,16 @@
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.DistroUtils;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION;

/**
* Abstract implementation of {@code Client}.
Expand All @@ -45,8 +49,11 @@ public abstract class AbstractClient implements Client {

protected volatile long lastUpdatedTime;

public AbstractClient() {
private final AtomicLong revision;

public AbstractClient(Long revision) {
lastUpdatedTime = System.currentTimeMillis();
this.revision = new AtomicLong(revision == null ? 0 : revision);
}

@Override
Expand Down Expand Up @@ -151,7 +158,9 @@ public ClientSyncData generateSyncData() {
instances.add(entry.getValue());
}
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
data.getAttributes().addClientAttribute(REVISION, recalculateRevision());
return data;
}

private static BatchInstanceData buildBatchInstanceData(BatchInstanceData batchInstanceData, List<String> batchNamespaces,
Expand All @@ -178,4 +187,17 @@ public void release() {
}
MetricsMonitor.getIpCountMonitor().addAndGet(-1 * subscribers.size());
}

@Override
public long recalculateRevision() {
int hash = DistroUtils.hash(this);
revision.set(hash);
return hash;
}

@Override
public long getRevision() {
return revision.get();
}

}
Expand Up @@ -141,4 +141,15 @@ public interface Client {
* Release current client and release resources if neccessary.
*/
void release();

/**
* Recalculate client revision and get its value.
*/
long recalculateRevision();

/**
* Get client revision.
*/
long getRevision();

}
Expand Up @@ -21,6 +21,8 @@
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient;

import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION;

/**
* Client factory for {@link ConnectionBasedClient}.
*
Expand All @@ -35,11 +37,13 @@ public String getType() {

@Override
public ConnectionBasedClient newClient(String clientId, ClientAttributes attributes) {
return new ConnectionBasedClient(clientId, true);
Long revision = attributes.getClientAttribute(REVISION);
return new ConnectionBasedClient(clientId, true, revision);
}

@Override
public ConnectionBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
return new ConnectionBasedClient(clientId, false);
Long revision = attributes.getClientAttribute(REVISION);
return new ConnectionBasedClient(clientId, false, revision);
}
}
Expand Up @@ -21,6 +21,8 @@
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;

import static com.alibaba.nacos.naming.constants.ClientConstants.REVISION;

/**
* Client factory for ephemeral {@link IpPortBasedClient}.
*
Expand All @@ -35,11 +37,13 @@ public String getType() {

@Override
public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) {
return new IpPortBasedClient(clientId, true);
long revision = attributes.getClientAttribute(REVISION, 0);
return new IpPortBasedClient(clientId, true, revision);
}

@Override
public IpPortBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
return new IpPortBasedClient(clientId, true);
long revision = attributes.getClientAttribute(REVISION, 0);
return new IpPortBasedClient(clientId, true, revision);
}
}
Expand Up @@ -41,8 +41,8 @@ public class ConnectionBasedClient extends AbstractClient {
*/
private volatile long lastRenewTime;

public ConnectionBasedClient(String connectionId, boolean isNative) {
super();
public ConnectionBasedClient(String connectionId, boolean isNative, Long revision) {
super(revision);
this.connectionId = connectionId;
this.isNative = isNative;
lastRenewTime = getLastUpdatedTime();
Expand Down
Expand Up @@ -51,6 +51,11 @@ public class IpPortBasedClient extends AbstractClient {
private HealthCheckTaskV2 healthCheckTaskV2;

public IpPortBasedClient(String clientId, boolean ephemeral) {
this(clientId, ephemeral, null);
}

public IpPortBasedClient(String clientId, boolean ephemeral, Long revision) {
super(revision);
this.ephemeral = ephemeral;
this.clientId = clientId;
this.responsibleId = getResponsibleTagFromId();
Expand Down
Expand Up @@ -130,8 +130,13 @@ public boolean isResponsibleClient(Client client) {
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
ConnectionBasedClient client = clients.get(verifyData.getClientId());
if (null != client) {
client.setLastRenewTime();
return true;
if (client.getRevision() == verifyData.getRevision()) {
client.setLastRenewTime();
return true;
} else {
Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] ConnectionBasedClient revision local={}, remote={}",
client.getRevision(), verifyData.getRevision());
}
}
return false;
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.utils.DistroUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import java.util.Collection;
Expand Down Expand Up @@ -123,10 +124,15 @@ public boolean isResponsibleClient(Client client) {
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
String clientId = verifyData.getClientId();
IpPortBasedClient client = clients.get(clientId);
if (null != client && DistroUtils.hash(client) == (int) verifyData.getRevision()) {
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
return true;
if (null != client) {
if (client.getRevision() == verifyData.getRevision()) {
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
return true;
} else {
Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient revision local={}, remote={}",
client.getRevision(), verifyData.getRevision());
}
}
return false;
}
Expand Down
Expand Up @@ -47,7 +47,7 @@ public class AbstractClientTest {

@Before
public void setUp() {
abstractClient = new MockAbstractClient();
abstractClient = new MockAbstractClient(0L);
service = Service.newService("ns1", "group1", "serviceName001");
instancePublishInfo = new InstancePublishInfo("127.0.0.1", 8890);
subscriber = new Subscriber("127.0.0.1:8848", "agent1", "appName", "127.0.0.1",
Expand Down
Expand Up @@ -25,6 +25,10 @@
*/
public class MockAbstractClient extends AbstractClient {

public MockAbstractClient(Long revision) {
super(revision);
}

@Override
public String getClientId() {
return "-1";
Expand Down
Expand Up @@ -32,7 +32,7 @@ public class ConnectionBasedClientTest {

@Before
public void setUp() throws Exception {
connectionBasedClient = new ConnectionBasedClient(connectionId, isNative);
connectionBasedClient = new ConnectionBasedClient(connectionId, isNative, null);
}

@Test
Expand Down
Expand Up @@ -53,7 +53,7 @@ public static void setUpBeforeClass() {

@Before
public void setUp() throws Exception {
ipPortBasedClient = new IpPortBasedClient(clientId, true);
ipPortBasedClient = new IpPortBasedClient(clientId, true, 123L);
ipPortBasedClient.init();
instancePublishInfo = new InstancePublishInfo();
}
Expand Down Expand Up @@ -84,6 +84,16 @@ public void testGetAllInstancePublishInfo() {
assertEquals(allInstancePublishInfo.iterator().next(), instancePublishInfo);
}

@Test
public void testRecalculateRevision() {
assertEquals(123L, ipPortBasedClient.recalculateRevision());
}

@Test
public void testGetRevision() {
assertEquals(123L, ipPortBasedClient.getRevision());
}

@After
public void tearDown() {
ipPortBasedClient.release();
Expand Down

0 comments on commit 068e82d

Please sign in to comment.