Added fair queuing for server requests #85192

Merged
merged 6 commits into from
Nov 14, 2019
4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiserver/BUILD
@@ -42,8 +42,12 @@ filegroup(
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",
3 changes: 2 additions & 1 deletion staging/src/k8s.io/apiserver/go.mod
@@ -26,8 +26,9 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.1
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.8.1
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/spf13/pflag v1.0.5
23 changes: 23 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/BUILD
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/counter",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/counter",
visibility = ["//visibility:public"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@@ -0,0 +1,33 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package counter

// GoRoutineCounter keeps track of the number of active goroutines
// working on/for something. This is a utility that makes such code more
// testable. The code uses this utility to report the number of active
// goroutines to the test code, so that the test code can advance a fake
// clock when and only when the code being tested has finished all
// the work that is ready to do at the present time.
type GoRoutineCounter interface {
	// Add adds the given delta to the count of active goroutines.
	// Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine.
	// Call Add(-1) just before waiting on something from another goroutine (e.g.,
	// just before a `select`).
	// Call Add(1) just before doing something that unblocks a goroutine that is
	// waiting on that something.
	Add(delta int)
}
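
The calling protocol described in the `Add` comment can be sketched as follows. This is only an illustration, not code from this PR: `simpleCounter`, its `Active` method, and `demo` are hypothetical names, and the `sync.WaitGroup` exists only so the demo can read a settled count.

```go
package main

import (
	"fmt"
	"sync"
)

// simpleCounter is a hypothetical GoRoutineCounter backed by a mutex.
type simpleCounter struct {
	mu    sync.Mutex
	count int
}

// Add adjusts the number of active goroutines by delta.
func (c *simpleCounter) Add(delta int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count += delta
}

// Active reports the current count (an addition for this demo only).
func (c *simpleCounter) Active() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.count
}

// demo forks one worker, waits for its result, and returns the result
// together with the number of goroutines still counted as active.
func demo() (value, active int) {
	c := &simpleCounter{}
	c.Add(1) // count the calling goroutine as active

	var wg sync.WaitGroup // demo plumbing, not part of the protocol
	result := make(chan int)

	c.Add(1) // before forking a goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer c.Add(-1) // at the end of that goroutine (runs before wg.Done)
		c.Add(1)        // just before unblocking the waiting receiver
		result <- 42
	}()

	c.Add(-1) // just before waiting on something from another goroutine
	value = <-result
	wg.Wait()
	return value, c.Active()
}

func main() {
	value, active := demo()
	fmt.Println(value, active)
}
```

Once the worker has exited, only the calling goroutine remains counted, which is the state in which test code could safely advance a fake clock.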
@@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
visibility = ["//visibility:public"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@@ -0,0 +1,90 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fairqueuing

import (
	"context"
	"time"
)

// QueueSetFactory is used to create QueueSet objects.
type QueueSetFactory interface {
	NewQueueSet(config QueueSetConfig) (QueueSet, error)
}

// QueueSet is the abstraction for the queuing and dispatching
// functionality of one non-exempt priority level. It covers the
// functionality described in the "Assignment to a Queue", "Queuing",
// and "Dispatching" sections of
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
// . Some day we may have connections between priority levels, but
// today is not that day.
type QueueSet interface {
	// SetConfiguration updates the configuration
	SetConfiguration(QueueSetConfig) error

	// Quiesce controls whether the QueueSet is operating normally or is quiescing.
	// A quiescing QueueSet drains as normal but does not admit any
Contributor:
but does not -> and does not

Member Author:
I am surprised at this suggestion. I thought "but" was good because it highlights the contrast with the normal behavior. What is the thinking behind preferring "and"?

Contributor (@tedyu, Nov 14, 2019):
Here is how I understand the two actions: draining implies not admitting any more requests; otherwise the draining may never end. Hence the 'and'.

Member:
"but" reads much more naturally. The draining is normal, but the admission is not.

	// new requests. Passing a non-nil handler means the system should
	// be quiescing, a nil handler means the system should operate
	// normally. A call to Wait while the system is quiescing
	// will be rebuffed by returning tryAnother=true. If all the
	// queues have no requests waiting or executing while the system
	// is quiescing then the handler will eventually be called with no
	// locks held (even if the system becomes non-quiescing between the
	// triggering state and the required call).
	Quiesce(EmptyHandler)

	// Wait uses the given hashValue as the source of entropy as it
	// shuffle-shards a request into a queue and waits for a decision
	// on what to do with that request. The descr1 and descr2 values
	// play no role in the logic but appear in log messages. If
	// tryAnother==true at return then the QueueSet has become
	// undesirable and the client should try to find a different
	// QueueSet to use; execute and afterExecution are irrelevant in
	// this case. Otherwise, if execute then the client should start
	// executing the request and, once the request finishes execution
	// or is canceled, call afterExecution(). Otherwise the client
	// should not execute the request and afterExecution is
	// irrelevant.
	Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
}
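
The three outcomes of `Wait` described above could be consumed by a caller roughly as in the sketch below. The `waiter` interface, `fakeQueueSet`, `handle`, and `demo` are hypothetical names introduced for illustration; the stub returns canned decisions and does no real queuing.

```go
package main

import (
	"context"
	"fmt"
)

// waiter is the subset of QueueSet that this sketch needs.
type waiter interface {
	Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
}

// fakeQueueSet is a hypothetical stub that returns canned decisions.
type fakeQueueSet struct {
	tryAnother, execute bool
	finished            int // how many times afterExecution was called
}

func (f *fakeQueueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (bool, bool, func()) {
	return f.tryAnother, f.execute, func() { f.finished++ }
}

// handle tries each candidate in order and reports what happened.
func handle(ctx context.Context, sets []waiter, hashValue uint64, run func()) string {
	for _, qs := range sets {
		tryAnother, execute, afterExecution := qs.Wait(ctx, hashValue, "demo", nil)
		if tryAnother {
			continue // this QueueSet is quiescing; look for a different one
		}
		if !execute {
			return "rejected" // afterExecution is irrelevant here
		}
		func() {
			defer afterExecution() // also runs if run panics
			run()
		}()
		return "executed"
	}
	return "no queue set available"
}

// demo sends one request past a quiescing QueueSet to a normal one.
func demo() (outcome string, ran, finishedNormal, finishedQuiescing int) {
	quiescing := &fakeQueueSet{tryAnother: true}
	normal := &fakeQueueSet{execute: true}
	outcome = handle(context.Background(), []waiter{quiescing, normal}, 1, func() { ran++ })
	return outcome, ran, normal.finished, quiescing.finished
}

func main() {
	fmt.Println(demo())
}
```

Deferring `afterExecution` is a design choice of this sketch so that the QueueSet is notified even if the request handler panics.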

// QueueSetConfig defines the configuration of a QueueSet.
type QueueSetConfig struct {
	// Name is used to identify a queue set, allowing for descriptive information about its intended use
	Name string
	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
	ConcurrencyLimit int
	// DesiredNumQueues is the number of queues that the API says should exist now
	DesiredNumQueues int
	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
	QueueLengthLimit int
	// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
	// dealing a "hand" of this many queues and then picking one of minimum length.
	HandSize int
	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
	// If, by the end of that time, the request has not been dispatched then it is rejected.
	RequestWaitLimit time.Duration
}
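
To make the `HandSize` comment concrete, here is a minimal sketch of dealing a hand from a hash value and picking the shortest queue. It is an assumption-laden illustration and not the `shufflesharding` package used by this PR: `dealHand` and `chooseQueue` are hypothetical helpers, and the dealing scheme (repeated modulo and division of the hash value) is just one simple way to draw distinct indices.

```go
package main

import "fmt"

// dealHand draws handSize distinct queue indices out of deckSize,
// consuming entropy from hashValue by repeated modulo and division.
func dealHand(hashValue uint64, deckSize, handSize int) []int {
	remaining := make([]int, deckSize)
	for i := range remaining {
		remaining[i] = i
	}
	hand := make([]int, 0, handSize)
	for i := 0; i < handSize && len(remaining) > 0; i++ {
		n := uint64(len(remaining))
		pick := int(hashValue % n)
		hashValue /= n
		hand = append(hand, remaining[pick])
		// Remove the dealt card so it cannot be dealt twice.
		remaining = append(remaining[:pick], remaining[pick+1:]...)
	}
	return hand
}

// chooseQueue returns the index of a minimum-length queue in the hand,
// or -1 if there are no queues.
func chooseQueue(hashValue uint64, queueLengths []int, handSize int) int {
	best := -1
	for _, idx := range dealHand(hashValue, len(queueLengths), handSize) {
		if best < 0 || queueLengths[idx] < queueLengths[best] {
			best = idx
		}
	}
	return best
}

func main() {
	lengths := []int{5, 2, 9, 4}
	fmt.Println(dealHand(7, len(lengths), 2))
	fmt.Println(chooseQueue(7, lengths, 2))
}
```

Because the hand depends only on the hash value, requests from the same flow keep landing on the same small set of queues, while the minimum-length pick steers them away from a queue that another flow has filled.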

// EmptyHandler is used to notify the callee when all the queues
// of a QueueSet have been drained.
type EmptyHandler interface {
	// HandleEmpty is called to deliver the notification
	HandleEmpty()
}
@@ -0,0 +1,52 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"doc.go",
"queueset.go",
"types.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["queueset_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@@ -0,0 +1,120 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queueset

// This package implements a technique called "fair queuing for server
Member:
In order for this comment to show up in godoc, it must immediately precede the "package" line, and it must be a single comment block (no blank lines without initial //).

Member Author:
Fixing in upcoming PR.

// requests". One QueueSet is a set of queues operating according to
// this technique.

// Fair queuing for server requests is inspired by the fair queuing
// technique from the world of networking. You can find a good paper
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
// and there is an implementation outline in the Wikipedia article at
// https://en.wikipedia.org/wiki/Fair_queuing .

// Fair queuing for server requests differs from traditional fair
// queuing in three ways: (1) we are dispatching requests to be
// executed within a process rather than transmitting packets on a
// network link, (2) multiple requests can be executing at once, and
// (3) the service time (execution duration) is not known until the
// execution completes.

// The first two differences can easily be handled by straightforward
// adaptation of the concept called "R(t)" in the original paper and
// "virtual time" in the implementation outline. In that
// implementation outline, the notation now() is used to mean reading
// the virtual clock. In the original paper’s terms, "R(t)" is the
// number of "rounds" that have been completed at real time t, where a
// round consists of virtually transmitting one bit from every
// non-empty queue in the router (regardless of which queue holds the
// packet that is really being transmitted at the moment); in this
// conception, a packet is considered to be "in" its queue until the
// packet’s transmission is finished. For our problem, we can define a
// round to be giving one nanosecond of CPU to every non-empty queue
// in the apiserver (where emptiness is judged based on both queued
// and executing requests from that queue), and define R(t) = (server
// start time) + (1 ns) * (number of rounds since server start). Let
// us write NEQ(t) for that number of non-empty queues in the
// apiserver at time t. Let us also write C for the concurrency
// limit. In the original paper, the partial derivative of R(t) with
// respect to t is
//
// 1 / NEQ(t) .

// To generalize from transmitting one packet at a time to executing C
// requests at a time, that derivative becomes
//
// C / NEQ(t) .

// However, sometimes there are fewer than C requests available to
// execute. For a given queue "q", let us also write "reqs(q, t)" for
// the number of requests of that queue that are executing at that
// time. The total number of requests executing is sum[over q]
// reqs(q, t) and if that is less than C then virtual time is not
// advancing as fast as it would if all C seats were occupied; in this
// case the numerator of the quotient in that derivative should be
// adjusted proportionally. Putting it all together for fair queuing
// for server requests: at a particular time t, the partial derivative
// of R(t) with respect to t is
//
// min( C, sum[over q] reqs(q, t) ) / NEQ(t) .
//
// In terms of the implementation outline, this is the rate at which
// virtual time is advancing at time t (in virtual nanoseconds per
// real nanosecond). Where the networking implementation outline adds
// packet size to a virtual time, in our version this corresponds to
// adding a service time (i.e., duration) to virtual time.
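
As a rough illustration of the rate formula above, the sketch below computes min( C, sum[over q] reqs(q, t) ) / NEQ(t) for a snapshot of queue states. The `queueState` type and `virtualTimeRate` function are hypothetical names, and returning 0 when no queue is non-empty is an assumption of this sketch.

```go
package main

import "fmt"

// queueState is a hypothetical snapshot of one queue at time t.
type queueState struct {
	waiting   int // requests queued but not yet dispatched
	executing int // reqs(q, t): requests of this queue now executing
}

// virtualTimeRate returns the rate at which virtual time advances,
// in virtual nanoseconds per real nanosecond.
func virtualTimeRate(concurrencyLimit int, queues []queueState) float64 {
	nonEmpty := 0  // NEQ(t)
	executing := 0 // sum over q of reqs(q, t)
	for _, q := range queues {
		// Emptiness is judged on both queued and executing requests.
		if q.waiting > 0 || q.executing > 0 {
			nonEmpty++
		}
		executing += q.executing
	}
	if nonEmpty == 0 {
		return 0 // assumption: with nothing to serve, virtual time stands still
	}
	seats := executing
	if concurrencyLimit < seats {
		seats = concurrencyLimit
	}
	return float64(seats) / float64(nonEmpty)
}

func main() {
	queues := []queueState{{waiting: 0, executing: 1}, {waiting: 3, executing: 2}, {}}
	fmt.Println(virtualTimeRate(4, queues)) // 3 busy seats shared by 2 non-empty queues
	fmt.Println(virtualTimeRate(2, queues)) // capped at C = 2 seats
}
```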

// The third difference is handled by modifying the algorithm to
// dispatch based on an initial guess at the request’s service time
// (duration) and then make the corresponding adjustments once the
// request’s actual service time is known. This is similar, although
// not exactly isomorphic, to the original paper’s adjustment by
// `$delta` for the sake of promptness.

// For implementation simplicity (see below), let us use the same
// initial service time guess for every request; call that duration
// G. A good choice might be the service time limit (1
// minute). Different guesses will give slightly different dynamics,
// but any positive number can be used for G without ruining the
// long-term behavior.

// As in ordinary fair queuing, there is a bound on divergence from
// the ideal. In plain fair queuing the bound is one packet; in our
// version it is C requests.

// To support efficiently making the necessary adjustments once a
// request’s actual service time is known, the virtual finish time of
// a request and the last virtual finish time of a queue are not
// represented directly but instead computed from queue length,
// request position in the queue, and an alternate state variable that
// holds the queue’s virtual start time. While the queue is empty and
// has no requests executing: the value of its virtual start time
// variable is ignored and its last virtual finish time is considered
// to be in the virtual past. When a request arrives at an empty queue
// with no requests executing, the queue’s virtual start time is set
// to the current virtual time. The virtual finish time of request
// number J in the queue (counting from J=1 for the head) is J * G +
// (queue's virtual start time). While the queue is non-empty: the
// last virtual finish time of the queue is the virtual finish time of
// the last request in the queue. While the queue is empty and has a
// request executing: the last virtual finish time is the queue’s
// virtual start time. When a request is dequeued for service the
// queue’s virtual start time is advanced by G. When a request
// finishes being served, and the actual service time was S, the
// queue’s virtual start time is decremented by G - S.
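
The bookkeeping in the last paragraph can be sketched as below. This is an illustration under stated assumptions rather than the queueset implementation: the `queue` type and its methods are hypothetical, virtual time is represented as plain `float64` seconds, and G is fixed at 60 seconds to match the 1 minute guess mentioned above.

```go
package main

import "fmt"

// G is the initial service time guess, in seconds of virtual time.
const G = 60.0

// queue is a hypothetical model of one queue's virtual-time state.
type queue struct {
	virtualStart float64 // ignored while the queue is idle
	waiting      int     // requests waiting in the queue
	executing    int     // requests from this queue now executing
}

// enqueue adds a request; an idle queue restarts at the current virtual time.
func (q *queue) enqueue(now float64) {
	if q.waiting == 0 && q.executing == 0 {
		q.virtualStart = now
	}
	q.waiting++
}

// finishTime is the virtual finish time of request number j (head is j=1).
func (q *queue) finishTime(j int) float64 {
	return float64(j)*G + q.virtualStart
}

// lastFinishTime is meaningful only while the queue has waiting or
// executing requests; with nothing waiting it equals virtualStart.
func (q *queue) lastFinishTime() float64 {
	return q.finishTime(q.waiting)
}

// dequeue dispatches the head request and advances the start time by G.
func (q *queue) dequeue() {
	q.waiting--
	q.executing++
	q.virtualStart += G
}

// finish records completion with actual service time s, correcting the guess.
func (q *queue) finish(s float64) {
	q.executing--
	q.virtualStart -= G - s
}

func main() {
	q := &queue{}
	q.enqueue(100)
	q.enqueue(105)
	fmt.Println(q.finishTime(1), q.finishTime(2)) // 160 220
	q.dequeue()
	fmt.Println(q.finishTime(1)) // 220: the remaining request is unchanged
	q.finish(10)
	fmt.Println(q.virtualStart, q.lastFinishTime()) // 110 170
}
```

Dispatching shifts the start time by G and shortens the queue by one, so the finish times of the remaining requests do not move; only the later correction by G - S shifts them, and it does so for the whole queue in one constant-time update.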