Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/controller/plugin/external_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (p *pluginExecutor) start(ctx context.Context, pluginConfig string, restCon
Cmd: exec.CommandContext(ctx, p.binaryPath, p.args...),
Logger: pLogger,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
SyncStdout: os.Stderr,
SyncStderr: os.Stderr,
Stderr: os.Stderr,
})

Expand Down
136 changes: 136 additions & 0 deletions pkg/controller/plugin/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// nolint
package plugin

import (
"container/heap"
)

// This implementation is copied from here:
// https://gotipplay.golang.org/p/d4M0QBkfmIr
// https://github.com/golang/go/issues/47632

// A priorityHeap is a min-heap backed by a slice.
type priorityHeap[E any] struct {
s sliceHeap[E]
}

// newPriorityHeap constructs a new Heap with a comparison function.
func newPriorityHeap[E any](less func(E, E) bool) *priorityHeap[E] {
return &priorityHeap[E]{sliceHeap[E]{less: less}}
}

// Push pushes an element onto the heap. The complexity is O(log n)
// where n = h.Len().
func (h *priorityHeap[E]) Push(elem E) {
heap.Push(&h.s, elem)
}

// Pop removes and returns the minimum element (according to the less function)
// from the heap. Pop panics if the heap is empty.
// The complexity is O(log n) where n = h.Len().
func (h *priorityHeap[E]) Pop() E {
return heap.Pop(&h.s).(E)
}

// Peek returns the minimum element (according to the less function) in the heap.
// Peek panics if the heap is empty.
// The complexity is O(1).
func (h *priorityHeap[E]) Peek() E {
return h.s.s[0]
}

// Len returns the number of elements in the heap.
func (h *priorityHeap[E]) Len() int {
return len(h.s.s)
}

// Slice returns the underlying slice.
// The slice is in heap order; the minimum value is at index 0.
// The heap retains the returned slice, so altering the slice may break
// the invariants and invalidate the heap.
func (h *priorityHeap[E]) Slice() []E {
return h.s.s
}

// SetIndex specifies an optional function to be called
// when updating the position of any heap element within the slice,
// including during the element's initial Push.
//
// SetIndex must be called at most once, before any calls to Push.
//
// When an element is removed from the heap by Pop or Remove,
// the index function is called with the invalid index -1
// to signify that the element is no longer within the slice.
func (h *priorityHeap[E]) SetIndex(f func(E, int)) {
h.s.setIndex = f
}

// Fix re-establishes the heap ordering
// after the element at index i has changed its value.
// Changing the value of the element at index i and then calling Fix
// is equivalent to, but less expensive than,
// calling h.Remove(i) followed by a Push of the new value.
// The complexity is O(log n) where n = h.Len().
// The index for use with Fix is recorded using the function passed to SetIndex.
func (h *priorityHeap[E]) Fix(i int) {
heap.Fix(&h.s, i)
}

// Remove removes and returns the element at index i from the heap.
// The complexity is O(log n) where n = h.Len().
// The index for use with Remove is recorded using the function passed to SetIndex.
func (h *priorityHeap[E]) Remove(i int) E {
return heap.Remove(&h.s, i).(E)
}

// sliceHeap just exists to use the existing heap.Interface as the
// implementation of Heap.
type sliceHeap[E any] struct {
s []E
less func(E, E) bool
setIndex func(E, int)
}

func (s *sliceHeap[E]) Len() int { return len(s.s) }

func (s *sliceHeap[E]) Swap(i, j int) {
s.s[i], s.s[j] = s.s[j], s.s[i]
if s.setIndex != nil {
s.setIndex(s.s[i], i)
s.setIndex(s.s[j], j)
}
}

func (s *sliceHeap[E]) Less(i, j int) bool {
return s.less(s.s[i], s.s[j])
}

func (s *sliceHeap[E]) Push(x interface{}) {
s.s = append(s.s, x.(E))
if s.setIndex != nil {
s.setIndex(s.s[len(s.s)-1], len(s.s)-1)
}
}

