Skip to content

Commit

Permalink
Fix record child workflow complete mutable state stale check (#2673)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 29, 2022
1 parent 3af96c6 commit a274580
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 22 deletions.
63 changes: 45 additions & 18 deletions service/history/historyEngine.go
Expand Up @@ -1190,7 +1190,8 @@ func (e *historyEngineImpl) ResetStickyTaskQueue(
ctx,
namespaceID,
*resetRequest.Execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -1385,7 +1386,8 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -1535,7 +1537,8 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1614,7 +1617,8 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
var taskQueue string
var workflowTypeName string
err = e.updateWorkflowExecution(ctx, namespaceID, workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1716,7 +1720,8 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1810,7 +1815,8 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
e.logger.Debug("Heartbeat failed")
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1883,7 +1889,8 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution(
}

return e.updateWorkflow(ctx, namespaceID, execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
// the request to cancel this workflow is a success even
// if the target workflow has already finished
Expand Down Expand Up @@ -1951,7 +1958,8 @@ func (e *historyEngineImpl) SignalWorkflowExecution(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) {
return &updateWorkflowAction{
noop: true,
Expand Down Expand Up @@ -2259,7 +2267,8 @@ func (e *historyEngineImpl) RemoveSignalMutableState(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -2299,7 +2308,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -2375,7 +2385,8 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand All @@ -2387,7 +2398,22 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(
// Check mutable state to make sure child execution is in pending child executions
ci, isRunning := mutableState.GetChildExecutionInfo(initiatedID)
if !isRunning && initiatedID >= mutableState.GetNextEventID() {
return nil, consts.ErrStaleState
// The mutable state may be stale; try reloading it.
//
// TODO: use initiate event ID and version to verify if the child exists or not
//
// NOTE: do not return ErrStaleState here. In xdc, after a forced failover, there is no
// guarantee that the parent will have the child information and that its next event ID
// will be larger than the initiatedID in the request.
// If ErrStaleState were returned, this handler and the processing of the
// CloseWorkflowExecution task would keep retrying indefinitely.
workflowContext.getContext().Clear()
mutableState, err = workflowContext.reloadMutableState(ctx)
if err != nil {
return nil, err
}

ci, isRunning = mutableState.GetChildExecutionInfo(initiatedID)
}
if !isRunning || ci.StartedId == common.EmptyEventID {
return nil, serviceerror.NewNotFound("Pending child execution not found.")
Expand Down Expand Up @@ -2663,11 +2689,8 @@ func (e *historyEngineImpl) updateWorkflowWithNewHelper(

UpdateHistoryLoop:
for attempt := 1; attempt <= conditionalRetryCount; attempt++ {
weContext := workflowContext.getContext()
mutableState := workflowContext.getMutableState()

// conduct caller action
postActions, err := action(weContext, mutableState)
postActions, err := action(workflowContext)
if err != nil {
if err == consts.ErrStaleState {
// Handler detected that the cached workflow mutable state could potentially be stale
Expand All @@ -2689,6 +2712,7 @@ UpdateHistoryLoop:
return nil
}

mutableState := workflowContext.getMutableState()
if postActions.createWorkflowTask {
// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() {
Expand Down Expand Up @@ -3064,7 +3088,8 @@ func (e *historyEngineImpl) applyWorkflowIDReusePolicyHelper(
case enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING:
if wfIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING {
return func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
return func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -3196,7 +3221,9 @@ func (e *historyEngineImpl) ReapplyEvents(
ctx,
namespaceID,
currentExecution,
func(context workflow.Context, mutableState workflow.MutableState) (action *updateWorkflowAction, retErr error) {
func(workflowContext workflowContext) (action *updateWorkflowAction, retErr error) {
context := workflowContext.getContext()
mutableState := workflowContext.getMutableState()
// Filter out reapply event from the same cluster
toReapplyEvents := make([]*historypb.HistoryEvent, 0, len(reapplyEvents))
lastWriteVersion, err := mutableState.GetLastWriteVersion()
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowExecutionUtil.go
Expand Up @@ -59,7 +59,7 @@ var (
}
)

type updateWorkflowActionFunc func(workflow.Context, workflow.MutableState) (*updateWorkflowAction, error)
type updateWorkflowActionFunc func(workflowContext) (*updateWorkflowAction, error)

func (w *workflowContextImpl) getContext() workflow.Context {
return w.context
Expand Down
9 changes: 6 additions & 3 deletions service/history/workflowTaskHandlerCallbacks.go
Expand Up @@ -128,7 +128,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -177,7 +178,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -281,7 +283,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down

0 comments on commit a274580

Please sign in to comment.