Skip to content

Commit

Permalink
Do not wake up workflow in retry backoff upon signal (#2771)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 27, 2022
1 parent 8e3fb97 commit f7efbad
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
5 changes: 3 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,8 +2003,9 @@ func (e *historyEngineImpl) SignalWorkflowExecution(

executionInfo := mutableState.GetExecutionInfo()
createWorkflowTask := true
// Do not create workflow task when the workflow is cron and the cron has not been started yet
if executionInfo.CronSchedule != "" && !mutableState.HasProcessedOrPendingWorkflowTask() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
workflowTaskBackoff := timestamp.TimeValue(executionInfo.GetExecutionTime()).After(timestamp.TimeValue(executionInfo.GetStartTime()))
if workflowTaskBackoff && !mutableState.HasProcessedOrPendingWorkflowTask() {
createWorkflowTask = false
}

Expand Down
64 changes: 64 additions & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4940,6 +4940,70 @@ func (s *engineSuite) TestSignalWorkflowExecution_Failed() {
s.EqualError(err, "workflow execution already completed")
}

func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() {
signalRequest := &historyservice.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "Missing namespace UUID.")

we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
}
taskqueue := "testTaskQueue"
identity := "testIdentity"
signalName := "my signal name"
signalInput := payloads.EncodeString("test input")
signalRequest = &historyservice.SignalWorkflowExecutionRequest{
NamespaceId: tests.NamespaceID.String(),
SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{
Namespace: tests.NamespaceID.String(),
WorkflowExecution: &we,
Identity: identity,
SignalName: signalName,
Input: signalInput,
},
}

msBuilder := workflow.TestLocalMutableState(s.mockHistoryEngine.shard, s.eventsCache,
tests.LocalNamespaceEntry, log.NewTestLogger(), we.GetRunId())
startRequest := &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: we.WorkflowId,
WorkflowType: &commonpb.WorkflowType{Name: "wType"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskqueue},
Input: payloads.EncodeString("input"),
WorkflowExecutionTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowRunTimeout: timestamp.DurationPtr(50 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(200 * time.Second),
Identity: identity,
}

_, err = msBuilder.AddWorkflowExecutionStartedEvent(
we,
&historyservice.StartWorkflowExecutionRequest{
Attempt: 1,
NamespaceId: tests.NamespaceID.String(),
StartRequest: startRequest,
ContinueAsNewInitiator: enumspb.CONTINUE_AS_NEW_INITIATOR_RETRY,
FirstWorkflowTaskBackoff: timestamp.DurationPtr(time.Second * 10),
},
)
s.NoError(err)

ms := workflow.TestCloneToProto(msBuilder)
ms.ExecutionInfo.NamespaceId = tests.NamespaceID.String()
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) {
s.Len(request.UpdateWorkflowEvents[0].Events, 1) // no workflow task scheduled event
// s.Empty(request.UpdateWorkflowMutation.Tasks[tasks.CategoryTransfer]) // no workflow transfer task
return tests.UpdateWorkflowExecutionResponse, nil
})

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

func (s *engineSuite) TestRemoveSignalMutableState() {
removeRequest := &historyservice.RemoveSignalMutableStateRequest{}
err := s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest)
Expand Down

0 comments on commit f7efbad

Please sign in to comment.