From 521597e44affe70a50cccd1e0bf72d46fa500394 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Thu, 9 May 2024 16:40:31 -0700 Subject: [PATCH] tmp: Use wrapper type for managing init events --- pkg/plugin/plugin.go | 64 ++++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index d7b569518..d69f18d5d 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -8,7 +8,6 @@ import ( "time" "go.uber.org/zap" - "golang.org/x/sync/semaphore" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -138,10 +137,9 @@ func makeAutoscaleEnforcerPlugin( // The initial state building is complete when the counter reaches zero, at which point we close // the channel that this function will wait on. - var initEventsCount atomic.Int64 - var initEventsDone *semaphore.Weighted // will be set after starting the watchers + var initEventsCount atomic.Int32 + var initEvents *eventCounter incEventCount := func() { initEventsCount.Add(1) } - decEventCount := func() { initEventsDone.Release(1) } hlogger := logger.Named("handlers") nwc := nodeWatchCallbacks{ @@ -157,7 +155,7 @@ func makeAutoscaleEnforcerPlugin( pushToQueue(logger, pod.Name, func() { p.handleStarted(hlogger, pod) if preexisting { - decEventCount() + initEvents.dec() } }) }, @@ -224,12 +222,9 @@ func makeAutoscaleEnforcerPlugin( watchMetrics.MustRegister(promReg) - // Set up the semaphore tracking the initial events, now that we know the count: + // Set up tracking the initial events, now that we know the count: totalQueued := initEventsCount.Load() - initEventsDone = semaphore.NewWeighted(totalQueued) - if !initEventsDone.TryAcquire(totalQueued) { - logger.Panic("failed to acquire all permits from init events semaphore", zap.Int64("count", totalQueued)) - } + initEvents = newEventCounter(totalQueued) // Start handling the queued events. Any handling of initial events will gradually reduce // initEventsCount, and eventually we'll close(initEventsDone) to mark initial event handling as @@ -254,16 +249,16 @@ func makeAutoscaleEnforcerPlugin( } // Wait for all the initial events to be handled. - logger.Info("Waiting on initial events processing to be done", zap.Int64("count", totalQueued)) + logger.Info("Waiting on initial events processing to be done", zap.Int32("count", totalQueued)) initEventsTimeout := time.Second * time.Duration(p.state.conf.StartupEventHandlingTimeoutSeconds) - initEventsCtx, cancel := context.WithTimeout(ctx, initEventsTimeout) - defer cancel() - initEventsWaitStart := time.Now() - if err := initEventsDone.Acquire(initEventsCtx, totalQueued); err != nil { + select { + case <-initEvents.done(): + // Done + case <-time.After(initEventsTimeout): return nil, fmt.Errorf( "Timed out waiting on initial events processing to complete after %s (%d remaining)", - time.Since(initEventsWaitStart), - initEventsCount.Load(), + initEventsTimeout, + initEvents.getRemaining(), ) } logger.Info("Initial events processing complete") @@ -287,6 +282,41 @@ func makeAutoscaleEnforcerPlugin( return &p, nil } +// monotonically decreasing event counter that closes a channel once all events have been completed +// with dec(). +// +// Used to make sure we've processed all the initial events before returning from +// makeAutoscaleEnforcerPlugin(). +type eventCounter struct { + remaining atomic.Int32 + signalDone chan struct{} +} + +func newEventCounter(remaining int32) *eventCounter { + c := &eventCounter{ + remaining: atomic.Int32{}, + signalDone: make(chan struct{}), + } + + c.remaining.Store(remaining) + return c +} + +func (c *eventCounter) dec() { + r := c.remaining.Add(-1) + if r == 0 { + close(c.signalDone) + } +} + +func (c *eventCounter) getRemaining() int32 { + return c.remaining.Load() +} + +func (c *eventCounter) done() <-chan struct{} { + return c.signalDone +} + // Name returns the name of the AutoscaleEnforcer plugin // // Required for framework.Plugin