use a separate queue for initial quota calculation #29133

Merged: 1 commit, Jul 21, 2016
65 changes: 52 additions & 13 deletions pkg/controller/resourcequota/resource_quota_controller.go
@@ -63,6 +63,8 @@ type ResourceQuotaController struct {
rqController *framework.Controller
// ResourceQuota objects that need to be synchronized
queue workqueue.RateLimitingInterface
// missingUsageQueue holds objects that are missing the initial usage informatino
Review comment (Member): typo: information

missingUsageQueue workqueue.RateLimitingInterface
// To allow injection of syncUsage for testing.
syncHandler func(key string) error
// function that controls full recalculation of quota usage
@@ -78,6 +80,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
rq := &ResourceQuotaController{
kubeClient: options.KubeClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
missingUsageQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
resyncPeriod: options.ResyncPeriod,
registry: options.Registry,
replenishmentControllers: []framework.ControllerInterface{},
@@ -101,7 +104,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
&api.ResourceQuota{},
rq.resyncPeriod(),
framework.ResourceEventHandlerFuncs{
- AddFunc: rq.enqueueResourceQuota,
AddFunc: rq.addQuota,
UpdateFunc: func(old, cur interface{}) {
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
// We ignore all updates to quota.Status because they are all driven by this controller.
@@ -116,7 +119,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) {
return
}
- rq.enqueueResourceQuota(curResourceQuota)
rq.addQuota(curResourceQuota)
},
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
@@ -160,28 +163,63 @@ func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
rq.queue.Add(key)
}

func (rq *ResourceQuotaController) addQuota(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}

resourceQuota := obj.(*api.ResourceQuota)

// if we declared an intent that is not yet captured in status (prioritize it)
if !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
rq.missingUsageQueue.Add(key)
return
}

// if we declared a constraint that has no usage (which this controller can calculate), prioritize it
for constraint := range resourceQuota.Status.Hard {
if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
matchedResources := []api.ResourceName{constraint}

for _, evaluator := range rq.registry.Evaluators() {
if intersection := quota.Intersection(evaluator.MatchesResources(), matchedResources); len(intersection) != 0 {
rq.missingUsageQueue.Add(key)
return
}
}
}
}

// no special priority, go in normal recalc queue
rq.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
- func (rq *ResourceQuotaController) worker() {
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
workFunc := func() bool {
- key, quit := rq.queue.Get()
key, quit := queue.Get()
if quit {
return true
}
- defer rq.queue.Done(key)
defer queue.Done(key)
err := rq.syncHandler(key.(string))
if err == nil {
- rq.queue.Forget(key)
queue.Forget(key)
return false
}
utilruntime.HandleError(err)
- rq.queue.AddRateLimited(key)
queue.AddRateLimited(key)
return false
}
- for {
- if quit := workFunc(); quit {
- glog.Infof("resource quota controller worker shutting down")
- return

return func() {
for {
if quit := workFunc(); quit {
glog.Infof("resource quota controller worker shutting down")
return
}
}
}
}
@@ -196,7 +234,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
}
// the workers that chug through the quota calculation backlog
for i := 0; i < workers; i++ {
- go wait.Until(rq.worker, time.Second, stopCh)
go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
Review comment (Member): does this break some external contract? should a worker first look at the priority queue and if empty then fall back to the normal queue?

Reply (Contributor, author): I don't think so. It's not a strict "only start this number of threads", it's more a "only do this amount of threadiness". There's no clean way to do what you've suggested with a work queue.

Reply (Member): agreed

go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
}
// the timer for how often we do a full recalculation across all quotas
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
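Aside on the design question above: the workqueue's RateLimitingInterface exposes only a blocking Get(), so a single worker cannot check the priority queue and then fall back to the normal queue without busy-waiting, which is why the PR dedicates workers to each queue instead. Below is a minimal, self-contained sketch of that pattern, not the PR's code; it assumes the k8s.io/client-go/util/workqueue import path, whereas the PR itself uses the in-tree k8s.io/kubernetes/pkg/util/workqueue.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue" // assumed path; the PR uses k8s.io/kubernetes/pkg/util/workqueue
)

// worker returns a closure bound to one queue, mirroring the PR's
// worker(queue) refactor: Get blocks until an item arrives or the queue
// shuts down, Done releases the key, Forget clears its rate-limit history.
func worker(name string, queue workqueue.RateLimitingInterface) func() {
	return func() {
		for {
			key, quit := queue.Get()
			if quit {
				return
			}
			fmt.Printf("[%s] syncing %v\n", name, key)
			queue.Forget(key)
			queue.Done(key)
		}
	}
}

func main() {
	normal := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	missingUsage := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// one goroutine per queue; the real controller starts `workers` of each
	go worker("normal", normal)()
	go worker("missingUsage", missingUsage)()

	missingUsage.Add("default/rq-new")  // quota with no usage yet: prioritized
	normal.Add("default/rq-ready")      // quota already reconciled
	time.Sleep(100 * time.Millisecond)

	normal.ShutDown()
	missingUsage.ShutDown()
}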
157 changes: 157 additions & 0 deletions pkg/controller/resourcequota/resource_quota_controller_test.go
@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/quota/generic"
"k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/util/sets"
)
@@ -308,3 +309,159 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
}

func TestAddQuota(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient),
GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)

delete(quotaController.registry.(*generic.GenericRegistry).InternalEvaluators, api.Kind("Service"))

testCases := []struct {
name string

quota *api.ResourceQuota
expectedPriority bool
}{
{
name: "no status",
expectedPriority: true,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
},
},
{
name: "status, no usage",
expectedPriority: true,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
},
},
{
name: "status, mismatch",
expectedPriority: true,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("6"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("0"),
},
},
},
},
{
name: "status, missing usage, but don't care",
expectedPriority: false,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceServices: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceServices: resource.MustParse("4"),
},
},
},
},
{
name: "ready",
expectedPriority: false,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("0"),
},
},
},
},
}

for _, tc := range testCases {
quotaController.addQuota(tc.quota)
if tc.expectedPriority {
if e, a := 1, quotaController.missingUsageQueue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
if e, a := 0, quotaController.queue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
} else {
if e, a := 0, quotaController.missingUsageQueue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
if e, a := 1, quotaController.queue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
}

for quotaController.missingUsageQueue.Len() > 0 {
key, _ := quotaController.missingUsageQueue.Get()
quotaController.missingUsageQueue.Done(key)
}
for quotaController.queue.Len() > 0 {
key, _ := quotaController.queue.Get()
quotaController.queue.Done(key)
}
}
}
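
The routing rule that TestAddQuota exercises can be read as a small standalone predicate. Here is a simplified sketch, not the controller's code: needsInitialUsage is a hypothetical helper, plain string maps stand in for api.ResourceList, and a flat set stands in for the registry's evaluator Intersection check.

package main

import "fmt"

// needsInitialUsage mirrors addQuota's decision, simplified: prioritize a
// quota when its spec is not yet reflected in status.Hard, or when an
// enforced resource has no recorded usage that an evaluator could compute.
func needsInitialUsage(specHard, statusHard, statusUsed map[string]string, evaluable map[string]bool) bool {
	// spec declared an intent that is not yet captured in status
	if len(specHard) != len(statusHard) {
		return true
	}
	for name, qty := range specHard {
		if statusHard[name] != qty {
			return true
		}
	}
	// enforced constraint with missing usage that this controller can calculate
	for name := range statusHard {
		if _, found := statusUsed[name]; !found && evaluable[name] {
			return true
		}
	}
	return false
}

func main() {
	evaluable := map[string]bool{"cpu": true} // "services" deliberately absent, as in the test

	// "status, no usage" case: prioritized
	fmt.Println(needsInitialUsage(
		map[string]string{"cpu": "4"},
		map[string]string{"cpu": "4"},
		map[string]string{},
		evaluable,
	)) // true

	// "status, missing usage, but don't care" case: not prioritized
	fmt.Println(needsInitialUsage(
		map[string]string{"services": "4"},
		map[string]string{"services": "4"},
		map[string]string{},
		evaluable,
	)) // false
}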