Skip to content

Commit

Permalink
fix duplicate events
Browse files Browse the repository at this point in the history
  • Loading branch information
Cedric McKinnie committed Jul 12, 2023
1 parent 7274220 commit 4a3b012
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/triggers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewService(
opt(s)
}

s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces)
s.informers = newK8sInformers(clientset, dynamicClientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, logger)

return s
}
Expand Down
187 changes: 149 additions & 38 deletions pkg/triggers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package triggers

import (
"context"
"fmt"
"go.uber.org/zap"
"k8s.io/client-go/dynamic"

"github.com/google/go-cmp/cmp"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -34,7 +36,7 @@ import (
"github.com/kubeshop/testkube-operator/pkg/validation/tests/v1/testtrigger"
)

type InformersData struct {
// DynamicInformersEntry couples a running dynamic (custom-resource) informer
// with the cancel function that stops its goroutine.
type DynamicInformersEntry struct {
	// informer is the GenericInformer returned by the dynamic factory;
	// it is set to nil when the entry is torn down.
	informer *informers.GenericInformer
	// cancel stops the context whose Done channel the informer runs on.
	cancel context.CancelFunc
}
Expand All @@ -53,11 +55,10 @@ type k8sInformers struct {
testTriggerInformer testkubeinformerv1.TestTriggerInformer
testSuiteInformer testkubeinformerv3.TestSuiteInformer
testInformer testkubeinformerv3.TestInformer
triggerCRInformers map[string]*InformersData
crInformers map[string]*InformersData
dynamicInformer *DynamicInformerManager
}

func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string) *k8sInformers {
func newK8sInformers(clientset kubernetes.Interface, dynamicClientset dynamic.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string, logger *zap.SugaredLogger) *k8sInformers {

var k8sInformers k8sInformers
if len(watcherNamespaces) == 0 {
Expand All @@ -83,8 +84,7 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned
k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers()
k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V3().TestSuites()
k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests()
k8sInformers.triggerCRInformers = make(map[string]*InformersData)
k8sInformers.crInformers = make(map[string]*InformersData)
k8sInformers.dynamicInformer = NewDynamicInformerManager(dynamicClientset, logger)
return &k8sInformers
}

Expand All @@ -111,8 +111,10 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) {
} else {
if !running {
s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID)
s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces)
s.informers = newK8sInformers(s.clientset, s.dynamicClientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.logger)
stopChan = make(chan struct{})
//TODO: clear the initialization
s.informers.dynamicInformer.setStopCh(stopChan)
s.runInformers(ctx, stopChan)
running = true
}
Expand Down Expand Up @@ -627,6 +629,77 @@ func (s *Service) clusterEventEventHandler(ctx context.Context) cache.ResourceEv
}
}

// DynamicInformerManager owns the lifecycle of dynamic informers created for
// custom-resource test triggers: it creates them (NewInformer), tears them
// down (TearDownInformer), and cancels them all when the service-wide stop
// channel fires (WaitForTermination).
type DynamicInformerManager struct {
	dynamicClient dynamic.Interface
	logger        *zap.SugaredLogger
	// stop is the service-wide stop channel; it is nil until setStopCh is
	// called, so it must be set before informers are started.
	stop <-chan struct{}
	// triggerCRInformers indexes entries by trigger Name+Namespace.
	triggerCRInformers map[string]*DynamicInformersEntry
	// crInformers indexes the same entries by GVR string + serialized
	// resource selector. NOTE(review): entries are added in NewInformer but
	// never removed — confirm whether teardown should also delete from here.
	crInformers map[string]*DynamicInformersEntry
}

// NewDynamicInformerManager builds a DynamicInformerManager around the given
// dynamic client and logger, with empty informer indexes ready for use.
func NewDynamicInformerManager(client dynamic.Interface, logger *zap.SugaredLogger) *DynamicInformerManager {
	m := &DynamicInformerManager{
		dynamicClient: client,
		logger:        logger,
	}
	m.triggerCRInformers = make(map[string]*DynamicInformersEntry)
	m.crInformers = make(map[string]*DynamicInformersEntry)
	return m
}

