Skip to content

Commit

Permalink
Merge pull request #207 from nacos-group/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
paderlol authored Feb 11, 2021
2 parents fa11559 + a4e453b commit 1713fd8
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 47 deletions.
2 changes: 1 addition & 1 deletion nacossync-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.1</version>
<version>0.4.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nacossync-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.1</version>
<version>0.4.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.1</version>
<version>0.4.2</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-worker</artifactId>
<version>0.4.1</version>
<version>0.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.1</version>
<version>0.4.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nacossync-worker</artifactId>
<version>0.4.1</version>
<version>0.4.2</version>
<properties>
<zookeeper.version>3.4.9</zookeeper.version>
<curator.version>4.1.0</curator.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public String getClusterConnectKey(String clusterId) {
}

public List<String> getAllClusterConnectKey(String clusterId) {
ClusterDO clusterDOS = clusterAccessService.findByClusterId(clusterId);
ClusterDO clusterDO = clusterAccessService.findByClusterId(clusterId);

List<String> connectKeyList = JSONObject.parseObject(clusterDOS.getConnectKeyList(),
List<String> connectKeyList = JSONObject.parseObject(clusterDO.getConnectKeyList(),
new TypeReference<List<String>>() {
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.google.common.base.Joiner;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,15 +33,29 @@
@Slf4j
public class NacosServerHolder extends AbstractServerHolderImpl<NamingService> {

private final ClusterAccessService clusterAccessService;

public NacosServerHolder(ClusterAccessService clusterAccessService) {
this.clusterAccessService = clusterAccessService;
}

@Override
NamingService createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace)
throws Exception {
List<String> allClusterConnectKey = skyWalkerCacheServices
.getAllClusterConnectKey(clusterId);
ClusterDO clusterDO = clusterAccessService.findByClusterId(clusterId);
String serverList = Joiner.on(",").join(allClusterConnectKey);
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
properties.setProperty(PropertyKeyConst.NAMESPACE, namespace);
Optional.ofNullable(clusterDO.getUserName()).ifPresent(value ->
properties.setProperty(PropertyKeyConst.USERNAME, value)
);

Optional.ofNullable(clusterDO.getPassword()).ifPresent(value ->
properties.setProperty(PropertyKeyConst.PASSWORD, value)
);
return NamingFactory.createNamingService(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,24 @@
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author yangyshdan
* @version $Id: ConfigServerSyncManagerService.java, v 0.1 2018-11-12 下午5:17 NacosSync Exp $$
*/

@Slf4j
@NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS,destinationCluster = ClusterTypeEnum.NACOS)
@NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.NACOS)
public class NacosSyncToNacosServiceImpl implements SyncService {

private Map<String, EventListener> nacosListenerMap = new ConcurrentHashMap<>();

@Autowired
Expand All @@ -58,7 +62,7 @@ public boolean delete(TaskDO taskDO) {
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());

sourceNamingService.unsubscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
sourceNamingService.unsubscribe(taskDO.getServiceName(), nacosListenerMap.remove(taskDO.getTaskId()));

// 删除目标集群中同步的实例列表
List<Instance> instances = destNamingService.getAllInstances(taskDO.getServiceName());
Expand All @@ -85,26 +89,30 @@ public boolean sync(TaskDO taskDO) {
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
if (event instanceof NamingEvent) {
try {
Set instanceKeySet = new HashSet();
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
// 先将新的注册一遍
List<Instance> sourceInstances = ((NamingEvent) event).getInstances();
log.info("任务Id:{},迁入实例数量:{}", taskDO.getTaskId(), sourceInstances.size());

List<Instance> destInstances = destNamingService.getAllInstances(taskDO.getServiceName(),
new ArrayList<>(), false);
// 先删除不存在的
List<String> instanceKeys = sourceInstances.stream().map(this::composeInstanceKey)
.collect(Collectors.toList());
for (Instance instance : destInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeys.contains(composeInstanceKey(instance))) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(),
instance.getPort());
}
}

//再次添加新实例
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(instance, taskDO));
instanceKeySet.add(composeInstanceKey(instance));
}
}

// 再将不存在的删掉
List<Instance> destInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : destInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeySet.contains(composeInstanceKey(instance))) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(),
instance.getPort());
}
}
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
Expand All @@ -126,9 +134,6 @@ private String composeInstanceKey(Instance instance) {
}





public Instance buildSyncInstance(Instance instance, TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(instance.getIp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,15 @@ public class ClusterDO implements Serializable {
*/
private String clusterType;

/**
* The username of the Nacos.
*
*/
private String userName;

/**
* The password of the Nacos.
*/
private String password;

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,15 @@ public class ClusterAddRequest extends BaseRequest {
*/
private String clusterType;

/**
* The username of the Nacos.
*
*/
private String userName;

/**
* The password of the Nacos.
*/
private String password;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,20 @@
*/
package com.alibaba.nacossync.template.processor;

import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.monitor.MetricsManager;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.exception.SkyWalkerException;
import com.alibaba.nacossync.pojo.result.ClusterAddResult;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.request.ClusterAddRequest;
import com.alibaba.nacossync.pojo.result.ClusterAddResult;
import com.alibaba.nacossync.template.Processor;
import com.alibaba.nacossync.util.SkyWalkerUtil;

import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @author NacosSync
Expand All @@ -52,15 +47,16 @@ public class ClusterAddProcessor implements Processor<ClusterAddRequest, Cluster

@Override
public void process(ClusterAddRequest clusterAddRequest, ClusterAddResult clusterAddResult,
Object... others) throws Exception {
Object... others) throws Exception {
ClusterDO clusterDO = new ClusterDO();

if (null == clusterAddRequest.getConnectKeyList() || 0 == clusterAddRequest.getConnectKeyList().size()) {

throw new SkyWalkerException("集群列表不能为空!");
}

if (StringUtils.isBlank(clusterAddRequest.getClusterName()) || StringUtils.isBlank(clusterAddRequest.getClusterType())) {
if (StringUtils.isBlank(clusterAddRequest.getClusterName()) || StringUtils
.isBlank(clusterAddRequest.getClusterType())) {

throw new SkyWalkerException("集群名字或者类型不能为空!");
}
Expand All @@ -81,6 +77,8 @@ public void process(ClusterAddRequest clusterAddRequest, ClusterAddResult cluste
clusterDO.setClusterName(clusterAddRequest.getClusterName());
clusterDO.setClusterType(clusterAddRequest.getClusterType());
clusterDO.setConnectKeyList(JSONObject.toJSONString(clusterAddRequest.getConnectKeyList()));
clusterDO.setUserName(clusterAddRequest.getUserName());
clusterDO.setPassword(clusterAddRequest.getPassword());
clusterAccessService.insert(clusterDO);
}
}
2 changes: 1 addition & 1 deletion nacossync-worker/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
server.port=8081
server.port=8083
server.servlet.context-path=/

spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
Expand Down

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-parent</artifactId>
<version>0.4.1</version>
<version>0.4.2</version>
<modules>
<module>nacossync-console</module>
<module>nacossync-worker</module>
Expand Down Expand Up @@ -73,7 +73,7 @@
<dependency>
<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-worker</artifactId>
<version>0.4.1</version>
<version>0.4.2</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down

0 comments on commit 1713fd8

Please sign in to comment.