-
Notifications
You must be signed in to change notification settings - Fork 780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spool tasks with "default" versioning directive along with unversioned tasks #4512
Spool tasks with "default" versioning directive along with unversioned tasks #4512
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some suggestions but feel free to merge when you want and I can fix them later
@@ -323,9 +323,9 @@ func (e *matchingEngineImpl) AddWorkflowTask( | |||
// We don't need the userDataChanged channel here because: | |||
// - if we sync match or sticky worker unavailable, we're done | |||
// - if we spool to db, we'll re-resolve when it comes out of the db | |||
taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, addRequest.VersionDirective, stickyInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of returning it, I'd rather have callers get the unversioned tqm and pass it into redirectToVersionedQueueForAdd. actually I already made that change in a pending PR (not ready for review yet)
also, I think calling it baseTqm
is a little nicer
service/matching/matchingEngine.go
Outdated
ctx, unversionedOrigTaskQueue, directive, stickyInfo) | ||
if err != nil { | ||
return err | ||
if errors.Is(err, errUserDataDisabled) && directive.GetBuildId() == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
directive.GetBuildId() == ""
reads weird to me.. it's a tri-state and that condition will match two of the three types (UseDefault and unversioned). I guess that is what you want? maybe comment it though
service/matching/matchingEngine.go
Outdated
ctx, unversionedOrigTaskQueue, directive, stickyInfo) | ||
if err != nil { | ||
return err | ||
if errors.Is(err, errUserDataDisabled) && directive.GetBuildId() == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could put this if errors.Is(...) && ... { err = nil }
above the if err != nil
also
service/matching/matchingEngine.go
Outdated
ctx, unversionedOrigTaskQueue, directive, stickyInfo) | ||
if err != nil { | ||
return err | ||
if errors.Is(err, errUserDataDisabled) && directive.GetBuildId() == "" { | ||
// Only fail tasks with compatiblity constraints when user data is disabled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you reword this? "failing a task" is something kind of specific, that's not what we're doing here. we're just punting it back to the taskReader loop to be retried a second later
service/matching/taskQueueManager.go
Outdated
@@ -127,6 +129,8 @@ type ( | |||
// maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched | |||
// from this task queue to pollers | |||
GetTask(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error) | |||
// SpoolTask spools a task to persistence to be matched asynchronously when a poller is availalble. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// SpoolTask spools a task to persistence to be matched asynchronously when a poller is availalble. | |
// SpoolTask spools a task to persistence to be matched asynchronously when a poller is available. |
service/matching/taskQueueManager.go
Outdated
// The task queue default set is dynamic and applies only at dispatch time. Putting "default" tasks into version set | ||
// specific queues could cause them to get stuck behind "compatible" tasks when they should be able to progress | ||
// independently. | ||
if _, ok := taskInfo.VersionDirective.GetValue().(*taskqueuespb.TaskVersionDirective_UseDefault); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if _, ok := taskInfo.VersionDirective.GetValue().(*taskqueuespb.TaskVersionDirective_UseDefault); ok { | |
if taskInfo.VersionDirective.GetUseDefault() != nil { |
service/matching/taskQueueManager.go
Outdated
// specific queues could cause them to get stuck behind "compatible" tasks when they should be able to progress | ||
// independently. | ||
if _, ok := taskInfo.VersionDirective.GetValue().(*taskqueuespb.TaskVersionDirective_UseDefault); ok { | ||
err = params.unversionedTqm.SpoolTask(params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think this might be cleaner if we moved the fallback into matchingEngine? I guess there are a bunch of call sites so you'd have to factor it out...
it's okay for now, I can clean it up later (I'm already doing some related cleanups in upcoming PRs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree it would be cleaner but I'd rather avoid a big last minute change.
@@ -397,6 +397,82 @@ func (s *versioningIntegSuite) dispatchNewWorkflowStartWorkerFirst() { | |||
s.Equal("done!", out) | |||
} | |||
|
|||
func (s *versioningIntegSuite) TestDisableLoadUserDataDefaultTasksBecomeUnversioned() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be nice to have a test for the situation this addresses even with user data (set v1 as default, spool task 1 with build id on v1, spool "use default" task 2, set v2 as default, start workers just for v2 but not v1, test that task 2 gets dispatched to v2). I can work on it after I get back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, would you mind adding later?
|
||
// Wait for first WFT and stop the worker | ||
<-ch | ||
w1.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a tiny sleep here to avoid flakiness? <-ch
only hits that the workflow code hit that line, not that the worker completed the respondwftcompleted rpc
45acd78
to
06e87d0
Compare
Ensure that tasks with the "default" versioning directive get spooled in the unversioned queue as they not
associated with any version set until their execution is touched by a version specific worker.
"compatible" tasks OTOH are associated with a specific version set and should be stored along with all tasks for
that version set.
The task queue default set is dynamic and applies only at dispatch time. Putting "default" tasks into version set
specific queues could cause them to get stuck behind "compatible" tasks when they should be able to progress
independently.