func (s *sliceHeap[E]) Pop() interface{} {
e := s.s[len(s.s)-1]
if s.setIndex != nil {
s.setIndex(e, -1)
}
s.s = s.s[:len(s.s)-1]
return e
}

type Item struct {
value string
priority int
index int
}

func (it *Item) Less(it1 *Item) bool {
return it.priority > it1.priority
}

func (it *Item) setIndex(index int) {
it.index = index
}
125 changes: 116 additions & 9 deletions pkg/controller/plugin/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"slices"
"strings"
"sync"
"time"

"github.com/hashicorp/go-hclog"
apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline"
apiplugin "github.com/rigdev/rig-go-api/operator/api/v1/plugin"
"github.com/rigdev/rig/pkg/pipeline"
"google.golang.org/protobuf/types/known/timestamppb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -24,6 +26,7 @@ import (
)

type ObjectWatcher interface {
Reschedule(deadline time.Time)
WatchSecondaryByName(objectName string, objType client.Object, cb WatchCallback)
WatchSecondaryByLabels(objectLabels labels.Set, objType client.Object, cb WatchCallback)
}
Expand Down Expand Up @@ -118,7 +121,7 @@ type WatchCallback func(
obj client.Object,
events []*corev1.Event,
objectWatcher ObjectWatcher,
) *apipipeline.ObjectStatus
) *apipipeline.ObjectStatusInfo

