Skip to content

Commit

Permalink
Handle worker versioning for sticky queues (#4397)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed May 26, 2023
1 parent ea6246e commit cd4ed59
Show file tree
Hide file tree
Showing 15 changed files with 438 additions and 256 deletions.
1 change: 1 addition & 0 deletions service/frontend/errors.go
Expand Up @@ -81,6 +81,7 @@ var (
errInvalidWorkflowStartDelaySeconds = serviceerror.NewInvalidArgument("An invalid WorkflowStartDelaySeconds is set on request.")
errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.")
errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errUseVersioningWithoutNormalName = serviceerror.NewInvalidArgument("NormalName must be set on sticky queue if UseVersioning is true.")
errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.")

errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
Expand Down
61 changes: 31 additions & 30 deletions service/frontend/workflow_handler.go
Expand Up @@ -839,12 +839,8 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w
return nil, errIdentityTooLong
}

if request.GetWorkerVersionCapabilities().GetUseVersioning() && !wh.config.EnableWorkerVersioningWorkflow(request.Namespace) {
return nil, errWorkerVersioningNotAllowed
}

if len(request.GetWorkerVersionCapabilities().GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() {
return nil, errBuildIdTooLong
if err := wh.validateVersioningInfo(request.Namespace, request.WorkerVersionCapabilities, request.TaskQueue); err != nil {
return nil, err
}

if err := wh.validateTaskQueue(request.TaskQueue); err != nil {
Expand All @@ -857,14 +853,6 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w
}
namespaceID := namespaceEntry.ID()

// Copy WorkerVersionCapabilities.BuildId to BinaryChecksum if BinaryChecksum is missing (small
// optimization to save space in the poll request).
if request.WorkerVersionCapabilities != nil {
if len(request.WorkerVersionCapabilities.BuildId) > 0 && len(request.BinaryChecksum) == 0 {
request.BinaryChecksum = request.WorkerVersionCapabilities.BuildId
}
}

wh.logger.Debug("Poll workflow task queue.", tag.WorkflowNamespace(namespaceEntry.Name().String()), tag.WorkflowNamespaceID(namespaceID.String()))
if err := wh.checkBadBinary(namespaceEntry, request.GetBinaryChecksum()); err != nil {
return nil, err
Expand Down Expand Up @@ -941,12 +929,12 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
return nil, errIdentityTooLong
}

if request.GetWorkerVersionStamp().GetUseVersioning() && !wh.config.EnableWorkerVersioningWorkflow(request.Namespace) {
return nil, errWorkerVersioningNotAllowed
}

if len(request.GetWorkerVersionStamp().GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() {
return nil, errBuildIdTooLong
if err := wh.validateVersioningInfo(
request.Namespace,
request.WorkerVersionStamp,
request.StickyAttributes.GetWorkerTaskQueue(),
); err != nil {
return nil, err
}

taskToken, err := wh.tokenSerializer.Deserialize(request.TaskToken)
Expand All @@ -955,10 +943,6 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
}
namespaceId := namespace.ID(taskToken.GetNamespaceId())

if request.WorkerVersionStamp.GetUseVersioning() && len(request.WorkerVersionStamp.GetBuildId()) == 0 {
return nil, errUseVersioningWithoutBuildId
}

wh.overrides.DisableEagerActivityDispatchForBuggyClients(ctx, request)

histResp, err := wh.historyClient.RespondWorkflowTaskCompleted(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
Expand Down Expand Up @@ -1101,12 +1085,8 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w
return nil, errIdentityTooLong
}

if request.GetWorkerVersionCapabilities().GetUseVersioning() && !wh.config.EnableWorkerVersioningWorkflow(request.Namespace) {
return nil, errWorkerVersioningNotAllowed
}

if len(request.GetWorkerVersionCapabilities().GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() {
return nil, errBuildIdTooLong
if err := wh.validateVersioningInfo(request.Namespace, request.WorkerVersionCapabilities, request.TaskQueue); err != nil {
return nil, err
}

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
Expand Down Expand Up @@ -4286,6 +4266,27 @@ func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue) error {
return nil
}

type buildIdAndFlag interface {
GetBuildId() string
GetUseVersioning() bool
}

func (wh *WorkflowHandler) validateVersioningInfo(namespace string, id buildIdAndFlag, tq *taskqueuepb.TaskQueue) error {
if id.GetUseVersioning() && !wh.config.EnableWorkerVersioningWorkflow(namespace) {
return errWorkerVersioningNotAllowed
}
if id.GetUseVersioning() && tq.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY && len(tq.GetNormalName()) == 0 {
return errUseVersioningWithoutNormalName
}
if id.GetUseVersioning() && len(id.GetBuildId()) == 0 {
return errUseVersioningWithoutBuildId
}
if len(id.GetBuildId()) > wh.config.WorkerBuildIdSizeLimit() {
return errBuildIdTooLong
}
return nil
}

//nolint:revive // cyclomatic complexity
func (wh *WorkflowHandler) validateBuildIdCompatibilityUpdate(
req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest,
Expand Down
5 changes: 3 additions & 2 deletions service/history/api/get_workflow_util.go
Expand Up @@ -205,8 +205,9 @@ func MutableStateToGetResponse(
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
StickyTaskQueue: &taskqueuepb.TaskQueue{
Name: executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
Name: executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
NormalName: executionInfo.TaskQueue,
},
StickyTaskQueueScheduleToStartTimeout: executionInfo.StickyScheduleToStartTimeout,
CurrentBranchToken: currentBranchToken,
Expand Down
9 changes: 5 additions & 4 deletions service/history/transferQueueActiveTaskExecutor.go
Expand Up @@ -237,6 +237,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
}

executionInfo := mutableState.GetExecutionInfo()
originalTaskQueue := executionInfo.TaskQueue

// NOTE: previously this section check whether mutable state has enabled
// sticky workflowTask, if so convert the workflowTask to a sticky workflowTask.
Expand All @@ -245,12 +246,13 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
// task or not.
var taskQueue *taskqueuepb.TaskQueue
taskScheduleToStartTimeoutSeconds := int64(0)
if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue {
if originalTaskQueue != task.TaskQueue {
// this workflowTask is an sticky workflowTask
// there shall already be an timer set
taskQueue = &taskqueuepb.TaskQueue{
Name: task.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
Name: task.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
NormalName: originalTaskQueue,
}
taskScheduleToStartTimeoutSeconds = int64(timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout).Seconds())
} else {
Expand All @@ -262,7 +264,6 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
taskScheduleToStartTimeoutSeconds = int64(workflowRunTimeout.Round(time.Second).Seconds())
}

originalTaskQueue := mutableState.GetExecutionInfo().TaskQueue
directive := common.MakeVersionDirectiveForWorkflowTask(
mutableState.GetWorkerVersionStamp(),
mutableState.GetLastWorkflowTaskStartedEventID(),
Expand Down
3 changes: 2 additions & 1 deletion service/history/transferQueueActiveTaskExecutor_test.go
Expand Up @@ -2642,8 +2642,9 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddWorkflowTaskRequest(
}
executionInfo := mutableState.GetExecutionInfo()
timeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout)
if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue {
if executionInfo.TaskQueue != task.TaskQueue {
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
taskQueue.NormalName = executionInfo.TaskQueue
timeout = timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout)
}

Expand Down
5 changes: 3 additions & 2 deletions service/history/workflow/mutable_state_impl.go
Expand Up @@ -639,8 +639,9 @@ func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool {
func (ms *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
if ms.IsStickyTaskQueueEnabled() {
return &taskqueuepb.TaskQueue{
Name: ms.executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
Name: ms.executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
NormalName: ms.executionInfo.TaskQueue,
}
}
return &taskqueuepb.TaskQueue{
Expand Down
10 changes: 8 additions & 2 deletions service/matching/db.go
Expand Up @@ -297,6 +297,12 @@ func (db *taskQueueDB) CompleteTasksLessThan(
return n, err
}

// Returns true if we are storing user data in the db. We need to be the root partition,
// workflow type, unversioned, and also a normal queue.
func (db *taskQueueDB) DbStoresUserData() bool {
return db.taskQueue.OwnsUserData() && db.taskQueueKind == enumspb.TASK_QUEUE_KIND_NORMAL
}

// GetUserData returns the versioning data for this task queue. Do not mutate the returned pointer, as doing so
// will cause cache inconsistency.
func (db *taskQueueDB) GetUserData(
Expand All @@ -320,7 +326,7 @@ func (db *taskQueueDB) getUserDataLocked(
ctx context.Context,
) (*persistencespb.VersionedTaskQueueUserData, chan struct{}, error) {
if db.userData == nil {
if !db.taskQueue.OwnsUserData() {
if !db.DbStoresUserData() {
return nil, db.userDataChanged, nil
}

Expand Down Expand Up @@ -350,7 +356,7 @@ func (db *taskQueueDB) getUserDataLocked(
//
// On success returns a pointer to the updated data, which must *not* be mutated.
func (db *taskQueueDB) UpdateUserData(ctx context.Context, updateFn func(*persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, error), taskQueueLimitPerBuildId int) (*persistencespb.VersionedTaskQueueUserData, error) {
if !db.taskQueue.OwnsUserData() {
if !db.DbStoresUserData() {
return nil, errUserDataNoMutateNonRoot
}
db.Lock()
Expand Down

0 comments on commit cd4ed59

Please sign in to comment.