From 35f3f15faf1ed54996f58c27d5ce455729a39cbc Mon Sep 17 00:00:00 2001 From: Anders Johnsen Date: Fri, 3 May 2024 14:24:43 +0200 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=9A=A7=20Improve=20pod=20object=20sta?= =?UTF-8?q?tus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/controller/plugin/external_plugin.go | 2 + pkg/controller/plugin/heap.go | 135 +++++++ pkg/controller/plugin/watcher.go | 102 ++++- plugins/capsulesteps/deployment/watcher.go | 378 +++++++++++++++++- .../capsulesteps/ingress_routes/watcher.go | 15 +- .../api/v1/pipeline/object_status.proto | 51 +++ 6 files changed, 669 insertions(+), 14 deletions(-) create mode 100644 pkg/controller/plugin/heap.go diff --git a/pkg/controller/plugin/external_plugin.go b/pkg/controller/plugin/external_plugin.go index 59d270753..2ec9054b2 100644 --- a/pkg/controller/plugin/external_plugin.go +++ b/pkg/controller/plugin/external_plugin.go @@ -92,6 +92,8 @@ func (p *pluginExecutor) start(ctx context.Context, pluginConfig string, restCon Cmd: exec.CommandContext(ctx, p.binaryPath, p.args...), Logger: pLogger, AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + SyncStdout: os.Stderr, + SyncStderr: os.Stderr, Stderr: os.Stderr, }) diff --git a/pkg/controller/plugin/heap.go b/pkg/controller/plugin/heap.go new file mode 100644 index 000000000..7af71702a --- /dev/null +++ b/pkg/controller/plugin/heap.go @@ -0,0 +1,135 @@ +package plugin + +import ( + "container/heap" +) + +// This implementation is copied from here: +// https://gotipplay.golang.org/p/d4M0QBkfmIr +// https://github.com/golang/go/issues/47632 + +// A priorityHeap is a min-heap backed by a slice. +type priorityHeap[E any] struct { + s sliceHeap[E] +} + +// newPriorityHeap constructs a new Heap with a comparison function. +func newPriorityHeap[E any](less func(E, E) bool) *priorityHeap[E] { + return &priorityHeap[E]{sliceHeap[E]{less: less}} +} + +// Push pushes an element onto the heap. The complexity is O(log n) +// where n = h.Len(). +func (h *priorityHeap[E]) Push(elem E) { + heap.Push(&h.s, elem) +} + +// Pop removes and returns the minimum element (according to the less function) +// from the heap. Pop panics if the heap is empty. +// The complexity is O(log n) where n = h.Len(). +func (h *priorityHeap[E]) Pop() E { + return heap.Pop(&h.s).(E) +} + +// Peek returns the minimum element (according to the less function) in the heap. +// Peek panics if the heap is empty. +// The complexity is O(1). +func (h *priorityHeap[E]) Peek() E { + return h.s.s[0] +} + +// Len returns the number of elements in the heap. +func (h *priorityHeap[E]) Len() int { + return len(h.s.s) +} + +// Slice returns the underlying slice. +// The slice is in heap order; the minimum value is at index 0. +// The heap retains the returned slice, so altering the slice may break +// the invariants and invalidate the heap. +func (h *priorityHeap[E]) Slice() []E { + return h.s.s +} + +// SetIndex specifies an optional function to be called +// when updating the position of any heap element within the slice, +// including during the element's initial Push. +// +// SetIndex must be called at most once, before any calls to Push. +// +// When an element is removed from the heap by Pop or Remove, +// the index function is called with the invalid index -1 +// to signify that the element is no longer within the slice. +func (h *priorityHeap[E]) SetIndex(f func(E, int)) { + h.s.setIndex = f +} + +// Fix re-establishes the heap ordering +// after the element at index i has changed its value. +// Changing the value of the element at index i and then calling Fix +// is equivalent to, but less expensive than, +// calling h.Remove(i) followed by a Push of the new value. +// The complexity is O(log n) where n = h.Len(). +// The index for use with Fix is recorded using the function passed to SetIndex. +func (h *priorityHeap[E]) Fix(i int) { + heap.Fix(&h.s, i) +} + +// Remove removes and returns the element at index i from the heap. +// The complexity is O(log n) where n = h.Len(). +// The index for use with Remove is recorded using the function passed to SetIndex. +func (h *priorityHeap[E]) Remove(i int) E { + return heap.Remove(&h.s, i).(E) +} + +// sliceHeap just exists to use the existing heap.Interface as the +// implementation of Heap. +type sliceHeap[E any] struct { + s []E + less func(E, E) bool + setIndex func(E, int) +} + +func (s *sliceHeap[E]) Len() int { return len(s.s) } + +func (s *sliceHeap[E]) Swap(i, j int) { + s.s[i], s.s[j] = s.s[j], s.s[i] + if s.setIndex != nil { + s.setIndex(s.s[i], i) + s.setIndex(s.s[j], j) + } +} + +func (s *sliceHeap[E]) Less(i, j int) bool { + return s.less(s.s[i], s.s[j]) +} + +func (s *sliceHeap[E]) Push(x interface{}) { + s.s = append(s.s, x.(E)) + if s.setIndex != nil { + s.setIndex(s.s[len(s.s)-1], len(s.s)-1) + } +} + +func (s *sliceHeap[E]) Pop() interface{} { + e := s.s[len(s.s)-1] + if s.setIndex != nil { + s.setIndex(e, -1) + } + s.s = s.s[:len(s.s)-1] + return e +} + +type Item struct { + value string + priority int + index int +} + +func (it *Item) Less(it1 *Item) bool { + return it.priority > it1.priority +} + +func (it *Item) setIndex(index int) { + it.index = index +} diff --git a/pkg/controller/plugin/watcher.go b/pkg/controller/plugin/watcher.go index abc49e7cf..e8d54e70e 100644 --- a/pkg/controller/plugin/watcher.go +++ b/pkg/controller/plugin/watcher.go @@ -7,6 +7,7 @@ import ( "slices" "strings" "sync" + "time" "github.com/hashicorp/go-hclog" apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline" @@ -24,6 +25,7 @@ import ( ) type ObjectWatcher interface { + Reschedule(deadline time.Time) WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback) WatchSecondaryByLabels(objectLabels labels.Set, objType client.Object, cb WatchCallback) } @@ -307,6 +309,12 @@ func (w *eventListWatcher) Watch(options metav1.ListOptions) (watch.Interface, e return wi, err } +type queueObj struct { + deadline time.Time + obj client.Object + index int +} + type objectWatcher struct { w *watcher ctx context.Context @@ -324,6 +332,11 @@ type objectWatcher struct { lock sync.Mutex + objects map[string]*queueObj + queue *priorityHeap[*queueObj] + + lastProcess map[string]time.Duration + filters map[*objectWatch]struct{} } @@ -355,8 +368,14 @@ func newObjectWatcher( logger: logger, namespace: namespace, filters: map[*objectWatch]struct{}{}, + objects: map[string]*queueObj{}, + queue: newPriorityHeap(func(a, b *queueObj) bool { return a.deadline.Before(b.deadline) }), } + ow.queue.SetIndex(func(qo *queueObj, i int) { + qo.index = i + }) + w.logger.Info("starting watcher", "gvk", ow.gvkList) store, ctrl := cache.NewInformer(ow, obj, 0, ow) @@ -433,7 +452,27 @@ func (ow *objectWatcher) run(ctx context.Context) { ow.w.objectSyncing.Done() - <-ctx.Done() + timer := time.NewTicker(250 * time.Millisecond) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + } + + ow.lock.Lock() + + for ow.queue.Len() > 0 && ow.queue.Peek().deadline.Before(time.Now()) { + p := ow.queue.Pop() + for f := range ow.filters { + ow.handleForFilter(p.obj, f, false) + } + } + + ow.lock.Unlock() + } } func (ow *objectWatcher) List(options metav1.ListOptions) (runtime.Object, error) { @@ -493,6 +532,36 @@ func (ow *objectWatcher) OnAdd(obj interface{}, _ bool) { ow.lock.Lock() defer ow.lock.Unlock() + + if o, ok := ow.objects[co.GetName()]; ok { + if o.index >= 0 { + // Already scheduled to run in the future. + deadline := time.Now().Add(time.Second) + if o.deadline.After(deadline) { + o.deadline = deadline + ow.queue.Fix(o.index) + } + o.obj = co + return + } + + // Deadline is when we can process the next event for this object. + deadline := o.deadline.Add(1 * time.Second) + if deadline.After(time.Now()) { + o.deadline = deadline + ow.queue.Push(o) + return + } else { + o.deadline = time.Now() + } + } else { + ow.objects[co.GetName()] = &queueObj{ + deadline: time.Now(), + obj: co, + index: -1, + } + } + for f := range ow.filters { ow.handleForFilter(co, f, false) } @@ -526,6 +595,15 @@ func (ow *objectWatcher) OnDelete(obj interface{}) { ow.lock.Lock() defer ow.lock.Unlock() + + if q, ok := ow.objects[co.GetName()]; ok { + if q.index >= 0 { + // Already scheduled to run in the future, stop it. + ow.queue.Remove(q.index) + } + delete(ow.objects, co.GetName()) + } + for f := range ow.filters { ow.handleForFilter(co, f, true) } @@ -570,6 +648,21 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov os := f.cb(co, events, &res) os.ObjectRef = ref f.cw.updated(os) + + if !res.deadline.IsZero() { + // Must exist in map! + o := ow.objects[co.GetName()] + if o.index >= 0 { + if res.deadline.Before(o.deadline) { + o.deadline = res.deadline + ow.queue.Fix(o.index) + } + } else { + o.deadline = res.deadline + ow.queue.Push(o) + } + } + } for key, w := range res.watchers { @@ -599,6 +692,7 @@ type objectWatcherResult struct { cc client.WithWatch namespace string watchers map[string]objectWatchCandidate + deadline time.Time } func (r *objectWatcherResult) watchObject(key objectWatchKey, objType client.Object, cb WatchCallback) { @@ -640,6 +734,12 @@ func (r *objectWatcherResult) WatchSecondaryByLabels(objectLabels labels.Set, ob ) } +func (r *objectWatcherResult) Reschedule(deadline time.Time) { + if r.deadline.IsZero() || deadline.Before(r.deadline) { + r.deadline = deadline + } +} + type objectWatchCandidate struct { key objectWatchKey objType client.Object diff --git a/plugins/capsulesteps/deployment/watcher.go b/plugins/capsulesteps/deployment/watcher.go index 4164cd2e7..365bfd6f4 100644 --- a/plugins/capsulesteps/deployment/watcher.go +++ b/plugins/capsulesteps/deployment/watcher.go @@ -2,11 +2,18 @@ package deployment import ( "context" + "fmt" + "regexp" + "strings" + "time" apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline" "github.com/rigdev/rig/pkg/controller/plugin" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/types/known/timestamppb" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -14,7 +21,7 @@ import ( func onPodUpdated( obj client.Object, events []*corev1.Event, - _ plugin.ObjectWatcher, + watcher plugin.ObjectWatcher, ) *apipipeline.ObjectStatus { pod := obj.(*corev1.Pod) @@ -23,25 +30,372 @@ func onPodUpdated( Properties: map[string]string{}, } - for _, c := range pod.Status.Conditions { - cond := &apipipeline.ObjectCondition{ - Name: string(c.Type), + makePlacementCondition(status, pod, events) + + containers := splitByContainers(status, pod, events) + + makeImagePullingConditions(containers) + makeRunningConditions(pod, containers, watcher) + + return status +} + +func makeRunningConditions(pod *corev1.Pod, containers []containerInfo, watcher plugin.ObjectWatcher) { + for _, c := range containers { + makeRunningCondition(pod, c, watcher) + } +} + +func makeRunningCondition(pod *corev1.Pod, container containerInfo, watcher plugin.ObjectWatcher) { + if c := getCondition(pod.Status.Conditions, "Initialized"); c != nil { + if c.Status != v1.ConditionTrue { + return + } + } + + if c := getCondition(pod.Status.Conditions, "PodReadyToStartContainers"); c != nil { + if c.Status != v1.ConditionTrue { + return + } + } else { + // Backup, for when PodReadyToStartContainers is not supported. + if container.status.LastTerminationState == (v1.ContainerState{}) && container.status.State.Running == nil { + return + } + } + + makeExecutingCondition(container) + + if container.spec.LivenessProbe != nil { + liveness := &apipipeline.ObjectCondition{ + Name: "Liveness", + Message: "Waiting for instance to start", State: apipipeline.ObjectState_OBJECT_STATE_PENDING, - Message: c.Message, } - status.Conditions = append(status.Conditions, cond) + + // Add 3 seconds to allow for the probe to run and be recorded. + probePeriod := time.Duration(container.spec.LivenessProbe.PeriodSeconds)*time.Second + 3 + if container.status.State.Running != nil { + if time.Since(container.status.State.Running.StartedAt.Time) < probePeriod { + liveness.Message = "Waiting for liveness check to pass" + liveness.UpdatedAt = timestamppb.New(container.status.State.Running.StartedAt.Time) + watcher.Reschedule(container.status.State.Running.StartedAt.Time.Add(probePeriod)) + } else { + liveness.Message = "Instance is alive" + liveness.State = apipipeline.ObjectState_OBJECT_STATE_PENDING + } + } + + unhealthyEvent := getEventWithPrefix(container.events, "Unhealthy", "Liveness ") + if unhealthyEvent != nil { + ts := timestampFromEvent(unhealthyEvent) + if container.status.State.Running != nil { + // Check if the event is still relevant. + if time.Since(ts.AsTime()) < probePeriod { + liveness.Message = unhealthyEvent.Message + liveness.UpdatedAt = ts + liveness.State = apipipeline.ObjectState_OBJECT_STATE_PENDING + watcher.Reschedule(ts.AsTime().Add(probePeriod)) + } + } else { + liveness.Message = unhealthyEvent.Message + liveness.UpdatedAt = ts + liveness.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + } + } + + container.subObj.Conditions = append(container.subObj.Conditions, liveness) } - for _, e := range events { - cond := &apipipeline.ObjectCondition{ - Name: string(e.Name), + if container.spec.ReadinessProbe != nil { + ready := &apipipeline.ObjectCondition{ + Name: "Readiness", + Message: "Waiting for instance to accept traffic", State: apipipeline.ObjectState_OBJECT_STATE_PENDING, - Message: e.Message, } - status.Conditions = append(status.Conditions, cond) + + readyCondition := getCondition(pod.Status.Conditions, "Ready") + var readyAt time.Time + if readyCondition != nil && readyCondition.Status == v1.ConditionTrue { + ready.Message = "Instance ready for traffic" + ready.State = apipipeline.ObjectState_OBJECT_STATE_HEALTHY + readyAt = readyCondition.LastTransitionTime.Time + } + + unhealthyEvent := getEventWithPrefix(container.events, "Unhealthy", "Readiness ") + if unhealthyEvent != nil { + ts := timestampFromEvent(unhealthyEvent) + if ts.AsTime().After(readyAt) { + ready.Message = unhealthyEvent.Message + ready.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + ready.UpdatedAt = ts + } + } + + container.subObj.Conditions = append(container.subObj.Conditions, ready) } +} - return status +func makeExecutingCondition(container containerInfo) { + cond := &apipipeline.ObjectCondition{ + Name: "Running", + Message: "Waiting to start", + State: apipipeline.ObjectState_OBJECT_STATE_PENDING, + } + + containerStatus := &apipipeline.ContainerStatus{ + RestartCount: uint32(container.status.RestartCount), + } + + if container.status.LastTerminationState.Terminated != nil { + containerStatus.LastTermination = containerStateTerminatedFromK8s(container.status.LastTerminationState.Terminated) + // If this isn't overwritten, it's because the instance is 'Waiting to start' after it had crashed + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + } + + if container.status.State.Running != nil { + containerStatus.StartedAt = timestamppb.New(container.status.State.Running.StartedAt.Time) + cond.Message = "Container is running" + cond.State = apipipeline.ObjectState_OBJECT_STATE_HEALTHY + } else if container.status.State.Waiting != nil { + cond.Message = fmt.Sprintf("%s: %s", container.status.State.Waiting.Reason, container.status.State.Waiting.Message) + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + } + + container.subObj.Conditions = append(container.subObj.Conditions, cond) + container.subObj.PlatformStatus = []*apipipeline.PlatformObjectStatus{{ + Name: container.name, + Kind: &apipipeline.PlatformObjectStatus_Container{ + Container: containerStatus, + }, + }} +} + +func containerStateTerminatedFromK8s(state *v1.ContainerStateTerminated) *apipipeline.ContainerStatus_ContainerTermination { + if state == nil { + return nil + } + return &apipipeline.ContainerStatus_ContainerTermination{ + ExitCode: state.ExitCode, + Signal: state.Signal, + Reason: state.Reason, + Message: state.Message, + StartedAt: timestamppb.New(state.StartedAt.Time), + FinishedAt: timestamppb.New(state.FinishedAt.Time), + ContainerId: state.ContainerID, + } +} + +func makeImagePullingConditions(containers []containerInfo) { + for _, c := range containers { + makeImagePullingCondition(c) + } +} + +func makeImagePullingCondition(container containerInfo) { + cond := &apipipeline.ObjectCondition{ + Name: "Pull container image", + State: apipipeline.ObjectState_OBJECT_STATE_PENDING, + } + + for _, e := range container.events { + switch { + case e.Reason == "Pulled": + cond.State = apipipeline.ObjectState_OBJECT_STATE_HEALTHY + case e.Reason == "Pulling": + cond.State = apipipeline.ObjectState_OBJECT_STATE_PENDING + case e.Reason == "BackOff" && strings.HasPrefix(e.Message, "Back-off pulling image"): + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + case e.Reason == "Failed" && strings.HasPrefix(e.Message, "Failed to pull image"): + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + default: + continue + } + + cond.Message = e.Message + cond.UpdatedAt = timestampFromEvent(e) + break + } + + // Override event if latest event is wrong. + if container.status.ImageID != "" && cond.State != apipipeline.ObjectState_OBJECT_STATE_HEALTHY { + cond.State = apipipeline.ObjectState_OBJECT_STATE_HEALTHY + cond.Message = "Image pulled" + } + + if cond.State != apipipeline.ObjectState_OBJECT_STATE_HEALTHY { + // Bad state. Try to pull out a better message from the waiting status, if possible. + if w := container.status.State.Waiting; w != nil && cond.Message == "" { + switch w.Reason { + case "ErrImagePull": + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + case "ImagePullBackOff": + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + default: + } + + cond.Message = w.Message + cond.UpdatedAt = timestamppb.Now() + } + } + + container.subObj.Properties["Image"] = container.status.Image + container.subObj.Conditions = append(container.subObj.Conditions, cond) +} + +func makePlacementCondition(status *apipipeline.ObjectStatus, pod *corev1.Pod, events []*corev1.Event) { + cond := &apipipeline.ObjectCondition{ + Name: "Node placement", + State: apipipeline.ObjectState_OBJECT_STATE_PENDING, + } + + if node := pod.Spec.NodeName; node != "" { + status.Properties["Node"] = node + } + + if scheduled := getCondition(pod.Status.Conditions, "PodScheduled"); scheduled != nil { + cond.UpdatedAt = timestampFromCondition(scheduled) + + switch scheduled.Status { + case "True": + cond.State = apipipeline.ObjectState_OBJECT_STATE_HEALTHY + } + + cond.Message = scheduled.Message + } + + for _, e := range events { + switch e.Reason { + case "Scheduled": + cond.State = apipipeline.ObjectState_OBJECT_STATE_HEALTHY + case "FailedScheduling": + cond.State = apipipeline.ObjectState_OBJECT_STATE_ERROR + default: + continue + } + + cond.Message = e.Message + cond.UpdatedAt = timestampFromEvent(e) + break + } + + status.Conditions = append(status.Conditions, cond) +} + +type containerInfo struct { + name string + status v1.ContainerStatus + spec v1.Container + events []*v1.Event + subObj *apipipeline.SubObjectStatus + initContainer bool + sidecar bool +} + +func splitByContainers(status *apipipeline.ObjectStatus, pod *v1.Pod, events []*v1.Event) []containerInfo { + containers := map[string]containerInfo{} + + for _, c := range pod.Spec.Containers { + containers[c.Name] = containerInfo{ + name: c.Name, + spec: c, + } + } + for _, c := range pod.Spec.InitContainers { + ci := containerInfo{ + name: c.Name, + spec: c, + } + if r := c.RestartPolicy; r != nil && *r == v1.ContainerRestartPolicyAlways { + ci.sidecar = true + } else { + ci.initContainer = true + } + containers[c.Name] = ci + } + + for _, s := range pod.Status.ContainerStatuses { + info := containers[s.Name] + info.status = s + containers[s.Name] = info + } + for _, s := range pod.Status.InitContainerStatuses { + info := containers[s.Name] + info.status = s + containers[s.Name] = info + } + + for _, e := range events { + p := e.InvolvedObject.FieldPath + name := containerNameFromEventFieldPath(p) + if name == "" { + continue + } + info := containers[name] + info.events = append(info.events, e) + containers[name] = info + } + + for name, c := range containers { + c.subObj = &apipipeline.SubObjectStatus{ + Name: name, + Type: "container", + Properties: map[string]string{}, + } + containers[name] = c + status.SubObjects = append(status.SubObjects, c.subObj) + } + + return maps.Values(containers) +} + +var _containerNameRe = regexp.MustCompile(`spec\.(initC|c)ontainers{(?P[^}]*)}`) + +func containerNameFromEventFieldPath(s string) string { + m := _containerNameRe.FindStringSubmatch(s) + idx := _containerNameRe.SubexpIndex("name") + if idx >= len(m) { + return "" + } + return m[idx] +} + +func getCondition(conditions []v1.PodCondition, conditionType string) *v1.PodCondition { + for _, c := range conditions { + if c.Type == v1.PodConditionType(conditionType) { + return &c + } + } + return nil +} + +func getEventWithPrefix(events []*v1.Event, eventType, prefix string) *v1.Event { + for _, e := range events { + if e.Reason == eventType && strings.HasPrefix(e.Message, prefix) { + return e + } + } + return nil +} + +func timestampFromCondition(condition *v1.PodCondition) *timestamppb.Timestamp { + return timestamppb.New(condition.LastTransitionTime.Time) +} + +func timestampFromEvent(e *v1.Event) *timestamppb.Timestamp { + if !e.LastTimestamp.IsZero() { + return timestamppb.New(e.LastTimestamp.Time) + } + + if !e.FirstTimestamp.IsZero() { + return timestamppb.New(e.FirstTimestamp.Time) + } + + if !e.EventTime.IsZero() { + return timestamppb.New(e.EventTime.Time) + } + + return timestamppb.Now() } func onDeploymentUpdated( diff --git a/plugins/capsulesteps/ingress_routes/watcher.go b/plugins/capsulesteps/ingress_routes/watcher.go index 95e985003..9269d18db 100644 --- a/plugins/capsulesteps/ingress_routes/watcher.go +++ b/plugins/capsulesteps/ingress_routes/watcher.go @@ -22,9 +22,11 @@ func toIngressStatus(ingress *netv1.Ingress) *apipipeline.ObjectStatus { } var hosts []string + host := "" for _, r := range ingress.Spec.Rules { if r.Host != "" { hosts = append(hosts, r.Host) + host = r.Host } } @@ -42,8 +44,19 @@ func toIngressStatus(ingress *netv1.Ingress) *apipipeline.ObjectStatus { status.Properties["IP"] = lb.IP } } - status.Conditions = append(status.Conditions, ipCondition) + parts := strings.Split(ingress.GetName(), "-") + routeID := parts[len(parts)-1] + status.Conditions = append(status.Conditions, ipCondition) + status.PlatformStatus = append(status.PlatformStatus, &apipipeline.PlatformObjectStatus{ + Name: routeID, + Kind: &apipipeline.PlatformObjectStatus_Route{ + Route: &apipipeline.RouteStatus{ + Id: routeID, + Host: host, + }, + }, + }) return status } diff --git a/proto/rig/operator/api/v1/pipeline/object_status.proto b/proto/rig/operator/api/v1/pipeline/object_status.proto index 7e77a2a0a..76b144edf 100644 --- a/proto/rig/operator/api/v1/pipeline/object_status.proto +++ b/proto/rig/operator/api/v1/pipeline/object_status.proto @@ -30,6 +30,57 @@ message ObjectStatus { ObjectType type = 2; repeated ObjectCondition conditions = 3; map properties = 4; + repeated SubObjectStatus sub_objects = 5; + repeated PlatformObjectStatus platform_status = 6; +} + +message RouteStatus { + string id = 1; + string host = 2; +} + +message InstanceStatus {} + +message ContainerStatus { + // Information about the last container termination. + message ContainerTermination { + // Exit status from the last termination of the container + int32 exit_code = 1; + // Signal from the last termination of the container + int32 signal = 2; + // (brief) reason from the last termination of the container + string reason = 3; + // Message regarding the last termination of the container + string message = 4; + // Time at which previous execution of the container started + google.protobuf.Timestamp started_at = 5; + // Time at which the container last terminated + google.protobuf.Timestamp finished_at = 6; + // Container's ID in the format 'type://container_id' + string container_id = 7; + } + + uint32 restart_count = 1; + ContainerTermination last_termination = 2; + google.protobuf.Timestamp started_at = 3; +} + +message PlatformObjectStatus { + string name = 1; + + oneof kind { + RouteStatus route = 2; + InstanceStatus instance = 3; + ContainerStatus container = 4; + } +} + +message SubObjectStatus { + string name = 1; + string type = 2; + repeated ObjectCondition conditions = 3; + map properties = 4; + repeated PlatformObjectStatus platform_status = 5; } enum ObjectState { From 1afabbd310c0b16e522cb6607177682d9e7637a1 Mon Sep 17 00:00:00 2001 From: Anders Johnsen Date: Fri, 3 May 2024 16:09:50 +0200 Subject: [PATCH 2/3] fix lint --- pkg/controller/plugin/heap.go | 1 + pkg/controller/plugin/watcher.go | 6 ++---- plugins/capsulesteps/deployment/watcher.go | 4 +++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/controller/plugin/heap.go b/pkg/controller/plugin/heap.go index 7af71702a..6048978f0 100644 --- a/pkg/controller/plugin/heap.go +++ b/pkg/controller/plugin/heap.go @@ -1,3 +1,4 @@ +// nolint package plugin import ( diff --git a/pkg/controller/plugin/watcher.go b/pkg/controller/plugin/watcher.go index e8d54e70e..9b3adb46d 100644 --- a/pkg/controller/plugin/watcher.go +++ b/pkg/controller/plugin/watcher.go @@ -335,8 +335,6 @@ type objectWatcher struct { objects map[string]*queueObj queue *priorityHeap[*queueObj] - lastProcess map[string]time.Duration - filters map[*objectWatch]struct{} } @@ -551,9 +549,9 @@ func (ow *objectWatcher) OnAdd(obj interface{}, _ bool) { o.deadline = deadline ow.queue.Push(o) return - } else { - o.deadline = time.Now() } + + o.deadline = time.Now() } else { ow.objects[co.GetName()] = &queueObj{ deadline: time.Now(), diff --git a/plugins/capsulesteps/deployment/watcher.go b/plugins/capsulesteps/deployment/watcher.go index 365bfd6f4..29e717ade 100644 --- a/plugins/capsulesteps/deployment/watcher.go +++ b/plugins/capsulesteps/deployment/watcher.go @@ -171,7 +171,9 @@ func makeExecutingCondition(container containerInfo) { }} } -func containerStateTerminatedFromK8s(state *v1.ContainerStateTerminated) *apipipeline.ContainerStatus_ContainerTermination { +func containerStateTerminatedFromK8s( + state *v1.ContainerStateTerminated, +) *apipipeline.ContainerStatus_ContainerTermination { if state == nil { return nil } From d9d4e05780b907e4cd2b185478f1f399bcd69b02 Mon Sep 17 00:00:00 2001 From: Anders Johnsen Date: Fri, 3 May 2024 17:10:12 +0200 Subject: [PATCH 3/3] Proto cleanup --- pkg/controller/plugin/watcher.go | 25 +++++++++++++------ plugins/capsulesteps/deployment/watcher.go | 21 +++++++++------- .../capsulesteps/ingress_routes/watcher.go | 12 ++++----- .../api/v1/pipeline/object_status.proto | 23 ++++++++--------- 4 files changed, 44 insertions(+), 37 deletions(-) diff --git a/pkg/controller/plugin/watcher.go b/pkg/controller/plugin/watcher.go index 9b3adb46d..9712b0d70 100644 --- a/pkg/controller/plugin/watcher.go +++ b/pkg/controller/plugin/watcher.go @@ -13,6 +13,7 @@ import ( apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline" apiplugin "github.com/rigdev/rig-go-api/operator/api/v1/plugin" "github.com/rigdev/rig/pkg/pipeline" + "google.golang.org/protobuf/types/known/timestamppb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -120,7 +121,7 @@ type WatchCallback func( obj client.Object, events []*corev1.Event, objectWatcher ObjectWatcher, -) *apipipeline.ObjectStatus +) *apipipeline.ObjectStatusInfo type Watcher interface { NewCapsuleWatcher( @@ -196,7 +197,7 @@ func (w *watcher) watchPrimary( subWatchers: map[string]*objectWatch{}, } - w.startWatch(f, objType) + w.startWatch(f, objType, nil) go func() { w.objectSyncing.Wait() @@ -210,12 +211,12 @@ func (w *watcher) watchPrimary( return nil } -func (w *watcher) startWatch(f *objectWatch, objType client.Object) { +func (w *watcher) startWatch(f *objectWatch, objType client.Object, parent *apipipeline.ObjectRef) { w.lock.Lock() ow, ok := w.objectWatchers[f.key.watcherKey] if !ok { - ow = newObjectWatcher(w, f.key.watcherKey.namespace, objType, w.cc, w.logger) + ow = newObjectWatcher(w, f.key.watcherKey.namespace, objType, w.cc, parent, w.logger) w.objectWatchers[f.key.watcherKey] = ow } @@ -329,6 +330,7 @@ type objectWatcher struct { eventCtrl cache.Controller namespace string + parent *apipipeline.ObjectRef lock sync.Mutex @@ -343,6 +345,7 @@ func newObjectWatcher( namespace string, obj client.Object, cc client.WithWatch, + parent *apipipeline.ObjectRef, logger hclog.Logger, ) *objectWatcher { gvks, _, err := cc.Scheme().ObjectKinds(obj) @@ -365,6 +368,7 @@ func newObjectWatcher( gvkList: gvkList, logger: logger, namespace: namespace, + parent: parent, filters: map[*objectWatch]struct{}{}, objects: map[string]*queueObj{}, queue: newPriorityHeap(func(a, b *queueObj) bool { return a.deadline.Before(b.deadline) }), @@ -643,9 +647,14 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov events = append(events, event) } } - os := f.cb(co, events, &res) - os.ObjectRef = ref - f.cw.updated(os) + info := f.cb(co, events, &res) + status := &apipipeline.ObjectStatus{ + ObjectRef: ref, + Info: info, + UpdatedAt: timestamppb.Now(), + Parent: ow.parent, + } + f.cw.updated(status) if !res.deadline.IsZero() { // Must exist in map! @@ -673,7 +682,7 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov } f.subWatchers[key] = sf - go ow.w.startWatch(sf, w.objType) + go ow.w.startWatch(sf, w.objType, ref) } } diff --git a/plugins/capsulesteps/deployment/watcher.go b/plugins/capsulesteps/deployment/watcher.go index 29e717ade..dfdbb823a 100644 --- a/plugins/capsulesteps/deployment/watcher.go +++ b/plugins/capsulesteps/deployment/watcher.go @@ -22,12 +22,17 @@ func onPodUpdated( obj client.Object, events []*corev1.Event, watcher plugin.ObjectWatcher, -) *apipipeline.ObjectStatus { +) *apipipeline.ObjectStatusInfo { pod := obj.(*corev1.Pod) - status := &apipipeline.ObjectStatus{ - Type: apipipeline.ObjectType_OBJECT_TYPE_POD, + status := &apipipeline.ObjectStatusInfo{ Properties: map[string]string{}, + PlatformStatus: []*apipipeline.PlatformObjectStatus{{ + Name: pod.GetName(), + Kind: &apipipeline.PlatformObjectStatus_Instance{ + Instance: &apipipeline.InstanceStatus{}, + }, + }}, } makePlacementCondition(status, pod, events) @@ -245,7 +250,7 @@ func makeImagePullingCondition(container containerInfo) { container.subObj.Conditions = append(container.subObj.Conditions, cond) } -func makePlacementCondition(status *apipipeline.ObjectStatus, pod *corev1.Pod, events []*corev1.Event) { +func makePlacementCondition(status *apipipeline.ObjectStatusInfo, pod *corev1.Pod, events []*corev1.Event) { cond := &apipipeline.ObjectCondition{ Name: "Node placement", State: apipipeline.ObjectState_OBJECT_STATE_PENDING, @@ -294,7 +299,7 @@ type containerInfo struct { sidecar bool } -func splitByContainers(status *apipipeline.ObjectStatus, pod *v1.Pod, events []*v1.Event) []containerInfo { +func splitByContainers(status *apipipeline.ObjectStatusInfo, pod *v1.Pod, events []*v1.Event) []containerInfo { containers := map[string]containerInfo{} for _, c := range pod.Spec.Containers { @@ -341,7 +346,6 @@ func splitByContainers(status *apipipeline.ObjectStatus, pod *v1.Pod, events []* for name, c := range containers { c.subObj = &apipipeline.SubObjectStatus{ Name: name, - Type: "container", Properties: map[string]string{}, } containers[name] = c @@ -404,13 +408,12 @@ func onDeploymentUpdated( obj client.Object, _ []*corev1.Event, objectWatcher plugin.ObjectWatcher, -) *apipipeline.ObjectStatus { +) *apipipeline.ObjectStatusInfo { dep := obj.(*appsv1.Deployment) objectWatcher.WatchSecondaryByLabels(labels.Set(dep.Spec.Template.GetLabels()), &corev1.Pod{}, onPodUpdated) - status := &apipipeline.ObjectStatus{ - Type: apipipeline.ObjectType_OBJECT_TYPE_PRIMARY, + status := &apipipeline.ObjectStatusInfo{ Properties: map[string]string{}, } diff --git a/plugins/capsulesteps/ingress_routes/watcher.go b/plugins/capsulesteps/ingress_routes/watcher.go index 9269d18db..89faf2b31 100644 --- a/plugins/capsulesteps/ingress_routes/watcher.go +++ b/plugins/capsulesteps/ingress_routes/watcher.go @@ -15,9 +15,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func toIngressStatus(ingress *netv1.Ingress) *apipipeline.ObjectStatus { - status := &apipipeline.ObjectStatus{ - Type: apipipeline.ObjectType_OBJECT_TYPE_PRIMARY, +func toIngressStatus(ingress *netv1.Ingress) *apipipeline.ObjectStatusInfo { + status := &apipipeline.ObjectStatusInfo{ Properties: map[string]string{}, } @@ -64,11 +63,10 @@ func onCertificateUpdated( obj client.Object, events []*corev1.Event, objectWatcher plugin.ObjectWatcher, -) *apipipeline.ObjectStatus { +) *apipipeline.ObjectStatusInfo { cert := obj.(*cmv1.Certificate) - status := &apipipeline.ObjectStatus{ - Type: apipipeline.ObjectType_OBJECT_TYPE_SECONDARY, + status := &apipipeline.ObjectStatusInfo{ Properties: map[string]string{}, } @@ -108,7 +106,7 @@ func onIngressUpdated( obj client.Object, events []*corev1.Event, objectWatcher plugin.ObjectWatcher, -) *apipipeline.ObjectStatus { +) *apipipeline.ObjectStatusInfo { ingress := obj.(*netv1.Ingress) objectWatcher.WatchSecondaryByName(ingress.GetName(), &cmv1.Certificate{}, onCertificateUpdated) diff --git a/proto/rig/operator/api/v1/pipeline/object_status.proto b/proto/rig/operator/api/v1/pipeline/object_status.proto index 76b144edf..abd93559b 100644 --- a/proto/rig/operator/api/v1/pipeline/object_status.proto +++ b/proto/rig/operator/api/v1/pipeline/object_status.proto @@ -18,20 +18,18 @@ message ObjectStatusChange { } } -enum ObjectType { - OBJECT_TYPE_UNSPECIFIED = 0; - OBJECT_TYPE_PRIMARY = 1; - OBJECT_TYPE_SECONDARY = 2; - OBJECT_TYPE_POD = 3; -} - message ObjectStatus { ObjectRef object_ref = 1; - ObjectType type = 2; - repeated ObjectCondition conditions = 3; - map properties = 4; - repeated SubObjectStatus sub_objects = 5; - repeated PlatformObjectStatus platform_status = 6; + ObjectStatusInfo info = 2; + google.protobuf.Timestamp updated_at = 3; + ObjectRef parent = 4; +} + +message ObjectStatusInfo { + repeated ObjectCondition conditions = 1; + map properties = 2; + repeated SubObjectStatus sub_objects = 3; + repeated PlatformObjectStatus platform_status = 4; } message RouteStatus { @@ -77,7 +75,6 @@ message PlatformObjectStatus { message SubObjectStatus { string name = 1; - string type = 2; repeated ObjectCondition conditions = 3; map properties = 4; repeated PlatformObjectStatus platform_status = 5;