Skip to content

Commit

Permalink
fairqueuing implementation with unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-prindle committed Nov 12, 2019
1 parent 0968636 commit 24065cf
Show file tree
Hide file tree
Showing 19 changed files with 1,879 additions and 2 deletions.
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions staging/src/k8s.io/apiserver/BUILD
Expand Up @@ -42,6 +42,9 @@ filegroup(
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
Expand Down
3 changes: 2 additions & 1 deletion staging/src/k8s.io/apiserver/go.mod
Expand Up @@ -26,8 +26,9 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.1
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.8.1
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/spf13/pflag v1.0.5
Expand Down
23 changes: 23 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/BUILD
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/counter",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/counter",
visibility = ["//visibility:public"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@@ -0,0 +1,33 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package counter

// GoRoutineCounter keeps track of the number of active goroutines
// working on/for something. This is a utility that makes such code more
// testable. The code uses this utility to report the number of active
// goroutines to the test code, so that the test code can advance a fake
// clock when and only when the code being tested has finished all
// the work that is ready to do at the present time.
type GoRoutineCounter interface {
// Add adds the given delta to the count of active goroutines.
// Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine.
// Call Add(-1) just before waiting on something from another goroutine (e.g.,
// just before a `select`).
// Call Add(1) just before doing something that unblocks a goroutine that is
// waiting on that something.
Add(delta int)
}
@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = [
"interface.go",
"types.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
visibility = ["//visibility:public"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@@ -0,0 +1,88 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fairqueuing

import (
"context"
"time"
)

// QueueSetFactory is used to create QueueSet objects.
type QueueSetFactory interface {
NewQueueSet(config QueueSetConfig) (QueueSet, error)
}

// QueueSet is the abstraction for the queuing and dispatching
// functionality of one non-exempt priority level. It covers the
// functionality described in the "Assignment to a Queue", "Queuing",
// and "Dispatching" sections of
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
// . Some day we may have connections between priority levels, but
// today is not that day.
type QueueSet interface {
// SetConfiguration updates the configuration
SetConfiguration(QueueSetConfig) error

// Quiesce controls whether the QueueSet is operating normally or is quiescing.
// A quiescing QueueSet drains as normal but does not admit any
// new requests. Passing a non-nil handler means the system should
// be quiescing, a nil handler means the system should operate
// normally. A call to Wait while the system is quiescing
// will be rebuffed by returning tryAnother=true. If all the
// queues have no requests waiting nor executing while the system
// is quiescing then the handler will eventually be called with no
// locks held (even if the system becomes non-quiescing between the
// triggering state and the required call).
Quiesce(EmptyHandler)

// Wait uses the given hashValue as the source of entropy
// as it shuffle-shards a request into a queue and waits for
// a decision on what to do with that request. If tryAnother==true
// at return then the QueueSet has become undesirable and the client
// should try to find a different QueueSet to use; execute and
// afterExecution are irrelevant in this case. Otherwise, if execute
// then the client should start executing the request and, once the
// request finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the
// request and afterExecution is irrelevant.
Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func())
}

// QueueSetConfig defines the configuration of a QueueSet.
type QueueSetConfig struct {
// Name is used to identify a queue set, allowing for descriptive information about its intended use
Name string
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// DesiredNumQueues is the number of queues that the API says should exist now
DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
QueueLengthLimit int
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
// dealing a "hand" of this many queues and then picking one of minimum length.
HandSize int
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
// If, by the end of that time, the request has not been dispatched then it is rejected.
RequestWaitLimit time.Duration
}

// EmptyHandler is used to notify the callee when all the queues
// of a QueueSet have been drained.
type EmptyHandler interface {
// HandleEmpty is called to deliver the notification
HandleEmpty()
}
@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["queueset.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["queueset_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

0 comments on commit 24065cf

Please sign in to comment.