Skip to content

Commit

Permalink
NETOBSERV-1354: fix concurrent access on watches (#458)
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Oct 13, 2023
1 parent 87a34ad commit 15c43ae
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions pkg/watchers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watchers
import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,7 +29,8 @@ var (
type Watcher struct {
ctrl controller.Controller
cache cache.Cache
watched map[string]interface{}
watches map[string]bool
wmut sync.RWMutex
defaultNamespace string
}

Expand All @@ -39,7 +41,7 @@ func NewWatcher(ctrl controller.Controller, cache cache.Cache) *Watcher {
return &Watcher{
ctrl: ctrl,
cache: cache,
watched: make(map[string]interface{}),
watches: make(map[string]bool),
}
}

Expand All @@ -52,16 +54,32 @@ func kindToWatchable(kind flowslatest.MountableType) Watchable {

func (w *Watcher) Reset(namespace string) {
w.defaultNamespace = namespace
w.watched = make(map[string]interface{})
// Reset all registered watches as inactive
w.wmut.Lock()
for k := range w.watches {
w.watches[k] = false
}
w.wmut.Unlock()
}

func key(kind flowslatest.MountableType, name, namespace string) string {
return string(kind) + "/" + namespace + "/" + name
}

func (w *Watcher) setActiveWatch(key string) bool {
w.wmut.Lock()
_, exists := w.watches[key]
w.watches[key] = true
w.wmut.Unlock()
return exists
}

func (w *Watcher) watch(ctx context.Context, kind flowslatest.MountableType, obj client.Object) error {
if w.isWatched(kind, obj.GetName(), obj.GetNamespace()) {
// This watcher was already registered
k := key(kind, obj.GetName(), obj.GetNamespace())
// Mark as active
exists := w.setActiveWatch(k)
if exists {
// Don't register again
return nil
}
i, err := w.cache.GetInformer(ctx, obj)
Expand All @@ -74,7 +92,12 @@ func (w *Watcher) watch(ctx context.Context, kind flowslatest.MountableType, obj
err = w.ctrl.Watch(
&source.Informer{Informer: i},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
if w.isWatched(kind, o.GetName(), o.GetNamespace()) {
// The watch might be registered, but inactive
k := key(kind, o.GetName(), o.GetNamespace())
w.wmut.RLock()
active := w.watches[k]
w.wmut.RUnlock()
if active {
// Trigger FlowCollector reconcile
return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}}
}
Expand All @@ -84,17 +107,9 @@ func (w *Watcher) watch(ctx context.Context, kind flowslatest.MountableType, obj
if err != nil {
return err
}
w.watched[key(kind, obj.GetName(), obj.GetNamespace())] = true
return nil
}

func (w *Watcher) isWatched(kind flowslatest.MountableType, name, namespace string) bool {
if _, ok := w.watched[key(kind, name, namespace)]; ok {
return true
}
return false
}

func (w *Watcher) ProcessMTLSCerts(ctx context.Context, cl helper.Client, tls *flowslatest.ClientTLS, targetNamespace string) (caDigest string, userDigest string, err error) {
if tls.Enable && tls.CACert.Name != "" {
caRef := w.refFromCert(&tls.CACert)
Expand Down

0 comments on commit 15c43ae

Please sign in to comment.