Skip to content

Commit

Permalink
make quota reusable
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Dec 13, 2017
1 parent 38e3351 commit 7dc7693
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 59 deletions.
57 changes: 33 additions & 24 deletions pkg/controller/resourcequota/resource_quota_controller.go
Expand Up @@ -148,28 +148,31 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*Resou
rq.resyncPeriod(),
)

qm := &QuotaMonitor{
informersStarted: options.InformersStarted,
informerFactory: options.InformerFactory,
ignoredResources: options.IgnoredResourcesFunc(),
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
resyncPeriod: options.ReplenishmentResyncPeriod,
replenishmentFunc: rq.replenishQuota,
registry: rq.registry,
}
rq.quotaMonitor = qm
if options.DiscoveryFunc != nil {
qm := &QuotaMonitor{
informersStarted: options.InformersStarted,
informerFactory: options.InformerFactory,
ignoredResources: options.IgnoredResourcesFunc(),
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
resyncPeriod: options.ReplenishmentResyncPeriod,
replenishmentFunc: rq.replenishQuota,
registry: rq.registry,
}

// do initial quota monitor setup
resources, err := GetQuotableResources(options.DiscoveryFunc)
if err != nil {
return nil, err
}
if err = qm.syncMonitors(resources); err != nil {
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
}
rq.quotaMonitor = qm

// do initial quota monitor setup
resources, err := GetQuotableResources(options.DiscoveryFunc)
if err != nil {
return nil, err
}
if err = qm.SyncMonitors(resources); err != nil {
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
}

// only start quota once all informers synced
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
// only start quota once all informers synced
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
}

return rq, nil
}
Expand Down Expand Up @@ -274,7 +277,9 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting resource quota controller")
defer glog.Infof("Shutting down resource quota controller")

go rq.quotaMonitor.Run(stopCh)
if rq.quotaMonitor != nil {
go rq.quotaMonitor.Run(stopCh)
}

if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
return
Expand Down Expand Up @@ -446,7 +451,7 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return
}
if !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
}
}, period, stopCh)
Expand All @@ -455,10 +460,14 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
if err := rq.quotaMonitor.syncMonitors(resources); err != nil {
if rq.quotaMonitor == nil {
return nil
}

if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
return err
}
rq.quotaMonitor.startMonitors()
rq.quotaMonitor.StartMonitors()
return nil
}

Expand Down
26 changes: 19 additions & 7 deletions pkg/controller/resourcequota/resource_quota_monitor.go
Expand Up @@ -100,6 +100,18 @@ type QuotaMonitor struct {
registry quota.Registry
}

func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
return &QuotaMonitor{
informersStarted: informersStarted,
informerFactory: informerFactory,
ignoredResources: ignoredResources,
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
resyncPeriod: resyncPeriod,
replenishmentFunc: replenishmentFunc,
registry: registry,
}
}

// monitor runs a Controller with a local stop channel.
type monitor struct {
controller cache.Controller
Expand Down Expand Up @@ -171,13 +183,13 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac
return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
}

// syncMonitors rebuilds the monitor set according to the supplied resources,
// SyncMonitors rebuilds the monitor set according to the supplied resources,
// creating or deleting monitors as necessary. It will return any error
// encountered, but will make an attempt to create a monitor for each resource
// instead of immediately exiting on an error. It may be called before or after
// Run. Monitors are NOT started as part of the sync. To ensure all existing
// monitors are started, call startMonitors.
func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
// monitors are started, call StartMonitors.
func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
qm.monitorLock.Lock()
defer qm.monitorLock.Unlock()

Expand Down Expand Up @@ -232,12 +244,12 @@ func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]s
return utilerrors.NewAggregate(errs)
}

// startMonitors ensures the current set of monitors are running. Any newly
// StartMonitors ensures the current set of monitors are running. Any newly
// started monitors will also cause shared informers to be started.
//
// If called before Run, startMonitors does nothing (as there is no stop channel
// If called before Run, StartMonitors does nothing (as there is no stop channel
// to support monitor/informer execution).
func (qm *QuotaMonitor) startMonitors() {
func (qm *QuotaMonitor) StartMonitors() {
qm.monitorLock.Lock()
defer qm.monitorLock.Unlock()

Expand Down Expand Up @@ -295,7 +307,7 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {

// Start monitors and begin change processing until the stop channel is
// closed.
qm.startMonitors()
qm.StartMonitors()
wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)

// Stop any running monitors.
Expand Down
1 change: 1 addition & 0 deletions plugin/pkg/admission/resourcequota/BUILD
Expand Up @@ -59,6 +59,7 @@ go_test(
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/quota/generic:go_default_library",
"//pkg/quota/install:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//vendor/github.com/hashicorp/golang-lru:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion plugin/pkg/admission/resourcequota/admission.go
Expand Up @@ -27,6 +27,7 @@ import (
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
resourcequotaapiv1alpha1 "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/v1alpha1"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/validation"
Expand Down Expand Up @@ -103,7 +104,7 @@ func (a *QuotaAdmission) SetInternalKubeInformerFactory(f informers.SharedInform

func (a *QuotaAdmission) SetQuotaConfiguration(c quota.Configuration) {
a.quotaConfiguration = c
a.evaluator = NewQuotaEvaluator(a.quotaAccessor, a.quotaConfiguration, nil, a.config, a.numEvaluators, a.stopCh)
a.evaluator = NewQuotaEvaluator(a.quotaAccessor, a.quotaConfiguration.IgnoredResources(), generic.NewRegistry(a.quotaConfiguration.Evaluators()), nil, a.config, a.numEvaluators, a.stopCh)
}

// ValidateInitialization ensures an authorizer is set.
Expand Down

0 comments on commit 7dc7693

Please sign in to comment.