Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Use correct context to cancel "list and watch" & wait for all informers to complete #2121

Merged
merged 2 commits into from Jan 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 48 additions & 28 deletions pkg/cache/internal/informers_map.go
Expand Up @@ -117,9 +117,6 @@ type InformersMap struct {
// paramCodec is used by list and watch
paramCodec runtime.ParameterCodec

// stop is the stop channel to stop informers
stop <-chan struct{}

// resync is the base frequency the informers are resynced
// a 10 percent jitter will be added to the resync period between informers
// so that all informers will not send list requests simultaneously.
Expand All @@ -128,13 +125,22 @@ type InformersMap struct {
// mu guards access to the map
mu sync.RWMutex

// start is true if the informers have been started
// started is true if the informers have been started
started bool

// startWait is a channel that is closed after the
// informer has been started.
startWait chan struct{}

// waitGroup is the wait group that is used to wait for all informers to stop
waitGroup sync.WaitGroup

// stopped is true if the informers have been stopped
stopped bool

// ctx is the context to stop informers
ctx context.Context

// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
Expand All @@ -157,28 +163,47 @@ func (ip *InformersMap) Start(ctx context.Context) error {
ip.mu.Lock()
defer ip.mu.Unlock()

// Set the stop channel so it can be passed to informers that are added later
ip.stop = ctx.Done()
// Set the context so it can be passed to informers that are added later
ip.ctx = ctx

// Start each informer
for _, i := range ip.informers.Structured {
go i.Informer.Run(ctx.Done())
ip.startInformerLocked(i.Informer)
}
for _, i := range ip.informers.Unstructured {
go i.Informer.Run(ctx.Done())
ip.startInformerLocked(i.Informer)
}
for _, i := range ip.informers.Metadata {
go i.Informer.Run(ctx.Done())
ip.startInformerLocked(i.Informer)
}

// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()
<-ctx.Done()
<-ctx.Done() // Block until the context is done
ip.mu.Lock()
ip.stopped = true // Set stopped to true so we don't start any new informers
ip.mu.Unlock()
ip.waitGroup.Wait() // Block until all informers have stopped
return nil
}

func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
if ip.stopped {
return
}

ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
}()
}

func (ip *InformersMap) waitForStarted(ctx context.Context) bool {
select {
case <-ip.startWait:
Expand Down Expand Up @@ -307,20 +332,15 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
}
ip.informersByType(obj)[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
go i.Informer.Run(ip.stop)
ip.startInformerLocked(i.Informer)
}
return i, ip.started, nil
}

func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
// TODO(vincepri): Wire the context in here and don't use TODO().
// Can we use the context from the Get call?
ctx := context.TODO()

// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand Down Expand Up @@ -351,16 +371,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if namespace != "" {
return resources.Namespace(namespace).List(ctx, opts)
return resources.Namespace(namespace).List(ip.ctx, opts)
}
return resources.List(ctx, opts)
return resources.List(ip.ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
if namespace != "" {
return resources.Namespace(namespace).Watch(ctx, opts)
return resources.Namespace(namespace).Watch(ip.ctx, opts)
}
return resources.Watch(ctx, opts)
return resources.Watch(ip.ctx, opts)
},
}, nil
//
Expand All @@ -386,9 +406,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
err error
)
if namespace != "" {
list, err = resources.Namespace(namespace).List(ctx, opts)
list, err = resources.Namespace(namespace).List(ip.ctx, opts)
} else {
list, err = resources.List(ctx, opts)
list, err = resources.List(ip.ctx, opts)
}
if list != nil {
for i := range list.Items {
Expand All @@ -400,9 +420,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
if namespace != "" {
watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
} else {
watcher, err = resources.Watch(ctx, opts)
watcher, err = resources.Watch(ip.ctx, opts)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -433,7 +453,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime

// Create the resulting object, and execute the request.
res := listObj.DeepCopyObject()
if err := req.Do(ctx).Into(res); err != nil {
if err := req.Do(ip.ctx).Into(res); err != nil {
return nil, err
}
return res, nil
Expand All @@ -446,7 +466,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
req.Namespace(namespace)
}
// Call the watch.
return req.Watch(ctx)
return req.Watch(ip.ctx)
},
}, nil
}
Expand Down