From c7806070a47095c9e25b1642ef8908264c73e711 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Thu, 29 Jun 2023 14:21:18 -0400 Subject: [PATCH 01/17] Watch Test,TestSources and TestTriggers from alternate namespaces --- cmd/api-server/main.go | 1 + internal/config/config.go | 1 + pkg/triggers/service.go | 9 ++++- pkg/triggers/watcher.go | 51 +++++++++++++++-------- pkg/triggers/watcher_test.go | 78 +++++++++++++++++++++++++++++++++--- 5 files changed, 117 insertions(+), 23 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 6f672e321ee..f5c0d452444 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -430,6 +430,7 @@ func main() { triggers.WithHostnameIdentifier(), triggers.WithTestkubeNamespace(cfg.TestkubeNamespace), triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces), + triggers.WithWatchAllTestkubeResources(cfg.TestkubeWatchAll), ) log.DefaultLogger.Info("starting trigger service") triggerService.Run(ctx) diff --git a/internal/config/config.go b/internal/config/config.go index a133df341a0..97a608d46c7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,6 +55,7 @@ type Config struct { TestkubeCloudWorkerCount int `envconfig:"TESTKUBE_CLOUD_WORKER_COUNT" default:"50"` TestkubeCloudLogStreamWorkerCount int `envconfig:"TESTKUBE_CLOUD_LOG_STREAM_WORKER_COUNT" default:"25"` TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""` + TestkubeWatchAll bool `envconfig:"TESTKUBE_WATCH_ALL" default:"false"` GraphqlPort string `envconfig:"TESTKUBE_GRAPHQL_PORT" default:"8070"` TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index abe7e472407..381315cbf90 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -66,6 +66,7 @@ type Service struct { executorsClient executorsclientv1.Interface testkubeNamespace string watcherNamespaces []string + watchTestkubeCrAllNamespaces bool } type Option func(*Service) @@ -115,7 +116,7 @@ func NewService( opt(s) } - s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) + s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces) return s } @@ -182,6 +183,12 @@ func WithWatcherNamespaces(namespaces string) Option { } } +func WithWatchAllTestkubeResources(watchAllTestkubeResources bool) Option { + return func(s *Service) { + s.watchTestkubeCrAllNamespaces = watchAllTestkubeResources + } +} + func (s *Service) Run(ctx context.Context) { leaseChan := make(chan bool) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index ffd1aecf779..fb4071250bf 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -41,13 +41,12 @@ type k8sInformers struct { clusterEventInformers []coreinformerv1.EventInformer configMapInformers []coreinformerv1.ConfigMapInformer - testTriggerInformer testkubeinformerv1.TestTriggerInformer - testSuiteInformer testkubeinformerv2.TestSuiteInformer - testInformer testkubeinformerv3.TestInformer + testTriggerInformers []testkubeinformerv1.TestTriggerInformer + testSuiteInformers []testkubeinformerv2.TestSuiteInformer + testInformers []testkubeinformerv3.TestInformer } -func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, - testkubeNamespace string, watcherNamespaces []string) *k8sInformers { +func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string, watchTestkubeCrAllNamespaces bool) *k8sInformers { var k8sInformers k8sInformers if len(watcherNamespaces) == 0 { watcherNamespaces = append(watcherNamespaces, v1.NamespaceAll) @@ -65,11 +64,17 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps()) } - testkubeInformerFactory := externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) - k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers() - k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V2().TestSuites() - k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests() + var testkubeInformerFactory externalversions.SharedInformerFactory + if watchTestkubeCrAllNamespaces == true { + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0) + } else { + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) + } + k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) return &k8sInformers } @@ -97,7 +102,7 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) { } else { if !running { s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID) - s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) + s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces) stopChan = make(chan struct{}) s.runInformers(ctx, stopChan) running = true @@ -145,9 +150,15 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { s.informers.configMapInformers[i].Informer().AddEventHandler(s.configMapEventHandler(ctx)) } - s.informers.testTriggerInformer.Informer().AddEventHandler(s.testTriggerEventHandler()) - s.informers.testSuiteInformer.Informer().AddEventHandler(s.testSuiteEventHandler()) - s.informers.testInformer.Informer().AddEventHandler(s.testEventHandler()) + for i := range s.informers.testTriggerInformers { + s.informers.testTriggerInformers[i].Informer().AddEventHandler(s.testTriggerEventHandler()) + } + for i := range s.informers.testSuiteInformers { + s.informers.testSuiteInformers[i].Informer().AddEventHandler(s.testSuiteEventHandler()) + } + for i := range s.informers.testInformers { + s.informers.testInformers[i].Informer().AddEventHandler(s.testEventHandler()) + } s.logger.Debugf("trigger service: starting pod informers") for i := range s.informers.podInformers { @@ -190,11 +201,17 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { } s.logger.Debugf("trigger service: starting test trigger informer") - go s.informers.testTriggerInformer.Informer().Run(stop) + for i := range s.informers.testTriggerInformers { + go s.informers.testTriggerInformers[i].Informer().Run(stop) + } s.logger.Debugf("trigger service: starting test suite informer") - go s.informers.testSuiteInformer.Informer().Run(stop) + for i := range s.informers.testSuiteInformers { + go s.informers.testSuiteInformers[i].Informer().Run(stop) + } s.logger.Debugf("trigger service: starting test informer") - go s.informers.testInformer.Informer().Run(stop) + for i := range s.informers.testInformers { + go s.informers.testInformers[i].Informer().Run(stop) + } } func (s *Service) podEventHandler(ctx context.Context) cache.ResourceEventHandlerFuncs { diff --git a/pkg/triggers/watcher_test.go b/pkg/triggers/watcher_test.go index 35bdb674d7a..d19f54d7448 100644 --- a/pkg/triggers/watcher_test.go +++ b/pkg/triggers/watcher_test.go @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), } leaseChan := make(chan bool) @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), } leaseChan := make(chan bool) @@ -129,6 +129,74 @@ func TestService_runWatcher_lease(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.True(t, match, "pod created event should match the test trigger condition") }) + + t.Run("create a test trigger in an alternate namespace for pod created and match event on pod creation", func(t *testing.T) { + t.Parallel() + + clientset := fake.NewSimpleClientset() + testKubeClientset := faketestkube.NewSimpleClientset() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + testNamespace := "testkube" + testAlternateNamespace := "testkube-alternate" + + match := false + testExecutorF := func(ctx context.Context, trigger *testtriggersv1.TestTrigger) error { + assert.NotEqual(t, testNamespace, trigger.Namespace) + assert.Equal(t, "test-trigger-3", trigger.Name) + match = true + return nil + } + s := &Service{ + executor: testExecutorF, + identifier: "testkube-api", + clusterID: "testkube", + triggerStatus: make(map[statusKey]*triggerStatus), + clientset: clientset, + testKubeClientset: testKubeClientset, + logger: log.DefaultLogger, + watchTestkubeCrAllNamespaces: true, + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, true), + } + + leaseChan := make(chan bool) + go func() { time.Sleep(50 * time.Millisecond); leaseChan <- true }() + go s.runWatcher(ctx, leaseChan) + + time.Sleep(50 * time.Millisecond) + leaseChan <- true + time.Sleep(50 * time.Millisecond) + + testTrigger := testtriggersv1.TestTrigger{ + ObjectMeta: metav1.ObjectMeta{Namespace: testAlternateNamespace, Name: "test-trigger-3"}, + Spec: testtriggersv1.TestTriggerSpec{ + Resource: "pod", + ResourceSelector: testtriggersv1.TestTriggerSelector{Namespace: testNamespace, Name: "test-pod"}, + Event: "created", + Action: "run", + Execution: "test", + TestSelector: testtriggersv1.TestTriggerSelector{Namespace: testAlternateNamespace, Name: "some-test"}, + }, + } + createdTestTrigger, err := testKubeClientset.TestsV1().TestTriggers(testAlternateNamespace).Create(ctx, &testTrigger, metav1.CreateOptions{}) + assert.NotNil(t, createdTestTrigger) + assert.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + assert.Len(t, s.triggerStatus, 1) + key := newStatusKey(testAlternateNamespace, "test-trigger-3") + assert.Contains(t, s.triggerStatus, key) + + testPod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: "test-pod"}} + _, err = clientset.CoreV1().Pods(testNamespace).Create(ctx, &testPod, metav1.CreateOptions{}) + assert.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + assert.True(t, match, "pod created event should match the test trigger condition") + }) } func TestService_runWatcher_noLease(t *testing.T) { @@ -150,7 +218,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), } leaseChan := make(chan bool) @@ -190,7 +258,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), } leaseChan := make(chan bool) @@ -232,7 +300,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), } leaseChan := make(chan bool) From 9f1ea7aef9a60c80bdf75cd2b867d1cbefeb70e7 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Thu, 29 Jun 2023 14:29:21 -0400 Subject: [PATCH 02/17] Rename variable --- cmd/api-server/main.go | 2 +- pkg/triggers/service.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index f5c0d452444..9dde51673c0 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -430,7 +430,7 @@ func main() { triggers.WithHostnameIdentifier(), triggers.WithTestkubeNamespace(cfg.TestkubeNamespace), triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces), - triggers.WithWatchAllTestkubeResources(cfg.TestkubeWatchAll), + triggers.WatchTestkubeCrAllNamespaces(cfg.TestkubeWatchAll), ) log.DefaultLogger.Info("starting trigger service") triggerService.Run(ctx) diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index 381315cbf90..695e3b730ed 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -183,9 +183,9 @@ func WithWatcherNamespaces(namespaces string) Option { } } -func WithWatchAllTestkubeResources(watchAllTestkubeResources bool) Option { +func WatchTestkubeCrAllNamespaces(watchTestkubeCrAllNamespaces bool) Option { return func(s *Service) { - s.watchTestkubeCrAllNamespaces = watchAllTestkubeResources + s.watchTestkubeCrAllNamespaces = watchTestkubeCrAllNamespaces } } From 2465ba7a7456117355d1680a3526877c32d9194d Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Thu, 29 Jun 2023 14:38:09 -0400 Subject: [PATCH 03/17] Run with multiple namespaces --- pkg/triggers/watcher.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index fb4071250bf..dab5bd8581b 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -62,19 +62,14 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.ingressInformers = append(k8sInformers.ingressInformers, f.Networking().V1().Ingresses()) k8sInformers.clusterEventInformers = append(k8sInformers.clusterEventInformers, f.Core().V1().Events()) k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps()) - } - var testkubeInformerFactory externalversions.SharedInformerFactory - if watchTestkubeCrAllNamespaces == true { - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0) - } else { + var testkubeInformerFactory externalversions.SharedInformerFactory testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) + testKubeClientset, 0, externalversions.WithNamespace(namespace)) + k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) } - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) return &k8sInformers } From 9e7db4f0760024d135b2d36a19f0f56d303a8340 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Fri, 30 Jun 2023 03:54:03 -0400 Subject: [PATCH 04/17] Optional watch all namespaces --- pkg/triggers/watcher.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index dab5bd8581b..b06d1affd9d 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -63,14 +63,21 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.clusterEventInformers = append(k8sInformers.clusterEventInformers, f.Core().V1().Events()) k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps()) - var testkubeInformerFactory externalversions.SharedInformerFactory - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(namespace)) - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) } + var watchedNamespaces string + if watchTestkubeCrAllNamespaces == true { + watchedNamespaces = v1.NamespaceAll + } else { + watchedNamespaces = testkubeNamespace + } + var testkubeInformerFactory externalversions.SharedInformerFactory + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0, externalversions.WithNamespace(watchedNamespaces)) + k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) + return &k8sInformers } From d7102f76a42bc511890e6a3b2ec7825a64bb47fa Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Fri, 30 Jun 2023 15:39:19 -0400 Subject: [PATCH 05/17] Support multi-namespace and add TestTrigger support for CustomResources --- cmd/api-server/main.go | 9 +- internal/config/config.go | 1 + pkg/mapper/customresources/mapper.go | 33 ++++++ pkg/triggers/event.go | 22 ++++ pkg/triggers/service.go | 24 ++++- pkg/triggers/watcher.go | 152 ++++++++++++++++++++++----- pkg/triggers/watcher_test.go | 28 ++--- 7 files changed, 224 insertions(+), 45 deletions(-) create mode 100644 pkg/mapper/customresources/mapper.go diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 9dde51673c0..2f7e7a68bf0 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -162,6 +162,11 @@ func main() { ui.ExitOnError("Creating k8s clientset", err) } + dynamicClientSet, err := k8sclient.ConnectToK8sDynamic() + if err != nil { + ui.ExitOnError("Creating k8s clientset", err) + } + k8sCfg, err := k8sclient.GetK8sClientConfig() if err != nil { ui.ExitOnError("Getting k8s client config", err) @@ -418,6 +423,7 @@ func main() { triggerService := triggers.NewService( sched, clientset, + dynamicClientSet, testkubeClientset, testsuitesClient, testsClientV3, @@ -430,7 +436,8 @@ func main() { triggers.WithHostnameIdentifier(), triggers.WithTestkubeNamespace(cfg.TestkubeNamespace), triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces), - triggers.WatchTestkubeCrAllNamespaces(cfg.TestkubeWatchAll), + triggers.WatchTestkubeAll(cfg.TestkubeWatchAll), + triggers.WatchTestkubeCrNamespaces(cfg.TestkubeCRWatcherNamespaces), ) log.DefaultLogger.Info("starting trigger service") triggerService.Run(ctx) diff --git a/internal/config/config.go b/internal/config/config.go index 97a608d46c7..ffb11efdbd5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -56,6 +56,7 @@ type Config struct { TestkubeCloudLogStreamWorkerCount int `envconfig:"TESTKUBE_CLOUD_LOG_STREAM_WORKER_COUNT" default:"25"` TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""` TestkubeWatchAll bool `envconfig:"TESTKUBE_WATCH_ALL" default:"false"` + TestkubeCRWatcherNamespaces string `envconfig:"TESTKUBE_CR_WATCHER_NAMESPACES" default:""` GraphqlPort string `envconfig:"TESTKUBE_GRAPHQL_PORT" default:"8070"` TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` diff --git a/pkg/mapper/customresources/mapper.go b/pkg/mapper/customresources/mapper.go new file mode 100644 index 00000000000..21f0fff727e --- /dev/null +++ b/pkg/mapper/customresources/mapper.go @@ -0,0 +1,33 @@ +package customresources + +import ( + "fmt" + "time" + + testtriggersv1 "github.com/kubeshop/testkube-operator/apis/testtriggers/v1" +) + +// MapCRDConditionsToAPI maps CRD conditions to OpenAPI spec TestTriggerConditions +func MapCRDConditionsToAPI(conditions map[string]interface{}, currentTime time.Time) []testtriggersv1.TestTriggerCondition { + // TODO: find a way to generically map conditions to TriggerSpec + var results []testtriggersv1.TestTriggerCondition + for _, condition := range conditions { + c := condition.(map[string]string) + + // TODO: confirm appropriate layout + layout := "2006-01-02T15:04:05Z" + t, err := time.Parse(layout, c["LastTransitionTime"]) + if err != nil { + fmt.Println("Error parsing time:", err) + } + status := c["Status"] + results = append(results, testtriggersv1.TestTriggerCondition{ + Type_: string(c["Type"]), + Status: (*testtriggersv1.TestTriggerConditionStatuses)(&status), + Reason: c["Reason"], + Ttl: int32(currentTime.Sub(t) / time.Second), + }) + } + + return results +} diff --git a/pkg/triggers/event.go b/pkg/triggers/event.go index 2bd6621d8d4..0b462119a8f 100644 --- a/pkg/triggers/event.go +++ b/pkg/triggers/event.go @@ -2,6 +2,10 @@ package triggers import ( "context" + "github.com/kubeshop/testkube/pkg/mapper/customresources" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -125,3 +129,21 @@ func getServiceConditions( return services.MapCRDConditionsToAPI(service.Status.Conditions, time.Now()), nil } + +func getCustomResourceConditions( + ctx context.Context, + dynamicClientset dynamic.Interface, + object metav1.Object, + gvr schema.GroupVersionResource, +) ([]testtriggersv1.TestTriggerCondition, error) { + customresource, err := dynamicClientset.Resource(gvr).Get(ctx, object.GetName(), metav1.GetOptions{}) + if err != nil { + return nil, err + } + + objWithConditions, _, err := unstructured.NestedMap(customresource.Object, "status") + if err != nil { + return nil, err + } + return customresources.MapCRDConditionsToAPI(objWithConditions, time.Now()), nil +} diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index 695e3b730ed..d639d91e87d 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -3,6 +3,7 @@ package triggers import ( "context" "fmt" + "k8s.io/client-go/dynamic" "os" "strings" "time" @@ -56,6 +57,7 @@ type Service struct { triggerStatus map[statusKey]*triggerStatus scheduler *scheduler.Scheduler clientset kubernetes.Interface + dynamicClientset dynamic.Interface testKubeClientset testkubeclientsetv1.Interface testSuitesClient testsuitesclientv2.Interface testsClient testsclientv3.Interface @@ -66,7 +68,8 @@ type Service struct { executorsClient executorsclientv1.Interface testkubeNamespace string watcherNamespaces []string - watchTestkubeCrAllNamespaces bool + watchTestkubeAll bool + watchTestkubeCRNamespaces []string } type Option func(*Service) @@ -74,6 +77,7 @@ type Option func(*Service) func NewService( scheduler *scheduler.Scheduler, clientset kubernetes.Interface, + dynamicClientset dynamic.Interface, testKubeClientset testkubeclientsetv1.Interface, testSuitesClient testsuitesclientv2.Interface, testsClient testsclientv3.Interface, @@ -96,6 +100,7 @@ func NewService( defaultConditionsCheckBackoff: defaultConditionsCheckBackoff, scheduler: scheduler, clientset: clientset, + dynamicClientset: dynamicClientset, testKubeClientset: testKubeClientset, testSuitesClient: testSuitesClient, testsClient: testsClient, @@ -116,7 +121,7 @@ func NewService( opt(s) } - s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces) + s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeAll, s.watchTestkubeCRNamespaces) return s } @@ -183,9 +188,20 @@ func WithWatcherNamespaces(namespaces string) Option { } } -func WatchTestkubeCrAllNamespaces(watchTestkubeCrAllNamespaces bool) Option { +func WatchTestkubeAll(watchTestkubeAll bool) Option { return func(s *Service) { - s.watchTestkubeCrAllNamespaces = watchTestkubeCrAllNamespaces + s.watchTestkubeAll = watchTestkubeAll + } +} + +func WatchTestkubeCrNamespaces(namespaces string) Option { + return func(s *Service) { + for _, namespace := range strings.Split(namespaces, ",") { + value := strings.TrimSpace(namespace) + if value != "" { + s.watcherNamespaces = append(s.watcherNamespaces, value) + } + } } } diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index b06d1affd9d..2aff9a4d1f0 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -2,11 +2,14 @@ package triggers import ( "context" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + appsinformerv1 "k8s.io/client-go/informers/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - appsinformerv1 "k8s.io/client-go/informers/apps/v1" coreinformerv1 "k8s.io/client-go/informers/core/v1" networkinginformerv1 "k8s.io/client-go/informers/networking/v1" "k8s.io/client-go/kubernetes" @@ -32,27 +35,29 @@ import ( ) type k8sInformers struct { - podInformers []coreinformerv1.PodInformer - deploymentInformers []appsinformerv1.DeploymentInformer - daemonsetInformers []appsinformerv1.DaemonSetInformer - statefulsetInformers []appsinformerv1.StatefulSetInformer - serviceInformers []coreinformerv1.ServiceInformer - ingressInformers []networkinginformerv1.IngressInformer - clusterEventInformers []coreinformerv1.EventInformer - configMapInformers []coreinformerv1.ConfigMapInformer + customResourceInformers []informers.GenericInformer + podInformers []coreinformerv1.PodInformer + deploymentInformers []appsinformerv1.DeploymentInformer + daemonsetInformers []appsinformerv1.DaemonSetInformer + statefulsetInformers []appsinformerv1.StatefulSetInformer + serviceInformers []coreinformerv1.ServiceInformer + ingressInformers []networkinginformerv1.IngressInformer + clusterEventInformers []coreinformerv1.EventInformer + configMapInformers []coreinformerv1.ConfigMapInformer testTriggerInformers []testkubeinformerv1.TestTriggerInformer testSuiteInformers []testkubeinformerv2.TestSuiteInformer testInformers []testkubeinformerv3.TestInformer } -func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string, watchTestkubeCrAllNamespaces bool) *k8sInformers { +func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watchK8sResourceNamespaces []string, watchTestkubeAll bool, watchTestkubeCrNamespaces []string) *k8sInformers { + var k8sInformers k8sInformers - if len(watcherNamespaces) == 0 { - watcherNamespaces = append(watcherNamespaces, v1.NamespaceAll) + if len(watchK8sResourceNamespaces) == 0 { + watchK8sResourceNamespaces = append(watchK8sResourceNamespaces, v1.NamespaceAll) } - for _, namespace := range watcherNamespaces { + for _, namespace := range watchK8sResourceNamespaces { f := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace)) k8sInformers.podInformers = append(k8sInformers.podInformers, f.Core().V1().Pods()) k8sInformers.deploymentInformers = append(k8sInformers.deploymentInformers, f.Apps().V1().Deployments()) @@ -65,18 +70,22 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned } - var watchedNamespaces string - if watchTestkubeCrAllNamespaces == true { - watchedNamespaces = v1.NamespaceAll + if watchTestkubeAll == true { + watchTestkubeCrNamespaces = append(watchTestkubeCrNamespaces, v1.NamespaceAll) } else { - watchedNamespaces = testkubeNamespace + if len(watchTestkubeCrNamespaces) == 0 { + watchTestkubeCrNamespaces = append(watchTestkubeCrNamespaces, testkubeNamespace) + } + } + + for _, namespace := range watchTestkubeCrNamespaces { + var testkubeInformerFactory externalversions.SharedInformerFactory + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0, externalversions.WithNamespace(namespace)) + k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) } - var testkubeInformerFactory externalversions.SharedInformerFactory - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(watchedNamespaces)) - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) return &k8sInformers } @@ -104,7 +113,7 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) { } else { if !running { s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID) - s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces) + s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeAll, s.watchTestkubeCRNamespaces) stopChan = make(chan struct{}) s.runInformers(ctx, stopChan) running = true @@ -153,7 +162,7 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { } for i := range s.informers.testTriggerInformers { - s.informers.testTriggerInformers[i].Informer().AddEventHandler(s.testTriggerEventHandler()) + s.informers.testTriggerInformers[i].Informer().AddEventHandler(s.testTriggerEventHandler(ctx, stop)) } for i := range s.informers.testSuiteInformers { s.informers.testSuiteInformers[i].Informer().AddEventHandler(s.testSuiteEventHandler()) @@ -632,7 +641,7 @@ func (s *Service) clusterEventEventHandler(ctx context.Context) cache.ResourceEv } } -func (s *Service) testTriggerEventHandler() cache.ResourceEventHandlerFuncs { +func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struct{}) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { t, ok := obj.(*testtriggersv1.TestTrigger) @@ -645,6 +654,23 @@ func (s *Service) testTriggerEventHandler() cache.ResourceEventHandlerFuncs { t.Namespace, t.Name, t.Spec.Resource, t.Spec.Event, ) s.addTrigger(t) + + df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.dynamicClientset, 0, v1.NamespaceAll, nil) + + //gvr := schema.GroupVersionResource{ + // Group: "helm.toolkit.fluxcd.io", + // Version: "v2beta1", + // Resource: "helmreleases", + //} + gvr := schema.GroupVersionResource{ + Group: t.Spec.CustomResource.Group, + Version: t.Spec.CustomResource.Version, + Resource: t.Spec.CustomResource.Resource, + } + customResourceInformer := df.ForResource(gvr) + customResourceInformer.Informer().AddEventHandler(s.customResourceEventHandler(ctx, gvr)) + s.logger.Debugf("trigger service: starting custom resource informers") + go customResourceInformer.Informer().Run(stop) }, UpdateFunc: func(oldObj, newObj interface{}) { t, ok := newObj.(*testtriggersv1.TestTrigger) @@ -676,6 +702,80 @@ func (s *Service) testTriggerEventHandler() cache.ResourceEventHandlerFuncs { } } +func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs { + getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) { + return func() ([]testtriggersv1.TestTriggerCondition, error) { + return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr) + } + } + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + customResource, ok := obj.(*unstructured.Unstructured) + if !ok { + s.logger.Errorf("failed to process create customResource event due to it being an unexpected type, received type %+v", obj) + return + } + if inPast(customResource.GetCreationTimestamp().Time, s.watchFromDate) { + s.logger.Debugf( + "trigger customResource: watcher component: no-op create trigger: customResource %s/%s was created in the past", + customResource.GetNamespace(), customResource.GetName(), + ) + return + } + s.logger.Debugf("trigger customResource: watcher component: emiting event: customResource %s/%s created", customResource.GetNamespace(), customResource.GetName()) + // TODO: testtrigger.ResourceService -> testtrigger.ResourceCustomResource + event := newWatcherEvent(testtrigger.EventCreated, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource))) + if err := s.match(ctx, event); err != nil { + s.logger.Errorf("event matcher returned an error while matching create customResource event: %v", err) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldCustomResource, ok := oldObj.(*unstructured.Unstructured) + if !ok { + s.logger.Errorf( + "failed to process update service event for old object due to it being an unexpected type, received type %+v", + oldCustomResource, + ) + return + } + newCustomResource, ok := newObj.(*unstructured.Unstructured) + if !ok { + s.logger.Errorf( + "failed to process update service event for new object due to it being an unexpected type, received type %+v", + newCustomResource, + ) + return + } + + if newCustomResource.GetGeneration() == newCustomResource.GetGeneration() { + s.logger.Debugf("trigger service: watcher component: no-op update trigger: service specs are equal") + } + s.logger.Debugf( + "trigger service: watcher component: emiting event: service %s/%s updated", + newCustomResource.GetNamespace(), newCustomResource.GetName(), + ) + // TODO: testtrigger.ResourceService -> testtrigger.ResourceCustomResource + event := newWatcherEvent(testtrigger.EventModified, newCustomResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(newCustomResource))) + if err := s.match(ctx, event); err != nil { + s.logger.Errorf("event matcher returned an error while matching update service event: %v", err) + } + }, + DeleteFunc: func(obj interface{}) { + customResource, ok := obj.(*unstructured.Unstructured) + if !ok { + s.logger.Errorf("failed to process delete customResource event due to it being an unexpected type, received type %+v", obj) + return + } + s.logger.Debugf("trigger service: watcher component: emiting event: customResource %s/%s deleted", customResource.GetNamespace(), customResource.GetName()) + // TODO: testtrigger.ResourceService -> testtrigger.ResourceCustomResource + event := newWatcherEvent(testtrigger.EventDeleted, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource))) + if err := s.match(ctx, event); err != nil { + s.logger.Errorf("event matcher returned an error while matching delete customResource event: %v", err) + } + }, + } +} + func (s *Service) testSuiteEventHandler() cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/pkg/triggers/watcher_test.go b/pkg/triggers/watcher_test.go index d19f54d7448..d55e5cbb980 100644 --- a/pkg/triggers/watcher_test.go +++ b/pkg/triggers/watcher_test.go @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), } leaseChan := make(chan bool) @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), } leaseChan := make(chan bool) @@ -150,15 +150,15 @@ func TestService_runWatcher_lease(t *testing.T) { return nil } s := &Service{ - executor: testExecutorF, - identifier: "testkube-api", - clusterID: "testkube", - triggerStatus: make(map[statusKey]*triggerStatus), - clientset: clientset, - testKubeClientset: testKubeClientset, - logger: log.DefaultLogger, - watchTestkubeCrAllNamespaces: true, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, true), + executor: testExecutorF, + identifier: "testkube-api", + clusterID: "testkube", + triggerStatus: make(map[statusKey]*triggerStatus), + clientset: clientset, + testKubeClientset: testKubeClientset, + logger: log.DefaultLogger, + watchTestkubeAll: true, + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, true, []string{}), } leaseChan := make(chan bool) @@ -218,7 +218,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), } leaseChan := make(chan bool) @@ -258,7 +258,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), } leaseChan := make(chan bool) @@ -300,7 +300,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), } leaseChan := make(chan bool) From 021946ae33bc6f70c8bfe62677911e1c53fae95b Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Thu, 6 Jul 2023 07:26:05 -0400 Subject: [PATCH 06/17] Add dynamic informers to watch custom resources --- pkg/mapper/customresources/mapper.go | 20 ++++++++---- pkg/triggers/event.go | 4 +-- pkg/triggers/service_test.go | 4 +++ pkg/triggers/watcher.go | 49 ++++++++++------------------ 4 files changed, 37 insertions(+), 40 deletions(-) diff --git a/pkg/mapper/customresources/mapper.go b/pkg/mapper/customresources/mapper.go index 21f0fff727e..88ab44beaa0 100644 --- a/pkg/mapper/customresources/mapper.go +++ b/pkg/mapper/customresources/mapper.go @@ -8,23 +8,29 @@ import ( ) // MapCRDConditionsToAPI maps CRD conditions to OpenAPI spec TestTriggerConditions -func MapCRDConditionsToAPI(conditions map[string]interface{}, currentTime time.Time) []testtriggersv1.TestTriggerCondition { +func MapCRDConditionsToAPI(conditions []interface{}, currentTime time.Time) []testtriggersv1.TestTriggerCondition { // TODO: find a way to generically map conditions to TriggerSpec var results []testtriggersv1.TestTriggerCondition for _, condition := range conditions { - c := condition.(map[string]string) + c := make(map[string]string) + for key, value := range condition.(map[string]interface{}) { + strKey := fmt.Sprintf("%v", key) + strValue := fmt.Sprintf("%v", value) + + c[strKey] = strValue + } // TODO: confirm appropriate layout - layout := "2006-01-02T15:04:05Z" - t, err := time.Parse(layout, c["LastTransitionTime"]) + layout := time.RFC3339 + t, err := time.Parse(layout, c["lastTransitionTime"]) if err != nil { fmt.Println("Error parsing time:", err) } - status := c["Status"] + status := c["status"] results = append(results, testtriggersv1.TestTriggerCondition{ - Type_: string(c["Type"]), + Type_: string(c["type"]), Status: (*testtriggersv1.TestTriggerConditionStatuses)(&status), - Reason: c["Reason"], + Reason: c["reason"], Ttl: int32(currentTime.Sub(t) / time.Second), }) } diff --git a/pkg/triggers/event.go b/pkg/triggers/event.go index 0b462119a8f..de66dfcaf06 100644 --- a/pkg/triggers/event.go +++ b/pkg/triggers/event.go @@ -136,12 +136,12 @@ func getCustomResourceConditions( object metav1.Object, gvr schema.GroupVersionResource, ) ([]testtriggersv1.TestTriggerCondition, error) { - customresource, err := dynamicClientset.Resource(gvr).Get(ctx, object.GetName(), metav1.GetOptions{}) + customresource, err := dynamicClientset.Resource(gvr).Namespace(object.GetNamespace()).Get(ctx, object.GetName(), metav1.GetOptions{}) if err != nil { return nil, err } - objWithConditions, _, err := unstructured.NestedMap(customresource.Object, "status") + objWithConditions, _, err := unstructured.NestedSlice(customresource.Object, "status", "conditions") if err != nil { return nil, err } diff --git a/pkg/triggers/service_test.go b/pkg/triggers/service_test.go index 0e700ca407b..21db9e68151 100644 --- a/pkg/triggers/service_test.go +++ b/pkg/triggers/service_test.go @@ -2,6 +2,8 @@ package triggers import ( "context" + "k8s.io/apimachinery/pkg/runtime" + fake2 "k8s.io/client-go/dynamic/fake" "testing" "time" @@ -139,9 +141,11 @@ func TestService_Run(t *testing.T) { fakeTestkubeClientset := faketestkube.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset() + fakeDynamicClientset := fake2.NewSimpleDynamicClient(runtime.NewScheme(), nil) s := NewService( sched, fakeClientset, + fakeDynamicClientset, fakeTestkubeClientset, mockTestSuitesClient, mockTestsClient, diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index 2aff9a4d1f0..bb0fd89221a 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -16,7 +16,6 @@ import ( "github.com/kubeshop/testkube-operator/pkg/clientset/versioned" testkubeinformerv1 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v1" - testkubeinformerv2 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v2" testkubeinformerv3 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v3" networkingv1 "k8s.io/api/networking/v1" @@ -46,7 +45,7 @@ type k8sInformers struct { configMapInformers []coreinformerv1.ConfigMapInformer testTriggerInformers []testkubeinformerv1.TestTriggerInformer - testSuiteInformers []testkubeinformerv2.TestSuiteInformer + testSuiteInformers []testkubeinformerv3.TestSuiteInformer testInformers []testkubeinformerv3.TestInformer } @@ -83,7 +82,7 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( testKubeClientset, 0, externalversions.WithNamespace(namespace)) k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V3().TestSuites()) k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) } @@ -655,22 +654,19 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc ) s.addTrigger(t) - df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.dynamicClientset, 0, v1.NamespaceAll, nil) - - //gvr := schema.GroupVersionResource{ - // Group: "helm.toolkit.fluxcd.io", - // Version: "v2beta1", - // Resource: "helmreleases", - //} - gvr := schema.GroupVersionResource{ - Group: t.Spec.CustomResource.Group, - Version: t.Spec.CustomResource.Version, - Resource: t.Spec.CustomResource.Resource, - } - customResourceInformer := df.ForResource(gvr) - customResourceInformer.Informer().AddEventHandler(s.customResourceEventHandler(ctx, gvr)) - s.logger.Debugf("trigger service: starting custom resource informers") - go customResourceInformer.Informer().Run(stop) + if t.Spec.Resource == testtrigger.ResourceCustomResource { + df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.dynamicClientset, 0, t.Spec.ResourceSelector.Namespace, nil) + + gvr := schema.GroupVersionResource{ + Group: t.Spec.CustomResource.Group, + Version: t.Spec.CustomResource.Version, + Resource: t.Spec.CustomResource.Resource, + } + customResourceInformer := df.ForResource(gvr) + customResourceInformer.Informer().AddEventHandler(s.customResourceEventHandler(ctx, gvr)) + s.logger.Debugf("trigger service: starting custom resource informers") + go customResourceInformer.Informer().Run(stop) + } }, UpdateFunc: func(oldObj, newObj interface{}) { t, ok := newObj.(*testtriggersv1.TestTrigger) @@ -715,15 +711,7 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro s.logger.Errorf("failed to process create customResource event due to it being an unexpected type, received type %+v", obj) return } - if inPast(customResource.GetCreationTimestamp().Time, s.watchFromDate) { - s.logger.Debugf( - "trigger customResource: watcher component: no-op create trigger: customResource %s/%s was created in the past", - customResource.GetNamespace(), customResource.GetName(), - ) - return - } s.logger.Debugf("trigger customResource: watcher component: emiting event: customResource %s/%s created", customResource.GetNamespace(), customResource.GetName()) - // TODO: testtrigger.ResourceService -> testtrigger.ResourceCustomResource event := newWatcherEvent(testtrigger.EventCreated, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource))) if err := s.match(ctx, event); err != nil { s.logger.Errorf("event matcher returned an error while matching create customResource event: %v", err) @@ -747,14 +735,14 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro return } - if newCustomResource.GetGeneration() == newCustomResource.GetGeneration() { - s.logger.Debugf("trigger service: watcher component: no-op update trigger: service specs are equal") + if oldCustomResource.GetGeneration() == newCustomResource.GetGeneration() { + s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", oldCustomResource.GetKind(), oldCustomResource.GetName()) + return } s.logger.Debugf( "trigger service: watcher component: emiting event: service %s/%s updated", newCustomResource.GetNamespace(), newCustomResource.GetName(), ) - // TODO: testtrigger.ResourceService -> testtrigger.ResourceCustomResource event := newWatcherEvent(testtrigger.EventModified, newCustomResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(newCustomResource))) if err := s.match(ctx, event); err != nil { s.logger.Errorf("event matcher returned an error while matching update service event: %v", err) @@ -767,7 +755,6 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro return } s.logger.Debugf("trigger service: watcher component: emiting event: customResource %s/%s deleted", customResource.GetNamespace(), customResource.GetName()) - // TODO: testtrigger.ResourceService -> testtrigger.ResourceCustomResource event := newWatcherEvent(testtrigger.EventDeleted, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource))) if err := s.match(ctx, event); err != nil { s.logger.Errorf("event matcher returned an error while matching delete customResource event: %v", err) From 8c6b7c133f7d70a9c230c0bccd4637978c8e99f7 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Fri, 7 Jul 2023 18:41:12 -0400 Subject: [PATCH 07/17] Add nats tunnel and port-forward --- Makefile | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Makefile b/Makefile index b1969be09cf..a45a1b6f1cf 100644 --- a/Makefile +++ b/Makefile @@ -222,6 +222,14 @@ video: ffmpeg -y -r 30 -f image2pipe -vcodec ppm -i stream.out -b 65536K movie.mp4 +nats-tunnel: + kubectl run nats-tunnel-$$USER \ + -it --image=alpine/socat \ + --tty --rm --expose=true --port=4222 \ + tcp-listen:4222,fork,reuseaddr tcp-connect:testkube-nats:4222 + +port-forward-nats: + kubectl port-forward svc/nats-tunnel-$$USER 4222:4222 -ntestkube port-forward-minio: kubectl port-forward svc/testkube-minio-service-testkube 9090:9090 -ntestkube From 322b0fba2442a87fdce1f046b64b47e952be887d Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Fri, 7 Jul 2023 18:42:16 -0400 Subject: [PATCH 08/17] Add GC for informers --- pkg/triggers/watcher.go | 60 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index bb0fd89221a..383e165f1ce 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -2,12 +2,13 @@ package triggers import ( "context" + "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" appsinformerv1 "k8s.io/client-go/informers/apps/v1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "time" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" coreinformerv1 "k8s.io/client-go/informers/core/v1" @@ -33,6 +34,11 @@ import ( "github.com/kubeshop/testkube-operator/pkg/validation/tests/v1/testtrigger" ) +type InformersData struct { + informer *informers.GenericInformer + cancel context.CancelFunc +} + type k8sInformers struct { customResourceInformers []informers.GenericInformer podInformers []coreinformerv1.PodInformer @@ -47,6 +53,8 @@ type k8sInformers struct { testTriggerInformers []testkubeinformerv1.TestTriggerInformer testSuiteInformers []testkubeinformerv3.TestSuiteInformer testInformers []testkubeinformerv3.TestInformer + triggerCRInformers map[string]*InformersData + crInformers map[string]*InformersData } func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watchK8sResourceNamespaces []string, watchTestkubeAll bool, watchTestkubeCrNamespaces []string) *k8sInformers { @@ -85,7 +93,8 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V3().TestSuites()) k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) } - + k8sInformers.triggerCRInformers = make(map[string]*InformersData) + k8sInformers.crInformers = make(map[string]*InformersData) return &k8sInformers } @@ -665,7 +674,17 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc customResourceInformer := df.ForResource(gvr) customResourceInformer.Informer().AddEventHandler(s.customResourceEventHandler(ctx, gvr)) s.logger.Debugf("trigger service: starting custom resource informers") - go customResourceInformer.Informer().Run(stop) + + stopCtx, cancel := context.WithCancel(context.Background()) + go worker(cancel, stop, stopCtx.Done()) + + ifd := &InformersData{ + informer: &customResourceInformer, + cancel: cancel, + } + s.informers.triggerCRInformers[t.Name+t.Namespace] = ifd + s.informers.crInformers[gvr.String()+serializeSelector(t.Spec.ResourceSelector)] = ifd + go customResourceInformer.Informer().Run(stopCtx.Done()) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -694,10 +713,43 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc t.Namespace, t.Name, t.Spec.Resource, t.Spec.Event, ) s.removeTrigger(t) + if ifd, found := s.informers.triggerCRInformers[t.Name+t.Namespace]; found { + if ifd.informer != nil && ifd.cancel != nil { + ifd.cancel() + ifd.informer = nil + delete(s.informers.triggerCRInformers, t.Name+t.Namespace) + } + + } }, } } +// TODO: fix the formation +func serializeSelector(selector testtriggersv1.TestTriggerSelector) string { + return selector.Name + selector.Namespace +} + +func worker(cancel context.CancelFunc, stop <-chan struct{}, done <-chan struct{}) { + for { + select { + case <-stop: + // Context is done, so we should stop the worker + fmt.Println("Work is stopped") + cancel() + return + case <-done: + // Context is done, so we should stop the worker + fmt.Println("Work is done") + return + default: + // Do some work + fmt.Println("Working...") + time.Sleep(1 * time.Second) + } + } +} + func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs { getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) { return func() ([]testtriggersv1.TestTriggerCondition, error) { From 5911ecffdf6ad3ed62563d441d72ea61e7fabc85 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Mon, 10 Jul 2023 16:59:07 -0400 Subject: [PATCH 09/17] Revert "Optional watch all namespaces" This reverts commit 9e7db4f0760024d135b2d36a19f0f56d303a8340. --- pkg/triggers/watcher.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index b06d1affd9d..dab5bd8581b 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -63,21 +63,14 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.clusterEventInformers = append(k8sInformers.clusterEventInformers, f.Core().V1().Events()) k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps()) + var testkubeInformerFactory externalversions.SharedInformerFactory + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0, externalversions.WithNamespace(namespace)) + k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) } - var watchedNamespaces string - if watchTestkubeCrAllNamespaces == true { - watchedNamespaces = v1.NamespaceAll - } else { - watchedNamespaces = testkubeNamespace - } - var testkubeInformerFactory externalversions.SharedInformerFactory - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(watchedNamespaces)) - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) - return &k8sInformers } From c77aab3587019da2a660f85ab3c7371ef1feed16 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Mon, 10 Jul 2023 16:59:07 -0400 Subject: [PATCH 10/17] Revert "Run with multiple namespaces" This reverts commit 2465ba7a7456117355d1680a3526877c32d9194d. --- pkg/triggers/watcher.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index dab5bd8581b..fb4071250bf 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -62,14 +62,19 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.ingressInformers = append(k8sInformers.ingressInformers, f.Networking().V1().Ingresses()) k8sInformers.clusterEventInformers = append(k8sInformers.clusterEventInformers, f.Core().V1().Events()) k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps()) + } - var testkubeInformerFactory externalversions.SharedInformerFactory + var testkubeInformerFactory externalversions.SharedInformerFactory + if watchTestkubeCrAllNamespaces == true { + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0) + } else { testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(namespace)) - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) + testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) } + k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) + k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) + k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) return &k8sInformers } From b86e5116ee4e10e4c60ac54dc11367c12b4f02df Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Mon, 10 Jul 2023 16:59:07 -0400 Subject: [PATCH 11/17] Revert "Rename variable" This reverts commit 9f1ea7aef9a60c80bdf75cd2b867d1cbefeb70e7. --- cmd/api-server/main.go | 2 +- pkg/triggers/service.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 9dde51673c0..f5c0d452444 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -430,7 +430,7 @@ func main() { triggers.WithHostnameIdentifier(), triggers.WithTestkubeNamespace(cfg.TestkubeNamespace), triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces), - triggers.WatchTestkubeCrAllNamespaces(cfg.TestkubeWatchAll), + triggers.WithWatchAllTestkubeResources(cfg.TestkubeWatchAll), ) log.DefaultLogger.Info("starting trigger service") triggerService.Run(ctx) diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index 695e3b730ed..381315cbf90 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -183,9 +183,9 @@ func WithWatcherNamespaces(namespaces string) Option { } } -func WatchTestkubeCrAllNamespaces(watchTestkubeCrAllNamespaces bool) Option { +func WithWatchAllTestkubeResources(watchAllTestkubeResources bool) Option { return func(s *Service) { - s.watchTestkubeCrAllNamespaces = watchTestkubeCrAllNamespaces + s.watchTestkubeCrAllNamespaces = watchAllTestkubeResources } } From 3f753d2307f88381756028947883e75e729fec7e Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Mon, 10 Jul 2023 16:59:07 -0400 Subject: [PATCH 12/17] Revert "Watch Test,TestSources and TestTriggers from alternate namespaces" This reverts commit c7806070a47095c9e25b1642ef8908264c73e711. --- cmd/api-server/main.go | 1 - internal/config/config.go | 1 - pkg/triggers/service.go | 9 +---- pkg/triggers/watcher.go | 51 ++++++++--------------- pkg/triggers/watcher_test.go | 78 +++--------------------------------- 5 files changed, 23 insertions(+), 117 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index f5c0d452444..6f672e321ee 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -430,7 +430,6 @@ func main() { triggers.WithHostnameIdentifier(), triggers.WithTestkubeNamespace(cfg.TestkubeNamespace), triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces), - triggers.WithWatchAllTestkubeResources(cfg.TestkubeWatchAll), ) log.DefaultLogger.Info("starting trigger service") triggerService.Run(ctx) diff --git a/internal/config/config.go b/internal/config/config.go index 97a608d46c7..a133df341a0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,7 +55,6 @@ type Config struct { TestkubeCloudWorkerCount int `envconfig:"TESTKUBE_CLOUD_WORKER_COUNT" default:"50"` TestkubeCloudLogStreamWorkerCount int `envconfig:"TESTKUBE_CLOUD_LOG_STREAM_WORKER_COUNT" default:"25"` TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""` - TestkubeWatchAll bool `envconfig:"TESTKUBE_WATCH_ALL" default:"false"` GraphqlPort string `envconfig:"TESTKUBE_GRAPHQL_PORT" default:"8070"` TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index 381315cbf90..abe7e472407 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -66,7 +66,6 @@ type Service struct { executorsClient executorsclientv1.Interface testkubeNamespace string watcherNamespaces []string - watchTestkubeCrAllNamespaces bool } type Option func(*Service) @@ -116,7 +115,7 @@ func NewService( opt(s) } - s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces) + s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) return s } @@ -183,12 +182,6 @@ func WithWatcherNamespaces(namespaces string) Option { } } -func WithWatchAllTestkubeResources(watchAllTestkubeResources bool) Option { - return func(s *Service) { - s.watchTestkubeCrAllNamespaces = watchAllTestkubeResources - } -} - func (s *Service) Run(ctx context.Context) { leaseChan := make(chan bool) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index fb4071250bf..ffd1aecf779 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -41,12 +41,13 @@ type k8sInformers struct { clusterEventInformers []coreinformerv1.EventInformer configMapInformers []coreinformerv1.ConfigMapInformer - testTriggerInformers []testkubeinformerv1.TestTriggerInformer - testSuiteInformers []testkubeinformerv2.TestSuiteInformer - testInformers []testkubeinformerv3.TestInformer + testTriggerInformer testkubeinformerv1.TestTriggerInformer + testSuiteInformer testkubeinformerv2.TestSuiteInformer + testInformer testkubeinformerv3.TestInformer } -func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string, watchTestkubeCrAllNamespaces bool) *k8sInformers { +func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, + testkubeNamespace string, watcherNamespaces []string) *k8sInformers { var k8sInformers k8sInformers if len(watcherNamespaces) == 0 { watcherNamespaces = append(watcherNamespaces, v1.NamespaceAll) @@ -64,17 +65,11 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps()) } - var testkubeInformerFactory externalversions.SharedInformerFactory - if watchTestkubeCrAllNamespaces == true { - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0) - } else { - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) - } - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) + testkubeInformerFactory := externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) + k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers() + k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V2().TestSuites() + k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests() return &k8sInformers } @@ -102,7 +97,7 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) { } else { if !running { s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID) - s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces) + s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) stopChan = make(chan struct{}) s.runInformers(ctx, stopChan) running = true @@ -150,15 +145,9 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { s.informers.configMapInformers[i].Informer().AddEventHandler(s.configMapEventHandler(ctx)) } - for i := range s.informers.testTriggerInformers { - s.informers.testTriggerInformers[i].Informer().AddEventHandler(s.testTriggerEventHandler()) - } - for i := range s.informers.testSuiteInformers { - s.informers.testSuiteInformers[i].Informer().AddEventHandler(s.testSuiteEventHandler()) - } - for i := range s.informers.testInformers { - s.informers.testInformers[i].Informer().AddEventHandler(s.testEventHandler()) - } + s.informers.testTriggerInformer.Informer().AddEventHandler(s.testTriggerEventHandler()) + s.informers.testSuiteInformer.Informer().AddEventHandler(s.testSuiteEventHandler()) + s.informers.testInformer.Informer().AddEventHandler(s.testEventHandler()) s.logger.Debugf("trigger service: starting pod informers") for i := range s.informers.podInformers { @@ -201,17 +190,11 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { } s.logger.Debugf("trigger service: starting test trigger informer") - for i := range s.informers.testTriggerInformers { - go s.informers.testTriggerInformers[i].Informer().Run(stop) - } + go s.informers.testTriggerInformer.Informer().Run(stop) s.logger.Debugf("trigger service: starting test suite informer") - for i := range s.informers.testSuiteInformers { - go s.informers.testSuiteInformers[i].Informer().Run(stop) - } + go s.informers.testSuiteInformer.Informer().Run(stop) s.logger.Debugf("trigger service: starting test informer") - for i := range s.informers.testInformers { - go s.informers.testInformers[i].Informer().Run(stop) - } + go s.informers.testInformer.Informer().Run(stop) } func (s *Service) podEventHandler(ctx context.Context) cache.ResourceEventHandlerFuncs { diff --git a/pkg/triggers/watcher_test.go b/pkg/triggers/watcher_test.go index d19f54d7448..35bdb674d7a 100644 --- a/pkg/triggers/watcher_test.go +++ b/pkg/triggers/watcher_test.go @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -129,74 +129,6 @@ func TestService_runWatcher_lease(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.True(t, match, "pod created event should match the test trigger condition") }) - - t.Run("create a test trigger in an alternate namespace for pod created and match event on pod creation", func(t *testing.T) { - t.Parallel() - - clientset := fake.NewSimpleClientset() - testKubeClientset := faketestkube.NewSimpleClientset() - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - testNamespace := "testkube" - testAlternateNamespace := "testkube-alternate" - - match := false - testExecutorF := func(ctx context.Context, trigger *testtriggersv1.TestTrigger) error { - assert.NotEqual(t, testNamespace, trigger.Namespace) - assert.Equal(t, "test-trigger-3", trigger.Name) - match = true - return nil - } - s := &Service{ - executor: testExecutorF, - identifier: "testkube-api", - clusterID: "testkube", - triggerStatus: make(map[statusKey]*triggerStatus), - clientset: clientset, - testKubeClientset: testKubeClientset, - logger: log.DefaultLogger, - watchTestkubeCrAllNamespaces: true, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, true), - } - - leaseChan := make(chan bool) - go func() { time.Sleep(50 * time.Millisecond); leaseChan <- true }() - go s.runWatcher(ctx, leaseChan) - - time.Sleep(50 * time.Millisecond) - leaseChan <- true - time.Sleep(50 * time.Millisecond) - - testTrigger := testtriggersv1.TestTrigger{ - ObjectMeta: metav1.ObjectMeta{Namespace: testAlternateNamespace, Name: "test-trigger-3"}, - Spec: testtriggersv1.TestTriggerSpec{ - Resource: "pod", - ResourceSelector: testtriggersv1.TestTriggerSelector{Namespace: testNamespace, Name: "test-pod"}, - Event: "created", - Action: "run", - Execution: "test", - TestSelector: testtriggersv1.TestTriggerSelector{Namespace: testAlternateNamespace, Name: "some-test"}, - }, - } - createdTestTrigger, err := testKubeClientset.TestsV1().TestTriggers(testAlternateNamespace).Create(ctx, &testTrigger, metav1.CreateOptions{}) - assert.NotNil(t, createdTestTrigger) - assert.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - - assert.Len(t, s.triggerStatus, 1) - key := newStatusKey(testAlternateNamespace, "test-trigger-3") - assert.Contains(t, s.triggerStatus, key) - - testPod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: "test-pod"}} - _, err = clientset.CoreV1().Pods(testNamespace).Create(ctx, &testPod, metav1.CreateOptions{}) - assert.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - assert.True(t, match, "pod created event should match the test trigger condition") - }) } func TestService_runWatcher_noLease(t *testing.T) { @@ -218,7 +150,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -258,7 +190,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -300,7 +232,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) From b418f3c6143942821338d2a29266227cfc2a7da9 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Fri, 30 Jun 2023 15:39:19 -0400 Subject: [PATCH 13/17] Support custom resources --- pkg/triggers/service.go | 2 +- pkg/triggers/service_test.go | 5 +-- pkg/triggers/watcher.go | 58 +++++++++------------------ pkg/triggers/watcher_test.go | 77 +++--------------------------------- 4 files changed, 25 insertions(+), 117 deletions(-) diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index d639d91e87d..982caa46c48 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -121,7 +121,7 @@ func NewService( opt(s) } - s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeAll, s.watchTestkubeCRNamespaces) + s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) return s } diff --git a/pkg/triggers/service_test.go b/pkg/triggers/service_test.go index 21db9e68151..9de7df3f7c7 100644 --- a/pkg/triggers/service_test.go +++ b/pkg/triggers/service_test.go @@ -2,8 +2,6 @@ package triggers import ( "context" - "k8s.io/apimachinery/pkg/runtime" - fake2 "k8s.io/client-go/dynamic/fake" "testing" "time" @@ -141,11 +139,10 @@ func TestService_Run(t *testing.T) { fakeTestkubeClientset := faketestkube.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset() - fakeDynamicClientset := fake2.NewSimpleDynamicClient(runtime.NewScheme(), nil) s := NewService( sched, fakeClientset, - fakeDynamicClientset, + nil, fakeTestkubeClientset, mockTestSuitesClient, mockTestsClient, diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index 383e165f1ce..423ff8ceb49 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -50,21 +50,21 @@ type k8sInformers struct { clusterEventInformers []coreinformerv1.EventInformer configMapInformers []coreinformerv1.ConfigMapInformer - testTriggerInformers []testkubeinformerv1.TestTriggerInformer - testSuiteInformers []testkubeinformerv3.TestSuiteInformer - testInformers []testkubeinformerv3.TestInformer + testTriggerInformer testkubeinformerv1.TestTriggerInformer + testSuiteInformer testkubeinformerv3.TestSuiteInformer + testInformer testkubeinformerv3.TestInformer triggerCRInformers map[string]*InformersData crInformers map[string]*InformersData } -func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watchK8sResourceNamespaces []string, watchTestkubeAll bool, watchTestkubeCrNamespaces []string) *k8sInformers { +func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string) *k8sInformers { var k8sInformers k8sInformers - if len(watchK8sResourceNamespaces) == 0 { - watchK8sResourceNamespaces = append(watchK8sResourceNamespaces, v1.NamespaceAll) + if len(watcherNamespaces) == 0 { + watcherNamespaces = append(watcherNamespaces, v1.NamespaceAll) } - for _, namespace := range watchK8sResourceNamespaces { + for _, namespace := range watcherNamespaces { f := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace)) k8sInformers.podInformers = append(k8sInformers.podInformers, f.Core().V1().Pods()) k8sInformers.deploymentInformers = append(k8sInformers.deploymentInformers, f.Apps().V1().Deployments()) @@ -77,22 +77,12 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned } - if watchTestkubeAll == true { - watchTestkubeCrNamespaces = append(watchTestkubeCrNamespaces, v1.NamespaceAll) - } else { - if len(watchTestkubeCrNamespaces) == 0 { - watchTestkubeCrNamespaces = append(watchTestkubeCrNamespaces, testkubeNamespace) - } - } - - for _, namespace := range watchTestkubeCrNamespaces { var testkubeInformerFactory externalversions.SharedInformerFactory testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(namespace)) - k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers()) - k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V3().TestSuites()) - k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests()) - } + testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) + k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers() + k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V3().TestSuites() + k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests() k8sInformers.triggerCRInformers = make(map[string]*InformersData) k8sInformers.crInformers = make(map[string]*InformersData) return &k8sInformers @@ -121,7 +111,7 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) { } else { if !running { s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID) - s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeAll, s.watchTestkubeCRNamespaces) + s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) stopChan = make(chan struct{}) s.runInformers(ctx, stopChan) running = true @@ -169,15 +159,9 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { s.informers.configMapInformers[i].Informer().AddEventHandler(s.configMapEventHandler(ctx)) } - for i := range s.informers.testTriggerInformers { - s.informers.testTriggerInformers[i].Informer().AddEventHandler(s.testTriggerEventHandler(ctx, stop)) - } - for i := range s.informers.testSuiteInformers { - s.informers.testSuiteInformers[i].Informer().AddEventHandler(s.testSuiteEventHandler()) - } - for i := range s.informers.testInformers { - s.informers.testInformers[i].Informer().AddEventHandler(s.testEventHandler()) - } + s.informers.testTriggerInformer.Informer().AddEventHandler(s.testTriggerEventHandler(ctx, stop)) + s.informers.testSuiteInformer.Informer().AddEventHandler(s.testSuiteEventHandler()) + s.informers.testInformer.Informer().AddEventHandler(s.testEventHandler()) s.logger.Debugf("trigger service: starting pod informers") for i := range s.informers.podInformers { @@ -220,17 +204,11 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { } s.logger.Debugf("trigger service: starting test trigger informer") - for i := range s.informers.testTriggerInformers { - go s.informers.testTriggerInformers[i].Informer().Run(stop) - } + go s.informers.testTriggerInformer.Informer().Run(stop) s.logger.Debugf("trigger service: starting test suite informer") - for i := range s.informers.testSuiteInformers { - go s.informers.testSuiteInformers[i].Informer().Run(stop) - } + go s.informers.testSuiteInformer.Informer().Run(stop) s.logger.Debugf("trigger service: starting test informer") - for i := range s.informers.testInformers { - go s.informers.testInformers[i].Informer().Run(stop) - } + go s.informers.testInformer.Informer().Run(stop) } func (s *Service) podEventHandler(ctx context.Context) cache.ResourceEventHandlerFuncs { diff --git a/pkg/triggers/watcher_test.go b/pkg/triggers/watcher_test.go index d55e5cbb980..806f924d56c 100644 --- a/pkg/triggers/watcher_test.go +++ b/pkg/triggers/watcher_test.go @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -130,73 +130,6 @@ func TestService_runWatcher_lease(t *testing.T) { assert.True(t, match, "pod created event should match the test trigger condition") }) - t.Run("create a test trigger in an alternate namespace for pod created and match event on pod creation", func(t *testing.T) { - t.Parallel() - - clientset := fake.NewSimpleClientset() - testKubeClientset := faketestkube.NewSimpleClientset() - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - testNamespace := "testkube" - testAlternateNamespace := "testkube-alternate" - - match := false - testExecutorF := func(ctx context.Context, trigger *testtriggersv1.TestTrigger) error { - assert.NotEqual(t, testNamespace, trigger.Namespace) - assert.Equal(t, "test-trigger-3", trigger.Name) - match = true - return nil - } - s := &Service{ - executor: testExecutorF, - identifier: "testkube-api", - clusterID: "testkube", - triggerStatus: make(map[statusKey]*triggerStatus), - clientset: clientset, - testKubeClientset: testKubeClientset, - logger: log.DefaultLogger, - watchTestkubeAll: true, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, true, []string{}), - } - - leaseChan := make(chan bool) - go func() { time.Sleep(50 * time.Millisecond); leaseChan <- true }() - go s.runWatcher(ctx, leaseChan) - - time.Sleep(50 * time.Millisecond) - leaseChan <- true - time.Sleep(50 * time.Millisecond) - - testTrigger := testtriggersv1.TestTrigger{ - ObjectMeta: metav1.ObjectMeta{Namespace: testAlternateNamespace, Name: "test-trigger-3"}, - Spec: testtriggersv1.TestTriggerSpec{ - Resource: "pod", - ResourceSelector: testtriggersv1.TestTriggerSelector{Namespace: testNamespace, Name: "test-pod"}, - Event: "created", - Action: "run", - Execution: "test", - TestSelector: testtriggersv1.TestTriggerSelector{Namespace: testAlternateNamespace, Name: "some-test"}, - }, - } - createdTestTrigger, err := testKubeClientset.TestsV1().TestTriggers(testAlternateNamespace).Create(ctx, &testTrigger, metav1.CreateOptions{}) - assert.NotNil(t, createdTestTrigger) - assert.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - - assert.Len(t, s.triggerStatus, 1) - key := newStatusKey(testAlternateNamespace, "test-trigger-3") - assert.Contains(t, s.triggerStatus, key) - - testPod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: "test-pod"}} - _, err = clientset.CoreV1().Pods(testNamespace).Create(ctx, &testPod, metav1.CreateOptions{}) - assert.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - assert.True(t, match, "pod created event should match the test trigger condition") - }) } func TestService_runWatcher_noLease(t *testing.T) { @@ -218,7 +151,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -258,7 +191,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) @@ -300,7 +233,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false, []string{}), + informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), } leaseChan := make(chan bool) From b3d87f42b918808366571e04ba9dc28473cfb2a2 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Mon, 10 Jul 2023 18:27:13 -0400 Subject: [PATCH 14/17] Prevent UI error --- pkg/keymap/triggers/triggers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/keymap/triggers/triggers.go b/pkg/keymap/triggers/triggers.go index 40c50933e39..fd8607781be 100644 --- a/pkg/keymap/triggers/triggers.go +++ b/pkg/keymap/triggers/triggers.go @@ -36,5 +36,6 @@ func getSupportedEvents() map[string][]string { m[testtrigger.ResourceIngress] = []string{string(testtrigger.EventCreated), string(testtrigger.EventModified), string(testtrigger.EventDeleted)} m[testtrigger.ResourceEvent] = []string{string(testtrigger.EventCreated), string(testtrigger.EventModified), string(testtrigger.EventDeleted)} m[testtrigger.ResourceConfigMap] = []string{string(testtrigger.EventCreated), string(testtrigger.EventModified), string(testtrigger.EventDeleted)} + m[testtrigger.ResourceCustomResource] = []string{string(testtrigger.EventCreated), string(testtrigger.EventModified), string(testtrigger.EventDeleted)} return m } From 08480506a73ff6d46818efabe0504e754f18212b Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Thu, 29 Jun 2023 14:21:18 -0400 Subject: [PATCH 15/17] Remove watch all namespace flags --- cmd/api-server/main.go | 2 -- internal/config/config.go | 2 -- pkg/triggers/service.go | 19 ------------------- pkg/triggers/watcher.go | 40 +++++++++++++++------------------------ 4 files changed, 15 insertions(+), 48 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index f0706e471b6..cbb4a88e628 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -449,8 +449,6 @@ func main() { triggers.WithHostnameIdentifier(), triggers.WithTestkubeNamespace(cfg.TestkubeNamespace), triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces), - triggers.WatchTestkubeAll(cfg.TestkubeWatchAll), - triggers.WatchTestkubeCrNamespaces(cfg.TestkubeCRWatcherNamespaces), ) log.DefaultLogger.Info("starting trigger service") triggerService.Run(ctx) diff --git a/internal/config/config.go b/internal/config/config.go index c26c2848f8c..9c64dcb2ccc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,8 +55,6 @@ type Config struct { TestkubeCloudWorkerCount int `envconfig:"TESTKUBE_CLOUD_WORKER_COUNT" default:"50"` TestkubeCloudLogStreamWorkerCount int `envconfig:"TESTKUBE_CLOUD_LOG_STREAM_WORKER_COUNT" default:"25"` TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""` - TestkubeWatchAll bool `envconfig:"TESTKUBE_WATCH_ALL" default:"false"` - TestkubeCRWatcherNamespaces string `envconfig:"TESTKUBE_CR_WATCHER_NAMESPACES" default:""` GraphqlPort string `envconfig:"TESTKUBE_GRAPHQL_PORT" default:"8070"` TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index d0e7489c845..60c18a51f5b 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -64,8 +64,6 @@ type Service struct { executorsClient executorsclientv1.Interface testkubeNamespace string watcherNamespaces []string - watchTestkubeAll bool - watchTestkubeCRNamespaces []string } type Option func(*Service) @@ -184,23 +182,6 @@ func WithWatcherNamespaces(namespaces string) Option { } } -func WatchTestkubeAll(watchTestkubeAll bool) Option { - return func(s *Service) { - s.watchTestkubeAll = watchTestkubeAll - } -} - -func WatchTestkubeCrNamespaces(namespaces string) Option { - return func(s *Service) { - for _, namespace := range strings.Split(namespaces, ",") { - value := strings.TrimSpace(namespace) - if value != "" { - s.watcherNamespaces = append(s.watcherNamespaces, value) - } - } - } -} - func (s *Service) Run(ctx context.Context) { leaseChan := make(chan bool) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index ccee19601be..874e039f4b6 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -9,15 +9,14 @@ import ( corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" appsinformerv1 "k8s.io/client-go/informers/apps/v1" "time" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" coreinformerv1 "k8s.io/client-go/informers/core/v1" networkinginformerv1 "k8s.io/client-go/informers/networking/v1" "k8s.io/client-go/kubernetes" @@ -26,21 +25,12 @@ import ( testkubeinformerv1 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v1" testkubeinformerv3 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v3" - networkingv1 "k8s.io/api/networking/v1" - - "github.com/google/go-cmp/cmp" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" testsv3 "github.com/kubeshop/testkube-operator/apis/tests/v3" testsuitev3 "github.com/kubeshop/testkube-operator/apis/testsuite/v3" testtriggersv1 "github.com/kubeshop/testkube-operator/apis/testtriggers/v1" - "github.com/kubeshop/testkube-operator/pkg/clientset/versioned" "github.com/kubeshop/testkube-operator/pkg/informers/externalversions" - testkubeinformerv1 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v1" - testkubeinformerv3 "github.com/kubeshop/testkube-operator/pkg/informers/externalversions/tests/v3" "github.com/kubeshop/testkube-operator/pkg/validation/tests/v1/testtrigger" ) @@ -63,8 +53,8 @@ type k8sInformers struct { testTriggerInformer testkubeinformerv1.TestTriggerInformer testSuiteInformer testkubeinformerv3.TestSuiteInformer testInformer testkubeinformerv3.TestInformer - triggerCRInformers map[string]*InformersData - crInformers map[string]*InformersData + triggerCRInformers map[string]*InformersData + crInformers map[string]*InformersData } func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string) *k8sInformers { @@ -87,12 +77,12 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned } - var testkubeInformerFactory externalversions.SharedInformerFactory - testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( - testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) - k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers() - k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V3().TestSuites() - k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests() + var testkubeInformerFactory externalversions.SharedInformerFactory + testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions( + testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace)) + k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers() + k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V3().TestSuites() + k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests() k8sInformers.triggerCRInformers = make(map[string]*InformersData) k8sInformers.crInformers = make(map[string]*InformersData) return &k8sInformers @@ -169,9 +159,9 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { s.informers.configMapInformers[i].Informer().AddEventHandler(s.configMapEventHandler(ctx)) } - s.informers.testTriggerInformer.Informer().AddEventHandler(s.testTriggerEventHandler(ctx, stop)) - s.informers.testSuiteInformer.Informer().AddEventHandler(s.testSuiteEventHandler()) - s.informers.testInformer.Informer().AddEventHandler(s.testEventHandler()) + s.informers.testTriggerInformer.Informer().AddEventHandler(s.testTriggerEventHandler(ctx, stop)) + s.informers.testSuiteInformer.Informer().AddEventHandler(s.testSuiteEventHandler()) + s.informers.testInformer.Informer().AddEventHandler(s.testEventHandler()) s.logger.Debugf("trigger service: starting pod informers") for i := range s.informers.podInformers { @@ -214,11 +204,11 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) { } s.logger.Debugf("trigger service: starting test trigger informer") - go s.informers.testTriggerInformer.Informer().Run(stop) + go s.informers.testTriggerInformer.Informer().Run(stop) s.logger.Debugf("trigger service: starting test suite informer") - go s.informers.testSuiteInformer.Informer().Run(stop) + go s.informers.testSuiteInformer.Informer().Run(stop) s.logger.Debugf("trigger service: starting test informer") - go s.informers.testInformer.Informer().Run(stop) + go s.informers.testInformer.Informer().Run(stop) } func (s *Service) podEventHandler(ctx context.Context) cache.ResourceEventHandlerFuncs { From 7274220eb810e2ab1ed33782f3e8016f3d1a2591 Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Tue, 11 Jul 2023 09:40:29 -0400 Subject: [PATCH 16/17] debug statements --- pkg/triggers/watcher.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index 874e039f4b6..e65d91bb2c2 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -2,7 +2,7 @@ package triggers import ( "context" - "fmt" + "go.uber.org/zap" "github.com/google/go-cmp/cmp" appsv1 "k8s.io/api/apps/v1" @@ -654,7 +654,7 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc s.logger.Debugf("trigger service: starting custom resource informers") stopCtx, cancel := context.WithCancel(context.Background()) - go worker(cancel, stop, stopCtx.Done()) + go worker(cancel, stop, stopCtx.Done(), s.logger) ifd := &InformersData{ informer: &customResourceInformer, @@ -708,21 +708,21 @@ func serializeSelector(selector testtriggersv1.TestTriggerSelector) string { return selector.Name + selector.Namespace } -func worker(cancel context.CancelFunc, stop <-chan struct{}, done <-chan struct{}) { +func worker(cancel context.CancelFunc, stop <-chan struct{}, done <-chan struct{}, logger *zap.SugaredLogger) { for { select { case <-stop: // Context is done, so we should stop the worker - fmt.Println("Work is stopped") + logger.Debug("Work is stopped") cancel() return case <-done: // Context is done, so we should stop the worker - fmt.Println("Work is done") + logger.Debug("Work is done") return default: // Do some work - fmt.Println("Working...") + logger.Debug("Working...") time.Sleep(1 * time.Second) } } From 4a3b012287100d0273109fd957766fa3408045cb Mon Sep 17 00:00:00 2001 From: Cedric McKinnie Date: Tue, 11 Jul 2023 20:01:20 -0400 Subject: [PATCH 17/17] fix duplicate events --- pkg/triggers/service.go | 2 +- pkg/triggers/watcher.go | 187 ++++++++++++++++++++++++++++------- pkg/triggers/watcher_test.go | 10 +- 3 files changed, 155 insertions(+), 44 deletions(-) diff --git a/pkg/triggers/service.go b/pkg/triggers/service.go index 60c18a51f5b..40719a13de7 100644 --- a/pkg/triggers/service.go +++ b/pkg/triggers/service.go @@ -115,7 +115,7 @@ func NewService( opt(s) } - s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) + s.informers = newK8sInformers(clientset, dynamicClientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, logger) return s } diff --git a/pkg/triggers/watcher.go b/pkg/triggers/watcher.go index e65d91bb2c2..0a063c6a86d 100644 --- a/pkg/triggers/watcher.go +++ b/pkg/triggers/watcher.go @@ -2,7 +2,9 @@ package triggers import ( "context" + "fmt" "go.uber.org/zap" + "k8s.io/client-go/dynamic" "github.com/google/go-cmp/cmp" appsv1 "k8s.io/api/apps/v1" @@ -34,7 +36,7 @@ import ( "github.com/kubeshop/testkube-operator/pkg/validation/tests/v1/testtrigger" ) -type InformersData struct { +type DynamicInformersEntry struct { informer *informers.GenericInformer cancel context.CancelFunc } @@ -53,11 +55,10 @@ type k8sInformers struct { testTriggerInformer testkubeinformerv1.TestTriggerInformer testSuiteInformer testkubeinformerv3.TestSuiteInformer testInformer testkubeinformerv3.TestInformer - triggerCRInformers map[string]*InformersData - crInformers map[string]*InformersData + dynamicInformer *DynamicInformerManager } -func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string) *k8sInformers { +func newK8sInformers(clientset kubernetes.Interface, dynamicClientset dynamic.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string, logger *zap.SugaredLogger) *k8sInformers { var k8sInformers k8sInformers if len(watcherNamespaces) == 0 { @@ -83,8 +84,7 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers() k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V3().TestSuites() k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests() - k8sInformers.triggerCRInformers = make(map[string]*InformersData) - k8sInformers.crInformers = make(map[string]*InformersData) + k8sInformers.dynamicInformer = NewDynamicInformerManager(dynamicClientset, logger) return &k8sInformers } @@ -111,8 +111,10 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) { } else { if !running { s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID) - s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces) + s.informers = newK8sInformers(s.clientset, s.dynamicClientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.logger) stopChan = make(chan struct{}) + //TODO: clear the initialization + s.informers.dynamicInformer.setStopCh(stopChan) s.runInformers(ctx, stopChan) running = true } @@ -627,6 +629,77 @@ func (s *Service) clusterEventEventHandler(ctx context.Context) cache.ResourceEv } } +type DynamicInformerManager struct { + dynamicClient dynamic.Interface + logger *zap.SugaredLogger + stop <-chan struct{} + triggerCRInformers map[string]*DynamicInformersEntry + crInformers map[string]*DynamicInformersEntry +} + +func NewDynamicInformerManager(client dynamic.Interface, logger *zap.SugaredLogger) *DynamicInformerManager { + return &DynamicInformerManager{ + dynamicClient: client, + logger: logger, + triggerCRInformers: make(map[string]*DynamicInformersEntry), + crInformers: make(map[string]*DynamicInformersEntry), + } +} + +func (m *DynamicInformerManager) NewInformer(t *testtriggersv1.TestTrigger, addHandler cache.ResourceEventHandlerFuncs, modifyHandler cache.ResourceEventHandlerFuncs, deleteHandler cache.ResourceEventHandlerFuncs) { + if _, found := m.triggerCRInformers[t.Name+t.Namespace]; found { + m.logger.Debugf("trigger service: informer already exist for %s resource in namespace %s", t.Name, t.Namespace) + return + } + + gvr := schema.GroupVersionResource{ + Group: t.Spec.CustomResource.Group, + Version: t.Spec.CustomResource.Version, + Resource: t.Spec.CustomResource.Resource, + } + + df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(m.dynamicClient, 0, t.Spec.ResourceSelector.Namespace, nil) + customResourceInformer := df.ForResource(gvr) + if string(t.Spec.Event) == string(testtrigger.EventCreated) { + customResourceInformer.Informer().AddEventHandler(addHandler) + } + if string(t.Spec.Event) == string(testtrigger.EventModified) { + customResourceInformer.Informer().AddEventHandler(modifyHandler) + } + if string(t.Spec.Event) == string(testtrigger.EventDeleted) { + customResourceInformer.Informer().AddEventHandler(deleteHandler) + } + + stopCtx, cancel := context.WithCancel(context.Background()) + go m.WaitForTermination(cancel, stopCtx.Done()) + + ifd := &DynamicInformersEntry{ + informer: &customResourceInformer, + cancel: cancel, + } + m.triggerCRInformers[t.Name+t.Namespace] = ifd + m.crInformers[gvr.String()+serializeSelector(t.Spec.ResourceSelector)] = ifd + go customResourceInformer.Informer().Run(stopCtx.Done()) + m.logger.Debugf("trigger service: started a new custom resource informers for %s resource and %s test trigger", gvr.String(), t.Name) +} + +func (m *DynamicInformerManager) setStopCh(stopChan chan struct{}) { + m.stop = stopChan +} + +func (m *DynamicInformerManager) TearDownInformer(t *testtriggersv1.TestTrigger) { + if ifd, found := m.triggerCRInformers[t.Name+t.Namespace]; found { + if ifd.informer != nil && ifd.cancel != nil { + ifd.cancel() + ifd.informer = nil + delete(m.triggerCRInformers, t.Name+t.Namespace) + m.logger.Debugf("trigger service: tearing down for %v resource and %s test trigger", t.Spec.CustomResource, t.Name) + } + m.logger.Debugf("trigger service: ignoring tear down for %v resource and %s test trigger", t.Spec.CustomResource, t.Name) + } + m.logger.Debugf("trigger service: could not found infomer for %v resource and %s test trigger to teardown", t.Spec.CustomResource, t.Name) +} + func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struct{}) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -642,27 +715,16 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc s.addTrigger(t) if t.Spec.Resource == testtrigger.ResourceCustomResource { - df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.dynamicClientset, 0, t.Spec.ResourceSelector.Namespace, nil) - gvr := schema.GroupVersionResource{ Group: t.Spec.CustomResource.Group, Version: t.Spec.CustomResource.Version, Resource: t.Spec.CustomResource.Resource, } - customResourceInformer := df.ForResource(gvr) - customResourceInformer.Informer().AddEventHandler(s.customResourceEventHandler(ctx, gvr)) - s.logger.Debugf("trigger service: starting custom resource informers") - - stopCtx, cancel := context.WithCancel(context.Background()) - go worker(cancel, stop, stopCtx.Done(), s.logger) - - ifd := &InformersData{ - informer: &customResourceInformer, - cancel: cancel, - } - s.informers.triggerCRInformers[t.Name+t.Namespace] = ifd - s.informers.crInformers[gvr.String()+serializeSelector(t.Spec.ResourceSelector)] = ifd - go customResourceInformer.Informer().Run(stopCtx.Done()) + s.informers.dynamicInformer.NewInformer(t, + s.customResourceAddEventHandler(ctx, gvr), + s.customResourceModifyEventHandler(ctx, gvr), + s.customResourceDeleteEventHandler(ctx, gvr), + ) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -691,14 +753,7 @@ func (s *Service) testTriggerEventHandler(ctx context.Context, stop <-chan struc t.Namespace, t.Name, t.Spec.Resource, t.Spec.Event, ) s.removeTrigger(t) - if ifd, found := s.informers.triggerCRInformers[t.Name+t.Namespace]; found { - if ifd.informer != nil && ifd.cancel != nil { - ifd.cancel() - ifd.informer = nil - delete(s.informers.triggerCRInformers, t.Name+t.Namespace) - } - - } + s.informers.dynamicInformer.TearDownInformer(t) }, } } @@ -708,27 +763,27 @@ func serializeSelector(selector testtriggersv1.TestTriggerSelector) string { return selector.Name + selector.Namespace } -func worker(cancel context.CancelFunc, stop <-chan struct{}, done <-chan struct{}, logger *zap.SugaredLogger) { +func (m *DynamicInformerManager) WaitForTermination(cancel context.CancelFunc, done <-chan struct{}) { for { select { - case <-stop: + case <-m.stop: // Context is done, so we should stop the worker - logger.Debug("Work is stopped") + m.logger.Debug("Work is stopped") cancel() return case <-done: // Context is done, so we should stop the worker - logger.Debug("Work is done") + m.logger.Debug("Work is done") return default: // Do some work - logger.Debug("Working...") + //m.logger.Debug("Working...") time.Sleep(1 * time.Second) } } } -func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs { +func (s *Service) customResourceAddEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs { getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) { return func() ([]testtriggersv1.TestTriggerCondition, error) { return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr) @@ -741,12 +796,32 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro s.logger.Errorf("failed to process create customResource event due to it being an unexpected type, received type %+v", obj) return } + obsGeneration, _, err := unstructured.NestedInt64(customResource.Object, "status", "observedGeneration") + if err != nil { + s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", customResource.GetKind(), customResource.GetName()) + return + } + generation := customResource.GetGeneration() + if generation == obsGeneration { + s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", customResource.GetKind(), customResource.GetName()) + return + } s.logger.Debugf("trigger customResource: watcher component: emiting event: customResource %s/%s created", customResource.GetNamespace(), customResource.GetName()) event := newWatcherEvent(testtrigger.EventCreated, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource))) if err := s.match(ctx, event); err != nil { s.logger.Errorf("event matcher returned an error while matching create customResource event: %v", err) } }, + } +} + +func (s *Service) customResourceModifyEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs { + getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) { + return func() ([]testtriggersv1.TestTriggerCondition, error) { + return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr) + } + } + return cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { oldCustomResource, ok := oldObj.(*unstructured.Unstructured) if !ok { @@ -765,7 +840,23 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro return } - if oldCustomResource.GetGeneration() == newCustomResource.GetGeneration() { + newObsGeneration, _, err := unstructured.NestedInt64(newCustomResource.Object, "status", "observedGeneration") + if err != nil { + s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", oldCustomResource.GetKind(), oldCustomResource.GetName()) + return + } + + oldObsGeneration, _, err := unstructured.NestedInt64(oldCustomResource.Object, "status", "observedGeneration") + if err != nil { + s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", oldCustomResource.GetKind(), oldCustomResource.GetName()) + return + } + + oldGeneration := oldCustomResource.GetGeneration() + newGeneration := newCustomResource.GetGeneration() + fmt.Printf("old gen: %v, old obs gen: %v, new gen: %v, new obs gen: %v\n", oldGeneration, oldObsGeneration, newGeneration, newObsGeneration) + + if newGeneration == newObsGeneration { s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", oldCustomResource.GetKind(), oldCustomResource.GetName()) return } @@ -778,12 +869,32 @@ func (s *Service) customResourceEventHandler(ctx context.Context, gvr schema.Gro s.logger.Errorf("event matcher returned an error while matching update service event: %v", err) } }, + } +} + +func (s *Service) customResourceDeleteEventHandler(ctx context.Context, gvr schema.GroupVersionResource) cache.ResourceEventHandlerFuncs { + getConditions := func(object metav1.Object) func() ([]testtriggersv1.TestTriggerCondition, error) { + return func() ([]testtriggersv1.TestTriggerCondition, error) { + return getCustomResourceConditions(ctx, s.dynamicClientset, object, gvr) + } + } + return cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { customResource, ok := obj.(*unstructured.Unstructured) if !ok { s.logger.Errorf("failed to process delete customResource event due to it being an unexpected type, received type %+v", obj) return } + obsGeneration, _, err := unstructured.NestedInt64(customResource.Object, "status", "observedGeneration") + if err != nil { + s.logger.Debugf("trigger service: watcher component: error when unstructuring custom resource: %s/%s ", customResource.GetKind(), customResource.GetName()) + return + } + generation := customResource.GetGeneration() + if generation == obsGeneration { + s.logger.Debugf("trigger service: watcher component: no-op update trigger: %s/%s new and old specs are equal", customResource.GetKind(), customResource.GetName()) + return + } s.logger.Debugf("trigger service: watcher component: emiting event: customResource %s/%s deleted", customResource.GetNamespace(), customResource.GetName()) event := newWatcherEvent(testtrigger.EventDeleted, customResource, testtrigger.ResourceCustomResource, withConditionsGetter(getConditions(customResource))) if err := s.match(ctx, event); err != nil { diff --git a/pkg/triggers/watcher_test.go b/pkg/triggers/watcher_test.go index 35bdb674d7a..51f0dfd482e 100644 --- a/pkg/triggers/watcher_test.go +++ b/pkg/triggers/watcher_test.go @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil), } leaseChan := make(chan bool) @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil), } leaseChan := make(chan bool) @@ -150,7 +150,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil), } leaseChan := make(chan bool) @@ -190,7 +190,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil), } leaseChan := make(chan bool) @@ -232,7 +232,7 @@ func TestService_runWatcher_noLease(t *testing.T) { clientset: clientset, testKubeClientset: testKubeClientset, logger: log.DefaultLogger, - informers: newK8sInformers(clientset, testKubeClientset, "", []string{}), + informers: newK8sInformers(clientset, nil, testKubeClientset, "", []string{}, nil), } leaseChan := make(chan bool)