Skip to content

Commit

Permalink
Spool tasks with "default" versioning directive along with unversione…
Browse files Browse the repository at this point in the history
…d tasks (#4512)
  • Loading branch information
bergundy committed Jun 16, 2023
1 parent 7f3bb1a commit 20baa97
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 14 deletions.
36 changes: 24 additions & 12 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (e *matchingEngineImpl) AddWorkflowTask(
// - if we spool to db, we'll re-resolve when it comes out of the db
taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, addRequest.VersionDirective, stickyInfo)
if err != nil {
if err == errUserDataDisabled {
if errors.Is(err, errUserDataDisabled) {
// When user data loading is disabled, we intentionally drop tasks for versioned workflows
// to avoid breaking versioning semantics and dispatching tasks to the wrong workers.
err = nil
Expand Down Expand Up @@ -362,11 +362,16 @@ func (e *matchingEngineImpl) AddWorkflowTask(
VersionDirective: addRequest.VersionDirective,
}

baseTqm, err := e.getTaskQueueManager(ctx, origTaskQueue, stickyInfo, true)
if err != nil {
return false, err
}
return tqm.AddTask(ctx, addTaskParams{
execution: addRequest.Execution,
taskInfo: taskInfo,
source: addRequest.GetSource(),
forwardedFrom: addRequest.GetForwardedSource(),
baseTqm: baseTqm,
})
}

Expand All @@ -390,7 +395,7 @@ func (e *matchingEngineImpl) AddActivityTask(
// - if we spool to db, we'll re-resolve when it comes out of the db
taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, addRequest.VersionDirective, stickyInfo)
if err != nil {
if err == errUserDataDisabled {
if errors.Is(err, errUserDataDisabled) {
// When user data loading is disabled, we intentionally drop tasks for versioned workflows
// to avoid breaking versioning semantics and dispatching tasks to the wrong workers.
err = nil
Expand Down Expand Up @@ -422,11 +427,17 @@ func (e *matchingEngineImpl) AddActivityTask(
VersionDirective: addRequest.VersionDirective,
}

baseTqm, err := e.getTaskQueueManager(ctx, origTaskQueue, stickyInfo, true)
if err != nil {
return false, err
}

return tlMgr.AddTask(ctx, addTaskParams{
execution: addRequest.Execution,
taskInfo: taskInfo,
source: addRequest.GetSource(),
forwardedFrom: addRequest.GetForwardedSource(),
baseTqm: baseTqm,
})
}

Expand All @@ -447,7 +458,12 @@ func (e *matchingEngineImpl) DispatchSpooledTask(
taskQueue, userDataChanged, err := e.redirectToVersionedQueueForAdd(
ctx, unversionedOrigTaskQueue, directive, stickyInfo)
if err != nil {
return err
// Return error for tasks with compatibility constraints when user data is disabled so they can be retried later.
// "default" directive tasks become unversioned.
if !errors.Is(err, errUserDataDisabled) || directive.GetBuildId() != "" {
return err
}
err = nil
}
sticky := stickyInfo.kind == enumspb.TASK_QUEUE_KIND_STICKY
tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, !sticky)
Expand Down Expand Up @@ -683,7 +699,7 @@ func (e *matchingEngineImpl) QueryWorkflow(
// or fail with a relatively short timeout.
taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, queryRequest.VersionDirective, stickyInfo)
if err != nil {
if err == errUserDataDisabled {
if errors.Is(err, errUserDataDisabled) {
// Rewrite to nicer error message
err = serviceerror.NewFailedPrecondition("Operations on versioned workflows are disabled")
}
Expand Down Expand Up @@ -1204,7 +1220,7 @@ func (e *matchingEngineImpl) getTask(
stickyInfo,
)
if err != nil {
if err == errUserDataDisabled {
if errors.Is(err, errUserDataDisabled) {
// Rewrite to nicer error message
err = serviceerror.NewFailedPrecondition("Operations on versioned workflows are disabled")
}
Expand Down Expand Up @@ -1476,19 +1492,15 @@ func (e *matchingEngineImpl) redirectToVersionedQueueForAdd(
return taskQueue, nil, nil
}

// Have to look up versioning data.
baseTqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, true)
if err != nil {
return nil, nil, err
}

// Have to look up versioning data.
userData, userDataChanged, err := baseTqm.GetUserData(ctx)
if err != nil {
if err == errUserDataDisabled && buildId == "" {
// Special case when user data disabled: we can send new workflows to the unversioned
// queue so they can potentially make progress.
return taskQueue, nil, nil
}
return nil, nil, err
return taskQueue, userDataChanged, err
}
data := userData.GetData().GetVersioningData()

Expand Down
23 changes: 21 additions & 2 deletions service/matching/taskQueueManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type (
taskInfo *persistencespb.TaskInfo
source enumsspb.TaskSource
forwardedFrom string
baseTqm taskQueueManager
}

stickyInfo struct {
Expand Down Expand Up @@ -127,6 +128,8 @@ type (
// maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched
// from this task queue to pollers
GetTask(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
// SpoolTask spools a task to persistence to be matched asynchronously when a poller is available.
SpoolTask(params addTaskParams) error
// DispatchSpooledTask dispatches a task to a poller. When there are no pollers to pick
// up the task, this method will return error. Task will not be persisted to db
DispatchSpooledTask(ctx context.Context, task *internalTask, userDataChanged chan struct{}) error
Expand Down Expand Up @@ -401,12 +404,28 @@ func (c *taskQueueManagerImpl) AddTask(
return false, errRemoteSyncMatchFailed
}

_, err = c.taskWriter.appendTask(params.execution, taskInfo)
// Ensure that tasks with the "default" versioning directive get spooled in the unversioned queue as they are not
// associated with any version set until their execution is touched by a version specific worker.
// "compatible" tasks OTOH are associated with a specific version set and should be stored along with all tasks for
// that version set.
// The task queue default set is dynamic and applies only at dispatch time. Putting "default" tasks into version set
// specific queues could cause them to get stuck behind "compatible" tasks when they should be able to progress
// independently.
if taskInfo.VersionDirective.GetUseDefault() != nil {
err = params.baseTqm.SpoolTask(params)
} else {
err = c.SpoolTask(params)
}
return false, err
}

func (c *taskQueueManagerImpl) SpoolTask(params addTaskParams) error {
_, err := c.taskWriter.appendTask(params.execution, params.taskInfo)
c.signalIfFatal(err)
if err == nil {
c.taskReader.Signal()
}
return false, err
return err
}

// GetTask blocks waiting for a task.
Expand Down
76 changes: 76 additions & 0 deletions tests/versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,82 @@ func (s *versioningIntegSuite) dispatchNewWorkflowStartWorkerFirst() {
s.Equal("done!", out)
}

// TestDisableLoadUserDataDefaultTasksBecomeUnversioned verifies that a spooled workflow task with a "default"
// versioning directive is not blocked behind a spooled task with a "compatible" directive: after user data
// loading is disabled and the task queue is unloaded, an unversioned worker must be able to complete the
// workflow whose task carries the "default" directive.
func (s *versioningIntegSuite) TestDisableLoadUserDataDefaultTasksBecomeUnversioned() {
	// Run with a single read and a single write partition for the duration of the test.
	dc := s.testCluster.host.dcClient
	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)
	dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
	defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueWritePartitions)

	tq := s.randomizeStr(s.T().Name())
	v0 := s.prefixed("v0")

	// Register a versioned "v0" worker to execute a single workflow task to constrain a workflow on the task queue to a
	// compatible set.
	// NOTE: wf1 closes ch, so it must only ever be executed once in this test; closing an already closed
	// channel panics.
	ch := make(chan struct{}, 1)
	wf1 := func(ctx workflow.Context) (string, error) {
		close(ch)
		workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil)
		return "done!", nil
	}

	w1 := worker.New(s.sdkClient, tq, worker.Options{
		BuildID:                          v0,
		UseBuildIDForVersioning:          true,
		MaxConcurrentWorkflowTaskPollers: numPollers,
	})
	w1.RegisterWorkflow(wf1)
	s.NoError(w1.Start())
	defer w1.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	s.addNewDefaultBuildId(ctx, tq, v0)
	s.waitForPropagation(ctx, tq, v0)

	// Start the first workflow while the task queue is still considered versioned.
	// We want to verify that a spooled task with a "compatible" versioning directive doesn't block a spooled task
	// with a "default" directive.
	// This should never happen in practice since we dispatch "default" tasks to the unversioned task queue but the test
	// verifies this at a functional level.
	run1, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, wf1)
	s.NoError(err)

	// Wait for first WFT and stop the worker.
	// Bound the wait by the test context so that a workflow task that never runs fails the test quickly
	// instead of blocking until the global test timeout.
	select {
	case <-ch:
	case <-ctx.Done():
		s.T().Fatal("timed out waiting for the first workflow task to start")
	}
	w1.Stop()

	// Generate a second workflow task with a "compatible" directive, it should be spooled in the versioned task queue.
	s.NoError(s.sdkClient.SignalWorkflow(ctx, run1.GetID(), run1.GetRunID(), "unblock", nil))

	wf2 := func(ctx workflow.Context) (string, error) {
		return "done!", nil
	}
	run2, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, wf2)
	s.NoError(err)

	// Wait a bit and allow tasks to be spooled.
	time.Sleep(time.Second * 3)

	// Disable user data and unload the task queue.
	dc.OverrideValue(dynamicconfig.MatchingLoadUserData, false)
	defer dc.RemoveOverride(dynamicconfig.MatchingLoadUserData)
	s.unloadTaskQueue(ctx, tq)

	// Start an unversioned worker and verify that the second workflow completes.
	w2 := worker.New(s.sdkClient, tq, worker.Options{
		MaxConcurrentWorkflowTaskPollers: numPollers,
	})
	w2.RegisterWorkflow(wf2)
	s.NoError(w2.Start())
	defer w2.Stop()

	var out string
	s.NoError(run2.Get(ctx, &out))
	s.Equal("done!", out)
}

func (s *versioningIntegSuite) TestDispatchUnversionedRemainsUnversioned() {
s.testWithMatchingBehavior(s.dispatchUnversionedRemainsUnversioned)
}
Expand Down

0 comments on commit 20baa97

Please sign in to comment.