type Watcher interface {
NewCapsuleWatcher(
Expand Down Expand Up @@ -194,7 +197,7 @@ func (w *watcher) watchPrimary(
subWatchers: map[string]*objectWatch{},
}

w.startWatch(f, objType)
w.startWatch(f, objType, nil)

go func() {
w.objectSyncing.Wait()
Expand All @@ -208,12 +211,12 @@ func (w *watcher) watchPrimary(
return nil
}

func (w *watcher) startWatch(f *objectWatch, objType client.Object) {
func (w *watcher) startWatch(f *objectWatch, objType client.Object, parent *apipipeline.ObjectRef) {
w.lock.Lock()

ow, ok := w.objectWatchers[f.key.watcherKey]
if !ok {
ow = newObjectWatcher(w, f.key.watcherKey.namespace, objType, w.cc, w.logger)
ow = newObjectWatcher(w, f.key.watcherKey.namespace, objType, w.cc, parent, w.logger)
w.objectWatchers[f.key.watcherKey] = ow
}

Expand Down Expand Up @@ -307,6 +310,12 @@ func (w *eventListWatcher) Watch(options metav1.ListOptions) (watch.Interface, e
return wi, err
}

type queueObj struct {
deadline time.Time
obj client.Object
index int
}

type objectWatcher struct {
w *watcher
ctx context.Context
Expand All @@ -321,9 +330,13 @@ type objectWatcher struct {
eventCtrl cache.Controller

namespace string
parent *apipipeline.ObjectRef

lock sync.Mutex

objects map[string]*queueObj
queue *priorityHeap[*queueObj]

filters map[*objectWatch]struct{}
}

Expand All @@ -332,6 +345,7 @@ func newObjectWatcher(
namespace string,
obj client.Object,
cc client.WithWatch,
parent *apipipeline.ObjectRef,
logger hclog.Logger,
) *objectWatcher {
gvks, _, err := cc.Scheme().ObjectKinds(obj)
Expand All @@ -354,9 +368,16 @@ func newObjectWatcher(
gvkList: gvkList,
logger: logger,
namespace: namespace,
parent: parent,
filters: map[*objectWatch]struct{}{},
objects: map[string]*queueObj{},
queue: newPriorityHeap(func(a, b *queueObj) bool { return a.deadline.Before(b.deadline) }),
}

ow.queue.SetIndex(func(qo *queueObj, i int) {
qo.index = i
})

w.logger.Info("starting watcher", "gvk", ow.gvkList)

store, ctrl := cache.NewInformer(ow, obj, 0, ow)
Expand Down Expand Up @@ -433,7 +454,27 @@ func (ow *objectWatcher) run(ctx context.Context) {

ow.w.objectSyncing.Done()

<-ctx.Done()
timer := time.NewTicker(250 * time.Millisecond)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C:
}

ow.lock.Lock()

for ow.queue.Len() > 0 && ow.queue.Peek().deadline.Before(time.Now()) {
p := ow.queue.Pop()
for f := range ow.filters {
ow.handleForFilter(p.obj, f, false)
}
}

ow.lock.Unlock()
}
}

func (ow *objectWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
Expand Down Expand Up @@ -493,6 +534,36 @@ func (ow *objectWatcher) OnAdd(obj interface{}, _ bool) {

ow.lock.Lock()
defer ow.lock.Unlock()

if o, ok := ow.objects[co.GetName()]; ok {
if o.index >= 0 {
// Already scheduled to run in the future.
deadline := time.Now().Add(time.Second)
if o.deadline.After(deadline) {
o.deadline = deadline
ow.queue.Fix(o.index)
}
o.obj = co
return
}

// Deadline is when we can process the next event for this object.
deadline := o.deadline.Add(1 * time.Second)
if deadline.After(time.Now()) {
o.deadline = deadline
ow.queue.Push(o)
return
}

o.deadline = time.Now()
} else {
ow.objects[co.GetName()] = &queueObj{
deadline: time.Now(),
obj: co,
index: -1,
}
}

for f := range ow.filters {
ow.handleForFilter(co, f, false)
}
Expand Down Expand Up @@ -526,6 +597,15 @@ func (ow *objectWatcher) OnDelete(obj interface{}) {

ow.lock.Lock()
defer ow.lock.Unlock()

if q, ok := ow.objects[co.GetName()]; ok {
if q.index >= 0 {
// Already scheduled to run in the future, stop it.
ow.queue.Remove(q.index)
}
delete(ow.objects, co.GetName())
}

for f := range ow.filters {
ow.handleForFilter(co, f, true)
}
Expand Down Expand Up @@ -567,9 +647,29 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov
events = append(events, event)
}
}
os := f.cb(co, events, &res)
os.ObjectRef = ref
f.cw.updated(os)
info := f.cb(co, events, &res)
status := &apipipeline.ObjectStatus{
ObjectRef: ref,
Info: info,
UpdatedAt: timestamppb.Now(),
Parent: ow.parent,
}
f.cw.updated(status)

if !res.deadline.IsZero() {
// Must exist in map!
o := ow.objects[co.GetName()]
if o.index >= 0 {
if res.deadline.Before(o.deadline) {
o.deadline = res.deadline
ow.queue.Fix(o.index)
}
} else {
o.deadline = res.deadline
ow.queue.Push(o)
}
}

}

for key, w := range res.watchers {
Expand All @@ -582,7 +682,7 @@ func (ow *objectWatcher) handleForFilter(co client.Object, f *objectWatch, remov
}

f.subWatchers[key] = sf
go ow.w.startWatch(sf, w.objType)
go ow.w.startWatch(sf, w.objType, ref)
}
}

Expand All @@ -599,6 +699,7 @@ type objectWatcherResult struct {
cc client.WithWatch
namespace string
watchers map[string]objectWatchCandidate
deadline time.Time
}

func (r *objectWatcherResult) watchObject(key objectWatchKey, objType client.Object, cb WatchCallback) {
Expand Down Expand Up @@ -640,6 +741,12 @@ func (r *objectWatcherResult) WatchSecondaryByLabels(objectLabels labels.Set, ob
)
}

func (r *objectWatcherResult) Reschedule(deadline time.Time) {
if r.deadline.IsZero() || deadline.Before(r.deadline) {
r.deadline = deadline
}
}

type objectWatchCandidate struct {
key objectWatchKey
objType client.Object
Expand Down
Loading