Skip to content

Commit

Permalink
Merge pull request #77 from ibuildthecloud/master
Browse files Browse the repository at this point in the history
Various enhancements
  • Loading branch information
ibuildthecloud committed Mar 28, 2020
2 parents 4054411 + 5964401 commit cbd9fef
Show file tree
Hide file tree
Showing 21 changed files with 906 additions and 207 deletions.
60 changes: 60 additions & 0 deletions pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,45 @@ type Reconciler func(oldObj runtime.Object, newObj runtime.Object) (bool, error)

type ClientFactory func(gvr schema.GroupVersionResource) (dynamic.NamespaceableResourceInterface, error)

type InformerFactory interface {
Get(gvk schema.GroupVersionKind, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, error)
}

type InformerGetter interface {
Informer() cache.SharedIndexInformer
GroupVersionKind() schema.GroupVersionKind
}

type PatchByGVK map[schema.GroupVersionKind]map[objectset.ObjectKey]string

func (p PatchByGVK) Add(gvk schema.GroupVersionKind, namespace, name, patch string) {
d, ok := p[gvk]
if !ok {
d = map[objectset.ObjectKey]string{}
p[gvk] = d
}
d[objectset.ObjectKey{
Name: name,
Namespace: namespace,
}] = patch
}

type Plan struct {
Create objectset.ObjectKeyByGVK
Delete objectset.ObjectKeyByGVK
Update PatchByGVK
Objects []runtime.Object
}