// NewInformer starts a dynamic informer for the custom resource referenced by
// the given test trigger and registers exactly the event handler matching the
// trigger's event type (created/modified/deleted). It is a no-op when an
// informer already exists for the same trigger Name+Namespace. The informer
// runs until either the manager-wide stop channel fires or TearDownInformer
// cancels its context (see WaitForTermination).
func (m *DynamicInformerManager) NewInformer(t *testtriggersv1.TestTrigger, addHandler cache.ResourceEventHandlerFuncs, modifyHandler cache.ResourceEventHandlerFuncs, deleteHandler cache.ResourceEventHandlerFuncs) {
	triggerKey := t.Name + t.Namespace
	if _, found := m.triggerCRInformers[triggerKey]; found {
		m.logger.Debugf("trigger service: informer already exist for %s resource in namespace %s", t.Name, t.Namespace)
		return
	}

	gvr := schema.GroupVersionResource{
		Group:    t.Spec.CustomResource.Group,
		Version:  t.Spec.CustomResource.Version,
		Resource: t.Spec.CustomResource.Resource,
	}

	// Watch only the namespace selected by the trigger; resync period 0
	// disables periodic resync.
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(m.dynamicClient, 0, t.Spec.ResourceSelector.Namespace, nil)
	genericInformer := factory.ForResource(gvr)

	// Register only the handler for the trigger's own event type, so the
	// informer does not emit events the trigger never matches.
	switch string(t.Spec.Event) {
	case string(testtrigger.EventCreated):
		genericInformer.Informer().AddEventHandler(addHandler)
	case string(testtrigger.EventModified):
		genericInformer.Informer().AddEventHandler(modifyHandler)
	case string(testtrigger.EventDeleted):
		genericInformer.Informer().AddEventHandler(deleteHandler)
	}

	// Per-informer cancellation; WaitForTermination cancels it when the
	// manager-wide stop channel fires.
	informerCtx, cancel := context.WithCancel(context.Background())
	go m.WaitForTermination(cancel, informerCtx.Done())

	entry := &DynamicInformersEntry{
		informer: &genericInformer,
		cancel:   cancel,
	}
	m.triggerCRInformers[triggerKey] = entry
	m.crInformers[gvr.String()+serializeSelector(t.Spec.ResourceSelector)] = entry

	go genericInformer.Informer().Run(informerCtx.Done())
	m.logger.Debugf("trigger service: started a new custom resource informers for %s resource and %s test trigger", gvr.String(), t.Name)
}

// setStopCh wires the service-wide stop channel into the manager so that
// WaitForTermination can observe it. It is an unsynchronized field write, so
// it must be called before any informer goroutines are started.
func (m *DynamicInformerManager) setStopCh(stopChan chan struct{}) {
	m.stop = stopChan
}

// TearDownInformer cancels and removes the dynamic informer previously
// created for the given test trigger, if one exists. Cancelling the context
// stops the informer goroutine (its Run is bound to that context's Done
// channel).
//
// NOTE(review): the matching crInformers index entry is not removed here;
// confirm whether it should be deleted as well (keyed by
// gvr.String()+serializeSelector).
//
// Bug fix: the original logged all three debug messages unconditionally —
// a successful teardown also logged "ignoring tear down" and "could not
// found infomer". Each branch now returns after logging its own message.
func (m *DynamicInformerManager) TearDownInformer(t *testtriggersv1.TestTrigger) {
	ifd, found := m.triggerCRInformers[t.Name+t.Namespace]
	if !found {
		m.logger.Debugf("trigger service: could not find informer for %v resource and %s test trigger to teardown", t.Spec.CustomResource, t.Name)
		return
	}
	if ifd.informer == nil || ifd.cancel == nil {
		// Entry exists but was already (partially) torn down.
		m.logger.Debugf("trigger service: ignoring tear down for %v resource and %s test trigger", t.Spec.CustomResource, t.Name)
		return
	}
	ifd.cancel()
	ifd.informer = nil
	delete(m.triggerCRInformers, t.Name+t.Namespace)
	m.logger.Debugf("trigger service: tearing down for %v resource and %s test trigger", t.Spec.CustomResource, t.Name)
}

func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struct{}) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -642,27 +715,16 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc
s.addTrigger(t)

if t.Spec.Resource == testtrigger.ResourceCustomResource {
df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.dynamicClientset, 0, t.Spec.ResourceSelector.Namespace, nil)

gvr := schema.GroupVersionResource{
Group: t.Spec.CustomResource.Group,
Version: t.Spec.CustomResource.Version,
Resource: t.Spec.CustomResource.Resource,
}
customResourceInformer := df.ForResource(gvr)
customResourceInformer.Informer().AddEventHandler(s.customResourceEventHandler(ctx, gvr))
s.logger.Debugf("trigger service: starting custom resource informers")

stopCtx, cancel := context.WithCancel(context.Background())
go worker(cancel, stop, stopCtx.Done(), s.logger)

ifd := &InformersData{
informer: &customResourceInformer,
cancel: cancel,
}
s.informers.triggerCRInformers[t.Name+t.Namespace] = ifd
s.informers.crInformers[gvr.String()+serializeSelector(t.Spec.ResourceSelector)] = ifd
go customResourceInformer.Informer().Run(stopCtx.Done())
s.informers.dynamicInformer.NewInformer(t,
s.customResourceAddEventHandler(ctx, gvr),
s.customResourceModifyEventHandler(ctx, gvr),
s.customResourceDeleteEventHandler(ctx, gvr),
)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -691,14 +753,7 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc
t.Namespace, t.Name, t.Spec.Resource, t.Spec.Event,
)
s.removeTrigger(t)
if ifd, found := s.informers.triggerCRInformers[t.Name+t.Namespace]; found {
if ifd.informer != nil && ifd.cancel != nil {
ifd.cancel()
ifd.informer = nil
delete(s.informers.triggerCRInformers, t.Name+t.Namespace)
}

}
s.informers.dynamicInformer.TearDownInformer(t)
},
}
}
Expand All @@ -708,27 +763,27 @@ func serializeSelector(selector testtriggersv1.TestTriggerSelector) string {
return selector.Name + selector.Namespace
}

