Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #112198: Call queueSet::boundNextDispatchLocked enough #112295

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ type queueSetCompleter struct {
// described in this package's doc, and a pointer to one implements
// the QueueSet interface. The fields listed before the lock
// should not be changed; the fields listed after the
// lock must be accessed only while holding the lock. The methods of
// this type follow the naming convention that the suffix "Locked"
// means the caller must hold the lock; for a method whose name does
// not end in "Locked" either acquires the lock or does not care about
// locking.
// lock must be accessed only while holding the lock.
//
// The methods of this type follow the naming convention that the
// suffix "Locked" means the caller must hold the lock; for a method
// whose name does not end in "Locked" either acquires the lock or
// does not care about locking.
//
// The methods of this type also follow the convention that the suffix
// "ToBoundLocked" means that the caller may have to follow up with a
// call to `boundNextDispatchLocked`. This is so for a method that
// changes what request is oldest in a queue, because that change means
// that the anti-windup hack in boundNextDispatchLocked needs to be
// applied wrt the revised oldest request in the queue.
type queueSet struct {
clock eventclock.Interface
estimatedServiceDuration time.Duration
Expand Down Expand Up @@ -396,7 +404,9 @@ func (req *request) wait() (bool, bool) {
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
// remove the request from the queue as it has timed out
queue := req.queue
if req.removeFromQueueLocked() != nil {
defer qs.boundNextDispatchLocked(queue)
qs.totRequestsWaiting--
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
Expand Down Expand Up @@ -521,7 +531,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName)
// NOTE: currently timeout is only checked for each new request. This means that there can be
// requests that are in the queue longer than the timeout if there are no new requests
// We prefer the simplicity over the promptness, at least for now.
Expand All @@ -543,7 +553,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
queueNoteFn: queueNoteFn,
workEstimate: qs.completeWorkEstimate(workEstimate),
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
Expand Down Expand Up @@ -583,9 +593,9 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
return bestQueueIdx
}

// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
timeoutCount := 0
now := qs.clock.Now()
reqs := queue.requests
Expand Down Expand Up @@ -616,11 +626,11 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
}
}

// rejectOrEnqueueLocked rejects or enqueues the newly arrived
// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
// request, which has been assigned to a queue. If up against the
// queue length limit and the concurrency limit then returns false.
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Length()
// rejects the newly arrived request if resource criteria not met
Expand All @@ -629,12 +639,12 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
return false
}

qs.enqueueLocked(request)
qs.enqueueToBoundLocked(request)
return true
}

// enqueues a request into its queue.
func (qs *queueSet) enqueueLocked(request *request) {
func (qs *queueSet) enqueueToBoundLocked(request *request) {
queue := request.queue
now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
Expand Down Expand Up @@ -693,7 +703,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
// be false when either all queues are empty or the request at the head
// of the next queue cannot be dispatched.
func (qs *queueSet) dispatchLocked() bool {
queue, request := qs.findDispatchQueueLocked()
queue, request := qs.findDispatchQueueToBoundLocked()
if queue == nil {
return false
}
Expand Down Expand Up @@ -729,6 +739,11 @@ func (qs *queueSet) dispatchLocked() bool {
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G * width
if request.totalWork() > rDecrement/100 { // A single increment should never be so big
klog.Errorf("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v with implausibly high work %v from queue %d with start R %v",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.nextDispatchR)
}
queue.nextDispatchR += request.totalWork()
return true
}
Expand Down Expand Up @@ -756,11 +771,12 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
return true
}

// findDispatchQueueLocked examines the queues in round robin order and
// findDispatchQueueToBoundLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now.
func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
// Returns nils if the head of the selected queue can not be dispatched now,
// in which case the caller does not need to follow up with`qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
minVirtualFinish := fqrequest.MaxSeatSeconds
sMin := fqrequest.MaxSeatSeconds
dsMin := fqrequest.MaxSeatSeconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
minQueueExpected = test.queues[queueIdx]
}

minQueueGot, reqGot := qs.findDispatchQueueLocked()
minQueueGot, reqGot := qs.findDispatchQueueToBoundLocked()
if minQueueExpected != minQueueGot {
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
}
Expand Down