Skip to content

Commit

Permalink
#346 #350:nacos 2 nacos 同步关闭后心跳没有停止问题 ,nacos-sync删除目标节点问题 (#347)
Browse files Browse the repository at this point in the history
* update port

* fix #297 (#298)

Co-authored-by: yangchun2 <yangchun2@joyy.com>

* Revert "fix #297 (#298)" (#318)

This reverts commit a9df169.

* 0.4.9-pre (#325)

* Feat/sync support2.x#mutiple thread sync02 (#304)

* update port

* Multithreading sync

* solve conflict

* imple SyncService

* adapter deregister

* optimization some code

* fix deregister instance equals logic

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>

* Optimize the code for assigning tasks. (#320)

* Develop (#321)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix #305 (#322)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code (#323)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic (#324)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

---------

Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>
Co-authored-by: Oliver <wqdyxnbd@163.com>

* fix:nacos 2 nacos 同步关闭后心跳没有停止问题

* fix:nacos 2 nacos 同步关闭后心跳没有停止问题

* fix:集群信息保存 cluster_level 为 null

* fix:
1、注册时根据中心化逻辑判断同步,但是删除时逻辑不一致问题
2、如果停止同步nameservice为空问题

* 问题在于对destInstances列表的更新方式。在Java中,方法参数是按值传递的。这意味着当你传递一个对象到方法中时,实际上传递的是对象引用的副本。因此,如果你在方法内部改变了这个引用指向的对象(例如,将其指向一个新的对象),这个改变不会影响到原始的对象引用。

在你的代码中,destInstances = newDestInstance;这一行只是改变了destInstances引用在方法内部的指向,而不会改变方法外部传入的destInstances列表对象。这意味着,尽管你筛选出了需要反注册的实例,但这个改变不会反映到方法调用者那里。

---------

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>
Co-authored-by: 杨春 <chun@kpromise.top>
Co-authored-by: yangchun2 <yangchun2@joyy.com>
Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>
  • Loading branch information
6 people committed May 15, 2024
1 parent be65db1 commit 0c24dcb
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@
import com.alibaba.nacossync.timer.FastSyncHelper;
import com.alibaba.nacossync.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -126,7 +122,7 @@ public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskDO.getTaskId(),
taskDO.getSourceClusterId());

int level = clusterAccessService.findClusterLevel(taskDO.getSourceClusterId());
if ("ALL".equals(taskDO.getServiceName())) {
String operationId = taskUpdateProcessor.getTaskIdAndOperationIdMap(taskDO.getTaskId());
if (!StringUtils.isEmpty(operationId)) {
Expand All @@ -141,16 +137,27 @@ public boolean delete(TaskDO taskDO) {
String operationKey = taskDO.getTaskId() + serviceName;
skyWalkerCacheServices.removeFinishedTask(operationKey);
allSyncTaskMap.remove(operationKey);
NamingService destNamingService = popNamingService(taskDO);
sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(taskDO.getGroupName()),
listenerMap.remove(taskDO.getTaskId() + serviceName));

TaskDO task = new TaskDO();
BeanUtils.copyProperties(taskDO, task);
task.setServiceName(serviceName);
task.setOperationId(taskDO.getTaskId() + serviceName);
NamingService destNamingService = popNamingService(task);
sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(task.getGroupName()),
listenerMap.remove(task.getTaskId() + serviceName));
if (Objects.isNull(destNamingService)) {
log.warn("task {} not found NamingService", task.getTaskId() + serviceName);
continue;
}
List<Instance> sourceInstances = sourceNamingService.getAllInstances(serviceName,
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false);
getGroupNameOrDefault(task.getGroupName()), new ArrayList<>(), false);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
// if (needSync(instance.getMetadata())) {
// destNamingService.deregisterInstance(serviceName,
// getGroupNameOrDefault(task.getGroupName()), instance.getIp(), instance.getPort());
// }
if (needSync(instance.getMetadata(),level , taskDO.getDestClusterId())){
destNamingService.deregisterInstance(serviceName,
getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
getGroupNameOrDefault(task.getGroupName()), instance.getIp(), instance.getPort());
}
}
}
Expand All @@ -169,10 +176,14 @@ public boolean delete(TaskDO taskDO) {

NamingService destNamingService = popNamingService(taskDO);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
if (needSync(instance.getMetadata(),level , taskDO.getDestClusterId())){
destNamingService.deregisterInstance(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), instance);
}
// if (needSync(instance.getMetadata())) {
// destNamingService.deregisterInstance(taskDO.getServiceName(),
// getGroupNameOrDefault(taskDO.getGroupName()), instance);
// }
}
// 移除任务
skyWalkerCacheServices.removeFinishedTask(operationId);
Expand Down Expand Up @@ -371,7 +382,7 @@ private void processDeRegisterInstances(TaskDO taskDO, NamingService destNamingS
if (CollectionUtils.isEmpty(destInstances)) {
return;
}
deRegisterFilter(destInstances, taskDO.getSourceClusterId());
destInstances = deRegisterFilter(destInstances, taskDO.getSourceClusterId());
if (CollectionUtils.isNotEmpty(destInstances)) {
//逐个执行反注册,拿出一个实例即可, 需要处理redo,否则会被重新注册上来
for (Instance destInstance : destInstances) {
Expand All @@ -381,7 +392,7 @@ private void processDeRegisterInstances(TaskDO taskDO, NamingService destNamingS
}
}

private void deRegisterFilter(List<Instance> destInstances, String sourceClusterId) {
private List<Instance> deRegisterFilter(List<Instance> destInstances, String sourceClusterId) {
List<Instance> newDestInstance = new ArrayList<>();
for (Instance destInstance : destInstances) {
Map<String, String> metadata = destInstance.getMetadata();
Expand All @@ -391,7 +402,7 @@ private void deRegisterFilter(List<Instance> destInstances, String sourceCluster
newDestInstance.add(destInstance);
}
}
destInstances = newDestInstance;
return newDestInstance;
}

private boolean needDeregister(String destClusterId, String sourceClusterId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void process(ClusterAddRequest clusterAddRequest, ClusterAddResult cluste
clusterDO.setUserName(clusterAddRequest.getUserName());
clusterDO.setPassword(clusterAddRequest.getPassword());
clusterDO.setNamespace(clusterAddRequest.getNamespace());
clusterDO.setClusterLevel(0);
clusterAccessService.insert(clusterDO);
}
}

0 comments on commit 0c24dcb

Please sign in to comment.