Skip to content

Commit

Permalink
Optimize the code for assigning tasks. (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
paderlol committed May 15, 2023
1 parent 98efe26 commit 0e73a08
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 200 deletions.
8 changes: 8 additions & 0 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class TaskUpdateProcessor implements Processor<TaskUpdateRequest, BaseRes
@Autowired
private TaskAccessService taskAccessService;

private Map<String,String> TaskIdAndOperationIdMap = new ConcurrentHashMap<>();
private Map<String,String> taskIdAndOperationIdMap = new ConcurrentHashMap<>();

@Override
public void process(TaskUpdateRequest taskUpdateRequest, BaseResult baseResult,
Expand All @@ -64,14 +64,14 @@ public void process(TaskUpdateRequest taskUpdateRequest, BaseResult baseResult,

taskDO.setTaskStatus(taskUpdateRequest.getTaskStatus());
//在id生成之前保存好操作id,可以在删除操作里面进行
TaskIdAndOperationIdMap.put(taskDO.getTaskId(),taskDO.getOperationId());
taskIdAndOperationIdMap.put(taskDO.getTaskId(),taskDO.getOperationId());

taskDO.setOperationId(SkyWalkerUtil.generateOperationId());

taskAccessService.addTask(taskDO);
}

public String getTaskIdAndOperationIdMap(String taskId) {
return TaskIdAndOperationIdMap.get(taskId);
return taskIdAndOperationIdMap.remove(taskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,30 @@

package com.alibaba.nacossync.timer;

import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.extension.SyncManagerService;
import com.alibaba.nacossync.extension.impl.NacosSyncToNacosServiceImpl;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.Tuple;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import static com.alibaba.nacossync.constant.SkyWalkerConstants.MAX_THREAD_NUM;

/**
* multi-threaded synchronization Task DO.
*
* @ClassName: FastSyncHelper
* @Author: ChenHao26
* @Date: 2022/7/19 17:02
Expand All @@ -47,116 +50,94 @@
@Slf4j
public class FastSyncHelper {

@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
private final SkyWalkerCacheServices skyWalkerCacheServices;

private final MetricsManager metricsManager;

private final SyncManagerService syncManagerService;

@Autowired
private MetricsManager metricsManager;
private final NacosSyncToNacosServiceImpl nacosSyncToNacosService;

@Autowired
private SyncManagerService syncManagerService;
private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);

public FastSyncHelper(SkyWalkerCacheServices skyWalkerCacheServices, MetricsManager metricsManager,
SyncManagerService syncManagerService, NacosSyncToNacosServiceImpl nacosSyncToNacosService) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.metricsManager = metricsManager;
this.syncManagerService = syncManagerService;
this.nacosSyncToNacosService = nacosSyncToNacosService;
}

@Autowired
private NacosSyncToNacosServiceImpl nacosSyncToNacosService;

/**
* every 200 services start a thread to perform synchronization.
*
* @param taskDOS task list
*/
public void syncWithThread(List<TaskDO> taskDOS) {
long startTime = System.currentTimeMillis();
List<List<TaskDO>> taskGroupList = averageAssign(taskDOS, MAX_THREAD_NUM);
CountDownLatch countDownLatch = new CountDownLatch(taskGroupList.size());

// 创建分组线程,每个线程执行自己的任务
for (int i = 0; i < taskGroupList.size(); i++) {
new SyncThread(i, countDownLatch, taskGroupList.get(i)).start();
}
try {
countDownLatch.await();
}catch (InterruptedException exception) {
exception.printStackTrace();
}

log.info("新增同步任务数量 {}, 执行耗时:{}ms",taskDOS.size() , System.currentTimeMillis() - startTime);
sync(taskDOS, tuple -> {
for (TaskDO task : tuple.getT2()) {
//执行兜底的定时同步
nacosSyncToNacosService.timeSync(task);
}
});
}


/**
* every 200 services start a thread to perform synchronization.
* @param taskDO task info
*
* @param taskDO task info
* @param filterServices filterServices
*/
public void syncWithThread(TaskDO taskDO, List<String> filterServices) {
long start=System.currentTimeMillis();
List<List<String>> lists = averageAssign(filterServices, MAX_THREAD_NUM);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());

//创建分组线程,每个线程执行自己的任务
for (int i = 0; i < lists.size(); i++) {
new SyncThread(i, countDownLatch, lists.get(i), taskDO).start();
}
public void syncWithThread(TaskDO taskDO, List<String> filterServices) {
sync(filterServices, tuple -> {
// 执行数据同步
for (String serviceName : tuple.getT2()) {
syncByIndex(taskDO, serviceName, tuple.getT1());
}
});
}

public <T> void sync(List<T> items, Consumer<Tuple<Integer, List<T>>> itemConsumer) {
long startTime = System.currentTimeMillis();
List<Tuple<Integer, List<T>>> taskGroupList = averageAssign(items, MAX_THREAD_NUM);

// 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(taskGroupList.stream()
.map(tuple -> CompletableFuture.runAsync(() -> performSync(tuple, itemConsumer), executorService))
.toArray(CompletableFuture[]::new));
try {
countDownLatch.await();
} catch (InterruptedException e) {
allTasks.get();
} catch (Exception e) {
e.printStackTrace();
}
log.info("新增同步任务数量 {} ,执行耗时:{} ms",filterServices.size(),System.currentTimeMillis() - start);
}

class SyncThread extends Thread {
private int index;

private CountDownLatch countDownLatch;

private List<String> serviceNames;

private List<TaskDO> taskDOS;

private TaskDO taskDO;

public SyncThread(int index, CountDownLatch countDownLatch, List<String> serviceNames, TaskDO taskDO) {
this.index = index;
this.countDownLatch = countDownLatch;
this.serviceNames = serviceNames;
this.taskDO = taskDO;
}
log.info("新增同步任务数量 {}, 执行耗时:{}ms", items.size(), System.currentTimeMillis() - startTime);
}

public SyncThread(int index, CountDownLatch countDownLatch, List<TaskDO> taskDOS) {
this.index = index;
this.countDownLatch = countDownLatch;
this.taskDOS = taskDOS;
private <T> void performSync(Tuple<Integer, List<T>> tuple, Consumer<Tuple<Integer, List<T>>> itemConsumer) {
if (tuple == null || tuple.getT2() == null || tuple.getT2().isEmpty()) {
return;
}
itemConsumer.accept(tuple);

//创建任务,执行数据同步
@Override
public void run() {
if (!CollectionUtils.isEmpty(serviceNames)) {
// 执行数据同步
for (String serviceName : serviceNames) {
sync(taskDO, serviceName, index);
}
} else {
for (TaskDO taskDO : taskDOS) {
//执行兜底的定时同步
nacosSyncToNacosService.timeSync(taskDO);
}
}
countDownLatch.countDown();
}
}

private void sync(TaskDO taskDO, String serviceName, int index) {


private void syncByIndex(TaskDO taskDO, String serviceName, int index) {
long startTime = System.currentTimeMillis();
TaskDO task = new TaskDO();
BeanUtils.copyProperties(taskDO,task);
BeanUtils.copyProperties(taskDO, task);
task.setServiceName(serviceName);
task.setOperationId(taskDO.getTaskId() + serviceName);
if (syncManagerService.sync(task,index)) {
if (syncManagerService.sync(task, index)) {
skyWalkerCacheServices.addFinishedTask(task);
log.info("sync thread : {} sync finish ,time consuming :{}", Thread.currentThread().getId(), System.currentTimeMillis() - startTime);
log.info("sync thread : {} sync finish ,time consuming :{}", Thread.currentThread().getId(),
System.currentTimeMillis() - startTime);
metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - startTime);
}else {
} else {
log.warn("listenerSyncTaskEvent sync failure.");
}
}
Expand All @@ -165,32 +146,28 @@ private void sync(TaskDO taskDO, String serviceName, int index) {
* 将一个List均分成n个list,主要通过偏移量来实现的
*
* @param source 源集合
* @param limit 最大值
* @param limit 最大值
* @return
*/
public static <T> List<List<T>> averageAssign(List<T> source, int limit) {
public static <T> List<Tuple<Integer, List<T>>> averageAssign(List<T> source, int limit) {
if (null == source || source.isEmpty()) {
return Collections.emptyList();
}
List<List<T>> result = new ArrayList<>();
int listCount = (source.size() - 1) / limit + 1;
// (先计算出余数)
int remaider = source.size() % listCount;
// 然后是商
int number = source.size() / listCount;
// 偏移量
int offset = 0;
int size = source.size();
List<Tuple<Integer, List<T>>> result = new ArrayList<>();
// 通过减去1并加1,我们可以确保将多余的元素放在最后一个子列表中。在上述示例中,计算结果为 ((10 - 1) / 3 + 1) = 4,我们创建了4个子列表,其中最后一个子列表包含2个元素,而不是1个。这样可以更均匀地分配源列表的元素.
int listCount = (int) Math.ceil((double) source.size() / limit); // 计算子列表数量,使用 Math.ceil 向上取整,确保多余的元素放在最后一个子列表中
int remainder = source.size() % listCount; // 计算多余的元素数量
int assigned = 0; // 记录已分配的元素索引
for (int i = 0; i < listCount; i++) {
List<T> value;
if (remaider > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
int sublistSize = size / listCount + (remainder-- > 0 ? 1 : 0); // 计算子列表大小,平均分配元素,并在有多余元素时将其分配到子列表中
List<T> sublist = source.subList(assigned, assigned + sublistSize); // 获取子列表
result.add(Tuple.of(i, sublist)); // 将子列表添加到结果列表
assigned += sublistSize; // 更新已分配的元素索引
}

return result;
}


}
Loading

0 comments on commit 0e73a08

Please sign in to comment.