plugin: Replace readClusterState with existing watch events (#904)
In short, readClusterState is super complicated, separately reimplements
the reserveResources() logic, and may be the source of several
startup-related bugs (probably #671 and #852).

So, given that we *already* have a pathway for updating our internal
state from changes in the cluster (i.e. the watch events), we should
just use that instead.
sharnoff committed May 22, 2024
1 parent f7d101c commit ceceb07
Showing 6 changed files with 116 additions and 341 deletions.
1 change: 1 addition & 0 deletions deploy/scheduler/config_map.yaml
@@ -34,6 +34,7 @@ data:
},
"schedulerName": "autoscale-scheduler",
"eventQueueWorkers": 64,
"startupEventHandlingTimeoutSeconds": 15,
"dumpState": {
"port": 10298,
"timeoutSeconds": 5
5 changes: 3 additions & 2 deletions pkg/plugin/ARCHITECTURE.md
@@ -58,8 +58,9 @@ The entrypoint for plugin initialization is through the `NewAutoscaleEnforcerPlugin`
`plugin.go`, which in turn:

1. Fetches the scheduler config (and starts watching for changes) (see: [`config.go`])
2. Starts watching for pod deletion events (see: [`watch.go`])
3. Loads an initial state from the cluster's resources (see: `readClusterState` in [`state.go`])
2. Starts watching for pod events, among others (see: [`watch.go`])
3. Loads an initial state from the cluster's resources (by waiting for all the initial Pod start
events to be handled)
4. Spawns the HTTP server for handling `autoscaler-agent` requests (see: [`run.go`])

The plugins we implement are:
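To make step 3 concrete, here is a minimal sketch (in Go, using hypothetical names like `waitForInitialEvents` and `startupTimeout`) of what "waiting for all the initial Pod start events to be handled" amounts to; the actual wiring lives in `makeAutoscaleEnforcerPlugin` in the `plugin.go` diff below:

```go
package plugin

import (
	"fmt"
	"time"
)

// waitForInitialEvents blocks until the channel that signals "all events from the
// initial List() have been handled" is closed, or gives up after startupTimeout
// (derived from startupEventHandlingTimeoutSeconds in the scheduler config).
// Sketch only; names here are illustrative, not the plugin's actual API.
func waitForInitialEvents(done <-chan struct{}, startupTimeout time.Duration) error {
	select {
	case <-done:
		return nil
	case <-time.After(startupTimeout):
		return fmt.Errorf("timed out waiting for initial event handling after %s", startupTimeout)
	}
}
```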
12 changes: 12 additions & 0 deletions pkg/plugin/config.go
@@ -30,6 +30,14 @@ type Config struct {
// event queue.
EventQueueWorkers int `json:"eventQueueWorkers"`

// StartupEventHandlingTimeoutSeconds gives the maximum duration, in seconds, that we are
// allowed to wait to finish handling all of the initial events generated by reading the cluster
// state on startup.
//
// If event processing takes longer than this time, then plugin creation will fail, and the
// scheduler pod will retry.
StartupEventHandlingTimeoutSeconds int `json:"startupEventHandlingTimeoutSeconds"`

// RandomizeScores, if true, will cause the scheduler to score a node with a random number in
// the range [minScore + 1, trueScore], instead of the trueScore
RandomizeScores bool `json:"randomizeScores"`
@@ -132,6 +140,10 @@ func (c *Config) validate() (string, error) {
return "eventQueueWorkers", errors.New("value must be > 0")
}

if c.StartupEventHandlingTimeoutSeconds <= 0 {
return "startupEventHandlingTimeoutSeconds", errors.New("value must be > 0")
}

if c.DumpState != nil {
if path, err := c.DumpState.validate(); err != nil {
return fmt.Sprintf("dumpState.%s", path), err
106 changes: 94 additions & 12 deletions pkg/plugin/plugin.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"time"

"go.uber.org/zap"
@@ -87,10 +88,13 @@ func makeAutoscaleEnforcerPlugin(

handle: h,
vmClient: vmClient,
// remaining fields are set by p.readClusterState and p.makePrometheusRegistry
state: pluginState{ //nolint:exhaustruct // see above.
state: pluginState{
lock: util.NewChanMutex(),
ongoingMigrationDeletions: make(map[util.NamespacedName]int),
pods: make(map[util.NamespacedName]*podState),
nodes: make(map[string]*nodeState),
maxTotalReservableCPU: 0, // set during event handling
maxTotalReservableMem: 0, // set during event handling
conf: config,
},
metrics: PromMetrics{}, //nolint:exhaustruct // set by makePrometheusRegistry
@@ -105,8 +109,8 @@
}

// makePrometheusRegistry sets p.metrics, which we need to do before calling
// newEventQueueSet and readClusterState, because we set metrics eventQueueSet and for each
// node while we build the state.
// newEventQueueSet or handling events, because we set metrics in eventQueueSet and for each
// node as watch events get handled.
promReg := p.makePrometheusRegistry()

// Start watching Pod/VM events, adding them to a shared queue to process them in order
@@ -118,15 +122,42 @@
}
}

// A note about event handling:
//
// Before returning from this function, we want to make sure that we're caught up to the watch
// events generated by initially reading the cluster state (i.e. the initial List()).
//
// Doing this is non-trivial, so we accomplish it in pieces:
//
// 1. Using watch.WatchModeSync to force queueing events *before* returning from creating the
// watcher (note: and therefore, before any start to be handled); and
// 2. For each event created from the initial List(), increment a counter to track the number of
// these events, and decrement it as events are handled.
//
// The initial state building is complete when the counter reaches zero, at which point we close
// the channel that this function will wait on.

var initEventsCount atomic.Int32
var initEvents *eventCounter
incEventCount := func() { initEventsCount.Add(1) }

hlogger := logger.Named("handlers")
nwc := nodeWatchCallbacks{
submitNodeDeletion: func(logger *zap.Logger, nodeName string) {
pushToQueue(logger, nodeName, func() { p.handleNodeDeletion(hlogger, nodeName) })
},
}
pwc := podWatchCallbacks{
submitStarted: func(logger *zap.Logger, pod *corev1.Pod) {
pushToQueue(logger, pod.Name, func() { p.handleStarted(hlogger, pod) })
submitStarted: func(logger *zap.Logger, pod *corev1.Pod, preexisting bool) {
if preexisting {
incEventCount()
}
pushToQueue(logger, pod.Name, func() {
p.handleStarted(hlogger, pod)
if preexisting {
initEvents.dec()
}
})
},
submitDeletion: func(logger *zap.Logger, name util.NamespacedName) {
// NOTE: It's important that the name we use here is the same as the one we use for
@@ -179,7 +210,7 @@ func makeAutoscaleEnforcerPlugin(
podIndex := watch.NewIndexedStore(podStore, watch.NewNameIndex[corev1.Pod]())

logger.Info("Starting VM watcher")
vmStore, err := p.watchVMEvents(ctx, logger, watchMetrics, vwc, podIndex)
_, err = p.watchVMEvents(ctx, logger, watchMetrics, vwc, podIndex)
if err != nil {
return nil, fmt.Errorf("Error starting VM watcher: %w", err)
}
@@ -191,12 +222,13 @@

watchMetrics.MustRegister(promReg)

// ... but before handling the events, read the current cluster state:
logger.Info("Reading initial cluster state")
if err = p.readClusterState(ctx, logger, vmStore); err != nil {
return nil, fmt.Errorf("Error reading cluster state: %w", err)
}
// Set up tracking the initial events, now that we know the count:
totalQueued := initEventsCount.Load()
initEvents = newEventCounter(totalQueued)

// Start handling the queued events. Each handled initial event decrements initEvents, and once
// the counter reaches zero its done() channel is closed to mark initial event handling as
// complete.
for i := 0; i < config.EventQueueWorkers; i += 1 {
// copy the loop variable to avoid it escaping pre Go 1.22
go func(ctx context.Context, idx int) {
@@ -216,6 +248,21 @@
return nil, fmt.Errorf("Error starting prometheus server: %w", err)
}

// Wait for all the initial events to be handled.
logger.Info("Waiting on initial events processing to be done", zap.Int32("count", totalQueued))
initEventsTimeout := time.Second * time.Duration(p.state.conf.StartupEventHandlingTimeoutSeconds)
select {
case <-initEvents.done():
// Done
case <-time.After(initEventsTimeout):
return nil, fmt.Errorf(
"Timed out waiting on initial events processing to complete after %s (%d remaining)",
initEventsTimeout,
initEvents.getRemaining(),
)
}
logger.Info("Initial events processing complete")

if err := p.startPermitHandler(ctx, logger.Named("agent-handler")); err != nil {
return nil, fmt.Errorf("permit handler: %w", err)
}
@@ -235,6 +282,41 @@
return &p, nil
}

// monotonically decreasing event counter that closes a channel once all events have been completed
// with dec().
//
// Used to make sure we've processed all the initial events before returning from
// makeAutoscaleEnforcerPlugin().
type eventCounter struct {
remaining atomic.Int32
signalDone chan struct{}
}

func newEventCounter(remaining int32) *eventCounter {
c := &eventCounter{
remaining: atomic.Int32{},
signalDone: make(chan struct{}),
}

c.remaining.Store(remaining)
return c
}

func (c *eventCounter) dec() {
r := c.remaining.Add(-1)
if r == 0 {
close(c.signalDone)
}
}

func (c *eventCounter) getRemaining() int32 {
return c.remaining.Load()
}

func (c *eventCounter) done() <-chan struct{} {
return c.signalDone
}

// Name returns the name of the AutoscaleEnforcer plugin
//
// Required for framework.Plugin
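As a rough illustration of how the eventCounter above is meant to be used (a standalone sketch with made-up numbers, not the plugin's actual wiring): every event queued from the initial List() is counted up front, each handler calls dec() when it finishes, and startup blocks on done() with the configured timeout.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Same shape as the plugin's eventCounter: counts down to zero, then closes signalDone.
type eventCounter struct {
	remaining  atomic.Int32
	signalDone chan struct{}
}

func newEventCounter(remaining int32) *eventCounter {
	c := &eventCounter{signalDone: make(chan struct{})}
	c.remaining.Store(remaining)
	return c
}

func (c *eventCounter) dec() {
	if c.remaining.Add(-1) == 0 {
		close(c.signalDone)
	}
}

func (c *eventCounter) done() <-chan struct{} { return c.signalDone }

func main() {
	const initialEvents = 5 // stand-in for the number of preexisting Pods found by the initial List()
	counter := newEventCounter(initialEvents)

	// Stand-ins for the event queue workers: each handles one initial event, then decrements.
	for i := 0; i < initialEvents; i++ {
		go func() {
			time.Sleep(10 * time.Millisecond) // pretend to handle the event
			counter.dec()
		}()
	}

	// Mirrors the startup wait, bounded by startupEventHandlingTimeoutSeconds (15 in the config above).
	select {
	case <-counter.done():
		fmt.Println("all initial events handled")
	case <-time.After(15 * time.Second):
		fmt.Println("timed out waiting for initial events")
	}
}
```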
