fix for kubeapps-apis CrashLoopBackoff #4329 and [fluxv2] non-FQDN chart url fails on chart view #4381 (#4382)

* attempt #2

* fix for #4329 kubeapps-apis CrashLoopBackoff

* fix for [fluxv2] non-FQDN chart url fails on chart view #4381

* forgot two files

* added integration test for flux helm release auto-update

* moved test index yamls into a separate subdirectory not to crowd testdata

* fixed chart_cache.go to be consistent with latest helm code

* introduce retries+exponential backoff into NewRedisClientFromEnv (see the sketch after this list)

* fix retries in NewRedisClientFromEnv
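
For the last two bullets, here is a minimal sketch of what retries with exponential backoff around Redis client creation can look like. The helper name, the REDIS_ADDR environment variable and the retry parameters are illustrative assumptions, not the actual NewRedisClientFromEnv code:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
)

// newRedisClientWithRetries is a hypothetical helper showing the retry +
// exponential backoff idea: try to reach Redis, double the wait between
// attempts, and give up after maxRetries.
func newRedisClientWithRetries(maxRetries int) (*redis.Client, error) {
	addr := os.Getenv("REDIS_ADDR") // e.g. "localhost:6379"
	backoff := 1 * time.Second
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		client := redis.NewClient(&redis.Options{Addr: addr})
		if lastErr = client.Ping(context.Background()).Err(); lastErr == nil {
			return client, nil
		}
		client.Close()
		time.Sleep(backoff)
		backoff *= 2 // exponential backoff between attempts
	}
	return nil, fmt.Errorf("redis not reachable after %d attempts, last error: %v", maxRetries, lastErr)
}

func main() {
	if _, err := newRedisClientWithRetries(5); err != nil {
		fmt.Println(err)
	}
}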
gfichtenholt committed Mar 7, 2022
1 parent e8953ea commit f439918
Showing 33 changed files with 796 additions and 260 deletions.
@@ -8,6 +8,7 @@ import (
"encoding/gob"
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
"strings"
@@ -139,11 +140,33 @@ func (c *ChartCache) SyncCharts(charts []models.Chart, clientOptions *common.Cli

// The tarball URL will always be the first URL in the repo.chartVersions.
// So says the helm plugin :-)
// however, not everybody agrees:
// ref https://github.com/helm/helm/blob/65d8e72504652e624948f74acbba71c51ac2e342/pkg/downloader/chart_downloader.go#L296
u, err := url.Parse(chart.ChartVersions[0].URLs[0])
if err != nil {
return fmt.Errorf("invalid URL format for chart [%s]: %v", chart.ID, err)
}

// If the URL is relative (no scheme), prepend the chart repo's base URL
// ref https://github.com/kubeapps/kubeapps/issues/4381
// ref https://github.com/helm/helm/blob/65d8e72504652e624948f74acbba71c51ac2e342/pkg/downloader/chart_downloader.go#L303
if !u.IsAbs() {
repoURL, err := url.Parse(chart.Repo.URL)
if err != nil {
return fmt.Errorf("invalid URL format for chart repo [%s]: %v", chart.ID, err)
}
q := repoURL.Query()
// We need a trailing slash for ResolveReference to work, but make sure there isn't already one
repoURL.Path = strings.TrimSuffix(repoURL.Path, "/") + "/"
u = repoURL.ResolveReference(u)
u.RawQuery = q.Encode()
}

entry := chartCacheStoreEntry{
namespace: chart.Repo.Namespace,
id: chart.ID,
version: chart.ChartVersions[0].Version,
url: chart.ChartVersions[0].URLs[0],
url: u.String(),
clientOptions: clientOptions,
deleted: false,
}
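
For reference, the relative-URL handling above relies on net/url.ResolveReference semantics: without a trailing slash on the base path, the last path segment is replaced rather than appended, which is why SyncCharts forces the slash. A standalone sketch with made-up repo and chart URLs:

package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// hypothetical repo index entry: the tarball URL is relative to the repo URL
	repoURL, _ := url.Parse("https://example.com/charts")
	chartURL, _ := url.Parse("apache-9.0.1.tgz")

	// without the trailing slash, the "charts" segment is replaced:
	fmt.Println(repoURL.ResolveReference(chartURL)) // https://example.com/apache-9.0.1.tgz

	// with the trailing slash, the relative path is appended, as SyncCharts needs:
	repoURL.Path = strings.TrimSuffix(repoURL.Path, "/") + "/"
	fmt.Println(repoURL.ResolveReference(chartURL)) // https://example.com/charts/apache-9.0.1.tgz
}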
@@ -387,7 +410,11 @@ func (c *ChartCache) syncHandler(workerName, key string) error {

// this is effectively a cache GET operation
func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
c.resyncCond.L.(*sync.RWMutex).RLock()
defer c.resyncCond.L.(*sync.RWMutex).RUnlock()

log.Infof("+FetchForOne(%s)", key)

// read back from cache: should be either:
// - what we previously wrote OR
// - redis.Nil if the key does not exist or has been evicted due to memory pressure/TTL expiry
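
The read-lock plus redis.Nil handling above, reduced to a standalone sketch; the go-redis v8 API is assumed and the helper name fetchFromCache is hypothetical, not the actual FetchForOne body:

package cache // illustrative package name

import (
	"context"
	"sync"

	"github.com/go-redis/redis/v8"
)

// fetchFromCache is a simplified cache GET: take the read side of the resync
// lock, read the key, and treat redis.Nil (missing or evicted key) as
// "no value" rather than an error.
func fetchFromCache(ctx context.Context, mu *sync.RWMutex, redisCli *redis.Client, key string) ([]byte, error) {
	mu.RLock()
	defer mu.RUnlock()

	value, err := redisCli.Get(ctx, key).Bytes()
	if err == redis.Nil {
		return nil, nil // key not found or evicted due to memory pressure/TTL expiry
	} else if err != nil {
		return nil, err
	}
	return value, nil
}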
@@ -139,7 +139,8 @@ type NamespacedResourceWatcherCacheConfig struct {
ListItemsFunc GetListItemsFunc
}

func NewNamespacedResourceWatcherCache(name string, config NamespacedResourceWatcherCacheConfig, redisCli *redis.Client, stopCh <-chan struct{}) (*NamespacedResourceWatcherCache, error) {
// invokeExpectResync arg is set to true by unit tests only
func NewNamespacedResourceWatcherCache(name string, config NamespacedResourceWatcherCacheConfig, redisCli *redis.Client, stopCh <-chan struct{}, invokeExpectResync bool) (*NamespacedResourceWatcherCache, error) {
log.Infof("+NewNamespacedResourceWatcherCache(%s, %v, %v)", name, config.Gvr, redisCli)

if redisCli == nil {
@@ -160,29 +161,30 @@ func NewNamespacedResourceWatcherCache(name string, config NamespacedResourceWat
resyncCond: sync.NewCond(&sync.RWMutex{}),
}

// confidence test that the specified GVR is a valid registered CRD
// sanity check that the specified GVR is a valid registered CRD
if err := c.isGvrValid(); err != nil {
return nil, err
}

// this will launch a single worker that processes items on the work queue as they come in
// runWorker will loop until "something bad" happens. The .Until() func will
// this will launch a single worker that processes items on the work queue as they
// come in. runWorker will loop until "something bad" happens. The .Until() func will
// then rekick the worker after one second
go wait.Until(c.runWorker, time.Second, stopCh)

// let's do the initial sync and create a new RetryWatcher here so
// bootstrap errors, if any, are flagged early synchronously and the
// caller does not end up with a partially initialized cache

// RetryWatcher will take care of re-starting the watcher if the underlying channel
// happens to close for some reason, as well as recover from other failures
// at the same time ensuring not to replay events that have been processed
watcher, err := c.resyncAndNewRetryWatcher(true)
if err != nil {
return nil, err
// this is needed by unit tests only. Since the potentially lengthy bootstrap is done
// asynchronously (see below), this func will set a condition before returning and
// the unit test will wait for this condition via WaitUntilResyncComplete().
// That's how it knows when the bootstrap is done
if invokeExpectResync {
if _, err := c.ExpectResync(); err != nil {
return nil, err
}
}

go c.watchLoop(watcher, stopCh)
// per https://github.com/kubeapps/kubeapps/issues/4329
// we want to do this asynchronously, so that having to parse existing large repos in the cluster
// doesn't block the kubeapps apis pod start-up
go c.syncAndStartWatchLoop(stopCh)
return &c, nil
}
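
The reworked constructor above now returns quickly and performs the potentially slow initial sync in the background, which is the fix for #4329. A minimal sketch of that pattern using plain sync primitives; every name here is illustrative rather than the actual kubeapps code (the real cache uses a sync.Cond plus ExpectResync/WaitUntilResyncComplete rather than a channel):

package cache // illustrative

import (
	"log"
	"time"
)

type watcherCache struct {
	bootstrapped chan struct{} // closed once the initial sync has finished
}

// newWatcherCache returns immediately; the expensive initial sync of existing
// repos runs in the background so server start-up is not blocked.
func newWatcherCache(stopCh <-chan struct{}) *watcherCache {
	c := &watcherCache{bootstrapped: make(chan struct{})}
	go func() {
		c.initialSync() // may take a long time for large repos
		close(c.bootstrapped)
		c.watchLoop(stopCh)
	}()
	return c
}

// waitUntilBootstrapped is the kind of hook a unit test would use instead of
// relying on timing: block until the background sync has completed.
func (c *watcherCache) waitUntilBootstrapped(timeout time.Duration) bool {
	select {
	case <-c.bootstrapped:
		return true
	case <-time.After(timeout):
		return false
	}
}

func (c *watcherCache) initialSync()                     { log.Println("initial sync") }
func (c *watcherCache) watchLoop(stopCh <-chan struct{}) { <-stopCh }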

@@ -211,6 +213,22 @@ func (c *NamespacedResourceWatcherCache) isGvrValid() error {
return fmt.Errorf("CRD [%s] is not valid", c.config.Gvr)
}

func (c *NamespacedResourceWatcherCache) syncAndStartWatchLoop(stopCh <-chan struct{}) {
// RetryWatcher will take care of re-starting the watcher if the underlying channel
// happens to close for some reason, as well as recover from other failures
// at the same time ensuring not to replay events that have been processed
watcher, err := c.resyncAndNewRetryWatcher(true)
if err != nil {
err = fmt.Errorf(
"[%s]: Initial resync failed after [%d] retries were exhausted, last error: %v",
c.queue.Name(), maxWatcherCacheRetries, err)
// yes, I really want this to panic. Something is seriously wrong and
// possibly restarting kubeapps-apis server is needed...
runtime.Must(err)
}
c.watchLoop(watcher, stopCh)
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
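
The worker described above follows the usual client-go workqueue loop. A generic sketch of such a processNextWorkItem, assuming the pre-generics workqueue API and a stand-in syncKey callback in place of the real syncHandler:

package cache // illustrative

import (
	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem pulls one key off the queue, hands it to syncKey, and
// tells the queue whether to retry (rate-limited) or forget the key.
// Returning false stops the worker loop.
func processNextWorkItem(queue workqueue.RateLimitingInterface, syncKey func(string) error) bool {
	key, shutdown := queue.Get()
	if shutdown {
		return false
	}
	// Done marks the item as no longer in flight, whatever the outcome.
	defer queue.Done(key)

	if err := syncKey(key.(string)); err != nil {
		queue.AddRateLimited(key) // requeue with rate limiting
		return true
	}
	queue.Forget(key) // success: clear any rate-limiting history for this key
	return true
}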
@@ -38,7 +38,10 @@ import (
// -rw-rw-rw-@ 1 gfichtenholt staff 10394218 Nov 7 19:41 bitnami_index.yaml
// Also, we now cache the helm charts themselves for each repo, so that will affect how many will fit too
func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *testing.T) {
fluxPlugin, _ := checkEnv(t)
fluxPlugin, _, err := checkEnv(t)
if err != nil {
t.Fatal(err)
}

redisCli, err := newRedisClientForIntegrationTest(t)
if err != nil {
@@ -54,12 +57,6 @@ func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *tes
if err = redisCli.ConfigSet(redisCli.Context(), "notify-keyspace-events", "EA").Err(); err != nil {
t.Fatalf("%+v", err)
}
t.Cleanup(func() {
t.Logf("Resetting notify-keyspace-events")
if err = redisCli.ConfigSet(redisCli.Context(), "notify-keyspace-events", "").Err(); err != nil {
t.Logf("%v", err)
}
})

if err = initNumberOfChartsInBitnamiCatalog(t); err != nil {
t.Errorf("Failed to get number of charts in bitnami catalog due to: %v", err)
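
The notify-keyspace-events "EA" setting above enables Redis keyspace notifications so the test can observe keys being evicted from the tiny Redis instance. A hedged, standalone illustration of listening for eviction events with go-redis; the address and database 0 are assumptions:

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	redisCli := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// "E" = keyevent notifications, "A" = all event classes (incl. evicted/expired)
	if err := redisCli.ConfigSet(ctx, "notify-keyspace-events", "EA").Err(); err != nil {
		panic(err)
	}

	// keyevent notifications are published on __keyevent@<db>__:<event>
	sub := redisCli.PSubscribe(ctx, "__keyevent@0__:evicted")
	defer sub.Close()

	for msg := range sub.Channel() {
		fmt.Printf("key evicted by maxmemory policy: %s\n", msg.Payload)
	}
}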
@@ -91,7 +88,7 @@ func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *tes
for ; totalRepos < MAX_REPOS_NEVER && evictedRepos.Len() == 0; totalRepos++ {
repo := fmt.Sprintf("bitnami-%d", totalRepos)
// this is to make sure we allow enough time for repository to be created and come to ready state
if err = kubeAddHelmRepository(t, repo, "https://charts.bitnami.com/bitnami", "default", ""); err != nil {
if err = kubeAddHelmRepository(t, repo, "https://charts.bitnami.com/bitnami", "default", "", 0); err != nil {
t.Fatalf("%v", err)
}
t.Cleanup(func() {
@@ -126,7 +123,10 @@ func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *tes

// one particular code path I'd like to test:
// make sure that GetAvailablePackageVersions() works w.r.t. a cache entry that's been evicted
grpcContext := newGrpcAdminContext(t, "test-create-admin")
grpcContext, err := newGrpcAdminContext(t, "test-create-admin")
if err != nil {
t.Fatal(err)
}

// copy the evicted list before the ForEach loop below modifies it in a goroutine
evictedCopy := sets.StringKeySet(evictedRepos)
@@ -179,7 +179,7 @@ func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *tes
for ; totalRepos < MAX_REPOS_NEVER && evictedRepos.Len() == evictedCopy.Len(); totalRepos++ {
repo := fmt.Sprintf("bitnami-%d", totalRepos)
// this is to make sure we allow enough time for repository to be created and come to ready state
if err = kubeAddHelmRepository(t, repo, "https://charts.bitnami.com/bitnami", "default", ""); err != nil {
if err = kubeAddHelmRepository(t, repo, "https://charts.bitnami.com/bitnami", "default", "", 0); err != nil {
t.Fatalf("%v", err)
}
t.Cleanup(func() {