type Apply interface {
Apply(set *objectset.ObjectSet) error
ApplyObjects(objs ...runtime.Object) error
WithContext(ctx context.Context) Apply
WithCacheTypes(igs ...InformerGetter) Apply
WithCacheTypeFactory(factory InformerFactory) Apply
WithSetID(id string) Apply
WithOwner(obj runtime.Object) Apply
WithOwnerKey(key string, gvk schema.GroupVersionKind) Apply
WithInjector(injs ...injectors.ConfigInjector) Apply
WithInjectorName(injs ...string) Apply
WithPatcher(gvk schema.GroupVersionKind, patchers Patcher) Apply
Expand All @@ -53,6 +80,10 @@ type Apply interface {
WithNoDelete() Apply
WithGVK(gvks ...schema.GroupVersionKind) Apply
WithSetOwnerReference(controller, block bool) Apply

FindOwner(obj runtime.Object) (runtime.Object, error)
PurgeOrphan(obj runtime.Object) error
DryRun(objs ...runtime.Object) (Plan, error)
}

func NewForConfig(cfg *rest.Config) (Apply, error) {
Expand All @@ -70,6 +101,7 @@ func New(discovery discovery.DiscoveryInterface, cf ClientFactory, igs ...Inform
clientFactory: cf,
discovery: discovery,
namespaced: map[schema.GroupVersionKind]bool{},
gvkToGVR: map[schema.GroupVersionKind]schema.GroupVersionResource{},
clients: map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface{},
},
informers: map[schema.GroupVersionKind]cache.SharedIndexInformer{},
Expand All @@ -93,6 +125,7 @@ type clients struct {
clientFactory ClientFactory
discovery discovery.DiscoveryInterface
namespaced map[schema.GroupVersionKind]bool
gvkToGVR map[schema.GroupVersionKind]schema.GroupVersionResource
clients map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface
}

Expand All @@ -102,6 +135,12 @@ func (c *clients) IsNamespaced(gvk schema.GroupVersionKind) bool {
return c.namespaced[gvk]
}

func (c *clients) gvr(gvk schema.GroupVersionKind) schema.GroupVersionResource {
c.Lock()
defer c.Unlock()
return c.gvkToGVR[gvk]
}

func (c *clients) client(gvk schema.GroupVersionKind) (dynamic.NamespaceableResourceInterface, error) {
c.Lock()
defer c.Unlock()
Expand All @@ -127,6 +166,7 @@ func (c *clients) client(gvk schema.GroupVersionKind) (dynamic.NamespaceableReso

c.namespaced[gvk] = resource.Namespaced
c.clients[gvk] = client
c.gvkToGVR[gvk] = gvk.GroupVersion().WithResource(resource.Name)
return client, nil
}

Expand All @@ -144,6 +184,10 @@ func (a *apply) newDesiredSet() desiredSet {
}
}

func (a *apply) DryRun(objs ...runtime.Object) (Plan, error) {
return a.newDesiredSet().DryRun(objs...)
}

func (a *apply) Apply(set *objectset.ObjectSet) error {
return a.newDesiredSet().Apply(set)
}
Expand All @@ -162,6 +206,10 @@ func (a *apply) WithOwner(obj runtime.Object) Apply {
return a.newDesiredSet().WithOwner(obj)
}

func (a *apply) WithOwnerKey(key string, gvk schema.GroupVersionKind) Apply {
return a.newDesiredSet().WithOwnerKey(key, gvk)
}

func (a *apply) WithInjector(injs ...injectors.ConfigInjector) Apply {
return a.newDesiredSet().WithInjector(injs...)
}
Expand All @@ -174,6 +222,10 @@ func (a *apply) WithCacheTypes(igs ...InformerGetter) Apply {
return a.newDesiredSet().WithCacheTypes(igs...)
}

func (a *apply) WithCacheTypeFactory(factory InformerFactory) Apply {
return a.newDesiredSet().WithCacheTypeFactory(factory)
}

func (a *apply) WithGVK(gvks ...schema.GroupVersionKind) Apply {
return a.newDesiredSet().WithGVK(gvks...)
}
Expand Down Expand Up @@ -221,3 +273,11 @@ func (a *apply) WithSetOwnerReference(controller, block bool) Apply {
func (a *apply) WithContext(ctx context.Context) Apply {
return a.newDesiredSet().WithContext(ctx)
}

func (a *apply) FindOwner(obj runtime.Object) (runtime.Object, error) {
return a.newDesiredSet().FindOwner(obj)
}

func (a *apply) PurgeOrphan(obj runtime.Object) error {
return a.newDesiredSet().PurgeOrphan(obj)
}
33 changes: 31 additions & 2 deletions pkg/apply/desiredset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package apply
import (
"context"
"github.com/rancher/wrangler/pkg/apply/injectors"
"github.com/rancher/wrangler/pkg/kv"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)

type desiredSet struct {
a *apply
ctx context.Context
ctx context.Context
defaultNamespace string
listerNamespace string
setOwnerReference bool
Expand All @@ -23,6 +25,7 @@ type desiredSet struct {
pruneTypes map[schema.GroupVersionKind]cache.SharedIndexInformer
patchers map[schema.GroupVersionKind]Patcher
reconcilers map[schema.GroupVersionKind]Reconciler
informerFactory InformerFactory
remove bool
noDelete bool
setID string
Expand All @@ -33,6 +36,9 @@ type desiredSet struct {
ratelimitingQps float32
injectorNames []string
errs []error

createPlan bool
plan Plan
}

func (o *desiredSet) err(err error) error {
Expand All @@ -44,6 +50,12 @@ func (o desiredSet) Err() error {
return merr.NewErrors(append(o.errs, o.objs.Err())...)
}

func (o desiredSet) DryRun(objs ...runtime.Object) (Plan, error) {
o.objs = objectset.NewObjectSet()
o.objs.Add(objs...)
return o.dryRun()
}

func (o desiredSet) Apply(set *objectset.ObjectSet) error {
if set == nil {
set = objectset.NewObjectSet()
Expand Down Expand Up @@ -76,6 +88,14 @@ func (o desiredSet) WithSetID(id string) Apply {
return o
}

func (o desiredSet) WithOwnerKey(key string, gvk schema.GroupVersionKind) Apply {
obj := &v1.PartialObjectMetadata{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(gvk)
o.owner = obj
return o
}

func (o desiredSet) WithOwner(obj runtime.Object) Apply {
o.owner = obj
return o
Expand All @@ -98,6 +118,11 @@ func (o desiredSet) WithInjectorName(injs ...string) Apply {
return o
}

func (o desiredSet) WithCacheTypeFactory(factory InformerFactory) Apply {
o.informerFactory = factory
return o
}

func (o desiredSet) WithCacheTypes(igs ...InformerGetter) Apply {
pruneTypes := make(map[schema.GroupVersionKind]cache.SharedIndexInformer, len(igs))
for k, v := range o.pruneTypes {
Expand Down Expand Up @@ -147,7 +172,11 @@ func (o desiredSet) WithRestrictClusterScoped() Apply {
}

func (o desiredSet) WithDefaultNamespace(ns string) Apply {
o.defaultNamespace = ns
if ns == "" {
o.defaultNamespace = defaultNamespace
} else {
o.defaultNamespace = ns
}
return o
}

Expand Down
34 changes: 26 additions & 8 deletions pkg/apply/desiredset_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
LabelName = "objectset.rio.cattle.io/owner-name"
LabelNamespace = "objectset.rio.cattle.io/owner-namespace"
LabelHash = "objectset.rio.cattle.io/hash"
LabelPrefix = "objectset.rio.cattle.io/"
)

var (
Expand Down Expand Up @@ -58,6 +59,15 @@ func (o *desiredSet) getRateLimit(labelHash string) flowcontrol.RateLimiter {
return rl
}

func (o *desiredSet) dryRun() (Plan, error) {
o.createPlan = true
o.plan.Create = objectset.ObjectKeyByGVK{}
o.plan.Update = PatchByGVK{}
o.plan.Delete = objectset.ObjectKeyByGVK{}
err := o.apply()
return o.plan, err
}

func (o *desiredSet) apply() error {
if o.objs == nil || o.objs.Len() == 0 {
o.remove = true
Expand All @@ -67,7 +77,7 @@ func (o *desiredSet) apply() error {
return err
}

labelSet, annotationSet, err := o.getLabelsAndAnnotations()
labelSet, annotationSet, err := GetLabelsAndAnnotations(o.setID, o.owner)
if err != nil {
return o.err(err)
}
Expand All @@ -90,13 +100,13 @@ func (o *desiredSet) apply() error {
objs := o.collect(objList)

debugID := o.debugID()
req, err := labels.NewRequirement(LabelHash, selection.Equals, []string{labelSet[LabelHash]})
sel, err := GetSelector(labelSet)
if err != nil {
return o.err(err)
}

for _, gvk := range o.objs.GVKOrder(o.knownGVK()...) {
o.process(debugID, labels.NewSelector().Add(*req), gvk, objs[gvk])
o.process(debugID, sel, gvk, objs[gvk])
}

return o.Err()
Expand Down Expand Up @@ -161,18 +171,26 @@ func (o *desiredSet) runInjectors(objList []runtime.Object) ([]runtime.Object, e
return objList, nil
}

func (o *desiredSet) getLabelsAndAnnotations() (map[string]string, map[string]string, error) {
func GetSelector(labelSet map[string]string) (labels.Selector, error) {
req, err := labels.NewRequirement(LabelHash, selection.Equals, []string{labelSet[LabelHash]})
if err != nil {
return nil, err
}
return labels.NewSelector().Add(*req), nil
}

func GetLabelsAndAnnotations(setID string, owner runtime.Object) (map[string]string, map[string]string, error) {
annotations := map[string]string{
LabelID: o.setID,
LabelID: setID,
}

if o.owner != nil {
gvk, err := gvk2.Get(o.owner)
if owner != nil {
gvk, err := gvk2.Get(owner)
if err != nil {
return nil, nil, err
}
annotations[LabelGVK] = gvk.String()
metadata, err := meta.Accessor(o.owner)
metadata, err := meta.Accessor(owner)
if err != nil {
return nil, nil, fmt.Errorf("failed to get metadata for %s", gvk)
}
Expand Down
35 changes: 31 additions & 4 deletions pkg/apply/desiredset_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"compress/gzip"
"encoding/base64"
"io/ioutil"
"strings"

data2 "github.com/rancher/wrangler/pkg/data"

"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/data/convert"
Expand Down Expand Up @@ -95,7 +98,7 @@ func emptyMaps(data map[string]interface{}, keys ...string) bool {
return true
}

func sanitizePatch(patch []byte) ([]byte, error) {
func sanitizePatch(patch []byte, removeObjectSetAnnotation bool) ([]byte, error) {
mod := false
data := map[string]interface{}{}
err := json.Unmarshal(patch, &data)
Expand All @@ -117,6 +120,23 @@ func sanitizePatch(patch []byte) ([]byte, error) {
mod = true
}

if removeObjectSetAnnotation {
metadata := convert.ToMapInterface(data2.GetValueN(data, "metadata"))
annotations := convert.ToMapInterface(data2.GetValueN(data, "metadata", "annotations"))
for k := range annotations {
if strings.HasPrefix(k, LabelPrefix) {
mod = true
delete(annotations, k)
}
}
if mod && len(annotations) == 0 {
delete(metadata, "annotations")
if len(metadata) == 0 {
delete(data, "metadata")
}
}
}

if emptyMaps(data, "metadata", "annotations") {
return []byte("{}"), nil
}
Expand Down Expand Up @@ -152,7 +172,7 @@ func applyPatch(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patc
return false, nil
}

patch, err = sanitizePatch(patch)
patch, err = sanitizePatch(patch, false)
if err != nil {
return false, err
}
Expand All @@ -172,6 +192,9 @@ func applyPatch(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patc
if err != nil {
return false, err
}
if originalObject == nil {
originalObject = oldObject
}
handled, err := reconciler(originalObject, newObject)
if err != nil {
return false, err
Expand All @@ -187,13 +210,17 @@ func applyPatch(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patc
return true, err
}

func (o *desiredSet) compareObjects(gvk schema.GroupVersionKind, patcher Patcher, client dynamic.NamespaceableResourceInterface, debugID string, oldObject, newObject runtime.Object, force bool) error {
func (o *desiredSet) compareObjects(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patcher, client dynamic.NamespaceableResourceInterface, debugID string, oldObject, newObject runtime.Object, force bool) error {
oldMetadata, err := meta.Accessor(oldObject)
if err != nil {
return err
}

if ran, err := applyPatch(gvk, o.reconcilers[gvk], patcher, debugID, oldObject, newObject); err != nil {
if o.createPlan {
o.plan.Objects = append(o.plan.Objects, oldObject)
}

if ran, err := applyPatch(gvk, reconciler, patcher, debugID, oldObject, newObject); err != nil {
return err
} else if !ran {
logrus.Debugf("DesiredSet - No change(2) %s %s/%s for %s", gvk, oldMetadata.GetNamespace(), oldMetadata.GetName(), debugID)
Expand Down
Loading

0 comments on commit cbd9fef

Please sign in to comment.