func worker(cancel context.CancelFunc, stop <-chan struct{}, done <-chan struct{}, logger *zap.SugaredLogger) {
// WaitForTermination is run as a goroutine, one per dynamic informer. It
// polls until either the manager-wide stop channel fires — then it cancels
// the informer's context via cancel — or the informer's own context is done
// (teardown already cancelled it), in which case it just returns.
//
// NOTE(review): this is a busy-wait waking every second; a blocking select on
// the two channels (dropping the default branch) would be cheaper — left
// as-is because that would also drop this file's only visible use of the
// time package. Also note m.stop is nil until setStopCh is called; a receive
// from a nil channel never fires, so before setStopCh the loop can only exit
// via done.
func (m *DynamicInformerManager) WaitForTermination(cancel context.CancelFunc, done <-chan struct{}) {
	for {
		select {
		case <-m.stop:
			// Manager-wide stop channel fired: shut down this informer.
			m.logger.Debug("Work is stopped")
			cancel()
			return
		case <-done:
			// Informer context already cancelled (e.g. by TearDownInformer).
			m.logger.Debug("Work is done")
			return
		default:
			// No signal yet; sleep briefly and poll again.
			time.Sleep(1 * time.Second)
		}
	}
}

func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs {
func (s *Service) customResourceAddEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs {
getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) {
return func() ([]testtriggersv1.TestTriggerCondition, error) {
return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr)
Expand All @@ -741,12 +796,32 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro
s.logger.Errorf("failed to process create customResource event due to it being an unexpected type, received type %+v", obj)
return
}
obsGeneration, _, err := unstructured.NestedInt64(customResource.Object, "status", "observedGeneration")
if err != nil {
s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", customResource.GetKind(), customResource.GetName())
return
}
generation := customResource.GetGeneration()
if generation == obsGeneration {
s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", customResource.GetKind(), customResource.GetName())
return
}
s.logger.Debugf("trigger customResource: watcher component: emiting event: customResource %s/%s created", customResource.GetNamespace(), customResource.GetName())
event := newWatcherEvent(testtrigger.EventCreated, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource)))
if err := s.match(ctx, event); err != nil {
s.logger.Errorf("event matcher returned an error while matching create customResource event: %v", err)
}
},
}
}

func (s *Service) customResourceModifyEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs {
getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) {
return func() ([]testtriggersv1.TestTriggerCondition, error) {
return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr)
}
}
return cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
oldCustomResource, ok := oldObj.(*unstructured.Unstructured)
if !ok {
Expand All @@ -765,7 +840,23 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro
return
}

if oldCustomResource.GetGeneration() == newCustomResource.GetGeneration() {
newObsGeneration, _, err := unstructured.NestedInt64(newCustomResource.Object, "status", "observedGeneration")
if err != nil {
s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", oldCustomResource.GetKind(), oldCustomResource.GetName())
return
}

oldObsGeneration, _, err := unstructured.NestedInt64(oldCustomResource.Object, "status", "observedGeneration")
if err != nil {
s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", oldCustomResource.GetKind(), oldCustomResource.GetName())
return
}

oldGeneration := oldCustomResource.GetGeneration()
newGeneration := newCustomResource.GetGeneration()
fmt.Printf("old gen: %v, old obs gen: %v, new gen: %v, new obs gen: %v\n", oldGeneration, oldObsGeneration, newGeneration, newObsGeneration)

if newGeneration == newObsGeneration {
s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", oldCustomResource.GetKind(), oldCustomResource.GetName())
return
}
Expand All @@ -778,12 +869,32 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro
s.logger.Errorf("event matcher returned an error while matching update service event: %v", err)
}
},
}
}

func (s *Service) customResourceDeleteEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs {
getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) {
return func() ([]testtriggersv1.TestTriggerCondition, error) {
return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr)
}
}
return cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
customResource, ok := obj.(*unstructured.Unstructured)
if !ok {
s.logger.Errorf("failed to process delete customResource event due to it being an unexpected type, received type %+v", obj)
return
}
obsGeneration, _, err := unstructured.NestedInt64(customResource.Object, "status", "observedGeneration")
if err != nil {
s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", customResource.GetKind(), customResource.GetName())
return
}
generation := customResource.GetGeneration()
if generation == obsGeneration {
s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", customResource.GetKind(), customResource.GetName())
return
}
s.logger.Debugf("trigger service: watcher component: emiting event: customResource %s/%s deleted", customResource.GetNamespace(), customResource.GetName())
event := newWatcherEvent(testtrigger.EventDeleted, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource)))
if err := s.match(ctx, event); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/triggers/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestService_runWatcher_noLease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestService_runWatcher_noLease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestService_runWatcher_noLease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil),
}

leaseChan := make(chan bool)
Expand Down

0 comments on commit 4a3b012

Please sign in to comment.