Skip to content

Commit

Permalink
Convert speculative workflow task to normal when it is persisted (#4069)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 20, 2023
1 parent 501fdae commit 0ca9e83
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 39 deletions.
8 changes: 8 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Expand Up @@ -3940,6 +3940,10 @@ func (ms *MutableStateImpl) CloseTransactionAsMutation(
return nil, nil, err
}

if err := ms.workflowTaskManager.convertSpeculativeWorkflowTaskToNormal(); err != nil {
return nil, nil, err
}

workflowEventsSeq, bufferEvents, clearBuffer, err := ms.prepareEventsAndReplicationTasks(transactionPolicy)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -4017,6 +4021,10 @@ func (ms *MutableStateImpl) CloseTransactionAsSnapshot(
return nil, nil, err
}

if err := ms.workflowTaskManager.convertSpeculativeWorkflowTaskToNormal(); err != nil {
return nil, nil, err
}

workflowEventsSeq, bufferEvents, _, err := ms.prepareEventsAndReplicationTasks(transactionPolicy)
if err != nil {
return nil, nil, err
Expand Down
63 changes: 63 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Expand Up @@ -705,6 +705,7 @@ func (s *mutableStateSuite) buildWorkflowMutableState() *persistencespb.Workflow
WorkflowTaskStartedEventId: 102,
WorkflowTaskTimeout: timestamp.DurationFromSeconds(100),
WorkflowTaskAttempt: 1,
WorkflowTaskType: enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
VersionHistories: &historyspb.VersionHistories{
Histories: []*historyspb.VersionHistory{
{
Expand Down Expand Up @@ -919,3 +920,65 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() {
s.Equal(int64(1), mutation.ExecutionInfo.SignalExternalCount)
s.Equal(int64(1), mutation.ExecutionInfo.SignalCount)
}

func (s *mutableStateSuite) TestSpeculativeWorkflowTaskNotPersisted() {
testCases := []struct {
name string
enableBufferedEvents bool
closeTxFunc func(ms *MutableStateImpl) (*persistencespb.WorkflowExecutionInfo, error)
}{
{
name: "CloseTransactionAsSnapshot",
closeTxFunc: func(ms *MutableStateImpl) (*persistencespb.WorkflowExecutionInfo, error) {
snapshot, _, err := ms.CloseTransactionAsSnapshot(TransactionPolicyPassive)
if err != nil {
return nil, err
}
return snapshot.ExecutionInfo, err
},
},
{
name: "CloseTransactionAsMutation",
enableBufferedEvents: true,
closeTxFunc: func(ms *MutableStateImpl) (*persistencespb.WorkflowExecutionInfo, error) {
mutation, _, err := ms.CloseTransactionAsMutation(TransactionPolicyPassive)
if err != nil {
return nil, err
}
return mutation.ExecutionInfo, err
},
},
}

for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
dbState := s.buildWorkflowMutableState()
if !tc.enableBufferedEvents {
dbState.BufferedEvents = nil
}

var err error
s.mutableState, err = newMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, tests.LocalNamespaceEntry, dbState, 123)
s.NoError(err)

s.mutableState.executionInfo.WorkflowTaskScheduledEventId = s.mutableState.GetNextEventID()
s.mutableState.executionInfo.WorkflowTaskStartedEventId = s.mutableState.GetNextEventID() + 1

// Normal WT is persisted as is.
execInfo, err := tc.closeTxFunc(s.mutableState)
s.Nil(err)
s.Equal(enumsspb.WORKFLOW_TASK_TYPE_NORMAL, execInfo.WorkflowTaskType)
s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskScheduledEventId)
s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskStartedEventId)

s.mutableState.executionInfo.WorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE

// Speculative WT is converted to normal.
execInfo, err = tc.closeTxFunc(s.mutableState)
s.Nil(err)
s.Equal(enumsspb.WORKFLOW_TASK_TYPE_NORMAL, execInfo.WorkflowTaskType)
s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskScheduledEventId)
s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskStartedEventId)
})
}
}
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state_rebuilder.go
Expand Up @@ -235,6 +235,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
// NOTE: at the beginning of the loop, stickyness is cleared
if err := taskGenerator.GenerateScheduleWorkflowTaskTasks(
workflowTask.ScheduledEventID,
false,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,6 +288,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
// NOTE: at the beginning of the loop, stickyness is cleared
if err := taskGenerator.GenerateScheduleWorkflowTaskTasks(
workflowTask.ScheduledEventID,
false,
); err != nil {
return nil, err
}
Expand All @@ -309,6 +311,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
// NOTE: at the beginning of the loop, stickyness is cleared
if err := taskGenerator.GenerateScheduleWorkflowTaskTasks(
workflowTask.ScheduledEventID,
false,
); err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions service/history/workflow/mutable_state_rebuilder_test.go
Expand Up @@ -519,6 +519,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
).Return(nil)
s.mockTaskGeneratorForNew.EXPECT().GenerateScheduleWorkflowTaskTasks(
newRunWorkflowTaskEvent.GetEventId(),
false,
).Return(nil)
s.mockTaskGeneratorForNew.EXPECT().GenerateActivityTimerTasks().Return(nil)
s.mockTaskGeneratorForNew.EXPECT().GenerateUserTimerTasks().Return(nil)
Expand Down Expand Up @@ -764,6 +765,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskScheduled() {
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks(
wt.ScheduledEventID,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyness()

Expand Down Expand Up @@ -858,6 +860,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskTimedOut() {
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks(
newScheduledEventID,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyness()

Expand Down Expand Up @@ -902,6 +905,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskFailed() {
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks(
newScheduledEventID,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyness()

Expand Down
22 changes: 14 additions & 8 deletions service/history/workflow/task_generator.go
Expand Up @@ -71,6 +71,7 @@ type (
) error
GenerateScheduleWorkflowTaskTasks(
workflowTaskScheduledEventID int64,
generateTimeoutTaskOnly bool,
) error
GenerateStartWorkflowTaskTasks(
workflowTaskScheduledEventID int64,
Expand Down Expand Up @@ -332,6 +333,7 @@ func (r *TaskGeneratorImpl) GenerateRecordWorkflowStartedTasks(

func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks(
workflowTaskScheduledEventID int64,
generateTimeoutTaskOnly bool, // Only generate SCHEDULE_TO_START timeout timer task, but not a transfer task which push WT to matching.
) error {

workflowTask := r.mutableState.GetWorkflowTaskByID(
Expand All @@ -341,14 +343,6 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks(
return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflow task: %v", workflowTaskScheduledEventID))
}

r.mutableState.AddTasks(&tasks.WorkflowTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
TaskQueue: workflowTask.TaskQueue.GetName(),
ScheduledEventID: workflowTask.ScheduledEventID,
Version: workflowTask.Version,
})

if r.mutableState.IsStickyTaskQueueEnabled() {
scheduledTime := timestamp.TimeValue(workflowTask.ScheduledTime)
scheduleToStartTimeout := timestamp.DurationValue(r.mutableState.GetExecutionInfo().StickyScheduleToStartTimeout)
Expand All @@ -364,6 +358,18 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks(
})
}

if generateTimeoutTaskOnly {
return nil
}

r.mutableState.AddTasks(&tasks.WorkflowTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
TaskQueue: workflowTask.TaskQueue.GetName(),
ScheduledEventID: workflowTask.ScheduledEventID,
Version: workflowTask.Version,
})

return nil
}

Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/workflow/task_refresher.go
Expand Up @@ -268,6 +268,7 @@ func (r *TaskRefresherImpl) refreshWorkflowTaskTasks(
// workflowTask only scheduled
return taskGenerator.GenerateScheduleWorkflowTaskTasks(
workflowTask.ScheduledEventID,
false,
)
}

Expand Down
65 changes: 63 additions & 2 deletions service/history/workflow/workflow_task_state_machine.go
Expand Up @@ -42,7 +42,6 @@ import (

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -345,6 +344,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
if !bypassTaskGeneration && workflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
if err := m.ms.taskGenerator.GenerateScheduleWorkflowTaskTasks(
scheduledEventID,
false,
); err != nil {
return nil, err
}
Expand All @@ -362,7 +362,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEvent(
return m.AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration, timestamp.TimePtr(m.ms.timeSource.Now()), workflowTaskType)
}

// AddFirstWorkflowTaskScheduled adds the first workflow task scehduled event unless it should be delayed as indicated
// AddFirstWorkflowTaskScheduled adds the first workflow task scheduled event unless it should be delayed as indicated
// by the startEvent's FirstWorkflowTaskBackoff.
// If bypassTaskGeneration is specified, a transfer task will not be created.
// Returns the workflow task's scheduled event ID if a task was scheduled, 0 otherwise.
Expand Down Expand Up @@ -936,3 +936,64 @@ func (m *workflowTaskStateMachine) getHistorySizeInfo() (bool, int64) {
suggestContinueAsNew := historySize >= sizeLimit || historyCount >= countLimit
return suggestContinueAsNew, historySize
}

func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() error {
if m.ms.executionInfo.WorkflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
return nil
}

// Workflow task can't be persisted as Speculative, because when it is completed,
// it gets deleted from memory only but not from the database.
// If execution info in mutable state has speculative workflow task, then
// convert it to normal workflow task before persisting.
m.ms.executionInfo.WorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL

wt := m.getWorkflowTaskInfo()

// TODO (alex-update): cancel in-memory timer for this speculative WT.

scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
wt.TaskQueue,
wt.WorkflowTaskTimeout,
wt.Attempt,
timestamp.TimeValue(wt.ScheduledTime),
)

if scheduledEvent.EventId != wt.ScheduledEventID {
return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, scheduled event Id: %d for normal workflow task doesn't match the one from speculative workflow task: %d", scheduledEvent.EventId, wt.ScheduledEventID))
}

if wt.StartedEventID != common.EmptyEventID {
// If WT is has started then started event is written to the history and
// timeout timer task (for START_TO_CLOSE timeout) is created.

_ = m.ms.hBuilder.AddWorkflowTaskStartedEvent(
scheduledEvent.EventId,
wt.RequestID,
"", // identity is not stored in the mutable state.
timestamp.TimeValue(wt.StartedTime),
wt.SuggestContinueAsNew,
wt.HistorySizeBytes,
)
m.ms.hBuilder.FlushAndCreateNewBatch()

if err := m.ms.taskGenerator.GenerateStartWorkflowTaskTasks(
scheduledEvent.EventId,
); err != nil {
return err
}
} else {
// If WT was only scheduled but not started yet, then SCHEDULE_TO_START timeout timer task is created only if using sticky task queue.
// Normal task queue doesn't have a timeout.
if m.ms.IsStickyTaskQueueEnabled() {
if err := m.ms.taskGenerator.GenerateScheduleWorkflowTaskTasks(
scheduledEvent.EventId,
true, // Only generate SCHEDULE_TO_START timeout timer task, but not a transfer task which push WT to matching because WT was already pushed to matching.
); err != nil {
return err
}
}
}

return nil
}

0 comments on commit 0ca9e83

Please sign in to comment.