From f58c2ae62d1988d82386cc542899b71cc164e141 Mon Sep 17 00:00:00 2001 From: zhouhaibing089 Date: Thu, 27 Dec 2018 15:37:15 -0800 Subject: [PATCH] resourcequota: use dynamic informer The resource quota controller should use a dynamic informer so it can create informer for custom resources. --- cmd/kube-controller-manager/app/BUILD | 1 + .../app/controllermanager.go | 31 +++-- cmd/kube-controller-manager/app/core.go | 5 +- cmd/kube-controller-manager/app/core_test.go | 8 +- pkg/controller/BUILD | 3 + pkg/controller/informer_factory.go | 56 +++++++++ pkg/controller/resourcequota/BUILD | 1 - .../resource_quota_controller.go | 9 +- .../resourcequota/resource_quota_monitor.go | 4 +- test/e2e/apimachinery/resource_quota.go | 111 ++++++++++++++++++ 10 files changed, 202 insertions(+), 27 deletions(-) create mode 100644 pkg/controller/informer_factory.go diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 1cfd0bd82a8d..edf90b2b29d4 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -122,6 +122,7 @@ go_library( "//staging/src/k8s.io/client-go/discovery/cached:go_default_library", "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7d89676d4058..447b216b7b33 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -43,6 +43,8 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/term" cacheddiscovery "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -234,6 +236,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { } controllerContext.InformerFactory.Start(controllerContext.Stop) + controllerContext.GenericInformerFactory.Start(controllerContext.Stop) close(controllerContext.InformersStarted) select {} @@ -288,6 +291,10 @@ type ControllerContext struct { // InformerFactory gives access to informers for the controller. InformerFactory informers.SharedInformerFactory + // GenericInformerFactory gives access to informers for typed resources + // and dynamic resources. + GenericInformerFactory controller.InformerFactory + // ComponentConfig provides access to init options for a given controller ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration @@ -433,6 +440,9 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien versionedClient := rootClientBuilder.ClientOrDie("shared-informers") sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()) + dynamicClient := dynamic.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("dynamic-informers")) + dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, ResyncPeriod(s)()) + // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil { @@ -459,16 +469,17 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien } ctx := ControllerContext{ - ClientBuilder: clientBuilder, - InformerFactory: sharedInformers, - ComponentConfig: s.ComponentConfig, - RESTMapper: restMapper, - AvailableResources: availableResources, - Cloud: cloud, - LoopMode: loopMode, - Stop: stop, - InformersStarted: make(chan struct{}), - ResyncPeriod: ResyncPeriod(s), + ClientBuilder: clientBuilder, + InformerFactory: sharedInformers, + GenericInformerFactory: controller.NewInformerFactory(sharedInformers, dynamicInformers), + ComponentConfig: s.ComponentConfig, + RESTMapper: restMapper, + AvailableResources: availableResources, + Cloud: cloud, + LoopMode: loopMode, + Stop: stop, + InformersStarted: make(chan struct{}), + ResyncPeriod: ResyncPeriod(s), } return ctx, nil } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 7d9d3bfdfd14..bd504b7c33c7 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -23,13 +23,12 @@ package app import ( "fmt" "net" + "net/http" "strings" "time" "k8s.io/klog" - "net/http" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -295,7 +294,7 @@ func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, er QuotaClient: resourceQuotaControllerClient.CoreV1(), ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(), ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration), - InformerFactory: ctx.InformerFactory, + InformerFactory: ctx.GenericInformerFactory, ReplenishmentResyncPeriod: ctx.ResyncPeriod, DiscoveryFunc: discoveryFunc, IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, diff --git a/cmd/kube-controller-manager/app/core_test.go b/cmd/kube-controller-manager/app/core_test.go index 3b410e03ae7e..38aef728c7b6 100644 --- a/cmd/kube-controller-manager/app/core_test.go +++ b/cmd/kube-controller-manager/app/core_test.go @@ -121,10 +121,12 @@ func TestController_DiscoveryError(t *testing.T) { testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources} testClientset := NewFakeClientset(testDiscovery) testClientBuilder := TestClientBuilder{clientset: testClientset} + testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)) ctx := ControllerContext{ - ClientBuilder: testClientBuilder, - InformerFactory: informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)), - InformersStarted: make(chan struct{}), + ClientBuilder: testClientBuilder, + InformerFactory: testInformerFactory, + GenericInformerFactory: testInformerFactory, + InformersStarted: make(chan struct{}), } for funcName, controllerInit := range controllerInitFuncMap { _, _, err := controllerInit(ctx) diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index ee926ba27f1f..f16eeee6238a 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -47,6 +47,7 @@ go_library( "controller_ref_manager.go", "controller_utils.go", "doc.go", + "informer_factory.go", "lookup_cache.go", ], importpath = "k8s.io/kubernetes/pkg/controller", @@ -79,6 +80,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", + "//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", diff --git a/pkg/controller/informer_factory.go b/pkg/controller/informer_factory.go new file mode 100644 index 000000000000..f6fb65288d9e --- /dev/null +++ b/pkg/controller/informer_factory.go @@ -0,0 +1,56 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/informers" +) + +// InformerFactory creates informers for each group version resource. +type InformerFactory interface { + ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) + Start(stopCh <-chan struct{}) +} + +type informerFactory struct { + typedInformerFactory informers.SharedInformerFactory + dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory +} + +func (i *informerFactory) ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) { + informer, err := i.typedInformerFactory.ForResource(resource) + if err != nil { + return i.dynamicInformerFactory.ForResource(resource), nil + } + return informer, nil +} + +func (i *informerFactory) Start(stopCh <-chan struct{}) { + i.typedInformerFactory.Start(stopCh) + i.dynamicInformerFactory.Start(stopCh) +} + +// NewInformerFactory creates a new InformerFactory which works with both typed +// resources and dynamic resources +func NewInformerFactory(typedInformerFactory informers.SharedInformerFactory, dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory) InformerFactory { + return &informerFactory{ + typedInformerFactory: typedInformerFactory, + dynamicInformerFactory: dynamicInformerFactory, + } +} diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index cb2d1beac030..b378e4156829 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -32,7 +32,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index bac6b9ebdae2..4c32eb4896f4 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" - "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" @@ -52,12 +51,6 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error) // that may require quota to be recalculated. type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string) -// InformerFactory is all the quota system needs to interface with informers. -type InformerFactory interface { - ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) - Start(stopCh <-chan struct{}) -} - // ResourceQuotaControllerOptions holds options for creating a quota controller type ResourceQuotaControllerOptions struct { // Must have authority to list all quotas, and update quota status @@ -75,7 +68,7 @@ type ResourceQuotaControllerOptions struct { // InformersStarted knows if informers were started. InformersStarted <-chan struct{} // InformerFactory interfaces with informers. - InformerFactory InformerFactory + InformerFactory controller.InformerFactory // Controls full resync of objects monitored for replenishment. ReplenishmentResyncPeriod controller.ResyncPeriodFunc } diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index 08dcc57e6948..34fde92da87a 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -86,7 +86,7 @@ type QuotaMonitor struct { resourceChanges workqueue.RateLimitingInterface // interfaces with informers - informerFactory InformerFactory + informerFactory controller.InformerFactory // list of resources to ignore ignoredResources map[schema.GroupResource]struct{} @@ -101,7 +101,7 @@ type QuotaMonitor struct { registry quota.Registry } -func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor { +func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory controller.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor { return &QuotaMonitor{ informersStarted: informersStarted, informerFactory: informerFactory, diff --git a/test/e2e/apimachinery/resource_quota.go b/test/e2e/apimachinery/resource_quota.go index 0bb69bbd02e1..4fa0e03a0723 100644 --- a/test/e2e/apimachinery/resource_quota.go +++ b/test/e2e/apimachinery/resource_quota.go @@ -27,11 +27,13 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/quota/v1/evaluator/core" "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/utils/crd" imageutils "k8s.io/kubernetes/test/utils/image" . "github.com/onsi/ginkgo" @@ -487,6 +489,89 @@ var _ = SIGDescribe("ResourceQuota", func() { Expect(err).NotTo(HaveOccurred()) }) + It("should create a ResourceQuota and capture the life of a custom resource.", func() { + By("Creating a Custom Resource Definition") + testcrd, err := crd.CreateTestCRD(f) + Expect(err).NotTo(HaveOccurred()) + defer testcrd.CleanUp() + countResourceName := "count/" + testcrd.Crd.Spec.Names.Plural + "." + testcrd.Crd.Spec.Group + // resourcequota controller needs to take 30 seconds at most to detect the new custom resource. + // in order to make sure the resourcequota controller knows this resource, we create one test + // resourcequota object, and triggering updates on it until the status is updated. + quotaName := "quota-for-" + testcrd.Crd.Spec.Names.Plural + resourceQuota, err := createResourceQuota(f.ClientSet, f.Namespace.Name, &v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: quotaName}, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourceName(countResourceName): resource.MustParse("0"), + }, + }, + }) + err = updateResourceQuotaUntilUsageAppears(f.ClientSet, f.Namespace.Name, quotaName, v1.ResourceName(countResourceName)) + Expect(err).NotTo(HaveOccurred()) + err = f.ClientSet.CoreV1().ResourceQuotas(f.Namespace.Name).Delete(quotaName, nil) + Expect(err).NotTo(HaveOccurred()) + + By("Counting existing ResourceQuota") + c, err := countResourceQuota(f.ClientSet, f.Namespace.Name) + Expect(err).NotTo(HaveOccurred()) + + By("Creating a ResourceQuota") + quotaName = "test-quota" + resourceQuota = newTestResourceQuota(quotaName) + resourceQuota.Spec.Hard[v1.ResourceName(countResourceName)] = resource.MustParse("1") + resourceQuota, err = createResourceQuota(f.ClientSet, f.Namespace.Name, resourceQuota) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring resource quota status is calculated") + usedResources := v1.ResourceList{} + usedResources[v1.ResourceQuotas] = resource.MustParse(strconv.Itoa(c + 1)) + usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + + By("Creating a custom resource") + resourceClient := testcrd.GetV1DynamicClient() + testcr, err := instantiateCustomResource(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0], + "kind": testcrd.Crd.Spec.Names.Kind, + "metadata": map[string]interface{}{ + "name": "test-cr-1", + }, + }, + }, resourceClient, testcrd.Crd) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring resource quota status captures custom resource creation") + usedResources = v1.ResourceList{} + usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("1") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + + By("Creating a second custom resource") + _, err = instantiateCustomResource(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0], + "kind": testcrd.Crd.Spec.Names.Kind, + "metadata": map[string]interface{}{ + "name": "test-cr-2", + }, + }, + }, resourceClient, testcrd.Crd) + // since we only give one quota, this creation should fail. + Expect(err).To(HaveOccurred()) + + By("Deleting a custom resource") + err = deleteCustomResource(resourceClient, testcr.GetName()) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring resource quota status released usage") + usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + }) + It("should verify ResourceQuota with terminating scopes.", func() { By("Creating a ResourceQuota with terminating scope") quotaTerminatingName := "quota-terminating" @@ -1524,3 +1609,29 @@ func waitForResourceQuota(c clientset.Interface, ns, quotaName string, used v1.R return true, nil }) } + +// updateResourceQuotaUntilUsageAppears updates the resource quota object until the usage is populated +// for the specific resource name. +func updateResourceQuotaUntilUsageAppears(c clientset.Interface, ns, quotaName string, resourceName v1.ResourceName) error { + return wait.Poll(framework.Poll, 1*time.Minute, func() (bool, error) { + resourceQuota, err := c.CoreV1().ResourceQuotas(ns).Get(quotaName, metav1.GetOptions{}) + if err != nil { + return false, err + } + // verify that the quota shows the expected used resource values + _, ok := resourceQuota.Status.Used[resourceName] + if ok { + return true, nil + } + + current := resourceQuota.Spec.Hard[resourceName] + current.Add(resource.MustParse("1")) + resourceQuota.Spec.Hard[resourceName] = current + _, err = c.CoreV1().ResourceQuotas(ns).Update(resourceQuota) + // ignoring conflicts since someone else may already updated it. + if errors.IsConflict(err) { + return false, nil + } + return false, err + }) +}