KEP-5229: Implement API dispatcher #132523
/hold
/cc @sanposhiho @dom4ha This PR is not complete yet, but the discussion can start.
Force-pushed from f9f67a7 to 31f084d.
Are we 100% sure this doesn't need a changelog entry? I think it's useful to changelog this PR in its own right.
Yes, it needs an entry. I'll add it once the PR is no longer WIP.
Force-pushed from 31f084d to f844906.
Force-pushed from f844906 to b1caf2d.
Force-pushed from b1caf2d to 153b9e8.
Force-pushed from 153b9e8 to 99fb477.
Force-pushed from 99fb477 to e524109.
// removeFromQueue removes the objectUID from the queue and returns the recreated queue.
func removeFromQueue(queue *buffer.Ring[types.UID], objectUID types.UID) *buffer.Ring[types.UID] {
	newQueue := buffer.NewRing[types.UID](buffer.RingOptions{
		InitialSize: queue.Len(),
		NormalSize:  queue.Cap(),
	})
	for {
		uid, ok := queue.ReadOne()
		if !ok {
			break
		}
		if uid != objectUID {
			newQueue.WriteOne(uid)
		}
	}
	return newQueue
}
The choice of data structure for the queue involves a trade-off based on these requirements:
- Fast, FIFO-ordered inserts and pops.
- Efficient removal of an occasional item from the middle of the queue.
We have a few options:
- Queue based on a slice (buffer.Ring): Offers the best performance for standard FIFO operations. However, removing an element from the middle requires recreating (in buffer.Ring) or shifting (if we write our own) the underlying slice.
- Linked list (container/list): Provides efficient deletion from the middle of the queue. However, standard push/pop operations have lower performance.
- Something else?
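For comparison, here is a minimal sketch of the linked-list option. It assumes a hypothetical uidQueue type (not part of this PR) that pairs container/list with a map from UID to list element, so removal from the middle does not copy the queue. The UID type is a stand-in for types.UID.

```go
package main

import (
	"container/list"
	"fmt"
)

// UID stands in for types.UID to keep the sketch self-contained.
type UID string

// uidQueue is a hypothetical FIFO queue with constant-time removal by UID.
type uidQueue struct {
	order *list.List            // FIFO order of UIDs
	index map[UID]*list.Element // UID -> its node in order
}

func newUIDQueue() *uidQueue {
	return &uidQueue{order: list.New(), index: make(map[UID]*list.Element)}
}

// push appends uid unless it is already queued.
func (q *uidQueue) push(uid UID) {
	if _, queued := q.index[uid]; queued {
		return
	}
	q.index[uid] = q.order.PushBack(uid)
}

// remove deletes uid from anywhere in the queue without copying it.
func (q *uidQueue) remove(uid UID) {
	if e, queued := q.index[uid]; queued {
		q.order.Remove(e)
		delete(q.index, uid)
	}
}

// pop returns the oldest UID, or false if the queue is empty.
func (q *uidQueue) pop() (UID, bool) {
	e := q.order.Front()
	if e == nil {
		return "", false
	}
	uid := q.order.Remove(e).(UID)
	delete(q.index, uid)
	return uid, true
}

func main() {
	q := newUIDQueue()
	q.push("a")
	q.push("b")
	q.push("c")
	q.remove("b")
	for uid, ok := q.pop(); ok; uid, ok = q.pop() {
		fmt.Println(uid) // prints a, then c
	}
}
```

The cost is one heap-allocated node and one map entry per queued UID, which is the push/pop overhead the comparison above refers to.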
Call removals should be fairly rare as they will happen only for NoOps (due to merge or update).
If we really care about performance in such cases, we could leave NoOps in the queue and either check for it while we pop or annotate them instead of removing from the queue.
> If we really care about performance in such cases, we could leave NoOps in the queue and either check for it while we pop or annotate them instead of removing from the queue.
I also thought about this: we could just check on pop whether the UID is in the apiCalls map, but I don't have a strong opinion on either approach.
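The check-on-pop idea discussed above could look roughly like the following sketch. It assumes a hypothetical lazyQueue type whose calls map plays the role of the apiCalls map mentioned in the thread. A call is represented as a plain string for brevity.

```go
package main

import "fmt"

// UID stands in for types.UID to keep the sketch self-contained.
type UID string

// lazyQueue is a hypothetical queue that never removes from the middle.
// A UID counts as live only while it has an entry in calls.
type lazyQueue struct {
	order []UID
	calls map[UID]string // UID -> pending call
}

func newLazyQueue() *lazyQueue {
	return &lazyQueue{calls: make(map[UID]string)}
}

// add enqueues the UID if it has no pending call and stores the latest call.
func (q *lazyQueue) add(uid UID, call string) {
	if _, pending := q.calls[uid]; !pending {
		q.order = append(q.order, uid)
	}
	q.calls[uid] = call
}

// drop discards a call (e.g. a no-op); its UID stays in order until popped.
func (q *lazyQueue) drop(uid UID) {
	delete(q.calls, uid)
}

// pop returns the oldest live call and skips UIDs whose call was dropped.
func (q *lazyQueue) pop() (UID, string, bool) {
	for len(q.order) > 0 {
		uid := q.order[0]
		q.order = q.order[1:]
		if call, live := q.calls[uid]; live {
			delete(q.calls, uid)
			return uid, call, true
		}
	}
	return "", "", false
}

func main() {
	q := newLazyQueue()
	q.add("a", "update a")
	q.add("b", "update b")
	q.add("c", "update c")
	q.drop("b")
	for uid, call, ok := q.pop(); ok; uid, call, ok = q.pop() {
		fmt.Println(uid, call)
	}
}
```

One caveat of this sketch: a UID that is dropped and then re-added keeps its earlier position in the queue, and its later duplicate entry is skipped on pop.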
Force-pushed from e524109 to feede49.
Force-pushed from feede49 to 43b022b.
/retest
// Add adds an API call to the dispatcher's queue. It returns an error if the call is not enqueued
// (e.g., if it's skipped). The caller should handle ErrCallSkipped if returned.
func (ad *APIDispatcher) Add(incomingAPICall fwk.APICall, opts fwk.APICallOptions) error {
Isn't it rather outgoingAPICall?
It is "incoming" to the dispatcher. I renamed it to just newAPICall.
if oldAPICall.CallType() != apiCall.CallType() {
	if cc.isEquallyRelevant(oldAPICall.CallType(), apiCall.CallType()) {
		// If call types are different, but are equally relevant, the relevance is misconfigured.
		return fmt.Errorf("relevance misconfiguration: call types %q and %q have the same relevance, but must be different for the same object",
Can we validate against misconfiguration on callController creation instead of doing it at runtime?
The APICallRelevances map doesn't have enough information to detect this. The check should probably move to a higher level (configuration?). For now, I removed this validation from here.
// The receiver should incorporate all necessary information from oldCall, as oldCall will be discarded.
// After this method is called, IsNoOp() should be checked to see if the call can be skipped.
Merge(oldCall APICall) error
// Update applies changes from this API call to the given object, potentially storing
The description and the name of the method are not very clear. Maybe rename it to Sync?
Renamed Update to Sync and UpdateObject to SyncObject, thanks
onFinish chan<- error
// timestamp holds the time when the call was added to the APIDispatcher
// and is used to verify if the call details have been updated while this call was in-flight.
timestamp time.Time
Use of the term timestamp is a bit misleading, as it suggests that this is the ordering element of the API calls queue, while it is actually used to identify changes to the API call. Maybe we could call it fingerprint or similar to denote that it's only used for distinguishing calls.
That's right. I changed it to callID and keep a callIDCounter that is incremented on each addition to the controller.
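A rough sketch of how such a counter-based ID can detect that a call was replaced while it was in flight. The controller and queuedCall types and their methods below are assumptions made for illustration. Only the names callID and callIDCounter come from the reply above.

```go
package main

import "fmt"

// queuedCall is a hypothetical wrapper around an API call.
type queuedCall struct {
	callID  int    // identifies this particular version of the call
	payload string // stands in for the real call details
}

// controller is a hypothetical, single-threaded stand-in for the call controller.
type controller struct {
	callIDCounter int
	calls         map[string]*queuedCall // object UID -> latest call
}

func newController() *controller {
	return &controller{calls: make(map[string]*queuedCall)}
}

// add stores the call under a fresh ID and returns that ID.
func (c *controller) add(uid, payload string) int {
	c.callIDCounter++
	c.calls[uid] = &queuedCall{callID: c.callIDCounter, payload: payload}
	return c.callIDCounter
}

// finish removes the entry only if it still belongs to the call that finished.
// It returns false when a newer call replaced it in the meantime.
func (c *controller) finish(uid string, callID int) bool {
	current, ok := c.calls[uid]
	if !ok || current.callID != callID {
		return false
	}
	delete(c.calls, uid)
	return true
}

func main() {
	c := newController()
	inFlight := c.add("pod-1", "status v1")
	newer := c.add("pod-1", "status v2") // arrives while v1 is executing

	fmt.Println(c.finish("pod-1", inFlight)) // false: the entry was updated
	fmt.Println(c.finish("pod-1", newer))    // true: the entry is removed
}
```

Unlike a time.Time value, an incrementing counter cannot collide for two additions made within the same clock tick, and it does not imply any queue ordering.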
// Acquire a goroutine before popping a call. This ordering prevents a popped
// call from waiting (being in in-flight) for a long time.
ok := ad.goroutinesLimiter.acquire()
Consider returning some Runner object instead of true/false, and use it to run an api call:
runner := ad.goroutinesLimiter.acquire()
apiCall := ad.callController.pop()
runner.Run(apiCall)
Good idea! Now acquire() returns an interface with run() method.
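A minimal sketch of the runner pattern suggested above, assuming a limiter built on a buffered channel. The runner interface, slotRunner type, and newGoroutinesLimiter constructor are hypothetical names. Shutdown handling, which the original boolean return value presumably covered, is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is handed out by acquire and is meant to run exactly one function.
type runner interface {
	run(fn func())
}

// goroutinesLimiter caps the number of concurrently running functions.
type goroutinesLimiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newGoroutinesLimiter(max int) *goroutinesLimiter {
	return &goroutinesLimiter{slots: make(chan struct{}, max)}
}

// acquire blocks until a slot is free and returns a runner bound to that slot.
func (l *goroutinesLimiter) acquire() runner {
	l.slots <- struct{}{}
	l.wg.Add(1)
	return slotRunner{limiter: l}
}

type slotRunner struct {
	limiter *goroutinesLimiter
}

// run executes fn in a new goroutine and frees the slot when fn returns.
func (r slotRunner) run(fn func()) {
	go func() {
		defer func() {
			<-r.limiter.slots
			r.limiter.wg.Done()
		}()
		fn()
	}()
}

func main() {
	limiter := newGoroutinesLimiter(2)
	var mu sync.Mutex
	done := 0
	for i := 0; i < 5; i++ {
		r := limiter.acquire() // acquire first, then pop the call
		r.run(func() {
			mu.Lock()
			done++
			mu.Unlock()
		})
	}
	limiter.wg.Wait()
	fmt.Println(done) // 5
}
```

Returning a runner ties the acquired slot to the code that releases it, so the caller cannot forget to release after running a call.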
Force-pushed from 65b3d9b to ef096a2.
/unassign
Some questions and initial reviews to start off
	return cc.apiCallRelevances[newCall] < cc.apiCallRelevances[oldCall]
}

// reconcile compares a new API call with an existing one for the same object.
What the function does differs from what I'd expect from the word reconcile in the K8s world.
Maybe just mergeAPICalls or something?
For the same reason, I feel like the naming callController is a bit weird too. callQueue or something would be more straightforward?
> What the function does differs from what I'd expect from the word reconcile in the K8s world.
> Maybe just mergeAPICalls or something?
But if it's a scheduler-internal function, used only by the callController itself, do we need to care about a similar name used elsewhere in the K8s world? I thought about the merge name, but merging is only part of what this function does: it could also skip the new call or overwrite the old one completely, which has little to do with merging (or with calling merge on a call).
> For the same reason, I feel like the naming callController is a bit weird too. callQueue or something would be more straightforward?
Again, it does more than just queueing, but if you feel the controller name is too indirect, I can rename it to queue.
if cc.isLessRelevant(oldAPICall.CallType(), apiCall.CallType()) {
	// The new API call is less relevant than the existing one, so skip it.
	err := fmt.Errorf("a more relevant call is already enqueued for this object: %w", fwk.ErrCallSkipped)
	apiCall.sendOnFinish(err)
Question: how should users (i.e., whoever made this API call) react to this error? What would they want to do, for example?
If they can just ignore the error, then we can just log it here, with no need to send err to the channel?
I don't want to just return nil (i.e., everything went fine) if the call was overwritten or skipped. In that case, the newer call hasn't even been executed yet, so the logic that handles the result might have to behave differently.
For example, a cache processing a successful update might just store the update's result. But if it receives an overwrite, it means that the cache has already started tracking the more relevant call (e.g. binding) and should drop the previous one (e.g. status update).
Similarly, skipped should mean that the update won't be tracked, because a more important call is already enqueued.
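The distinction described above can be sketched from the caller's side as follows. The two sentinel errors are local stand-ins for fwk.ErrCallSkipped and fwk.ErrCallOverwritten, and handleResult is a hypothetical cache-side handler. The returned strings only label the branches.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for fwk.ErrCallSkipped and fwk.ErrCallOverwritten.
var (
	ErrCallSkipped     = errors.New("call skipped")
	ErrCallOverwritten = errors.New("call overwritten")
)

// handleResult is a hypothetical handler for the outcome of an API call.
func handleResult(err error) string {
	switch {
	case err == nil:
		// The call was executed successfully.
		return "store result"
	case errors.Is(err, ErrCallOverwritten):
		// A more relevant call replaced this one while it was queued.
		return "drop tracked call"
	case errors.Is(err, ErrCallSkipped):
		// A more relevant call was already queued, so this one never entered the queue.
		return "do not track"
	default:
		// Anything else is a genuine failure of the call.
		return "report failure"
	}
}

func main() {
	// Wrapping with %w keeps the sentinel detectable through errors.Is.
	skipped := fmt.Errorf("a more relevant call is already enqueued for this object: %w", ErrCallSkipped)
	fmt.Println(handleResult(skipped)) // do not track
	fmt.Println(handleResult(nil))     // store result
}
```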
}

// Add adds an API call to the dispatcher's queue. It returns an error if the call is not enqueued
// (e.g., if it's skipped). The caller should handle ErrCallSkipped if returned.
Maybe similar to the question in another thread, but what would the caller do with this error?
if oldAPICall.CallType() != apiCall.CallType() {
	// API call types don't match, so we overwrite the old one.
	oldAPICall.sendOnFinish(fmt.Errorf("a more relevant call was enqueued for this object: %w", fwk.ErrCallOverwritten))
	return nil
}
Question:
So, can we always overwrite the old call with the new one if the call type is different? When we define a new API call, or users define a new custom API call, do we/they have to make sure one can overwrite the other if the API call type is different? If yes, that should be clearly stated in the comment on CallType() in the interface. I think that's a strong factor when defining CallType.
Currently, we overwrite the old one if the new call is more relevant. More advanced logic could be added in future releases. For now, if someone adds an API call for an object and configures its relevance, they have to make sure it can be overwritten.
The logic is described in the APICallRelevances comment. If you feel it's not enough, I could elaborate more there.
// APICallRelevances maps all possible APICallTypes to a relevance value.
// A more relevant API call should overwrite a less relevant one for the same object.
// Types of the same relevance should only be defined for different object types.
type APICallRelevances map[APICallType]int
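To illustrate the rule in this comment, here is a small sketch of how a relevance map could drive the skip/overwrite/merge choice discussed in this thread. The decide function, the decision type, the pod_binding type name, and the numeric values are assumptions for illustration, not the PR's code.

```go
package main

import "fmt"

// APICallType and APICallRelevances mirror the types quoted above.
type APICallType string
type APICallRelevances map[APICallType]int

// decision is a hypothetical result type used only in this sketch.
type decision string

const (
	skipNew      decision = "skip new call"
	overwriteOld decision = "overwrite old call"
	mergeCalls   decision = "merge calls"
)

// decide picks what to do with a new call when an old call for the same
// object is still queued.
func decide(r APICallRelevances, oldType, newType APICallType) decision {
	switch {
	case oldType == newType:
		// Same type: the new call absorbs the old one.
		return mergeCalls
	case r[newType] < r[oldType]:
		// The queued call is more relevant, so the new one is skipped.
		return skipNew
	default:
		// The new call is more relevant. Equal relevance for different types
		// would be a misconfiguration, which this sketch does not detect.
		return overwriteOld
	}
}

func main() {
	relevances := APICallRelevances{
		"pod_status_update": 1,
		"pod_binding":       2, // hypothetical type name and values
	}
	fmt.Println(decide(relevances, "pod_binding", "pod_status_update"))       // skip new call
	fmt.Println(decide(relevances, "pod_status_update", "pod_binding"))       // overwrite old call
	fmt.Println(decide(relevances, "pod_status_update", "pod_status_update")) // merge calls
}
```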
// Run starts the main processing loop of the APIDispatcher, which pops calls
// from the queue and dispatches them to worker goroutines for execution.
func (ad *APIDispatcher) Run(ctx context.Context) {
nit: the caller has to do two things when shutting down if Run takes ctx as an argument while Close() also exists (they have to cancel ctx, and then call Close()).
I think we should use ctx.Done() instead of ad.stop (assuming ctx is canceled when the scheduler is shutting down). When ctx is done, we should also do what we do in Close() right now.
Or, if you prefer to use Close() with ad.stop, we should get only the logger from the function argument, initialize ctx in Run(), and cancel it when ad.stop is signaled (so that apiCall.Execute() is properly canceled).
Good point. I changed Run to take the logger, but the context is initialized inside the func and canceled on ad.stop.
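A simplified sketch of the shutdown wiring described in this reply, under stated assumptions: the dispatcher type, the logf parameter (standing in for the logger), and blockingCall are hypothetical, and a single call replaces the real pop-and-execute loop.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// dispatcher is a hypothetical stand-in for APIDispatcher.
type dispatcher struct {
	stop     chan struct{}
	stopOnce sync.Once
}

func newDispatcher() *dispatcher {
	return &dispatcher{stop: make(chan struct{})}
}

// Run creates its own context and cancels it once stop is closed, so a call
// that is executing observes the shutdown. logf stands in for the logger.
func (d *dispatcher) Run(logf func(format string, args ...any), call func(context.Context) error) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		select {
		case <-d.stop:
			cancel()
		case <-ctx.Done():
			// Run returned on its own; nothing left to cancel.
		}
	}()
	err := call(ctx) // stands in for the pop-and-execute loop
	logf("dispatcher stopped: %v", err)
	return err
}

// Close signals Run to stop. It is safe to call more than once.
func (d *dispatcher) Close() {
	d.stopOnce.Do(func() { close(d.stop) })
}

// blockingCall simulates an API call that returns only when ctx is canceled.
func blockingCall(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	d := newDispatcher()
	result := make(chan error, 1)
	go func() {
		result <- d.Run(func(string, ...any) {}, blockingCall)
	}()
	d.Close()
	fmt.Println(<-result) // context canceled
}
```

With this shape the caller has a single shutdown step, Close(), which addresses the two-step shutdown raised in the review comment.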
}

// Close shuts down the APIDispatcher.
func (ad *APIDispatcher) Close() {
nit: let's have some info log when closing
I don't think it's needed. We don't log in any Close method in the scheduler. It's called only on kube-scheduler termination, so I don't see the point of logging it anyway.
	return err != nil && !errors.Is(err, ErrCallSkipped) && !errors.Is(err, ErrCallOverwritten)
}

// APICallType defines a call type name. Each APICall implementation must have a unique type within a given dispatcher.
e.g., CallType: status_update for Pod and CallType: status_update for Node can both be defined, right?
// APICallType defines a call type name. Each APICall implementation must have a unique type within a given dispatcher.
// APICallType defines a call type name.
// APICall implementations for the same type must have a unique type.
No, CallType should be strictly for a specific object. A few days ago I even renamed status_update to pod_status_update to emphasize it.
Obviously, having one CallType for multiple different objects might work, but the validation would be harder and I don't see the point of allowing it at all.
Force-pushed from ef096a2 to 28da7ac.
Force-pushed from 28da7ac to 28d2eef.
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR adds an APIDispatcher component that will be responsible for queueing and asynchronous execution of API calls during scheduling. This component could then be connected to the scheduler's cache or used directly, if needed.
This PR focuses on adding the dispatcher. Further PRs will connect the dispatcher to the scheduler and make use of it.
Customizability is achieved through the APICall interface, which can be implemented in any way. The apiCallRelevances passed to the dispatcher's constructor allow any relevance hierarchy to be set.
Which issue(s) this PR is related to:
Fixes: #132489
KEP: kubernetes/enhancements#5229
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: