diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index a6b511f8dd9c..6e41d74175fe 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -47,7 +47,6 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs", - "//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD index 6707c197d4b3..fcd44292fdaf 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD @@ -19,6 +19,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs", ], diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 137907ecc45a..12378153562b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -70,7 +70,9 @@ type QueueSetConfig struct { Name string // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time ConcurrencyLimit int - // DesiredNumQueues is the number of queues that the API says should exist now + // 
DesiredNumQueues is the number of queues that the API says + // should exist now. This may be zero, in which case + // QueueLengthLimit, HandSize, and RequestWaitLimit are ignored. DesiredNumQueues int // QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time QueueLengthLimit int diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/BUILD similarity index 70% rename from staging/src/k8s.io/apiserver/pkg/util/promise/BUILD rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/BUILD index dc7c60f537ed..df09dcc4d9c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/promise/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/BUILD @@ -3,8 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = ["interface.go"], - importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise", - importpath = "k8s.io/apiserver/pkg/util/promise", + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise", visibility = ["//visibility:public"], ) @@ -19,7 +19,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", - "//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go similarity index 100% rename from staging/src/k8s.io/apiserver/pkg/util/promise/interface.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go diff --git 
a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/BUILD similarity index 77% rename from staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/BUILD rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/BUILD index b05144f2507f..3ea0f8e2ed6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/BUILD @@ -3,12 +3,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = ["lockingpromise.go"], - importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise/lockingpromise", - importpath = "k8s.io/apiserver/pkg/util/promise/lockingpromise", + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise", visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go similarity index 97% rename from staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go index a449df2c7886..5013abf3fd41 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go +++ 
b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go @@ -20,7 +20,7 @@ import ( "sync" "k8s.io/apiserver/pkg/util/flowcontrol/counter" - "k8s.io/apiserver/pkg/util/promise" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) // lockingPromise implements LockingMutable based on a condition diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go similarity index 100% rename from staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD index e29bab63f346..ca0cde93c531 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD @@ -15,9 +15,9 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library", 
"//vendor/github.com/pkg/errors:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go index d9431378ad37..840d78ea1816 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go @@ -14,33 +14,31 @@ See the License for the specific language governing permissions and limitations under the License. */ -package queueset - -// This package implements a technique called "fair queuing for server -// requests". One QueueSet is a set of queues operating according to -// this technique. - +// Package queueset implements a technique called "fair queuing for +// server requests". One QueueSet is a set of queues operating +// according to this technique. +// // Fair queuing for server requests is inspired by the fair queuing // technique from the world of networking. You can find a good paper // on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or // http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf // and there is an implementation outline in the Wikipedia article at // https://en.wikipedia.org/wiki/Fair_queuing . - +// // Fair queuing for server requests differs from traditional fair -// queuing in three ways: (1) we are dispatching requests to be -// executed within a process rather than transmitting packets on a -// network link, (2) multiple requests can be executing at once, and -// (3) the service time (execution duration) is not known until the -// execution completes. 
- +// queuing in three ways: (1) we are dispatching application layer +// requests to a server rather than transmitting packets on a network +// link, (2) multiple requests can be executing at once, and (3) the +// service time (execution duration) is not known until the execution +// completes. +// // The first two differences can easily be handled by straightforward // adaptation of the concept called "R(t)" in the original paper and // "virtual time" in the implementation outline. In that // implementation outline, the notation now() is used to mean reading // the virtual clock. In the original paper’s terms, "R(t)" is the -// number of "rounds" that have been completed at real time t, where a -// round consists of virtually transmitting one bit from every +// number of "rounds" that have been completed at real time t --- +// where a round consists of virtually transmitting one bit from every // non-empty queue in the router (regardless of which queue holds the // packet that is really being transmitted at the moment); in this // conception, a packet is considered to be "in" its queue until the @@ -55,12 +53,12 @@ package queueset // respect to t is // // 1 / NEQ(t) . - +// // To generalize from transmitting one packet at a time to executing C // requests at a time, that derivative becomes // // C / NEQ(t) . - +// // However, sometimes there are fewer than C requests available to // execute. For a given queue "q", let us also write "reqs(q, t)" for // the number of requests of that queue that are executing at that @@ -79,25 +77,25 @@ package queueset // real nanosecond). Where the networking implementation outline adds // packet size to a virtual time, in our version this corresponds to // adding a service time (i.e., duration) to virtual time. 
- +// // The third difference is handled by modifying the algorithm to // dispatch based on an initial guess at the request’s service time // (duration) and then make the corresponding adjustments once the // request’s actual service time is known. This is similar, although // not exactly isomorphic, to the original paper’s adjustment by -// `$delta` for the sake of promptness. - +// `$\delta$` for the sake of promptness. +// // For implementation simplicity (see below), let us use the same // initial service time guess for every request; call that duration // G. A good choice might be the service time limit (1 // minute). Different guesses will give slightly different dynamics, // but any positive number can be used for G without ruining the // long-term behavior. - +// // As in ordinary fair queuing, there is a bound on divergence from // the ideal. In plain fair queuing the bound is one packet; in our // version it is C requests. - +// // To support efficiently making the necessary adjustments once a // request’s actual service time is known, the virtual finish time of // a request and the last virtual finish time of a queue are not @@ -118,3 +116,5 @@ package queueset // queue’s virtual start time is advanced by G. When a request // finishes being served, and the actual service time was S, the // queue’s virtual start time is decremented by G - S. 
+// +package queueset diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 0d3762fd38d0..7b789e7ca148 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -28,8 +28,8 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" - "k8s.io/apiserver/pkg/util/promise/lockingpromise" "k8s.io/apiserver/pkg/util/shufflesharding" "k8s.io/klog" ) @@ -56,48 +56,62 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) // the QueueSet interface. The clock, GoRoutineCounter, and estimated // service time should not be changed; the fields listed after the // lock must be accessed only while holding the lock. +// When the configured DesiredNumQueues is zero, a queueSet limits +// concurrency without queuing (see Step 0 of Wait). type queueSet struct { clock clock.PassiveClock counter counter.GoRoutineCounter estimatedServiceTime float64 - lock sync.Mutex + lock sync.Mutex + + // config holds the current configuration. Its DesiredNumQueues + // may be less than the current number of queues. If its + // DesiredNumQueues is zero then its other queuing parameters + // retain the settings they had when DesiredNumQueues was last + // non-zero (if ever). config fq.QueueSetConfig // queues may be longer than the desired number, while the excess // queues are still draining. 
- queues []*queue - virtualTime float64 + queues []*queue + + // virtualTime is the number of virtual seconds since process startup + virtualTime float64 + + // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated lastRealTime time.Time // robinIndex is the index of the last queue dispatched robinIndex int - // numRequestsEnqueued is the number of requests currently waiting - // in a queue (eg: incremeneted on Enqueue, decremented on Dequue) - numRequestsEnqueued int + // totRequestsWaiting is the sum, over all the queues, of the + // number of requests waiting in that queue + totRequestsWaiting int + + // totRequestsExecuting is the total number of requests of this + // queueSet that are currently executing. That is the same as the + // sum, over all the queues, of the number of requests executing + // from that queue. + totRequestsExecuting int emptyHandler fq.EmptyHandler dealer *shufflesharding.Dealer } -// NewQueueSet creates a new QueueSet object +// NewQueueSet creates a new QueueSet object. // There is a new QueueSet created for each priority level. 
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { - dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) - if err != nil { - return nil, errors.Wrap(err, "shuffle sharding dealer creation failed") - } - fq := &queueSet{ - config: config, - counter: qsf.counter, - queues: createQueues(config.DesiredNumQueues, 0), clock: qsf.clock, - virtualTime: 0, + counter: qsf.counter, estimatedServiceTime: 60, + config: config, lastRealTime: qsf.clock.Now(), - dealer: dealer, + } + err := fq.SetConfiguration(config) + if err != nil { + return nil, err } return fq, nil } @@ -106,7 +120,7 @@ func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, e func createQueues(n, baseIndex int) []*queue { fqqueues := make([]*queue, n) for i := 0; i < n; i++ { - fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)} + fqqueues[i] = &queue{index: baseIndex + i, requests: make([]*request, 0)} } return fqqueues } @@ -118,19 +132,26 @@ func createQueues(n, baseIndex int) []*queue { func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { qs.lockAndSyncTime() defer qs.lock.Unlock() + var dealer *shufflesharding.Dealer - dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) - if err != nil { - return errors.Wrap(err, "shuffle sharding dealer creation failed") - } - - // Adding queues is the only thing that requires immediate action - // Removing queues is handled by omitting indexes >DesiredNum from - // chooseQueueIndexLocked - numQueues := len(qs.queues) - if config.DesiredNumQueues > numQueues { - qs.queues = append(qs.queues, - createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) 
+ if config.DesiredNumQueues > 0 { + var err error + dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) + if err != nil { + return errors.Wrap(err, "shuffle sharding dealer creation failed") + } + // Adding queues is the only thing that requires immediate action + // Removing queues is handled by omitting indexes >DesiredNum from + // chooseQueueIndexLocked + numQueues := len(qs.queues) + if config.DesiredNumQueues > numQueues { + qs.queues = append(qs.queues, + createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) + } + } else { + config.QueueLengthLimit = qs.config.QueueLengthLimit + config.HandSize = qs.config.HandSize + config.RequestWaitLimit = qs.config.RequestWaitLimit } qs.config = config @@ -162,12 +183,15 @@ func (qs *queueSet) Quiesce(eh fq.EmptyHandler) { qs.maybeForkEmptyHandlerLocked() } -// Values passed through a request's Decision +// A decision about a request +type requestDecision int + +// Values passed through a request's decision const ( - DecisionExecute = "execute" - DecisionReject = "reject" - DecisionCancel = "cancel" - DecisionTryAnother = "tryAnother" + decisionExecute requestDecision = iota + decisionReject + decisionCancel + decisionTryAnother ) // Wait uses the given hashValue as the source of entropy as it @@ -186,14 +210,26 @@ const ( // irrelevant. func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) { var req *request - decision := func() string { + decision := func() requestDecision { qs.lockAndSyncTime() defer qs.lock.Unlock() // A call to Wait while the system is quiescing will be rebuffed by // returning `tryAnother=true`. 
if qs.emptyHandler != nil { klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) - return DecisionTryAnother + return decisionTryAnother + } + + // ======================================================================== + // Step 0: + // Apply only concurrency limit, if zero queues desired + if qs.config.DesiredNumQueues < 1 { + if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit { + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit) + return decisionReject + } + req = qs.dispatchSansQueue(descr1, descr2) + return decisionExecute } // ======================================================================== @@ -209,7 +245,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i if req == nil { klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) metrics.AddReject(qs.config.Name, "queue-full") - return DecisionReject + return decisionReject } // ======================================================================== @@ -239,7 +275,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i select { case <-doneCh: klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) - req.Decision.Set(DecisionCancel) + req.decision.Set(decisionCancel) } qs.goroutineDoneOrBlocked() }() @@ -249,30 +285,30 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // Step 4: // The final step in Wait is to wait on a decision from // somewhere and then act on it. 
- decisionAny := req.Decision.GetLocked() - var decisionStr string - switch d := decisionAny.(type) { - case string: - decisionStr = d + decisionAny := req.decision.GetLocked() + var decision requestDecision + switch dec := decisionAny.(type) { + case requestDecision: + decision = dec default: klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) - decisionStr = DecisionExecute + decision = decisionExecute } - switch decisionStr { - case DecisionReject: + switch decision { + case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) metrics.AddReject(qs.config.Name, "time-out") - case DecisionCancel: + case decisionCancel: qs.syncTimeLocked() // TODO(aaron-prindle) add metrics to these two cases - if req.IsWaiting { + if req.isWaiting { klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) // remove the request from the queue as it has timed out - for i := range req.Queue.Requests { - if req == req.Queue.Requests[i] { + for i := range req.queue.requests { + if req == req.queue.requests[i] { // remove the request - req.Queue.Requests = append(req.Queue.Requests[:i], - req.Queue.Requests[i+1:]...) + req.queue.requests = append(req.queue.requests[:i], + req.queue.requests[i+1:]...) 
break } } @@ -284,17 +320,15 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) } } - return decisionStr + return decision }() switch decision { - case DecisionTryAnother: + case decisionTryAnother: return true, false, func() {} - case DecisionReject: - return false, false, func() {} - case DecisionCancel: + case decisionReject, decisionCancel: return false, false, func() {} default: - if decision != DecisionExecute { + if decision != decisionExecute { klog.Errorf("Impossible decision %q", decision) } return false, true, func() { @@ -317,9 +351,9 @@ func (qs *queueSet) lockAndSyncTime() { // lock and before modifying the state of any queue. func (qs *queueSet) syncTimeLocked() { realNow := qs.clock.Now() - timesincelast := realNow.Sub(qs.lastRealTime).Seconds() + timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds() qs.lastRealTime = realNow - qs.virtualTime += timesincelast * qs.getVirtualTimeRatio() + qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatio() } // getVirtualTimeRatio calculates the rate at which virtual time has @@ -328,8 +362,8 @@ func (qs *queueSet) getVirtualTimeRatio() float64 { activeQueues := 0 reqs := 0 for _, queue := range qs.queues { - reqs += queue.RequestsExecuting - if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 { + reqs += queue.requestsExecuting + if len(queue.requests) > 0 || queue.requestsExecuting > 0 { activeQueues++ } } @@ -361,16 +395,16 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, // Create a request and enqueue req := &request{ - Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter), - ArrivalTime: qs.clock.Now(), - Queue: queue, + decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter), + arrivalTime: qs.clock.Now(), + queue: queue, descr1: descr1, descr2: descr2, } if ok := qs.rejectOrEnqueueLocked(req); !ok { 
return nil } - metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests)) + metrics.ObserveQueueLength(qs.config.Name, len(queue.requests)) return req } @@ -381,13 +415,13 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte bestQueueLen := int(math.MaxInt32) // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. qs.dealer.Deal(hashValue, func(queueIdx int) { - thisLen := len(qs.queues[queueIdx].Requests) + thisLen := len(qs.queues[queueIdx].requests) klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) if thisLen < bestQueueLen { bestQueueIdx, bestQueueLen = queueIdx, thisLen } }) - klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting) + klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -396,7 +430,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { timeoutIdx := -1 now := qs.clock.Now() - reqs := queue.Requests + reqs := queue.requests // reqs are sorted oldest -> newest // can short circuit loop (break) if oldest requests are not timing out // as newer requests also will not have timed out @@ -404,8 +438,8 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { // now - requestWaitLimit = waitLimit waitLimit := now.Add(-qs.config.RequestWaitLimit) for i, req := range reqs { - if waitLimit.After(req.ArrivalTime) { - req.Decision.SetLocked(DecisionReject) + if waitLimit.After(req.arrivalTime) { + req.decision.SetLocked(decisionReject) // get index for timed 
out requests timeoutIdx = i } else { @@ -417,19 +451,19 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { // timeoutIdx + 1 to remove the last timeout req removeIdx := timeoutIdx + 1 // remove all the timeout requests - queue.Requests = reqs[removeIdx:] + queue.requests = reqs[removeIdx:] // decrement the # of requestsEnqueued - qs.numRequestsEnqueued -= removeIdx + qs.totRequestsWaiting -= removeIdx } } // rejectOrEnqueueLocked rejects or enqueues the newly arrived request if // resource criteria isn't met func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { - queue := request.Queue - curQueueLength := len(queue.Requests) + queue := request.queue + curQueueLength := len(queue.requests) // rejects the newly arrived request if resource criteria not met - if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit && + if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit && curQueueLength >= qs.config.QueueLengthLimit { return false } @@ -440,28 +474,17 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { // enqueues a request into an queueSet func (qs *queueSet) enqueueLocked(request *request) { - queue := request.Queue - if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 { + queue := request.queue + if len(queue.requests) == 0 && queue.requestsExecuting == 0 { // the queue’s virtual start time is set to the virtual time. 
- queue.VirtualStart = qs.virtualTime + queue.virtualStart = qs.virtualTime if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2) + klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) - qs.numRequestsEnqueued++ - metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) -} - -// getRequestsExecutingLocked gets the # of requests which are "executing": -// this is the # of requests which have been dispatched but have not -// finished (via the finishRequestLocked method invoked after service) -func (qs *queueSet) getRequestsExecutingLocked() int { - total := 0 - for _, queue := range qs.queues { - total += queue.RequestsExecuting - } - return total + qs.totRequestsWaiting++ + metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -471,50 +494,70 @@ func (qs *queueSet) getRequestsExecutingLocked() int { // queue, increment the count of the number executing, and send true // to the request's channel. 
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { - for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { - _, ok := qs.dispatchLocked() + for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit { + ok := qs.dispatchLocked() if !ok { break } } } -// dispatchLocked is a convenience method for dequeueing requests that -// require a message to be sent through the requests channel -// this is a required pattern for the QueueSet the queueSet supports -func (qs *queueSet) dispatchLocked() (*request, bool) { +func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request { + now := qs.clock.Now() + req := &request{ + startTime: now, + arrivalTime: now, + descr1: descr1, + descr2: descr2, + } + qs.totRequestsExecuting++ + if klog.V(5) { + klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) + } + metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) + return req +} + +// dispatchLocked uses the Fair Queuing for Server Requests method to +// select a queue and dispatch the oldest request in that queue. The +// return value indicates whether a request was dispatched; this will +// be false when there are no requests waiting in any queue. +func (qs *queueSet) dispatchLocked() bool { queue := qs.selectQueueLocked() if queue == nil { - return nil, false + return false } request, ok := queue.Dequeue() - if !ok { - return nil, false + if !ok { // This should never happen. But if it does... 
+ return false } - request.StartTime = qs.clock.Now() + request.startTime = qs.clock.Now() // request dequeued, service has started - queue.RequestsExecuting++ - qs.numRequestsEnqueued-- + qs.totRequestsWaiting-- + qs.totRequestsExecuting++ + queue.requestsExecuting++ if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) } - // When a request is dequeued for service -> qs.VirtualStart += G - queue.VirtualStart += qs.estimatedServiceTime - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting) - request.Decision.SetLocked(DecisionExecute) - return request, ok + // When a request is dequeued for service -> queue.virtualStart += G + queue.virtualStart += qs.estimatedServiceTime + metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) + request.decision.SetLocked(decisionExecute) + return ok } -/// selectQueueLocked selects the minimum virtualFinish time from the set of queues -// the starting queue is selected via roundrobin +// selectQueueLocked examines the queues in round robin order and +// returns the first one of those for which the virtual finish time of +// the oldest waiting request is minimal.
func (qs *queueSet) selectQueueLocked() *queue { minVirtualFinish := math.Inf(1) var minQueue *queue var minIndex int + nq := len(qs.queues) for range qs.queues { - qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues) + qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] - if len(queue.Requests) != 0 { + if len(queue.requests) != 0 { currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish @@ -546,29 +589,39 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { // previously dispatched request has completed it's service. This // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { - S := qs.clock.Since(r.StartTime).Seconds() + qs.totRequestsExecuting-- + metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) + + if r.queue == nil { + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) + } + return + } + + S := qs.clock.Since(r.startTime).Seconds() // When a request finishes being served, and the actual service time was S, // the queue’s virtual start time is decremented by G - S. 
- r.Queue.VirtualStart -= qs.estimatedServiceTime - S + r.queue.virtualStart -= qs.estimatedServiceTime - S // request has finished, remove from requests executing - r.Queue.RequestsExecuting-- + r.queue.requestsExecuting-- if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) } - // Logic to remove quiesced queues - // >= as Index=25 is out of bounds for DesiredNum=25 [0...24] - if r.Queue.Index >= qs.config.DesiredNumQueues && - len(r.Queue.Requests) == 0 && - r.Queue.RequestsExecuting == 0 { - qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index) + // If there are more queues than desired and this one has no + // requests then remove it + if len(qs.queues) > qs.config.DesiredNumQueues && + len(r.queue.requests) == 0 && + r.queue.requestsExecuting == 0 { + qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) // decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues // is the index of the next queue after the one last dispatched from - if qs.robinIndex >= r.Queue.Index { + if qs.robinIndex >= r.queue.index { qs.robinIndex-- } @@ -580,18 +633,18 @@ func (qs *queueSet) finishRequestLocked(r *request) { } // removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice -// and then updates the 'Index' field of the queues to be correct +// and 
then updates the 'index' field of the queues to be correct func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { keptQueues := append(queues[:index], queues[index+1:]...) for i := index; i < len(keptQueues); i++ { - keptQueues[i].Index-- + keptQueues[i].index-- } return keptQueues } func (qs *queueSet) maybeForkEmptyHandlerLocked() { - if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 && - qs.getRequestsExecutingLocked() == 0 { + if qs.emptyHandler != nil && qs.totRequestsWaiting == 0 && + qs.totRequestsExecuting == 0 { qs.preCreateOrUnblockGoroutine() go func(eh fq.EmptyHandler) { defer runtime.HandleCrash() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index f8d0735ed6a9..024f7805a552 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -197,10 +197,34 @@ func TestDifferentFlows(t *testing.T) { exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ {1001001001, 6, 10, time.Second, time.Second}, - {2002002002, 4, 15, time.Second, time.Second / 2}, + {2002002002, 5, 15, time.Second, time.Second / 2}, }, time.Second*20, true, true, clk, counter) } +func TestDifferentFlowsWithoutQueuing(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestDifferentFlowsWithoutQueuing", + ConcurrencyLimit: 4, + DesiredNumQueues: 0, + QueueLengthLimit: 6, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ + {1001001001, 6, 10, 
time.Second, 57 * time.Millisecond}, + {2002002002, 4, 15, time.Second, 750 * time.Millisecond}, + }, time.Second*13, false, false, clk, counter) +} + func TestTimeout(t *testing.T) { now := time.Now() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 581954985cf1..5d0ce2b39e3d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,25 +19,25 @@ package queueset import ( "time" - "k8s.io/apiserver/pkg/util/promise" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// request is a temporary container for "requests" with additional tracking fields -// required for the functionality FQScheduler +// request is a temporary container for "requests" with additional +// tracking fields required for the functionality FQScheduler type request struct { - Queue *queue + queue *queue - // StartTime is the clock time when the request began executing - StartTime time.Time + // startTime is the real time when the request began executing + startTime time.Time - // Decision gets set to the decision about what to do with this request - Decision promise.LockingMutable + // decision gets set to the decision about what to do with this request + decision promise.LockingMutable - // ArrivalTime is when the request entered this system - ArrivalTime time.Time + // arrivalTime is the real time when the request entered this system + arrivalTime time.Time - // IsWaiting indicates whether the request is presently waiting in a queue - IsWaiting bool + // isWaiting indicates whether the request is 
presently waiting in a queue + isWaiting bool // descr1 and descr2 are not used in any logic but they appear in // log messages @@ -47,31 +47,32 @@ type request struct { // queue is an array of requests with additional metadata required for // the FQScheduler type queue struct { - Requests []*request + requests []*request - // VirtualStart is the virtual time when the oldest request in the - // queue (if there is any) started virtually executing - VirtualStart float64 + // virtualStart is the virtual time (virtual seconds since process + // startup) when the oldest request in the queue (if there is any) + // started virtually executing + virtualStart float64 - RequestsExecuting int - Index int + requestsExecuting int + index int } // Enqueue enqueues a request into the queue func (q *queue) Enqueue(request *request) { - request.IsWaiting = true - q.Requests = append(q.Requests, request) + request.isWaiting = true + q.requests = append(q.requests, request) } // Dequeue dequeues a request from the queue func (q *queue) Dequeue() (*request, bool) { - if len(q.Requests) == 0 { + if len(q.requests) == 0 { return nil, false } - request := q.Requests[0] - q.Requests = q.Requests[1:] + request := q.requests[0] + q.requests = q.requests[1:] - request.IsWaiting = false + request.isWaiting = false return request, true } @@ -81,7 +82,7 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 { // The virtual finish time of request number J in the queue // (counting from J=1 for the head) is J * G + (virtual start time). - // counting from J=1 for the head (eg: queue.Requests[0] -> J=1) - J+1 + // counting from J=1 for the head (eg: queue.requests[0] -> J=1) - J+1 jg := float64(J+1) * float64(G) - return jg + q.VirtualStart + return jg + q.virtualStart }