diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 12a47006c6a9..28d8c53e33b9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -72,13 +72,15 @@ public class MasterFailoverService { private final NettyExecutorManager nettyExecutorManager; private final LogClient logClient; + private final TaskGroupService taskGroupService; public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull NettyExecutorManager nettyExecutorManager, @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager, - @NonNull LogClient logClient) { + @NonNull LogClient logClient, + @NonNull TaskGroupService taskGroupService) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; @@ -86,6 +88,7 @@ public MasterFailoverService(@NonNull RegistryClient registryClient, this.nettyExecutorManager = nettyExecutorManager; this.processInstanceExecCacheManager = processInstanceExecCacheManager; this.logClient = logClient; + this.taskGroupService = taskGroupService; } @@ -253,6 +256,12 @@ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @Non taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); taskInstance.setFlag(Flag.NO); processService.saveTaskInstance(taskInstance); + + if (taskInstance.getTaskGroupId() > 0) { + LOGGER.info("The failover taskInstance is using taskGroup: {}, will release the taskGroup", + taskInstance.getTaskGroupId()); + taskGroupService.releaseTaskGroup(taskInstance); + } } private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/TaskGroupService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/TaskGroupService.java new file mode 100644 index 000000000000..59383de94a59 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/TaskGroupService.java @@ -0,0 +1,28 @@ +package org.apache.dolphinscheduler.server.master.service; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskGroupService { + + @Autowired + private ProcessService processService; + + public void releaseTaskGroup(TaskInstance taskInstance) { + if (taskInstance.getTaskGroupId() > 0) { + TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); + if (nextTaskInstance != null) { + ProcessInstance processInstance = + this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); + this.processService.sendStartTask2Master(processInstance, + nextTaskInstance.getId(), + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + } + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 0f43c28eb54c..2cbb8732d0ca 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -17,10 +17,6 @@ package org.apache.dolphinscheduler.server.master.service; -import lombok.NonNull; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -43,11 +39,11 @@ import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; -import javax.annotation.Nullable; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; + import java.util.Date; import java.util.HashMap; import java.util.List; @@ -56,6 +52,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import lombok.NonNull; + @Service public class WorkerFailoverService { @@ -67,6 +71,7 @@ public class WorkerFailoverService { private final WorkflowExecuteThreadPool workflowExecuteThreadPool; private final ProcessInstanceExecCacheManager cacheManager; private final LogClient logClient; + private final TaskGroupService taskGroupService; private final String localAddress; public WorkerFailoverService(@NonNull RegistryClient registryClient, @@ -74,7 +79,8 @@ public WorkerFailoverService(@NonNull RegistryClient registryClient, @NonNull ProcessService processService, @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, @NonNull ProcessInstanceExecCacheManager cacheManager, - @NonNull LogClient logClient) { + @NonNull LogClient logClient, + @NonNull TaskGroupService taskGroupService) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; @@ -82,6 +88,7 @@ public WorkerFailoverService(@NonNull RegistryClient registryClient, this.cacheManager = cacheManager; this.logClient = logClient; this.localAddress = masterConfig.getMasterAddress(); + this.taskGroupService = taskGroupService; } /** @@ -193,6 +200,12 @@ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @Non stateEvent.setProcessInstanceId(processInstance.getId()); stateEvent.setExecutionStatus(taskInstance.getState()); workflowExecuteThreadPool.submitStateEvent(stateEvent); + + if (taskInstance.getTaskGroupId() > 0) { + LOGGER.info("The failover taskInstance is using taskGroup: {}, will release the taskGroup", + taskInstance.getTaskGroupId()); + taskGroupService.releaseTaskGroup(taskInstance); + } } /**