From 20baa97cf378835532c6aac18b3dabb3a528cffc Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Fri, 16 Jun 2023 16:53:54 -0700 Subject: [PATCH] Spool tasks with "default" versioning directive along with unversioned tasks (#4512) --- service/matching/matchingEngine.go | 36 ++++++++----- service/matching/taskQueueManager.go | 23 ++++++++- tests/versioning_test.go | 76 ++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 14 deletions(-) diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 0dd80edd697..ca881b502cf 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -325,7 +325,7 @@ func (e *matchingEngineImpl) AddWorkflowTask( // - if we spool to db, we'll re-resolve when it comes out of the db taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, addRequest.VersionDirective, stickyInfo) if err != nil { - if err == errUserDataDisabled { + if errors.Is(err, errUserDataDisabled) { // When user data loading is disabled, we intentionally drop tasks for versioned workflows // to avoid breaking versioning semantics and dispatching tasks to the wrong workers. 
err = nil @@ -362,11 +362,16 @@ func (e *matchingEngineImpl) AddWorkflowTask( VersionDirective: addRequest.VersionDirective, } + baseTqm, err := e.getTaskQueueManager(ctx, origTaskQueue, stickyInfo, true) + if err != nil { + return false, err + } return tqm.AddTask(ctx, addTaskParams{ execution: addRequest.Execution, taskInfo: taskInfo, source: addRequest.GetSource(), forwardedFrom: addRequest.GetForwardedSource(), + baseTqm: baseTqm, }) } @@ -390,7 +395,7 @@ func (e *matchingEngineImpl) AddActivityTask( // - if we spool to db, we'll re-resolve when it comes out of the db taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, addRequest.VersionDirective, stickyInfo) if err != nil { - if err == errUserDataDisabled { + if errors.Is(err, errUserDataDisabled) { // When user data loading is disabled, we intentionally drop tasks for versioned workflows // to avoid breaking versioning semantics and dispatching tasks to the wrong workers. err = nil @@ -422,11 +427,17 @@ func (e *matchingEngineImpl) AddActivityTask( VersionDirective: addRequest.VersionDirective, } + baseTqm, err := e.getTaskQueueManager(ctx, origTaskQueue, stickyInfo, true) + if err != nil { + return false, err + } + return tlMgr.AddTask(ctx, addTaskParams{ execution: addRequest.Execution, taskInfo: taskInfo, source: addRequest.GetSource(), forwardedFrom: addRequest.GetForwardedSource(), + baseTqm: baseTqm, }) } @@ -447,7 +458,12 @@ func (e *matchingEngineImpl) DispatchSpooledTask( taskQueue, userDataChanged, err := e.redirectToVersionedQueueForAdd( ctx, unversionedOrigTaskQueue, directive, stickyInfo) if err != nil { - return err + // Return error for tasks with compatibility constraints when user data is disabled so they can be retried later. + // "default" directive tasks become unversioned. 
+ if !errors.Is(err, errUserDataDisabled) || directive.GetBuildId() != "" { + return err + } + err = nil } sticky := stickyInfo.kind == enumspb.TASK_QUEUE_KIND_STICKY tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, !sticky) @@ -683,7 +699,7 @@ func (e *matchingEngineImpl) QueryWorkflow( // or fail with a relatively short timeout. taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, queryRequest.VersionDirective, stickyInfo) if err != nil { - if err == errUserDataDisabled { + if errors.Is(err, errUserDataDisabled) { // Rewrite to nicer error message err = serviceerror.NewFailedPrecondition("Operations on versioned workflows are disabled") } @@ -1204,7 +1220,7 @@ func (e *matchingEngineImpl) getTask( stickyInfo, ) if err != nil { - if err == errUserDataDisabled { + if errors.Is(err, errUserDataDisabled) { // Rewrite to nicer error message err = serviceerror.NewFailedPrecondition("Operations on versioned workflows are disabled") } @@ -1476,19 +1492,15 @@ func (e *matchingEngineImpl) redirectToVersionedQueueForAdd( return taskQueue, nil, nil } - // Have to look up versioning data. baseTqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, true) if err != nil { return nil, nil, err } + + // Have to look up versioning data. userData, userDataChanged, err := baseTqm.GetUserData(ctx) if err != nil { - if err == errUserDataDisabled && buildId == "" { - // Special case when user data disabled: we can send new workflows to the unversioned - // queue so they can potentially make progress. 
- return taskQueue, nil, nil - } - return nil, nil, err + return taskQueue, userDataChanged, err } data := userData.GetData().GetVersioningData() diff --git a/service/matching/taskQueueManager.go b/service/matching/taskQueueManager.go index e99f2f6903d..4e768c9827f 100644 --- a/service/matching/taskQueueManager.go +++ b/service/matching/taskQueueManager.go @@ -97,6 +97,7 @@ type ( taskInfo *persistencespb.TaskInfo source enumsspb.TaskSource forwardedFrom string + baseTqm taskQueueManager } stickyInfo struct { @@ -127,6 +128,8 @@ type ( // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched // from this task queue to pollers GetTask(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error) + // SpoolTask spools a task to persistence to be matched asynchronously when a poller is available. + SpoolTask(params addTaskParams) error // DispatchSpooledTask dispatches a task to a poller. When there are no pollers to pick // up the task, this method will return error. Task will not be persisted to db DispatchSpooledTask(ctx context.Context, task *internalTask, userDataChanged chan struct{}) error @@ -401,12 +404,28 @@ func (c *taskQueueManagerImpl) AddTask( return false, errRemoteSyncMatchFailed } - _, err = c.taskWriter.appendTask(params.execution, taskInfo) + // Ensure that tasks with the "default" versioning directive get spooled in the unversioned queue as they are not + // associated with any version set until their execution is touched by a version specific worker. + // "compatible" tasks OTOH are associated with a specific version set and should be stored along with all tasks for + // that version set. + // The task queue default set is dynamic and applies only at dispatch time. Putting "default" tasks into version set + // specific queues could cause them to get stuck behind "compatible" tasks when they should be able to progress + // independently. 
+ if taskInfo.VersionDirective.GetUseDefault() != nil { + err = params.baseTqm.SpoolTask(params) + } else { + err = c.SpoolTask(params) + } + return false, err +} + +func (c *taskQueueManagerImpl) SpoolTask(params addTaskParams) error { + _, err := c.taskWriter.appendTask(params.execution, params.taskInfo) c.signalIfFatal(err) if err == nil { c.taskReader.Signal() } - return false, err + return err } // GetTask blocks waiting for a task. diff --git a/tests/versioning_test.go b/tests/versioning_test.go index aa269a082b7..d0dfecfd4b0 100644 --- a/tests/versioning_test.go +++ b/tests/versioning_test.go @@ -397,6 +397,82 @@ func (s *versioningIntegSuite) dispatchNewWorkflowStartWorkerFirst() { s.Equal("done!", out) } +func (s *versioningIntegSuite) TestDisableLoadUserDataDefaultTasksBecomeUnversioned() { + dc := s.testCluster.host.dcClient + dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1) + defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions) + dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1) + defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueWritePartitions) + + tq := s.randomizeStr(s.T().Name()) + v0 := s.prefixed("v0") + + // Register a versioned "v0" worker to execute a single workflow task to constrain a workflow on the task queue to a + // compatible set. 
+ ch := make(chan struct{}, 1) + wf1 := func(ctx workflow.Context) (string, error) { + close(ch) + workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil) + return "done!", nil + } + + w1 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: v0, + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1.RegisterWorkflow(wf1) + s.NoError(w1.Start()) + defer w1.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.addNewDefaultBuildId(ctx, tq, v0) + s.waitForPropagation(ctx, tq, v0) + + // Start the first workflow while the task queue is still considered versioned. + // We want to verify that a spooled task with a "compatible" versioning directive doesn't block a spooled task + // with a "default" directive. + // This should never happen in practice since we dispatch "default" tasks to the unversioned task queue but the test + // verifies this at a functional level. + run1, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, wf1) + s.NoError(err) + + // Wait for first WFT and stop the worker + <-ch + w1.Stop() + + // Generate a second workflow task with a "compatible" directive, it should be spooled in the versioned task queue. + s.NoError(s.sdkClient.SignalWorkflow(ctx, run1.GetID(), run1.GetRunID(), "unblock", nil)) + + wf2 := func(ctx workflow.Context) (string, error) { + return "done!", nil + } + run2, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, wf2) + s.NoError(err) + + // Wait a bit and allow tasks to be spooled. + time.Sleep(time.Second * 3) + + // Disable user data and unload the task queue. + dc.OverrideValue(dynamicconfig.MatchingLoadUserData, false) + defer dc.RemoveOverride(dynamicconfig.MatchingLoadUserData) + s.unloadTaskQueue(ctx, tq) + + // Start an unversioned worker and verify that the second workflow completes. 
+ w2 := worker.New(s.sdkClient, tq, worker.Options{ + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w2.RegisterWorkflow(wf2) + s.NoError(w2.Start()) + defer w2.Stop() + + var out string + s.NoError(run2.Get(ctx, &out)) + s.Equal("done!", out) +} + func (s *versioningIntegSuite) TestDispatchUnversionedRemainsUnversioned() { s.testWithMatchingBehavior(s.dispatchUnversionedRemainsUnversioned) }