diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 9e4cf00b4d7..fc4b40af0d2 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -217,10 +217,7 @@ type ( WorkflowID string RunID string - TransferTasks []tasks.Task - TimerTasks []tasks.Task - ReplicationTasks []tasks.Task - VisibilityTasks []tasks.Task + Tasks map[tasks.Category][]tasks.Task } // CreateWorkflowExecutionRequest is used to write a new workflow execution @@ -363,10 +360,7 @@ type ( NewBufferedEvents []*historypb.HistoryEvent ClearBufferedEvents bool - TransferTasks []tasks.Task - ReplicationTasks []tasks.Task - TimerTasks []tasks.Task - VisibilityTasks []tasks.Task + Tasks map[tasks.Category][]tasks.Task // TODO deprecate Condition in favor of DBRecordVersion Condition int64 @@ -388,10 +382,7 @@ type ( SignalInfos map[int64]*persistencespb.SignalInfo SignalRequestedIDs map[string]struct{} - TransferTasks []tasks.Task - ReplicationTasks []tasks.Task - TimerTasks []tasks.Task - VisibilityTasks []tasks.Task + Tasks map[tasks.Category][]tasks.Task // TODO deprecate Condition in favor of DBRecordVersion Condition int64 diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index ee3267efff5..d4d4e87aa47 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/versionhistory" + "go.temporal.io/server/service/history/tasks" ) type ( @@ -426,19 +427,19 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( var err error - transferTasks, err := m.serializer.SerializeTransferTasks(input.TransferTasks) + transferTasks, err := m.serializer.SerializeTransferTasks(input.Tasks[tasks.CategoryTransfer]) if err != nil { return nil, err } - timerTasks, err := m.serializer.SerializeTimerTasks(input.TimerTasks) + timerTasks, err := m.serializer.SerializeTimerTasks(input.Tasks[tasks.CategoryTimer]) if err != nil { return nil, err } - replicationTasks, err := m.serializer.SerializeReplicationTasks(input.ReplicationTasks) + replicationTasks, err := m.serializer.SerializeReplicationTasks(input.Tasks[tasks.CategoryReplication]) if err != nil { return nil, err } - visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.VisibilityTasks) + visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.Tasks[tasks.CategoryVisibility]) if err != nil { return nil, err } @@ -545,19 +546,19 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot( var err error - transferTasks, err := m.serializer.SerializeTransferTasks(input.TransferTasks) + transferTasks, err := m.serializer.SerializeTransferTasks(input.Tasks[tasks.CategoryTransfer]) if err != nil { return nil, err } - timerTasks, err := m.serializer.SerializeTimerTasks(input.TimerTasks) + timerTasks, err := m.serializer.SerializeTimerTasks(input.Tasks[tasks.CategoryTimer]) if err != nil { return nil, err } - replicationTasks, err := m.serializer.SerializeReplicationTasks(input.ReplicationTasks) + replicationTasks, err := m.serializer.SerializeReplicationTasks(input.Tasks[tasks.CategoryReplication]) if err != nil { return nil, err } - visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.VisibilityTasks) + visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.Tasks[tasks.CategoryVisibility]) if err != nil { return nil, err } @@ -700,19 +701,19 @@ func (m *executionManagerImpl) AddTasks( input *AddTasksRequest, ) error { - transferTasks, err := m.serializer.SerializeTransferTasks(input.TransferTasks) + transferTasks, err := m.serializer.SerializeTransferTasks(input.Tasks[tasks.CategoryTransfer]) if err != nil { return err } - timerTasks, err := m.serializer.SerializeTimerTasks(input.TimerTasks) + timerTasks, err := m.serializer.SerializeTimerTasks(input.Tasks[tasks.CategoryTimer]) if err != nil { return err } - replicationTasks, err := m.serializer.SerializeReplicationTasks(input.ReplicationTasks) + replicationTasks, err := m.serializer.SerializeReplicationTasks(input.Tasks[tasks.CategoryReplication]) if err != nil { return err } - visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.VisibilityTasks) + visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.Tasks[tasks.CategoryVisibility]) if err != nil { return err } diff --git a/common/persistence/persistence-tests/executionManagerTest.go b/common/persistence/persistence-tests/executionManagerTest.go index c79de00d40a..d01ded527dc 100644 --- a/common/persistence/persistence-tests/executionManagerTest.go +++ b/common/persistence/persistence-tests/executionManagerTest.go @@ -1055,15 +1055,16 @@ func (s *ExecutionManagerSuite) TestPersistenceStartWorkflow() { Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, NextEventID: int64(3), - TransferTasks: []tasks.Task{ - &tasks.WorkflowTask{ - WorkflowKey: workflowKey, - TaskID: s.GetNextSequenceNumber(), - TaskQueue: "queue1", - ScheduleID: int64(2), + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: { + &tasks.WorkflowTask{ + WorkflowKey: workflowKey, + TaskID: s.GetNextSequenceNumber(), + TaskQueue: "queue1", + ScheduleID: int64(2), + }, }, }, - TimerTasks: nil, }, RangeID: s.ShardInfo.GetRangeId() - 1, }) diff --git a/common/persistence/persistence-tests/executionManagerTestForEventsV2.go b/common/persistence/persistence-tests/executionManagerTestForEventsV2.go index 35aa80679f0..395e315feaf 100644 --- a/common/persistence/persistence-tests/executionManagerTestForEventsV2.go +++ b/common/persistence/persistence-tests/executionManagerTestForEventsV2.go @@ -138,17 +138,18 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() { State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, - TransferTasks: []tasks.Task{ - &tasks.WorkflowTask{ - WorkflowKey: workflowKey, - TaskID: s.GetNextSequenceNumber(), - TaskQueue: "taskQueue", - ScheduleID: 2, - VisibilityTimestamp: time.Now().UTC(), + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: { + &tasks.WorkflowTask{ + WorkflowKey: workflowKey, + TaskID: s.GetNextSequenceNumber(), + TaskQueue: "taskQueue", + ScheduleID: 2, + VisibilityTimestamp: time.Now().UTC(), + }, }, }, - TimerTasks: nil, - Checksum: csum, + Checksum: csum, }, RangeID: s.ShardInfo.GetRangeId(), }) @@ -239,21 +240,22 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreationWithVersionHistor Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, NextEventID: common.EmptyEventID, - TransferTasks: []tasks.Task{ - &tasks.WorkflowTask{ - WorkflowKey: definition.NewWorkflowKey( - namespaceID, - workflowExecution.WorkflowId, - workflowExecution.RunId, - ), - TaskID: s.GetNextSequenceNumber(), - TaskQueue: "taskQueue", - ScheduleID: 2, - VisibilityTimestamp: time.Now().UTC(), + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: { + &tasks.WorkflowTask{ + WorkflowKey: definition.NewWorkflowKey( + namespaceID, + workflowExecution.WorkflowId, + workflowExecution.RunId, + ), + TaskID: s.GetNextSequenceNumber(), + TaskQueue: "taskQueue", + ScheduleID: 2, + VisibilityTimestamp: time.Now().UTC(), + }, }, }, - TimerTasks: nil, - Checksum: csum, + Checksum: csum, }, }) @@ -340,11 +342,12 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() { _, err2 := s.ExecutionManager.UpdateWorkflowExecution(&p.UpdateWorkflowExecutionRequest{ ShardID: s.ShardInfo.GetShardId(), UpdateWorkflowMutation: p.WorkflowMutation{ - ExecutionInfo: updatedInfo, - ExecutionState: updatedState, - NextEventID: int64(5), - TransferTasks: []tasks.Task{newworkflowTask}, - TimerTasks: nil, + ExecutionInfo: updatedInfo, + ExecutionState: updatedState, + NextEventID: int64(5), + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: {newworkflowTask}, + }, Condition: state0.NextEventId, UpsertActivityInfos: nil, DeleteActivityInfos: nil, @@ -371,9 +374,8 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() { State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, - NextEventID: state0.NextEventId, - TransferTasks: nil, - TimerTasks: nil, + NextEventID: state0.NextEventId, + Tasks: map[tasks.Category][]tasks.Task{}, }, RangeID: s.ShardInfo.GetRangeId(), }) diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index ee3726c2e93..fd01a2341b9 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -319,21 +319,23 @@ func (s *TestBase) CreateWorkflowExecutionWithBranchToken(namespaceID string, wo Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, NextEventID: nextEventID, - TransferTasks: []tasks.Task{ - &tasks.WorkflowTask{ - WorkflowKey: definition.NewWorkflowKey( - namespaceID, - workflowExecution.WorkflowId, - workflowExecution.RunId, - ), - TaskID: s.GetNextSequenceNumber(), - TaskQueue: taskQueue, - ScheduleID: workflowTaskScheduleID, - VisibilityTimestamp: time.Now().UTC(), + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: { + &tasks.WorkflowTask{ + WorkflowKey: definition.NewWorkflowKey( + namespaceID, + workflowExecution.WorkflowId, + workflowExecution.RunId, + ), + TaskID: s.GetNextSequenceNumber(), + TaskQueue: taskQueue, + ScheduleID: workflowTaskScheduleID, + VisibilityTimestamp: time.Now().UTC(), + }, }, + tasks.CategoryTimer: timerTasks, }, - TimerTasks: timerTasks, - Checksum: testWorkflowChecksum, + Checksum: testWorkflowChecksum, }, RangeID: s.ShardInfo.GetRangeId(), }) @@ -401,8 +403,10 @@ func (s *TestBase) CreateWorkflowExecutionManyTasks(namespaceID string, workflow State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, - TransferTasks: transferTasks, - Checksum: testWorkflowChecksum, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + }, + Checksum: testWorkflowChecksum, }, RangeID: s.ShardInfo.GetRangeId(), }) @@ -444,19 +448,21 @@ func (s *TestBase) CreateChildWorkflowExecution(namespaceID string, workflowExec Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, NextEventID: nextEventID, - TransferTasks: []tasks.Task{ - &tasks.WorkflowTask{ - WorkflowKey: definition.NewWorkflowKey( - namespaceID, - workflowExecution.WorkflowId, - workflowExecution.RunId, - ), - TaskID: s.GetNextSequenceNumber(), - TaskQueue: taskQueue, - ScheduleID: workflowTaskScheduleID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: { + &tasks.WorkflowTask{ + WorkflowKey: definition.NewWorkflowKey( + namespaceID, + workflowExecution.WorkflowId, + workflowExecution.RunId, + ), + TaskID: s.GetNextSequenceNumber(), + TaskQueue: taskQueue, + ScheduleID: workflowTaskScheduleID, + }, }, + tasks.CategoryTimer: timerTasks, }, - TimerTasks: timerTasks, }, RangeID: s.ShardInfo.GetRangeId(), }) @@ -512,11 +518,12 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *persistencespb.WorkflowEx req := &persistence.UpdateWorkflowExecutionRequest{ ShardID: s.ShardInfo.GetShardId(), UpdateWorkflowMutation: persistence.WorkflowMutation{ - ExecutionInfo: updatedInfo, - ExecutionState: updatedState, - NextEventID: updatedNextEventID, - TransferTasks: []tasks.Task{newworkflowTask}, - TimerTasks: nil, + ExecutionInfo: updatedInfo, + ExecutionState: updatedState, + NextEventID: updatedNextEventID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: {newworkflowTask}, + }, Condition: condition, UpsertActivityInfos: nil, DeleteActivityInfos: nil, @@ -547,8 +554,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *persistencespb.WorkflowEx State: updatedState.State, Status: updatedState.Status, }, - TransferTasks: nil, - TimerTasks: nil, + Tasks: map[tasks.Category][]tasks.Task{}, }, RangeID: s.ShardInfo.GetRangeId(), } @@ -586,11 +592,12 @@ func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *persistencespb. ShardID: s.ShardInfo.GetShardId(), RangeID: s.ShardInfo.GetRangeId(), UpdateWorkflowMutation: persistence.WorkflowMutation{ - ExecutionInfo: updatedInfo, - ExecutionState: updatedState, - NextEventID: nextEventID, - TransferTasks: transferTasks, - TimerTasks: nil, + ExecutionInfo: updatedInfo, + ExecutionState: updatedState, + NextEventID: nextEventID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + }, Condition: condition, UpsertActivityInfos: nil, DeleteActivityInfos: nil, @@ -767,9 +774,11 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *persisten UpsertSignalRequestedIDs: convert.StringSliceToSet(upsertSignalRequestedIDs), DeleteSignalRequestedIDs: convert.StringSliceToSet(deleteSignalRequestedIDs), - TransferTasks: transferTasks, - ReplicationTasks: replicationTasks, - TimerTasks: timerTasks, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + tasks.CategoryTimer: timerTasks, + tasks.CategoryReplication: replicationTasks, + }, Condition: condition, Checksum: testWorkflowChecksum, @@ -784,10 +793,12 @@ func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks( _, err := s.ExecutionManager.UpdateWorkflowExecution(&persistence.UpdateWorkflowExecutionRequest{ ShardID: s.ShardInfo.GetShardId(), UpdateWorkflowMutation: persistence.WorkflowMutation{ - ExecutionInfo: updatedInfo, - ExecutionState: updatedState, - NextEventID: updatedNextEventID, - TransferTasks: transferTasks, + ExecutionInfo: updatedInfo, + ExecutionState: updatedState, + NextEventID: updatedNextEventID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + }, Condition: condition, UpsertActivityInfos: convertActivityInfos(upsertActivityInfo), }, @@ -802,9 +813,11 @@ func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated( _, err := s.ExecutionManager.UpdateWorkflowExecution(&persistence.UpdateWorkflowExecutionRequest{ ShardID: s.ShardInfo.GetShardId(), UpdateWorkflowMutation: persistence.WorkflowMutation{ - ExecutionInfo: updatedInfo, - NextEventID: updatedNextEventID, - TransferTasks: transferTasks, + ExecutionInfo: updatedInfo, + NextEventID: updatedNextEventID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + }, Condition: condition, UpsertChildExecutionInfos: convertChildExecutionInfos(childInfos), }, @@ -820,9 +833,11 @@ func (s *TestBase) UpdateWorkflowExecutionForRequestCancel( _, err := s.ExecutionManager.UpdateWorkflowExecution(&persistence.UpdateWorkflowExecutionRequest{ ShardID: s.ShardInfo.GetShardId(), UpdateWorkflowMutation: persistence.WorkflowMutation{ - ExecutionInfo: updatedInfo, - NextEventID: updatedNextEventID, - TransferTasks: transferTasks, + ExecutionInfo: updatedInfo, + NextEventID: updatedNextEventID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + }, Condition: condition, UpsertRequestCancelInfos: convertRequestCancelInfos(upsertRequestCancelInfo), }, @@ -838,9 +853,11 @@ func (s *TestBase) UpdateWorkflowExecutionForSignal( _, err := s.ExecutionManager.UpdateWorkflowExecution(&persistence.UpdateWorkflowExecutionRequest{ ShardID: s.ShardInfo.GetShardId(), UpdateWorkflowMutation: persistence.WorkflowMutation{ - ExecutionInfo: updatedInfo, - NextEventID: updatedNextEventID, - TransferTasks: transferTasks, + ExecutionInfo: updatedInfo, + NextEventID: updatedNextEventID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + }, Condition: condition, UpsertSignalInfos: convertSignalInfos(upsertSignalInfos), }, diff --git a/common/persistence/tests/util.go b/common/persistence/tests/util.go index 455d4f77482..b901b2af475 100644 --- a/common/persistence/tests/util.go +++ b/common/persistence/tests/util.go @@ -66,10 +66,12 @@ func RandomSnapshot( SignalInfos: RandomInt64SignalInfoMap(), SignalRequestedIDs: map[string]struct{}{uuid.New().String(): {}}, - TransferTasks: []tasks.Task{}, - ReplicationTasks: []tasks.Task{}, - TimerTasks: []tasks.Task{}, - VisibilityTasks: []tasks.Task{}, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: {}, + tasks.CategoryTimer: {}, + tasks.CategoryReplication: {}, + tasks.CategoryVisibility: {}, + }, Condition: rand.Int63(), DBRecordVersion: dbRecordVersion, @@ -106,10 +108,12 @@ func RandomMutation( //NewBufferedEvents: see below //ClearBufferedEvents: see below - TransferTasks: []tasks.Task{}, - ReplicationTasks: []tasks.Task{}, - TimerTasks: []tasks.Task{}, - VisibilityTasks: []tasks.Task{}, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: {}, + tasks.CategoryTimer: {}, + tasks.CategoryReplication: {}, + tasks.CategoryVisibility: {}, + }, Condition: rand.Int63(), DBRecordVersion: dbRecordVersion, diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 39dedcd2c07..555b03c3f27 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -3323,10 +3323,12 @@ func (e *historyEngineImpl) GenerateLastHistoryReplicationTasks( err = e.shard.AddTasks(&persistence.AddTasksRequest{ ShardID: e.shard.GetShardID(), // RangeID is set by shard - NamespaceID: string(namespaceID), - WorkflowID: request.Execution.WorkflowId, - RunID: request.Execution.RunId, - ReplicationTasks: []tasks.Task{task}, + NamespaceID: string(namespaceID), + WorkflowID: request.Execution.WorkflowId, + RunID: request.Execution.RunId, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryReplication: {task}, + }, }) if err != nil { return nil, err diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index b6ac2a2bf7d..17bf281e094 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -539,10 +539,7 @@ func (s *ContextImpl) CreateWorkflowExecution( if err := s.allocateTaskIDsLocked( namespaceEntry, workflowID, - request.NewWorkflowSnapshot.TransferTasks, - request.NewWorkflowSnapshot.ReplicationTasks, - request.NewWorkflowSnapshot.TimerTasks, - request.NewWorkflowSnapshot.VisibilityTasks, + request.NewWorkflowSnapshot.Tasks, &transferMaxReadLevel, ); err != nil { return nil, err @@ -580,10 +577,7 @@ func (s *ContextImpl) UpdateWorkflowExecution( if err := s.allocateTaskIDsLocked( namespaceEntry, workflowID, - request.UpdateWorkflowMutation.TransferTasks, - request.UpdateWorkflowMutation.ReplicationTasks, - request.UpdateWorkflowMutation.TimerTasks, - request.UpdateWorkflowMutation.VisibilityTasks, + request.UpdateWorkflowMutation.Tasks, &transferMaxReadLevel, ); err != nil { return nil, err @@ -592,10 +586,7 @@ func (s *ContextImpl) UpdateWorkflowExecution( if err := s.allocateTaskIDsLocked( namespaceEntry, workflowID, - request.NewWorkflowSnapshot.TransferTasks, - request.NewWorkflowSnapshot.ReplicationTasks, - request.NewWorkflowSnapshot.TimerTasks, - request.NewWorkflowSnapshot.VisibilityTasks, + request.NewWorkflowSnapshot.Tasks, &transferMaxReadLevel, ); err != nil { return nil, err @@ -635,10 +626,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( if err := s.allocateTaskIDsLocked( namespaceEntry, workflowID, - request.CurrentWorkflowMutation.TransferTasks, - request.CurrentWorkflowMutation.ReplicationTasks, - request.CurrentWorkflowMutation.TimerTasks, - request.CurrentWorkflowMutation.VisibilityTasks, + request.CurrentWorkflowMutation.Tasks, &transferMaxReadLevel, ); err != nil { return nil, err @@ -647,10 +635,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( if err := s.allocateTaskIDsLocked( namespaceEntry, workflowID, - request.ResetWorkflowSnapshot.TransferTasks, - request.ResetWorkflowSnapshot.ReplicationTasks, - request.ResetWorkflowSnapshot.TimerTasks, - request.ResetWorkflowSnapshot.VisibilityTasks, + request.ResetWorkflowSnapshot.Tasks, &transferMaxReadLevel, ); err != nil { return nil, err @@ -659,10 +644,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( if err := s.allocateTaskIDsLocked( namespaceEntry, workflowID, - request.NewWorkflowSnapshot.TransferTasks, - request.NewWorkflowSnapshot.ReplicationTasks, - request.NewWorkflowSnapshot.TimerTasks, - request.NewWorkflowSnapshot.VisibilityTasks, + request.NewWorkflowSnapshot.Tasks, &transferMaxReadLevel, ); err != nil { return nil, err @@ -707,10 +689,7 @@ func (s *ContextImpl) addTasksLocked( if err := s.allocateTaskIDsLocked( namespaceEntry, request.WorkflowID, - request.TransferTasks, - request.ReplicationTasks, - request.TimerTasks, - request.VisibilityTasks, + request.Tasks, &transferMaxReadLevel, ); err != nil { return err @@ -721,10 +700,10 @@ func (s *ContextImpl) addTasksLocked( if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil { return err } - s.engine.NotifyNewTransferTasks(namespaceEntry.ActiveClusterName(), request.TransferTasks) - s.engine.NotifyNewTimerTasks(namespaceEntry.ActiveClusterName(), request.TimerTasks) - s.engine.NotifyNewVisibilityTasks(request.VisibilityTasks) - s.engine.NotifyNewReplicationTasks(request.ReplicationTasks) + s.engine.NotifyNewTransferTasks(namespaceEntry.ActiveClusterName(), request.Tasks[tasks.CategoryTransfer]) + s.engine.NotifyNewTimerTasks(namespaceEntry.ActiveClusterName(), request.Tasks[tasks.CategoryTimer]) + s.engine.NotifyNewVisibilityTasks(request.Tasks[tasks.CategoryVisibility]) + s.engine.NotifyNewReplicationTasks(request.Tasks[tasks.CategoryReplication]) return nil } @@ -806,16 +785,17 @@ func (s *ContextImpl) DeleteWorkflowExecution( WorkflowID: key.WorkflowID, RunID: key.RunID, - TransferTasks: nil, - TimerTasks: nil, - ReplicationTasks: nil, - VisibilityTasks: []tasks.Task{&tasks.DeleteExecutionVisibilityTask{ - // TaskID is set by addTasksLocked - WorkflowKey: key, - VisibilityTimestamp: s.timeSource.Now(), - Version: newTaskVersion, - CloseTime: closeTime, - }}, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryVisibility: { + &tasks.DeleteExecutionVisibilityTask{ + // TaskID is set by addTasksLocked + WorkflowKey: key, + VisibilityTimestamp: s.timeSource.Now(), + Version: newTaskVersion, + CloseTime: closeTime, + }, + }, + }, } err = s.addTasksLocked(addTasksRequest, namespaceEntry) if err != nil { @@ -1074,94 +1054,48 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() { func (s *ContextImpl) allocateTaskIDsLocked( namespaceEntry *namespace.Namespace, workflowID string, - transferTasks []tasks.Task, - replicationTasks []tasks.Task, - timerTasks []tasks.Task, - visibilityTasks []tasks.Task, - transferMaxReadLevel *int64, -) error { - - if err := s.allocateTransferIDsLocked( - transferTasks, - transferMaxReadLevel); err != nil { - return err - } - if err := s.allocateTransferIDsLocked( - replicationTasks, - transferMaxReadLevel); err != nil { - return err - } - if err := s.allocateTransferIDsLocked( - visibilityTasks, - transferMaxReadLevel); err != nil { - return err - } - return s.allocateTimerIDsLocked( - namespaceEntry, - workflowID, - timerTasks, - transferMaxReadLevel) -} - -func (s *ContextImpl) allocateTransferIDsLocked( - tasks []tasks.Task, - transferMaxReadLevel *int64, -) error { - - for _, task := range tasks { - id, err := s.generateTransferTaskIDLocked() - if err != nil { - return err - } - s.contextTaggedLogger.Debug("Assigning task ID", tag.TaskID(id)) - task.SetTaskID(id) - *transferMaxReadLevel = id - } - return nil -} - -// NOTE: allocateTimerIDsLocked should always been called after assigning taskID for transferTasks when assigning taskID together, -// because Temporal Indexer assume timer taskID of deleteWorkflowExecution is larger than transfer taskID of closeWorkflowExecution -// for a given workflow. -func (s *ContextImpl) allocateTimerIDsLocked( - namespaceEntry *namespace.Namespace, - workflowID string, - timerTasks []tasks.Task, + newTasks map[tasks.Category][]tasks.Task, transferMaxReadLevel *int64, ) error { - - // assign IDs for the timer tasks. They need to be assigned under shard lock. currentCluster := s.GetClusterMetadata().GetCurrentClusterName() - for _, task := range timerTasks { - ts := task.GetVisibilityTime() - if task.GetVersion() != common.EmptyVersion { - // cannot use version to determine the corresponding cluster for timer task - // this is because during failover, timer task should be created as active - // or otherwise, failover + active processing logic may not pick up the task. - currentCluster = namespaceEntry.ActiveClusterName() - } - readCursorTS := s.timerMaxReadLevelMap[currentCluster] - if ts.Before(readCursorTS) { - // This can happen if shard move and new host have a time SKU, or there is db write delay. - // We generate a new timer ID using timerMaxReadLevel. - s.contextTaggedLogger.Debug("New timer generated is less than read level", - tag.WorkflowNamespaceID(namespaceEntry.ID().String()), - tag.WorkflowID(workflowID), - tag.Timestamp(ts), - tag.CursorTimestamp(readCursorTS), - tag.ValueShardAllocateTimerBeforeRead) - task.SetVisibilityTime(s.timerMaxReadLevelMap[currentCluster].Add(time.Millisecond)) - } + for _, tasks := range newTasks { + for _, task := range tasks { + // set taskID + id, err := s.generateTransferTaskIDLocked() + if err != nil { + return err + } + s.contextTaggedLogger.Debug("Assigning task ID", tag.TaskID(id)) + task.SetTaskID(id) + *transferMaxReadLevel = id + + // if scheduled task, check if fire time is in the past + if !task.GetKey().FireTime.IsZero() { + ts := task.GetVisibilityTime() + if task.GetVersion() != common.EmptyVersion { + // cannot use version to determine the corresponding cluster for timer task + // this is because during failover, timer task should be created as active + // or otherwise, failover + active processing logic may not pick up the task. + currentCluster = namespaceEntry.ActiveClusterName() + } + readCursorTS := s.timerMaxReadLevelMap[currentCluster] + if ts.Before(readCursorTS) { + // This can happen if shard move and new host have a time SKU, or there is db write delay. + // We generate a new timer ID using timerMaxReadLevel. + s.contextTaggedLogger.Debug("New timer generated is less than read level", + tag.WorkflowNamespaceID(namespaceEntry.ID().String()), + tag.WorkflowID(workflowID), + tag.Timestamp(ts), + tag.CursorTimestamp(readCursorTS), + tag.ValueShardAllocateTimerBeforeRead) + task.SetVisibilityTime(s.timerMaxReadLevelMap[currentCluster].Add(time.Millisecond)) + } - seqNum, err := s.generateTransferTaskIDLocked() - if err != nil { - return err + visibilityTs := task.GetVisibilityTime() + s.contextTaggedLogger.Debug("Assigning new timer", + tag.Timestamp(visibilityTs), tag.TaskID(task.GetTaskID()), tag.AckLevel(s.shardInfo.TimerAckLevelTime)) + } } - task.SetTaskID(seqNum) - *transferMaxReadLevel = seqNum - visibilityTs := task.GetVisibilityTime() - s.contextTaggedLogger.Debug("Assigning new timer", - tag.Timestamp(visibilityTs), tag.TaskID(task.GetTaskID()), tag.AckLevel(s.shardInfo.TimerAckLevelTime)) } return nil } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index b1635fc8a29..b6f85d24c12 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -127,10 +127,12 @@ func (s *contextSuite) TestAddTasks_Success() { WorkflowID: task.GetWorkflowId(), RunID: task.GetRunId(), - TransferTasks: transferTasks, - TimerTasks: timerTasks, - ReplicationTasks: replicationTasks, - VisibilityTasks: visibilityTasks, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: transferTasks, + tasks.CategoryTimer: timerTasks, + tasks.CategoryReplication: replicationTasks, + tasks.CategoryVisibility: visibilityTasks, + }, } s.mockNamespaceCache.EXPECT().GetNamespaceByID(s.namespaceID).Return(s.namespaceEntry, nil) diff --git a/service/history/timerQueueActiveTaskExecutor_test.go b/service/history/timerQueueActiveTaskExecutor_test.go index a7497268fb8..9f63f4a2eea 100644 --- a/service/history/timerQueueActiveTaskExecutor_test.go +++ b/service/history/timerQueueActiveTaskExecutor_test.go @@ -229,11 +229,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() { event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, timerTimeout) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextUserTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.UserTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -300,11 +300,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() { event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, timerTimeout) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextUserTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.UserTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -378,11 +378,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo ) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -460,11 +460,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -550,11 +550,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli s.Nil(startedEvent) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -641,11 +641,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli ) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -730,11 +730,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli s.Nil(startedEvent) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -820,11 +820,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat s.Nil(startedEvent) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] s.Equal(enumspb.TIMEOUT_TYPE_HEARTBEAT, task.(*tasks.ActivityTimeoutTask).TimeoutType) timerTask := &tasks.ActivityTimeoutTask{ diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index c2dc7ec31e1..dd810329bf7 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -243,11 +243,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Pending nextEventID := event.GetEventId() timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextUserTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.UserTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -332,11 +332,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Success event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, timerTimeout) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextUserTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.UserTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -401,11 +401,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Multipl _, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID2, timerTimeout2) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextUserTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.UserTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -470,11 +470,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending( nextEventID := scheduledEvent.GetEventId() timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -562,11 +562,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Success( startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -636,11 +636,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Heartbea mutableState.FlushBufferedEvents() timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) - task := mutableState.InsertTimerTasks[0] + task := mutableState.InsertTasks[tasks.CategoryTimer][0] s.Equal(enumspb.TIMEOUT_TYPE_HEARTBEAT, task.(*tasks.ActivityTimeoutTask).TimeoutType) timerTask := &tasks.ActivityTimeoutTask{ @@ -715,7 +715,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple activityInfo2.LastHeartbeatUpdateTime = timestamp.TimePtr(time.Now().UTC()) timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) - mutableState.InsertTimerTasks = nil + mutableState.InsertTasks[tasks.CategoryTimer] = nil modified, err := timerSequence.CreateNextActivityTimer() s.NoError(err) s.True(modified) @@ -742,7 +742,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).DoAndReturn( func(input *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) { - s.Equal(1, len(input.UpdateWorkflowMutation.TimerTasks)) + s.Equal(1, len(input.UpdateWorkflowMutation.Tasks[tasks.CategoryTimer])) s.Equal(1, len(input.UpdateWorkflowMutation.UpsertActivityInfos)) mutableState.GetExecutionInfo().LastUpdateTime = input.UpdateWorkflowMutation.ExecutionInfo.LastUpdateTime input.RangeID = 0 @@ -761,9 +761,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple ExecutionInfo: mutableState.GetExecutionInfo(), ExecutionState: mutableState.GetExecutionState(), NextEventID: mutableState.GetNextEventID(), - TransferTasks: nil, - ReplicationTasks: nil, - TimerTasks: input.UpdateWorkflowMutation.TimerTasks, + Tasks: input.UpdateWorkflowMutation.Tasks, Condition: mutableState.GetNextEventID(), UpsertActivityInfos: input.UpdateWorkflowMutation.UpsertActivityInfos, DeleteActivityInfos: map[int64]struct{}{}, diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 85c28f34f44..a64e1a461ff 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -656,22 +656,22 @@ func (c *ContextImpl) mergeContinueAsNewReplicationTasks( // current workflow is doing continue as new // it is possible that continue as new is done as part of passive logic - if len(currentWorkflowMutation.ReplicationTasks) == 0 { + if len(currentWorkflowMutation.Tasks[tasks.CategoryReplication]) == 0 { return nil } - if newWorkflowSnapshot == nil || len(newWorkflowSnapshot.ReplicationTasks) != 1 { + if newWorkflowSnapshot == nil || len(newWorkflowSnapshot.Tasks[tasks.CategoryReplication]) != 1 { return serviceerror.NewInternal("unable to find replication task from new workflow for continue as new replication") } // merge the new run first event batch replication task // to current event batch replication task - newRunTask := newWorkflowSnapshot.ReplicationTasks[0].(*tasks.HistoryReplicationTask) - newWorkflowSnapshot.ReplicationTasks = nil + newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0].(*tasks.HistoryReplicationTask) + delete(newWorkflowSnapshot.Tasks, tasks.CategoryReplication) newRunBranchToken := newRunTask.BranchToken taskUpdated := false - for _, replicationTask := range currentWorkflowMutation.ReplicationTasks { + for _, replicationTask := range currentWorkflowMutation.Tasks[tasks.CategoryReplication] { if task, ok := replicationTask.(*tasks.HistoryReplicationTask); ok { taskUpdated = true task.NewRunBranchToken = newRunBranchToken diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index f48719dcd2b..fbf7d491732 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -238,16 +238,13 @@ type ( UpdateCurrentVersion(version int64, forceUpdate bool) error UpdateWorkflowStateStatus(state enumsspb.WorkflowExecutionState, status enumspb.WorkflowExecutionStatus) error - AddTransferTasks(transferTasks ...tasks.Task) - AddTimerTasks(timerTasks ...tasks.Task) - AddReplicationTasks(replicationTasks ...tasks.Task) - AddVisibilityTasks(visibilityTasks ...tasks.Task) + AddTasks(tasks ...tasks.Task) SetUpdateCondition(int64, int64) GetUpdateCondition() (int64, int64) StartTransaction(entry *namespace.Namespace) (bool, error) CloseTransactionAsMutation(now time.Time, transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) CloseTransactionAsSnapshot(now time.Time, transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) - GenerateLastHistoryReplicationTasks(now time.Time) (*tasks.HistoryReplicationTask, error) + GenerateLastHistoryReplicationTasks(now time.Time) (tasks.Task, error) } ) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 484fff7ef3c..0589663345e 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -156,10 +156,7 @@ type ( // TODO: persist this to db appliedEvents map[string]struct{} - InsertTransferTasks []tasks.Task - InsertTimerTasks []tasks.Task - InsertReplicationTasks []tasks.Task - InsertVisibilityTasks []tasks.Task + InsertTasks map[tasks.Category][]tasks.Task // do not rely on this, this is only updated on // Load() and closeTransactionXXX methods. So when @@ -226,6 +223,7 @@ func NewMutableState( dbRecordVersion: 1, namespaceEntry: namespaceEntry, appliedEvents: make(map[string]struct{}), + InsertTasks: make(map[tasks.Category][]tasks.Task), QueryRegistry: NewQueryRegistry(), @@ -1821,7 +1819,7 @@ func (e *MutableStateImpl) AddActivityTaskScheduledEvent( event := e.hBuilder.AddActivityTaskScheduledEvent(workflowTaskCompletedEventID, command) ai, err := e.ReplicateActivityTaskScheduledEvent(workflowTaskCompletedEventID, event) // TODO merge active & passive task generation - if err := e.taskGenerator.GenerateActivityTransferTasks( + if err := e.taskGenerator.GenerateActivityTasks( timestamp.TimeValue(event.GetEventTime()), event, ); err != nil { @@ -3596,40 +3594,17 @@ func (e *MutableStateImpl) RetryActivity( // TODO mutable state should generate corresponding transfer / timer tasks according to // updates accumulated, while currently all transfer / timer tasks are managed manually -// TODO convert AddTransferTasks to prepareTransferTasks +// TODO convert AddTasks to prepareTasks -// AddTransferTasks append transfer tasks -func (e *MutableStateImpl) AddTransferTasks( - transferTasks ...tasks.Task, +// AddTasks append transfer tasks +func (e *MutableStateImpl) AddTasks( + tasks ...tasks.Task, ) { - e.InsertTransferTasks = append(e.InsertTransferTasks, transferTasks...) -} - -// TODO convert AddTransferTasks to prepareTimerTasks - -// AddTimerTasks append timer tasks -func (e *MutableStateImpl) AddTimerTasks( - timerTasks ...tasks.Task, -) { - - e.InsertTimerTasks = append(e.InsertTimerTasks, timerTasks...) -} - -// AddReplicationTasks append visibility tasks -func (e *MutableStateImpl) AddReplicationTasks( - replicationTasks ...tasks.Task, -) { - - e.InsertReplicationTasks = append(e.InsertReplicationTasks, replicationTasks...) -} - -// AddVisibilityTasks append visibility tasks -func (e *MutableStateImpl) AddVisibilityTasks( - visibilityTasks ...tasks.Task, -) { - - e.InsertVisibilityTasks = append(e.InsertVisibilityTasks, visibilityTasks...) + for _, task := range tasks { + category := task.GetCategory() + e.InsertTasks[category] = append(e.InsertTasks[category], task) + } } func (e *MutableStateImpl) SetUpdateCondition( @@ -3707,7 +3682,7 @@ func (e *MutableStateImpl) CloseTransactionAsMutation( } } - setTaskInfo(e.GetCurrentVersion(), now, e.InsertTransferTasks, e.InsertTimerTasks, e.InsertVisibilityTasks) + setTaskInfo(e.GetCurrentVersion(), now, e.InsertTasks) // update last update time e.executionInfo.LastUpdateTime = &now @@ -3746,10 +3721,7 @@ func (e *MutableStateImpl) CloseTransactionAsMutation( NewBufferedEvents: bufferEvents, ClearBufferedEvents: clearBuffer, - TransferTasks: e.InsertTransferTasks, - ReplicationTasks: e.InsertReplicationTasks, - TimerTasks: e.InsertTimerTasks, - VisibilityTasks: e.InsertVisibilityTasks, + Tasks: e.InsertTasks, Condition: e.nextEventIDInDB, DBRecordVersion: e.dbRecordVersion, @@ -3796,7 +3768,7 @@ func (e *MutableStateImpl) CloseTransactionAsSnapshot( } } - setTaskInfo(e.GetCurrentVersion(), now, e.InsertTransferTasks, e.InsertTimerTasks, e.InsertVisibilityTasks) + setTaskInfo(e.GetCurrentVersion(), now, e.InsertTasks) // update last update time e.executionInfo.LastUpdateTime = &now @@ -3827,10 +3799,7 @@ func (e *MutableStateImpl) CloseTransactionAsSnapshot( SignalInfos: e.pendingSignalInfoIDs, SignalRequestedIDs: e.pendingSignalRequestedIDs, - TransferTasks: e.InsertTransferTasks, - ReplicationTasks: e.InsertReplicationTasks, - TimerTasks: e.InsertTimerTasks, - VisibilityTasks: e.InsertVisibilityTasks, + Tasks: e.InsertTasks, Condition: e.nextEventIDInDB, DBRecordVersion: e.dbRecordVersion, @@ -3859,7 +3828,9 @@ func (e *MutableStateImpl) UpdateDuplicatedResource( e.appliedEvents[id] = struct{}{} } -func (e *MutableStateImpl) GenerateLastHistoryReplicationTasks(now time.Time) (*tasks.HistoryReplicationTask, error) { +func (e *MutableStateImpl) GenerateLastHistoryReplicationTasks( + now time.Time, +) (tasks.Task, error) { return e.taskGenerator.GenerateLastHistoryReplicationTasks(now) } @@ -3933,10 +3904,7 @@ func (e *MutableStateImpl) cleanupTransaction( e.bufferEventsInDB, ) - e.InsertTransferTasks = nil - e.InsertReplicationTasks = nil - e.InsertTimerTasks = nil - e.InsertVisibilityTasks = nil + e.InsertTasks = make(map[tasks.Category][]tasks.Task) return nil } @@ -3995,12 +3963,13 @@ func (e *MutableStateImpl) prepareEventsAndReplicationTasks( } } - e.InsertReplicationTasks = append( - e.InsertReplicationTasks, + e.InsertTasks[tasks.CategoryReplication] = append( + e.InsertTasks[tasks.CategoryReplication], e.syncActivityToReplicationTask(now, transactionPolicy)..., ) - if transactionPolicy == TransactionPolicyPassive && len(e.InsertReplicationTasks) > 0 { + if transactionPolicy == TransactionPolicyPassive && + len(e.InsertTasks[tasks.CategoryReplication]) > 0 { return nil, nil, false, serviceerror.NewInternal("should not generate replication task when close transaction as passive") } diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index a6d66404d6f..2c7392257e1 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -375,22 +375,6 @@ func (mr *MockMutableStateMockRecorder) AddRecordMarkerEvent(arg0, arg1 interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecordMarkerEvent", reflect.TypeOf((*MockMutableState)(nil).AddRecordMarkerEvent), arg0, arg1) } -// AddReplicationTasks mocks base method. -func (m *MockMutableState) AddReplicationTasks(replicationTasks ...tasks.Task) { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range replicationTasks { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "AddReplicationTasks", varargs...) -} - -// AddReplicationTasks indicates an expected call of AddReplicationTasks. -func (mr *MockMutableStateMockRecorder) AddReplicationTasks(replicationTasks ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddReplicationTasks", reflect.TypeOf((*MockMutableState)(nil).AddReplicationTasks), replicationTasks...) -} - // AddRequestCancelExternalWorkflowExecutionFailedEvent mocks base method. func (m *MockMutableState) AddRequestCancelExternalWorkflowExecutionFailedEvent(arg0 int64, arg1 namespace.Name, arg2, arg3 string, arg4 v11.CancelExternalWorkflowExecutionFailedCause) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() @@ -496,6 +480,22 @@ func (mr *MockMutableStateMockRecorder) AddStartChildWorkflowExecutionInitiatedE return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddStartChildWorkflowExecutionInitiatedEvent", reflect.TypeOf((*MockMutableState)(nil).AddStartChildWorkflowExecutionInitiatedEvent), arg0, arg1, arg2) } +// AddTasks mocks base method. +func (m *MockMutableState) AddTasks(tasks ...tasks.Task) { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range tasks { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "AddTasks", varargs...) +} + +// AddTasks indicates an expected call of AddTasks. +func (mr *MockMutableStateMockRecorder) AddTasks(tasks ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTasks", reflect.TypeOf((*MockMutableState)(nil).AddTasks), tasks...) +} + // AddTimeoutWorkflowEvent mocks base method. func (m *MockMutableState) AddTimeoutWorkflowEvent(arg0 int64, arg1 v11.RetryState, arg2 string) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() @@ -557,38 +557,6 @@ func (mr *MockMutableStateMockRecorder) AddTimerStartedEvent(arg0, arg1 interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTimerStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddTimerStartedEvent), arg0, arg1) } -// AddTimerTasks mocks base method. -func (m *MockMutableState) AddTimerTasks(timerTasks ...tasks.Task) { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range timerTasks { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "AddTimerTasks", varargs...) -} - -// AddTimerTasks indicates an expected call of AddTimerTasks. -func (mr *MockMutableStateMockRecorder) AddTimerTasks(timerTasks ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTimerTasks", reflect.TypeOf((*MockMutableState)(nil).AddTimerTasks), timerTasks...) -} - -// AddTransferTasks mocks base method. -func (m *MockMutableState) AddTransferTasks(transferTasks ...tasks.Task) { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range transferTasks { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "AddTransferTasks", varargs...) -} - -// AddTransferTasks indicates an expected call of AddTransferTasks. -func (mr *MockMutableStateMockRecorder) AddTransferTasks(transferTasks ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTransferTasks", reflect.TypeOf((*MockMutableState)(nil).AddTransferTasks), transferTasks...) -} - // AddUpsertWorkflowSearchAttributesEvent mocks base method. func (m *MockMutableState) AddUpsertWorkflowSearchAttributesEvent(arg0 int64, arg1 *v1.UpsertWorkflowSearchAttributesCommandAttributes) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() @@ -604,22 +572,6 @@ func (mr *MockMutableStateMockRecorder) AddUpsertWorkflowSearchAttributesEvent(a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddUpsertWorkflowSearchAttributesEvent", reflect.TypeOf((*MockMutableState)(nil).AddUpsertWorkflowSearchAttributesEvent), arg0, arg1) } -// AddVisibilityTasks mocks base method. -func (m *MockMutableState) AddVisibilityTasks(visibilityTasks ...tasks.Task) { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range visibilityTasks { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "AddVisibilityTasks", varargs...) -} - -// AddVisibilityTasks indicates an expected call of AddVisibilityTasks. -func (mr *MockMutableStateMockRecorder) AddVisibilityTasks(visibilityTasks ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddVisibilityTasks", reflect.TypeOf((*MockMutableState)(nil).AddVisibilityTasks), visibilityTasks...) -} - // AddWorkflowExecutionCancelRequestedEvent mocks base method. func (m *MockMutableState) AddWorkflowExecutionCancelRequestedEvent(arg0 *v18.RequestCancelWorkflowExecutionRequest) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() @@ -940,10 +892,10 @@ func (mr *MockMutableStateMockRecorder) FlushBufferedEvents() *gomock.Call { } // GenerateLastHistoryReplicationTasks mocks base method. -func (m *MockMutableState) GenerateLastHistoryReplicationTasks(now time.Time) (*tasks.HistoryReplicationTask, error) { +func (m *MockMutableState) GenerateLastHistoryReplicationTasks(now time.Time) (tasks.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GenerateLastHistoryReplicationTasks", now) - ret0, _ := ret[0].(*tasks.HistoryReplicationTask) + ret0, _ := ret[0].(tasks.Task) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/history/workflow/mutable_state_rebuilder.go b/service/history/workflow/mutable_state_rebuilder.go index 5db637e7bfa..5af72306703 100644 --- a/service/history/workflow/mutable_state_rebuilder.go +++ b/service/history/workflow/mutable_state_rebuilder.go @@ -294,7 +294,7 @@ func (b *MutableStateRebuilderImpl) ApplyEvents( return nil, err } - if err := taskGenerator.GenerateActivityTransferTasks( + if err := taskGenerator.GenerateActivityTasks( timestamp.TimeValue(event.GetEventTime()), event, ); err != nil { diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index afad23fd44a..d588a74c21a 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -1070,7 +1070,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskScheduled() { s.executionInfo.TaskQueue = taskqueue s.mockMutableState.EXPECT().ReplicateActivityTaskScheduledEvent(event.GetEventId(), event).Return(ai, nil) s.mockUpdateVersion(event) - s.mockTaskGenerator.EXPECT().GenerateActivityTransferTasks( + s.mockTaskGenerator.EXPECT().GenerateActivityTasks( timestamp.TimeValue(event.GetEventTime()), event, ).Return(nil) diff --git a/service/history/workflow/mutable_state_util.go b/service/history/workflow/mutable_state_util.go index 557ab9d8889..c83faeef2bb 100644 --- a/service/history/workflow/mutable_state_util.go +++ b/service/history/workflow/mutable_state_util.go @@ -56,23 +56,26 @@ func convertSyncActivityInfos( return outputs } +// TODO: can we deprecate this method and +// let task generator correctly set task version and +// visibility timestamp? func setTaskInfo( version int64, timestamp time.Time, - transferTasks []tasks.Task, - timerTasks []tasks.Task, - visibilityTasks []tasks.Task, + insertTasks map[tasks.Category][]tasks.Task, ) { - // set both the task version, as well as the Timestamp on the transfer tasks - for _, task := range transferTasks { - task.SetVersion(version) - task.SetVisibilityTime(timestamp) - } - for _, task := range timerTasks { - task.SetVersion(version) - } - for _, task := range visibilityTasks { - task.SetVersion(version) - task.SetVisibilityTime(timestamp) + // set the task version, + // as well as the Timestamp if not scheduled task + for category, tasksByCategory := range insertTasks { + if category == tasks.CategoryReplication { + continue + } + + for _, task := range tasksByCategory { + task.SetVersion(version) + if task.GetKey().FireTime.IsZero() { + task.SetVisibilityTime(timestamp) + } + } } } diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index ef1af631095..ba50843fa8a 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -71,7 +71,7 @@ type ( now time.Time, workflowTaskScheduleID int64, ) error - GenerateActivityTransferTasks( + GenerateActivityTasks( now time.Time, event *historypb.HistoryEvent, ) error @@ -115,7 +115,7 @@ type ( ) error GenerateLastHistoryReplicationTasks( now time.Time, - ) (*tasks.HistoryReplicationTask, error) + ) (tasks.Task, error) } TaskGeneratorImpl struct { @@ -158,7 +158,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks( return nil } - r.mutableState.AddTimerTasks(&tasks.WorkflowTimeoutTask{ + r.mutableState.AddTasks(&tasks.WorkflowTimeoutTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: workflowRunExpirationTime, @@ -175,20 +175,6 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( currentVersion := r.mutableState.GetCurrentVersion() executionInfo := r.mutableState.GetExecutionInfo() - r.mutableState.AddTransferTasks(&tasks.CloseExecutionTask{ - // TaskID is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - VisibilityTimestamp: now, - Version: currentVersion, - }) - - r.mutableState.AddVisibilityTasks(&tasks.CloseExecutionVisibilityTask{ - // TaskID is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - VisibilityTimestamp: now, - Version: currentVersion, - }) - retention := defaultWorkflowRetention namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(executionInfo.NamespaceId)) switch err.(type) { @@ -200,12 +186,26 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( return err } - r.mutableState.AddTimerTasks(&tasks.DeleteHistoryEventTask{ - // TaskID is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - VisibilityTimestamp: now.Add(retention), - Version: currentVersion, - }) + r.mutableState.AddTasks( + &tasks.CloseExecutionTask{ + // TaskID is set by shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + VisibilityTimestamp: now, + Version: currentVersion, + }, + &tasks.CloseExecutionVisibilityTask{ + // TaskID is set by shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + VisibilityTimestamp: now, + Version: currentVersion, + }, + &tasks.DeleteHistoryEventTask{ + // TaskID is set by shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + VisibilityTimestamp: now.Add(retention), + Version: currentVersion, + }, + ) return nil } @@ -216,7 +216,7 @@ func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask( currentVersion := r.mutableState.GetCurrentVersion() - r.mutableState.AddTransferTasks(&tasks.DeleteExecutionTask{ + r.mutableState.AddTasks(&tasks.DeleteExecutionTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -247,7 +247,7 @@ func (r *TaskGeneratorImpl) GenerateDelayedWorkflowTasks( return serviceerror.NewInternal(fmt.Sprintf("unknown initiator: %v", startAttr.GetInitiator())) } - r.mutableState.AddTimerTasks(&tasks.WorkflowBackoffTimerTask{ + r.mutableState.AddTasks(&tasks.WorkflowBackoffTimerTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: executionTimestamp, @@ -265,7 +265,7 @@ func (r *TaskGeneratorImpl) GenerateRecordWorkflowStartedTasks( startVersion := startEvent.GetVersion() - r.mutableState.AddVisibilityTasks(&tasks.StartExecutionVisibilityTask{ + r.mutableState.AddTasks(&tasks.StartExecutionVisibilityTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -286,7 +286,7 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflow task: %v", workflowTaskScheduleID)) } - r.mutableState.AddTransferTasks(&tasks.WorkflowTask{ + r.mutableState.AddTasks(&tasks.WorkflowTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -299,7 +299,7 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( scheduledTime := timestamp.TimeValue(workflowTask.ScheduledTime) scheduleToStartTimeout := timestamp.DurationValue(r.mutableState.GetExecutionInfo().StickyScheduleToStartTimeout) - r.mutableState.AddTimerTasks(&tasks.WorkflowTaskTimeoutTask{ + r.mutableState.AddTasks(&tasks.WorkflowTaskTimeoutTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: scheduledTime.Add(scheduleToStartTimeout), @@ -328,7 +328,7 @@ func (r *TaskGeneratorImpl) GenerateStartWorkflowTaskTasks( startedTime := timestamp.TimeValue(workflowTask.StartedTime) workflowTaskTimeout := timestamp.DurationValue(workflowTask.WorkflowTaskTimeout) - r.mutableState.AddTimerTasks(&tasks.WorkflowTaskTimeoutTask{ + r.mutableState.AddTasks(&tasks.WorkflowTaskTimeoutTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: startedTime.Add(workflowTaskTimeout), @@ -341,7 +341,7 @@ func (r *TaskGeneratorImpl) GenerateStartWorkflowTaskTasks( return nil } -func (r *TaskGeneratorImpl) GenerateActivityTransferTasks( +func (r *TaskGeneratorImpl) GenerateActivityTasks( now time.Time, event *historypb.HistoryEvent, ) error { @@ -352,7 +352,7 @@ func (r *TaskGeneratorImpl) GenerateActivityTransferTasks( return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending activity: %v", activityScheduleID)) } - r.mutableState.AddTransferTasks(&tasks.ActivityTask{ + r.mutableState.AddTasks(&tasks.ActivityTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -374,7 +374,7 @@ func (r *TaskGeneratorImpl) GenerateActivityRetryTasks( return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending activity: %v", activityScheduleID)) } - r.mutableState.AddTimerTasks(&tasks.ActivityRetryTimerTask{ + r.mutableState.AddTasks(&tasks.ActivityRetryTimerTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), Version: ai.Version, @@ -404,7 +404,7 @@ func (r *TaskGeneratorImpl) GenerateChildWorkflowTasks( return err } - r.mutableState.AddTransferTasks(&tasks.StartChildExecutionTask{ + r.mutableState.AddTasks(&tasks.StartChildExecutionTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -440,7 +440,7 @@ func (r *TaskGeneratorImpl) GenerateRequestCancelExternalTasks( return err } - r.mutableState.AddTransferTasks(&tasks.CancelExecutionTask{ + r.mutableState.AddTasks(&tasks.CancelExecutionTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -478,7 +478,7 @@ func (r *TaskGeneratorImpl) GenerateSignalExternalTasks( return err } - r.mutableState.AddTransferTasks(&tasks.SignalExecutionTask{ + r.mutableState.AddTasks(&tasks.SignalExecutionTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -499,7 +499,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowSearchAttrTasks( currentVersion := r.mutableState.GetCurrentVersion() - r.mutableState.AddVisibilityTasks(&tasks.UpsertExecutionVisibilityTask{ + r.mutableState.AddTasks(&tasks.UpsertExecutionVisibilityTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -514,7 +514,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowResetTasks( currentVersion := r.mutableState.GetCurrentVersion() - r.mutableState.AddTransferTasks(&tasks.ResetWorkflowTask{ + r.mutableState.AddTasks(&tasks.ResetWorkflowTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, @@ -552,7 +552,7 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks( } version := firstEvent.GetVersion() - r.mutableState.AddReplicationTasks(&tasks.HistoryReplicationTask{ + r.mutableState.AddTasks(&tasks.HistoryReplicationTask{ // TaskID is set by shard VisibilityTimestamp: now, WorkflowKey: r.mutableState.GetWorkflowKey(), @@ -567,7 +567,7 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks( func (r *TaskGeneratorImpl) GenerateLastHistoryReplicationTasks( now time.Time, -) (*tasks.HistoryReplicationTask, error) { +) (tasks.Task, error) { executionInfo := r.mutableState.GetExecutionInfo() versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories()) if err != nil { diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index 81cb516e6bf..b0d0906d2db 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -74,32 +74,32 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateActivityRetryTasks(activitySche return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityRetryTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateActivityRetryTasks), activityScheduleID) } -// GenerateActivityTimerTasks mocks base method. -func (m *MockTaskGenerator) GenerateActivityTimerTasks(now time.Time) error { +// GenerateActivityTasks mocks base method. +func (m *MockTaskGenerator) GenerateActivityTasks(now time.Time, event *history.HistoryEvent) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenerateActivityTimerTasks", now) + ret := m.ctrl.Call(m, "GenerateActivityTasks", now, event) ret0, _ := ret[0].(error) return ret0 } -// GenerateActivityTimerTasks indicates an expected call of GenerateActivityTimerTasks. -func (mr *MockTaskGeneratorMockRecorder) GenerateActivityTimerTasks(now interface{}) *gomock.Call { +// GenerateActivityTasks indicates an expected call of GenerateActivityTasks. +func (mr *MockTaskGeneratorMockRecorder) GenerateActivityTasks(now, event interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityTimerTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateActivityTimerTasks), now) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateActivityTasks), now, event) } -// GenerateActivityTransferTasks mocks base method. -func (m *MockTaskGenerator) GenerateActivityTransferTasks(now time.Time, event *history.HistoryEvent) error { +// GenerateActivityTimerTasks mocks base method. +func (m *MockTaskGenerator) GenerateActivityTimerTasks(now time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenerateActivityTransferTasks", now, event) + ret := m.ctrl.Call(m, "GenerateActivityTimerTasks", now) ret0, _ := ret[0].(error) return ret0 } -// GenerateActivityTransferTasks indicates an expected call of GenerateActivityTransferTasks. -func (mr *MockTaskGeneratorMockRecorder) GenerateActivityTransferTasks(now, event interface{}) *gomock.Call { +// GenerateActivityTimerTasks indicates an expected call of GenerateActivityTimerTasks. +func (mr *MockTaskGeneratorMockRecorder) GenerateActivityTimerTasks(now interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityTransferTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateActivityTransferTasks), now, event) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityTimerTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateActivityTimerTasks), now) } // GenerateChildWorkflowTasks mocks base method. @@ -159,10 +159,10 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateHistoryReplicationTasks(now, br } // GenerateLastHistoryReplicationTasks mocks base method. -func (m *MockTaskGenerator) GenerateLastHistoryReplicationTasks(now time.Time) (*tasks.HistoryReplicationTask, error) { +func (m *MockTaskGenerator) GenerateLastHistoryReplicationTasks(now time.Time) (tasks.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GenerateLastHistoryReplicationTasks", now) - ret0, _ := ret[0].(*tasks.HistoryReplicationTask) + ret0, _ := ret[0].(tasks.Task) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index b614eda5f14..bae7cc82412 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -314,7 +314,7 @@ Loop: return err } - if err := taskGenerator.GenerateActivityTransferTasks( + if err := taskGenerator.GenerateActivityTasks( now, scheduleEvent, ); err != nil { diff --git a/service/history/workflow/timer_sequence.go b/service/history/workflow/timer_sequence.go index 6b84c735e73..81eeae9bcc8 100644 --- a/service/history/workflow/timer_sequence.go +++ b/service/history/workflow/timer_sequence.go @@ -136,7 +136,7 @@ func (t *timerSequenceImpl) CreateNextUserTimer() (bool, error) { if err := t.mutableState.UpdateUserTimer(timerInfo); err != nil { return false, err } - t.mutableState.AddTimerTasks(&tasks.UserTimerTask{ + t.mutableState.AddTasks(&tasks.UserTimerTask{ // TaskID is set by shard WorkflowKey: t.mutableState.GetWorkflowKey(), VisibilityTimestamp: firstTimerTask.Timestamp, @@ -183,7 +183,7 @@ func (t *timerSequenceImpl) CreateNextActivityTimer() (bool, error) { if err != nil { return false, err } - t.mutableState.AddTimerTasks(&tasks.ActivityTimeoutTask{ + t.mutableState.AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: t.mutableState.GetWorkflowKey(), VisibilityTimestamp: firstTimerTask.Timestamp, diff --git a/service/history/workflow/timer_sequence_test.go b/service/history/workflow/timer_sequence_test.go index f1f76b8be02..8a3ad781730 100644 --- a/service/history/workflow/timer_sequence_test.go +++ b/service/history/workflow/timer_sequence_test.go @@ -195,7 +195,7 @@ func (s *timerSequenceSuite) TestCreateNextUserTimer_NotCreated_BeforeWorkflowEx timerInfoUpdated.TaskStatus = TimerTaskStatusCreated s.mockMutableState.EXPECT().UpdateUserTimer(&timerInfoUpdated).Return(nil) s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion) - s.mockMutableState.EXPECT().AddTimerTasks(&tasks.UserTimerTask{ + s.mockMutableState.EXPECT().AddTasks(&tasks.UserTimerTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, VisibilityTimestamp: *timerExpiry, @@ -230,7 +230,7 @@ func (s *timerSequenceSuite) TestCreateNextUserTimer_NotCreated_NoWorkflowExpiry timerInfoUpdated.TaskStatus = TimerTaskStatusCreated s.mockMutableState.EXPECT().UpdateUserTimer(&timerInfoUpdated).Return(nil) s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion) - s.mockMutableState.EXPECT().AddTimerTasks(&tasks.UserTimerTask{ + s.mockMutableState.EXPECT().AddTasks(&tasks.UserTimerTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, VisibilityTimestamp: *timerExpiry, @@ -384,7 +384,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_BeforeWorkfl activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedScheduleToStart s.mockMutableState.EXPECT().UpdateActivity(&activityInfoUpdated).Return(nil) s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion) - s.mockMutableState.EXPECT().AddTimerTasks(&tasks.ActivityTimeoutTask{ + s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, VisibilityTimestamp: activityInfo.ScheduledTime.Add(*activityInfo.ScheduleToStartTimeout), @@ -428,7 +428,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_NoWorkflowEx activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedScheduleToStart s.mockMutableState.EXPECT().UpdateActivity(&activityInfoUpdated).Return(nil) s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion) - s.mockMutableState.EXPECT().AddTimerTasks(&tasks.ActivityTimeoutTask{ + s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, VisibilityTimestamp: activityInfo.ScheduledTime.Add(*activityInfo.ScheduleToStartTimeout), @@ -502,7 +502,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_BeforeWo activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedHeartbeat s.mockMutableState.EXPECT().UpdateActivityWithTimerHeartbeat(&activityInfoUpdated, taskVisibilityTimestamp).Return(nil) s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion) - s.mockMutableState.EXPECT().AddTimerTasks(&tasks.ActivityTimeoutTask{ + s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, VisibilityTimestamp: taskVisibilityTimestamp, @@ -548,7 +548,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_NoWorkfl activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedHeartbeat s.mockMutableState.EXPECT().UpdateActivityWithTimerHeartbeat(&activityInfoUpdated, taskVisibilityTimestamp).Return(nil) s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion) - s.mockMutableState.EXPECT().AddTimerTasks(&tasks.ActivityTimeoutTask{ + s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, VisibilityTimestamp: taskVisibilityTimestamp, diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index 8d2618991ea..ba6f56bec6d 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -522,10 +522,10 @@ func NotifyWorkflowSnapshotTasks( } notifyTasks( engine, - workflowSnapshot.TransferTasks, - workflowSnapshot.TimerTasks, - workflowSnapshot.ReplicationTasks, - workflowSnapshot.VisibilityTasks, + workflowSnapshot.Tasks[tasks.CategoryTransfer], + workflowSnapshot.Tasks[tasks.CategoryTimer], + workflowSnapshot.Tasks[tasks.CategoryReplication], + workflowSnapshot.Tasks[tasks.CategoryVisibility], clusterName, ) } @@ -540,10 +540,10 @@ func NotifyWorkflowMutationTasks( } notifyTasks( engine, - workflowMutation.TransferTasks, - workflowMutation.TimerTasks, - workflowMutation.ReplicationTasks, - workflowMutation.VisibilityTasks, + workflowMutation.Tasks[tasks.CategoryTransfer], + workflowMutation.Tasks[tasks.CategoryTimer], + workflowMutation.Tasks[tasks.CategoryReplication], + workflowMutation.Tasks[tasks.CategoryVisibility], clusterName, ) }