Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RunUntil to the Reflector and Poller to allow early termination #3700

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 16 additions & 8 deletions pkg/client/cache/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,22 @@ func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller {

// Run begins polling. It starts a goroutine and returns immediately.
func (p *Poller) Run() {
go util.Forever(func() {
e, err := p.getFunc()
if err != nil {
glog.Errorf("failed to list: %v", err)
return
}
p.sync(e)
}, p.period)
go util.Forever(p.run, p.period)
}

// RunUntil begins polling. It starts a goroutine and returns immediately.
// It will stop when the stopCh is closed.
func (p *Poller) RunUntil(stopCh <-chan struct{}) {
go util.Until(p.run, p.period, stopCh)
}

func (p *Poller) run() {
e, err := p.getFunc()
if err != nil {
glog.Errorf("failed to list: %v", err)
return
}
p.sync(e)
}

func (p *Poller) sync(e Enumerator) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ func (r *Reflector) Run() {
go util.Forever(func() { r.listAndWatch() }, r.period)
}

// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
go util.Until(func() { r.listAndWatch() }, r.period, stopCh)
}

func (r *Reflector) listAndWatch() {
var resourceVersion string

Expand Down