Skip to content

Commit

Permalink
Release task group when failover (apache#425)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Mar 24, 2023
1 parent 06698e3 commit 7b00816
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
Expand Up @@ -72,20 +72,23 @@ public class MasterFailoverService {
private final NettyExecutorManager nettyExecutorManager;

private final LogClient logClient;
private final TaskGroupService taskGroupService;

public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
@NonNull LogClient logClient) {
@NonNull LogClient logClient,
@NonNull TaskGroupService taskGroupService) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = masterConfig.getMasterAddress();
this.nettyExecutorManager = nettyExecutorManager;
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.logClient = logClient;
this.taskGroupService = taskGroupService;

}

Expand Down Expand Up @@ -253,6 +256,12 @@ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @Non
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstance.setFlag(Flag.NO);
processService.saveTaskInstance(taskInstance);

if (taskInstance.getTaskGroupId() > 0) {
LOGGER.info("The failover taskInstance is using taskGroup: {}, will release the taskGroup",
taskInstance.getTaskGroupId());
taskGroupService.releaseTaskGroup(taskInstance);
}
}

private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
Expand Down
@@ -0,0 +1,28 @@
package org.apache.dolphinscheduler.server.master.service;

import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskGroupService {

@Autowired
private ProcessService processService;

public void releaseTaskGroup(TaskInstance taskInstance) {
if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) {
ProcessInstance processInstance =
this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance,
nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
}
Expand Up @@ -17,10 +17,6 @@

package org.apache.dolphinscheduler.server.master.service;

import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
Expand All @@ -43,11 +39,11 @@
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand All @@ -56,6 +52,14 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import lombok.NonNull;

@Service
public class WorkerFailoverService {

Expand All @@ -67,21 +71,24 @@ public class WorkerFailoverService {
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager;
private final LogClient logClient;
private final TaskGroupService taskGroupService;
private final String localAddress;

public WorkerFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager,
@NonNull LogClient logClient) {
@NonNull LogClient logClient,
@NonNull TaskGroupService taskGroupService) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager;
this.logClient = logClient;
this.localAddress = masterConfig.getMasterAddress();
this.taskGroupService = taskGroupService;
}

/**
Expand Down Expand Up @@ -193,6 +200,12 @@ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @Non
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadPool.submitStateEvent(stateEvent);

if (taskInstance.getTaskGroupId() > 0) {
LOGGER.info("The failover taskInstance is using taskGroup: {}, will release the taskGroup",
taskInstance.getTaskGroupId());
taskGroupService.releaseTaskGroup(taskInstance);
}
}

/**
Expand Down

0 comments on commit 7b00816

Please sign in to comment.