New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor tqname package #5329
Refactor tqname package #5329
Conversation
common/taskqueue/task_queue.go
Outdated
type ( | ||
|
||
// TaskQueue can be NormalTaskQueue or StickyTaskQueue. | ||
TaskQueue interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd love if we could avoid repetitive names like taskqueue.TaskQueue
, what about names like taskqueue.Interface
with implementations taskqueue.Normal
and taskqueue.Sticky
? You can see discussion of this on the official go blog:
Avoid repetition. Since client code uses the package name as a prefix when referring to the package contents, the names for those contents need not repeat the package name. The HTTP server provided by the http package is called Server, not HTTPServer. Client code refers to this type as http.Server, so there is no ambiguity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great feedback @tdeebswihart. I'm going to rename the types as you suggested, except I think I like to keep taskqueue.TaskQueue
as is, renaming to taskqueue.Interface
feels a little awkward to me. pkg.Pkg
is mentioned in the link you referred and not discouraged. Many packages use that naming, e.g. time.Time
.
There's a bunch of unrelated stuff on this pr so it'll take a while, or should be split. First just high level comments:
The name was chosen deliberately since the package does only name mangling. It should stay that way, and possibly additional functionality should be in another package (I'll have to see the details when I get into the code). Also, we shouldn't name a package
This at least should be in a separate PR. |
a1c1c27
to
13b3f3e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a partial review, everything outside of service/matching
client/matching/metric_client.go
Outdated
if err != nil { | ||
c.logger.Info("invalid tq name "+taskQueue.String(), tag.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use a tag for the name instead of string concatenation
cmd/tools/rpcwrappers/main.go
Outdated
return fmt.Sprintf("client, err := c.getClientForTaskqueue(%s, %s, %s)", nsID.path, tq.path, tqt.path) | ||
|
||
return fmt.Sprintf( | ||
`tqPrtn, err := tqid.FromProto(%s, %s, %s) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tq
is an okay abbreviation but maybe spell out Partition
? or I don't even mind p
here actually, just "Prtn" looks weird
common/tqid/task_queue_id.go
Outdated
NonRootPartitionPrefix = "/_sys/" | ||
PartitionDelimiter = "/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep these private? That's the whole point of consolidating the manipulation in one package, and it took quite a bit of effort. I don't like seeing that reverting.
common/tqid/task_queue_id.go
Outdated
) | ||
|
||
type ( | ||
// TaskQueue represents the high-level task queue that user create by explicitly providing a TaskQueue name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// TaskQueue represents the high-level task queue that user create by explicitly providing a TaskQueue name | |
// TaskQueue represents the high-level task queue that user creates by explicitly providing a TaskQueue name |
} | ||
name, err := tqname.Parse(tq.Name) | ||
prtn, err := tqid.FromProto(proto, "", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here with an enum.. I don't like these unused fields though :(
} | ||
name, err := tqname.Parse(tq.Name) | ||
prtn, err := tqid.FromProto(proto, "", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spell out partition
tests/versioning.go
Outdated
@@ -66,7 +66,7 @@ type VersioningIntegSuite struct { | |||
|
|||
const ( | |||
partitionTreeDegree = 3 | |||
longPollTime = 5 * time.Second | |||
longPollTime = 10 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this make the test take longer? it's already pretty long to run this whole suite locally and I wouldn't want it to get any longer
tests/versioning.go
Outdated
s.NoError(err) | ||
partName = partName.WithPartition(pt.part) | ||
partition := taskQueue.NormalPartition(0, pt.part) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't you pass pt.tp in here? if the partition now holds the type, it should hold the correct type
tests/versioning.go
Outdated
// Use lower-level GetTaskQueueUserData instead of GetWorkerBuildIdCompatibility | ||
// here so that we can target activity queues. | ||
res, err := s.testCluster.host.matchingClient.GetTaskQueueUserData( | ||
ctx, | ||
&matchingservice.GetTaskQueueUserDataRequest{ | ||
NamespaceId: nsId, | ||
TaskQueue: partName.FullName(), | ||
TaskQueue: partition.RpcName(), | ||
TaskQueueType: pt.tp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and then you can use partition.TaskType()
here? (except please rename that to TaskQueueType)
common/tqid/task_queue_id.go
Outdated
|
||
// FromFamilyName takes a user-provided task queue name (aka family name) and returns a TaskQueueFamily. Returns an | ||
// error if name looks like a mangled name. | ||
func FromFamilyName(namespaceId string, name string) (*TaskQueueFamily, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should incorporate TaskQueueFamily
into function name. Unless namespace will be named TaskQueueFamily
. When I read code tqid.FromFamilyName
there is no way I can understand that we create a new TaskQueueFamily
.
common/tqid/task_queue_id.go
Outdated
return &TaskQueueFamily{namespace.ID(namespaceId), name} | ||
} | ||
|
||
func FromProto(proto *taskqueuepb.TaskQueue, namespaceId string, taskType enumspb.TaskQueueType) (Partition, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as with TaskQueueFamily. We should incorporate Partition
into function name. It sould be PartitionFromProto
or something like this.
common/tqid/task_queue_id.go
Outdated
return n.name | ||
} | ||
|
||
func (n *TaskQueueFamily) NamespaceID() namespace.ID { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have agreement on proper case for id
? Id is a short of Identification. It is not abbreviation. So I vote for Id
(capital I
lower case d
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'm also not sure what is the convention here. It seems both styles are used in different places of the code. I also like NamespaceId
better. So changing it according to your suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alex agreed that ID
was a mistake. We are shifting to Id
where possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got through a big chunk of the rest, but not quite all of it. sending comments-so-far
client/matching/client.go
Outdated
return nil, err | ||
// We preserve the old logic (not returning error in case of invalid proto info) until it's verified that | ||
// clients are not sending invalid names. | ||
c.logger.Info("invalid tq partition", tag.Error(err), tag.NewStringsTag("proto", []string{proto.String()})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why strings?
c.logger.Info("invalid tq partition", tag.Error(err), tag.NewStringsTag("proto", []string{proto.String()})) | |
c.logger.Info("invalid tq partition", tag.Error(err), tag.NewStringTag("proto", proto.String())) |
@@ -36,7 +36,7 @@ import ( | |||
// Guidelines for creating new special UUID constants | |||
// Each UUID should be of the form: E0000000-R000-f000-f000-00000000000x | |||
// Where x is any hexadecimal value, E represents the entity type valid values are: | |||
// E = {NamespaceID = 1, WorkflowID = 2, RunID = 3} | |||
// E = {NamespaceId = 1, WorkflowID = 2, RunID = 3} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok but weird to have this change in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes my IDE rename things that it should not touch :/
common/tqid/task_queue_id.go
Outdated
// TaskQueueFamily represents the high-level "task queue" that user creates by explicitly providing a task queue name | ||
// when starting a worker or a workflow. A task queue family consists of separate TaskQueue's for different types of | ||
// task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// TaskQueueFamily represents the high-level "task queue" that user creates by explicitly providing a task queue name | |
// when starting a worker or a workflow. A task queue family consists of separate TaskQueue's for different types of | |
// task. | |
// TaskQueueFamily represents the high-level "task queue" that user creates by explicitly providing a task queue name | |
// when starting a worker or a workflow. A task queue family consists of separate TaskQueues for different types of | |
// task (e.g. Workflow, Activity). |
// TaskQueue represents a logical task queue for a type of tasks (e.g. Activity or Workflow). Under the hood, | ||
// a TaskQueue can be broken down to multiple sticky or normal partitions. | ||
TaskQueue struct { | ||
family TaskQueueFamily |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider embedding TaskQueueFamily
directly? does that make any code simpler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did consider that, but decided not to do it because a task queue is not a family. some methods in family do not make sense to be added to TaskQueue.
} | ||
|
||
switch kind { | ||
case enumspb.TASK_QUEUE_KIND_STICKY: | ||
if partition != 0 { | ||
return nil, fmt.Errorf("%w. base name: %s, normal name: %s", ErrNonZeroSticky, baseName, normalName) | ||
} | ||
tq := &TaskQueue{namespace.ID(namespaceId), normalName} | ||
tq := &TaskQueue{TaskQueueFamily{namespace.ID(namespaceId), normalName}, taskType} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised the linter doesn't complain about using positional args for struct initialization...
I'd name the fields, but also if you embed TaskQueueFamily it's a little nicer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
service/matching/db_task_queue.go
Outdated
} | ||
|
||
// unversioned DB queues use the RPC name of their partition | ||
return dbq.partition.RpcName() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the whole point of copying those prefix+delimiter strings into this package is to decouple the rpc naming from persistence key naming (even though they're identical for unversioned). so just construct the string with this package's delimiters here, don't rely on RpcName
service/matching/db_task_queue.go
Outdated
// | ||
// All versioned DB queues use mangled names, using the following format: | ||
// | ||
// with build ID: /_sys/<base name>/<build ID base64 URL encoded>;<partition id> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// with build ID: /_sys/<base name>/<build ID base64 URL encoded>;<partition id> | |
// with build ID: /_sys/<base name>/<build ID base64 URL encoded>#<partition id> |
if err != nil { | ||
return nil, err | ||
} | ||
tqMgr, err := e.getTaskQueuePartitionManager(ctx, taskQueue, normalStickyInfo, true) | ||
tqMgr, err := e.getTaskQueuePartitionManager(ctx, taskQueueFamily.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).RootPartition(), true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks weird to be just assuming workflow+root here.. I see it's correct because of how we route this in client, but I think a comment would be nice to assure readers
service/matching/matching_engine.go
Outdated
if err != nil { | ||
return nil, err | ||
} | ||
tqm, err := e.getTaskQueuePartitionManager(ctx, taskQueue, normalStickyInfo, false) | ||
pm, err := e.getTaskQueuePartitionManager(ctx, taskQueueFamily.TaskQueue(req.TaskQueueType).RootPartition(), true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this one seems wrong, though.. why are we assuming the root here? shouldn't this work on any partition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. fixed.
service/matching/task_writer.go
Outdated
@@ -65,9 +65,9 @@ type ( | |||
// taskWriter writes tasks sequentially to persistence | |||
taskWriter struct { | |||
status int32 | |||
tlMgr *taskQueueManagerImpl | |||
tlMgr *dbQueueManagerImpl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to change it, I just wanted to say that I'm amused that we're changing the type but keep the tl
in the name from pre-temporal :)
// to/from the persistence layer passes through userDataManager of the owning partition. | ||
// All other partitions long-poll the latest user data from the owning partition. | ||
userDataManager struct { | ||
sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't embed a mutex, name it. (so that clients can't just call Lock.. it should be internal. yes it's all in the same package but still)
errTaskQueueClosed = serviceerror.NewUnavailable("task queue closed") | ||
) | ||
|
||
func newUserDataManager(store persistence.TaskManager, matchingClient matchingservice.MatchingServiceClient, partition tqid.Partition, config *taskQueueConfig, logger log.Logger, registry namespace.Registry) *userDataManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func newUserDataManager(store persistence.TaskManager, matchingClient matchingservice.MatchingServiceClient, partition tqid.Partition, config *taskQueueConfig, logger log.Logger, registry namespace.Registry) *userDataManager { | |
func newUserDataManager( | |
store persistence.TaskManager, | |
matchingClient matchingservice.MatchingServiceClient, | |
partition tqid.Partition, | |
config *taskQueueConfig, | |
logger log.Logger, | |
registry namespace.Registry, | |
) *userDataManager { |
} | ||
|
||
// Stop does not unload the partition from matching engine. It is intended to be called by matching engine when | ||
// unloading the partition. For stopping and unloading a partition call unloadFromEngine instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment isn't right here, there's no unloadFromEngine
// Always set state enabled/disabled even if we're not setting the future since we only set | ||
// the future once but the enabled/disabled state may change over time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment is obsolete.. we don't have a disabled state anymore
return p, nil | ||
} else if err != nil { | ||
// invalid degree | ||
return nil, err | ||
} | ||
return parent, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be cleaner to replace the type field here, and then the caller should use the type from the return value (instead of assuming workflow)
@@ -228,6 +199,12 @@ func (pm *taskQueuePartitionManagerImpl) AddTask( | |||
// default queue should stay alive even if requests go to other queues | |||
pm.defaultQueue.MarkAlive() | |||
} | |||
|
|||
if pm.partition.IsRoot() && !pm.HasPollerAfter(time.Now().Add(-noPollerThreshold)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a plan for how to count no-pollers for individual build ids? (of course no need to fix it here)
if err != nil { | ||
pm.unloadFromEngine() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do this here? in general the pattern has been to let the thing that got the persistence error or whatever unload itself
@@ -40,6 +40,7 @@ import ( | |||
"github.com/stretchr/testify/require" | |||
"github.com/stretchr/testify/suite" | |||
"github.com/uber-go/tally/v4" | |||
"go.temporal.io/server/common/tqid" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to server section
client/matching/client.go
Outdated
@@ -77,18 +86,16 @@ func (c *clientImpl) AddActivityTask( | |||
ctx context.Context, | |||
request *matchingservice.AddActivityTaskRequest, | |||
opts ...grpc.CallOption) (*matchingservice.AddActivityTaskResponse, error) { | |||
partition := c.loadBalancer.PickWritePartition( | |||
namespace.ID(request.GetNamespaceId()), | |||
partition, err := c.pickPartitionForWrite( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These functions AddActivity, AddWorkflow, etc. all look the same with a little variance. You can encapsulate differences and pass them into one common function.
client/matching/client.go
Outdated
} | ||
} | ||
|
||
func (c *clientImpl) pickPartitionForRead(proto *taskqueuepb.TaskQueue, nsid string, taskType enumspb.TaskQueueType, forwardedFrom string) (prtn tqid.Partition, release func(), err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same is about pickPartitionForWrite
and pickPartitionForRead
. You could have a common generic function which calls something (received as an argument) on p.TaskQueue() and return result.
## What changed? <!-- Describe what has changed in this PR --> - Replaced package `tqname` with `tqid` containing the following types - `TaskQueueFamily` struct including namespace ID and user-given TQ name - `TaskQueue` struct representing a high level task queue for a particular task type (activity vs workflow) - `Partition` interface which can be either a `NormalPartition` or a `StickyPartition` - Replaced `taskQueueID` with `PhysicalTaskQueueKey` to represent a DB-level task queue - Delete `stickyInfo` - Replaced all usage of `tqname` and `taskQueueID` with new types depending on the exact entity that we are dealing with (TQ vs Sticky TQ partition vs normal TQ partition vs physical queue) - Created `userDataManager` responsible for fetching/loading and updating user data. Extracted all the related logic from `taskQueueDB` and `taskQueuePartitionManger` into `userDataManager`. - Renamed `taskQueueManager` to `physicalTaskQueueManager` ## Why? <!-- Tell your future self why have you made these changes --> This is an overdue refactoring, we need proper distinction between all different entities mentioned above that are called "task queue". ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Mainly existing unit tests. Added few new ones where needed. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> No known behavioral change is introduced, but need to keep an eye on this to make sure the refactoring has not change anything unintentionally. ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> No.
What changed?
tqname
withtqid
containing the following typesTaskQueueFamily
struct including namespace ID and user-given TQ nameTaskQueue
struct representing a high level task queue for a particular task type (activity vs workflow)Partition
interface which can be either aNormalPartition
or aStickyPartition
taskQueueID
withPhysicalTaskQueueKey
to represent a DB-level task queuestickyInfo
tqname
andtaskQueueID
with new types depending on the exact entity that we are dealing with (TQ vs Sticky TQ partition vs normal TQ partition vs physical queue)userDataManager
responsible for fetching/loading and updating user data. Extracted all the related logic fromtaskQueueDB
andtaskQueuePartitionManger
intouserDataManager
.taskQueueManager
tophysicalTaskQueueManager
Why?
This is an overdue refactoring, we need proper distinction between all different entities mentioned above that are called "task queue".
How did you test it?
Mainly existing unit tests. Added few new ones where needed.
Potential risks
No known behavioral change is introduced, but need to keep an eye on this to make sure the refactoring has not change anything unintentionally.
Is hotfix candidate?
No.