-
Notifications
You must be signed in to change notification settings - Fork 792
/
request_cancel_workflow_execution.go
112 lines (101 loc) · 4.77 KB
/
request_cancel_workflow_execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright (c) 2017-2021 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2021 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package engineimpl
import (
"context"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/workflow"
)
// RequestCancelWorkflowExecution records a cancellation request event for a
// workflow execution.
//
// The domain identified by req.DomainUUID is resolved via getActiveDomainByID;
// a failure there is returned as is. The remaining work is delegated to
// workflow.UpdateCurrentWithActionFunc, whose result is returned directly.
// The action handed to it yields:
//   - a no-op update when the workflow is not running, its close status is
//     canceled, and the request repeats the recorded cancel request ID;
//   - workflow.ErrAlreadyCompleted for any other non-running workflow;
//   - types.EntityNotExistsError when the request names a FirstExecutionRunID
//     that differs from the one found for the execution;
//   - workflow.ErrParentMismatch when req.GetChildWorkflowOnly() is true and
//     req.ExternalWorkflowExecution does not match the recorded parent;
//   - workflow.UpdateWithNewDecision when cancellation was already requested
//     with the same non-empty request ID;
//   - workflow.ErrCancellationAlreadyRequested for any other repeated request;
//   - types.InternalServiceError when the cancel-requested event cannot be
//     added to mutable state;
//   - workflow.UpdateWithNewDecision once the event has been added.
func (e *historyEngineImpl) RequestCancelWorkflowExecution(
	ctx context.Context,
	req *types.HistoryRequestCancelWorkflowExecutionRequest,
) error {
	activeDomain, err := e.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return err
	}

	var (
		domainID       = activeDomain.GetInfo().ID
		request        = req.CancelRequest
		expectedParent = req.ExternalWorkflowExecution
		onlyIfChild    = req.GetChildWorkflowOnly()
	)

	target := types.WorkflowExecution{
		WorkflowID: request.WorkflowExecution.WorkflowID,
	}
	// A request that names a first execution run ID always goes after the
	// currently running execution, so the run ID is only pinned when no first
	// execution run ID was supplied.
	if request.GetFirstExecutionRunID() == "" {
		target.RunID = request.WorkflowExecution.RunID
	}

	cancelAction := func(_ execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
		isCancelRequested, cancelRequestID := mutableState.IsCancelRequested()

		if !mutableState.IsWorkflowExecutionRunning() {
			_, closeStatus := mutableState.GetWorkflowStateCloseStatus()
			canceledOnRequest := isCancelRequested && closeStatus == persistence.WorkflowCloseStatusCanceled
			if canceledOnRequest {
				// Same request ID as the one that canceled the workflow:
				// treat the call as a repeat and change nothing.
				if id := req.CancelRequest.RequestID; id != "" && id == cancelRequestID {
					return &workflow.UpdateAction{Noop: true}, nil
				}
			}
			return nil, workflow.ErrAlreadyCompleted
		}

		executionInfo := mutableState.GetExecutionInfo()

		if wantFirstRunID := request.GetFirstExecutionRunID(); wantFirstRunID != "" {
			haveFirstRunID := executionInfo.FirstExecutionRunID
			if haveFirstRunID == "" {
				// Backwards compatibility: workflow executions created with
				// Cadence release v0.25.0 or earlier do not have
				// FirstExecutionRunID stored in mutable state, so fall back to
				// the workflow execution started event.
				startEvent, err := mutableState.GetStartEvent(ctx)
				if err != nil {
					return nil, err
				}
				haveFirstRunID = startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstExecutionRunID()
			}
			if wantFirstRunID != haveFirstRunID {
				return nil, &types.EntityNotExistsError{Message: "Workflow execution not found"}
			}
		}

		if onlyIfChild {
			sameParent := expectedParent.GetWorkflowID() == executionInfo.ParentWorkflowID &&
				expectedParent.GetRunID() == executionInfo.ParentRunID
			if !sameParent {
				return nil, workflow.ErrParentMismatch
			}
		}

		if isCancelRequested {
			if id := req.CancelRequest.RequestID; id != "" && id == cancelRequestID {
				return workflow.UpdateWithNewDecision, nil
			}
			// If workflow cancellation were considered idempotent this error
			// would be redundant; it may still be useful when this API is
			// invoked externally rather than by a decision from the transfer
			// queue.
			return nil, workflow.ErrCancellationAlreadyRequested
		}

		_, addErr := mutableState.AddWorkflowExecutionCancelRequestedEvent(req.CancelRequest.Cause, req)
		if addErr != nil {
			return nil, &types.InternalServiceError{Message: "Unable to cancel workflow execution."}
		}
		return workflow.UpdateWithNewDecision, nil
	}

	return workflow.UpdateCurrentWithActionFunc(
		ctx,
		e.executionCache,
		e.executionManager,
		domainID,
		e.shard.GetDomainCache(),
		target,
		e.timeSource.Now(),
		cancelAction,
	)
}