From b4c4b84abe797f3c43b09591fafc10e40cbc85bb Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 15 Apr 2021 23:03:02 -0700 Subject: [PATCH] Relax the constraint on which event ID is allowed for ResetWorkflow API (#1465) * Allow reset workflow with event ID in range [workflow task schedule ID+1, workflow task start ID+1] * Add UT & integration test --- host/resetworkflow_test.go | 127 +++++++++++++++++- service/history/historyEngine.go | 8 ++ service/history/historyEngine_test.go | 21 ++- service/history/mutableState.go | 2 +- service/history/mutableStateBuilder.go | 8 +- service/history/mutableStateBuilder_test.go | 4 +- .../mutableStateWorkflowTaskManager.go | 11 +- .../mutableStateWorkflowTaskManager_mock.go | 8 +- service/history/mutableState_mock.go | 8 +- .../nDCTransactionMgrForNewWorkflow.go | 6 + .../timerQueueStandbyTaskExecutor_test.go | 2 +- service/history/workflowExecutionContext.go | 22 +-- service/history/workflowResetter.go | 83 +++++++++--- service/history/workflowResetter_test.go | 116 ++++++++++++++++ .../history/workflowTaskHandlerCallbacks.go | 18 ++- 15 files changed, 374 insertions(+), 70 deletions(-) diff --git a/host/resetworkflow_test.go b/host/resetworkflow_test.go index 1a51c162479..642a38ebfd6 100644 --- a/host/resetworkflow_test.go +++ b/host/resetworkflow_test.go @@ -50,7 +50,6 @@ func (s *integrationSuite) TestResetWorkflow() { identity := "worker1" workflowType := &commonpb.WorkflowType{Name: wt} - taskQueue := &taskqueuepb.TaskQueue{Name: tq} // Start workflow execution @@ -200,3 +199,129 @@ func (s *integrationSuite) TestResetWorkflow() { s.NotNil(firstActivityCompletionEvent) s.True(workflowComplete) } + +func (s *integrationSuite) TestResetWorkflow_WorkflowTask_Schedule() { + workflowID := "integration-reset-workflow-test-schedule" + workflowTypeName := "integration-reset-workflow-test-schedule-type" + taskQueueName := "integration-reset-workflow-test-schedule-taskqueue" + s.testResetWorkflowRangeScheduleToStart(workflowID, 
workflowTypeName, taskQueueName, 3) +} + +func (s *integrationSuite) TestResetWorkflow_WorkflowTask_ScheduleToStart() { + workflowID := "integration-reset-workflow-test-schedule-to-start" + workflowTypeName := "integration-reset-workflow-test-schedule-to-start-type" + taskQueueName := "integration-reset-workflow-test-schedule-to-start-taskqueue" + s.testResetWorkflowRangeScheduleToStart(workflowID, workflowTypeName, taskQueueName, 4) +} + +func (s *integrationSuite) TestResetWorkflow_WorkflowTask_Start() { + workflowID := "integration-reset-workflow-test-start" + workflowTypeName := "integration-reset-workflow-test-start-type" + taskQueueName := "integration-reset-workflow-test-start-taskqueue" + s.testResetWorkflowRangeScheduleToStart(workflowID, workflowTypeName, taskQueueName, 5) +} + +func (s *integrationSuite) testResetWorkflowRangeScheduleToStart( + workflowID string, + workflowTypeName string, + taskQueueName string, + resetToEventID int64, +) { + identity := "worker1" + + workflowType := &commonpb.WorkflowType{Name: workflowTypeName} + taskQueue := &taskqueuepb.TaskQueue{Name: taskQueueName} + + // Start workflow execution + request := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: s.namespace, + WorkflowId: workflowID, + WorkflowType: workflowType, + TaskQueue: taskQueue, + Input: nil, + WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + Identity: identity, + } + + we, err := s.engine.StartWorkflowExecution(NewContext(), request) + s.NoError(err) + + _, err = s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: we.RunId, + }, + SignalName: "random signal name", + Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{ + {Data: []byte("random signal payload")}, + }}, + Identity: identity, + 
}) + s.NoError(err) + + // workflow logic + workflowComplete := false + isWorkflowTaskProcessed := false + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, + previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + + if !isWorkflowTaskProcessed { + isWorkflowTaskProcessed = true + return []*commandpb.Command{}, nil + } + + // Complete workflow after reset + workflowComplete = true + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("Done"), + }}, + }}, nil + + } + + poller := &TaskPoller{ + Engine: s.engine, + Namespace: s.namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + ActivityTaskHandler: nil, + Logger: s.Logger, + T: s.T(), + } + + _, err = poller.PollAndProcessWorkflowTask(false, false) + s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + s.NoError(err) + + // events layout + // 1. WorkflowExecutionStarted + // 2. WorkflowTaskScheduled + // 3. WorkflowExecutionSignaled + // 4. WorkflowTaskStarted + // 5. 
WorkflowTaskCompleted + + // Reset workflow execution + _, err = s.engine.ResetWorkflowExecution(NewContext(), &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: we.RunId, + }, + Reason: "reset execution from test", + WorkflowTaskFinishEventId: resetToEventID, + RequestId: uuid.New(), + }) + s.NoError(err) + + _, err = poller.PollAndProcessWorkflowTask(false, false) + s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + s.NoError(err) + s.True(workflowComplete) +} diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index b524c0e2429..54e5f31ccac 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -567,6 +567,10 @@ func (e *historyEngineImpl) StartWorkflowExecution( if err != nil { return nil, err } + if len(newWorkflowEventsSeq) != 1 { + return nil, serviceerror.NewInternal("unable to create 1st event batch") + } + historySize, err := weContext.persistFirstWorkflowEvents(newWorkflowEventsSeq[0]) if err != nil { return nil, err @@ -1989,6 +1993,10 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( if err != nil { return nil, err } + if len(newWorkflowEventsSeq) != 1 { + return nil, serviceerror.NewInternal("unable to create 1st event batch") + } + historySize, err := context.persistFirstWorkflowEvents(newWorkflowEventsSeq[0]) if err != nil { return nil, err diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 6a8ff7d8b38..8592950a64f 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -5027,10 +5027,12 @@ func addWorkflowTaskStartedEvent(builder mutableState, scheduleID int64, taskQue func addWorkflowTaskStartedEventWithRequestID(builder mutableState, scheduleID int64, requestID string, taskQueue, identity string) *historypb.HistoryEvent { - event, _, _ := 
builder.AddWorkflowTaskStartedEvent(scheduleID, requestID, &workflowservice.PollWorkflowTaskQueueRequest{ - TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, - Identity: identity, - }) + event, _, _ := builder.AddWorkflowTaskStartedEvent( + scheduleID, + requestID, + &taskqueuepb.TaskQueue{Name: taskQueue}, + identity, + ) return event } @@ -5270,10 +5272,12 @@ func newMutableStateBuilderWithVersionHistoriesForTest( func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState { builder := ms.(*mutableStateBuilder) - builder.FlushBufferedEvents() + _, _, _ = builder.CloseTransactionAsMutation(time.Now().UTC(), transactionPolicyActive) + info := copyWorkflowExecutionInfo(builder.executionInfo) state := copyWorkflowExecutionState(builder.executionState) info.ExecutionStats = &persistencespb.ExecutionStats{} + info.VersionHistories = versionhistory.CopyVersionHistories(builder.executionInfo.VersionHistories) activityInfos := make(map[int64]*persistencespb.ActivityInfo) for id, info := range builder.pendingActivityInfoIDs { @@ -5296,12 +5300,6 @@ func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState { childInfos[id] = copyChildInfo(info) } - // FlushBuffer will also be called within the CloseTransactionAsMutation - _, _, _ = builder.CloseTransactionAsMutation(time.Now().UTC(), transactionPolicyActive) - if builder.executionInfo.VersionHistories != nil { - info.VersionHistories = versionhistory.CopyVersionHistories(builder.executionInfo.VersionHistories) - } - return &persistencespb.WorkflowMutableState{ ExecutionInfo: info, ExecutionState: state, @@ -5341,6 +5339,7 @@ func copyWorkflowExecutionInfo(sourceInfo *persistencespb.WorkflowExecutionInfo) WorkflowRunTimeout: sourceInfo.WorkflowRunTimeout, DefaultWorkflowTaskTimeout: sourceInfo.DefaultWorkflowTaskTimeout, LastFirstEventId: sourceInfo.LastFirstEventId, + LastFirstEventTxnId: sourceInfo.LastFirstEventTxnId, LastEventTaskId: sourceInfo.LastEventTaskId, LastProcessedEvent: 
sourceInfo.LastProcessedEvent, StartTime: sourceInfo.StartTime, diff --git a/service/history/mutableState.go b/service/history/mutableState.go index 53c356df2a9..1f7014d97c9 100644 --- a/service/history/mutableState.go +++ b/service/history/mutableState.go @@ -90,7 +90,7 @@ type ( AddFirstWorkflowTaskScheduled(*historypb.HistoryEvent) error AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool) (*workflowTaskInfo, error) AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time) (*workflowTaskInfo, error) - AddWorkflowTaskStartedEvent(int64, string, *workflowservice.PollWorkflowTaskQueueRequest) (*historypb.HistoryEvent, *workflowTaskInfo, error) + AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string) (*historypb.HistoryEvent, *workflowTaskInfo, error) AddWorkflowTaskTimedOutEvent(int64, int64) (*historypb.HistoryEvent, error) AddExternalWorkflowExecutionCancelRequested(int64, string, string, string) (*historypb.HistoryEvent, error) AddExternalWorkflowExecutionSignaled(int64, string, string, string, string) (*historypb.HistoryEvent, error) diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index a3f7e29a36e..1989ceb066a 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -1526,13 +1526,14 @@ func (e *mutableStateBuilder) ReplicateWorkflowTaskScheduledEvent( func (e *mutableStateBuilder) AddWorkflowTaskStartedEvent( scheduleEventID int64, requestID string, - request *workflowservice.PollWorkflowTaskQueueRequest, + taskQueue *taskqueuepb.TaskQueue, + identity string, ) (*historypb.HistoryEvent, *workflowTaskInfo, error) { opTag := tag.WorkflowActionWorkflowTaskStarted if err := e.checkMutability(opTag); err != nil { return nil, nil, err } - return e.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduleEventID, requestID, request) + return e.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduleEventID, 
requestID, taskQueue, identity) } func (e *mutableStateBuilder) ReplicateWorkflowTaskStartedEvent( @@ -3697,9 +3698,6 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( return nil, nil, err } - if len(workflowEventsSeq) > 1 { - return nil, nil, serviceerror.NewInternal("cannot generate workflow snapshot with transient events") - } if len(bufferEvents) > 0 { // TODO do we need the functionality to generate snapshot with buffered events? return nil, nil, serviceerror.NewInternal("cannot generate workflow snapshot with buffered events") diff --git a/service/history/mutableStateBuilder_test.go b/service/history/mutableStateBuilder_test.go index aa27bf580c7..5863a48b3a3 100644 --- a/service/history/mutableStateBuilder_test.go +++ b/service/history/mutableStateBuilder_test.go @@ -37,7 +37,6 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" - "go.temporal.io/api/workflowservice/v1" enumsspb "go.temporal.io/server/api/enums/v1" historyspb "go.temporal.io/server/api/history/v1" @@ -410,7 +409,8 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged _, _, err = s.msBuilder.AddWorkflowTaskStartedEvent( s.msBuilder.GetNextEventID(), uuid.New(), - &workflowservice.PollWorkflowTaskQueueRequest{}, + &taskqueuepb.TaskQueue{}, + "random identity", ) s.NoError(err) s.Equal(0, s.msBuilder.hBuilder.BufferEventSize()) diff --git a/service/history/mutableStateWorkflowTaskManager.go b/service/history/mutableStateWorkflowTaskManager.go index d473f3aa622..5d52a2fd863 100644 --- a/service/history/mutableStateWorkflowTaskManager.go +++ b/service/history/mutableStateWorkflowTaskManager.go @@ -70,7 +70,8 @@ type ( AddWorkflowTaskStartedEvent( scheduleEventID int64, requestID string, - request *workflowservice.PollWorkflowTaskQueueRequest, + taskQueue *taskqueuepb.TaskQueue, + identity string, ) (*historypb.HistoryEvent, *workflowTaskInfo, error) 
AddWorkflowTaskCompletedEvent( scheduleEventID int64, @@ -413,7 +414,8 @@ func (m *mutableStateWorkflowTaskManagerImpl) AddFirstWorkflowTaskScheduled( func (m *mutableStateWorkflowTaskManagerImpl) AddWorkflowTaskStartedEvent( scheduleEventID int64, requestID string, - request *workflowservice.PollWorkflowTaskQueueRequest, + taskQueue *taskqueuepb.TaskQueue, + identity string, ) (*historypb.HistoryEvent, *workflowTaskInfo, error) { opTag := tag.WorkflowActionWorkflowTaskStarted workflowTask, ok := m.GetWorkflowTaskInfo(scheduleEventID) @@ -433,7 +435,7 @@ func (m *mutableStateWorkflowTaskManagerImpl) AddWorkflowTaskStartedEvent( if workflowTask.Attempt > 1 && (workflowTask.ScheduleID != m.msb.GetNextEventID() || workflowTask.Version != m.msb.GetCurrentVersion()) { // Also create a new WorkflowTaskScheduledEvent since new events came in when it was scheduled scheduleEvent := m.msb.hBuilder.AddWorkflowTaskScheduledEvent( - request.GetTaskQueue(), + taskQueue, int32(workflowTask.WorkflowTaskTimeout.Seconds()), 1, m.msb.timeSource.Now(), @@ -448,9 +450,10 @@ func (m *mutableStateWorkflowTaskManagerImpl) AddWorkflowTaskStartedEvent( event = m.msb.hBuilder.AddWorkflowTaskStartedEvent( scheduleID, requestID, - request.GetIdentity(), + identity, m.msb.timeSource.Now(), ) + m.msb.hBuilder.FlushAndCreateNewBatch() startedID = event.GetEventId() startTime = timestamp.TimeValue(event.GetEventTime()) } diff --git a/service/history/mutableStateWorkflowTaskManager_mock.go b/service/history/mutableStateWorkflowTaskManager_mock.go index 935dfca281b..b0b5ebf666a 100644 --- a/service/history/mutableStateWorkflowTaskManager_mock.go +++ b/service/history/mutableStateWorkflowTaskManager_mock.go @@ -153,9 +153,9 @@ func (mr *MockmutableStateWorkflowTaskManagerMockRecorder) AddWorkflowTaskSchedu } // AddWorkflowTaskStartedEvent mocks base method. 
-func (m *MockmutableStateWorkflowTaskManager) AddWorkflowTaskStartedEvent(scheduleEventID int64, requestID string, request *workflowservice.PollWorkflowTaskQueueRequest) (*history.HistoryEvent, *workflowTaskInfo, error) { +func (m *MockmutableStateWorkflowTaskManager) AddWorkflowTaskStartedEvent(scheduleEventID int64, requestID string, taskQueue *taskqueue.TaskQueue, identity string) (*history.HistoryEvent, *workflowTaskInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddWorkflowTaskStartedEvent", scheduleEventID, requestID, request) + ret := m.ctrl.Call(m, "AddWorkflowTaskStartedEvent", scheduleEventID, requestID, taskQueue, identity) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(*workflowTaskInfo) ret2, _ := ret[2].(error) @@ -163,9 +163,9 @@ func (m *MockmutableStateWorkflowTaskManager) AddWorkflowTaskStartedEvent(schedu } // AddWorkflowTaskStartedEvent indicates an expected call of AddWorkflowTaskStartedEvent. -func (mr *MockmutableStateWorkflowTaskManagerMockRecorder) AddWorkflowTaskStartedEvent(scheduleEventID, requestID, request interface{}) *gomock.Call { +func (mr *MockmutableStateWorkflowTaskManagerMockRecorder) AddWorkflowTaskStartedEvent(scheduleEventID, requestID, taskQueue, identity interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskStartedEvent", reflect.TypeOf((*MockmutableStateWorkflowTaskManager)(nil).AddWorkflowTaskStartedEvent), scheduleEventID, requestID, request) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskStartedEvent", reflect.TypeOf((*MockmutableStateWorkflowTaskManager)(nil).AddWorkflowTaskStartedEvent), scheduleEventID, requestID, taskQueue, identity) } // AddWorkflowTaskTimedOutEvent mocks base method. 
diff --git a/service/history/mutableState_mock.go b/service/history/mutableState_mock.go index ee6ed9e80bb..00a2192a818 100644 --- a/service/history/mutableState_mock.go +++ b/service/history/mutableState_mock.go @@ -754,9 +754,9 @@ func (mr *MockmutableStateMockRecorder) AddWorkflowTaskScheduledEventAsHeartbeat } // AddWorkflowTaskStartedEvent mocks base method. -func (m *MockmutableState) AddWorkflowTaskStartedEvent(arg0 int64, arg1 string, arg2 *workflowservice.PollWorkflowTaskQueueRequest) (*history.HistoryEvent, *workflowTaskInfo, error) { +func (m *MockmutableState) AddWorkflowTaskStartedEvent(arg0 int64, arg1 string, arg2 *taskqueue.TaskQueue, arg3 string) (*history.HistoryEvent, *workflowTaskInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddWorkflowTaskStartedEvent", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "AddWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(*workflowTaskInfo) ret2, _ := ret[2].(error) @@ -764,9 +764,9 @@ func (m *MockmutableState) AddWorkflowTaskStartedEvent(arg0 int64, arg1 string, } // AddWorkflowTaskStartedEvent indicates an expected call of AddWorkflowTaskStartedEvent. -func (mr *MockmutableStateMockRecorder) AddWorkflowTaskStartedEvent(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockmutableStateMockRecorder) AddWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskStartedEvent", reflect.TypeOf((*MockmutableState)(nil).AddWorkflowTaskStartedEvent), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskStartedEvent", reflect.TypeOf((*MockmutableState)(nil).AddWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3) } // AddWorkflowTaskTimedOutEvent mocks base method. 
diff --git a/service/history/nDCTransactionMgrForNewWorkflow.go b/service/history/nDCTransactionMgrForNewWorkflow.go index cd55ff5fae2..5c6febb0673 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow.go +++ b/service/history/nDCTransactionMgrForNewWorkflow.go @@ -162,6 +162,9 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent( if err != nil { return err } + if len(targetWorkflowEventsSeq) != 1 { + return serviceerror.NewInternal("unable to create 1st event batch") + } targetWorkflowHistorySize, err := r.persistNewNDCWorkflowEvents( targetWorkflow, @@ -233,6 +236,9 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( if err != nil { return err } + if len(targetWorkflowEventsSeq) != 1 { + return serviceerror.NewInternal("unable to create 1st event batch") + } targetWorkflowHistorySize, err := r.persistNewNDCWorkflowEvents( targetWorkflow, diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index 9faf62617fb..8491fc1c48b 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -690,7 +690,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple activityType2 := "activity type 2" timerTimeout2 := 20 * time.Second scheduledEvent2, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID2, activityType2, taskqueue, nil, - timerTimeout2, timerTimeout2, timerTimeout2, timerTimeout2) + timerTimeout2, timerTimeout2, timerTimeout2, 10*time.Second) addActivityTaskStartedEvent(mutableState, scheduledEvent2.GetEventId(), identity) activityInfo2 := mutableState.pendingActivityInfoIDs[scheduledEvent2.GetEventId()] activityInfo2.TimerTaskStatus |= timerTaskStatusCreatedHeartbeat diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index cfea24303c1..9f10a30bb84 100644 --- 
a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -477,12 +477,13 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution( return err } newWorkflowSizeSize := newContext.getHistorySize() - startEvents := newWorkflowEventsSeq[0] - eventsSize, err := c.persistNewWorkflowEvents(startEvents) - if err != nil { - return err + for _, workflowEvents := range newWorkflowEventsSeq { + eventsSize, err := c.persistNewWorkflowEvents(workflowEvents) + if err != nil { + return err + } + newWorkflowSizeSize += eventsSize } - newWorkflowSizeSize += eventsSize newContext.setHistorySize(newWorkflowSizeSize) newWorkflow.ExecutionInfo.ExecutionStats = &persistencespb.ExecutionStats{ HistorySize: newWorkflowSizeSize, @@ -721,12 +722,13 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNew( return err } newWorkflowSizeSize := newContext.getHistorySize() - startEvents := newWorkflowEventsSeq[0] - eventsSize, err := c.persistNewWorkflowEvents(startEvents) - if err != nil { - return err + for _, workflowEvents := range newWorkflowEventsSeq { + eventsSize, err := c.persistNewWorkflowEvents(workflowEvents) + if err != nil { + return err + } + newWorkflowSizeSize += eventsSize } - newWorkflowSizeSize += eventsSize newContext.setHistorySize(newWorkflowSizeSize) newWorkflow.ExecutionInfo.ExecutionStats = &persistencespb.ExecutionStats{ HistorySize: newWorkflowSizeSize, diff --git a/service/history/workflowResetter.go b/service/history/workflowResetter.go index dff7f53d158..79af352715e 100644 --- a/service/history/workflowResetter.go +++ b/service/history/workflowResetter.go @@ -203,8 +203,7 @@ func (r *workflowResetterImpl) prepareResetWorkflow( executionInfo.WorkflowExecutionExpirationTime = timestamp.TimeNowPtrUtcAddDuration(weTimeout) } - baseLastEventVersion := resetMutableState.GetCurrentVersion() - if baseLastEventVersion > resetWorkflowVersion { + if resetMutableState.GetCurrentVersion() > 
resetWorkflowVersion { return nil, serviceerror.NewInternal("workflowResetter encounter version mismatch.") } if err := resetMutableState.UpdateCurrentVersion( @@ -214,27 +213,18 @@ func (r *workflowResetterImpl) prepareResetWorkflow( return nil, err } - // TODO add checking of reset until event ID == workflow task started ID + 1 - workflowTask, ok := resetMutableState.GetInFlightWorkflowTask() - if !ok || workflowTask.StartedID+1 != resetMutableState.GetNextEventID() { - return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Can only reset workflow to WorkflowTaskStarted + 1: %v", baseRebuildLastEventID+1)) - } if len(resetMutableState.GetPendingChildExecutionInfos()) > 0 { return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Can only reset workflow with pending child workflows")) } - resetFailure := failure.NewResetWorkflowFailure(resetReason, nil) - _, err = resetMutableState.AddWorkflowTaskFailedEvent( - workflowTask.ScheduleID, - workflowTask.StartedID, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, - resetFailure, - identityHistoryService, - "", + if err := r.failWorkflowTask( + resetMutableState, baseRunID, + baseRebuildLastEventID, + baseRebuildLastEventVersion, resetRunID, - baseLastEventVersion, - ) - if err != nil { + resetReason, + ); err != nil { return nil, err } @@ -295,11 +285,14 @@ func (r *workflowResetterImpl) persistToDB( if err != nil { return err } - resetHistorySize, err := resetWorkflow.getContext().persistNonFirstWorkflowEvents(resetWorkflowEventsSeq[0]) - if err != nil { - return err + var resetHistorySize int64 + for _, workflowEvents := range resetWorkflowEventsSeq { + size, err := resetWorkflow.getContext().persistNonFirstWorkflowEvents(workflowEvents) + if err != nil { + return err + } + resetHistorySize += size } - return resetWorkflow.getContext().createWorkflowExecution( resetWorkflowSnapshot, resetHistorySize, @@ -377,6 +370,54 @@ func (r *workflowResetterImpl) replayResetWorkflow( ), nil } +func (r *workflowResetterImpl) 
failWorkflowTask( + resetMutableState mutableState, + baseRunID string, + baseRebuildLastEventID int64, + baseRebuildLastEventVersion int64, + resetRunID string, + resetReason string, +) error { + + workflowTask, ok := resetMutableState.GetPendingWorkflowTask() + if !ok { + // TODO if resetMutableState.HasProcessedOrPendingWorkflowTask() == false + // meaning workflow history has NO workflow task ever + // should also allow workflow reset, the only remaining issues are + // * what if workflow is a cron workflow, e.g. should add a workflow task directly or still respect the cron job + return serviceerror.NewInvalidArgument(fmt.Sprintf( + "Can only reset workflow to event ID in range [WorkflowTaskScheduled +1, WorkflowTaskStarted + 1]: %v", + baseRebuildLastEventID+1, + )) + } + + var err error + if workflowTask.StartedID == common.EmptyEventID { + _, workflowTask, err = resetMutableState.AddWorkflowTaskStartedEvent( + workflowTask.ScheduleID, + workflowTask.RequestID, + workflowTask.TaskQueue, + identityHistoryService, + ) + if err != nil { + return err + } + } + + _, err = resetMutableState.AddWorkflowTaskFailedEvent( + workflowTask.ScheduleID, + workflowTask.StartedID, + enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, + failure.NewResetWorkflowFailure(resetReason, nil), + identityHistoryService, + "", + baseRunID, + resetRunID, + baseRebuildLastEventVersion, + ) + return err +} + func (r *workflowResetterImpl) failInflightActivity( mutableState mutableState, terminateReason string, diff --git a/service/history/workflowResetter_test.go b/service/history/workflowResetter_test.go index 150ced6cb42..4e9f5fb0c34 100644 --- a/service/history/workflowResetter_test.go +++ b/service/history/workflowResetter_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/suite" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" persistencespb "go.temporal.io/server/api/persistence/v1" 
"go.temporal.io/server/common" @@ -264,6 +265,121 @@ func (s *workflowResetterSuite) TestReplayResetWorkflow() { s.Equal(resetMutableState, resetWorkflow.getMutableState()) } +func (s *workflowResetterSuite) TestFailWorkflowTask_NoWorkflowTask() { + baseRunID := uuid.New() + baseRebuildLastEventID := int64(1234) + baseRebuildLastEventVersion := int64(5678) + resetRunID := uuid.New() + resetReason := "some random reset reason" + + mutableState := NewMockmutableState(s.controller) + mutableState.EXPECT().GetPendingWorkflowTask().Return(nil, false).AnyTimes() + + err := s.workflowResetter.failWorkflowTask( + mutableState, + baseRunID, + baseRebuildLastEventID, + baseRebuildLastEventVersion, + resetRunID, + resetReason, + ) + s.Error(err) +} + +func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() { + baseRunID := uuid.New() + baseRebuildLastEventID := int64(1234) + baseRebuildLastEventVersion := int64(5678) + resetRunID := uuid.New() + resetReason := "some random reset reason" + + mutableState := NewMockmutableState(s.controller) + workflowTaskSchedule := &workflowTaskInfo{ + ScheduleID: baseRebuildLastEventID - 12, + StartedID: common.EmptyEventID, + RequestID: uuid.New(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: "random task queue name", + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + } + workflowTaskStart := &workflowTaskInfo{ + ScheduleID: workflowTaskSchedule.ScheduleID, + StartedID: workflowTaskSchedule.ScheduleID + 1, + RequestID: workflowTaskSchedule.RequestID, + TaskQueue: workflowTaskSchedule.TaskQueue, + } + mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTaskSchedule, true).AnyTimes() + mutableState.EXPECT().AddWorkflowTaskStartedEvent( + workflowTaskSchedule.ScheduleID, + workflowTaskSchedule.RequestID, + workflowTaskSchedule.TaskQueue, + identityHistoryService, + ).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil) + mutableState.EXPECT().AddWorkflowTaskFailedEvent( + workflowTaskStart.ScheduleID, + 
workflowTaskStart.StartedID, + enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, + failure.NewResetWorkflowFailure(resetReason, nil), + identityHistoryService, + "", + baseRunID, + resetRunID, + baseRebuildLastEventVersion, + ).Return(&historypb.HistoryEvent{}, nil) + + err := s.workflowResetter.failWorkflowTask( + mutableState, + baseRunID, + baseRebuildLastEventID, + baseRebuildLastEventVersion, + resetRunID, + resetReason, + ) + s.NoError(err) +} + +func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskStarted() { + baseRunID := uuid.New() + baseRebuildLastEventID := int64(1234) + baseRebuildLastEventVersion := int64(5678) + resetRunID := uuid.New() + resetReason := "some random reset reason" + + mutableState := NewMockmutableState(s.controller) + workflowTask := &workflowTaskInfo{ + ScheduleID: baseRebuildLastEventID - 12, + StartedID: baseRebuildLastEventID - 10, + RequestID: uuid.New(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: "random task queue name", + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + } + mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTask, true).AnyTimes() + mutableState.EXPECT().AddWorkflowTaskFailedEvent( + workflowTask.ScheduleID, + workflowTask.StartedID, + enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, + failure.NewResetWorkflowFailure(resetReason, nil), + identityHistoryService, + "", + baseRunID, + resetRunID, + baseRebuildLastEventVersion, + ).Return(&historypb.HistoryEvent{}, nil) + + err := s.workflowResetter.failWorkflowTask( + mutableState, + baseRunID, + baseRebuildLastEventID, + baseRebuildLastEventVersion, + resetRunID, + resetReason, + ) + s.NoError(err) +} + func (s *workflowResetterSuite) TestFailInflightActivity() { terminateReason := "some random termination reason" diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index ccbf79ad21f..74dfbd6390b 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ 
b/service/history/workflowTaskHandlerCallbacks.go @@ -34,7 +34,6 @@ import ( querypb "go.temporal.io/api/query/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" - "go.temporal.io/api/workflowservice/v1" historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/api/historyservice/v1" @@ -211,7 +210,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( return nil, serviceerrors.NewTaskAlreadyStarted("Workflow") } - _, workflowTask, err = mutableState.AddWorkflowTaskStartedEvent(scheduleID, requestID, req.PollRequest) + _, workflowTask, err = mutableState.AddWorkflowTaskStartedEvent( + scheduleID, + requestID, + req.PollRequest.GetTaskQueue(), + req.PollRequest.GetIdentity(), + ) if err != nil { // Unable to add WorkflowTaskStarted event to history return nil, serviceerror.NewInternal("Unable to add WorkflowTaskStarted event to history.") @@ -475,10 +479,12 @@ Update_History_Loop: if request.GetReturnNewWorkflowTask() { // start the new workflow task if request asked to do so // TODO: replace the poll request - _, _, err := msBuilder.AddWorkflowTaskStartedEvent(newWorkflowTask.ScheduleID, "request-from-RespondWorkflowTaskCompleted", &workflowservice.PollWorkflowTaskQueueRequest{ - TaskQueue: newWorkflowTask.TaskQueue, - Identity: request.Identity, - }) + _, _, err := msBuilder.AddWorkflowTaskStartedEvent( + newWorkflowTask.ScheduleID, + "request-from-RespondWorkflowTaskCompleted", + newWorkflowTask.TaskQueue, + request.Identity, + ) if err != nil { return nil, err }