From 45e7aa7ab1fa4e8675848e2bcd9058c14cb0131a Mon Sep 17 00:00:00 2001 From: Tim Ramlot <42113979+inteon@users.noreply.github.com> Date: Mon, 9 Jan 2023 18:52:18 +0100 Subject: [PATCH] improve code based on feedback from review Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com> --- pkg/cache/internal/informers_map.go | 50 +++++++++++++---------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index a3ad6fd720..f798e0c3d2 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -163,32 +163,18 @@ func (ip *InformersMap) Start(ctx context.Context) error { ip.mu.Lock() defer ip.mu.Unlock() - // Set the stop channel so it can be passed to informers that are added later + // Set the context so it can be passed to informers that are added later ip.ctx = ctx - ip.waitGroup.Add(len(ip.informers.Structured) + len(ip.informers.Unstructured) + len(ip.informers.Metadata)) - // Start each informer for _, i := range ip.informers.Structured { - i := i - go func() { - defer ip.waitGroup.Done() - i.Informer.Run(ctx.Done()) - }() + ip.startInformerLocked(i.Informer) } for _, i := range ip.informers.Unstructured { - i := i - go func() { - defer ip.waitGroup.Done() - i.Informer.Run(ctx.Done()) - }() + ip.startInformerLocked(i.Informer) } for _, i := range ip.informers.Metadata { - i := i - go func() { - defer ip.waitGroup.Done() - i.Informer.Run(ctx.Done()) - }() + ip.startInformerLocked(i.Informer) } // Set started to true so we immediately start any informers added later. @@ -203,6 +189,21 @@ func (ip *InformersMap) Start(ctx context.Context) error { return nil } +func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) { + // Don't start the informer in case we are already waiting for the items in + // the waitGroup to finish, since waitGroups don't support waiting and adding + // at the same time. + if ip.stopped { + return + } + + ip.waitGroup.Add(1) + go func() { + defer ip.waitGroup.Done() + informer.Run(ip.ctx.Done()) + }() +} + func (ip *InformersMap) waitForStarted(ctx context.Context) bool { select { case <-ip.startWait: @@ -331,15 +332,10 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim } ip.informersByType(obj)[gvk] = i - // Start the Informer if need by - // TODO(seans): write thorough tests and document what happens here - can you add indexers? - // can you add eventhandlers? - if ip.started && !ip.stopped { - ip.waitGroup.Add(1) - go func() { - defer ip.waitGroup.Done() - i.Informer.Run(ip.ctx.Done()) - }() + // Start the informer in case the InformersMap has started, otherwise it will be + // started when the InformersMap starts. + if ip.started { + ip.startInformerLocked(i.Informer) } return i, ip.started, nil }