Skip to content

Commit

Permalink
Update task executor to handle WorkflowAlreadyCompletedError for sign…
Browse files Browse the repository at this point in the history
…al and cancel workflow (#5956)
  • Loading branch information
Shaddoll committed May 1, 2024
1 parent d8124b2 commit d877674
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 6 deletions.
10 changes: 10 additions & 0 deletions service/history/task/cross_cluster_source_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return requestCancelExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -265,6 +269,7 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(
taskInfo.TargetWorkflowID,
taskInfo.TargetRunID,
now,
cause,
)
}
return requestCancelExternalExecutionCompleted(
Expand Down Expand Up @@ -479,6 +484,10 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return signalExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -488,6 +497,7 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(
taskInfo.TargetRunID,
signalInfo.Control,
now,
cause,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (s *crossClusterSourceTaskExecutorSuite) TestExecuteCancelExecution_Failure
&types.CrossClusterTaskResponse{
TaskType: types.CrossClusterTaskTypeCancelExecution.Ptr(),
TaskState: int16(processingStateInitialized),
FailedCause: types.CrossClusterTaskFailedCauseWorkflowNotExists.Ptr(),
FailedCause: types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted.Ptr(),
},
func(
mutableState execution.MutableState,
Expand Down
22 changes: 19 additions & 3 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
return err
}
Expand All @@ -656,6 +657,11 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
// for retryable error just return
return err
}
cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
if errors.As(err, &alreadyCompletedErr) {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return requestCancelExternalExecutionFailed(
ctx,
task,
Expand All @@ -664,6 +670,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
cause,
)
}

Expand Down Expand Up @@ -750,6 +757,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
}

Expand All @@ -769,12 +777,17 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
tag.TargetWorkflowRunID(task.TargetRunID),
tag.Error(err))

// Check to see if the error is non-transient, in which case add SignalFailed
// Check to see if the error is non-transient, in which case add RequestCancelFailed
// event and complete transfer task by setting the err = nil
if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) {
// for retryable error just return
return err
}
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if errors.As(err, &alreadyCompletedErr) {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return signalExternalExecutionFailed(
ctx,
task,
Expand All @@ -784,6 +797,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
cause,
)
}

Expand Down Expand Up @@ -1419,6 +1433,7 @@ func requestCancelExternalExecutionFailed(
targetWorkflowID string,
targetRunID string,
now time.Time,
cause types.CancelExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1439,7 +1454,7 @@ func requestCancelExternalExecutionFailed(
targetDomain,
targetWorkflowID,
targetRunID,
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
cause,
)
return err
},
Expand All @@ -1464,6 +1479,7 @@ func signalExternalExecutionFailed(
targetRunID string,
control []byte,
now time.Time,
cause types.SignalExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1485,7 +1501,7 @@ func signalExternalExecutionFailed(
targetWorkflowID,
targetRunID,
control,
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
cause,
)
return err
},
Expand Down
45 changes: 43 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Success() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() {
func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotExistsError() {
s.testProcessCancelExecution(
s.targetDomainID,
func(
Expand All @@ -1067,6 +1067,27 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_WorkflowAlreadyCompleted() {
s.testProcessCancelExecution(
s.targetDomainID,
func(
mutableState execution.MutableState,
workflowExecution, targetExecution types.WorkflowExecution,
event *types.HistoryEvent,
transferTask Task,
requestCancelInfo *persistence.RequestCancelInfo,
) {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
cancelRequest := createTestRequestCancelWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), requestCancelInfo.CancelRequestID)
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), cancelRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
},
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Duplication() {
s.testProcessCancelExecution(
s.targetDomainID,
Expand Down Expand Up @@ -1202,7 +1223,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Success() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() {
func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotExistsError() {
s.testProcessSignalExecution(
s.targetDomainID,
func(
Expand All @@ -1223,6 +1244,26 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_WorkflowAlreadyCompletedError() {
s.testProcessSignalExecution(
s.targetDomainID,
func(
mutableState execution.MutableState,
workflowExecution, targetExecution types.WorkflowExecution,
event *types.HistoryEvent,
transferTask Task,
signalInfo *persistence.SignalInfo,
) {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
signalRequest := createTestSignalWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), signalInfo)
s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), signalRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
},
)
}
func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() {
s.testProcessSignalExecution(
s.targetDomainID,
Expand Down

0 comments on commit d877674

Please sign in to comment.