Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resourcequota: use dynamic informer #72384

Merged
merged 1 commit into from Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/BUILD
Expand Up @@ -122,6 +122,7 @@ go_library(
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
Expand Down
31 changes: 21 additions & 10 deletions cmd/kube-controller-manager/app/controllermanager.go
Expand Up @@ -43,6 +43,8 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/term"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -234,6 +236,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
}

controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)

select {}
Expand Down Expand Up @@ -288,6 +291,10 @@ type ControllerContext struct {
// InformerFactory gives access to informers for the controller.
InformerFactory informers.SharedInformerFactory

// GenericInformerFactory gives access to informers for typed resources
// and dynamic resources.
GenericInformerFactory controller.InformerFactory

// ComponentConfig provides access to init options for a given controller
ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration

Expand Down Expand Up @@ -433,6 +440,9 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

dynamicClient := dynamic.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("dynamic-informers"))
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, ResyncPeriod(s)())

// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
Expand All @@ -459,16 +469,17 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
}

ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
ComponentConfig: s.ComponentConfig,
RESTMapper: restMapper,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
GenericInformerFactory: controller.NewInformerFactory(sharedInformers, dynamicInformers),
ComponentConfig: s.ComponentConfig,
RESTMapper: restMapper,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
}
return ctx, nil
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/kube-controller-manager/app/core.go
Expand Up @@ -23,13 +23,12 @@ package app
import (
"fmt"
"net"
"net/http"
"strings"
"time"

"k8s.io/klog"

"net/http"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -295,7 +294,7 @@ func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, er
QuotaClient: resourceQuotaControllerClient.CoreV1(),
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
InformerFactory: ctx.InformerFactory,
InformerFactory: ctx.GenericInformerFactory,
ReplenishmentResyncPeriod: ctx.ResyncPeriod,
DiscoveryFunc: discoveryFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
Expand Down
8 changes: 5 additions & 3 deletions cmd/kube-controller-manager/app/core_test.go
Expand Up @@ -121,10 +121,12 @@ func TestController_DiscoveryError(t *testing.T) {
testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources}
testClientset := NewFakeClientset(testDiscovery)
testClientBuilder := TestClientBuilder{clientset: testClientset}
testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1))
ctx := ControllerContext{
ClientBuilder: testClientBuilder,
InformerFactory: informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)),
InformersStarted: make(chan struct{}),
ClientBuilder: testClientBuilder,
InformerFactory: testInformerFactory,
GenericInformerFactory: testInformerFactory,
InformersStarted: make(chan struct{}),
}
for funcName, controllerInit := range controllerInitFuncMap {
_, _, err := controllerInit(ctx)
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/BUILD
Expand Up @@ -47,6 +47,7 @@ go_library(
"controller_ref_manager.go",
"controller_utils.go",
"doc.go",
"informer_factory.go",
"lookup_cache.go",
],
importpath = "k8s.io/kubernetes/pkg/controller",
Expand Down Expand Up @@ -79,6 +80,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
Expand Down
56 changes: 56 additions & 0 deletions pkg/controller/informer_factory.go
@@ -0,0 +1,56 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
)

// InformerFactory creates informers for each group version resource.
type InformerFactory interface {
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
Start(stopCh <-chan struct{})
}

type informerFactory struct {
typedInformerFactory informers.SharedInformerFactory
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
}

func (i *informerFactory) ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) {
informer, err := i.typedInformerFactory.ForResource(resource)
if err != nil {
return i.dynamicInformerFactory.ForResource(resource), nil
}
return informer, nil
}

func (i *informerFactory) Start(stopCh <-chan struct{}) {
i.typedInformerFactory.Start(stopCh)
i.dynamicInformerFactory.Start(stopCh)
}

// NewInformerFactory creates a new InformerFactory which works with both typed
// resources and dynamic resources
func NewInformerFactory(typedInformerFactory informers.SharedInformerFactory, dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory) InformerFactory {
return &informerFactory{
typedInformerFactory: typedInformerFactory,
dynamicInformerFactory: dynamicInformerFactory,
}
}
1 change: 0 additions & 1 deletion pkg/controller/resourcequota/BUILD
Expand Up @@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
Expand Down
9 changes: 1 addition & 8 deletions pkg/controller/resourcequota/resource_quota_controller.go
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
Expand All @@ -52,12 +51,6 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
// that may require quota to be recalculated.
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)

// InformerFactory is all the quota system needs to interface with informers.
type InformerFactory interface {
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
Start(stopCh <-chan struct{})
}

// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
// Must have authority to list all quotas, and update quota status
Expand All @@ -75,7 +68,7 @@ type ResourceQuotaControllerOptions struct {
// InformersStarted knows if informers were started.
InformersStarted <-chan struct{}
// InformerFactory interfaces with informers.
InformerFactory InformerFactory
InformerFactory controller.InformerFactory
// Controls full resync of objects monitored for replenishment.
ReplenishmentResyncPeriod controller.ResyncPeriodFunc
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/resourcequota/resource_quota_monitor.go
Expand Up @@ -86,7 +86,7 @@ type QuotaMonitor struct {
resourceChanges workqueue.RateLimitingInterface

// interfaces with informers
informerFactory InformerFactory
informerFactory controller.InformerFactory

// list of resources to ignore
ignoredResources map[schema.GroupResource]struct{}
Expand All @@ -101,7 +101,7 @@ type QuotaMonitor struct {
registry quota.Registry
}

func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory controller.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
return &QuotaMonitor{
informersStarted: informersStarted,
informerFactory: informerFactory,
Expand Down
111 changes: 111 additions & 0 deletions test/e2e/apimachinery/resource_quota.go
Expand Up @@ -27,11 +27,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/crd"
imageutils "k8s.io/kubernetes/test/utils/image"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -487,6 +489,89 @@ var _ = SIGDescribe("ResourceQuota", func() {
Expect(err).NotTo(HaveOccurred())
})

It("should create a ResourceQuota and capture the life of a custom resource.", func() {
By("Creating a Custom Resource Definition")
testcrd, err := crd.CreateTestCRD(f)
Expect(err).NotTo(HaveOccurred())
defer testcrd.CleanUp()
countResourceName := "count/" + testcrd.Crd.Spec.Names.Plural + "." + testcrd.Crd.Spec.Group
// resourcequota controller needs to take 30 seconds at most to detect the new custom resource.
// in order to make sure the resourcequota controller knows this resource, we create one test
// resourcequota object, and triggering updates on it until the status is updated.
quotaName := "quota-for-" + testcrd.Crd.Spec.Names.Plural
resourceQuota, err := createResourceQuota(f.ClientSet, f.Namespace.Name, &v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: quotaName},
Spec: v1.ResourceQuotaSpec{
Hard: v1.ResourceList{
v1.ResourceName(countResourceName): resource.MustParse("0"),
},
},
})
err = updateResourceQuotaUntilUsageAppears(f.ClientSet, f.Namespace.Name, quotaName, v1.ResourceName(countResourceName))
Expect(err).NotTo(HaveOccurred())
err = f.ClientSet.CoreV1().ResourceQuotas(f.Namespace.Name).Delete(quotaName, nil)
Expect(err).NotTo(HaveOccurred())

By("Counting existing ResourceQuota")
c, err := countResourceQuota(f.ClientSet, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())

By("Creating a ResourceQuota")
quotaName = "test-quota"
resourceQuota = newTestResourceQuota(quotaName)
resourceQuota.Spec.Hard[v1.ResourceName(countResourceName)] = resource.MustParse("1")
resourceQuota, err = createResourceQuota(f.ClientSet, f.Namespace.Name, resourceQuota)
Expect(err).NotTo(HaveOccurred())

By("Ensuring resource quota status is calculated")
usedResources := v1.ResourceList{}
usedResources[v1.ResourceQuotas] = resource.MustParse(strconv.Itoa(c + 1))
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0")
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
Expect(err).NotTo(HaveOccurred())

By("Creating a custom resource")
resourceClient := testcrd.GetV1DynamicClient()
testcr, err := instantiateCustomResource(&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0],
"kind": testcrd.Crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": "test-cr-1",
},
},
}, resourceClient, testcrd.Crd)
Expect(err).NotTo(HaveOccurred())

By("Ensuring resource quota status captures custom resource creation")
usedResources = v1.ResourceList{}
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("1")
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
Expect(err).NotTo(HaveOccurred())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a test case that 'not allow creating more CRs that exceeds the remaining quota'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sure, this is also added.

By("Creating a second custom resource")
_, err = instantiateCustomResource(&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0],
"kind": testcrd.Crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": "test-cr-2",
},
},
}, resourceClient, testcrd.Crd)
// since we only give one quota, this creation should fail.
Expect(err).To(HaveOccurred())

By("Deleting a custom resource")
err = deleteCustomResource(resourceClient, testcr.GetName())
Expect(err).NotTo(HaveOccurred())

By("Ensuring resource quota status released usage")
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0")
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
Expect(err).NotTo(HaveOccurred())
})

It("should verify ResourceQuota with terminating scopes.", func() {
By("Creating a ResourceQuota with terminating scope")
quotaTerminatingName := "quota-terminating"
Expand Down Expand Up @@ -1524,3 +1609,29 @@ func waitForResourceQuota(c clientset.Interface, ns, quotaName string, used v1.R
return true, nil
})
}

// updateResourceQuotaUntilUsageAppears updates the resource quota object until the usage is populated
// for the specific resource name.
func updateResourceQuotaUntilUsageAppears(c clientset.Interface, ns, quotaName string, resourceName v1.ResourceName) error {
return wait.Poll(framework.Poll, 1*time.Minute, func() (bool, error) {
resourceQuota, err := c.CoreV1().ResourceQuotas(ns).Get(quotaName, metav1.GetOptions{})
if err != nil {
return false, err
}
// verify that the quota shows the expected used resource values
_, ok := resourceQuota.Status.Used[resourceName]
if ok {
return true, nil
}

current := resourceQuota.Spec.Hard[resourceName]
current.Add(resource.MustParse("1"))
liggitt marked this conversation as resolved.
Show resolved Hide resolved
resourceQuota.Spec.Hard[resourceName] = current
_, err = c.CoreV1().ResourceQuotas(ns).Update(resourceQuota)
// ignoring conflicts since someone else may already updated it.
if errors.IsConflict(err) {
return false, nil
}
return false, err
})
}