Skip to content

Commit

Permalink
fix for flux plugin: combine two separate code paths for populating t…
Browse files Browse the repository at this point in the history
…he cache in resync() scenario and regular case #4117  (#4276)

* attempt #2

* small DRYing in unit tests

* step 2

* step 3

* michael's feedback
  • Loading branch information
gfichtenholt committed Feb 15, 2022
1 parent b4eedda commit 40a0de1
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type ChartCache struct {
// significant in that it flushes the whole redis cache and re-populates the state from k8s.
// When that happens we don't really want any concurrent access to the cache until the resync()
// operation is complete. In other words, we want to:
// - be able to have multiple concurrent readers (goroutines doing GetForOne()/GetForMultiple())
// - be able to have multiple concurrent readers (goroutines doing GetForOne())
// - only a single writer (goroutine doing a resync()) is allowed, and while its doing its job
// no readers are allowed
resyncCond *sync.Cond
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
log "k8s.io/klog/v2"
)

// Inspired by https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go and
// by https://github.com/kubernetes/client-go/blob/v0.22.4/util/workqueue/rate_limiting_queue.go
// but adds a few funcs, like Name(), ExpectAdd(), WaitUntilForgotten() and Reset()

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
workqueue.RateLimitingInterface
Expand Down Expand Up @@ -83,19 +87,17 @@ func (q *rateLimitingType) Reset() {

q.queue.reset()

// this way we "forget" about ratelimit failures
// this way we "forget" about ratelimit failures, i.e. the items queued up
// via previous call(s) to .AddRateLimited() (i.e. via q.DelayingInterface.AddAfter)
q.rateLimiter = workqueue.DefaultControllerRateLimiter()

// TODO (gfichtenholt) Also need to "forget" the items queued up via previous call(s)
// to .AddRateLimited() (i.e. via q.DelayingInterface.AddAfter) ?
}

// only used in unit tests
func (q *rateLimitingType) ExpectAdd(item string) {
q.queue.expectAdd(item)
}

// used in unit test and production code, when a repo/chart needs to be loaded on demand
// used in unit test AND production code, when a repo/chart needs to be loaded on demand
func (q *rateLimitingType) WaitUntilForgotten(item string) {
q.queue.waitUntilDone(item)
// q.queue might be done with the item, but it may have been
Expand Down

0 comments on commit 40a0de1

Please sign in to comment.