Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deploy/charts/rig-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ rules:
resources:
- configmaps
- secrets
- pods
- events
verbs:
- get
- list
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/plugin/external_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package plugin
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
Expand Down Expand Up @@ -264,7 +263,6 @@ func (s requestServer) GetObject(
co.SetName(req.GetName())
if req.GetCurrent() {
if err := s.req.GetExisting(co); err != nil {
fmt.Println("error getting existing object", err)
return nil, err
}
} else {
Expand Down
143 changes: 132 additions & 11 deletions pkg/controller/plugin/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline"
apiplugin "github.com/rigdev/rig-go-api/operator/api/v1/plugin"
"github.com/rigdev/rig/pkg/pipeline"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -23,6 +25,7 @@ import (

type ObjectWatcher interface {
WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback)
WatchSecondaryByLabels(objectLabels labels.Set, objType client.Object, cb WatchCallback)
}

type CapsuleWatcher interface {
Expand Down Expand Up @@ -102,7 +105,11 @@ func (w *capsuleWatcher) WatchPrimary(ctx context.Context, objType client.Object
return w.w.watchPrimary(ctx, w.namespace, w.capsule, objType, w, cb)
}

type WatchCallback func(obj client.Object, objectWatcher ObjectWatcher) *apipipeline.ObjectStatus
type WatchCallback func(
obj client.Object,
events []*corev1.Event,
objectWatcher ObjectWatcher,
) *apipipeline.ObjectStatus

type Watcher interface {
NewCapsuleWatcher(
Expand Down Expand Up @@ -256,6 +263,41 @@ type objectWatch struct {
subWatchers map[string]*objectWatch
}

type eventListWatcher struct {
ctx context.Context
cc client.WithWatch
namespace string
fields fields.Set
logger hclog.Logger
}

func (w *eventListWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := &corev1.EventList{}
if err := w.cc.List(w.ctx, list, &client.ListOptions{
Namespace: w.namespace,
FieldSelector: fields.SelectorFromSet(w.fields),
Raw: &options,
}); err != nil {
w.logger.Error("error getting events", "fields", w.fields, "error", err)
return nil, err
}

return list, nil
}

func (w *eventListWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
list := &corev1.EventList{}
wi, err := w.cc.Watch(w.ctx, list, &client.ListOptions{
Namespace: w.namespace,
FieldSelector: fields.SelectorFromSet(w.fields),
Raw: &options,
})
if err != nil {
w.logger.Error("error watching events", "fields", w.fields, "error", err)
}
return wi, err
}

type objectWatcher struct {
w *watcher
ctx context.Context
Expand All @@ -266,6 +308,9 @@ type objectWatcher struct {
store cache.Store
ctrl cache.Controller

eventStore cache.Store
eventCtrl cache.Controller

namespace string

lock sync.Mutex
Expand All @@ -285,6 +330,8 @@ func newObjectWatcher(
log.Fatal(err)
}

gvk := gvks[0]

gvkList := gvks[0]
gvkList.Kind += "List"

Expand All @@ -307,6 +354,22 @@ func newObjectWatcher(
ow.store = store
ow.ctrl = ctrl

apiVersion, kind := gvk.ToAPIVersionAndKind()
elw := &eventListWatcher{
ctx: ctx,
cc: cc,
namespace: namespace,
fields: fields.Set{
"involvedObject.apiVersion": apiVersion,
"involvedObject.kind": kind,
},
logger: logger,
}

eventStore, eventCtrl := cache.NewInformer(elw, &corev1.Event{}, 0, ow)
ow.eventStore = eventStore
ow.eventCtrl = eventCtrl

w.objectSyncing.Add(1)

go ow.run(ctx)
Expand Down Expand Up @@ -353,8 +416,11 @@ func (ow *objectWatcher) removeFilter(f *objectWatch) bool {

func (ow *objectWatcher) run(ctx context.Context) {
go ow.ctrl.Run(ctx.Done())
go ow.eventCtrl.Run(ctx.Done())

cache.WaitForCacheSync(ctx.Done(), ow.ctrl.HasSynced)
ow.logger.Info("waiting for sync", "namespace", ow.namespace, "gvk", ow.gvkList)
success := cache.WaitForCacheSync(ctx.Done(), ow.ctrl.HasSynced, ow.eventCtrl.HasSynced)
ow.logger.Info("sync is done", "namespace", ow.namespace, "gvk", ow.gvkList, "success", success)

ow.w.objectSyncing.Done()

Expand All @@ -371,6 +437,7 @@ func (ow *objectWatcher) List(options metav1.ListOptions) (runtime.Object, error
Namespace: ow.namespace,
Raw: &options,
}); err != nil {
ow.logger.Error("error getting object list", "gvk", ow.gvkList, "error", err)
return nil, err
}

Expand All @@ -383,13 +450,30 @@ func (ow *objectWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
return nil, err
}

return ow.cc.Watch(ow.ctx, list.(client.ObjectList), &client.ListOptions{
wi, err := ow.cc.Watch(ow.ctx, list.(client.ObjectList), &client.ListOptions{
Namespace: ow.namespace,
Raw: &options,
})
if err != nil {
ow.logger.Error("error watching objects", "gvk", ow.gvkList, "error", err)
}
return wi, err
}

func (ow *objectWatcher) OnAdd(obj interface{}, _ bool) {
if e, ok := obj.(*corev1.Event); ok {
key := cache.NewObjectName(e.InvolvedObject.Namespace, e.InvolvedObject.Name)
item, exists, err := ow.store.GetByKey(key.String())
if err != nil {
ow.logger.Error("error getting object from event", "gvk", ow.gvkList, "error", err)
}
if !exists {
return
}

obj = item
}

co, ok := obj.(client.Object)
if !ok {
ow.logger.Info("invalid object type")
Expand All @@ -410,6 +494,19 @@ func (ow *objectWatcher) OnUpdate(_, newObj interface{}) {
}

func (ow *objectWatcher) OnDelete(obj interface{}) {
if e, ok := obj.(*corev1.Event); ok {
key := cache.NewObjectName(e.InvolvedObject.Namespace, e.InvolvedObject.Name)
item, exists, err := ow.store.GetByKey(key.String())
if err != nil {
ow.logger.Error("error getting object from event", "gvk", ow.gvkList, "error", err)
}
if !exists {
return
}

obj = item
}

co, ok := obj.(client.Object)
if !ok {
ow.logger.Info("invalid object type")
Expand Down Expand Up @@ -454,7 +551,14 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov
if remove {
f.cw.deleted(ref)
} else {
os := f.cb(co, &res)
var events []*corev1.Event
for _, e := range ow.eventStore.List() {
event := e.(*corev1.Event)
if event.InvolvedObject.Name == co.GetName() {
events = append(events, event)
}
}
os := f.cb(co, events, &res)
os.ObjectRef = ref
f.cw.updated(os)
}
Expand Down Expand Up @@ -488,19 +592,16 @@ type objectWatcherResult struct {
watchers map[string]objectWatchCandidate
}

func (r *objectWatcherResult) WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback) {
func (r *objectWatcherResult) watchObject(key objectWatchKey, objType client.Object, cb WatchCallback) {
gvks, _, err := r.cc.Scheme().ObjectKinds(objType)
if err != nil {
// TODO!
log.Fatal(err)
}

key := objectWatchKey{
watcherKey: watcherKey{
namespace: r.namespace,
gvk: gvks[0],
},
names: []string{objectName},
key.watcherKey = watcherKey{
namespace: r.namespace,
gvk: gvks[0],
}

r.watchers[key.id()] = objectWatchCandidate{
Expand All @@ -510,6 +611,26 @@ func (r *objectWatcherResult) WatchSecondaryByName(objectName string, objType cl
}
}

func (r *objectWatcherResult) WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback) {
r.watchObject(
objectWatchKey{
names: []string{objectName},
},
objType,
cb,
)
}

func (r *objectWatcherResult) WatchSecondaryByLabels(objectLabels labels.Set, objType client.Object, cb WatchCallback) {
r.watchObject(
objectWatchKey{
labels: objectLabels,
},
objType,
cb,
)
}

type objectWatchCandidate struct {
key objectWatchKey
objType client.Object
Expand Down
2 changes: 0 additions & 2 deletions plugins/capsulesteps/deployment/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ const (
type Config struct{}

type Plugin struct {
plugin.NoWatchObjectStatus

reader client.Reader
configBytes []byte
}
Expand Down
66 changes: 66 additions & 0 deletions plugins/capsulesteps/deployment/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package deployment

import (
"context"

apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline"
"github.com/rigdev/rig/pkg/controller/plugin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func onPodUpdated(
obj client.Object,
events []*corev1.Event,
_ plugin.ObjectWatcher,
) *apipipeline.ObjectStatus {
pod := obj.(*corev1.Pod)

status := &apipipeline.ObjectStatus{
Type: apipipeline.ObjectType_OBJECT_TYPE_POD,
Properties: map[string]string{},
}

for _, c := range pod.Status.Conditions {
cond := &apipipeline.ObjectCondition{
Name: string(c.Type),
State: apipipeline.ObjectState_OBJECT_STATE_PENDING,
Message: c.Message,
}
status.Conditions = append(status.Conditions, cond)
}

for _, e := range events {
cond := &apipipeline.ObjectCondition{
Name: string(e.Name),
State: apipipeline.ObjectState_OBJECT_STATE_PENDING,
Message: e.Message,
}
status.Conditions = append(status.Conditions, cond)
}

return status
}

func onDeploymentUpdated(
obj client.Object,
_ []*corev1.Event,
objectWatcher plugin.ObjectWatcher,
) *apipipeline.ObjectStatus {
dep := obj.(*appsv1.Deployment)

objectWatcher.WatchSecondaryByLabels(labels.Set(dep.Spec.Template.GetLabels()), &corev1.Pod{}, onPodUpdated)

status := &apipipeline.ObjectStatus{
Type: apipipeline.ObjectType_OBJECT_TYPE_PRIMARY,
Properties: map[string]string{},
}

return status
}

func (p *Plugin) WatchObjectStatus(ctx context.Context, watcher plugin.CapsuleWatcher) error {
return watcher.WatchPrimary(ctx, &appsv1.Deployment{}, onDeploymentUpdated)
}
Loading