Skip to content

Commit

Permalink
Fix: use normal task queue if sticky worker unavailable during update…
Browse files Browse the repository at this point in the history
… workflow API call (#4353)
  • Loading branch information
alexshtin committed May 18, 2023
1 parent ab2f6b9 commit 007606d
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 1 deletion.
17 changes: 16 additions & 1 deletion service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/internal/effect"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
Expand Down Expand Up @@ -92,6 +93,7 @@ func Invoke(
var (
upd *update.Update
taskQueue taskqueuepb.TaskQueue
normalTaskQueueName string
scheduledEventID int64
scheduleToStartTimeout time.Duration
)
Expand Down Expand Up @@ -160,6 +162,7 @@ func Invoke(
Name: newWorkflowTask.TaskQueue.Name,
Kind: newWorkflowTask.TaskQueue.Kind,
}
normalTaskQueueName = ms.GetExecutionInfo().TaskQueue
}
return nil
}
Expand All @@ -168,9 +171,21 @@ func Invoke(
return nil, err
}

// WT was created.
// WT was created and needs to be added directly to matching w/o transfer task.
// TODO (alex): This code is copied from transferQueueActiveTaskExecutor.processWorkflowTask.
// Helper function needs to be extracted to avoid code duplication.
if scheduledEventID != common.EmptyEventID {
err = addWorkflowTaskToMatching(ctx, wfKey, &taskQueue, scheduledEventID, &scheduleToStartTimeout, namespace.ID(req.GetNamespaceId()), shardCtx, matchingClient)

if _, isStickyWorkerUnavailable := err.(*serviceerrors.StickyWorkerUnavailable); isStickyWorkerUnavailable {
// If sticky worker is unavailable, switch to original normal task queue.
taskQueue = taskqueuepb.TaskQueue{
Name: normalTaskQueueName,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
err = addWorkflowTaskToMatching(ctx, wfKey, &taskQueue, scheduledEventID, &scheduleToStartTimeout, namespace.ID(req.GetNamespaceId()), shardCtx, matchingClient)
}

if err != nil {
return nil, err
}
Expand Down
202 changes: 202 additions & 0 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,208 @@ func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_AcceptComplete_Sti
14 WorkflowExecutionCompleted`, events)
}

// TestUpdateWorkflow_NewWorkflowTask_AcceptComplete_StickyWorkerUnavailable checks that an update
// sent after the sticky worker has stopped polling is still delivered: the workflow task carrying
// the update request is processed with full history, the update is accepted and completed, and the
// workflow is then completed by a follow-up workflow task.
func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_AcceptComplete_StickyWorkerUnavailable() {
id := "integration-update-workflow-test-swu"
wt := "integration-update-workflow-test-swu-type"
tq := "integration-update-workflow-test-swu-task-queue"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
// The sticky queue name is derived from the normal task queue name.
stickyQueue := &taskqueuepb.TaskQueue{Name: tq + "-sticky"}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

we := &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: startResp.GetRunId(),
}

// wtHandler is a small state machine keyed on wtHandlerCalls: each call asserts the history it
// expects to receive and returns the commands for that workflow task. A fourth call fails the test.
wtHandlerCalls := 0
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
// Completes first WT with update unrelated command.
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: strconv.Itoa(1),
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
}},
}}, nil
case 2:
// Speculative WT, with update.Request message.
// Worker gets full history because update was issued after sticky worker is gone.
s.EqualHistory(`
 1 WorkflowExecutionStarted
 2 WorkflowTaskScheduled
 3 WorkflowTaskStarted
 4 WorkflowTaskCompleted
 5 ActivityTaskScheduled
 6 WorkflowTaskScheduled
 7 WorkflowTaskStarted`, history)
// No commands here; the update is answered through messages returned by msgHandler.
return nil, nil
case 3:
// Third WT: the expected history starts at event 8, right after the WT that carried the update.
// The workflow is completed from here.
s.EqualHistory(`
 8 WorkflowTaskCompleted
 9 WorkflowExecutionUpdateAccepted
 10 WorkflowExecutionUpdateCompleted
 11 WorkflowTaskScheduled
 12 WorkflowTaskStarted`, history)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("done"),
}},
}}, nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

// msgHandler mirrors wtHandler with its own call counter. Only the second call reads a message
// from the polled task and replies to it; the first and third calls return no messages.
msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
// The first message of the task is decoded as the update request.
updRequestMsg := task.Messages[0]
updRequest := unmarshalAny[*updatepb.Request](s, updRequestMsg.GetBody())

s.Equal(payloads.EncodeString("update args"), updRequest.GetInput().GetArgs())
s.Equal("update_handler", updRequest.GetInput().GetName())
// Event 6 is the WorkflowTaskScheduled event in the history asserted by wtHandler's second call.
s.EqualValues(6, updRequestMsg.GetEventId())

// Reply with two messages for the same update: an Acceptance of the request, followed by a
// Response carrying a successful outcome.
return []*protocolpb.Message{
{
Id: uuid.New(),
ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(),
SequencingId: nil,
Body: marshalAny(s, &updatepb.Acceptance{
AcceptedRequestMessageId: updRequestMsg.GetId(),
AcceptedRequestSequencingEventId: updRequestMsg.GetEventId(),
AcceptedRequest: updRequest,
}),
},
{
Id: uuid.New(),
ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(),
SequencingId: nil,
Body: marshalAny(s, &updatepb.Response{
Meta: updRequest.GetMeta(),
Outcome: &updatepb.Outcome{
Value: &updatepb.Outcome_Success{
Success: payloads.EncodeString("update success"),
},
},
}),
},
}, nil
case 3:
return nil, nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
StickyTaskQueue: stickyQueue,
StickyScheduleToStartTimeout: 3 * time.Second * debug.TimeoutMultiplier,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}

// Poll from the regular task queue, but respond with a sticky-enabled response to enable the sticky task queue.
_, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetry(false, false, false, true, 1, 5)
s.NoError(err)

// No poller is running during this sleep, so by the time the update is sent the sticky worker
// is expected to be considered unavailable (see the log messages around the sleep).
s.Logger.Info("Sleep 10 seconds to make sure stickyPollerUnavailableWindow time has passed.")
time.Sleep(10 * time.Second)
s.Logger.Info("Sleep 10 seconds is done.")

// Now send an update. It should try the sticky task queue first, get a "StickyWorkerUnavailable" error,
// and be resent to the normal task queue.
// This can be observed in wtHandler: if the history were partial, the sticky task queue would have been used.

// UpdateWorkflowExecution is issued from a separate goroutine and its result is handed back over
// updateResultCh, so this goroutine can process the workflow task while the call is in flight.
type UpdateResult struct {
Response *workflowservice.UpdateWorkflowExecutionResponse
Err error
}
updateResultCh := make(chan UpdateResult)
updateWorkflowFn := func() {
updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: we,
Request: &updatepb.Request{
Meta: &updatepb.Meta{UpdateId: uuid.New()},
Input: &updatepb.Input{
Name: "update_handler",
Args: payloads.EncodeString("update args"),
},
},
})
assert.NoError(s.T(), err1)
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
}
go updateWorkflowFn()
time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server.

// Process update in workflow task from non-sticky task queue.
_, updateResp, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(false, false, false, false, 1, 5, true, nil)
s.NoError(err)
// The channel is unbuffered: this receive pairs with the send at the end of updateWorkflowFn.
updateResult := <-updateResultCh
s.NoError(updateResult.Err)
s.EqualValues(payloads.EncodeString("update success"), updateResult.Response.GetOutcome().GetSuccess())
s.EqualValues(0, updateResp.ResetHistoryEventId)

// Complete workflow.
// The workflow task returned in updateResp is handled directly rather than polled for again.
completeWorkflowResp, err := poller.HandlePartialWorkflowTask(updateResp.GetWorkflowTask(), true)
s.NoError(err)
s.NotNil(completeWorkflowResp)
s.Nil(completeWorkflowResp.GetWorkflowTask())
s.EqualValues(0, completeWorkflowResp.ResetHistoryEventId)

// Both handlers must have been invoked exactly once per workflow task (three in total).
s.Equal(3, wtHandlerCalls)
s.Equal(3, msgHandlerCalls)

events := s.getHistory(s.namespace, we)

// Complete expected history of the run, including the update accepted/completed events.
s.EqualHistoryEvents(`
 1 WorkflowExecutionStarted
 2 WorkflowTaskScheduled
 3 WorkflowTaskStarted
 4 WorkflowTaskCompleted
 5 ActivityTaskScheduled
 6 WorkflowTaskScheduled
 7 WorkflowTaskStarted
 8 WorkflowTaskCompleted
 9 WorkflowExecutionUpdateAccepted
 10 WorkflowExecutionUpdateCompleted
 11 WorkflowTaskScheduled
 12 WorkflowTaskStarted
 13 WorkflowTaskCompleted
 14 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_Reject() {
id := "integration-update-workflow-test-3"
wt := "integration-update-workflow-test-3-type"
Expand Down

0 comments on commit 007606d

Please sign in to comment.