Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared informer custom resync periods #40759

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions staging/src/k8s.io/client-go/tools/cache/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/clock"
)

// Config contains all the settings for a Controller.
Expand Down Expand Up @@ -50,13 +51,23 @@ type Config struct {
// queue.
FullResyncPeriod time.Duration

// ShouldResync, if specified, is invoked when the controller's reflector determines the next
// periodic sync should occur. If this returns true, it means the reflector should proceed with
// the resync.
ShouldResync ShouldResyncFunc

// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter.
RetryOnError bool
}

// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
type ShouldResyncFunc func() bool

// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error

Expand All @@ -65,6 +76,7 @@ type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}

type Controller interface {
Expand All @@ -77,6 +89,7 @@ type Controller interface {
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
Expand All @@ -92,6 +105,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock

c.reflectorMutex.Lock()
c.reflector = r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// TestPopReleaseLock tests that when processor listener blocks on chan,
// it should release the lock for pendingNotifications.
func TestPopReleaseLock(t *testing.T) {
pl := newProcessListener(nil)
pl := newProcessListener(nil, 0, 0, time.Now())
stopCh := make(chan struct{})
defer close(stopCh)
// make pop() block on nextCh: waiting for receiver to get notification.
Expand Down
26 changes: 15 additions & 11 deletions staging/src/k8s.io/client-go/tools/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/clock"
)

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
Expand All @@ -58,8 +59,9 @@ type Reflector struct {
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
// now() returns current time - exposed for testing purposes
now func() time.Time
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
Expand Down Expand Up @@ -103,7 +105,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
resyncPeriod: resyncPeriod,
now: time.Now,
clock: &clock.RealClock{},
}
return r
}
Expand Down Expand Up @@ -223,8 +225,8 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// always fail so we end up listing frequently. Then, if we don't
// manually stop the timer, we could end up with many timers active
// concurrently.
t := time.NewTimer(r.resyncPeriod)
return t.C, t.Stop
t := r.clock.NewTimer(r.resyncPeriod)
return t.C(), t.Stop
}

// ListAndWatch first lists all items and get the resource version at the moment of call,
Expand Down Expand Up @@ -270,10 +272,12 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
case <-cancelCh:
return
}
glog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
if r.ShouldResync == nil || r.ShouldResync() {
glog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
Expand Down Expand Up @@ -334,7 +338,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := time.Now()
start := r.clock.Now()
eventCount := 0

// Stopping the watcher should be idempotent and if we return from this function there's no way
Expand Down Expand Up @@ -393,7 +397,7 @@ loop:
}
}

watchDuration := time.Now().Sub(start)
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
return errors.New("very short watch")
Expand Down