Skip to content

Commit

Permalink
Ansible operator: Adding support for event filtering via predicates
Browse files Browse the repository at this point in the history
Event filtering will allow Ansible operator to skip reconciles that are not required

Fixes operator-framework#1968
  • Loading branch information
Rammohan authored and rammohanc committed Oct 22, 2019
1 parent 7f46876 commit 7bdc0e2
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

### Added

- Added support for event filtering for ansible operator and added enableControllerWatchPredicates option for watches.yaml. ([#1968](https://github.com/operator-framework/operator-sdk/issues/1968))
- Added new `--skip-generation` flag to the `operator-sdk add api` command to support skipping generation of deepcopy and OpenAPI code and OpenAPI CRD specs. ([#1890](https://github.com/operator-framework/operator-sdk/pull/1890))
- The `operator-sdk olm-catalog gen-csv` command now produces indented JSON for the `alm-examples` annotation. ([#1793](https://github.com/operator-framework/operator-sdk/pull/1793))
- Added flag `--dep-manager` to command [`operator-sdk print-deps`](https://github.com/operator-framework/operator-sdk/blob/master/doc/sdk-cli-reference.md#print-deps) to specify the type of dependency manager file to print. The choice of dependency manager is inferred from top-level dependency manager files present if `--dep-manager` is not set. ([#1819](https://github.com/operator-framework/operator-sdk/pull/1819))
Expand Down
1 change: 1 addition & 0 deletions doc/ansible/dev/advanced_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Some features can be overridden per resource via an annotation on that CR. The o
| Reconcile Period | `reconcilePeriod` | time between reconcile runs for a particular CR | ansbile.operator-sdk/reconcile-period | 1m | |
| Manage Status | `manageStatus` | Allows the ansible operator to manage the conditions section of each resource's status section. | | true | |
| Watching Dependent Resources | `watchDependentResources` | Allows the ansible operator to dynamically watch resources that are created by ansible | | true | [dependent_watches.md](dependent_watches.md) |
| Enable Controller Watch Predicates | `enableControllerWatchPredicates` | Allows the ansible operator to add predicates to the controller watch | | false | [event_filtering.md](../../user/event-filtering.md) |
| Watching Cluster-Scoped Resources | `watchClusterScopedResources` | Allows the ansible operator to watch cluster-scoped resources that are created by ansible | | false | |
| Max Runner Artifacts | `maxRunnerArtifacts` | Manages the number of [artifact directories](https://ansible-runner.readthedocs.io/en/latest/intro.html#runner-artifacts-directory-hierarchy) that ansible runner will keep in the operator container for each individual resource. | ansible.operator-sdk/max-runner-artifacts | 20 | |
| Finalizer | `finalizer` | Sets a finalizer on the CR and maps a deletion event to a playbook or role | | | [finalizers.md](finalizers.md)|
Expand Down
11 changes: 6 additions & 5 deletions pkg/ansible/proxy/controllermap/controllermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ type WatchMap struct {

// Contents - Contains internal data associated with each controller
type Contents struct {
Controller controller.Controller
WatchDependentResources bool
WatchClusterScopedResources bool
OwnerWatchMap *WatchMap
AnnotationWatchMap *WatchMap
Controller controller.Controller
WatchDependentResources bool
WatchClusterScopedResources bool
EnableControllerWatchPredicates bool
OwnerWatchMap *WatchMap
AnnotationWatchMap *WatchMap
}

// NewControllerMap returns a new object that contains a mapping between GVK
Expand Down
19 changes: 17 additions & 2 deletions pkg/ansible/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/operator-framework/operator-sdk/pkg/internal/predicates"
)

// RequestLogHandler - log the requests that come through the proxy.
Expand Down Expand Up @@ -210,6 +211,10 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
awMap := contents.AnnotationWatchMap
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(ownerMapping.GroupVersionKind)

// Adding dependentPredicate for avoiding reconciles when dependent objects are not changed by user
dependentPredicate := predicates.PredicateFuncs()

// Add a watch to controller
if contents.WatchDependentResources {
// Store watch in map
Expand All @@ -224,8 +229,13 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr

owMap.Store(resource.GroupVersionKind())
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())

if contents.EnableControllerWatchPredicates {
err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u}, dependentPredicate)
} else {
err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
}
// Store watch in map
err := contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
if err != nil {
return err
}
Expand All @@ -238,7 +248,12 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
awMap.Store(resource.GroupVersionKind())
typeString := fmt.Sprintf("%v.%v", owner.Kind, ownerGV.Group)
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_annotation_type", typeString)
err = contents.Controller.Watch(&source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString})

if contents.EnableControllerWatchPredicates {
err = contents.Controller.Watch(&source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString}, dependentPredicate)
} else {
err = contents.Controller.Watch(&source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString})
}
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/ansible/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ func Run(flags *aoflags.AnsibleOperatorFlags) error {
}

cMap.Store(w.GroupVersionKind, &controllermap.Contents{Controller: *ctr,
WatchDependentResources: w.WatchDependentResources,
WatchClusterScopedResources: w.WatchClusterScopedResources,
OwnerWatchMap: controllermap.NewWatchMap(),
AnnotationWatchMap: controllermap.NewWatchMap(),
WatchDependentResources: w.WatchDependentResources,
WatchClusterScopedResources: w.WatchClusterScopedResources,
EnableControllerWatchPredicates: w.EnableControllerWatchPredicates,
OwnerWatchMap: controllermap.NewWatchMap(),
AnnotationWatchMap: controllermap.NewWatchMap(),
})
gvks = append(gvks, w.GroupVersionKind)
}
Expand Down
55 changes: 30 additions & 25 deletions pkg/ansible/watches/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ var log = logf.Log.WithName("watches")
// Watch - holds data used to create a mapping of GVK to ansible playbook or role.
// The mapping is used to compose an ansible operator.
type Watch struct {
GroupVersionKind schema.GroupVersionKind `yaml:",inline"`
Playbook string `yaml:"playbook"`
Role string `yaml:"role"`
MaxRunnerArtifacts int `yaml:"maxRunnerArtifacts"`
ReconcilePeriod time.Duration `yaml:"reconcilePeriod"`
ManageStatus bool `yaml:"manageStatus"`
WatchDependentResources bool `yaml:"watchDependentResources"`
WatchClusterScopedResources bool `yaml:"watchClusterScopedResources"`
Finalizer *Finalizer `yaml:"finalizer"`
GroupVersionKind schema.GroupVersionKind `yaml:",inline"`
Playbook string `yaml:"playbook"`
Role string `yaml:"role"`
MaxRunnerArtifacts int `yaml:"maxRunnerArtifacts"`
ReconcilePeriod time.Duration `yaml:"reconcilePeriod"`
ManageStatus bool `yaml:"manageStatus"`
WatchDependentResources bool `yaml:"watchDependentResources"`
WatchClusterScopedResources bool `yaml:"watchClusterScopedResources"`
EnableControllerWatchPredicates bool `yaml:"enableControllerWatchPredicates"`
Finalizer *Finalizer `yaml:"finalizer"`

// Not configurable via watches.yaml
MaxWorkers int `yaml:"maxWorkers"`
Expand All @@ -61,11 +62,12 @@ type Finalizer struct {

// Default values for optional fields on Watch
var (
maxRunnerArtifactsDefault = 20
reconcilePeriodDefault = "0s"
manageStatusDefault = true
watchDependentResourcesDefault = true
watchClusterScopedResourcesDefault = false
maxRunnerArtifactsDefault = 20
reconcilePeriodDefault = "0s"
manageStatusDefault = true
watchDependentResourcesDefault = true
watchClusterScopedResourcesDefault = false
enableControllerWatchPredicatesDefault = false

// these are overridden by cmdline flags
maxWorkersDefault = 1
Expand All @@ -78,17 +80,18 @@ var (
func (w *Watch) UnmarshalYAML(unmarshal func(interface{}) error) error {
// Use an alias struct to handle complex types
type alias struct {
Group string `yaml:"group"`
Version string `yaml:"version"`
Kind string `yaml:"kind"`
Playbook string `yaml:"playbook"`
Role string `yaml:"role"`
MaxRunnerArtifacts int `yaml:"maxRunnerArtifacts"`
ReconcilePeriod string `yaml:"reconcilePeriod"`
ManageStatus bool `yaml:"manageStatus"`
WatchDependentResources bool `yaml:"watchDependentResources"`
WatchClusterScopedResources bool `yaml:"watchClusterScopedResources"`
Finalizer *Finalizer `yaml:"finalizer"`
Group string `yaml:"group"`
Version string `yaml:"version"`
Kind string `yaml:"kind"`
Playbook string `yaml:"playbook"`
Role string `yaml:"role"`
MaxRunnerArtifacts int `yaml:"maxRunnerArtifacts"`
ReconcilePeriod string `yaml:"reconcilePeriod"`
ManageStatus bool `yaml:"manageStatus"`
WatchDependentResources bool `yaml:"watchDependentResources"`
WatchClusterScopedResources bool `yaml:"watchClusterScopedResources"`
EnableControllerWatchPredicates bool `yaml:"enableControllerWatchPredicates"`
Finalizer *Finalizer `yaml:"finalizer"`
}
var tmp alias

Expand All @@ -99,6 +102,7 @@ func (w *Watch) UnmarshalYAML(unmarshal func(interface{}) error) error {
tmp.MaxRunnerArtifacts = maxRunnerArtifactsDefault
tmp.ReconcilePeriod = reconcilePeriodDefault
tmp.WatchClusterScopedResources = watchClusterScopedResourcesDefault
tmp.EnableControllerWatchPredicates = enableControllerWatchPredicatesDefault

if err := unmarshal(&tmp); err != nil {
return err
Expand Down Expand Up @@ -129,6 +133,7 @@ func (w *Watch) UnmarshalYAML(unmarshal func(interface{}) error) error {
w.ManageStatus = tmp.ManageStatus
w.WatchDependentResources = tmp.WatchDependentResources
w.WatchClusterScopedResources = tmp.WatchClusterScopedResources
w.EnableControllerWatchPredicates = tmp.EnableControllerWatchPredicates
w.Finalizer = tmp.Finalizer

return nil
Expand Down
44 changes: 3 additions & 41 deletions pkg/helm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"fmt"
"io"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -30,15 +29,14 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
rpb "k8s.io/helm/pkg/proto/hapi/release"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
crtpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/operator-framework/operator-sdk/pkg/helm/release"
"github.com/operator-framework/operator-sdk/pkg/predicate"
"github.com/operator-framework/operator-sdk/pkg/internal/predicates"
)

var log = logf.Log.WithName("helm.controller")
Expand Down Expand Up @@ -92,44 +90,8 @@ func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c c
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)

dependentPredicate := crtpredicate.Funcs{
// We don't need to reconcile dependent resource creation events
// because dependent resources are only ever created during
// reconciliation. Another reconcile would be redundant.
CreateFunc: func(e event.CreateEvent) bool {
o := e.Object.(*unstructured.Unstructured)
log.V(1).Info("Skipping reconciliation for dependent resource creation", "name", o.GetName(), "namespace", o.GetNamespace(), "apiVersion", o.GroupVersionKind().GroupVersion(), "kind", o.GroupVersionKind().Kind)
return false
},

// Reconcile when a dependent resource is deleted so that it can be
// recreated.
DeleteFunc: func(e event.DeleteEvent) bool {
o := e.Object.(*unstructured.Unstructured)
log.V(1).Info("Reconciling due to dependent resource deletion", "name", o.GetName(), "namespace", o.GetNamespace(), "apiVersion", o.GroupVersionKind().GroupVersion(), "kind", o.GroupVersionKind().Kind)
return true
},

// Reconcile when a dependent resource is updated, so that it can
// be patched back to the resource managed by the Helm release, if
// necessary. Ignore updates that only change the status and
// resourceVersion.
UpdateFunc: func(e event.UpdateEvent) bool {
old := e.ObjectOld.(*unstructured.Unstructured).DeepCopy()
new := e.ObjectNew.(*unstructured.Unstructured).DeepCopy()

delete(old.Object, "status")
delete(new.Object, "status")
old.SetResourceVersion("")
new.SetResourceVersion("")

if reflect.DeepEqual(old.Object, new.Object) {
return false
}
log.V(1).Info("Reconciling due to dependent resource update", "name", new.GetName(), "namespace", new.GetNamespace(), "apiVersion", new.GroupVersionKind().GroupVersion(), "kind", new.GroupVersionKind().Kind)
return true
},
}
// using predefined functions for filtering events
dependentPredicate := predicates.PredicateFuncs()

var m sync.RWMutex
watches := map[schema.GroupVersionKind]struct{}{}
Expand Down
81 changes: 81 additions & 0 deletions pkg/internal/predicates/predicates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package predicates

import (
"reflect"

crtpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/event"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
logf "sigs.k8s.io/controller-runtime/pkg/log"


)

var log = logf.Log.WithName("predicates")

// PredicateFuncs returns functions defined for filtering events
func PredicateFuncs() crtpredicate.Funcs {

dependentPredicate := crtpredicate.Funcs{
// We don't need to reconcile dependent resource creation events
// because dependent resources are only ever created during
// reconciliation. Another reconcile would be redundant.
CreateFunc: func(e event.CreateEvent) bool {
o := e.Object.(*unstructured.Unstructured)
log.V(1).Info("Skipping reconciliation for dependent resource creation", "name", o.GetName(), "namespace", o.GetNamespace(), "apiVersion", o.GroupVersionKind().GroupVersion(), "kind", o.GroupVersionKind().Kind)
return false
},

// Reconcile when a dependent resource is deleted so that it can be
// recreated.
DeleteFunc: func(e event.DeleteEvent) bool {
o := e.Object.(*unstructured.Unstructured)
log.V(1).Info("Reconciling due to dependent resource deletion", "name", o.GetName(), "namespace", o.GetNamespace(), "apiVersion", o.GroupVersionKind().GroupVersion(), "kind", o.GroupVersionKind().Kind)
return true
},

// Don't reconcile when a generic event is received for a dependent
GenericFunc: func(e event.GenericEvent) bool {
o := e.Object.(*unstructured.Unstructured)
log.V(1).Info("Skipping reconcile due to generic event", "name", o.GetName(), "namespace", o.GetNamespace(), "apiVersion", o.GroupVersionKind().GroupVersion(), "kind", o.GroupVersionKind().Kind)
return false
},

// Reconcile when a dependent resource is updated, so that it can
// be patched back to the resource managed by the CR, if
// necessary. Ignore updates that only change the status and
// resourceVersion.
UpdateFunc: func(e event.UpdateEvent) bool {
old := e.ObjectOld.(*unstructured.Unstructured).DeepCopy()
new := e.ObjectNew.(*unstructured.Unstructured).DeepCopy()

delete(old.Object, "status")
delete(new.Object, "status")
old.SetResourceVersion("")
new.SetResourceVersion("")

if reflect.DeepEqual(old.Object, new.Object) {
return false
}
log.V(1).Info("Reconciling due to dependent resource update", "name", new.GetName(), "namespace", new.GetNamespace(), "apiVersion", new.GroupVersionKind().GroupVersion(), "kind", new.GroupVersionKind().Kind)
return true
},
}

return dependentPredicate
}

0 comments on commit 7bdc0e2

Please sign in to comment.