Skip to content

Commit

Permalink
[ISSUE alibaba#9210] Using hash as ephemeral clients' revision for DI…
Browse files Browse the repository at this point in the history
…STRO verification.
  • Loading branch information
pixystone committed Sep 26, 2022
1 parent a2b997f commit c9ce6f2
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 26 deletions.
Expand Up @@ -40,6 +40,7 @@
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.utils.DistroUtils;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -226,7 +227,7 @@ private static void processBatchInstanceDistroData(Set<Service> syncedService, C
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
if (clientManager.verifyClient(verifyData.getClientId())) {
if (clientManager.verifyClient(verifyData)) {
return true;
}
Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
Expand Down Expand Up @@ -271,19 +272,22 @@ public DistroData getDatumSnapshot() {

@Override
public List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>();
List<DistroData> result = null;
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
if (clientManager.isResponsibleClient(client)) {
// TODO add revision for client.
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
DistroUtils.hash(client));
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
data.setType(DataOperation.VERIFY);
if (result == null) {
result = new LinkedList<>();
}
result.add(data);
}
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.naming.core.v2.client.manager;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;

Expand Down Expand Up @@ -96,8 +97,8 @@ public interface ClientManager {
/**
* verify client.
*
* @param clientId client id
* @param verifyData verify data from remote responsible server
* @return true if client is valid, otherwise is false.
*/
boolean verifyClient(String clientId);
boolean verifyClient(DistroClientVerifyInfo verifyData);
}
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.naming.core.v2.client.manager;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
Expand Down Expand Up @@ -96,8 +97,8 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public boolean verifyClient(String clientId) {
return getClientManagerById(clientId).verifyClient(clientId);
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
return getClientManagerById(verifyData.getClientId()).verifyClient(verifyData);
}

private ClientManager getClientManagerById(String clientId) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
Expand Down Expand Up @@ -126,8 +127,8 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public boolean verifyClient(String clientId) {
ConnectionBasedClient client = clients.get(clientId);
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
ConnectionBasedClient client = clients.get(verifyData.getClientId());
if (null != client) {
client.setLastRenewTime();
return true;
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.v2.client.Client;
Expand All @@ -33,6 +34,7 @@
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.utils.DistroUtils;
import org.springframework.stereotype.Component;

import java.util.Collection;
Expand Down Expand Up @@ -118,9 +120,10 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public boolean verifyClient(String clientId) {
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
String clientId = verifyData.getClientId();
IpPortBasedClient client = clients.get(clientId);
if (null != client) {
if (null != client && DistroUtils.hash(client) == (int) verifyData.getRevision()) {
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
return true;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.core.v2.client.manager.impl;

import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientAttributes;
Expand Down Expand Up @@ -114,7 +115,7 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public boolean verifyClient(String clientId) {
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
throw new UnsupportedOperationException("");
}

Expand Down
194 changes: 194 additions & 0 deletions naming/src/main/java/com/alibaba/nacos/naming/utils/DistroUtils.java
@@ -0,0 +1,194 @@
package com.alibaba.nacos.naming.utils;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.apache.commons.lang.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.alibaba.nacos.naming.constants.Constants.DEFAULT_INSTANCE_WEIGHT;
import static com.alibaba.nacos.naming.constants.Constants.PUBLISH_INSTANCE_ENABLE;
import static com.alibaba.nacos.naming.constants.Constants.PUBLISH_INSTANCE_WEIGHT;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DEFAULT_CLUSTER_NAME;

/**
* Utils to generate revision/checksum of distro clients.
*
* @author Pixy Yuan
* on 2021/10/9
*/
public class DistroUtils {

/**
* Build service key.
*/
public static String serviceKey(Service service) {
return service.getNamespace()
+ "##"
+ service.getGroupedServiceName()
+ "##"
+ service.isEphemeral();
}

/**
* Calculate revision for client.
*/
public static int revision(Client client) {
String s = buildUniqueString(client);
if (s == null) {
return 0;
}
return s.hashCode();
}

/**
* Calculate hash for client. Reduce strings in memory and cpu costs.
*/
public static int hash(Client client) {
if (!(client instanceof IpPortBasedClient)) {
return 0;
}
return Objects.hash(client.getClientId(),
client.getAllPublishedService().stream()
.sorted(Comparator.comparing(DistroUtils::serviceKey))
.map(s -> {
InstancePublishInfo ip = client.getInstancePublishInfo(s);
double weight = getWeight(ip);
Boolean enabled = getEnabled(ip);
String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME);
return Objects.hash(serviceKey(s),
ip.getIp(),
ip.getPort(),
weight,
ip.isHealthy(),
enabled,
cluster,
ip.getExtendDatum()
);
})
.collect(Collectors.toSet()),
client.getAllSubscribeService().stream()
.sorted(Comparator.comparing(DistroUtils::serviceKey))
.map(s -> {
Subscriber subscriber = client.getSubscriber(s);
String cluster = StringUtils.defaultIfBlank(subscriber.getCluster(), DEFAULT_CLUSTER_NAME);
String agent = StringUtils.defaultIfBlank(subscriber.getAgent(), StringUtils.EMPTY);
String app = StringUtils.defaultIfBlank(subscriber.getApp(), StringUtils.EMPTY);
return Objects.hash(serviceKey(s),
subscriber.getAddrStr(),
cluster,
agent,
app
);
})
.collect(Collectors.toSet())
);
}

/**
* Calculate checksum for client.
*/
public static String checksum(Client client) {
String s = buildUniqueString(client);
if (s == null) {
return "0";
}
return MD5Utils.md5Hex(s, Constants.ENCODE);
}

/**
* Calculate unique string for client.
*/
public static String buildUniqueString(Client client) {
if (!(client instanceof IpPortBasedClient)) {
return null;
}
StringBuilder sb = new StringBuilder();
sb.append(client.getClientId()).append('|');
client.getAllPublishedService().stream()
.sorted(Comparator.comparing(DistroUtils::serviceKey))
.forEach(s -> {
InstancePublishInfo ip = client.getInstancePublishInfo(s);
double weight = getWeight(ip);
Boolean enabled = getEnabled(ip);
String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME);
sb.append(serviceKey(s)).append('_')
.append(ip.getIp()).append(':').append(ip.getPort()).append('_')
.append(weight).append('_')
.append(ip.isHealthy()).append('_')
.append(enabled).append('_')
.append(cluster).append('_')
.append(convertMap2String(ip.getExtendDatum()))
.append(',');
});
sb.append('|');
client.getAllSubscribeService().stream()
.sorted(Comparator.comparing(DistroUtils::serviceKey))
.forEach(s -> {
Subscriber subscriber = client.getSubscriber(s);
String cluster = StringUtils.defaultIfBlank(subscriber.getCluster(), DEFAULT_CLUSTER_NAME);
String agent = StringUtils.defaultIfBlank(subscriber.getAgent(), StringUtils.EMPTY);
String app = StringUtils.defaultIfBlank(subscriber.getApp(), StringUtils.EMPTY);
sb.append(serviceKey(s)).append('_')
.append(subscriber.getAddrStr()).append('_')
.append(cluster).append('_')
.append(agent).append('_')
.append(app)
.append(',');
});
sb.append('|');
return sb.toString();
}

private static boolean getEnabled(InstancePublishInfo ip) {
Object enabled0 = ip.getExtendDatum().get(PUBLISH_INSTANCE_ENABLE);
if (!(enabled0 instanceof Boolean)) {
return true;
} else {
return (Boolean) enabled0;
}
}

private static double getWeight(InstancePublishInfo ip) {
Object weight0 = ip.getExtendDatum().get(PUBLISH_INSTANCE_WEIGHT);
if (!(weight0 instanceof Number)) {
return DEFAULT_INSTANCE_WEIGHT;
} else {
return ((Number) weight0).doubleValue();
}
}

/**
* Convert Map to KV string with ':'.
*
* @param map map need to be converted
* @return KV string with ':'
*/
private static String convertMap2String(Map<String, Object> map) {
if (map == null || map.isEmpty()) {
return StringUtils.EMPTY;
}
StringBuilder sb = new StringBuilder();
List<String> keys = new ArrayList<>(map.keySet());
Collections.sort(keys);
for (String key : keys) {
sb.append(key);
sb.append(':');
sb.append(map.get(key));
sb.append(',');
}
return sb.toString();
}

}
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.naming.core.v2.client.manager;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientVerifyInfo;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.PersistentIpPortClientManager;
Expand Down Expand Up @@ -86,18 +87,20 @@ public void testChooseConnectionClientForV6() {

@Test
public void testChooseEphemeralIpPortClient() {
delegate.verifyClient(ephemeralIpPortId);
verify(connectionBasedClientManager, never()).verifyClient(ephemeralIpPortId);
verify(ephemeralIpPortClientManager).verifyClient(ephemeralIpPortId);
verify(persistentIpPortClientManager, never()).verifyClient(ephemeralIpPortId);
DistroClientVerifyInfo verify = new DistroClientVerifyInfo(ephemeralIpPortId, 0);
delegate.verifyClient(verify);
verify(connectionBasedClientManager, never()).verifyClient(verify);
verify(ephemeralIpPortClientManager).verifyClient(verify);
verify(persistentIpPortClientManager, never()).verifyClient(verify);
}

@Test
public void testChoosePersistentIpPortClient() {
delegate.verifyClient(persistentIpPortId);
verify(connectionBasedClientManager, never()).verifyClient(persistentIpPortId);
verify(ephemeralIpPortClientManager, never()).verifyClient(persistentIpPortId);
verify(persistentIpPortClientManager).verifyClient(persistentIpPortId);
DistroClientVerifyInfo verify = new DistroClientVerifyInfo(persistentIpPortId, 0);
delegate.verifyClient(verify);
verify(connectionBasedClientManager, never()).verifyClient(verify);
verify(ephemeralIpPortClientManager, never()).verifyClient(verify);
verify(persistentIpPortClientManager).verifyClient(verify);
}

@Test
Expand Down

0 comments on commit c9ce6f2

Please sign in to comment.