feat: add GetTaskQueueUserData RPC to admin service#9934
Conversation
bfb53ca to
ba8f70b
Compare
Exposes per-type task queue user data via the admin service, proxying to the existing matching service RPC. Accepts namespace name + task queue + type + optional partition_id, resolving the partition to its wire-format RPC name for consistent-hash routing to the correct matching host. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ba8f70b to
347c4e1
Compare
|
|
||
| // Fetch the user data currently loaded by the target partition. | ||
| // LastKnownUserDataVersion=0: no cached version, always return current data. | ||
| // LastKnownEphemeralDataVersion=-1: skip ephemeral data; we only need persisted per-type data. |
There was a problem hiding this comment.
as a potential follow up, I wonder if we would ever need a lens to look at the ephemeral user data given that this data is not replicated between partitions.
maybe a question for david/kannan though and is def not a blocker for this PR
There was a problem hiding this comment.
yes, I would like to see the ephemeral data returned through this rpc. that'll be useful for debugging. (and it is replicated between partitions)
| if len(request.Namespace) == 0 { | ||
| return nil, errNamespaceNotSet | ||
| } | ||
|
|
There was a problem hiding this comment.
I think we should have a check here to see if the task queue being passed is of valid length or not, otherwise, we would make a RPC call to matching and waste that trip
There was a problem hiding this comment.
slightly overkill, but to add on we should also probably check for partition ID's being non-negative since matching would return an error in those cases.
#9935) ## What changed? This PR adds a `tdbg taskqueue get-user-data` CLI command to `tdbg` to get the `TaskQueueUserData` for a particular task queue partition. This PR depends on #9934. See #9934 for more context. ## Files changed | File | Change | |---|---| | `tools/tdbg/tdbg_commands.go` | Registered `get-user-data` subcommand under `taskqueue` with `--namespace`, `--task-queue`,`--task-queue-type`, and `--partition-id` flags | | `tools/tdbg/task_queue_commands.go` | Implemented `AdminGetTaskQueueUserData`: reads flags, calls `AdminService.GetTaskQueueUserData`, pretty-prints response | | `tools/tdbg/task_queue_commands_test.go` | Added unit tests | ## How did you test it? - [x] built - [x] run locally and tested manually - [x] added new unit test(s) - [x] added new integration test(s) — not applicable - [x] added new functional test(s) — not applicable (CLI change) ### Unit tests | Test case | Input | Expected | |---|---|---| | Missing namespace | `--task-queue my-queue` only | Error before RPC is called | | Missing task queue | `--namespace default` only | Error before RPC is called | | Invalid task queue type | `--task-queue-type INVALID` | `StringToEnum` returns error before RPC is called | | Unspecified task queue type | `--task-queue-type TASK_QUEUE_TYPE_UNSPECIFIED` | Defaults to `TASK_QUEUE_TYPE_WORKFLOW`, succeeds | | Root partition (default) | Valid flags, no `--partition-id` | Calls RPC with `partition_id=0`, prints response | | Non-root partition | `--partition-id 1` | Calls RPC with `partition_id=1`, prints response | ### Manual tests **Setup** ``` make start-sqlite temporal operator namespace create -n default temporal task-queue versioning insert-assignment-rule --namespace default --task-queue my-queue --build-id "test-build-1" --rule-index 0 --yes temporal task-queue config set --namespace default --task-queue my-queue --task-queue-type activity --queue-rps-limit 50 --queue-rps-limit-reason "manual test" ``` **Root partition, workflow type** ``` $ ./tdbg taskqueue get-user-data --namespace default --task-queue my-queue --task-queue-type TASK_QUEUE_TYPE_WORKFLOW { "version": "2" } ``` `version=2` reflects writes from the assignment rule; `user_data` absent as expected (versioning rules live in `versioning_data`, not `per_type`). **Root partition, activity type** ``` $ ./tdbg taskqueue get-user-data --namespace default --task-queue my-queue --task-queue-type TASK_QUEUE_TYPE_ACTIVITY { "version": "2", "user_data": { "config": { "queueRateLimit": { "rateLimit": { "requestsPerSecond": 50 }, "metadata": { "reason": "manual test", "updateTime": "..." } } } } } ``` `user_data` populated with the rate limit config set via `UpdateTaskQueueConfig`. **Non-root partition, workflow type** ``` $ ./tdbg taskqueue get-user-data --namespace default --task-queue my-queue --task-queue-type TASK_QUEUE_TYPE_WORKFLOW --partition-id 1 { "version": "2" } ``` Same version as root — replication is working. **Non-root partition, activity type** ``` $ ./tdbg taskqueue get-user-data --namespace default --task-queue my-queue --task-queue-type TASK_QUEUE_TYPE_ACTIVITY --partition-id 1 { "userData": { "config": { "queueRateLimit": { "rateLimit": { "requestsPerSecond": 50 }, "metadata": { "reason": "manual test", "updateTime": "2026-04-13T21:40:39.888Z" } } } }, "version": "2" } ``` **Unspecified type defaults to workflow** ``` $ ./tdbg taskqueue get-user-data --namespace default --task-queue my-queue --task-queue-type TASK_QUEUE_TYPE_UNSPECIFIED { "version": "2" } ``` Passing `TASK_QUEUE_TYPE_UNSPECIFIED` defaults to workflow type instead of erroring — output matches the workflow-type test above. --------- Co-authored-by: Veeral Patel <veeralpatel@Veerals-MacBook-Pro.local> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
|
|
||
| // Fetch the user data currently loaded by the target partition. | ||
| // LastKnownUserDataVersion=0: no cached version, always return current data. | ||
| // LastKnownEphemeralDataVersion=-1: skip ephemeral data; we only need persisted per-type data. |
There was a problem hiding this comment.
yes, I would like to see the ephemeral data returned through this rpc. that'll be useful for debugging. (and it is replicated between partitions)
| // partition_id=0 (root) → bare task queue name, e.g. "my-queue". | ||
| // partition_id=N → mangled name, e.g. "/_sys/my-queue/N". | ||
| // The matching client uses this name for consistent-hash routing to the correct host, | ||
| // and the matching engine parses it to find the right in-memory partition manager. | ||
| // namespaceID is passed for correctness even though RpcName() only uses the task queue name. |
There was a problem hiding this comment.
these comments are just repeating information that's already in the tqid library, they're not necessary here
| // User data is a family-level map keyed by TaskQueueType (int32). | ||
| // Extract only the entry for the requested type and return it alongside the version, | ||
| // so callers can compare versions across partitions to check replication lag. | ||
| perType := resp.GetUserData().GetData().GetPerType() | ||
| return &adminservice.GetTaskQueueUserDataResponse{ | ||
| UserData: perType[int32(request.GetTaskQueueType())], | ||
| Version: resp.GetUserData().GetVersion(), | ||
| }, nil |
There was a problem hiding this comment.
I think this rpc should return the full userdata. there's no reason at all to limit things to just the per-type data. the caller can drill down if they want.
| s.ErrorAs(err, ¬FoundErr) | ||
| } | ||
|
|
||
| func (s *adminHandlerSuite) TestGetTaskQueueUserData() { |
There was a problem hiding this comment.
these test cases are pretty low-value and I don't think we need them. we don't need to test trivial validations, especially on admin handler (which is not accessible to users). we also shouldn't test the rpc name mangling, that's other code's responsibility. we can just have one or two cases for a successful and error call to matching.
What changed?
This PR adds a new
GetTaskQueueUserDataRPC to the admin service.Given a namespace, task queue name, task queue type, and optional partition ID (default is 0, which is
root), it returns the user data currently loaded by that partition.This PR wraps the existing
GetTaskQueueUserDataRPC in Matching Service. We will also create atdbgcommand which calls this Admin Service RPC, in a separate PR.Why?
Each task queue family has associated metadata, stored in TaskQueueUserData. Metadata related to worker versioning, queue rate limiting, fairness are all stored in
TaskQueueUserData.TaskQueueUserData is replicated from the root partition to all other partitions.
However, there is no admin-accessible way to read the user data loaded by a specific partition or compare versions across partitions to diagnose replication lag.
Files changed
proto/internal/.../adminservice/v1/request_response.protoGetTaskQueueUserDataRequestandGetTaskQueueUserDataResponsemessagesproto/internal/.../adminservice/v1/service.protoGetTaskQueueUserDataRPC toAdminServiceservice/frontend/admin_handler.goAdminHandler.GetTaskQueueUserData: validates request, resolves namespace → ID, builds partition RPC name viatqid, calls matching service, returns per-type entry + versionservice/frontend/admin_handler_test.goHow did you test it?
Unit tests
100% unit test coverage
request == nilerrRequestNotSetnamespace == ""errNamespaceNotSettask_queuestarts with/_sys/INVALID_ARGUMENTfromtqid.NewTaskQueueFamily; matching never calledpartition_id=0, workflow typemy-queueto matching; returns correctuser_dataandversionpartition_id=1, workflow type/_sys/my-queue/1to matchingper_typemapuser_datais nil;versionstill populatedFunctional tests
Manual tests
Setup
make temporal-server && make start-sqlitetemporal operator namespace create defaulttemporal task-queue versioning insert-assignment-ruleCase 1 — Root partition, workflow type
{ "version": "1" }Case 2 — Root partition, activity type
{ "version": "1" }Case 3 — Non-root partition, workflow type
{ "version": "1" }Case 4 — Non-root partition, activity type
{ "version": "1" }Case 5 — Non-root partition, activity type, with user data
Setup: Add rate limit config
{ "userData": { "config": { "queueRateLimit": { "rateLimit": { "requestsPerSecond": 50 }, "metadata": { "reason": "manual test", "updateTime": "2026-04-13T21:40:39.888Z" } } } }, "version": "2" }Case 6 — Namespace not found
NOT_FOUNDfrom namespace registry; matching never called.