resourcequota: use contextual logging #113315

Merged
1 commit, merged on Feb 14, 2023
5 changes: 3 additions & 2 deletions cmd/kube-controller-manager/app/core.go
@@ -411,6 +411,7 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte
}

func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "resourcequota-controller"))
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
@@ -429,14 +430,14 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
UpdateFilter: quotainstall.DefaultUpdateFilter(),
}
resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions)
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
if err != nil {
return nil, false, err
}
go resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs))

	// Periodically resync the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)

return nil, true, nil
}
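The heart of the change is the first added line above: the start function derives the logger from the incoming context, attaches the controller name, and stores the result back into the context. A minimal sketch of that pattern, assuming a hypothetical controller whose placeholder name is "my-controller":

```go
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// startMyController is a hypothetical start function; "my-controller" is a
// placeholder name, not one used by this PR.
func startMyController(ctx context.Context) {
	// Attach a component name to the logger carried in ctx and store the
	// enriched logger back, so every callee that uses klog.FromContext(ctx)
	// inherits the name without further plumbing.
	ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "my-controller"))

	klog.FromContext(ctx).Info("controller starting")
}

func main() {
	startMyController(context.Background())
}
```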
113 changes: 72 additions & 41 deletions pkg/controller/resourcequota/resource_quota_controller.go
@@ -23,8 +23,6 @@ import (
"sync"
"time"

"k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@@ -43,6 +41,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
)

@@ -51,7 +50,7 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)

// ReplenishmentFunc is a signal that a resource changed in a specified namespace
// that may require quota to be recalculated.
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)
type ReplenishmentFunc func(ctx context.Context, groupResource schema.GroupResource, namespace string)
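Threading a `context.Context` through callback types like this one is the standard migration step: implementations can then recover the contextual logger instead of falling back to the global one. A sketch of an implementation against the new signature (the wiring in `main` is illustrative only):

```go
package main

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/klog/v2"
)

// ReplenishmentFunc mirrors the updated signature above.
type ReplenishmentFunc func(ctx context.Context, groupResource schema.GroupResource, namespace string)

func main() {
	var replenish ReplenishmentFunc = func(ctx context.Context, gr schema.GroupResource, ns string) {
		// The callback recovers whatever logger the caller embedded in ctx.
		klog.FromContext(ctx).V(4).Info("replenishing quota", "resource", gr, "namespace", ns)
	}

	ctx := klog.NewContext(context.Background(), klog.Background())
	replenish(ctx, schema.GroupResource{Resource: "pods"}, "default")
}
```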

// ControllerOptions holds options for creating a quota controller
type ControllerOptions struct {
@@ -104,7 +103,7 @@ type Controller struct {
}

// NewController creates a quota controller with specified options
func NewController(options *ControllerOptions) (*Controller, error) {
func NewController(ctx context.Context, options *ControllerOptions) (*Controller, error) {
// build the resource quota controller
rq := &Controller{
rqClient: options.QuotaClient,
@@ -118,9 +117,13 @@ func NewController(options *ControllerOptions) (*Controller, error) {
// set the synchronization handler
rq.syncHandler = rq.syncResourceQuotaFromKey

logger := klog.FromContext(ctx)

options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: rq.addQuota,
AddFunc: func(obj interface{}) {
rq.addQuota(logger, obj)
},
UpdateFunc: func(old, cur interface{}) {
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
// We ignore all updates to quota.Status because they are all driven by this controller.
@@ -135,12 +138,14 @@ func NewController(options *ControllerOptions) (*Controller, error) {
if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
return
}
rq.addQuota(curResourceQuota)
rq.addQuota(logger, curResourceQuota)
},
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rq.enqueueResourceQuota,
DeleteFunc: func(obj interface{}) {
rq.enqueueResourceQuota(logger, obj)
},
},
rq.resyncPeriod(),
)
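client-go's `cache.ResourceEventHandlerFuncs` callbacks receive only the object, with no context parameter, so the logger is captured in closures at registration time. A reduced sketch of that pattern, with `informer` and `handleAdd` standing in for the controller's real pieces:

```go
package quotaexample

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

func handleAdd(logger klog.Logger, obj interface{}) {
	logger.V(4).Info("observed add", "object", obj)
}

// registerHandlers shows only the closure pattern; informer stands in for
// the controller's quota informer.
func registerHandlers(logger klog.Logger, informer cache.SharedInformer) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// The callback has no ctx, so the logger captured here is how
			// contextual values survive into the handler.
			handleAdd(logger, obj)
		},
	})
}
```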
@@ -167,20 +172,23 @@ func NewController(options *ControllerOptions) (*Controller, error) {
return nil, err
}

if err = qm.SyncMonitors(resources); err != nil {
if err = qm.SyncMonitors(ctx, resources); err != nil {
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
}

// only start quota once all informers synced
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, func() bool {
return qm.IsSynced(ctx)
})
}

return rq, nil
}
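`qm.IsSynced` now takes a context, but `informerSyncedFuncs` holds client-go's zero-argument `cache.InformerSynced` functions, hence the closure that captures `ctx`. The same adapter in isolation, with a stand-in `monitor` type:

```go
package quotaexample

import (
	"context"

	"k8s.io/client-go/tools/cache"
)

// monitor is a stand-in for the quota monitor; only IsSynced matters here.
type monitor struct{}

func (m *monitor) IsSynced(ctx context.Context) bool { return true }

// adapt wraps the context-taking method so it satisfies client-go's
// zero-argument cache.InformerSynced type, capturing ctx in the closure.
func adapt(ctx context.Context, m *monitor) cache.InformerSynced {
	return func() bool { return m.IsSynced(ctx) }
}
```

One consequence of this shape is that the context handed to `NewController` must stay valid for the controller's lifetime, since the closure holds on to it.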

// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
func (rq *Controller) enqueueAll() {
defer klog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
func (rq *Controller) enqueueAll(ctx context.Context) {
logger := klog.FromContext(ctx)
defer logger.V(4).Info("Resource quota controller queued all resource quota for full calculation of usage")
rqs, err := rq.rqLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
@@ -197,19 +205,19 @@ func (rq *Controller) enqueueAll() {
}

// obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
func (rq *Controller) enqueueResourceQuota(obj interface{}) {
func (rq *Controller) enqueueResourceQuota(logger klog.Logger, obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
logger.Error(err, "Couldn't get key", "object", obj)
return
}
rq.queue.Add(key)
}
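The `klog.Errorf` to `logger.Error` switch is the usual structured-logging migration: the error becomes a dedicated first argument and the object a key/value pair, rather than both being formatted into the message string. Side by side, assuming a `logger` obtained from the context:

```go
package main

import (
	"fmt"

	"k8s.io/klog/v2"
)

func main() {
	logger := klog.Background()
	err := fmt.Errorf("object has no meta")
	obj := struct{ Name string }{Name: "demo"}

	// Legacy pattern removed by the PR: global logger, printf formatting.
	klog.Errorf("Couldn't get key for object %+v: %v", obj, err)

	// Contextual pattern adopted by the PR: structured error plus key/values.
	logger.Error(err, "Couldn't get key", "object", obj)
}
```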

func (rq *Controller) addQuota(obj interface{}) {
func (rq *Controller) addQuota(logger klog.Logger, obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
logger.Error(err, "Couldn't get key", "object", obj)
return
}

@@ -239,29 +247,37 @@ func (rq *Controller) addQuota(obj interface{}) {
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingInterface) func(context.Context) {
func (rq *Controller) worker(queue workqueue.RateLimitingInterface) func(context.Context) {
workFunc := func(ctx context.Context) bool {
key, quit := queue.Get()
if quit {
return true
}
defer queue.Done(key)

rq.workerLock.RLock()
defer rq.workerLock.RUnlock()

logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "queueKey", key)
ctx = klog.NewContext(ctx, logger)

err := rq.syncHandler(ctx, key.(string))
if err == nil {
queue.Forget(key)
return false
}

utilruntime.HandleError(err)
queue.AddRateLimited(key)

return false
}

return func(ctx context.Context) {
for {
if quit := workFunc(ctx); quit {
klog.Infof("resource quota controller worker shutting down")
klog.FromContext(ctx).Info("resource quota controller worker shutting down")
return
}
}
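Note how the worker enriches the logger with the queue key and re-embeds it in the context before invoking the sync handler. Reduced to its essentials, with `sync` standing in for `rq.syncHandler`:

```go
package quotaexample

import (
	"context"

	"k8s.io/klog/v2"
)

// processItem sketches the per-item pattern: attach the queue key to the
// contextual logger, put it back into ctx, and hand ctx to the sync handler,
// which recovers the enriched logger via klog.FromContext.
func processItem(ctx context.Context, key string, sync func(context.Context, string) error) error {
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "queueKey", key)
	ctx = klog.NewContext(ctx, logger)

	// Every log line emitted inside sync now carries queueKey automatically.
	return sync(ctx, key)
}
```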
@@ -274,11 +290,13 @@ func (rq *Controller) Run(ctx context.Context, workers int) {
defer rq.queue.ShutDown()
defer rq.missingUsageQueue.ShutDown()

klog.Infof("Starting resource quota controller")
defer klog.Infof("Shutting down resource quota controller")
logger := klog.FromContext(ctx)

logger.Info("Starting resource quota controller")
defer logger.Info("Shutting down resource quota controller")

if rq.quotaMonitor != nil {
go rq.quotaMonitor.Run(ctx.Done())
go rq.quotaMonitor.Run(ctx)
}

if !cache.WaitForNamedCacheSync("resource quota", ctx.Done(), rq.informerSyncedFuncs...) {
@@ -287,23 +305,27 @@ func (rq *Controller) Run(ctx context.Context, workers int) {

// the workers that chug through the quota calculation backlog
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, rq.worker(ctx, rq.queue), time.Second)
go wait.UntilWithContext(ctx, rq.worker(ctx, rq.missingUsageQueue), time.Second)
go wait.UntilWithContext(ctx, rq.worker(rq.queue), time.Second)
go wait.UntilWithContext(ctx, rq.worker(rq.missingUsageQueue), time.Second)
}
// the timer for how often we do a full recalculation across all quotas
if rq.resyncPeriod() > 0 {
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), ctx.Done())
go wait.UntilWithContext(ctx, rq.enqueueAll, rq.resyncPeriod())
} else {
klog.Warningf("periodic quota controller resync disabled")
logger.Info("periodic quota controller resync disabled")
}
<-ctx.Done()
}
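`Run` also swaps `wait.Until(func() { ... }, period, ctx.Done())` for `wait.UntilWithContext`, whose loop body receives the context and can forward it. A minimal sketch of the difference:

```go
package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Before: the body took no arguments and cancellation was a raw channel:
	//   wait.Until(func() { /* body */ }, time.Second, ctx.Done())

	// After: the body receives ctx, so contextual logging flows through.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		klog.FromContext(ctx).Info("periodic tick")
	}, time.Second)
}
```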

// syncResourceQuotaFromKey syncs a quota key
func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) {
startTime := time.Now()

logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "key", key)

defer func() {
klog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime))
logger.V(4).Info("Finished syncing resource quota", "key", key, "duration", time.Since(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -312,11 +334,11 @@ func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string)
}
resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Infof("Resource quota has been deleted %v", key)
logger.Info("Resource quota has been deleted", "key", key)
return nil
}
if err != nil {
klog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
logger.Error(err, "Unable to retrieve resource quota from store", "key", key)
return err
}
return rq.syncResourceQuota(ctx, resourceQuota)
Expand Down Expand Up @@ -374,7 +396,7 @@ func (rq *Controller) syncResourceQuota(ctx context.Context, resourceQuota *v1.R
}

// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *Controller) replenishQuota(groupResource schema.GroupResource, namespace string) {
func (rq *Controller) replenishQuota(ctx context.Context, groupResource schema.GroupResource, namespace string) {
// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
evaluator := rq.registry.Get(groupResource)
if evaluator == nil {
@@ -395,22 +417,24 @@ func (rq *Controller) replenishQuota(groupResource schema.GroupResource, namespa
return
}

logger := klog.FromContext(ctx)

// only queue those quotas that are tracking a resource associated with this kind.
for i := range resourceQuotas {
resourceQuota := resourceQuotas[i]
resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
rq.enqueueResourceQuota(resourceQuota)
rq.enqueueResourceQuota(logger, resourceQuota)
}
}
}

// Sync periodically resyncs the controller when new resources are observed from discovery.
func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResourcesFunc, period time.Duration) {
// Something has changed, so track the new state and perform a sync.
oldResources := make(map[schema.GroupVersionResource]struct{})
wait.Until(func() {
wait.UntilWithContext(ctx, func(ctx context.Context) {
// Get the current resource list from discovery.
newResources, err := GetQuotableResources(discoveryFunc)
if err != nil {
@@ -427,9 +451,11 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du
}
}

logger := klog.FromContext(ctx)

// Decide whether discovery has reported a change.
if reflect.DeepEqual(oldResources, newResources) {
klog.V(4).Infof("no resource updates from discovery, skipping resource quota sync")
logger.V(4).Info("no resource updates from discovery, skipping resource quota sync")
return
}

@@ -439,28 +465,33 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du
defer rq.workerLock.Unlock()

// Something has changed, so track the new state and perform a sync.
if klogV := klog.V(2); klogV.Enabled() {
klogV.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
if loggerV := logger.V(2); loggerV.Enabled() {
loggerV.Info("syncing resource quota controller with updated resources from discovery", "diff", printDiff(oldResources, newResources))
}

// Perform the monitor resync and wait for controllers to report cache sync.
if err := rq.resyncMonitors(newResources); err != nil {
if err := rq.resyncMonitors(ctx, newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return
}
// wait for caches to fill for a while (our sync period).
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
if rq.quotaMonitor != nil && !cache.WaitForNamedCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
if rq.quotaMonitor != nil &&
!cache.WaitForNamedCacheSync(
"resource quota",
waitForStopOrTimeout(ctx.Done(), period),
func() bool { return rq.quotaMonitor.IsSynced(ctx) },
) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
return
}

// success, remember newly synced resources
oldResources = newResources
klog.V(2).Infof("synced quota controller")
}, period, stopCh)
logger.V(2).Info("synced quota controller")
}, period)
}
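One detail in `Sync` worth calling out is the verbosity guard: `printDiff` can be expensive, so the log call is gated on `logger.V(2).Enabled()` and the diff is computed only when the line will actually be emitted. The same pattern in isolation, with `expensiveDiff` standing in for `printDiff`:

```go
package main

import "k8s.io/klog/v2"

// expensiveDiff stands in for printDiff; it should run only when the log
// line will actually be emitted.
func expensiveDiff() string { return "added: [pods], removed: []" }

func main() {
	logger := klog.Background()

	// Enabled() is checked first; the expensive argument is computed only
	// when verbosity is at least 2.
	if loggerV := logger.V(2); loggerV.Enabled() {
		loggerV.Info("syncing resource quota controller", "diff", expensiveDiff())
	}
}
```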

// printDiff returns a human-readable summary of what resources were added and removed
@@ -495,15 +526,15 @@ func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan

// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (rq *Controller) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
func (rq *Controller) resyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
if rq.quotaMonitor == nil {
return nil
}

if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
if err := rq.quotaMonitor.SyncMonitors(ctx, resources); err != nil {
return err
}
rq.quotaMonitor.StartMonitors()
rq.quotaMonitor.StartMonitors(ctx)
return nil
}

10 changes: 7 additions & 3 deletions pkg/controller/resourcequota/resource_quota_controller_test.go
@@ -41,6 +41,7 @@ import (
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/quota/v1/install"
)
@@ -123,7 +124,8 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister
InformersStarted: alwaysStarted,
InformerFactory: informerFactory,
}
qc, err := NewController(resourceQuotaControllerOptions)
_, ctx := ktesting.NewTestContext(t)
qc, err := NewController(ctx, resourceQuotaControllerOptions)
if err != nil {
t.Fatal(err)
}
@@ -976,7 +978,8 @@ func TestAddQuota(t *testing.T) {
}

for _, tc := range testCases {
qc.addQuota(tc.quota)
logger, _ := ktesting.NewTestContext(t)
qc.addQuota(logger, tc.quota)
if tc.expectedPriority {
if e, a := 1, qc.missingUsageQueue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
@@ -1075,7 +1078,8 @@ func TestDiscoverySync(t *testing.T) {
// The 1s sleep in the test allows GetQuotableResources and
// resyncMonitors to run ~5 times to ensure the changes to the
// fakeDiscoveryClient are picked up.
go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync)
_, ctx := ktesting.NewTestContext(t)
go qc.Sync(ctx, fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond)

// Wait until the sync discovers the initial resources
time.Sleep(1 * time.Second)
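The tests move to `k8s.io/klog/v2/ktesting`, which returns a context whose logger writes through the `testing.T`, keeping log output from the code under test attached to the test that produced it. A minimal sketch of the pattern (a hypothetical test, not one from the PR):

```go
package quotaexample_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestContextualLogging(t *testing.T) {
	// NewTestContext returns both the logger and a ctx that carries it; pass
	// ctx into the code under test, e.g. NewController(ctx, options).
	logger, ctx := ktesting.NewTestContext(t)

	logger.Info("logged via t.Log; shown when the test fails or with -v")
	_ = ctx
}
```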