New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fairqueuing implementation with unit tests #84544
Conversation
8b6d71e
to
148e965
Compare
/cc @MikeSpreitzer |
staging/src/k8s.io/apimachinery/pkg/util/waitgroup/optionalwaitgroup.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apimachinery/pkg/util/waitgroup/optionalwaitgroup.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/event_clock.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/dummy.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/event_clock.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/event_clock.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing_test.go
Outdated
Show resolved
Hide resolved
/cc @mars1024 @yue9944882 |
18a9a56
to
e175f42
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I made it through the rest of this file.
func (qs *queueSet) enqueue(request *fq.Request) { | ||
queue := request.Queue | ||
queue.Enqueue(request) | ||
qs.updateQueueVirtualStartTime(request, queue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have expected this update to happen when we start executing the request, not when we enqueue it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you are saying a pointer to the KEP is not good enough, we have to copy the reasoning from there into the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I expect this can be explained in one sentence and that sentence would be super enlightening. Worst case, the sentence is "there's no possible short explanation of this, see the KEP"; that would tell me that there's something major missing in my mental model of how this code works.
But I expect that a useful sentence like "The virtual clock on a queue starts when the first request is queued (and e.g. not when the request begins to execute) because ____." can be written.
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching | ||
func (qs *queueSet) updateQueueVirtualStartTime(packet *fq.Request, queue *fq.Queue) { | ||
// When a request arrives to an empty queue with no requests executing: | ||
// len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make the comment explain why this is to be done? ("it's in the KEP" is not an explanation!)
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
Outdated
Show resolved
Hide resolved
} | ||
|
||
// dequeue dequeues a request from the queueSet | ||
func (qs *queueSet) dequeue() (*fq.Request, bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+"Locked"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I notice that you did not mention this on all internal methods that (a) must be called with the lock held and (b) do not have "Locked" in their name. What is your opinion on how to mark locking constraints on methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I didn't comment everywhere is simple: I reviewed in chunks and especially at first wasn't aware that there was a lock that some things needed to hold. It's best if the convention is universally applied.
if !ok { | ||
return nil, false | ||
} | ||
qs.counter.Add(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why shouldn't the thread reading from the channel do this?
If the answer is some variation of "locking" then why shouldn't this be done by where queue.RequestsExecuting and qs.numRequestsEnqueued are updated in dequeue
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By "this" you mean increment the counter? That is for the same reason that the counter is incremented before forking a goroutine: (a) this is the code responsible for making another goroutine active and (b) doing the increment later might be too late.
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
Outdated
Show resolved
Hide resolved
r.Queue.VirtualStart -= qs.estimatedServiceTime - S | ||
|
||
// request has finished, remove from requests executing | ||
r.Queue.RequestsExecuting-- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we decrement qs.counter here? (or defer
such a decrement?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there is no goroutine ending or going idle here.
76e8b25
to
61d93ff
Compare
I'm primarily bothered by two things, which may be the same thing:
|
} | ||
} | ||
|
||
// TestNoRestraint should fail because the dummy QueueSet exercises no control |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exactly should fail? Not the whole test, or we wouldn't check it in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the first test function written, before we had a real implementation of QueueSet. Now that we do, this test is relatively uninteresting. To be fair, it does test the test.
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
Outdated
Show resolved
Hide resolved
staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
Show resolved
Hide resolved
exerciseQueueSetUniformScenario(t, qs, []uniformClient{ | ||
{1001001001, 5, 100, time.Second, time.Second}, | ||
}, time.Second*10, true, false, clk, counter) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we test changing config anywhere in this file? That seems important, both adding and removing queues, and whatever else is needed.
I would feel a lot more confident about the locking / counting if we could make a "thrash" test, e.g. use a real clock and simulate a bunch of random requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, a randomized tester with config changes would be a good add. It would not need to use a real clock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think some testing with a real clock adds assurance that there's no deadlocks hiding in the code.
57441fa
to
d1516b0
Compare
Here is a suggestion of a way to cleanly package up the union-of-unblocks logic. Define an internal (i.e., must be accessed only while holding the QueueSet's lock) abstraction like the following.
|
4866471
to
24065cf
Compare
/uncc |
BTW, since we are not super close on this one yet, I suggest we not do force-push, so that it is easier for reviewers to find the recent deltas. |
6619df1
to
8eaa6db
Compare
} | ||
} | ||
klog.V(5).Infof("request timed out after being enqueued\n") | ||
metrics.AddReject(qs.config.Name, "time-out") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we need to decrement the counter here? Wouldn't it have been incremented prior to sending something down the dequeue channel?
03aa546
to
4bc670b
Compare
I folded the latest changes from here into #85192 . |
…pec and typo in queueset_test.go to fix all presubmit tests
4bc670b
to
009a6bc
Compare
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: aaron-prindle The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@aaron-prindle: The following tests failed, say
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
This should be closed, #85192 has merged. |
What type of PR is this?
/kind feature
What this PR does / why we need it:
Implement package for fair queuing algorithm, which will be used in KEP priority and fariness.
Special notes for your reviewer:
This PR includes work from the feature/rate-limiting-branch from these PRs:
#80786, #81621, #81707, #81788
This PR only adds the fair queuing logic and required libraries. The wiring up the fair queuing into a full request manager is not in this PR.
Does this PR introduce a user-facing change?:
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: