Skip to content

Commit

Permalink
tmp: Use wrapper type for managing init events
Browse files Browse the repository at this point in the history
  • Loading branch information
sharnoff committed May 9, 2024
1 parent b3f2d70 commit 521597e
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"go.uber.org/zap"
"golang.org/x/sync/semaphore"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -138,10 +137,9 @@ func makeAutoscaleEnforcerPlugin(
// The initial state building is complete when the counter reaches zero, at which point we close
// the channel that this function will wait on.

var initEventsCount atomic.Int64
var initEventsDone *semaphore.Weighted // will be set after starting the watchers
var initEventsCount atomic.Int32
var initEvents *eventCounter
incEventCount := func() { initEventsCount.Add(1) }
decEventCount := func() { initEventsDone.Release(1) }

hlogger := logger.Named("handlers")
nwc := nodeWatchCallbacks{
Expand All @@ -157,7 +155,7 @@ func makeAutoscaleEnforcerPlugin(
pushToQueue(logger, pod.Name, func() {
p.handleStarted(hlogger, pod)
if preexisting {
decEventCount()
initEvents.dec()
}
})
},
Expand Down Expand Up @@ -224,12 +222,9 @@ func makeAutoscaleEnforcerPlugin(

watchMetrics.MustRegister(promReg)

// Set up the semaphore tracking the initial events, now that we know the count:
// Set up tracking the initial events, now that we know the count:
totalQueued := initEventsCount.Load()
initEventsDone = semaphore.NewWeighted(totalQueued)
if !initEventsDone.TryAcquire(totalQueued) {
logger.Panic("failed to acquire all permits from init events semaphore", zap.Int64("count", totalQueued))
}
initEvents = newEventCounter(totalQueued)

// Start handling the queued events. Any handling of initial events will gradually reduce
// initEventsCount, and eventually we'll close(initEventsDone) to mark initial event handling as
Expand All @@ -254,16 +249,16 @@ func makeAutoscaleEnforcerPlugin(
}

// Wait for all the initial events to be handled.
logger.Info("Waiting on initial events processing to be done", zap.Int64("count", totalQueued))
logger.Info("Waiting on initial events processing to be done", zap.Int32("count", totalQueued))
initEventsTimeout := time.Second * time.Duration(p.state.conf.StartupEventHandlingTimeoutSeconds)
initEventsCtx, cancel := context.WithTimeout(ctx, initEventsTimeout)
defer cancel()
initEventsWaitStart := time.Now()
if err := initEventsDone.Acquire(initEventsCtx, totalQueued); err != nil {
select {
case <-initEvents.done():
// Done
case <-time.After(initEventsTimeout):
return nil, fmt.Errorf(
"Timed out waiting on initial events processing to complete after %s (%d remaining)",
time.Since(initEventsWaitStart),
initEventsCount.Load(),
initEventsTimeout,
initEvents.getRemaining(),
)
}
logger.Info("Initial events processing complete")
Expand All @@ -287,6 +282,41 @@ func makeAutoscaleEnforcerPlugin(
return &p, nil
}

// monotonically decreasing event counter that closes a channel once all events have been completed
// with dec().
//
// Used to make sure we've processed all the initial events before returning from
// makeAutoscaleEnforcerPlugin().
type eventCounter struct {
remaining atomic.Int32
signalDone chan struct{}
}

func newEventCounter(remaining int32) *eventCounter {
c := &eventCounter{
remaining: atomic.Int32{},
signalDone: make(chan struct{}),
}

c.remaining.Store(remaining)
return c
}

func (c *eventCounter) dec() {
r := c.remaining.Add(-1)
if r == 0 {
close(c.signalDone)
}
}

func (c *eventCounter) getRemaining() int32 {
return c.remaining.Load()
}

func (c *eventCounter) done() <-chan struct{} {
return c.signalDone
}

// Name returns the name of the AutoscaleEnforcer plugin
//
// Required for framework.Plugin
Expand Down

0 comments on commit 521597e

Please sign in to comment.