diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 643b21e0879..2b9e4835d8d 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -1190,7 +1190,8 @@ func (e *historyEngineImpl) ResetStickyTaskQueue( ctx, namespaceID, *resetRequest.Execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -1385,7 +1386,8 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -1535,7 +1537,8 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted( ctx, namespaceID, workflowExecution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() workflowTypeName = mutableState.GetWorkflowType().GetName() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted @@ -1614,7 +1617,8 @@ func (e *historyEngineImpl) RespondActivityTaskFailed( var taskQueue string var workflowTypeName string err = e.updateWorkflowExecution(ctx, namespaceID, workflowExecution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() workflowTypeName = mutableState.GetWorkflowType().GetName() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted @@ -1716,7 +1720,8 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled( ctx, namespaceID, workflowExecution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() workflowTypeName = mutableState.GetWorkflowType().GetName() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted @@ -1810,7 +1815,8 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( ctx, namespaceID, workflowExecution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { e.logger.Debug("Heartbeat failed") return nil, consts.ErrWorkflowCompleted @@ -1883,7 +1889,8 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( } return e.updateWorkflow(ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { // the request to cancel this workflow is a success even // if the target workflow has already finished @@ -1951,7 +1958,8 @@ func (e *historyEngineImpl) SignalWorkflowExecution( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) { return &updateWorkflowAction{ noop: true, @@ -2259,7 +2267,8 @@ func (e *historyEngineImpl) RemoveSignalMutableState( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -2299,7 +2308,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -2375,7 +2385,8 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -2387,7 +2398,22 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted( // Check mutable state to make sure child execution is in pending child executions ci, isRunning := mutableState.GetChildExecutionInfo(initiatedID) if !isRunning && initiatedID >= mutableState.GetNextEventID() { - return nil, consts.ErrStaleState + // possible stale mutable state, try reload mutable state + // + // TODO: use initiate event ID and version to verify if the child exists or not + // + // NOTE: do not return ErrStaleState here, as in xdc there's no guarantee that parent + // will have the child information and its next eventID will larger than the initiatedID + // in the request after forced failover. + // If ErrStaleState is returned, the logic for this handler and processing of CloseWorkflowExecution + // task will keep retrying infinitely. + workflowContext.getContext().Clear() + mutableState, err = workflowContext.reloadMutableState(ctx) + if err != nil { + return nil, err + } + + ci, isRunning = mutableState.GetChildExecutionInfo(initiatedID) } if !isRunning || ci.StartedId == common.EmptyEventID { return nil, serviceerror.NewNotFound("Pending child execution not found.") @@ -2663,11 +2689,8 @@ func (e *historyEngineImpl) updateWorkflowWithNewHelper( UpdateHistoryLoop: for attempt := 1; attempt <= conditionalRetryCount; attempt++ { - weContext := workflowContext.getContext() - mutableState := workflowContext.getMutableState() - // conduct caller action - postActions, err := action(weContext, mutableState) + postActions, err := action(workflowContext) if err != nil { if err == consts.ErrStaleState { // Handler detected that cached workflow mutable could potentially be stale @@ -2689,6 +2712,7 @@ UpdateHistoryLoop: return nil } + mutableState := workflowContext.getMutableState() if postActions.createWorkflowTask { // Create a transfer task to schedule a workflow task if !mutableState.HasPendingWorkflowTask() { @@ -3064,7 +3088,8 @@ func (e *historyEngineImpl) applyWorkflowIDReusePolicyHelper( case enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING: if wfIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING { - return func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + return func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -3196,7 +3221,9 @@ func (e *historyEngineImpl) ReapplyEvents( ctx, namespaceID, currentExecution, - func(context workflow.Context, mutableState workflow.MutableState) (action *updateWorkflowAction, retErr error) { + func(workflowContext workflowContext) (action *updateWorkflowAction, retErr error) { + context := workflowContext.getContext() + mutableState := workflowContext.getMutableState() // Filter out reapply event from the same cluster toReapplyEvents := make([]*historypb.HistoryEvent, 0, len(reapplyEvents)) lastWriteVersion, err := mutableState.GetLastWriteVersion() diff --git a/service/history/workflowExecutionUtil.go b/service/history/workflowExecutionUtil.go index 0b8d4f37283..bc2ac81178c 100644 --- a/service/history/workflowExecutionUtil.go +++ b/service/history/workflowExecutionUtil.go @@ -59,7 +59,7 @@ var ( } ) -type updateWorkflowActionFunc func(workflow.Context, workflow.MutableState) (*updateWorkflowAction, error) +type updateWorkflowActionFunc func(workflowContext) (*updateWorkflowAction, error) func (w *workflowContextImpl) getContext() workflow.Context { return w.context diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index bbaee0c40de..9cdf84970e1 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -128,7 +128,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -177,7 +178,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( ctx, namespaceID, execution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted } @@ -281,7 +283,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed( ctx, namespaceID, workflowExecution, - func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { + func(workflowContext workflowContext) (*updateWorkflowAction, error) { + mutableState := workflowContext.getMutableState() if !mutableState.IsWorkflowExecutionRunning() { return nil, consts.ErrWorkflowCompleted }