Commit

wire context
p0lyn0mial committed Mar 25, 2020
1 parent ee76f7c commit 1bc63ac
Showing 10 changed files with 81 additions and 205 deletions.
10 changes: 7 additions & 3 deletions pkg/operator/latestrevisionclient.go
@@ -1,6 +1,8 @@
package operator

import (
"context"

operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -22,18 +24,20 @@ type OpenshiftDeploymentLatestRevisionClient struct
var _ revisioncontroller.LatestRevisionClient = OpenshiftDeploymentLatestRevisionClient{}

func (c OpenshiftDeploymentLatestRevisionClient) GetLatestRevisionState() (*operatorv1.OperatorSpec, *operatorv1.OperatorStatus, int32, string, error) {
o, err := c.TypedClient.OpenShiftAPIServers().Get("cluster", metav1.GetOptions{})
ctx := context.TODO() // needs support in library-go
o, err := c.TypedClient.OpenShiftAPIServers().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return nil, nil, 0, "", err
}
return &o.Spec.OperatorSpec, &o.Status.OperatorStatus, o.Status.LatestAvailableRevision, o.ResourceVersion, nil
}

func (c OpenshiftDeploymentLatestRevisionClient) UpdateLatestRevisionOperatorStatus(latestAvailableRevision int32, updateFuncs ...v1helpers.UpdateStatusFunc) (*operatorv1.OperatorStatus, bool, error) {
ctx := context.TODO() // needs support in library-go
updated := false
var updatedOperatorStatus *operatorv1.OperatorStatus
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
old, err := c.TypedClient.OpenShiftAPIServers().Get("cluster", metav1.GetOptions{})
old, err := c.TypedClient.OpenShiftAPIServers().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return err
}
@@ -52,7 +56,7 @@ func (c OpenshiftDeploymentLatestRevisionClient) UpdateLatestRevisionOperatorStatus(
return nil
}

modified, err = c.TypedClient.OpenShiftAPIServers().UpdateStatus(modified)
modified, err = c.TypedClient.OpenShiftAPIServers().UpdateStatus(ctx, modified, metav1.UpdateOptions{})
if err != nil {
return err
}
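The change in this file follows the client-go v0.18 pattern that this commit wires through: every typed Get/Update call now takes a context.Context and an explicit options struct. A minimal sketch of that call shape, shown against the core/v1 ConfigMap client purely for illustration (the commit itself uses the OpenShift typed operator client):

// Illustrative only: demonstrates the context-and-options call shape adopted
// in this commit, using a client everyone has (core/v1 ConfigMaps) instead of
// the OpenShift typed client above.
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// touchConfigMap gets and updates a ConfigMap, passing a context and options
// struct on each call, mirroring the Get/UpdateStatus changes above.
func touchConfigMap(ctx context.Context, client kubernetes.Interface, ns, name string) error {
	cm, err := client.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	cm.Data["touched"] = "true"
	_, err = client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
	return err
}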
94 changes: 11 additions & 83 deletions pkg/operator/nsfinalizercontroller/finalizer_controller.go
@@ -3,6 +3,7 @@ package nsfinalizercontroller
import (
"context"
"fmt"
"github.com/openshift/library-go/pkg/controller/factory"
"reflect"
"time"

@@ -11,15 +12,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
appsv1lister "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

type finalizerController struct {
@@ -29,12 +25,6 @@ type finalizerController struct {
namespaceGetter v1.NamespacesGetter
podLister corev1listers.PodLister
deployLister appsv1lister.DeploymentLister
eventRecorder events.Recorder

preRunHasSynced []cache.InformerSynced

// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}

// NewFinalizerController is here because
@@ -49,7 +39,7 @@ func NewFinalizerController(
kubeInformersForTargetNamespace kubeinformers.SharedInformerFactory,
namespaceGetter v1.NamespacesGetter,
eventRecorder events.Recorder,
) *finalizerController {
) factory.Controller {
fullname := "NamespaceFinalizerController_" + namespaceName
c := &finalizerController{
name: fullname,
@@ -58,23 +48,16 @@ func NewFinalizerController(
namespaceGetter: namespaceGetter,
podLister: kubeInformersForTargetNamespace.Core().V1().Pods().Lister(),
deployLister: kubeInformersForTargetNamespace.Apps().V1().Deployments().Lister(),
eventRecorder: eventRecorder.WithComponentSuffix("finalizer-controller"),

preRunHasSynced: []cache.InformerSynced{
kubeInformersForTargetNamespace.Core().V1().Pods().Informer().HasSynced,
kubeInformersForTargetNamespace.Apps().V1().Deployments().Informer().HasSynced,
},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fullname),
}

kubeInformersForTargetNamespace.Core().V1().Pods().Informer().AddEventHandler(c.eventHandler())
kubeInformersForTargetNamespace.Apps().V1().Deployments().Informer().AddEventHandler(c.eventHandler())

return c
return factory.New().WithInformers(
kubeInformersForTargetNamespace.Core().V1().Pods().Informer(),
kubeInformersForTargetNamespace.Apps().V1().Deployments().Informer(),
).ResyncEvery(time.Minute*5).WithSync(c.sync).ToController(fullname, eventRecorder.WithComponentSuffix("finalizer-controller"))
}

func (c finalizerController) sync() error {
ns, err := c.namespaceGetter.Namespaces().Get(c.namespaceName, metav1.GetOptions{})
func (c finalizerController) sync(ctx context.Context, syncContext factory.SyncContext) error {
ns, err := c.namespaceGetter.Namespaces().Get(ctx, c.namespaceName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil
}
@@ -89,7 +72,7 @@ func (c finalizerController) sync() error {
// TODO now that we have conditions, we may be able to check specific conditions
deletedMoreThanAMinute := ns.DeletionTimestamp.Time.Add(1 * time.Minute).Before(time.Now())
if !deletedMoreThanAMinute {
c.queue.AddAfter(c.namespaceName, 1*time.Minute)
syncContext.Queue().AddAfter(c.namespaceName, 1*time.Minute)
return nil
}

@@ -120,62 +103,7 @@ func (c finalizerController) sync() error {
}
ns.Spec.Finalizers = newFinalizers

c.eventRecorder.Event("NamespaceFinalization", fmt.Sprintf("clearing namespace finalizer on %q", c.namespaceName))
_, err = c.namespaceGetter.Namespaces().Finalize(ns)
syncContext.Recorder().Event("NamespaceFinalization", fmt.Sprintf("clearing namespace finalizer on %q", c.namespaceName))
_, err = c.namespaceGetter.Namespaces().Finalize(ctx, ns, metav1.UpdateOptions{})
return err
}

// Run starts the openshift-apiserver and blocks until stopCh is closed.
func (c *finalizerController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting %v", c.name)
defer klog.Infof("Shutting down %v", c.name)

if !cache.WaitForCacheSync(ctx.Done(), c.preRunHasSynced...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync"))
return
}

// always kick at least once in case we started after the namespace was cleared
c.queue.Add(c.namespaceName)

// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, ctx.Done())

<-ctx.Done()
}

func (c *finalizerController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *finalizerController) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

err := c.sync()
if err == nil {
c.queue.Forget(key)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
c.queue.AddRateLimited(key)

return true
}

// eventHandler queues the operator to check spec and status
func (c *finalizerController) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(c.namespaceName) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(c.namespaceName) },
DeleteFunc: func(obj interface{}) { c.queue.Add(c.namespaceName) },
}
}
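The plumbing deleted above (rate-limited work queue, cache-sync wait, Run/worker loop, and event handlers) is what the library-go factory now supplies: factory.New() builds a controller that owns the queue and run loop and invokes a sync function with the signature func(ctx context.Context, syncContext factory.SyncContext) error. A minimal sketch of that wiring, with placeholder informers and sync logic:

// Illustrative sketch of the factory-based controller shape used in this
// commit; the informer set and the sync body are placeholders.
package example

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/controller/factory"
	"github.com/openshift/library-go/pkg/operator/events"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/klog"
)

// newExampleController wires informers into a factory-built controller,
// mirroring NewFinalizerController above: the factory handles the queue,
// cache syncing, and the run loop that the hand-rolled code used to carry.
func newExampleController(informers kubeinformers.SharedInformerFactory, recorder events.Recorder) factory.Controller {
	return factory.New().
		WithInformers(
			informers.Core().V1().Pods().Informer(),
		).
		ResyncEvery(5 * time.Minute).
		WithSync(func(ctx context.Context, syncCtx factory.SyncContext) error {
			// Real controllers put their reconcile logic here; delayed
			// requeueing goes through syncCtx.Queue().AddAfter, as the
			// finalizer controller above does.
			klog.V(4).Info("sync called")
			return nil
		}).
		ToController("ExampleController", recorder)
}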
7 changes: 5 additions & 2 deletions pkg/operator/operatorclient/operatorclient.go
@@ -1,6 +1,9 @@
package operatorclient

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

operatorv1 "github.com/openshift/api/operator/v1"
@@ -35,7 +38,7 @@ func (c *OperatorClient) UpdateOperatorSpec(resourceVersion string, spec *operat
copy.ResourceVersion = resourceVersion
copy.Spec.OperatorSpec = *spec

ret, err := c.Client.OpenShiftAPIServers().Update(copy)
ret, err := c.Client.OpenShiftAPIServers().Update(context.TODO(), copy, metav1.UpdateOptions{})
if err != nil {
return nil, "", err
}
@@ -51,7 +54,7 @@ func (c *OperatorClient) UpdateOperatorStatus(resourceVersion string, status *op
copy.ResourceVersion = resourceVersion
copy.Status.OperatorStatus = *status

ret, err := c.Client.OpenShiftAPIServers().UpdateStatus(copy)
ret, err := c.Client.OpenShiftAPIServers().UpdateStatus(context.TODO(), copy, metav1.UpdateOptions{})
if err != nil {
return nil, err
}
97 changes: 14 additions & 83 deletions pkg/operator/prunecontroller/prune_controller.go
@@ -2,24 +2,18 @@ package prune

import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1informer "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"sort"
"strconv"
"strings"

encryptionsecrets "github.com/openshift/library-go/pkg/operator/encryption/secrets"
"github.com/openshift/library-go/pkg/operator/events"
@@ -36,15 +30,10 @@ type PruneController struct {
podGetter corev1client.PodsGetter
podInformer corev1informer.PodInformer
secretInformer corev1informer.SecretInformer

cachesToSync []cache.InformerSynced
queue workqueue.RateLimitingInterface
eventRecorder events.Recorder
}

const (
pruneControllerWorkQueueKey = "key"
numOldRevisionsToPreserve = 5
numOldRevisionsToPreserve = 5
)

// NewPruneController creates a new pruning controller
@@ -55,7 +44,7 @@ func NewPruneController(
podGetter corev1client.PodsGetter,
informers v1helpers.KubeInformersForNamespaces,
eventRecorder events.Recorder,
) *PruneController {
) factory.Controller {
c := &PruneController{
targetNamespace: targetNamespace,
secretPrefixes: secretPrefixes,
@@ -64,24 +53,15 @@ func NewPruneController(
podGetter: podGetter,
podInformer: informers.InformersFor(targetNamespace).Core().V1().Pods(),
secretInformer: informers.InformersFor(targetNamespace).Core().V1().Secrets(),
eventRecorder: eventRecorder.WithComponentSuffix("prune-controller"),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "PruneController"),
}

c.podInformer.Informer().AddEventHandler(c.eventHandler())
c.secretInformer.Informer().AddEventHandler(c.eventHandler())

c.cachesToSync = append(
c.cachesToSync,
c.podInformer.Informer().HasSynced,
c.secretInformer.Informer().HasSynced,
)

return c
return factory.New().WithInformers(
c.podInformer.Informer(),
c.secretInformer.Informer(),
).WithSync(c.sync).ToController("PruneController", eventRecorder.WithComponentSuffix("prune-controller"))
}

func (c *PruneController) sync() error {
func (c *PruneController) sync(ctx context.Context, syncContext factory.SyncContext) error {
klog.V(5).Info("Syncing revision pruner")

pods, err := c.podInformer.Lister().Pods(c.targetNamespace).List(labels.SelectorFromSet(map[string]string{"apiserver": "true"}))
@@ -104,7 +84,7 @@ func (c *PruneController) sync() error {

// remove finalizer
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
s, err := c.secretGetter.Secrets(s.Namespace).Get(s.Name, metav1.GetOptions{})
s, err := c.secretGetter.Secrets(s.Namespace).Get(ctx, s.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
@@ -125,11 +105,11 @@ func (c *PruneController) sync() error {
}
s.Finalizers = newFinalizers

_, err = c.secretGetter.Secrets(s.Namespace).Update(s)
_, err = c.secretGetter.Secrets(s.Namespace).Update(ctx, s, metav1.UpdateOptions{})
return err
})

if err := c.secretGetter.Secrets(s.Namespace).Delete(s.Name, nil); err != nil && !errors.IsNotFound(err) {
if err := c.secretGetter.Secrets(s.Namespace).Delete(ctx, s.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return err
}
}
@@ -214,52 +194,3 @@ func minPodRevision(pods []*corev1.Pod) int {
}
return int(minRevision)
}

func (c *PruneController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting PruneController")
defer klog.Infof("Shutting down PruneController")
if !cache.WaitForCacheSync(ctx.Done(), c.cachesToSync...) {
return
}

// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, ctx.Done())

<-ctx.Done()
}

func (c *PruneController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *PruneController) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

err := c.sync()
if err == nil {
c.queue.Forget(key)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
c.queue.AddRateLimited(key)

return true
}

// eventHandler queues the operator to check spec and status
func (c *PruneController) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(pruneControllerWorkQueueKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(pruneControllerWorkQueueKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(pruneControllerWorkQueueKey) },
}
}
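The finalizer-removal hunk in this file re-reads the secret inside retry.RetryOnConflict, so a ResourceVersion conflict is retried against fresh state before the secret is deleted. A small sketch of that pattern in isolation, with illustrative names:

// Illustrative sketch of the conflict-retry finalizer removal used by the
// prune controller; the client wiring and names are placeholders.
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/retry"
)

// removeSecretFinalizer drops one finalizer from a secret, retrying on
// resource-version conflicts.
func removeSecretFinalizer(ctx context.Context, secrets corev1client.SecretInterface, name, finalizer string) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Re-read on every attempt so a conflict retries against fresh state.
		s, err := secrets.Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		var kept []string
		for _, f := range s.Finalizers {
			if f != finalizer {
				kept = append(kept, f)
			}
		}
		s.Finalizers = kept
		_, err = secrets.Update(ctx, s, metav1.UpdateOptions{})
		return err
	})
}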
