diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/chart_cache.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/chart_cache.go index 881afc23bd7..226e264c22d 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/chart_cache.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/chart_cache.go @@ -63,7 +63,7 @@ type ChartCache struct { // significant in that it flushes the whole redis cache and re-populates the state from k8s. // When that happens we don't really want any concurrent access to the cache until the resync() // operation is complete. In other words, we want to: - // - be able to have multiple concurrent readers (goroutines doing GetForOne()/GetForMultiple()) + // - be able to have multiple concurrent readers (goroutines doing GetForOne()) // - only a single writer (goroutine doing a resync()) is allowed, and while its doing its job // no readers are allowed resyncCond *sync.Cond diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/rate_limiting_queue.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/rate_limiting_queue.go index 8227ce413d8..c2d32aee7b3 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/rate_limiting_queue.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/rate_limiting_queue.go @@ -17,6 +17,10 @@ import ( log "k8s.io/klog/v2" ) +// Inspired by https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go and +// by https://github.com/kubernetes/client-go/blob/v0.22.4/util/workqueue/rate_limiting_queue.go +// but adds a few funcs, like Name(), ExpectAdd(), WaitUntilForgotten() and Reset() + // RateLimitingInterface is an interface that rate limits items being added to the queue. type RateLimitingInterface interface { workqueue.RateLimitingInterface @@ -83,11 +87,9 @@ func (q *rateLimitingType) Reset() { q.queue.reset() - // this way we "forget" about ratelimit failures + // this way we "forget" about ratelimit failures, i.e. the items queued up + // via previous call(s) to .AddRateLimited() (i.e. via q.DelayingInterface.AddAfter) q.rateLimiter = workqueue.DefaultControllerRateLimiter() - - // TODO (gfichtenholt) Also need to "forget" the items queued up via previous call(s) - // to .AddRateLimited() (i.e. via q.DelayingInterface.AddAfter) ? 
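For context, the rate-limiting queue described above wraps the client-go workqueue package. The following is a minimal, hypothetical usage sketch of those underlying primitives (not the plugin's actual wrapper); the key name is invented for illustration. It shows the per-item backoff state that the plugin's Reset() effectively discards by swapping in a fresh DefaultControllerRateLimiter().

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A named rate-limiting work queue, the same client-go primitive the
	// plugin's rateLimitingType wraps.
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "flux-cache-demo")

	// hypothetical cache key, for illustration only
	key := "helmrepositories:default:bitnami"

	// AddRateLimited schedules the item with exponential backoff based on
	// how many times it has previously failed.
	q.AddRateLimited(key)

	item, shutdown := q.Get()
	if shutdown {
		return
	}
	// Done must always be called once processing finishes; Forget clears the
	// per-item failure history so the next Add is not delayed. Swapping in a
	// fresh DefaultControllerRateLimiter(), as Reset() does above, has the
	// same "forgetting" effect for every item at once.
	q.Forget(item)
	q.Done(item)

	fmt.Println("processed", item)
}
```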
} // only used in unit tests @@ -95,7 +97,7 @@ func (q *rateLimitingType) ExpectAdd(item string) { q.queue.expectAdd(item) } -// used in unit test and production code, when a repo/chart needs to be loaded on demand +// used in unit test AND production code, when a repo/chart needs to be loaded on demand func (q *rateLimitingType) WaitUntilForgotten(item string) { q.queue.waitUntilDone(item) // q.queue might be done with the item, but it may have been diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/watcher_cache.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/watcher_cache.go index 778da91a2b3..3fd58deb8f4 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/watcher_cache.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache/watcher_cache.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/types" errorutil "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" watchutil "k8s.io/client-go/tools/watch" @@ -39,6 +40,9 @@ const ( // max number of attempts to resync before giving up maxWatcherCacheResyncBackoff = 2 KeySegmentsSeparator = ":" + // max number of concurrent workers computing or retrieving cache values at + // the same time + maxWorkers = 10 ) var ( @@ -105,14 +109,16 @@ type NamespacedResourceWatcherCacheConfig struct { // corresponding entry. Note this maybe happen as a result of a newly created k8s object // or a modified object for which there was no entry in the cache // This allows the call site to return information about WHETHER OR NOT and WHAT is to be stored - // in the cache for a given k8s object (passed in as a TODO). + // in the cache for a given k8s object (passed in as a ctrlclient.Object). + // ref https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/client#Object // The call site may return []byte, but it doesn't have to be that. // The list of all types actually supported by redis you can find in // https://github.com/go-redis/redis/blob/v8.10.0/internal/proto/writer.go#L61 OnAddFunc ValueAdderFunc // 'OnModifyFunc' hook is called when an object for which there is a corresponding cache entry // is modified. This allows the call site to return information about WHETHER OR NOT and WHAT - // is to be stored in the cache for a given k8s object (passed in as a TODO). + // is to be stored in the cache for a given k8s object (passed in as a ctrlclient.Object). + // ref https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/client#Object // The call site may return []byte, but it doesn't have to be that.
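The hook functions above hand the cache a value (typically a []byte) that ends up in redis, and later in this diff onAddOrModify() always performs a GET before the SET. Below is a minimal go-redis v8 sketch of that GET-then-SET pattern, under assumptions not taken from the source: the key name is hypothetical and a redis server is assumed at localhost:6379.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

// setRepoValue sketches the GET-then-SET pattern: read any existing value,
// treating redis.Nil as a plain cache miss rather than an error, then write
// the new value with TTL=0, i.e. no expiry.
func setRepoValue(ctx context.Context, rdb *redis.Client, key string, newValue []byte) error {
	oldValue, err := rdb.Get(ctx, key).Bytes()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("failed to get value for key [%s]: %v", key, err)
	}
	fmt.Printf("Redis [GET %s]: %d bytes read\n", key, len(oldValue))
	return rdb.Set(ctx, key, newValue, 0).Err()
}

func main() {
	// assumed local redis instance; adjust Addr as needed
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	// hypothetical key and value, for illustration only
	if err := setRepoValue(context.Background(), rdb, "helmrepositories:default:bitnami", []byte("repo index bytes")); err != nil {
		fmt.Println("error:", err)
	}
}
```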
// The list of all types actually supported by redis you can find in // https://github.com/go-redis/redis/blob/v8.10.0/internal/proto/writer.go#L61 @@ -127,7 +133,7 @@ type NamespacedResourceWatcherCacheConfig struct { OnResyncFunc ResyncFunc // These funcs are needed to manipulate API-specific objects, such as flux's - // sourcev1.HelmRepository in a generic fashion + // sourcev1.HelmRepository, in a generic fashion NewObjFunc NewObjectFunc NewListFunc NewObjectListFunc ListItemsFunc GetListItemsFunc @@ -140,14 +146,10 @@ func NewNamespacedResourceWatcherCache(name string, config NamespacedResourceWat return nil, fmt.Errorf("server not configured with redis Client") } else if config.ClientGetter == nil { return nil, fmt.Errorf("server not configured with clientGetter") - } else if config.OnAddFunc == nil || - config.OnModifyFunc == nil || - config.OnDeleteFunc == nil || - config.OnGetFunc == nil || - config.OnResyncFunc == nil || - config.NewObjFunc == nil || - config.NewListFunc == nil || - config.ListItemsFunc == nil { + } else if config.OnAddFunc == nil || config.OnModifyFunc == nil || + config.OnDeleteFunc == nil || config.OnGetFunc == nil || + config.OnResyncFunc == nil || config.NewObjFunc == nil || + config.NewListFunc == nil || config.ListItemsFunc == nil { return nil, fmt.Errorf("server not configured with expected cache hooks") } @@ -163,6 +165,11 @@ func NewNamespacedResourceWatcherCache(name string, config NamespacedResourceWat return nil, err } + // this will launch a single worker that processes items on the work queue as they come in + // runWorker will loop until "something bad" happens. The .Until() func will + // then rekick the worker after one second + go wait.Until(c.runWorker, time.Second, stopCh) + // let's do the initial sync and creating a new RetryWatcher here so // bootstrap errors, if any, are flagged early synchronously and the // caller does not end up with a partially initialized cache @@ -175,13 +182,6 @@ func NewNamespacedResourceWatcherCache(name string, config NamespacedResourceWat return nil, err } - // this will launch a single worker that processes items on the work queue as they come in - // runWorker will loop until "something bad" happens. The .Until will - // then rekick the worker after one second - // We should be able to launch multiple workers, and the workqueue will make sure that - // only a single worker works on an item with a given key. - go wait.Until(c.runWorker, time.Second, stopCh) - go c.watchLoop(watcher, stopCh) return &c, nil } @@ -235,10 +235,6 @@ func (c *NamespacedResourceWatcherCache) processNextWorkItem() bool { return false } - // ref https://go101.org/article/concurrent-synchronization-more.html - c.resyncCond.L.(*sync.RWMutex).RLock() - defer c.resyncCond.L.(*sync.RWMutex).RUnlock() - // We must remember to call Done so the queue knows we have finished // processing this item. We also must remember to call Forget if we // do not want this work item being re-queued. 
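The Done/Forget comment above refers to the standard controller work-loop pattern. Here is a hedged, standalone sketch of that pattern (not the plugin's actual processNextWorkItem; syncHandler is a hypothetical stand-in for the cache's real handler):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem sketches the pattern: Get an item, always Done it,
// Forget it on success, and AddRateLimited on failure so it is retried
// with backoff.
func processNextWorkItem(q workqueue.RateLimitingInterface, syncHandler func(key string) error) bool {
	obj, shutdown := q.Get()
	if shutdown {
		return false
	}
	defer q.Done(obj)

	key, ok := obj.(string)
	if !ok {
		// The item is unexpected garbage: Forget it so it is not re-queued.
		q.Forget(obj)
		return true
	}
	if err := syncHandler(key); err != nil {
		// Re-queue with rate limiting so transient errors are retried later.
		q.AddRateLimited(key)
		return true
	}
	q.Forget(obj)
	return true
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	q.Add("helmrepositories:default:bitnami") // hypothetical key
	processNextWorkItem(q, func(key string) error {
		fmt.Println("synced", key)
		return nil
	})
}
```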
For example, we do @@ -536,11 +532,11 @@ func (c *NamespacedResourceWatcherCache) syncHandler(key string) error { return status.Errorf(codes.Internal, "error fetching object with key [%s]: %v", key, err) } } - return c.onAddOrModify(true, obj) + return c.onAddOrModify(obj) } -// this is effectively a cache SET operation -func (c *NamespacedResourceWatcherCache) onAddOrModify(checkOldValue bool, obj ctrlclient.Object) (err error) { +// this is effectively a cache GET followed by SET operation +func (c *NamespacedResourceWatcherCache) onAddOrModify(obj ctrlclient.Object) (err error) { log.V(4).Infof("+onAddOrModify") defer log.V(4).Infof("-onAddOrModify") @@ -550,12 +546,10 @@ func (c *NamespacedResourceWatcherCache) onAddOrModify(checkOldValue bool, obj c } var oldValue []byte - if checkOldValue { - if oldValue, err = c.redisCli.Get(c.redisCli.Context(), key).Bytes(); err != redis.Nil && err != nil { - return fmt.Errorf("onAddOrModify() failed to get value for key [%s] in cache due to: %v", key, err) - } else { - log.V(4).Infof("Redis [GET %s]: %d bytes read", key, len(oldValue)) - } + if oldValue, err = c.redisCli.Get(c.redisCli.Context(), key).Bytes(); err != redis.Nil && err != nil { + return fmt.Errorf("onAddOrModify() failed to get value for key [%s] in cache due to: %v", key, err) + } else { + log.V(4).Infof("Redis [GET %s]: %d bytes read", key, len(oldValue)) } var setVal bool @@ -664,12 +658,9 @@ func (c *NamespacedResourceWatcherCache) fetchForOne(key string) (interface{}, e // parallelize the process of value retrieval because fetchForOne() calls // c.config.onGet() which will de-code the data from bytes into expected struct, which // may be computationally expensive and thus benefit from multiple threads of execution -func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys []string) (map[string]interface{}, error) { +func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys sets.String) (map[string]interface{}, error) { response := make(map[string]interface{}) - // max number of concurrent workers retrieving cache values at the same time - const maxWorkers = 10 - type fetchValueJob struct { key string } @@ -704,7 +695,7 @@ func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys []string) (map[st }() go func() { - for _, key := range keys { + for key := range keys { requestChan <- fetchValueJob{key} } close(requestChan) @@ -732,7 +723,7 @@ func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys []string) (map[st // it's value will be returned, // whereas 'fetchForMultiple' does not guarantee that. // The keys are expected to be in the format of the cache (the caller does that) -func (c *NamespacedResourceWatcherCache) GetForMultiple(keys []string) (map[string]interface{}, error) { +func (c *NamespacedResourceWatcherCache) GetForMultiple(keys sets.String) (map[string]interface{}, error) { c.resyncCond.L.(*sync.RWMutex).RLock() defer c.resyncCond.L.(*sync.RWMutex).RUnlock() @@ -748,28 +739,94 @@ func (c *NamespacedResourceWatcherCache) GetForMultiple(keys []string) (map[stri } // now, re-compute and fetch the ones that are left over from the previous operation - keysLeft := []string{} + keysLeft := sets.String{} for key, value := range chartsUntyped { if value == nil { // this cache miss may have happened due to one of these reasons: - // 1) key truly does not exist in k8s (e.g. 
there is no repo with the given name in the "Ready" state) - // 2) key exists and the "Ready" repo currently being indexed but has not yet completed - // 3) key exists in k8s but the corresponding cache entry has been evicted by redis due to - // LRU maxmemory policies or entry TTL expiry (doesn't apply currently, cuz we use TTL=0 - // for all entries) - // In the 3rd case we want to re-compute the key and add it to the cache, which may potentially - // cause other entries to be evicted in order to make room for the ones being added - keysLeft = append(keysLeft, key) + // 1) key truly does not exist in k8s (e.g. there is no repo with + // the given name in the "Ready" state) + // 2) key exists and the "Ready" repo is currently being indexed but + // has not yet completed + // 3) key exists in k8s but the corresponding cache entry has been + // evicted by redis due to LRU maxmemory policies or entry TTL + // expiry (doesn't apply currently, because we use TTL=0 for all entries) + // In the 3rd case we want to re-compute the key and add it to the cache, + // which may potentially cause other entries to be evicted in order to + // make room for the ones being added + keysLeft.Insert(key) } } - // this functionality is similar to that of populateWith() func, - // but different enough so I did not see the value of re-using the code + if chartsUntypedLeft, err := c.computeAndFetchValuesForKeys(keysLeft); err != nil { + return nil, err + } else { + + for k, v := range chartsUntypedLeft { + chartsUntyped[k] = v + } + } + return chartsUntyped, nil +} + +// This func is only called in the context of a resync() operation, +// after emptying the cache via FLUSHDB, i.e. on startup or after +// some major (network) failure. +// Computing a value for a key may be expensive, e.g. indexing a repo takes a while, +// so we will do this in a concurrent fashion to minimize the time window and performance +// impact of doing so +func (c *NamespacedResourceWatcherCache) populateWith(items []ctrlclient.Object) error { + // confidence test: I'd like to make sure this is called within the context + // of resync, i.e. resync.Cond.L is locked by this goroutine.
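populateWith() and computeValuesForKeys() below both rely on the same bounded worker-pool idiom: a WaitGroup, a buffered channel of keys, and at most maxWorkers goroutines. The following is a minimal, self-contained sketch of that idiom only (invented key names, not the cache's actual code):

```go
package main

import (
	"fmt"
	"math"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

const maxWorkers = 10

// computeValues runs work(key) for every key with at most maxWorkers
// goroutines: the workers drain a channel, a separate goroutine feeds and
// then closes it, and wg.Wait() blocks until every key has been processed.
func computeValues(keys sets.String, work func(key string)) {
	var wg sync.WaitGroup
	numWorkers := int(math.Min(float64(len(keys)), float64(maxWorkers)))
	requestChan := make(chan string, numWorkers)

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// terminates only when requestChan is closed and drained
			for key := range requestChan {
				work(key)
			}
		}()
	}

	go func() {
		for key := range keys {
			requestChan <- key
		}
		close(requestChan)
	}()

	wg.Wait()
}

func main() {
	// hypothetical cache keys
	keys := sets.NewString("ns1:repo-1", "ns1:repo-2", "ns2:repo-3")
	computeValues(keys, func(key string) { fmt.Println("computed value for", key) })
}
```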
+ if !common.RWMutexWriteLocked(c.resyncCond.L.(*sync.RWMutex)) { + return status.Errorf(codes.Internal, "Invalid state of the cache in populateWith()") + } + + keys := sets.String{} + for _, item := range items { + if key, err := c.keyFor(item); err != nil { + return status.Errorf(codes.Internal, "%v", err) + } else { + keys.Insert(key) + } + } + + // wait until all items have been processed + c.computeValuesForKeys(keys) + return nil +} + +func (c *NamespacedResourceWatcherCache) computeValuesForKeys(keys sets.String) { + var wg sync.WaitGroup + numWorkers := int(math.Min(float64(len(keys)), float64(maxWorkers))) + requestChan := make(chan string, numWorkers) + + // Process only at most maxWorkers at a time + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + // The following loop will only terminate when the request channel is + // closed (and there are no more items) + for key := range requestChan { + // see GetForOne() for explanation of what is happening below + c.forceKey(key) + } + wg.Done() + }() + } + + go func() { + for key := range keys { + requestChan <- key + } + close(requestChan) + }() - // max number of concurrent workers retrieving cache values at the same time - const maxWorkers = 10 + // wait until all items have been processed + wg.Wait() +} +func (c *NamespacedResourceWatcherCache) computeAndFetchValuesForKeys(keys sets.String) (map[string]interface{}, error) { type computeValueJob struct { key string } @@ -780,7 +837,7 @@ func (c *NamespacedResourceWatcherCache) GetForMultiple(keys []string) (map[stri } var wg sync.WaitGroup - numWorkers := int(math.Min(float64(len(keysLeft)), float64(maxWorkers))) + numWorkers := int(math.Min(float64(len(keys)), float64(maxWorkers))) requestChan := make(chan computeValueJob, numWorkers) responseChan := make(chan computeValueJobResult, numWorkers) @@ -792,9 +849,7 @@ func (c *NamespacedResourceWatcherCache) GetForMultiple(keys []string) (map[stri // closed (and there are no more items) for job := range requestChan { // see GetForOne() for explanation of what is happening below - c.queue.Add(job.key) - c.queue.WaitUntilForgotten(job.key) - value, err := c.fetchForOne(job.key) + value, err := c.forceAndFetchKey(job.key) responseChan <- computeValueJobResult{job, value, err} } wg.Done() }() @@ -807,7 +862,7 @@ func (c *NamespacedResourceWatcherCache) GetForMultiple(keys []string) (map[stri }() go func() { - for _, key := range keysLeft { + for key := range keys { requestChan <- computeValueJob{key} } close(requestChan) @@ -816,6 +871,7 @@ func (c *NamespacedResourceWatcherCache) GetForMultiple(keys []string) (map[stri // Start receiving results // The following loop will only terminate when the response channel is closed, i.e. // after the all the requests have been processed + chartsUntyped := make(map[string]interface{}) errs := []error{} for resp := range responseChan { if resp.err == nil { @@ -861,75 +917,6 @@ func (c *NamespacedResourceWatcherCache) fromKey(key string) (*types.NamespacedN return &types.NamespacedName{Namespace: parts[1], Name: parts[2]}, nil } -// This func is only called in the context of a resync() operation, -// after emptying the cache via FLUSHDB, i.e. on startup or after -// some major (network) failure. It writes directly into redis cache, bypassing the work queue. -// Computing a value for a key maybe expensive, e.g.
indexing a repo takes a while, -// so we will do this in a concurrent fashion to minimize the time window and performance -// impact of doing so -func (c *NamespacedResourceWatcherCache) populateWith(items []ctrlclient.Object) error { - // confidence test: I'd like to make sure this is called within the context - // of resync, i.e. resync.Cond.L is locked by this goroutine. - if !common.RWMutexWriteLocked(c.resyncCond.L.(*sync.RWMutex)) { - return status.Errorf(codes.Internal, "Invalid state of the cache in populateWith()") - } - - // max number of concurrent workers computing cache values at the same time - const maxWorkers = 10 - - type populateJob struct { - item ctrlclient.Object - } - - type populateJobResult struct { - populateJob - err error - } - - var wg sync.WaitGroup - numWorkers := int(math.Min(float64(len(items)), float64(maxWorkers))) - requestChan := make(chan populateJob, numWorkers) - responseChan := make(chan populateJobResult, numWorkers) - - // Process only at most maxWorkers at a time - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - // The following loop will only terminate when the request channel is - // closed (and there are no more items) - for job := range requestChan { - // don't need to check old value since we just flushed the whole cache - err := c.onAddOrModify(false, job.item) - responseChan <- populateJobResult{job, err} - } - wg.Done() - }() - } - - go func() { - wg.Wait() - close(responseChan) - }() - - go func() { - for _, item := range items { - requestChan <- populateJob{item} - } - close(requestChan) - }() - - // Start receiving results - // The following loop will only terminate when the response channel is closed, i.e. - // after the all the requests have been processed - errs := []error{} - for resp := range responseChan { - if resp.err != nil { - errs = append(errs, resp.err) - } - } - return errorutil.NewAggregate(errs) -} - // GetForOne() is like fetchForOne() but if there is a cache miss, it will also check the // k8s for the corresponding object, process it and then add it to the cache and return the // result. @@ -944,24 +931,32 @@ func (c *NamespacedResourceWatcherCache) GetForOne(key string) (interface{}, err return nil, err } else if value == nil { // cache miss - c.queue.Add(key) - // now need to wait until this item has been processed by runWorker(). - // a little bit in-efficient: syncHandler() will eventually call config.onAdd() - // which encode the data as []byte before storing it in the cache. That part is fine. - // But to get back the original data we have to decode it via config.onGet(). - // It'd nice if there was a shortcut and skip the cycles spent decoding data from - // []byte to repoCacheEntry - c.queue.WaitUntilForgotten(key) - // yes, there is a small time window here between after we are done with WaitUntilDoneWith - // and the following fetch, where another concurrent goroutine may force the newly added - // cache entry out, but that is an edge case and I am willing to overlook it for now - // To fix it, would somehow require WaitUntilDoneWith returning a value from a cache, so - // the whole thing would be atomic. Don't know how to do this yet - return c.fetchForOne(key) + return c.forceAndFetchKey(key) } return value, nil } +func (c *NamespacedResourceWatcherCache) forceKey(key string) { + c.queue.Add(key) + // now need to wait until this item has been processed by runWorker(). 
+ // a little bit inefficient: syncHandler() will eventually call config.onAdd() + // which encodes the data as []byte before storing it in the cache. That part is fine. + // But to get back the original data we have to decode it via config.onGet(). + // It'd be nice if there were a shortcut to skip the cycles spent decoding data from + // []byte to repoCacheEntry + c.queue.WaitUntilForgotten(key) +} + +func (c *NamespacedResourceWatcherCache) forceAndFetchKey(key string) (interface{}, error) { + c.forceKey(key) + // yes, there is a small time window here after we are done with WaitUntilForgotten() + // and before the following fetch, where another concurrent goroutine may force the newly added + // cache entry out, but that is an edge case and I am willing to overlook it for now + // Fixing it would somehow require WaitUntilForgotten() returning a value from the cache, so + // the whole thing would be atomic. Don't know how to do this yet + return c.fetchForOne(key) +} + // this func is used by unit tests only func (c *NamespacedResourceWatcherCache) ExpectAdd(key string) { c.queue.ExpectAdd(key) diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart_test.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart_test.go index 44e3bc64646..a39f033eb51 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart_test.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart_test.go @@ -10,7 +10,6 @@ import ( "net/http" "net/http/httptest" "os" - "reflect" "strings" "testing" @@ -29,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" ) type testSpecChartWithFile struct { @@ -752,9 +750,9 @@ func TestChartCacheResyncNotIdle(t *testing.T) { repoKey, repoBytes, err := s.redisKeyValueForRepo(*r) if err != nil { t.Fatalf("%+v", err) + } else { + redisMockSetValueForRepo(mock, repoKey, repoBytes, nil) } - mock.ExpectGet(repoKey).RedisNil() - redisMockSetValueForRepo(mock, repoKey, repoBytes) opts := &common.ClientOptions{} chartCacheKeys := []string{} @@ -779,18 +777,11 @@ func TestChartCacheResyncNotIdle(t *testing.T) { s.repoCache.ExpectAdd(repoKey) - ctx := context.Background() - var watcher *watch.RaceFreeFakeWatcher - if ctrlClient, err := s.clientGetter.ControllerRuntime(ctx, s.kubeappsCluster); err != nil { + ctrlClient, watcher, err := ctrlClientAndWatcher(t, s) + if err != nil { t.Fatal(err) - } else if err = ctrlClient.Create(ctx, r); err != nil { - // unlike dynamic.Interface.Create, client.Create will create an object in k8s - // and an Add event will be fired + } else if err = ctrlClient.Create(context.Background(), r); err != nil { t.Fatal(err) - } else if ww, ok := ctrlClient.(*withWatchWrapper); !ok { - t.Fatalf("Unexpected condition: %s", reflect.TypeOf(ww)) - } else if watcher = ww.watcher; watcher == nil { - t.Fatalf("Unexpected condition watcher is nil") } done := make(chan int, 1) @@ -823,7 +814,7 @@ func TestChartCacheResyncNotIdle(t *testing.T) { t.Errorf("ERROR: Expected empty repo work queue!") } else { mock.ExpectFlushDB().SetVal("OK") - redisMockSetValueForRepo(mock, repoKey, repoBytes) + redisMockSetValueForRepo(mock, repoKey, repoBytes, nil) // now we can signal to the server it's ok to proceed repoResyncCh <- 0 @@ -873,8 +864,8 @@ func newChart(name, namespace string, spec *sourcev1.HelmChartSpec, status *sour APIVersion: sourcev1.GroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ - Name: name, - Generation: int64(1),
+ Name: name, + Generation: int64(1), }, } if namespace != "" { diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/release_test.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/release_test.go index 2281b0ae583..d203f5699ce 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/release_test.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/release_test.go @@ -34,12 +34,9 @@ import ( v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" ) type testSpecGetInstalledPackages struct { @@ -1185,46 +1182,14 @@ func newRelease(name string, namespace string, spec *helmv2.HelmReleaseSpec, sta func newServerWithChartsAndReleases(t *testing.T, actionConfig *action.Configuration, charts []sourcev1.HelmChart, releases []helmv2.HelmRelease) (*Server, redismock.ClientMock, error) { apiextIfc := apiextfake.NewSimpleClientset(fluxHelmRepositoryCRD) - - // register the GitOps Toolkit schema definitions - scheme := runtime.NewScheme() - _ = sourcev1.AddToScheme(scheme) - _ = helmv2.AddToScheme(scheme) - - rm := apimeta.NewDefaultRESTMapper([]schema.GroupVersion{sourcev1.GroupVersion, helmv2.GroupVersion}) - rm.Add(schema.GroupVersionKind{ - Group: sourcev1.GroupVersion.Group, - Version: sourcev1.GroupVersion.Version, - Kind: sourcev1.HelmRepositoryKind}, - apimeta.RESTScopeNamespace) - rm.Add(schema.GroupVersionKind{ - Group: sourcev1.GroupVersion.Group, - Version: sourcev1.GroupVersion.Version, - Kind: sourcev1.HelmChartKind}, - apimeta.RESTScopeNamespace) - rm.Add(schema.GroupVersionKind{ - Group: helmv2.GroupVersion.Group, - Version: helmv2.GroupVersion.Version, - Kind: helmv2.HelmReleaseKind}, - apimeta.RESTScopeNamespace) - - ctrlClientBuilder := ctrlfake.NewClientBuilder().WithScheme(scheme).WithRESTMapper(rm) - if len(charts) > 0 { - ctrlClientBuilder = ctrlClientBuilder.WithLists(&sourcev1.HelmChartList{Items: charts}) - } - if len(releases) > 0 { - ctrlClientBuilder = ctrlClientBuilder.WithLists(&helmv2.HelmReleaseList{Items: releases}) - } - ctrlClient := &withWatchWrapper{delegate: ctrlClientBuilder.Build()} - + ctrlClient := newCtrlClient(nil, charts, releases) clientGetter := func(context.Context, string) (clientgetter.ClientInterfaces, error) { return clientgetter. NewBuilder(). WithApiExt(apiextIfc). - WithControllerRuntime(ctrlClient). + WithControllerRuntime(&ctrlClient). 
Build(), nil } - return newServer(t, clientGetter, actionConfig, nil, nil) } diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo.go index 6c25b813be2..84092415550 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" log "k8s.io/klog/v2" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -67,12 +68,12 @@ func (s *Server) getRepoInCluster(ctx context.Context, key types.NamespacedName) } // regexp expressions are used for matching actual names against expected patters -func (s *Server) filterReadyReposByName(repoList []sourcev1.HelmRepository, match []string) ([]string, error) { +func (s *Server) filterReadyReposByName(repoList []sourcev1.HelmRepository, match []string) (sets.String, error) { if s.repoCache == nil { return nil, status.Errorf(codes.FailedPrecondition, "server cache has not been properly initialized") } - resultKeys := make([]string, 0) + resultKeys := sets.String{} for _, repo := range repoList { // first check if repo is in ready state if !isRepoReady(repo) { @@ -96,7 +97,7 @@ func (s *Server) filterReadyReposByName(repoList []sourcev1.HelmRepository, matc matched = true } if matched { - resultKeys = append(resultKeys, s.repoCache.KeyForNamespacedName(*name)) + resultKeys.Insert(s.repoCache.KeyForNamespacedName(*name)) } } return resultKeys, nil diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo_test.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo_test.go index be432df67e1..0997f3249e0 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo_test.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo_test.go @@ -9,12 +9,10 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "reflect" "strings" "testing" "time" - helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" "github.com/fluxcd/pkg/apis/meta" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" @@ -29,15 +27,12 @@ import ( "google.golang.org/grpc/status" apiextfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" typfake "k8s.io/client-go/kubernetes/fake" - ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" ) type testSpecGetAvailablePackageSummaries struct { @@ -637,8 +632,9 @@ func TestGetAvailablePackageSummaryAfterRepoIndexUpdate(t *testing.T) { t.Fatalf("%v", err) } + ctx := context.Background() responseBeforeUpdate, err := s.GetAvailablePackageSummaries( - context.Background(), + ctx, &corev1.GetAvailablePackageSummariesRequest{Context: &corev1.Context{}}) if err != nil { t.Fatalf("%v", err) @@ -667,8 +663,8 @@ func TestGetAvailablePackageSummaryAfterRepoIndexUpdate(t *testing.T) { t.Fatalf("%v", err) } - ctx := context.Background() - if ctrlClient, err := s.clientGetter.ControllerRuntime(ctx, s.kubeappsCluster); err != nil { + ctrlClient, _, err := ctrlClientAndWatcher(t, s) + if err != nil { t.Fatal(err) } else if err = ctrlClient.Get(ctx, repoName, &repo); err != nil { 
t.Fatal(err) @@ -679,9 +675,9 @@ func TestGetAvailablePackageSummaryAfterRepoIndexUpdate(t *testing.T) { repo.Status.Artifact.Checksum = "4e881a3c34a5430c1059d2c4f753cb9aed006803" repo.Status.Artifact.Revision = "4e881a3c34a5430c1059d2c4f753cb9aed006803" - // there will be a GET to retrieve the old value from the cache followed by a SET to new value - mock.ExpectGet(key).SetVal(string(oldValue)) - key, newValue, err := s.redisMockSetValueForRepo(mock, repo) + // there will be a GET to retrieve the old value from the cache followed by a SET + // to new value + _, newValue, err := s.redisMockSetValueForRepo(mock, repo, oldValue) if err != nil { t.Fatalf("%+v", err) } @@ -701,7 +697,7 @@ func TestGetAvailablePackageSummaryAfterRepoIndexUpdate(t *testing.T) { mock.ExpectGet(key).SetVal(string(newValue)) responsePackagesAfterUpdate, err := s.GetAvailablePackageSummaries( - context.Background(), + ctx, &corev1.GetAvailablePackageSummariesRequest{Context: &corev1.Context{}}) if err != nil { t.Fatalf("%v", err) @@ -893,17 +889,13 @@ func TestGetAvailablePackageSummaryAfterCacheResync(t *testing.T) { t.Fatalf("%v", err) } - // now lets try to simulate HTTP 410 GONE exception which should force RetryWatcher to stop and force - // a cache resync. The ERROR eventwhich we'll send below should trigger a re-sync of the cache in the + // now lets try to simulate HTTP 410 GONE exception which should force + // RetryWatcher to stop and force a cache resync. The ERROR event which + // we'll send below should trigger a re-sync of the cache in the // background: a FLUSHDB followed by a SET - ctx := context.Background() - var watcher *watch.RaceFreeFakeWatcher - if ctrlClient, err := s.clientGetter.ControllerRuntime(ctx, s.kubeappsCluster); err != nil { + _, watcher, err := ctrlClientAndWatcher(t, s) + if err != nil { t.Fatal(err) - } else if ww, ok := ctrlClient.(*withWatchWrapper); !ok { - t.Fatalf("Unexpected condition: %s", reflect.TypeOf(ww)) - } else if watcher = ww.watcher; watcher == nil { - t.Fatalf("Unexpected condition: watcher is nil") } watcher.Error(&errors.NewGone("test HTTP 410 Gone").ErrStatus) @@ -913,7 +905,7 @@ func TestGetAvailablePackageSummaryAfterCacheResync(t *testing.T) { // set up expectations mock.ExpectFlushDB().SetVal("OK") - if _, _, err := s.redisMockSetValueForRepo(mock, *repo); err != nil { + if _, _, err := s.redisMockSetValueForRepo(mock, *repo, nil); err != nil { t.Fatalf("%+v", err) } @@ -947,8 +939,7 @@ func TestGetAvailablePackageSummaryAfterCacheResync(t *testing.T) { } // test that causes RetryWatcher to stop and the cache needs to resync when there are -// lots of pending work items -// this test is focused on the repo cache work queue +// lots of pending work items. 
this test is focused on the repo cache work queue func TestGetAvailablePackageSummariesAfterCacheResyncQueueNotIdle(t *testing.T) { t.Run("test that causes RetryWatcher to stop and the repo cache needs to resync", func(t *testing.T) { // start with an empty server that only has an empty repo cache @@ -980,28 +971,22 @@ func TestGetAvailablePackageSummariesAfterCacheResyncQueueNotIdle(t *testing.T) } mapReposCached[key] = byteArray keysInOrder = append(keysInOrder, key) - mock.ExpectGet(key).RedisNil() - redisMockSetValueForRepo(mock, key, byteArray) + redisMockSetValueForRepo(mock, key, byteArray, nil) repos = append(repos, repo) } s.repoCache.ExpectAdd(keysInOrder[0]) - var watcher *watch.RaceFreeFakeWatcher - ctx := context.Background() - if ctrlClient, err := s.clientGetter.ControllerRuntime(ctx, s.kubeappsCluster); err != nil { + ctrlClient, watcher, err := ctrlClientAndWatcher(t, s) + if err != nil { t.Fatal(err) } else { + ctx := context.Background() for _, r := range repos { if err = ctrlClient.Create(ctx, r); err != nil { t.Fatal(err) } } - if ww, ok := ctrlClient.(*withWatchWrapper); !ok { - t.Fatalf("Unexpected condition: %s", reflect.TypeOf(ww)) - } else if watcher = ww.watcher; watcher == nil { - t.Fatalf("Unexpected condition watcher is nil") - } } done := make(chan int, 1) @@ -1033,7 +1018,7 @@ func TestGetAvailablePackageSummariesAfterCacheResyncQueueNotIdle(t *testing.T) // populateWith() which will re-populate the cache from scratch based on // the current state in k8s (all MAX_REPOS repos). for i := 0; i <= (MAX_REPOS - len); i++ { - redisMockSetValueForRepo(mock, keysInOrder[i], mapReposCached[keysInOrder[i]]) + redisMockSetValueForRepo(mock, keysInOrder[i], mapReposCached[keysInOrder[i]], nil) } // now we can signal to the server it's ok to proceed resyncCh <- 0 @@ -1121,21 +1106,15 @@ func TestGetAvailablePackageSummariesAfterCacheResyncQueueIdle(t *testing.T) { if err != nil { t.Fatalf("%+v", err) } - mock.ExpectGet(key).RedisNil() - redisMockSetValueForRepo(mock, key, byteArray) + redisMockSetValueForRepo(mock, key, byteArray, nil) s.repoCache.ExpectAdd(key) - var watcher *watch.RaceFreeFakeWatcher - ctx := context.Background() - if ctrlClient, err := s.clientGetter.ControllerRuntime(ctx, s.kubeappsCluster); err != nil { + ctrlClient, watcher, err := ctrlClientAndWatcher(t, s) + if err != nil { t.Fatal(err) - } else if err = ctrlClient.Create(ctx, repo); err != nil { + } else if err = ctrlClient.Create(context.Background(), repo); err != nil { t.Fatal(err) - } else if ww, ok := ctrlClient.(*withWatchWrapper); !ok { - t.Fatalf("Unexpected condition: %s", reflect.TypeOf(ww)) - } else if watcher = ww.watcher; watcher == nil { - t.Fatalf("Unexpected condition: watcher is nil") } done := make(chan int, 1) @@ -1161,7 +1140,7 @@ func TestGetAvailablePackageSummariesAfterCacheResyncQueueIdle(t *testing.T) { t.Errorf("ERROR: Expected empty repo work queue!") } else { mock.ExpectFlushDB().SetVal("OK") - redisMockSetValueForRepo(mock, key, byteArray) + redisMockSetValueForRepo(mock, key, byteArray, nil) // now we can signal to the server it's ok to proceed resyncCh <- 0 s.repoCache.WaitUntilResyncComplete() @@ -1338,39 +1317,15 @@ func TestGetPackageRepositories(t *testing.T) { func newServerWithRepos(t *testing.T, repos []sourcev1.HelmRepository, charts []testSpecChartWithUrl, secrets []runtime.Object) (*Server, redismock.ClientMock, error) { typedClient := typfake.NewSimpleClientset(secrets...) 
apiextIfc := apiextfake.NewSimpleClientset(fluxHelmRepositoryCRD) - - // register the GitOps Toolkit schema definitions - scheme := runtime.NewScheme() - _ = sourcev1.AddToScheme(scheme) - _ = helmv2.AddToScheme(scheme) - - rm := apimeta.NewDefaultRESTMapper([]schema.GroupVersion{sourcev1.GroupVersion, helmv2.GroupVersion}) - rm.Add(schema.GroupVersionKind{ - Group: sourcev1.GroupVersion.Group, - Version: sourcev1.GroupVersion.Version, - Kind: sourcev1.HelmRepositoryKind}, - apimeta.RESTScopeNamespace) - rm.Add(schema.GroupVersionKind{ - Group: helmv2.GroupVersion.Group, - Version: helmv2.GroupVersion.Version, - Kind: helmv2.HelmReleaseKind}, - apimeta.RESTScopeNamespace) - - ctrlClientBuilder := ctrlfake.NewClientBuilder().WithScheme(scheme).WithRESTMapper(rm) - if len(repos) > 0 { - ctrlClientBuilder = ctrlClientBuilder.WithLists(&sourcev1.HelmRepositoryList{Items: repos}) - } - ctrlClient := &withWatchWrapper{delegate: ctrlClientBuilder.Build()} - + ctrlClient := newCtrlClient(repos, nil, nil) clientGetter := func(context.Context, string) (clientgetter.ClientInterfaces, error) { return clientgetter. NewBuilder(). WithTyped(typedClient). WithApiExt(apiextIfc). - WithControllerRuntime(ctrlClient). + WithControllerRuntime(&ctrlClient). Build(), nil } - return newServer(t, clientGetter, nil, repos, charts) } @@ -1449,7 +1404,7 @@ func (s *Server) redisMockExpectGetFromRepoCache(mock redismock.ClientMock, filt return nil } -func (s *Server) redisMockSetValueForRepo(mock redismock.ClientMock, repo sourcev1.HelmRepository) (key string, bytes []byte, err error) { +func (s *Server) redisMockSetValueForRepo(mock redismock.ClientMock, repo sourcev1.HelmRepository, oldValue []byte) (key string, bytes []byte, err error) { backgroundClientGetter := func(ctx context.Context) (clientgetter.ClientInterfaces, error) { return s.clientGetter(ctx, s.kubeappsCluster) } @@ -1457,24 +1412,34 @@ func (s *Server) redisMockSetValueForRepo(mock redismock.ClientMock, repo source clientGetter: backgroundClientGetter, chartCache: nil, } - return sink.redisMockSetValueForRepo(mock, repo) + return sink.redisMockSetValueForRepo(mock, repo, oldValue) } -func (sink *repoEventSink) redisMockSetValueForRepo(mock redismock.ClientMock, repo sourcev1.HelmRepository) (key string, byteArray []byte, err error) { +func (sink *repoEventSink) redisMockSetValueForRepo(mock redismock.ClientMock, repo sourcev1.HelmRepository, oldValue []byte) (key string, newValue []byte, err error) { if key, err = redisKeyForRepo(repo); err != nil { return key, nil, err } - if key, byteArray, err = sink.redisKeyValueForRepo(repo); err != nil { + if key, newValue, err = sink.redisKeyValueForRepo(repo); err != nil { + if oldValue == nil { + mock.ExpectGet(key).RedisNil() + } else { + mock.ExpectGet(key).SetVal(string(oldValue)) + } mock.ExpectDel(key).SetVal(0) return key, nil, err } else { - redisMockSetValueForRepo(mock, key, byteArray) - return key, byteArray, nil + redisMockSetValueForRepo(mock, key, newValue, oldValue) + return key, newValue, nil } } -func redisMockSetValueForRepo(mock redismock.ClientMock, key string, byteArray []byte) { - mock.ExpectSet(key, byteArray, 0).SetVal("OK") +func redisMockSetValueForRepo(mock redismock.ClientMock, key string, newValue, oldValue []byte) { + if oldValue == nil { + mock.ExpectGet(key).RedisNil() + } else { + mock.ExpectGet(key).SetVal(string(oldValue)) + } + mock.ExpectSet(key, newValue, 0).SetVal("OK") mock.ExpectInfo("memory").SetVal("used_memory_rss_human:NA\r\nmaxmemory_human:NA") } diff --git 
a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go index 9d2580f0ab0..73fbd8cae22 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go @@ -87,8 +87,8 @@ func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string, // register the GitOps Toolkit schema definitions scheme := runtime.NewScheme() - _ = sourcev1.AddToScheme(scheme) - _ = helmv2.AddToScheme(scheme) + sourcev1.AddToScheme(scheme) + helmv2.AddToScheme(scheme) s := repoEventSink{ clientGetter: clientgetter.NewBackgroundClientGetter( diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go index 4635a79ba51..fe11a5c9bac 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go @@ -373,17 +373,24 @@ func newServer(t *testing.T, okRepos := sets.String{} for _, r := range repos { + key, err := redisKeyForRepo(r) + if err != nil { + t.Logf("Skipping repo [%s] due to %+v", key, err) + continue + } if isRepoReady(r) { // we are willfully just logging any errors coming from redisMockSetValueForRepo() // here and just skipping over to next repo. This is done for test // TestGetAvailablePackagesStatus where we make sure that even if the flux CRD happens // to be invalid flux plug in can still operate - key, _, err := sink.redisMockSetValueForRepo(mock, r) + _, _, err = sink.redisMockSetValueForRepo(mock, r, nil) if err != nil { t.Logf("Skipping repo [%s] due to %+v", key, err) } else { okRepos.Insert(key) } + } else { + mock.ExpectGet(key).RedisNil() } } diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/test_util_test.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/test_util_test.go index c79529bff8b..ab719cd9505 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/test_util_test.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/test_util_test.go @@ -10,6 +10,8 @@ import ( "reflect" "testing" + helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/google/go-cmp/cmp" corev1 "github.com/kubeapps/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1" plugins "github.com/kubeapps/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1" @@ -17,11 +19,13 @@ import ( k8scorev1 "k8s.io/api/core/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/api/meta" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/controller-runtime/pkg/client" + ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" ) const KubeappsCluster = "default" @@ -44,7 +48,7 @@ func (w *withWatchWrapper) Get(ctx context.Context, key client.ObjectKey, obj cl func (w *withWatchWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { if err := w.delegate.List(ctx, list, opts...); err != nil { return err - } else if accessor, err := meta.ListAccessor(list); err != nil { + } else if accessor, err := apimeta.ListAccessor(list); err != nil { return err } else { accessor.SetResourceVersion("1") @@ -64,7 +68,7 @@ 
func (w *withWatchWrapper) Patch(ctx context.Context, obj client.Object, patch c return w.delegate.Patch(ctx, obj, patch, opts...) } -func (w *withWatchWrapper) RESTMapper() meta.RESTMapper { +func (w *withWatchWrapper) RESTMapper() apimeta.RESTMapper { return w.delegate.RESTMapper() } @@ -234,6 +238,59 @@ func installedRef(id, namespace string) *corev1.InstalledPackageReference { } } +func newCtrlClient(repos []sourcev1.HelmRepository, charts []sourcev1.HelmChart, releases []helmv2.HelmRelease) withWatchWrapper { + // register the flux GitOps Toolkit schema definitions + scheme := runtime.NewScheme() + sourcev1.AddToScheme(scheme) + helmv2.AddToScheme(scheme) + + rm := apimeta.NewDefaultRESTMapper([]schema.GroupVersion{sourcev1.GroupVersion, helmv2.GroupVersion}) + rm.Add(schema.GroupVersionKind{ + Group: sourcev1.GroupVersion.Group, + Version: sourcev1.GroupVersion.Version, + Kind: sourcev1.HelmRepositoryKind}, + apimeta.RESTScopeNamespace) + rm.Add(schema.GroupVersionKind{ + Group: sourcev1.GroupVersion.Group, + Version: sourcev1.GroupVersion.Version, + Kind: sourcev1.HelmChartKind}, + apimeta.RESTScopeNamespace) + rm.Add(schema.GroupVersionKind{ + Group: helmv2.GroupVersion.Group, + Version: helmv2.GroupVersion.Version, + Kind: helmv2.HelmReleaseKind}, + apimeta.RESTScopeNamespace) + + ctrlClientBuilder := ctrlfake.NewClientBuilder().WithScheme(scheme).WithRESTMapper(rm) + initLists := []client.ObjectList{} + if len(repos) > 0 { + initLists = append(initLists, &sourcev1.HelmRepositoryList{Items: repos}) + } + if len(charts) > 0 { + initLists = append(initLists, &sourcev1.HelmChartList{Items: charts}) + } + if len(releases) > 0 { + initLists = append(initLists, &helmv2.HelmReleaseList{Items: releases}) + } + if len(initLists) > 0 { + ctrlClientBuilder = ctrlClientBuilder.WithLists(initLists...) + } + return withWatchWrapper{delegate: ctrlClientBuilder.Build()} +} + +func ctrlClientAndWatcher(t *testing.T, s *Server) (client.WithWatch, *watch.RaceFreeFakeWatcher, error) { + ctx := context.Background() + if ctrlClient, err := s.clientGetter.ControllerRuntime(ctx, s.kubeappsCluster); err != nil { + return nil, nil, err + } else if ww, ok := ctrlClient.(*withWatchWrapper); !ok { + return nil, nil, fmt.Errorf("Could not cast %s to: *withWatchWrapper", reflect.TypeOf(ctrlClient)) + } else if watcher := ww.watcher; watcher == nil { + return nil, nil, fmt.Errorf("Unexpected condition watcher is nil") + } else { + return ctrlClient, watcher, nil + } +} + // misc global vars that get re-used in multiple tests var fluxPlugin = &plugins.Plugin{Name: "fluxv2.packages", Version: "v1alpha1"} var fluxHelmRepositoryCRD = &apiextv1.CustomResourceDefinition{ diff --git a/script/makefiles/deploy-dev.mk b/script/makefiles/deploy-dev.mk index 94678403384..0b43c72a61c 100644 --- a/script/makefiles/deploy-dev.mk +++ b/script/makefiles/deploy-dev.mk @@ -61,7 +61,7 @@ deploy-kapp-controller: # Add the flux controllers used for testing the kubeapps-apis integration. deploy-flux-controllers: - kubectl --kubeconfig=${CLUSTER_CONFIG} apply -f https://github.com/fluxcd/flux2/releases/download/v0.26.0/install.yaml + kubectl --kubeconfig=${CLUSTER_CONFIG} apply -f https://github.com/fluxcd/flux2/releases/download/v0.26.3/install.yaml reset-dev: helm --kubeconfig=${CLUSTER_CONFIG} -n kubeapps delete kubeapps || true
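The newCtrlClient() helper above centralizes construction of the fake controller-runtime client the tests share. As a reference, here is a reduced, hypothetical sketch of the same fake-client construction (it assumes the flux source-controller v1beta1 API used throughout this diff; the repo name and URL are invented):

```go
package main

import (
	"context"
	"fmt"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	// Register the flux source-controller types, as newCtrlClient does.
	scheme := runtime.NewScheme()
	_ = sourcev1.AddToScheme(scheme)

	// hypothetical repository object for the fake client to serve
	repo := sourcev1.HelmRepository{
		ObjectMeta: metav1.ObjectMeta{Name: "bitnami", Namespace: "default"},
		Spec:       sourcev1.HelmRepositorySpec{URL: "https://charts.example.com"},
	}

	// A fake controller-runtime client pre-loaded with one HelmRepository.
	cl := ctrlfake.NewClientBuilder().
		WithScheme(scheme).
		WithLists(&sourcev1.HelmRepositoryList{Items: []sourcev1.HelmRepository{repo}}).
		Build()

	var list sourcev1.HelmRepositoryList
	if err := cl.List(context.Background(), &list); err != nil {
		panic(err)
	}
	fmt.Println("repos found:", len(list.Items))
}
```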