Skip to content

Commit

Permalink
Relax the constraint on which event ID is allow for ResetWorkflow API (
Browse files Browse the repository at this point in the history
…#1465)

* Allow reset workflow with event ID in range [workflow task schedule ID+1, workflow task start ID+1]
* Add UT & integration test
  • Loading branch information
wxing1292 committed Apr 16, 2021
1 parent 5d17b1e commit b4c4b84
Show file tree
Hide file tree
Showing 15 changed files with 374 additions and 70 deletions.
127 changes: 126 additions & 1 deletion host/resetworkflow_test.go
Expand Up @@ -50,7 +50,6 @@ func (s *integrationSuite) TestResetWorkflow() {
identity := "worker1"

workflowType := &commonpb.WorkflowType{Name: wt}

taskQueue := &taskqueuepb.TaskQueue{Name: tq}

// Start workflow execution
Expand Down Expand Up @@ -200,3 +199,129 @@ func (s *integrationSuite) TestResetWorkflow() {
s.NotNil(firstActivityCompletionEvent)
s.True(workflowComplete)
}

func (s *integrationSuite) TestResetWorkflow_WorkflowTask_Schedule() {
workflowID := "integration-reset-workflow-test-schedule"
workflowTypeName := "integration-reset-workflow-test-schedule-type"
taskQueueName := "integration-reset-workflow-test-schedule-taskqueue"
s.testResetWorkflowRangeScheduleToStart(workflowID, workflowTypeName, taskQueueName, 3)
}

func (s *integrationSuite) TestResetWorkflow_WorkflowTask_ScheduleToStart() {
workflowID := "integration-reset-workflow-test-schedule-to-start"
workflowTypeName := "integration-reset-workflow-test-schedule-to-start-type"
taskQueueName := "integration-reset-workflow-test-schedule-to-start-taskqueue"
s.testResetWorkflowRangeScheduleToStart(workflowID, workflowTypeName, taskQueueName, 4)
}

func (s *integrationSuite) TestResetWorkflow_WorkflowTask_Start() {
workflowID := "integration-reset-workflow-test-start"
workflowTypeName := "integration-reset-workflow-test-start-type"
taskQueueName := "integration-reset-workflow-test-start-taskqueue"
s.testResetWorkflowRangeScheduleToStart(workflowID, workflowTypeName, taskQueueName, 5)
}

func (s *integrationSuite) testResetWorkflowRangeScheduleToStart(
workflowID string,
workflowTypeName string,
taskQueueName string,
resetToEventID int64,
) {
identity := "worker1"

workflowType := &commonpb.WorkflowType{Name: workflowTypeName}
taskQueue := &taskqueuepb.TaskQueue{Name: taskQueueName}

// Start workflow execution
request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: workflowID,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
}

we, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

_, err = s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: we.RunId,
},
SignalName: "random signal name",
Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{
{Data: []byte("random signal payload")},
}},
Identity: identity,
})
s.NoError(err)

// workflow logic
workflowComplete := false
isWorkflowTaskProcessed := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

if !isWorkflowTaskProcessed {
isWorkflowTaskProcessed = true
return []*commandpb.Command{}, nil
}

// Complete workflow after reset
workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil

}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: nil,
Logger: s.Logger,
T: s.T(),
}

_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)

// events layout
// 1. WorkflowExecutionStarted
// 2. WorkflowTaskScheduled
// 3. WorkflowExecutionSignaled
// 4. WorkflowTaskStarted
// 5. WorkflowTaskCompleted

// Reset workflow execution
_, err = s.engine.ResetWorkflowExecution(NewContext(), &workflowservice.ResetWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: we.RunId,
},
Reason: "reset execution from test",
WorkflowTaskFinishEventId: resetToEventID,
RequestId: uuid.New(),
})
s.NoError(err)

