Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client-go: support Shutdown() for metadata and dynamic informers #114434

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 28 additions & 1 deletion staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer.go
Expand Up @@ -61,6 +61,12 @@ type dynamicSharedInformerFactory struct {
// This allows Start() to be called multiple times safely.
startedInformers map[schema.GroupVersionResource]bool
tweakListOptions TweakListOptionsFunc

// wg tracks how many goroutines were started.
wg sync.WaitGroup
// shuttingDown is true when Shutdown has been called. It may still be running
// because it needs to wait for goroutines.
shuttingDown bool
}

var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
Expand All @@ -86,9 +92,21 @@ func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

if f.shuttingDown {
return
}

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Informer().Run(stopCh)
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer.Informer()
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
Expand Down Expand Up @@ -116,6 +134,15 @@ func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
return res
}

func (f *dynamicSharedInformerFactory) Shutdown() {
// Will return immediately if there is nothing to wait for.
defer f.wg.Wait()

f.lock.Lock()
defer f.lock.Unlock()
f.shuttingDown = true
}

// NewFilteredDynamicInformer constructs a new informer for a dynamic type.
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &dynamicInformer{
Expand Down
19 changes: 19 additions & 0 deletions staging/src/k8s.io/client-go/dynamic/dynamicinformer/interface.go
Expand Up @@ -24,9 +24,28 @@ import (

// DynamicSharedInformerFactory provides access to a shared informer and lister for dynamic client
type DynamicSharedInformerFactory interface {
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
Start(stopCh <-chan struct{})

// ForResource gives generic access to a shared informer of the matching type.
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer

// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool

// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
//
// In addition, Shutdown blocks until all goroutines have terminated. For that
// to happen, the close channel(s) that they were started with must be closed,
// either before Shutdown gets called or while it is waiting.
//
// Shutdown may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
Shutdown()
}

// TweakListOptionsFunc defines the signature of a helper function
Expand Down
Expand Up @@ -60,6 +60,11 @@ type metadataSharedInformerFactory struct {
// This allows Start() to be called multiple times safely.
startedInformers map[schema.GroupVersionResource]bool
tweakListOptions TweakListOptionsFunc
// wg tracks how many goroutines were started.
wg sync.WaitGroup
// shuttingDown is true when Shutdown has been called. It may still be running
// because it needs to wait for goroutines.
shuttingDown bool
}

var _ SharedInformerFactory = &metadataSharedInformerFactory{}
Expand All @@ -85,9 +90,21 @@ func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

if f.shuttingDown {
return
}

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Informer().Run(stopCh)
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer.Informer()
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
Expand Down Expand Up @@ -115,6 +132,15 @@ func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
return res
}

func (f *metadataSharedInformerFactory) Shutdown() {
// Will return immediately if there is nothing to wait for.
defer f.wg.Wait()

f.lock.Lock()
defer f.lock.Unlock()
f.shuttingDown = true
}

// NewFilteredMetadataInformer constructs a new informer for a metadata type.
func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &metadataInformer{
Expand Down
Expand Up @@ -24,9 +24,28 @@ import (

// SharedInformerFactory provides access to a shared informer and lister for dynamic client
type SharedInformerFactory interface {
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
Start(stopCh <-chan struct{})

// ForResource gives generic access to a shared informer of the matching type.
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer

// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool

// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
//
// In addition, Shutdown blocks until all goroutines have terminated. For that
// to happen, the close channel(s) that they were started with must be closed,
// either before Shutdown gets called or while it is waiting.
//
// Shutdown may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
Shutdown()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naive by-stander question, why do we need a shutdown when starts already takes a stopCh?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shutdown additionally actually waits for everything to terminate is the difference

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense to copy the documentation from https://github.com/kubernetes/kubernetes/pull/112200/files, then someone (like @apelisse) who looks at this interface can see how to use it without having to know that some other, similar interface elsewhere explains it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, copy+pasted from your PR. thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super excited about this API, one needs to keep two handles to shutdown a factory (the stopCh, AND the factory which is error-prone and smelly). I can also call shutdown on something that hasn't been created. I can shutdown but forget to close the channel (it will hang forever?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had called the new function WaitForShutdown initially and then there were concerns about that name that led to a renaming 🤷

See #112200 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "RejectNewAndWaitForShutdown" at least captures what it actually does :(

Apologies for not spending more time on that review :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally we call the method "Run" instead of "Start" and you just wait for it to exit.

I agree that the most consistent thing to do here would be to expose a Run which blocks. Start could be refactored to call Run for backwards compatibility. Is that something people are interested in?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alex, defining a Run sounds good to me. It's not clear without digging in if it is good to make one function call the other or not.

We should definitely leave the existing Start functions as-is behavior-wise to not break people.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we can define Run to call Start and the now DOA Shutdown, that'd be a good API with minimal change.

}

// TweakListOptionsFunc defines the signature of a helper function
Expand Down