Implement GetWorkerTaskReachability API #4346

@bergundy bergundy requested review from dnr and Sushisource May 16, 2023 06:27
@bergundy bergundy requested a review from a team as a code owner May 16, 2023 06:27
response, err := wh.getWorkerTaskReachabilityValidated(ctx, ns, request)
if err != nil {
wh.logger.Error("Failed getting worker task reachability", tag.Error(err))
return nil, serviceerror.NewInternal("Internal error")
Member Author

I think this is desirable and we should avoid leaking error details to the caller.

Query: query.str,
}

// TODO: is count more efficient than select with page size of 1?
Member Author

I think select with limit of one is more efficient, not sure... @rodrigozhou or @alexshtin could verify.

Member

very possible the count is more efficient, esp. if it can be approximate. databases often keep summary stats to aid the query planner and the count might be able to be answered from those without touching any data at all

)

// Little helper to concurrently map a function over input and fail fast on error.
func raceMap[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) {
Member Author

Not sure if this is idiomatic but I found myself repeating this pattern a few times in this file and ended up refactoring to use this little helper. Maybe we have something similar in the codebase or a shared place we can put this.

Member

I put little generic stuff like this in common/util/util.go

_, err := mdb.conn.ExecContext(ctx, query, params...)
if err == nil {
// TODO(bergundy)
panic("this should be properly tested once we support deletion")
Member Author

Intentional, I started implementing it but realized I can't test it properly and decided to leave this panic as a reminder that this has to be tested.

Member

I'd say just replace "this" with something slightly more specific

Member Author

Sure, I didn't think it'd matter much because it's a placeholder for when we do implement deletions.

Member

@dnr dnr left a comment

just a few high-level comments until the new version

Member

@Sushisource Sushisource left a comment

This LGTM but I'm no DB expert

Comment on lines +160 to 162
// Do not set this to a value higher than 255 for clusters using SQL based persistence due to predefined VARCHAR
// column width.
WorkerBuildIdSizeLimit = "limit.workerBuildIdSize"
Member

It'd be good if we just enforced this limit at startup

Member Author

Hmm.. but this is dynamic config that technically can change at any time.
I do get the point though, I don't know if we support dynamic config validation.
I can check.

Member

No
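
Because the limit is dynamic config, one way to approximate a startup check is to clamp the value each time it is read. The following is a minimal sketch under stated assumptions: `effectiveBuildIDSizeLimit` and the `sqlPersistence` flag are hypothetical names invented for illustration, and only the 255 figure comes from the code comment above.

```go
package main

import "fmt"

// maxSQLBuildIDLen mirrors the VARCHAR width mentioned in the code comment.
const maxSQLBuildIDLen = 255

// effectiveBuildIDSizeLimit is a hypothetical helper (not part of the PR).
// It caps the configured limit at the column width when SQL persistence is
// in use, so a bad dynamic config value cannot exceed what the schema allows.
func effectiveBuildIDSizeLimit(configured int, sqlPersistence bool) int {
	if sqlPersistence && configured > maxSQLBuildIDLen {
		return maxSQLBuildIDLen
	}
	return configured
}

func main() {
	fmt.Println(effectiveBuildIDSizeLimit(1000, true))  // capped for SQL
	fmt.Println(effectiveBuildIDSizeLimit(1000, false)) // left alone otherwise
	fmt.Println(effectiveBuildIDSizeLimit(100, true))   // already within bounds
}
```

Clamping on read keeps working when the config changes at runtime, which a one-time startup check would not.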

@@ -64,6 +64,25 @@ type (
DataEncoding string
}

AddBuildIdToTaskQueueMapping struct {
Member

Should be plural?

Member Author

Hmmm... yeah...
My point was that you're adding to the mapping of build id to task queue.
But if it doesn't read right to you, I can change (in a followup PR because I have a couple stacked on top of this).

Member

maybe AddToBuildIdToTaskQueueMapping? that reads okay to me though a little verbose

Member Author

like

BuildIds []string
}

RemoveBuildIdToTaskQueueMapping struct {
Member

Also should be plural?

Member

Suggested change
- RemoveBuildIdToTaskQueueMapping struct {
+ RemoveFromBuildIdToTaskQueueMapping struct {

Member Author

like


// MapConcurrent concurrently maps a function over input and fails fast on error.
func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) {
errorsCh := make(chan error, len(input))
Member

You could give this channel a capacity of only 1 by using select both where you send to it and where you receive from it, to avoid writing a bunch of nils in the non-error cases. Not exactly a huge deal tho.

Member Author

I don't want to block the goroutines when the receiver exits after the first error.

Member

That's what I mean by using a select on where you take out as well, you can make that nonblocking too.

Member Author

I would need to use select where I send the results.
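
For reference, the capacity-1 idea discussed in this thread could look roughly like the sketch below. It is not the PR's `MapConcurrent`: the names `mapConcurrent` and `errBoom` are made up here, and the non-blocking send on the error path is only one possible reading of the suggestion.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBoom is a sample error used only by this sketch.
var errBoom = errors.New("boom")

// mapConcurrent runs mapper over every input in its own goroutine and
// returns as soon as the first error is reported.
func mapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) {
	results := make([]OUT, len(input))
	errCh := make(chan error, 1) // room for the first error only
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(len(input))
	for i, in := range input {
		go func(i int, in IN) {
			defer wg.Done()
			out, err := mapper(in)
			if err != nil {
				select {
				case errCh <- err: // first error wins
				default: // an error is already buffered; drop this one, never block
				}
				return
			}
			results[i] = out // each goroutine writes its own slot
		}(i, in)
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case err := <-errCh:
		return nil, err // fail fast; remaining goroutines finish on their own
	case <-done:
		// Everything finished; an error may still be sitting in the buffer.
		select {
		case err := <-errCh:
			return nil, err
		default:
			return results, nil
		}
	}
}

func main() {
	doubled, err := mapConcurrent([]int{1, 2, 3}, func(i int) (int, error) { return i * 2, nil })
	fmt.Println(doubled, err)
	_, err = mapConcurrent([]int{1, 2, 3}, func(i int) (int, error) {
		if i == 2 {
			return 0, errBoom
		}
		return i, nil
	})
	fmt.Println(err)
}
```

Successful goroutines never touch the channel, so no nils are written, and failing goroutines cannot block because the send has a `default` branch. The cost is extra bookkeeping (a `WaitGroup` and a `done` channel) compared with a channel sized to `len(input)`.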

build_id_if_row_is_an_index text, -- If this row is used as a mapping of build id to task queue, this will not be empty
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3
build_ids set<text>, -- All active build ids in all version sets on this task queue (used in an index below)
Member

What does "active" mean? Not deleted? Would be good to clarify

Member Author

Yes, active is "not deleted", I can clarify.

}
}
if gotUnversionedRequest && len(request.GetTaskQueues()) == 0 {
return nil, serviceerror.NewInvalidArgument("Cannot get reachability of an unversioned worker without specifying at least one task queue")
Member

Probably worth mentioning here that we interpret empty string as unversioned

Member Author

Sure, couldn't hurt
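
A small sketch of the check with the empty-string convention spelled out in the message. The function name, the plain string slices, and the exact wording are assumptions for illustration; the real handler works on the request's getters and returns a `serviceerror`.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnversionedNeedsTaskQueue states explicitly that an empty build ID
// is what marks a worker as unversioned.
var errUnversionedNeedsTaskQueue = errors.New(
	"cannot get reachability of an unversioned worker (empty build ID) without specifying at least one task queue")

// validateReachabilityRequest is a hypothetical stand-in for the handler's
// validation: an empty build ID is interpreted as "unversioned".
func validateReachabilityRequest(buildIDs, taskQueues []string) error {
	gotUnversioned := false
	for _, id := range buildIDs {
		if id == "" {
			gotUnversioned = true
		}
	}
	if gotUnversioned && len(taskQueues) == 0 {
		return errUnversionedNeedsTaskQueue
	}
	return nil
}

func main() {
	fmt.Println(validateReachabilityRequest([]string{""}, nil))
	fmt.Println(validateReachabilityRequest([]string{""}, []string{"tq1"}))
	fmt.Println(validateReachabilityRequest([]string{"v1"}, nil))
}
```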

Comment on lines +1986 to +1996
_, err = s.engine.UpdateWorkerBuildIdCompatibility(ctx, &workflowservice.UpdateWorkerBuildIdCompatibilityRequest{
Namespace: s.namespace,
TaskQueue: tq1,
Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewCompatibleBuildId{
AddNewCompatibleBuildId: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewCompatibleVersion{
ExistingCompatibleBuildId: v0,
NewBuildId: v01,
},
},
})
s.Require().NoError(err)
Member

nit: This test reads slightly more obviously if this update happens after the first one so it's both tq1 ops, then tq2, then tq3

Member Author

SGTM.

Member Author

@bergundy bergundy left a comment

Thanks @Sushisource I'll address your comments after I merge my stacked PRs.

Route task queue user data writes via a single matching node per namespace
Merged David's recent changes
Added reachability tests and search attribute tests
Search attribute is now propagated from source version stamp
Search attribute has versioned|unversioned prefix to give more accurate reachability results for unversioned workers
Merged stacked PR #2
@bergundy bergundy merged commit cd407aa into temporalio:worker-versioning May 25, 2023
9 checks passed
@bergundy bergundy deleted the worker-versioning-build-id-search-attribute branch May 25, 2023 20:42
bergundy added a commit to bergundy/temporal that referenced this pull request May 25, 2023
bergundy added a commit to bergundy/temporal that referenced this pull request May 25, 2023
bergundy added a commit that referenced this pull request May 25, 2023
* Only put messages in the ns replication queue for global namespaces

* Add BuildIdBasedVersioning to list of supported capabilities

* Address Spencer's comments on #4346
@@ -184,6 +184,17 @@ func makeGetMatchingClient(reqType reflect.Type) string {

var tqtPath string
switch t.Name() {
case "GetBuildIdTaskQueueMappingRequest":
// Pick a random node for this request, it's not associated with a specific task queue.
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%%s\", rand.Int())}"
Member

it's not being double-formatted, this should work

Suggested change
- tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%%s\", rand.Int())}"
+ tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%s\", rand.Int())}"

Member Author

Right
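
A quick illustration of the escaping point, as a sketch only. A plain string literal that never goes through a formatter keeps its `%` verbs untouched, so no `%%` is needed. Separately, and not raised in the thread, the emitted code formats an `int` from `rand.Int()`, for which `%d` would be the matching verb. The helper `formatWith` and the constant `generatedSnippet` are hypothetical names for this example.

```go
package main

import "fmt"

// generatedSnippet is the kind of text a code generator might emit.
// It is an ordinary string, so the verb inside it needs no escaping.
const generatedSnippet = "fmt.Sprintf(\"not-applicable-%d\", rand.Int())"

// formatWith applies a format string to an int, standing in for what the
// generated code would do at runtime.
func formatWith(format string, n int) string {
	return fmt.Sprintf(format, n)
}

func main() {
	fmt.Printf("%s\n", generatedSnippet)             // printed verbatim, verb intact
	fmt.Println(formatWith("not-applicable-%s", 42)) // not-applicable-%!s(int=42)
	fmt.Println(formatWith("not-applicable-%d", 42)) // not-applicable-42
}
```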

}

if err := iter.Close(); err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetTaskQueuesByBuildId operation failed. Error: %v", err))
Member

we should add a wrapped error field to NewUnavailable since this comes up a lot. not now, though
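
One shape such a wrapped-error field could take, sketched with a stand-in type rather than the real `serviceerror.Unavailable`. The type `unavailableError`, its constructor, and `errIterClosed` are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errIterClosed is a sample underlying error for the demo.
var errIterClosed = errors.New("iterator closed")

// unavailableError is a stand-in error type that carries its cause
// instead of flattening it into the message with fmt.Sprintf.
type unavailableError struct {
	msg   string
	cause error
}

func newUnavailable(msg string, cause error) *unavailableError {
	return &unavailableError{msg: msg, cause: cause}
}

// Error includes the cause in the text when one is present.
func (e *unavailableError) Error() string {
	if e.cause == nil {
		return e.msg
	}
	return e.msg + ": " + e.cause.Error()
}

// Unwrap lets errors.Is and errors.As see through to the cause.
func (e *unavailableError) Unwrap() error { return e.cause }

func main() {
	var err error = newUnavailable("GetTaskQueuesByBuildId operation failed", errIterClosed)
	fmt.Println(err)
	fmt.Println(errors.Is(err, errIterClosed))
}
```

Keeping the cause as a field means callers can still match on it with `errors.Is`, which is lost when the cause is only interpolated into the message.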

BuildID string
}

CountTaskQueuesByBuildIdRequest = GetTaskQueuesByBuildIdRequest
Member

make a separate type, not an alias

Member Author

I will, just curious why

Member

otherwise you could pass a value of one type as the other? I mean, yeah, it's not like it's a big problem, they are the same in some sense, it's just weird to be able to pass a *CountTaskQueuesByBuildIdRequest to GetTaskQueuesByBuildId
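
The difference can be seen in a few lines of Go. The type and function names below are simplified placeholders, not the persistence types from the PR.

```go
package main

import "fmt"

// GetRequest stands in for GetTaskQueuesByBuildIdRequest.
type GetRequest struct{ BuildID string }

// CountRequestAlias is an alias: it is the very same type as GetRequest.
type CountRequestAlias = GetRequest

// CountRequest is a separate defined type with the same fields.
type CountRequest struct{ BuildID string }

func get(r *GetRequest) string { return "get:" + r.BuildID }

func main() {
	// Compiles, because the alias and GetRequest are one type.
	fmt.Println(get(&CountRequestAlias{BuildID: "v1"}))

	// Would not compile: *CountRequest is not *GetRequest.
	// fmt.Println(get(&CountRequest{BuildID: "v1"}))

	// A defined type requires an explicit conversion at the call site.
	converted := GetRequest(CountRequest{BuildID: "v1"})
	fmt.Println(get(&converted))
}
```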

statusFilter := ""
switch reachabilityType {
case enumspb.TASK_REACHABILITY_OPEN_WORKFLOWS:
statusFilter = " AND ExecutionStatus = \"Running\""
Member

Suggested change
- statusFilter = " AND ExecutionStatus = \"Running\""
+ statusFilter = ` AND ExecutionStatus = "Running"`

req := manager.CountWorkflowExecutionsRequest{
NamespaceID: ns.ID(),
Namespace: ns.Name(),
Query: fmt.Sprintf("TaskQueue = %q AND %s%s", taskQueue, buildIdsFilter, statusFilter),
Member

use constant for search attribute name
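
Combining both suggestions (raw string literal, constants for attribute names) might look like the sketch below. The constants here are local stand-ins for whatever the codebase already defines, and `buildReachabilityQuery` and the sample filter string are hypothetical.

```go
package main

import "fmt"

// Stand-ins for the search attribute name constants in the codebase.
const (
	attrTaskQueue       = "TaskQueue"
	attrExecutionStatus = "ExecutionStatus"
)

// buildReachabilityQuery assembles the visibility query from constants
// instead of repeating attribute names as string literals.
func buildReachabilityQuery(taskQueue, buildIDsFilter string, openOnly bool) string {
	statusFilter := ""
	if openOnly {
		// A raw string literal avoids escaping the inner double quotes.
		statusFilter = fmt.Sprintf(` AND %s = "Running"`, attrExecutionStatus)
	}
	return fmt.Sprintf("%s = %q AND %s%s", attrTaskQueue, taskQueue, buildIDsFilter, statusFilter)
}

func main() {
	// The filter text is a placeholder, not the real build ID filter format.
	fmt.Println(buildReachabilityQuery("tq1", `BuildIds = "placeholder"`, true))
	fmt.Println(buildReachabilityQuery("tq1", `BuildIds = "placeholder"`, false))
}
```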

@@ -115,6 +122,10 @@ type (
timeSource clock.TimeSource
// Only set if global namespaces are enabled on the cluster.
namespaceReplicationQueue persistence.NamespaceReplicationQueue
// Disables concurrent task queue user data updates and replication requests (due to a cassandra limitation)
namespaceUpdateLockMap map[string]*namespaceUpdateLocks
Member

consider using common/locks.IDMutex here. it takes some of the boilerplate out of this pattern. you can use separate keys for the update + replication locks

Member Author

I think this is fine TBH
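
For readers unfamiliar with the pattern being suggested, a keyed mutex can be sketched as below. This is a generic illustration and not the API of `common/locks.IDMutex`; the type `keyedMutex`, its methods, and the key format are invented for this example, and unlike a production version it never evicts unused entries.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedMutex hands out one mutex per string key, created on first use.
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: make(map[string]*sync.Mutex)}
}

// get returns the mutex for key, creating it if needed.
func (k *keyedMutex) get(key string) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	l, ok := k.locks[key]
	if !ok {
		l = &sync.Mutex{}
		k.locks[key] = l
	}
	return l
}

// withLock runs fn while holding the mutex for key.
func (k *keyedMutex) withLock(key string, fn func()) {
	l := k.get(key)
	l.Lock()
	defer l.Unlock()
	fn()
}

func main() {
	km := newKeyedMutex()
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Separate keys (e.g. "ns1/update" and "ns1/replication")
			// would give independent locks for the same namespace.
			km.withLock("ns1/update", func() { counter++ })
		}()
	}
	wg.Wait()
	fmt.Println(counter)
}
```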

// Force unloading a task queue. Used for testing only.
rpc ForceUnloadTaskQueue (ForceUnloadTaskQueueRequest) returns (ForceUnloadTaskQueueResponse) {}

// Update task queue user data in owning node for all updates in namespace.
Member

this sounds too tempting to use when you want to update user data. can we make the comment very explicit that these two functions should only be called by the matching implementation itself and that external callers like frontend should use UpdateWorkerBuildIdCompatibility (or some other function for other fields)?

it might even be good if the name starts with Internal

Member Author

Can change to Internal

@@ -488,29 +489,25 @@ func (c *taskQueueManagerImpl) GetUserData(ctx context.Context) (*persistencespb
}

//nolint:revive // control coupling
Member

remove this now that lint won't complain?

bergundy added a commit that referenced this pull request May 26, 2023
* Address David's comments

* go generate
dnr pushed a commit that referenced this pull request May 26, 2023
Note: This commit came from a feature branch and is not expected to build.
dnr pushed a commit to dnr/temporal that referenced this pull request May 26, 2023
Note: This commit came from a feature branch and is not expected to build.