diff --git a/deploy/charts/rig-operator/templates/clusterrole.yaml b/deploy/charts/rig-operator/templates/clusterrole.yaml index eed969e5b..eeb1fcaa8 100644 --- a/deploy/charts/rig-operator/templates/clusterrole.yaml +++ b/deploy/charts/rig-operator/templates/clusterrole.yaml @@ -10,6 +10,8 @@ rules: resources: - configmaps - secrets + - pods + - events verbs: - get - list diff --git a/pkg/controller/plugin/external_plugin.go b/pkg/controller/plugin/external_plugin.go index 4d26ecc4b..3c354c902 100644 --- a/pkg/controller/plugin/external_plugin.go +++ b/pkg/controller/plugin/external_plugin.go @@ -3,7 +3,6 @@ package plugin import ( "context" "encoding/json" - "fmt" "io" "os" "os/exec" @@ -264,7 +263,6 @@ func (s requestServer) GetObject( co.SetName(req.GetName()) if req.GetCurrent() { if err := s.req.GetExisting(co); err != nil { - fmt.Println("error getting existing object", err) return nil, err } } else { diff --git a/pkg/controller/plugin/watcher.go b/pkg/controller/plugin/watcher.go index c8bc0f493..bde3e7b33 100644 --- a/pkg/controller/plugin/watcher.go +++ b/pkg/controller/plugin/watcher.go @@ -12,7 +12,9 @@ import ( apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline" apiplugin "github.com/rigdev/rig-go-api/operator/api/v1/plugin" "github.com/rigdev/rig/pkg/pipeline" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -23,6 +25,7 @@ import ( type ObjectWatcher interface { WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback) + WatchSecondaryByLabels(objectLabels labels.Set, objType client.Object, cb WatchCallback) } type CapsuleWatcher interface { @@ -102,7 +105,11 @@ func (w *capsuleWatcher) WatchPrimary(ctx context.Context, objType client.Object return w.w.watchPrimary(ctx, w.namespace, w.capsule, objType, w, cb) } -type WatchCallback func(obj client.Object, objectWatcher ObjectWatcher) *apipipeline.ObjectStatus +type WatchCallback func( + obj client.Object, + events []*corev1.Event, + objectWatcher ObjectWatcher, +) *apipipeline.ObjectStatus type Watcher interface { NewCapsuleWatcher( @@ -256,6 +263,41 @@ type objectWatch struct { subWatchers map[string]*objectWatch } +type eventListWatcher struct { + ctx context.Context + cc client.WithWatch + namespace string + fields fields.Set + logger hclog.Logger +} + +func (w *eventListWatcher) List(options metav1.ListOptions) (runtime.Object, error) { + list := &corev1.EventList{} + if err := w.cc.List(w.ctx, list, &client.ListOptions{ + Namespace: w.namespace, + FieldSelector: fields.SelectorFromSet(w.fields), + Raw: &options, + }); err != nil { + w.logger.Error("error getting events", "fields", w.fields, "error", err) + return nil, err + } + + return list, nil +} + +func (w *eventListWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { + list := &corev1.EventList{} + wi, err := w.cc.Watch(w.ctx, list, &client.ListOptions{ + Namespace: w.namespace, + FieldSelector: fields.SelectorFromSet(w.fields), + Raw: &options, + }) + if err != nil { + w.logger.Error("error watching events", "fields", w.fields, "error", err) + } + return wi, err +} + type objectWatcher struct { w *watcher ctx context.Context @@ -266,6 +308,9 @@ type objectWatcher struct { store cache.Store ctrl cache.Controller + eventStore cache.Store + eventCtrl cache.Controller + namespace string lock sync.Mutex @@ -285,6 +330,8 @@ func newObjectWatcher( log.Fatal(err) } + gvk := gvks[0] + gvkList := gvks[0] gvkList.Kind += "List" @@ -307,6 +354,22 @@ func newObjectWatcher( ow.store = store ow.ctrl = ctrl + apiVersion, kind := gvk.ToAPIVersionAndKind() + elw := &eventListWatcher{ + ctx: ctx, + cc: cc, + namespace: namespace, + fields: fields.Set{ + "involvedObject.apiVersion": apiVersion, + "involvedObject.kind": kind, + }, + logger: logger, + } + + eventStore, eventCtrl := cache.NewInformer(elw, &corev1.Event{}, 0, ow) + ow.eventStore = eventStore + ow.eventCtrl = eventCtrl + w.objectSyncing.Add(1) go ow.run(ctx) @@ -353,8 +416,11 @@ func (ow *objectWatcher) removeFilter(f *objectWatch) bool { func (ow *objectWatcher) run(ctx context.Context) { go ow.ctrl.Run(ctx.Done()) + go ow.eventCtrl.Run(ctx.Done()) - cache.WaitForCacheSync(ctx.Done(), ow.ctrl.HasSynced) + ow.logger.Info("waiting for sync", "namespace", ow.namespace, "gvk", ow.gvkList) + success := cache.WaitForCacheSync(ctx.Done(), ow.ctrl.HasSynced, ow.eventCtrl.HasSynced) + ow.logger.Info("sync is done", "namespace", ow.namespace, "gvk", ow.gvkList, "success", success) ow.w.objectSyncing.Done() @@ -371,6 +437,7 @@ func (ow *objectWatcher) List(options metav1.ListOptions) (runtime.Object, error Namespace: ow.namespace, Raw: &options, }); err != nil { + ow.logger.Error("error getting object list", "gvk", ow.gvkList, "error", err) return nil, err } @@ -383,13 +450,30 @@ func (ow *objectWatcher) Watch(options metav1.ListOptions) (watch.Interface, err return nil, err } - return ow.cc.Watch(ow.ctx, list.(client.ObjectList), &client.ListOptions{ + wi, err := ow.cc.Watch(ow.ctx, list.(client.ObjectList), &client.ListOptions{ Namespace: ow.namespace, Raw: &options, }) + if err != nil { + ow.logger.Error("error watching objects", "gvk", ow.gvkList, "error", err) + } + return wi, err } func (ow *objectWatcher) OnAdd(obj interface{}, _ bool) { + if e, ok := obj.(*corev1.Event); ok { + key := cache.NewObjectName(e.InvolvedObject.Namespace, e.InvolvedObject.Name) + item, exists, err := ow.store.GetByKey(key.String()) + if err != nil { + ow.logger.Error("error getting object from event", "gvk", ow.gvkList, "error", err) + } + if !exists { + return + } + + obj = item + } + co, ok := obj.(client.Object) if !ok { ow.logger.Info("invalid object type") @@ -410,6 +494,19 @@ func (ow *objectWatcher) OnUpdate(_, newObj interface{}) { } func (ow *objectWatcher) OnDelete(obj interface{}) { + if e, ok := obj.(*corev1.Event); ok { + key := cache.NewObjectName(e.InvolvedObject.Namespace, e.InvolvedObject.Name) + item, exists, err := ow.store.GetByKey(key.String()) + if err != nil { + ow.logger.Error("error getting object from event", "gvk", ow.gvkList, "error", err) + } + if !exists { + return + } + + obj = item + } + co, ok := obj.(client.Object) if !ok { ow.logger.Info("invalid object type") @@ -454,7 +551,14 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov if remove { f.cw.deleted(ref) } else { - os := f.cb(co, &res) + var events []*corev1.Event + for _, e := range ow.eventStore.List() { + event := e.(*corev1.Event) + if event.InvolvedObject.Name == co.GetName() { + events = append(events, event) + } + } + os := f.cb(co, events, &res) os.ObjectRef = ref f.cw.updated(os) } @@ -488,19 +592,16 @@ type objectWatcherResult struct { watchers map[string]objectWatchCandidate } -func (r *objectWatcherResult) WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback) { +func (r *objectWatcherResult) watchObject(key objectWatchKey, objType client.Object, cb WatchCallback) { gvks, _, err := r.cc.Scheme().ObjectKinds(objType) if err != nil { // TODO! log.Fatal(err) } - key := objectWatchKey{ - watcherKey: watcherKey{ - namespace: r.namespace, - gvk: gvks[0], - }, - names: []string{objectName}, + key.watcherKey = watcherKey{ + namespace: r.namespace, + gvk: gvks[0], } r.watchers[key.id()] = objectWatchCandidate{ @@ -510,6 +611,26 @@ func (r *objectWatcherResult) WatchSecondaryByName(objectName string, objType cl } } +func (r *objectWatcherResult) WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback) { + r.watchObject( + objectWatchKey{ + names: []string{objectName}, + }, + objType, + cb, + ) +} + +func (r *objectWatcherResult) WatchSecondaryByLabels(objectLabels labels.Set, objType client.Object, cb WatchCallback) { + r.watchObject( + objectWatchKey{ + labels: objectLabels, + }, + objType, + cb, + ) +} + type objectWatchCandidate struct { key objectWatchKey objType client.Object diff --git a/plugins/capsulesteps/deployment/plugin.go b/plugins/capsulesteps/deployment/plugin.go index 6acf48d3b..eb8ccc047 100644 --- a/plugins/capsulesteps/deployment/plugin.go +++ b/plugins/capsulesteps/deployment/plugin.go @@ -39,8 +39,6 @@ const ( type Config struct{} type Plugin struct { - plugin.NoWatchObjectStatus - reader client.Reader configBytes []byte } diff --git a/plugins/capsulesteps/deployment/watcher.go b/plugins/capsulesteps/deployment/watcher.go new file mode 100644 index 000000000..4164cd2e7 --- /dev/null +++ b/plugins/capsulesteps/deployment/watcher.go @@ -0,0 +1,66 @@ +package deployment + +import ( + "context" + + apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline" + "github.com/rigdev/rig/pkg/controller/plugin" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func onPodUpdated( + obj client.Object, + events []*corev1.Event, + _ plugin.ObjectWatcher, +) *apipipeline.ObjectStatus { + pod := obj.(*corev1.Pod) + + status := &apipipeline.ObjectStatus{ + Type: apipipeline.ObjectType_OBJECT_TYPE_POD, + Properties: map[string]string{}, + } + + for _, c := range pod.Status.Conditions { + cond := &apipipeline.ObjectCondition{ + Name: string(c.Type), + State: apipipeline.ObjectState_OBJECT_STATE_PENDING, + Message: c.Message, + } + status.Conditions = append(status.Conditions, cond) + } + + for _, e := range events { + cond := &apipipeline.ObjectCondition{ + Name: string(e.Name), + State: apipipeline.ObjectState_OBJECT_STATE_PENDING, + Message: e.Message, + } + status.Conditions = append(status.Conditions, cond) + } + + return status +} + +func onDeploymentUpdated( + obj client.Object, + _ []*corev1.Event, + objectWatcher plugin.ObjectWatcher, +) *apipipeline.ObjectStatus { + dep := obj.(*appsv1.Deployment) + + objectWatcher.WatchSecondaryByLabels(labels.Set(dep.Spec.Template.GetLabels()), &corev1.Pod{}, onPodUpdated) + + status := &apipipeline.ObjectStatus{ + Type: apipipeline.ObjectType_OBJECT_TYPE_PRIMARY, + Properties: map[string]string{}, + } + + return status +} + +func (p *Plugin) WatchObjectStatus(ctx context.Context, watcher plugin.CapsuleWatcher) error { + return watcher.WatchPrimary(ctx, &appsv1.Deployment{}, onDeploymentUpdated) +} diff --git a/plugins/capsulesteps/ingress_routes/watcher.go b/plugins/capsulesteps/ingress_routes/watcher.go index a943a7997..95e985003 100644 --- a/plugins/capsulesteps/ingress_routes/watcher.go +++ b/plugins/capsulesteps/ingress_routes/watcher.go @@ -10,7 +10,7 @@ import ( v1 "github.com/cert-manager/cert-manager/pkg/apis/meta/v1" apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline" "github.com/rigdev/rig/pkg/controller/plugin" - "google.golang.org/protobuf/types/known/timestamppb" + corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -18,8 +18,6 @@ import ( func toIngressStatus(ingress *netv1.Ingress) *apipipeline.ObjectStatus { status := &apipipeline.ObjectStatus{ Type: apipipeline.ObjectType_OBJECT_TYPE_PRIMARY, - State: apipipeline.ObjectState_OBJECT_STATE_HEALTHY, - UpdatedAt: timestamppb.Now(), Properties: map[string]string{}, } @@ -49,13 +47,15 @@ func toIngressStatus(ingress *netv1.Ingress) *apipipeline.ObjectStatus { return status } -func onCertificateUpdated(obj client.Object, objectWatcher plugin.ObjectWatcher) *apipipeline.ObjectStatus { +func onCertificateUpdated( + obj client.Object, + events []*corev1.Event, + objectWatcher plugin.ObjectWatcher, +) *apipipeline.ObjectStatus { cert := obj.(*cmv1.Certificate) status := &apipipeline.ObjectStatus{ Type: apipipeline.ObjectType_OBJECT_TYPE_SECONDARY, - State: apipipeline.ObjectState_OBJECT_STATE_HEALTHY, - UpdatedAt: timestamppb.Now(), Properties: map[string]string{}, } @@ -91,7 +91,11 @@ func onCertificateUpdated(obj client.Object, objectWatcher plugin.ObjectWatcher) return status } -func onIngressUpdated(obj client.Object, objectWatcher plugin.ObjectWatcher) *apipipeline.ObjectStatus { +func onIngressUpdated( + obj client.Object, + events []*corev1.Event, + objectWatcher plugin.ObjectWatcher, +) *apipipeline.ObjectStatus { ingress := obj.(*netv1.Ingress) objectWatcher.WatchSecondaryByName(ingress.GetName(), &cmv1.Certificate{}, onCertificateUpdated) diff --git a/proto/rig/operator/api/v1/pipeline/service.proto b/proto/rig/operator/api/v1/pipeline/service.proto index 5d30fa983..f2997099b 100644 --- a/proto/rig/operator/api/v1/pipeline/service.proto +++ b/proto/rig/operator/api/v1/pipeline/service.proto @@ -39,10 +39,8 @@ enum ObjectType { message ObjectStatus { ObjectRef object_ref = 1; ObjectType type = 2; - google.protobuf.Timestamp updated_at = 3; - ObjectState state = 4; - repeated ObjectCondition conditions = 5; - map properties = 6; + repeated ObjectCondition conditions = 3; + map properties = 4; } enum ObjectState { @@ -57,7 +55,6 @@ message ObjectCondition { google.protobuf.Timestamp updated_at = 2; ObjectState state = 3; string message = 4; - string last_error = 5; } message DryRunRequest {