_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.True(workflowComplete)
}
8 changes: 8 additions & 0 deletions service/history/historyEngine.go
Expand Up @@ -567,6 +567,10 @@ func (e *historyEngineImpl) StartWorkflowExecution(
if err != nil {
return nil, err
}
if len(newWorkflowEventsSeq) != 1 {
return nil, serviceerror.NewInternal("unable to create 1st event batch")
}

historySize, err := weContext.persistFirstWorkflowEvents(newWorkflowEventsSeq[0])
if err != nil {
return nil, err
Expand Down Expand Up @@ -1989,6 +1993,10 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
if err != nil {
return nil, err
}
if len(newWorkflowEventsSeq) != 1 {
return nil, serviceerror.NewInternal("unable to create 1st event batch")
}

historySize, err := context.persistFirstWorkflowEvents(newWorkflowEventsSeq[0])
if err != nil {
return nil, err
Expand Down
21 changes: 10 additions & 11 deletions service/history/historyEngine_test.go
Expand Up @@ -5027,10 +5027,12 @@ func addWorkflowTaskStartedEvent(builder mutableState, scheduleID int64, taskQue

func addWorkflowTaskStartedEventWithRequestID(builder mutableState, scheduleID int64, requestID string,
taskQueue, identity string) *historypb.HistoryEvent {
event, _, _ := builder.AddWorkflowTaskStartedEvent(scheduleID, requestID, &workflowservice.PollWorkflowTaskQueueRequest{
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Identity: identity,
})
event, _, _ := builder.AddWorkflowTaskStartedEvent(
scheduleID,
requestID,
&taskqueuepb.TaskQueue{Name: taskQueue},
identity,
)

return event
}
Expand Down Expand Up @@ -5270,10 +5272,12 @@ func newMutableStateBuilderWithVersionHistoriesForTest(

func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState {
builder := ms.(*mutableStateBuilder)
builder.FlushBufferedEvents()
_, _, _ = builder.CloseTransactionAsMutation(time.Now().UTC(), transactionPolicyActive)

info := copyWorkflowExecutionInfo(builder.executionInfo)
state := copyWorkflowExecutionState(builder.executionState)
info.ExecutionStats = &persistencespb.ExecutionStats{}
info.VersionHistories = versionhistory.CopyVersionHistories(builder.executionInfo.VersionHistories)

activityInfos := make(map[int64]*persistencespb.ActivityInfo)
for id, info := range builder.pendingActivityInfoIDs {
Expand All @@ -5296,12 +5300,6 @@ func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState {
childInfos[id] = copyChildInfo(info)
}

// FlushBuffer will also be called within the CloseTransactionAsMutation
_, _, _ = builder.CloseTransactionAsMutation(time.Now().UTC(), transactionPolicyActive)
if builder.executionInfo.VersionHistories != nil {
info.VersionHistories = versionhistory.CopyVersionHistories(builder.executionInfo.VersionHistories)
}

return &persistencespb.WorkflowMutableState{
ExecutionInfo: info,
ExecutionState: state,
Expand Down Expand Up @@ -5341,6 +5339,7 @@ func copyWorkflowExecutionInfo(sourceInfo *persistencespb.WorkflowExecutionInfo)
WorkflowRunTimeout: sourceInfo.WorkflowRunTimeout,
DefaultWorkflowTaskTimeout: sourceInfo.DefaultWorkflowTaskTimeout,
LastFirstEventId: sourceInfo.LastFirstEventId,
LastFirstEventTxnId: sourceInfo.LastFirstEventTxnId,
LastEventTaskId: sourceInfo.LastEventTaskId,
LastProcessedEvent: sourceInfo.LastProcessedEvent,
StartTime: sourceInfo.StartTime,
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableState.go
Expand Up @@ -90,7 +90,7 @@ type (
AddFirstWorkflowTaskScheduled(*historypb.HistoryEvent) error
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool) (*workflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time) (*workflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *workflowservice.PollWorkflowTaskQueueRequest) (*historypb.HistoryEvent, *workflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string) (*historypb.HistoryEvent, *workflowTaskInfo, error)
AddWorkflowTaskTimedOutEvent(int64, int64) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionCancelRequested(int64, string, string, string) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionSignaled(int64, string, string, string, string) (*historypb.HistoryEvent, error)
Expand Down
8 changes: 3 additions & 5 deletions service/history/mutableStateBuilder.go
Expand Up @@ -1526,13 +1526,14 @@ func (e *mutableStateBuilder) ReplicateWorkflowTaskScheduledEvent(
func (e *mutableStateBuilder) AddWorkflowTaskStartedEvent(
scheduleEventID int64,
requestID string,
request *workflowservice.PollWorkflowTaskQueueRequest,
taskQueue *taskqueuepb.TaskQueue,
identity string,
) (*historypb.HistoryEvent, *workflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
if err := e.checkMutability(opTag); err != nil {
return nil, nil, err
}
return e.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduleEventID, requestID, request)
return e.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduleEventID, requestID, taskQueue, identity)
}

func (e *mutableStateBuilder) ReplicateWorkflowTaskStartedEvent(
Expand Down Expand Up @@ -3697,9 +3698,6 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot(
return nil, nil, err
}

if len(workflowEventsSeq) > 1 {
return nil, nil, serviceerror.NewInternal("cannot generate workflow snapshot with transient events")
}
if len(bufferEvents) > 0 {
// TODO do we need the functionality to generate snapshot with buffered events?
return nil, nil, serviceerror.NewInternal("cannot generate workflow snapshot with buffered events")
Expand Down
4 changes: 2 additions & 2 deletions service/history/mutableStateBuilder_test.go
Expand Up @@ -37,7 +37,6 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
Expand Down Expand Up @@ -410,7 +409,8 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
_, _, err = s.msBuilder.AddWorkflowTaskStartedEvent(
s.msBuilder.GetNextEventID(),
uuid.New(),
&workflowservice.PollWorkflowTaskQueueRequest{},
&taskqueuepb.TaskQueue{},
"random identity",
)
s.NoError(err)
s.Equal(0, s.msBuilder.hBuilder.BufferEventSize())
Expand Down
11 changes: 7 additions & 4 deletions service/history/mutableStateWorkflowTaskManager.go
Expand Up @@ -70,7 +70,8 @@ type (
AddWorkflowTaskStartedEvent(
scheduleEventID int64,
requestID string,
request *workflowservice.PollWorkflowTaskQueueRequest,
taskQueue *taskqueuepb.TaskQueue,
identity string,
) (*historypb.HistoryEvent, *workflowTaskInfo, error)
AddWorkflowTaskCompletedEvent(
scheduleEventID int64,
Expand Down Expand Up @@ -413,7 +414,8 @@ func (m *mutableStateWorkflowTaskManagerImpl) AddFirstWorkflowTaskScheduled(
func (m *mutableStateWorkflowTaskManagerImpl) AddWorkflowTaskStartedEvent(
scheduleEventID int64,
requestID string,
request *workflowservice.PollWorkflowTaskQueueRequest,
taskQueue *taskqueuepb.TaskQueue,
identity string,
) (*historypb.HistoryEvent, *workflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
workflowTask, ok := m.GetWorkflowTaskInfo(scheduleEventID)
Expand All @@ -433,7 +435,7 @@ func (m *mutableStateWorkflowTaskManagerImpl) AddWorkflowTaskStartedEvent(
if workflowTask.Attempt > 1 && (workflowTask.ScheduleID != m.msb.GetNextEventID() || workflowTask.Version != m.msb.GetCurrentVersion()) {
// Also create a new WorkflowTaskScheduledEvent since new events came in when it was scheduled
scheduleEvent := m.msb.hBuilder.AddWorkflowTaskScheduledEvent(
request.GetTaskQueue(),
taskQueue,
int32(workflowTask.WorkflowTaskTimeout.Seconds()),
1,
m.msb.timeSource.Now(),
Expand All @@ -448,9 +450,10 @@ func (m *mutableStateWorkflowTaskManagerImpl) AddWorkflowTaskStartedEvent(
event = m.msb.hBuilder.AddWorkflowTaskStartedEvent(
scheduleID,
requestID,
request.GetIdentity(),
identity,
m.msb.timeSource.Now(),
)
m.msb.hBuilder.FlushAndCreateNewBatch()
startedID = event.GetEventId()
startTime = timestamp.TimeValue(event.GetEventTime())
}
Expand Down
8 changes: 4 additions & 4 deletions service/history/mutableStateWorkflowTaskManager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions service/history/mutableState_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions service/history/nDCTransactionMgrForNewWorkflow.go
Expand Up @@ -162,6 +162,9 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent(
if err != nil {
return err
}
if len(targetWorkflowEventsSeq) != 1 {
return serviceerror.NewInternal("unable to create 1st event batch")
}

targetWorkflowHistorySize, err := r.persistNewNDCWorkflowEvents(
targetWorkflow,
Expand Down Expand Up @@ -233,6 +236,9 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
if err != nil {
return err
}
if len(targetWorkflowEventsSeq) != 1 {
return serviceerror.NewInternal("unable to create 1st event batch")
}

targetWorkflowHistorySize, err := r.persistNewNDCWorkflowEvents(
targetWorkflow,
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueStandbyTaskExecutor_test.go
Expand Up @@ -690,7 +690,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple
activityType2 := "activity type 2"
timerTimeout2 := 20 * time.Second
scheduledEvent2, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID2, activityType2, taskqueue, nil,
timerTimeout2, timerTimeout2, timerTimeout2, timerTimeout2)
timerTimeout2, timerTimeout2, timerTimeout2, 10*time.Second)
addActivityTaskStartedEvent(mutableState, scheduledEvent2.GetEventId(), identity)
activityInfo2 := mutableState.pendingActivityInfoIDs[scheduledEvent2.GetEventId()]
activityInfo2.TimerTaskStatus |= timerTaskStatusCreatedHeartbeat
Expand Down

0 comments on commit b4c4b84

Please sign in to comment.