Skip to content

Commit

Permalink
Check if WorkflowTaskFailed event is nil when workflow completes (#4500)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 16, 2023
1 parent ae2d232 commit 1644435
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 15 deletions.
5 changes: 2 additions & 3 deletions service/history/workflow/history_builder.go
Expand Up @@ -458,7 +458,7 @@ func (b *HistoryBuilder) AddFailWorkflowEvent(
retryState enumspb.RetryState,
command *commandpb.FailWorkflowExecutionCommandAttributes,
newExecutionRunID string,
) *historypb.HistoryEvent {
) (*historypb.HistoryEvent, int64) {
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, b.timeSource.Now())
event.Attributes = &historypb.HistoryEvent_WorkflowExecutionFailedEventAttributes{
WorkflowExecutionFailedEventAttributes: &historypb.WorkflowExecutionFailedEventAttributes{
Expand All @@ -469,8 +469,7 @@ func (b *HistoryBuilder) AddFailWorkflowEvent(
},
}

event, _ = b.appendEvents(event)
return event
return b.appendEvents(event)
}

func (b *HistoryBuilder) AddTimeoutWorkflowEvent(
Expand Down
4 changes: 3 additions & 1 deletion service/history/workflow/history_builder_test.go
Expand Up @@ -41,6 +41,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -458,13 +459,14 @@ func (s *historyBuilderSuite) TestWorkflowExecutionFailed() {
attributes := &commandpb.FailWorkflowExecutionCommandAttributes{
Failure: testFailure,
}
event := s.historyBuilder.AddFailWorkflowEvent(
event, batchID := s.historyBuilder.AddFailWorkflowEvent(
workflowTaskCompletionEventID,
retryState,
attributes,
"",
)
s.Equal(event, s.flush())
s.Equal(batchID, event.EventId)
s.Equal(&historypb.HistoryEvent{
EventId: s.nextEventID,
TaskId: s.nextTaskID,
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/mutable_state_impl.go
Expand Up @@ -2806,8 +2806,8 @@ func (ms *MutableStateImpl) AddFailWorkflowEvent(
return nil, err
}

event := ms.hBuilder.AddFailWorkflowEvent(workflowTaskCompletedEventID, retryState, command, newExecutionRunID)
if err := ms.ReplicateWorkflowExecutionFailedEvent(workflowTaskCompletedEventID, event); err != nil {
event, batchID := ms.hBuilder.AddFailWorkflowEvent(workflowTaskCompletedEventID, retryState, command, newExecutionRunID)
if err := ms.ReplicateWorkflowExecutionFailedEvent(batchID, event); err != nil {
return nil, err
}
// TODO merge active & passive task generation
Expand Down
17 changes: 13 additions & 4 deletions service/history/workflow/util.go
Expand Up @@ -49,6 +49,7 @@ func failWorkflowTask(
workflowTaskFailureCause enumspb.WorkflowTaskFailedCause,
) (*historypb.HistoryEvent, error) {

// IMPORTANT: wtFailedEvent can be nil under some circumstances. Specifically, if WT is transient.
wtFailedEvent, err := mutableState.AddWorkflowTaskFailedEvent(
workflowTask,
workflowTaskFailureCause,
Expand Down Expand Up @@ -100,7 +101,9 @@ func RetryWorkflow(
if err != nil {
return nil, err
}
eventBatchFirstEventID = wtFailedEvent.GetEventId()
if wtFailedEvent != nil {
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}
}

_, newMutableState, err := mutableState.AddContinueAsNewEvent(
Expand Down Expand Up @@ -133,7 +136,9 @@ func TimeoutWorkflow(
if err != nil {
return err
}
eventBatchFirstEventID = wtFailedEvent.GetEventId()
if wtFailedEvent != nil {
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}
}

_, err := mutableState.AddTimeoutWorkflowEvent(
Expand All @@ -156,7 +161,9 @@ func TerminateWorkflow(
// if there is started WT which needs to be failed before.
// Failing speculative WT creates 3 events: WTScheduled, WTStarted, and WTFailed.
// The first 2 go to a separate batch, and eventBatchFirstEventID has to point to the WTFailed event.
// If there is no started WT, then eventBatchFirstEventID points to TerminateWorkflow event (which is next event).
// Failing transient WT doesn't create any events at all and wtFailedEvent is nil.
// If the WTFailed event wasn't created (because there was no WT or the WT was transient),
// then eventBatchFirstEventID points to TerminateWorkflow event (which is next event).
eventBatchFirstEventID := mutableState.GetNextEventID()

if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil {
Expand All @@ -168,7 +175,9 @@ func TerminateWorkflow(
if err != nil {
return err
}
eventBatchFirstEventID = wtFailedEvent.GetEventId()
if wtFailedEvent != nil {
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}
}

_, err := mutableState.AddWorkflowExecutionTerminatedEvent(
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/workflow_task_state_machine.go
Expand Up @@ -591,6 +591,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent(
forkEventVersion int64,
) (*historypb.HistoryEvent, error) {

// IMPORTANT: returned event can be nil under some circumstances. Specifically, if WT is transient.

if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
m.ms.RemoveSpeculativeWorkflowTaskTimeoutTask()

Expand Down
19 changes: 14 additions & 5 deletions service/history/workflowTaskHandlerCallbacks.go
Expand Up @@ -573,8 +573,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
// drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout.
return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message())
}
var nextEventBatchId int64
ms, nextEventBatchId, err = failWorkflowTask(ctx, weContext, currentWorkflowTask, wtFailedCause, request)
var wtFailedEventID int64
ms, wtFailedEventID, err = failWorkflowTask(ctx, weContext, currentWorkflowTask, wtFailedCause, request)
if err != nil {
return nil, err
}
Expand All @@ -588,7 +588,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
attributes := &commandpb.FailWorkflowExecutionCommandAttributes{
Failure: wtFailedCause.workflowFailure,
}
if _, err := ms.AddFailWorkflowEvent(nextEventBatchId, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, attributes, ""); err != nil {
if _, err := ms.AddFailWorkflowEvent(wtFailedEventID, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, attributes, ""); err != nil {
return nil, err
}
wtFailedShouldCreateNewTask = false
Expand Down Expand Up @@ -1003,8 +1003,17 @@ func failWorkflowTask(
return nil, common.EmptyEventID, err
}

// Return new mutable state back to the caller for further updates
return mutableState, wtFailedEvent.GetEventId(), nil
var wtFailedEventID int64
if wtFailedEvent != nil {
// If WTFailed event was added to the history then use its Id as wtFailedEventID.
wtFailedEventID = wtFailedEvent.GetEventId()
} else {
// Otherwise, if it was transient WT, last event should be WTFailed event from the 1st attempt.
wtFailedEventID = mutableState.GetNextEventID() - 1
}

// Return reloaded mutable state back to the caller for further updates.
return mutableState, wtFailedEventID, nil
}

// Filter function to be passed to mutable_state.HasAnyBufferedEvent
Expand Down

0 comments on commit 1644435

Please sign in to comment.