plugin: Replace readClusterState with existing watch events
In short, readClusterState is super complicated, separately reimplements
the reserveResources() logic, and may be the source of several
startup-related bugs (probably #671 and #852).

So, given that we *already* have a pathway for updating our internal
state from changes in the cluster (i.e. the watch events), we should
just use that instead.
sharnoff committed May 10, 2024
1 parent 13fa89e commit b1e53de
Showing 6 changed files with 118 additions and 341 deletions.
1 change: 1 addition & 0 deletions deploy/scheduler/config_map.yaml
@@ -34,6 +34,7 @@ data:
},
"schedulerName": "autoscale-scheduler",
"eventQueueWorkers": 64,
"startupEventHandlingTimeoutSeconds": 15,
"dumpState": {
"port": 10298,
"timeoutSeconds": 5
5 changes: 3 additions & 2 deletions pkg/plugin/ARCHITECTURE.md
@@ -58,8 +58,9 @@ The entrypoint for plugin initialization is through the `NewAutoscaleEnforcerPlu
`plugin.go`, which in turn:

1. Fetches the scheduler config (and starts watching for changes) (see: [`config.go`])
2. Starts watching for pod deletion events (see: [`watch.go`])
3. Loads an initial state from the cluster's resources (see: `readClusterState` in [`state.go`])
2. Starts watching for pod events, among others (see: [`watch.go`])
3. Loads an initial state from the cluster's resources (by waiting for all the initial Pod start
events to be handled)
4. Spawns the HTTP server for handling `autoscaler-agent` requests (see: [`run.go`])

The plugins we implement are:
Expand Down
12 changes: 12 additions & 0 deletions pkg/plugin/config.go
@@ -30,6 +30,14 @@ type Config struct {
// event queue.
EventQueueWorkers int `json:"eventQueueWorkers"`

// StartupEventHandlingTimeoutSeconds gives the maximum duration, in seconds, that we are
// allowed to wait to finish handling all of the initial events generated by reading the cluster
// state on startup.
//
// If event processing takes longer than this time, then plugin creation will fail, and the
// scheduler pod will retry.
StartupEventHandlingTimeoutSeconds int `json:"startupEventHandlingTimeoutSeconds"`

// RandomizeScores, if true, will cause the scheduler to score a node with a random number in
// the range [minScore + 1, trueScore], instead of the trueScore
RandomizeScores bool `json:"randomizeScores"`
@@ -132,6 +140,10 @@ func (c *Config) validate() (string, error) {
return "eventQueueWorkers", errors.New("value must be > 0")
}

if c.StartupEventHandlingTimeoutSeconds <= 0 {
return "startupEventHandlingTimeoutSeconds", errors.New("value must be > 0")
}

if c.DumpState != nil {
if path, err := c.DumpState.validate(); err != nil {
return fmt.Sprintf("dumpState.%s", path), err
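
To illustrate the effect of the new validation rule, here is a minimal, self-contained sketch (the `Config` below is a trimmed-down stand-in, not the real type from `pkg/plugin/config.go`): a config that leaves the new field unset now fails validation, and with it, plugin creation.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed-down stand-ins for Config and validate(), kept only to show the new rule.
type Config struct {
	EventQueueWorkers                  int `json:"eventQueueWorkers"`
	StartupEventHandlingTimeoutSeconds int `json:"startupEventHandlingTimeoutSeconds"`
}

func (c *Config) validate() (string, error) {
	if c.EventQueueWorkers <= 0 {
		return "eventQueueWorkers", errors.New("value must be > 0")
	}
	if c.StartupEventHandlingTimeoutSeconds <= 0 {
		return "startupEventHandlingTimeoutSeconds", errors.New("value must be > 0")
	}
	return "", nil
}

func main() {
	// A pre-existing config with no startup timeout set is now rejected:
	c := Config{EventQueueWorkers: 64}
	if path, err := c.validate(); err != nil {
		fmt.Printf("%s: %v\n", path, err) // startupEventHandlingTimeoutSeconds: value must be > 0
	}
}
```
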
106 changes: 94 additions & 12 deletions pkg/plugin/plugin.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"time"

"go.uber.org/zap"
@@ -87,10 +88,13 @@ func makeAutoscaleEnforcerPlugin(

handle: h,
vmClient: vmClient,
// remaining fields are set by p.readClusterState and p.makePrometheusRegistry
state: pluginState{ //nolint:exhaustruct // see above.
state: pluginState{
lock: util.NewChanMutex(),
ongoingMigrationDeletions: make(map[util.NamespacedName]int),
pods: make(map[util.NamespacedName]*podState),
nodes: make(map[string]*nodeState),
maxTotalReservableCPU: 0, // set during event handling
maxTotalReservableMem: 0, // set during event handling
conf: config,
},
metrics: PromMetrics{}, //nolint:exhaustruct // set by makePrometheusRegistry
@@ -105,8 +109,8 @@ }
}

// makePrometheusRegistry sets p.metrics, which we need to do before calling
// newEventQueueSet and readClusterState, because we set metrics eventQueueSet and for each
// node while we build the state.
// newEventQueueSet or handling events, because we set metrics in eventQueueSet and for each
// node as watch events get handled.
promReg := p.makePrometheusRegistry()

// Start watching Pod/VM events, adding them to a shared queue to process them in order
@@ -118,15 +122,42 @@ }
}
}

// A note about event handling:
//
// Before returning from this function, we want to make sure that we're caught up to the watch
// events generated by initially reading the cluster state (i.e. the initial List()).
//
// Doing this is non-trivial, so we accomplish it in pieces:
//
// 1. Using watch.WatchModeSync to force queueing events *before* returning from creating the
// watcher (note: and therefore, before any of them start to be handled); and
// 2. For each event created from the initial List(), increment a counter to track the number of
// these events, and decrement it as events are handled.
//
// The initial state building is complete when the counter reaches zero, at which point we close
// the channel that this function will wait on.

var initEventsCount atomic.Int32
var initEvents *eventCounter
incEventCount := func() { initEventsCount.Add(1) }

hlogger := logger.Named("handlers")
nwc := nodeWatchCallbacks{
submitNodeDeletion: func(logger *zap.Logger, nodeName string) {
pushToQueue(logger, nodeName, func() { p.handleNodeDeletion(hlogger, nodeName) })
},
}
pwc := podWatchCallbacks{
submitStarted: func(logger *zap.Logger, pod *corev1.Pod) {
pushToQueue(logger, pod.Name, func() { p.handleStarted(hlogger, pod) })
submitStarted: func(logger *zap.Logger, pod *corev1.Pod, preexisting bool) {
if preexisting {
incEventCount()
}
pushToQueue(logger, pod.Name, func() {
p.handleStarted(hlogger, pod)
if preexisting {
initEvents.dec()
}
})
},
submitDeletion: func(logger *zap.Logger, name util.NamespacedName) {
// NOTE: It's important that the name we use here is the same as the one we use for
@@ -179,7 +210,7 @@ func makeAutoscaleEnforcerPlugin(
podIndex := watch.NewIndexedStore(podStore, watch.NewNameIndex[corev1.Pod]())

logger.Info("Starting VM watcher")
vmStore, err := p.watchVMEvents(ctx, logger, watchMetrics, vwc, podIndex)
_, err = p.watchVMEvents(ctx, logger, watchMetrics, vwc, podIndex)
if err != nil {
return nil, fmt.Errorf("Error starting VM watcher: %w", err)
}
@@ -191,12 +222,13 @@

watchMetrics.MustRegister(promReg)

// ... but before handling the events, read the current cluster state:
logger.Info("Reading initial cluster state")
if err = p.readClusterState(ctx, logger, vmStore); err != nil {
return nil, fmt.Errorf("Error reading cluster state: %w", err)
}
// Set up tracking the initial events, now that we know the count:
totalQueued := initEventsCount.Load()
initEvents = newEventCounter(totalQueued)

// Start handling the queued events. Handling each of the initial events decrements initEvents,
// and once it reaches zero, the channel from initEvents.done() is closed to mark initial event
// handling as complete.
for i := 0; i < config.EventQueueWorkers; i += 1 {
// copy the loop variable to avoid it escaping pre Go 1.22
go func(ctx context.Context, idx int) {
@@ -216,6 +248,21 @@
return nil, fmt.Errorf("Error starting prometheus server: %w", err)
}

// Wait for all the initial events to be handled.
logger.Info("Waiting on initial events processing to be done", zap.Int32("count", totalQueued))
initEventsTimeout := time.Second * time.Duration(p.state.conf.StartupEventHandlingTimeoutSeconds)
select {
case <-initEvents.done():
// Done
case <-time.After(initEventsTimeout):
return nil, fmt.Errorf(
"Timed out waiting on initial events processing to complete after %s (%d remaining)",
initEventsTimeout,
initEvents.getRemaining(),
)
}
logger.Info("Initial events processing complete")

if err := p.startPermitHandler(ctx, logger.Named("agent-handler")); err != nil {
return nil, fmt.Errorf("permit handler: %w", err)
}
@@ -235,6 +282,41 @@
return &p, nil
}

// eventCounter is a monotonically decreasing counter that closes a channel once all events have
// been marked complete via dec().
//
// Used to make sure we've processed all the initial events before returning from
// makeAutoscaleEnforcerPlugin().
type eventCounter struct {
remaining atomic.Int32
signalDone chan struct{}
}

func newEventCounter(remaining int32) *eventCounter {
c := &eventCounter{
remaining: atomic.Int32{},
signalDone: make(chan struct{}),
}

c.remaining.Store(remaining)
return c
}

func (c *eventCounter) dec() {
r := c.remaining.Add(-1)
if r == 0 {
close(c.signalDone)
}
}

func (c *eventCounter) getRemaining() int32 {
return c.remaining.Load()
}

func (c *eventCounter) done() <-chan struct{} {
return c.signalDone
}

// Name returns the name of the AutoscaleEnforcer plugin
//
// Required for framework.Plugin
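
To see how the pieces of the plugin.go change fit together, here is a small self-contained sketch of the same counter-plus-timeout pattern (the simulated workers and the one-second timeout are invented for the example; in the plugin, the count comes from the preexisting Pod start events and the timeout from `startupEventHandlingTimeoutSeconds`):

```go
// Sketch only: a standalone demonstration of the eventCounter pattern used above.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type eventCounter struct {
	remaining  atomic.Int32
	signalDone chan struct{}
}

func newEventCounter(remaining int32) *eventCounter {
	c := &eventCounter{signalDone: make(chan struct{})} // remaining starts at zero
	c.remaining.Store(remaining)
	return c
}

func (c *eventCounter) dec() {
	if c.remaining.Add(-1) == 0 {
		close(c.signalDone) // all initial events handled; unblock the waiter
	}
}

func (c *eventCounter) done() <-chan struct{} { return c.signalDone }

func main() {
	const initialEvents = 3 // in the plugin, this is the number of preexisting Pod start events
	initEvents := newEventCounter(initialEvents)

	// Simulated event-queue workers, each handling one initial event.
	for i := 0; i < initialEvents; i++ {
		go func() {
			time.Sleep(10 * time.Millisecond) // pretend to handle the event
			initEvents.dec()
		}()
	}

	// Mirrors the startup wait in makeAutoscaleEnforcerPlugin, with a made-up timeout.
	select {
	case <-initEvents.done():
		fmt.Println("initial events handled; continuing startup")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting on initial events")
	}
}
```

Closing a channel, rather than waiting on something like a `sync.WaitGroup`, is what lets the startup path combine "all initial events handled" with a timeout in a single `select`.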