Skip to content

Commit

Permalink
Rework how agent watches trigger on deployed resources changes
Browse files Browse the repository at this point in the history
  • Loading branch information
aruiz14 committed Jan 8, 2024
1 parent 84c12ed commit d2dc45a
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions internal/cmd/agent/trigger/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package trigger
import (
"context"
"sync"
"sync/atomic"
"time"

Check failure on line 8 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / fleet-upgrade-test (v1.27.5-k3s1)

"time" imported and not used

Check failure on line 8 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / e2e-fleet-mc-test

"time" imported and not used

Check failure on line 8 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / e2e-fleet-test (v1.27.5-k3s1)

"time" imported and not used

Check failure on line 8 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / e2e-fleet-test (v1.24.17-k3s1)

"time" imported and not used

Check failure on line 8 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / unit-test

"time" imported and not used

"github.com/rancher/fleet/pkg/durations"

Check failure on line 10 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / fleet-upgrade-test (v1.27.5-k3s1)

"github.com/rancher/fleet/pkg/durations" imported and not used

Check failure on line 10 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / e2e-fleet-mc-test

"github.com/rancher/fleet/pkg/durations" imported and not used

Check failure on line 10 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / e2e-fleet-test (v1.27.5-k3s1)

"github.com/rancher/fleet/pkg/durations" imported and not used

Check failure on line 10 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / e2e-fleet-test (v1.24.17-k3s1)

"github.com/rancher/fleet/pkg/durations" imported and not used

Check failure on line 10 in internal/cmd/agent/trigger/watcher.go

View workflow job for this annotation

GitHub Actions / unit-test

"github.com/rancher/fleet/pkg/durations" imported and not used
Expand All @@ -26,6 +27,11 @@ type Trigger struct {
triggers map[schema.GroupVersionKind]map[objectset.ObjectKey]map[string]func()
restMapper meta.RESTMapper
client dynamic.Interface

// seenGenerations keeps a registry of the object UIDs and the latest observed generation, if any
// Uses sync.Map for a safe concurrent usage.
// Uses atomic.Int64 as values in order to stick to the first use case described at https://pkg.go.dev/sync#Map
seenGenerations sync.Map
}

func New(ctx context.Context, restMapper meta.RESTMapper, client dynamic.Interface) *Trigger {
Expand Down Expand Up @@ -125,7 +131,27 @@ func (t *Trigger) OnChange(key string, defaultNamespace string, trigger func(),
return nil
}

func (t *Trigger) call(gvk schema.GroupVersionKind, obj metav1.Object) {
func (t *Trigger) call(gvk schema.GroupVersionKind, obj metav1.Object, deleted bool) {
// If this type populates Generation metadata, use it to filter events that didn't modify that field
if currentGeneration := obj.GetGeneration(); currentGeneration != 0 {
uid := obj.GetUID()
if deleted {
t.seenGenerations.Delete(uid)
} else {
var previous *atomic.Int64
if value, ok := t.seenGenerations.Load(uid); ok {
previous = value.(*atomic.Int64)
} else {
previous = new(atomic.Int64)
t.seenGenerations.Store(uid, previous)
}

if previousGeneration := previous.Swap(currentGeneration); previousGeneration == currentGeneration {
return
}
}
}

t.RLock()
defer t.RUnlock()

Expand Down Expand Up @@ -211,7 +237,8 @@ func (w *watcher) Start(ctx context.Context) {
switch event.Type {
// Only trigger for Modified or Deleted objects, ignore the rest
case watch.Modified, watch.Deleted:
w.t.call(w.gvk, obj)
deleted := event.Type == watch.Deleted
w.t.call(w.gvk, obj, deleted)
}
}
}
Expand Down

0 comments on commit d2dc45a

Please sign in to comment.