diff --git a/deploy/scheduler/config_map.yaml b/deploy/scheduler/config_map.yaml index 88ea2dc04..915fc7fab 100644 --- a/deploy/scheduler/config_map.yaml +++ b/deploy/scheduler/config_map.yaml @@ -34,6 +34,7 @@ data: }, "schedulerName": "autoscale-scheduler", "eventQueueWorkers": 64, + "startupEventHandlingTimeoutSeconds": 15, "dumpState": { "port": 10298, "timeoutSeconds": 5 diff --git a/pkg/plugin/ARCHITECTURE.md b/pkg/plugin/ARCHITECTURE.md index 174935228..9ff153bf2 100644 --- a/pkg/plugin/ARCHITECTURE.md +++ b/pkg/plugin/ARCHITECTURE.md @@ -58,8 +58,9 @@ The entrypoint for plugin initialization is through the `NewAutoscaleEnforcerPlu `plugin.go`, which in turn: 1. Fetches the scheduler config (and starts watching for changes) (see: [`config.go`]) - 2. Starts watching for pod deletion events (see: [`watch.go`]) - 3. Loads an initial state from the cluster's resources (see: `readClusterState` in [`state.go`]) + 2. Starts watching for pod events, among others (see: [`watch.go`]) + 3. Loads an initial state from the cluster's resources (by waiting for all the initial Pod start + events to be handled) 4. Spawns the HTTP server for handling `autoscaler-agent` requests (see: [`run.go`]) The plugins we implement are: diff --git a/pkg/plugin/config.go b/pkg/plugin/config.go index 0078628a8..734e8ae0c 100644 --- a/pkg/plugin/config.go +++ b/pkg/plugin/config.go @@ -30,6 +30,14 @@ type Config struct { // event queue. EventQueueWorkers int `json:"eventQueueWorkers"` + // StartupEventHandlingTimeoutSeconds gives the maximum duration, in seconds, that we are + // allowed to wait to finish handling all of the initial events generated by reading the cluster + // state on startup. + // + // If event processing takes longer than this time, then plugin creation will fail, and the + // scheduler pod will retry. 
+ StartupEventHandlingTimeoutSeconds int `json:"startupEventHandlingTimeoutSeconds"` + // RandomizeScores, if true, will cause the scheduler to score a node with a random number in // the range [minScore + 1, trueScore], instead of the trueScore RandomizeScores bool `json:"randomizeScores"` @@ -132,6 +140,10 @@ func (c *Config) validate() (string, error) { return "eventQueueWorkers", errors.New("value must be > 0") } + if c.StartupEventHandlingTimeoutSeconds <= 0 { + return "startupEventHandlingTimeoutSeconds", errors.New("value must be > 0") + } + if c.DumpState != nil { if path, err := c.DumpState.validate(); err != nil { return fmt.Sprintf("dumpState.%s", path), err diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 6ca4f30e9..8dc8eb749 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sync/atomic" "time" "go.uber.org/zap" @@ -87,10 +88,13 @@ func makeAutoscaleEnforcerPlugin( handle: h, vmClient: vmClient, - // remaining fields are set by p.readClusterState and p.makePrometheusRegistry - state: pluginState{ //nolint:exhaustruct // see above. + state: pluginState{ lock: util.NewChanMutex(), ongoingMigrationDeletions: make(map[util.NamespacedName]int), + pods: make(map[util.NamespacedName]*podState), + nodes: make(map[string]*nodeState), + maxTotalReservableCPU: 0, // set during event handling + maxTotalReservableMem: 0, // set during event handling conf: config, }, metrics: PromMetrics{}, //nolint:exhaustruct // set by makePrometheusRegistry @@ -105,8 +109,8 @@ func makeAutoscaleEnforcerPlugin( } // makePrometheusRegistry sets p.metrics, which we need to do before calling - // newEventQueueSet and readClusterState, because we set metrics eventQueueSet and for each - // node while we build the state. + // newEventQueueSet or handling events, because we set metrics in eventQueueSet and for each + // node as watch events get handled. 
promReg := p.makePrometheusRegistry() // Start watching Pod/VM events, adding them to a shared queue to process them in order @@ -118,6 +122,25 @@ func makeAutoscaleEnforcerPlugin( } } + // A note about event handling: + // + // Before returning from this function, we want to make sure that we're caught up to the watch + // events generated by initially reading the cluster state (i.e. the initial List()). + // + // Doing this is non-trivial, so we accomplish it in pieces: + // + // 1. Using watch.InitModeSync to force queueing events *before* returning from creating the + // watcher (note: and therefore, before any of them start to be handled); and + // 2. For each event created from the initial List(), increment a counter to track the number of + // these events, and decrement it as events are handled. + // + // The initial state building is complete when the counter reaches zero, at which point we close + // the channel that this function will wait on. + + var initEventsCount atomic.Int32 + var initEvents *eventCounter + incEventCount := func() { initEventsCount.Add(1) } + hlogger := logger.Named("handlers") nwc := nodeWatchCallbacks{ submitNodeDeletion: func(logger *zap.Logger, nodeName string) { @@ -125,8 +148,16 @@ func makeAutoscaleEnforcerPlugin( }, } pwc := podWatchCallbacks{ - submitStarted: func(logger *zap.Logger, pod *corev1.Pod) { - pushToQueue(logger, pod.Name, func() { p.handleStarted(hlogger, pod) }) + submitStarted: func(logger *zap.Logger, pod *corev1.Pod, preexisting bool) { + if preexisting { + incEventCount() + } + pushToQueue(logger, pod.Name, func() { + p.handleStarted(hlogger, pod) + if preexisting { + initEvents.dec() + } + }) }, submitDeletion: func(logger *zap.Logger, name util.NamespacedName) { // NOTE: It's important that the name we use here is the same as the one we use for @@ -179,7 +210,7 @@ func makeAutoscaleEnforcerPlugin( podIndex := watch.NewIndexedStore(podStore, watch.NewNameIndex[corev1.Pod]()) logger.Info("Starting VM watcher") -
vmStore, err := p.watchVMEvents(ctx, logger, watchMetrics, vwc, podIndex) + _, err = p.watchVMEvents(ctx, logger, watchMetrics, vwc, podIndex) if err != nil { return nil, fmt.Errorf("Error starting VM watcher: %w", err) } @@ -191,12 +222,13 @@ func makeAutoscaleEnforcerPlugin( watchMetrics.MustRegister(promReg) - // ... but before handling the events, read the current cluster state: - logger.Info("Reading initial cluster state") - if err = p.readClusterState(ctx, logger, vmStore); err != nil { - return nil, fmt.Errorf("Error reading cluster state: %w", err) - } + // Set up tracking the initial events, now that we know the count: + totalQueued := initEventsCount.Load() + initEvents = newEventCounter(totalQueued) + // Start handling the queued events. Each initial event that gets handled decrements initEvents, + // which closes its done() channel once none remain, marking initial event handling as + // complete. for i := 0; i < config.EventQueueWorkers; i += 1 { // copy the loop variable to avoid it escaping pre Go 1.22 go func(ctx context.Context, idx int) { @@ -216,6 +248,21 @@ func makeAutoscaleEnforcerPlugin( return nil, fmt.Errorf("Error starting prometheus server: %w", err) } + // Wait for all the initial events to be handled.
+ logger.Info("Waiting on initial events processing to be done", zap.Int32("count", totalQueued)) + initEventsTimeout := time.Second * time.Duration(p.state.conf.StartupEventHandlingTimeoutSeconds) + select { + case <-initEvents.done(): + // Done + case <-time.After(initEventsTimeout): + return nil, fmt.Errorf( + "Timed out waiting on initial events processing to complete after %s (%d remaining)", + initEventsTimeout, + initEvents.getRemaining(), + ) + } + logger.Info("Initial events processing complete") + if err := p.startPermitHandler(ctx, logger.Named("agent-handler")); err != nil { return nil, fmt.Errorf("permit handler: %w", err) } @@ -235,6 +282,50 @@ func makeAutoscaleEnforcerPlugin( return &p, nil } +// eventCounter is a monotonically decreasing event counter that closes a channel once all events +// have been completed with dec(). +// +// Used to make sure we've processed all the initial events before returning from +// makeAutoscaleEnforcerPlugin(). +type eventCounter struct { + remaining atomic.Int32 + signalDone chan struct{} +} + +// newEventCounter returns an eventCounter that completes after `remaining` calls to dec(). If +// remaining is zero, there is nothing to wait for, so the done() channel is closed immediately. +func newEventCounter(remaining int32) *eventCounter { + c := &eventCounter{ + remaining: atomic.Int32{}, + signalDone: make(chan struct{}), + } + + c.remaining.Store(remaining) + if remaining == 0 { + // With no events to wait for, dec() is never called, so nothing else would close the channel. + close(c.signalDone) + } + return c +} + +// dec marks one event as handled, closing the done() channel when the count reaches zero. +func (c *eventCounter) dec() { + r := c.remaining.Add(-1) + if r == 0 { + close(c.signalDone) + } +} + +// getRemaining returns the current value of the counter. +func (c *eventCounter) getRemaining() int32 { + return c.remaining.Load() +} + +// done returns the channel that is closed once the counter reaches zero. +func (c *eventCounter) done() <-chan struct{} { + return c.signalDone +} + // Name returns the name of the AutoscaleEnforcer plugin // // Required for framework.Plugin diff --git a/pkg/plugin/state.go b/pkg/plugin/state.go index 4ca1b69d4..9f8db29dd 100644 --- a/pkg/plugin/state.go +++ b/pkg/plugin/state.go @@ -1268,324 +1268,3 @@ func (e *AutoscaleEnforcer) startMigration(ctx context.Context, logger *zap.Logg return true, nil } - -// readClusterState sets the initial node and pod maps for the plugin's state, getting its -// information from the K8s cluster -// -// This
method expects that all pluginState fields except pluginState.conf are set to their zero -// value, and will panic otherwise. Accordingly, this is only called during plugin initialization. -func (p *AutoscaleEnforcer) readClusterState( - ctx context.Context, - logger *zap.Logger, - vmStore *watch.Store[vmapi.VirtualMachine], -) error { - logger = logger.With(zap.String("action", "read cluster state")) - - p.state.lock.Lock() - defer p.state.lock.Unlock() - - // Check that all fields are equal to their zero value, per the function documentation. - hasNonNilField := p.state.nodes != nil || p.state.pods != nil || - p.state.maxTotalReservableCPU != 0 || p.state.maxTotalReservableMem != 0 - - if hasNonNilField { - panic(errors.New("readClusterState called with non-nil pluginState field")) - } - if p.state.conf == nil { - panic(errors.New("readClusterState called with nil config")) - } - - // There's a couple challenges we have to deal with here, particularly around discrepancies in - // the list of pods vs VMs. - // - // Our solution to this problem is _mostly_ to prefer going from VMs -> pods (i.e. "get pod info - // related to VM") rather than the other way around, and going through the non-VM pods - // separately. If a VM says it belongs to a pod that doesn't exist, we ignore the error and move - // on. Because of this, it's better to fetch the VMs first and then pods, so that any VM that - // was present at the start and still running has its pod visible to us. VMs that are started - // between the VM listing and pod listing won't be handled, but that's ok because we are - // probably about to schedule them anyways. - // - // As a final note: the VM store is already present, and its startup guarantees that the listing - // has already been made. 
- - logger.Info("Fetching VMs from existing store") - vms := vmStore.Items() - - logger.Info("Listing Pods") - pods, err := p.handle.ClientSet().CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{}) - if err != nil { - return fmt.Errorf("Error listing Pods: %w", err) - } - - p.state.nodes = make(map[string]*nodeState) - p.state.pods = make(map[util.NamespacedName]*podState) - - // Store the VMs by name, so that we can access them as we're going through pods - logger.Info("Building initial vmSpecs map") - vmSpecs := make(map[util.NamespacedName]*vmapi.VirtualMachine) - for _, vm := range vms { - vmSpecs[util.GetNamespacedName(vm)] = vm - } - - skippedPods := 0 - - // Add all VM pods to the map, by filtering out from the pod list. We'll take care of the non-VM - // pods in a separate pass after this. - logger.Info("Adding VM Pods to pods map") - for i := range pods.Items { - pod := &pods.Items[i] - podName := util.GetNamespacedName(pod) - - vmName := util.TryPodOwnerVirtualMachine(pod) - if vmName == nil { - continue - } - - // new logger just for this loop iteration, with info about the Pod - logger := logger.With(util.PodNameFields(pod)) - if pod.Spec.NodeName != "" { - logger = logger.With(zap.String("node", pod.Spec.NodeName)) - } - - migrationName := util.TryPodOwnerVirtualMachineMigration(pod) - if migrationName != nil { - logger = logger.With(zap.Object("virtualmachinemigration", *migrationName)) - } - - logSkip := func(format string, args ...any) { - logger.Warn("Skipping VM", zap.Error(fmt.Errorf(format, args...))) - skippedPods += 1 - } - - if p.state.conf.ignoredNamespace(pod.Namespace) { - logSkip("VM is in ignored namespace") - continue - } else if pod.Spec.NodeName == "" { - logSkip("VM pod's Spec.NodeName = \"\" (maybe it hasn't been scheduled yet?)") - continue - } - - vmInfo, err := api.ExtractVmInfoFromPod(logger, pod) - if err != nil { - return fmt.Errorf("Error extracting VM info for %v: %w", vmName, err) - } - - ns, err := 
p.state.getOrFetchNodeState(ctx, logger, p.metrics, p.nodeStore, pod.Spec.NodeName) - if err != nil { - logSkip("Couldn't find Node that VM pod is on (maybe it has since been removed?): %w", err) - continue - } - - // Build the pod state, update the node - ps := &podState{ - name: podName, - node: ns, - cpu: podResourceState[vmapi.MilliCPU]{ - Reserved: vmInfo.Cpu.Max, - Buffer: vmInfo.Cpu.Max - vmInfo.Cpu.Use, - CapacityPressure: 0, - Min: vmInfo.Cpu.Min, - Max: vmInfo.Cpu.Max, - }, - mem: podResourceState[api.Bytes]{ - Reserved: vmInfo.Max().Mem, - Buffer: vmInfo.Max().Mem - vmInfo.Using().Mem, - CapacityPressure: 0, - Min: vmInfo.Min().Mem, - Max: vmInfo.Max().Mem, - }, - vm: &vmPodState{ - Name: *vmName, - - MqIndex: -1, - Metrics: nil, - MigrationState: nil, - - MemSlotSize: vmInfo.Mem.SlotSize, - Config: vmInfo.Config, - }, - } - - // If scaling isn't enabled *or* the pod is involved in an ongoing migration, then we can be - // more precise about usage (because scaling is forbidden while migrating). 
- if !vmInfo.Config.ScalingEnabled || migrationName != nil { - ps.cpu.Buffer = 0 - ps.cpu.Reserved = vmInfo.Cpu.Use - - ps.mem.Buffer = 0 - ps.mem.Reserved = vmInfo.Using().Mem - } - - oldNodeCPUReserved := ns.cpu.Reserved - oldNodeMemReserved := ns.mem.Reserved - oldNodeCPUBuffer := ns.cpu.Buffer - oldNodeMemBuffer := ns.mem.Buffer - - ns.cpu.Reserved += ps.cpu.Reserved - ns.cpu.Buffer += ps.cpu.Buffer - ns.mem.Reserved += ps.mem.Reserved - ns.mem.Buffer += ps.mem.Buffer - - cpuVerdict := fmt.Sprintf( - "pod = %v/%v (node %v -> %v / %v, %v -> %v buffer)", - ps.cpu.Reserved, vmInfo.Cpu.Max, oldNodeCPUReserved, ns.cpu.Reserved, ns.cpu.Total, oldNodeCPUBuffer, ns.cpu.Buffer, - ) - memVerdict := fmt.Sprintf( - "pod = %v/%v (node %v -> %v / %v, %v -> %v buffer", - ps.mem.Reserved, vmInfo.Max().Mem, oldNodeMemReserved, ns.mem.Reserved, ns.mem.Total, oldNodeMemBuffer, ns.mem.Buffer, - ) - - logger.Info( - "Adding VM pod to node", - zap.Object("verdict", verdictSet{ - cpu: cpuVerdict, - mem: memVerdict, - }), - ) - - ns.updateMetrics(p.metrics) - - ns.pods[podName] = ps - p.state.pods[podName] = ps - } - - // Add the non-VM pods to the map - logger.Info("Adding non-VM Pods to pods map") - for i := range pods.Items { - pod := &pods.Items[i] - podName := util.GetNamespacedName(pod) - - if util.TryPodOwnerVirtualMachine(pod) != nil { - continue // skip VMs - } - - // new logger just for this loop iteration, with info about the Pod - logger := logger.With(util.PodNameFields(pod)) - if pod.Spec.NodeName != "" { - logger = logger.With(zap.String("node", pod.Spec.NodeName)) - } - - logSkip := func(format string, args ...any) { - logger.Warn("Skipping non-VM pod", zap.Error(fmt.Errorf(format, args...))) - skippedPods += 1 - } - - if util.PodCompleted(pod) { - logSkip("Pod is in its final, complete state (phase = %q), so will not use any resources", pod.Status.Phase) - continue - } - - if p.state.conf.ignoredNamespace(pod.Namespace) { - logSkip("non-VM pod is in ignored 
namespace") - continue - } - - if pod.Spec.NodeName == "" { - logSkip("Spec.NodeName = \"\" (maybe it hasn't been scheduled yet?)") - continue - } - - ns, err := p.state.getOrFetchNodeState(ctx, logger, p.metrics, p.nodeStore, pod.Spec.NodeName) - if err != nil { - logSkip("Couldn't find Node that non-VM Pod is on (maybe it has since been removed?): %w", err) - continue - } - - // TODO: this is largely duplicated from Reserve, so we should deduplicate it (probably into - // trans.go or something). - podRes := extractPodResources(pod) - - oldNodeCpuReserved := ns.cpu.Reserved - oldNodeMemReserved := ns.mem.Reserved - - ns.cpu.Reserved += podRes.VCPU - ns.mem.Reserved += podRes.Mem - - ps := &podState{ - name: podName, - node: ns, - vm: nil, - cpu: podResourceState[vmapi.MilliCPU]{ - Reserved: podRes.VCPU, - Buffer: 0, - CapacityPressure: 0, - Min: podRes.VCPU, - Max: podRes.VCPU, - }, - mem: podResourceState[api.Bytes]{ - Reserved: podRes.Mem, - Buffer: 0, - CapacityPressure: 0, - Min: podRes.Mem, - Max: podRes.Mem, - }, - } - - cpuVerdict := fmt.Sprintf( - "pod %v (node %v -> %v)", - &podRes.VCPU, oldNodeCpuReserved, ns.cpu.Reserved, - ) - memVerdict := fmt.Sprintf( - "pod %v (node %v -> %v)", - &podRes.Mem, oldNodeMemReserved, ns.mem.Reserved, - ) - - logger.Info( - "Adding non-VM pod to node", - zap.Object("verdict", verdictSet{ - cpu: cpuVerdict, - mem: memVerdict, - }), - ) - - ns.updateMetrics(p.metrics) - - ns.pods[podName] = ps - p.state.pods[podName] = ps - } - - // Human-visible sanity checks on item counts: - logger.Info(fmt.Sprintf( - "Done loading state, found: %d nodes, %d Pods (%d skipped)", - len(p.state.nodes), len(p.state.pods), skippedPods, - )) - - // At this point, everything's been added to the state. 
We just need to make sure that we're not - // over-budget on anything: - logger.Info("Checking for any over-budget nodes") - overBudgetCount := 0 - for nodeName, ns := range p.state.nodes { - overBudget := []string{} - - if ns.cpu.Reserved-ns.cpu.Buffer > ns.cpu.Total { - overBudget = append(overBudget, fmt.Sprintf( - "expected CPU usage (reserved %d - buffer %d) > total %d", - ns.cpu.Reserved, ns.cpu.Buffer, ns.cpu.Total, - )) - } - if ns.mem.Reserved > ns.mem.Total { - overBudget = append(overBudget, fmt.Sprintf( - "expected memory usage (reserved %d - buffer %d) > total %d", - ns.mem.Reserved, ns.mem.Buffer, ns.mem.Total, - )) - } - - if len(overBudget) == 0 { - continue - } - - overBudgetCount += 1 - message := overBudget[0] - if len(overBudget) == 2 { - message = fmt.Sprintf("%s and %s", overBudget[0], overBudget[1]) - } - logger.Error("Node is over budget", zap.String("node", nodeName), zap.String("error", message)) - } - - if overBudgetCount != 0 { - logger.Error(fmt.Sprintf("Found %d nodes over budget", overBudgetCount)) - } - - return nil -} diff --git a/pkg/plugin/watch.go b/pkg/plugin/watch.go index a3a3c1d8f..22f8ddfbd 100644 --- a/pkg/plugin/watch.go +++ b/pkg/plugin/watch.go @@ -51,7 +51,7 @@ func (e *AutoscaleEnforcer) watchNodeEvents( watch.Accessors[*corev1.NodeList, corev1.Node]{ Items: func(list *corev1.NodeList) []corev1.Node { return list.Items }, }, - watch.InitModeSync, + watch.InitModeSync, // Doesn't matter because AddFunc is nil and node store is only used for events. 
metav1.ListOptions{}, watch.HandlerFuncs[*corev1.Node]{ DeleteFunc: func(node *corev1.Node, mayBeStale bool) { @@ -63,7 +63,7 @@ func (e *AutoscaleEnforcer) watchNodeEvents( } type podWatchCallbacks struct { - submitStarted func(*zap.Logger, *corev1.Pod) + submitStarted func(_ *zap.Logger, _ *corev1.Pod, preexisting bool) submitDeletion func(*zap.Logger, util.NamespacedName) submitStartMigration func(_ *zap.Logger, podName, migrationName util.NamespacedName, source bool) submitEndMigration func(_ *zap.Logger, podName, migrationName util.NamespacedName) @@ -103,7 +103,7 @@ func (e *AutoscaleEnforcer) watchPodEvents( watch.Accessors[*corev1.PodList, corev1.Pod]{ Items: func(list *corev1.PodList) []corev1.Pod { return list.Items }, }, - watch.InitModeSync, // note: doesn't matter, because AddFunc = nil. + watch.InitModeSync, // required so that events are queued before watchPodEvents() returns metav1.ListOptions{}, watch.HandlerFuncs[*corev1.Pod]{ AddFunc: func(pod *corev1.Pod, preexisting bool) { @@ -123,7 +123,7 @@ func (e *AutoscaleEnforcer) watchPodEvents( // definitely don't miss anything). logger.Warn("Received add event for new Pod already running", zap.Object("pod", name)) } - callbacks.submitStarted(logger, pod) + callbacks.submitStarted(logger, pod, preexisting) } }, UpdateFunc: func(oldPod *corev1.Pod, newPod *corev1.Pod) { @@ -137,7 +137,7 @@ func (e *AutoscaleEnforcer) watchPodEvents( // Check if a pod is now running. if oldPod.Status.Phase == corev1.PodPending && newPod.Status.Phase == corev1.PodRunning { logger.Info("Received update event for Pod now running", zap.Object("pod", name)) - callbacks.submitStarted(logger, newPod) + callbacks.submitStarted(logger, newPod, false) } // Check if pod is "completed" - handle that the same as deletion. 
@@ -239,7 +239,7 @@ func (e *AutoscaleEnforcer) watchVMEvents( watch.Accessors[*vmapi.VirtualMachineList, vmapi.VirtualMachine]{ Items: func(list *vmapi.VirtualMachineList) []vmapi.VirtualMachine { return list.Items }, }, - watch.InitModeSync, // Must sync here so that initial cluster state is read correctly. + watch.InitModeSync, // Doesn't matter because AddFunc is nil, and vmStore is only used for events. metav1.ListOptions{}, watch.HandlerFuncs[*vmapi.VirtualMachine]{ UpdateFunc: func(oldVM, newVM *vmapi.VirtualMachine) {