Skip to content

Commit

Permalink
improve code based on feedback from review
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Jan 9, 2023
1 parent e466b38 commit 45e7aa7
Showing 1 changed file with 23 additions and 27 deletions.
50 changes: 23 additions & 27 deletions pkg/cache/internal/informers_map.go
Expand Up @@ -163,32 +163,18 @@ func (ip *InformersMap) Start(ctx context.Context) error {
ip.mu.Lock()
defer ip.mu.Unlock()

// Set the stop channel so it can be passed to informers that are added later
// Set the context so it can be passed to informers that are added later
ip.ctx = ctx

ip.waitGroup.Add(len(ip.informers.Structured) + len(ip.informers.Unstructured) + len(ip.informers.Metadata))

// Start each informer
for _, i := range ip.informers.Structured {
i := i
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ctx.Done())
}()
ip.startInformerLocked(i.Informer)
}
for _, i := range ip.informers.Unstructured {
i := i
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ctx.Done())
}()
ip.startInformerLocked(i.Informer)
}
for _, i := range ip.informers.Metadata {
i := i
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ctx.Done())
}()
ip.startInformerLocked(i.Informer)
}

// Set started to true so we immediately start any informers added later.
Expand All @@ -203,6 +189,21 @@ func (ip *InformersMap) Start(ctx context.Context) error {
return nil
}

func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
if ip.stopped {
return
}

ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
}()
}

func (ip *InformersMap) waitForStarted(ctx context.Context) bool {
select {
case <-ip.startWait:
Expand Down Expand Up @@ -331,15 +332,10 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
}
ip.informersByType(obj)[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started && !ip.stopped {
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ip.ctx.Done())
}()
// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
ip.startInformerLocked(i.Informer)
}
return i, ip.started, nil
}
Expand Down

0 comments on commit 45e7aa7

Please sign in to comment.