KEP-5229: Implement API dispatcher #132523

Open
wants to merge 1 commit into
base: master
from

Conversation

macsko
Member

@macsko macsko commented Jun 25, 2025

What type of PR is this?

/kind feature

What this PR does / why we need it:

This PR adds an APIDispatcher component responsible for queueing and asynchronously executing API calls during scheduling. The component can then be connected to the scheduler's cache or used directly, if needed.

This PR focuses on adding the dispatcher. Further PRs will connect the dispatcher to the scheduler and make use of it.

Customizability is achieved through the APICall interface, which can be implemented in any way. The apiCallRelevances passed to the dispatcher's constructor allow any relevance hierarchy to be set.

Which issue(s) this PR is related to:

Fixes: #132489
KEP: kubernetes/enhancements#5229

Special notes for your reviewer:

Does this PR introduce a user-facing change?

NONE

Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.:


@k8s-ci-robot k8s-ci-robot added release-note-none Denotes a PR that doesn't merit a release note. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. kind/feature Categorizes issue or PR as related to a new feature. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. do-not-merge/needs-sig Indicates an issue or PR lacks a `sig/foo` label and requires one. needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels Jun 25, 2025
@k8s-ci-robot
Contributor

This issue is currently awaiting triage.

If a SIG or subproject determines this is a relevant issue, they will accept it by applying the triage/accepted label and provide further guidance.

The triage/accepted label can be added by org members by writing /triage accepted in a comment.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@k8s-ci-robot k8s-ci-robot added needs-priority Indicates a PR lacks a `priority/foo` label and requires one. sig/scheduling Categorizes an issue or PR as relevant to SIG Scheduling. and removed do-not-merge/needs-sig Indicates an issue or PR lacks a `sig/foo` label and requires one. labels Jun 25, 2025
@k8s-ci-robot k8s-ci-robot requested review from dom4ha and sttts June 25, 2025 07:53
@k8s-ci-robot k8s-ci-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jun 25, 2025
@macsko macsko changed the title KEP-5229: Implement API dispatcher WIP: KEP-5229: Implement API dispatcher Jun 25, 2025
@k8s-ci-robot k8s-ci-robot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jun 25, 2025
@macsko
Member Author

macsko commented Jun 25, 2025

/hold

@k8s-ci-robot k8s-ci-robot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jun 25, 2025
@macsko
Member Author

macsko commented Jun 25, 2025

/cc @sanposhiho @dom4ha

This PR is not yet completed but the discussion could start.

@k8s-ci-robot k8s-ci-robot requested a review from sanposhiho June 25, 2025 07:55
@macsko macsko force-pushed the add_api_dispatcher branch from f9f67a7 to 31f084d Compare June 25, 2025 08:04
@lmktfy

lmktfy commented Jun 26, 2025

Are we 100% sure this doesn't need a changelog entry? I think it's useful to changelog this PR in its own right.

@k8s-ci-robot k8s-ci-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jun 26, 2025
@macsko
Member Author

macsko commented Jun 26, 2025

Are we 100% sure this doesn't need a changelog entry? I think it's useful to changelog this PR in its own right.

Yes, it needs an entry. I'll add it once the PR is no longer WIP.

@macsko macsko force-pushed the add_api_dispatcher branch from 31f084d to f844906 Compare June 26, 2025 12:12
@k8s-ci-robot k8s-ci-robot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jun 26, 2025
@macsko macsko force-pushed the add_api_dispatcher branch from f844906 to b1caf2d Compare June 26, 2025 12:16
@k8s-ci-robot k8s-ci-robot added area/release-eng Issues or PRs related to the Release Engineering subproject sig/release Categorizes an issue or PR as relevant to SIG Release. labels Jun 26, 2025
@k8s-ci-robot
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: macsko
Once this PR has been reviewed and has the lgtm label, please assign sttts for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot removed the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jun 26, 2025
@macsko macsko force-pushed the add_api_dispatcher branch from b1caf2d to 153b9e8 Compare June 26, 2025 12:22
@k8s-ci-robot k8s-ci-robot added needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. and removed needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. labels Jun 26, 2025
@macsko macsko force-pushed the add_api_dispatcher branch from 153b9e8 to 99fb477 Compare June 27, 2025 07:53
@k8s-ci-robot k8s-ci-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jun 27, 2025
@macsko macsko changed the title WIP: KEP-5229: Implement API dispatcher KEP-5229: Implement API dispatcher Jun 27, 2025
@k8s-ci-robot k8s-ci-robot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jun 27, 2025
@macsko macsko force-pushed the add_api_dispatcher branch from 99fb477 to e524109 Compare June 27, 2025 08:39
Comment on lines +161 to +188
// removeFromQueue removes the objectUID from the queue and returns the recreated queue.
func removeFromQueue(queue *buffer.Ring[types.UID], objectUID types.UID) *buffer.Ring[types.UID] {
	newQueue := buffer.NewRing[types.UID](buffer.RingOptions{
		InitialSize: queue.Len(),
		NormalSize:  queue.Cap(),
	})
	for {
		uid, ok := queue.ReadOne()
		if !ok {
			break
		}
		if uid != objectUID {
			newQueue.WriteOne(uid)
		}
	}
	return newQueue
}
Member Author

The choice of data structure for the queue involves a trade-off based on these requirements:

  • Fast, FIFO-ordered inserts and pops.
  • Efficient removal of an occasional item from the middle of the queue.

We have a few options:

  1. Queue based on a slice (buffer.Ring): Offers the best performance for standard FIFO operations. However, removing an element from the middle requires recreating (in buffer.Ring) or shifting (if we write our own) the underlying slice.
  2. Linked list (container/list): Provides efficient deletion from the middle of the queue. However, standard push/pop operations have lower performance.
  3. Something else?
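
To make option 2 concrete, here is a minimal sketch of a container/list queue paired with a map from UID to list element, so that removing an item from the middle is O(1). The type uidQueue and its methods are hypothetical names used only for illustration, and plain strings stand in for types.UID; this is not the code in the PR.

```go
package main

import (
	"container/list"
	"fmt"
)

// uidQueue is a hypothetical FIFO queue of UIDs. The index map points at the
// list element for each UID, so a removal from the middle needs no scan.
type uidQueue struct {
	items *list.List
	index map[string]*list.Element
}

func newUIDQueue() *uidQueue {
	return &uidQueue{items: list.New(), index: make(map[string]*list.Element)}
}

// push appends uid to the back of the queue unless it is already queued.
func (q *uidQueue) push(uid string) {
	if _, ok := q.index[uid]; ok {
		return
	}
	q.index[uid] = q.items.PushBack(uid)
}

// pop removes and returns the oldest UID; ok is false when the queue is empty.
func (q *uidQueue) pop() (uid string, ok bool) {
	front := q.items.Front()
	if front == nil {
		return "", false
	}
	uid = q.items.Remove(front).(string)
	delete(q.index, uid)
	return uid, true
}

// remove deletes uid from anywhere in the queue without touching other elements.
func (q *uidQueue) remove(uid string) {
	if e, ok := q.index[uid]; ok {
		q.items.Remove(e)
		delete(q.index, uid)
	}
}

func main() {
	q := newUIDQueue()
	for _, uid := range []string{"a", "b", "c"} {
		q.push(uid)
	}
	q.remove("b")
	for {
		uid, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(uid) // prints "a", then "c"
	}
}
```

The cost of this layout is one allocation per element and an extra map entry, which is the push/pop overhead mentioned for the linked-list option.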

Member

Call removals should be fairly rare as they will happen only for NoOps (due to merge or update).

If we really care about performance in such cases, we could leave NoOps in the queue and either check for it while we pop or annotate them instead of removing from the queue.

Member Author

If we really care about performance in such cases, we could leave NoOps in the queue and either check for it while we pop or annotate them instead of removing from the queue.

I also thought about this - we could just check on pop if the UID is in the apiCalls map, but I don't have a strong opinion on any approach.
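
A rough sketch of the "check on pop" idea, assuming a slice for FIFO order and a map of pending calls. The names lazyQueue, add, drop and pop are hypothetical, and strings stand in for UIDs and API calls; the real dispatcher may structure this differently.

```go
package main

import "fmt"

// lazyQueue is a hypothetical queue that never removes from the middle.
// Dropping a call only deletes it from the calls map; the stale UID left in
// the order slice is skipped once it reaches the front.
type lazyQueue struct {
	order []string          // FIFO order of UIDs, may contain stale entries
	calls map[string]string // UID -> pending call (a string stands in for an API call)
}

func newLazyQueue() *lazyQueue {
	return &lazyQueue{calls: make(map[string]string)}
}

// add enqueues a call for uid, or replaces the pending call if one exists.
func (q *lazyQueue) add(uid, call string) {
	if _, ok := q.calls[uid]; !ok {
		q.order = append(q.order, uid)
	}
	q.calls[uid] = call
}

// drop marks the call as removed in O(1); the UID stays in the order slice.
func (q *lazyQueue) drop(uid string) {
	delete(q.calls, uid)
}

// pop returns the oldest UID that still has a pending call.
func (q *lazyQueue) pop() (uid, call string, ok bool) {
	for len(q.order) > 0 {
		uid = q.order[0]
		q.order = q.order[1:]
		if call, ok = q.calls[uid]; ok {
			delete(q.calls, uid)
			return uid, call, true
		}
		// Stale entry: the call was dropped while queued, so skip it.
	}
	return "", "", false
}

func main() {
	q := newLazyQueue()
	q.add("pod-1", "status update")
	q.add("pod-2", "status update")
	q.add("pod-3", "status update")
	q.drop("pod-2") // e.g. the call became a no-op after a merge
	for {
		uid, call, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(uid, call)
	}
}
```

One consequence of this sketch: a UID that is dropped and then re-added keeps its earlier position in the order slice until the stale entry is consumed, so strict FIFO order for re-added calls is not guaranteed.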

@k8s-ci-robot k8s-ci-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jul 2, 2025
@macsko macsko force-pushed the add_api_dispatcher branch from e524109 to feede49 Compare July 2, 2025 07:19
@k8s-ci-robot k8s-ci-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jul 2, 2025
@macsko macsko force-pushed the add_api_dispatcher branch from feede49 to 43b022b Compare July 2, 2025 09:51
@macsko
Copy link
Member Author

macsko commented Jul 2, 2025

/retest


// Add adds an API call to the dispatcher's queue. It returns an error if the call is not enqueued
// (e.g., if it's skipped). The caller should handle ErrCallSkipped if returned.
func (ad *APIDispatcher) Add(incomingAPICall fwk.APICall, opts fwk.APICallOptions) error {
Member

Isn't it rather outgoingAPICall?

Member Author

"incoming" to the dispatcher. I renamed it to just newAPICall.

if oldAPICall.CallType() != apiCall.CallType() {
	if cc.isEquallyRelevant(oldAPICall.CallType(), apiCall.CallType()) {
		// If call types are different, but are equally relevant, the relevance is misconfigured.
		return fmt.Errorf("relevance misconfiguration: call types %q and %q have the same relevance, but must be different for the same object",
Member

Can we validate against misconfiguration on callController creation instead of doing it at runtime?

Member Author

The APICallRelevances map doesn't have enough information to detect this. The check should probably even move to a higher level (configuration?). For now, I removed this validation from here.

// The receiver should incorporate all necessary information from oldCall, as oldCall will be discarded.
// After this method is called, IsNoOp() should be checked to see if the call can be skipped.
Merge(oldCall APICall) error
// Update applies changes from this API call to the given object, potentially storing
Member

The description and the name of the method are not super clear. Maybe rename it to Sync?

Member Author

@macsko macsko Jul 3, 2025

Renamed Update to Sync and UpdateObject to SyncObject, thanks

onFinish chan<- error
// timestamp holds the time when the call was added to the APIDispatcher
// and is used to verify if the call details have been updated while this call was in-flight.
timestamp time.Time
Member

The term timestamp is a bit misleading, as it suggests that this is the ordering element of the API call queue, while it's used to identify changes to the API call. Maybe we could call it fingerprint or similar to denote that it's just used for distinguishing.

Member Author

That's right, I changed it to callID and hold callIDCounter that is incremented on each addition to the controller.


// Acquire a goroutine before popping a call. This ordering prevents a popped
// call from waiting (being in in-flight) for a long time.
ok := ad.goroutinesLimiter.acquire()
Member

Consider returning some Runner object instead of true/false, and use it to run an api call:

runner := ad.goroutineLimiter.acquire()
apiCall := ad.callController.pop()
runner.Run(apiCall)

Member Author

Good idea! Now acquire() returns an interface with run() method.

@k8s-ci-robot k8s-ci-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jul 3, 2025
@macsko macsko force-pushed the add_api_dispatcher branch 2 times, most recently from 65b3d9b to ef096a2 Compare July 3, 2025 10:22
@k8s-ci-robot k8s-ci-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Jul 3, 2025
@utam0k
Copy link
Member

utam0k commented Jul 3, 2025

/unassign /assign misoperation

Member

@sanposhiho sanposhiho left a comment

Some questions and initial reviews to start off

return cc.apiCallRelevances[newCall] < cc.apiCallRelevances[oldCall]
}

// reconcile compares a new API call with an existing one for the same object.
Member

What the function does is different from what the word reconcile suggests to me in the K8s world.
Maybe just mergeAPICalls or something?

Member

For the same reason, I feel like the naming callController is a bit weird too. callQueue or something would be more straightforward?

Member Author

What the function does is different from what the word reconcile suggests to me in the K8s world.
Maybe just mergeAPICalls or something?

But if it's a scheduler-internal function, used only by the callController itself, do we need to care about a similar name used elsewhere in the K8s world? I thought about the merge name, but merging is only part of this function - it can also skip or overwrite completely, which doesn't have much to do with merging (and it calls merge on a call as well).

For the same reason, I feel like the naming callController is a bit weird too. callQueue or something would be more straightforward?

Again, it does more than just queueing, but if you feel that controller is too indirect, I can rename it to queue.

if cc.isLessRelevant(oldAPICall.CallType(), apiCall.CallType()) {
	// The new API call is less relevant than the existing one, so skip it.
	err := fmt.Errorf("a more relevant call is already enqueued for this object: %w", fwk.ErrCallSkipped)
	apiCall.sendOnFinish(err)
Member

@sanposhiho sanposhiho Jul 6, 2025

Question: how should users (= who made this API call) react to this error? What would they want to do, for example?
If they can just ignore the errors, then we can just log it here, and no need to send err to the channel?

Member Author

I don't want to just return nil (== everything went fine) if the call was overwritten or skipped. In that case, the newer call hasn't even been executed yet, so the logic that handles that result might have to behave differently.

For example, a cache processing a successful update might just store the update's result. But if it receives an overwrite, that means the cache has already started tracking the more relevant call (e.g. binding) and should drop the previous one (e.g. status update).

Similarly, skipped should mean that the update won't be tracked, because the more important call is already enqueued.

}

// Add adds an API call to the dispatcher's queue. It returns an error if the call is not enqueued
// (e.g., if it's skipped). The caller should handle ErrCallSkipped if returned.
Member

Maybe similar to the question in another thread, but what would the caller do with this error?

Comment on lines +120 to +124
if oldAPICall.CallType() != apiCall.CallType() {
	// API call types don't match, so we overwrite the old one.
	oldAPICall.sendOnFinish(fmt.Errorf("a more relevant call was enqueued for this object: %w", fwk.ErrCallOverwritten))
	return nil
}
Member

@sanposhiho sanposhiho Jul 6, 2025

Question:
So, can we always overwrite the old one with the new one if the call type is different? When we define a new API call, or users define a new custom API call, we/they have to make sure one can overwrite the other if the API call type is different? If Yes, that should be clearly mentioned on the CallType() fn comment in the interface. I think that's a strong factor when defining CallType

Member Author

Currently, we overwrite the old one if the new call is more relevant. More advanced logic could be added in future releases. For now, if someone adds an API call for an object and configures its relevance, they have to make sure it can be overwritten.

The logic is described in the APICallRelevances comment. If you feel it's not enough, I could elaborate more there.

// APICallRelevances maps all possible APICallTypes to a relevance value.
// A more relevant API call should overwrite a less relevant one for the same object.
// Types of the same relevance should only be defined for different object types.
type APICallRelevances map[APICallType]int
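
The relevance rules described in this thread can be sketched as a small decision function. The decide function, the decision type, the pod_binding call type and the numeric relevance values are assumptions made for illustration; only the APICallRelevances shape and the pod_status_update name come from the discussion.

```go
package main

import "fmt"

// APICallType and APICallRelevances follow the shape quoted in the review.
type APICallType string
type APICallRelevances map[APICallType]int

// decision is a hypothetical result of comparing two calls for one object.
type decision string

const (
	mergeCalls   decision = "merge"
	skipNew      decision = "skip new"
	overwriteOld decision = "overwrite old"
)

// decide compares a queued call type with a newly added one for the same object.
func decide(r APICallRelevances, oldType, newType APICallType) decision {
	switch {
	case oldType == newType:
		// Same type: the new call merges the old one into itself.
		return mergeCalls
	case r[newType] < r[oldType]:
		// The new call is less relevant, so it is skipped.
		return skipNew
	default:
		// Different types, new call more relevant: overwrite the old one.
		// Equal relevance for different types on the same object would be a
		// misconfiguration; this sketch does not detect it.
		return overwriteOld
	}
}

func main() {
	relevances := APICallRelevances{
		"pod_status_update": 1, // values are illustrative
		"pod_binding":       2,
	}
	fmt.Println(decide(relevances, "pod_binding", "pod_status_update"))       // skip new
	fmt.Println(decide(relevances, "pod_status_update", "pod_binding"))       // overwrite old
	fmt.Println(decide(relevances, "pod_status_update", "pod_status_update")) // merge
}
```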


// Run starts the main processing loop of the APIDispatcher, which pops calls
// from the queue and dispatches them to worker goroutines for execution.
func (ad *APIDispatcher) Run(ctx context.Context) {
Member

nit: with ctx as a function argument alongside Close(), the caller has to do two things when shutting down: cancel ctx, and then call Close().

I think we should use ctx.Done() instead of ad.stop. (assuming ctx is cancel()ed when the scheduler is shutting down). When ctx is Done(), we should also do what we do in Close() right now.

Or, if you prefer to use Close() with ad.stop, we should get logger only from the function arg, initialize ctx in Run(), and cancel it when ad.stop is called (so that apiCall.Execute() is properly canceled).

Member Author

Good point, I changed the Run to take the logger, but the context is initialized inside the func and canceled on ad.stop.

}

// Close shuts down the APIDispatcher.
func (ad *APIDispatcher) Close() {
Member

nit: let's have some info log when closing

Member Author

I don't think it's needed. We don't log in any Close method in the scheduler. It's called only on kube-scheduler termination, so I don't see the point of logging it anyway.

return err != nil && !errors.Is(err, ErrCallSkipped) && !errors.Is(err, ErrCallOverwritten)
}

// APICallType defines a call type name. Each APICall implementation must have a unique type within a given dispatcher.
Member

e.g., CallType: status_update for Pod and CallType: status_update for Node can both be defined, right?

Suggested change
- // APICallType defines a call type name. Each APICall implementation must have a unique type within a given dispatcher.
+ // APICallType defines a call type name.
+ // APICall implementations for the same type must have a unique type.

Member Author

No, CallType should be strictly for a specific object. A few days ago I even renamed status_update to pod_status_update to emphasize it.

Obviously, having one CallType for multiple different objects might work, but the validation would be harder and I don't see the point of allowing it at all.

@macsko macsko force-pushed the add_api_dispatcher branch from ef096a2 to 28da7ac Compare July 7, 2025 10:24
@macsko macsko force-pushed the add_api_dispatcher branch from 28da7ac to 28d2eef Compare July 7, 2025 10:53
Labels
area/release-eng Issues or PRs related to the Release Engineering subproject cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. kind/feature Categorizes issue or PR as related to a new feature. needs-priority Indicates a PR lacks a `priority/foo` label and requires one. needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. release-note-none Denotes a PR that doesn't merit a release note. sig/release Categorizes an issue or PR as relevant to SIG Release. sig/scheduling Categorizes an issue or PR as relevant to SIG Scheduling. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

KEP-5229: Implement async API Queue
6 participants