diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go
index 8dc8eb749..394a310df 100644
--- a/pkg/plugin/plugin.go
+++ b/pkg/plugin/plugin.go
@@ -654,19 +654,17 @@ func (e *AutoscaleEnforcer) Score(
 		return 0, framework.NewStatus(framework.Error, "Error fetching state for node")
 	}
 
-	var resources api.Resources
-	if vmInfo != nil {
-		resources = vmInfo.Using()
-	} else {
-		resources = extractPodResources(pod)
-	}
-
 	// Special case: return minimum score if we don't have room
-	noRoom := resources.VCPU > node.remainingReservableCPU() ||
-		resources.Mem > node.remainingReservableMem()
-	if noRoom {
+	overbudget, verdict := e.speculativeReserve(node, vmInfo, pod, false, func(_ verdictSet, overBudget bool) bool {
+		return overBudget
+	})
+	if overbudget {
 		score := framework.MinNodeScore
-		logger.Warn("No room on node, giving minimum score (typically handled by Filter method)", zap.Int64("score", score))
+		logger.Warn(
+			"No room on node, giving minimum score (typically handled by Filter method)",
+			zap.Int64("score", score),
+			zap.Object("verdict", verdict),
+		)
 		return score, nil
 	}
 
diff --git a/pkg/plugin/run.go b/pkg/plugin/run.go
index 9ac8b04fa..b49da078c 100644
--- a/pkg/plugin/run.go
+++ b/pkg/plugin/run.go
@@ -13,6 +13,7 @@ import (
 	"github.com/tychoish/fun/srv"
 	"go.uber.org/zap"
 
+	vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
 	"github.com/neondatabase/autoscaling/pkg/api"
 )
 
@@ -282,30 +283,23 @@ func (e *AutoscaleEnforcer) handleResources(
 		return api.Resources{VCPU: pod.cpu.Reserved, Mem: pod.mem.Reserved}, 200, nil
 	}
 
-	if lastPermit != nil {
-		cpuVerdict := makeResourceTransitioner(&node.cpu, &pod.cpu).
-			handleLastPermit(lastPermit.VCPU)
-		memVerdict := makeResourceTransitioner(&node.mem, &pod.mem).
-			handleLastPermit(lastPermit.Mem)
-		logger.Info(
-			"Handled last permit info from pod",
-			zap.Object("verdict", verdictSet{
-				cpu: cpuVerdict,
-				mem: memVerdict,
-			}),
-		)
-	}
-
 	cpuFactor := cu.VCPU
 	if !supportsFractionalCPU {
 		cpuFactor = 1000
 	}
 	memFactor := cu.Mem
 
+	var lastCPUPermit *vmapi.MilliCPU
+	var lastMemPermit *api.Bytes
+	if lastPermit != nil {
+		lastCPUPermit = &lastPermit.VCPU
+		lastMemPermit = &lastPermit.Mem
+	}
+
 	cpuVerdict := makeResourceTransitioner(&node.cpu, &pod.cpu).
-		handleRequested(req.VCPU, startingMigration, cpuFactor)
+		handleRequested(req.VCPU, lastCPUPermit, startingMigration, cpuFactor)
 	memVerdict := makeResourceTransitioner(&node.mem, &pod.mem).
-		handleRequested(req.Mem, startingMigration, memFactor)
+		handleRequested(req.Mem, lastMemPermit, startingMigration, memFactor)
 
 	logger.Info(
 		"Handled requested resources from pod",
diff --git a/pkg/plugin/state.go b/pkg/plugin/state.go
index 9f8db29dd..ce134e94e 100644
--- a/pkg/plugin/state.go
+++ b/pkg/plugin/state.go
@@ -21,6 +21,7 @@ import (
 	"github.com/neondatabase/autoscaling/pkg/api"
 	"github.com/neondatabase/autoscaling/pkg/util"
 	"github.com/neondatabase/autoscaling/pkg/util/watch"
+	"github.com/neondatabase/autoscaling/pkg/util/xact"
 )
 
 // pluginState stores the private state for the plugin, used both within and outside of the
@@ -644,45 +645,50 @@ func (e *AutoscaleEnforcer) reserveResources(
 		return false, nil, fmt.Errorf("%s: %w", msg, err)
 	}
 
-	var add api.Resources
-	if vmInfo != nil {
-		add = vmInfo.Using()
-	} else {
-		add = extractPodResources(pod)
-	}
-
-	shouldDeny := add.VCPU > node.remainingReservableCPU() || add.Mem > node.remainingReservableMem()
-
-	if shouldDeny {
-		e.metrics.IncReserveShouldDeny(pod, node)
-	}
-
-	if shouldDeny && opts.allowDeny {
-		cpuShortVerdict := "NOT ENOUGH"
-		if add.VCPU <= node.remainingReservableCPU() {
-			cpuShortVerdict = "OK"
-		}
-		memShortVerdict := "NOT ENOUGH"
-		if add.Mem <= node.remainingReservableMem() {
-			memShortVerdict = "OK"
+	accept := func(verdict verdictSet, overBudget bool) bool {
+		shouldDeny := overBudget
+		if shouldDeny {
+			e.metrics.IncReserveShouldDeny(pod, node)
 		}
 
-		verdict := verdictSet{
-			cpu: fmt.Sprintf(
-				"need %v, %v of %v used, so %v available (%s)",
-				add.VCPU, node.cpu.Reserved, node.cpu.Total, node.remainingReservableCPU(), cpuShortVerdict,
-			),
-			mem: fmt.Sprintf(
-				"need %v, %v of %v used, so %v available (%s)",
-				add.Mem, node.mem.Reserved, node.mem.Total, node.remainingReservableMem(), memShortVerdict,
-			),
+		if shouldDeny && opts.allowDeny {
+			logger.Error(
+				"Can't reserve resources for Pod (not enough available)",
+				zap.Object("verdict", verdict),
+			)
+			return false
 		}
 
-		logger.Error("Can't reserve resources for Pod (not enough available)", zap.Object("verdict", verdict))
-		return false, &verdict, nil
+		if opts.allowDeny {
+			logger.Info("Allowing reserve resources for Pod", zap.Object("verdict", verdict))
+		} else if shouldDeny /* want to deny, but can't */ {
+			logger.Warn("Reserved resources for Pod above totals", zap.Object("verdict", verdict))
+		} else /* don't want to deny, but also couldn't if we wanted to */ {
+			logger.Info("Reserved resources for Pod", zap.Object("verdict", verdict))
+		}
+		return true
 	}
 
-	// Construct the final state
+	ok, verdict := e.speculativeReserve(node, vmInfo, pod, opts.includeBuffer, accept)
+	return ok, &verdict, nil
+}
+
+// speculativeReserve reserves the pod, and then calls accept() to see whether the pod should
+// actually be added.
+//
+// If accept() returns false, no changes to the state will be made.
+func (e *AutoscaleEnforcer) speculativeReserve(
+	node *nodeState,
+	vmInfo *api.VmInfo,
+	pod *corev1.Pod,
+	includeBuffer bool,
+	accept func(verdict verdictSet, overBudget bool) bool,
+) (ok bool, _ verdictSet) {
+
+	// Construct the speculative state of the pod
+	//
+	// We'll pass this into (resourceTransitioner).handleReserve(), but only commit the changes if
+	// the caller allows us to.
 	var cpuState podResourceState[vmapi.MilliCPU]
 	var memState podResourceState[api.Bytes]
@@ -721,31 +727,33 @@ func (e *AutoscaleEnforcer) reserveResources(
 		// If scaling is disabled, we don't have to worry about this, and if there's an ongoing
 		// migration, scaling is forbidden.
 		migrating := util.TryPodOwnerVirtualMachineMigration(pod) != nil
-		if !vmInfo.Config.ScalingEnabled || migrating || !opts.includeBuffer {
+		if !vmInfo.Config.ScalingEnabled || migrating || !includeBuffer {
 			cpuState.Buffer = 0
 			cpuState.Reserved = vmInfo.Using().VCPU
 			memState.Buffer = 0
 			memState.Reserved = vmInfo.Using().Mem
 		}
 	} else {
+		res := extractPodResources(pod)
+
 		cpuState = podResourceState[vmapi.MilliCPU]{
-			Reserved:         add.VCPU,
+			Reserved:         res.VCPU,
 			Buffer:           0,
 			CapacityPressure: 0,
-			Min:              add.VCPU,
-			Max:              add.VCPU,
+			Min:              res.VCPU,
+			Max:              res.VCPU,
 		}
 		memState = podResourceState[api.Bytes]{
-			Reserved:         add.Mem,
+			Reserved:         res.Mem,
 			Buffer:           0,
 			CapacityPressure: 0,
-			Min:              add.Mem,
-			Max:              add.Mem,
+			Min:              res.Mem,
+			Max:              res.Mem,
 		}
 	}
 
 	podName := util.GetNamespacedName(pod)
 
 	ps := &podState{
 		name: podName,
 		node: node,
@@ -753,56 +761,50 @@ func (e *AutoscaleEnforcer) reserveResources(
 		mem:  memState,
 		vm:   vmState,
 	}
-	newNodeReservedCPU := node.cpu.Reserved + ps.cpu.Reserved
-	newNodeReservedMem := node.mem.Reserved + ps.mem.Reserved
-	newNodeBufferCPU := node.cpu.Buffer + ps.cpu.Buffer
-	newNodeBufferMem := node.mem.Buffer + ps.mem.Buffer
-
-	var verdict verdictSet
-
-	if ps.cpu.Buffer != 0 || ps.mem.Buffer != 0 {
-		verdict = verdictSet{
-			cpu: fmt.Sprintf(
-				"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
-				node.cpu.Reserved, node.cpu.Buffer, ps.cpu.Reserved, ps.cpu.Buffer, newNodeReservedCPU, newNodeBufferCPU, node.cpu.Total,
-			),
-			mem: fmt.Sprintf(
-				"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
-				node.mem.Reserved, node.mem.Buffer, ps.mem.Reserved, ps.mem.Buffer, newNodeReservedMem, newNodeBufferMem, node.mem.Total,
-			),
+
+	// Speculatively try reserving the pod.
+	nodeXactCPU := xact.New(&node.cpu)
+	nodeXactMem := xact.New(&node.mem)
+
+	cpuOverBudget, cpuVerdict := makeResourceTransitioner(nodeXactCPU.Value(), &ps.cpu).handleReserve()
+	memOverBudget, memVerdict := makeResourceTransitioner(nodeXactMem.Value(), &ps.mem).handleReserve()
+
+	overBudget := cpuOverBudget || memOverBudget
+
+	verdict := verdictSet{
+		cpu: cpuVerdict,
+		mem: memVerdict,
+	}
+
+	const verdictNotEnough = "NOT ENOUGH"
+	const verdictOk = "OK"
+
+	if overBudget {
+		cpuShortVerdict := verdictNotEnough
+		if !cpuOverBudget {
+			cpuShortVerdict = verdictOk
 		}
-	} else {
-		verdict = verdictSet{
-			cpu: fmt.Sprintf(
-				"node reserved %v + %v -> %v of total %v",
-				node.cpu.Reserved, ps.cpu.Reserved, newNodeReservedCPU, node.cpu.Total,
-			),
-			mem: fmt.Sprintf(
-				"node reserved %v + %v -> %v of total %v",
-				node.mem.Reserved, ps.mem.Reserved, newNodeReservedMem, node.mem.Total,
-			),
+		verdict.cpu = fmt.Sprintf("%s: %s", cpuShortVerdict, verdict.cpu)
+		memShortVerdict := verdictNotEnough
+		if !memOverBudget {
+			memShortVerdict = verdictOk
 		}
+		verdict.mem = fmt.Sprintf("%s: %s", memShortVerdict, verdict.mem)
 	}
 
-	if opts.allowDeny {
-		logger.Info("Allowing reserve resources for Pod", zap.Object("verdict", verdict))
-	} else if shouldDeny /* but couldn't */ {
-		logger.Warn("Reserved resources for Pod above totals", zap.Object("verdict", verdict))
-	} else {
-		logger.Info("Reserved resources for Pod", zap.Object("verdict", verdict))
+	if !accept(verdict, overBudget) {
+		return false, verdict
 	}
 
-	node.cpu.Reserved = newNodeReservedCPU
-	node.mem.Reserved = newNodeReservedMem
-	node.cpu.Buffer = newNodeBufferCPU
-	node.mem.Buffer = newNodeBufferMem
+	nodeXactCPU.Commit()
+	nodeXactMem.Commit()
 
 	node.pods[podName] = ps
 	e.state.pods[podName] = ps
 
 	node.updateMetrics(e.metrics)
 
-	return true, &verdict, nil
+	return true, verdict
 }
 
 // This method is /basically/ the same as e.Unreserve, but the API is different and it has different
diff --git a/pkg/plugin/trans.go b/pkg/plugin/trans.go
index 3dd3a488d..1fa345b81 100644
--- a/pkg/plugin/trans.go
+++ b/pkg/plugin/trans.go
@@ -64,85 +64,205 @@ func (s verdictSet) MarshalLogObject(enc zapcore.ObjectEncoder) error {
 	return nil
 }
 
-// handleLastPermit updates reserved values of r.pod and r.node with the most
-// recent info about the last permit received by the agent. This allows a new
-// scheduler to learn about previous scheduler's state.
+// handleReserve adds the resources from the pod to the node, reporting if the node was over-budget
 //
-// Why the new scheduler needs this info? It's always possible for the scheduler
-// to get killed and immediately restart, without the agent believing there was
-// any disconnect, which could lead to unintentional over-committing of resources
-// from the Buffer values if too many agents request upscaling on the first
-// request to the scheduler.
-func (r resourceTransitioner[T]) handleLastPermit(lastPermit T) (verdict string) {
-	oldState := r.snapshotState()
+// Unlike handleRequested, this method should be called to add a NEW pod to the node.
+//
+// This is used in combination with Xact to speculatively *try* reserving a pod, and then revert if
+// it would result in being over-budget.
+func (r resourceTransitioner[T]) handleReserve() (overbudget bool, verdict string) {
+	callback := func(oldState, newState resourceState[T]) string {
+		if oldState.pod.Buffer != 0 {
+			return fmt.Sprintf(
+				"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
+				// node reserved %v [buffer %v] + %v [buffer %v] ->
+				oldState.node.Reserved, oldState.node.Buffer, newState.pod.Reserved, newState.pod.Buffer,
+				// -> %v [buffer %v] of total %v
+				newState.node.Reserved, newState.node.Buffer, oldState.node.Total,
+			)
+		} else {
+			return fmt.Sprintf(
+				"node reserved %v + %v -> %v of total %v",
+				oldState.node.Reserved, newState.pod.Reserved, newState.node.Reserved, oldState.node.Total,
+			)
+		}
+	}
+
+	callbackUnexpected := func(message string) verdictCallback[T] {
+		return func(_, _ resourceState[T]) string {
+			panic(errors.New(message))
+		}
+	}
+
+	// Currently, the caller provides the requested value via the Pod's Reserved field.
+	// In order to convert this to work with handleRequestedGeneric, we need to explicitly represent
+	// the increase from zero to pod.Reserved, so we do that by setting the Pod's value to zero and
+	// passing in the requested amount separately.
+	requested := r.pod.Reserved
+	r.pod.Reserved = 0
+
+	verdict = r.handleRequestedGeneric(
+		requested,
+		requestedOptions[T]{
+			// by setting factor and forceApprovalMinimum to the requested amount, we force that
+			// handleRequestedGeneric MUST reserve exactly that amount.
+			// Then, we leave it up to the caller to accept/reject by returning whether the node was
+			// overbudget, at the very end.
+			factor:               requested,
+			forceApprovalMinimum: requested,
+			// only used for migrations
+			convertIncreaseIntoPressure: false,
+			// Yes, add buffer, because this is for reserving a pod for the first time. If the pod
+			// was already known, it's the caller's responsibility to set buffer appropriately.
+			addBuffer: true,
+
+			callbackNoChange:                  callback,
+			callbackDecreaseAutoApproved:      callbackUnexpected("got 'decrease approved' from logic to reserve new pod"),
+			callbackIncreaseTurnedToPressure:  callback,
+			callbackIncreaseRejected:          callbackUnexpected("got 'increase rejected' from logic to reserve new pod, but it is infallible"),
+			callbackIncreasePartiallyApproved: callbackUnexpected("got 'partially approved' from logic to reserve new pod, but it is infallible"),
+			callbackIncreaseFullyApproved:     callback,
+		},
+	)
+
+	overbudget = r.node.Reserved > r.node.Total
+
+	return overbudget, verdict
+}
 
-	if lastPermit <= r.pod.Reserved {
-		r.node.Reserved -= r.pod.Reserved - lastPermit
-		r.pod.Reserved = lastPermit
+// handleRequested updates r.pod and r.node with changes to match the requested resources, within
+// what's possible given the remaining resources.
+//
+// Any permitted increases are required to be a multiple of factor.
+//
+// Unlike handleReserve, this method should be called to update the resources for a preexisting pod
+// on the node.
+//
+// A pretty-formatted summary of the outcome is returned as the verdict, for logging.
+func (r resourceTransitioner[T]) handleRequested(
+	requested T,
+	lastPermit *T,
+	startingMigration bool,
+	factor T,
+) (verdict string) {
+	normalVerdictCallback := func(oldState, newState resourceState[T]) string {
+		fmtString := "Register %d%s -> %d%s (pressure %d -> %d); " +
+			"node reserved %d%s -> %d%s (of %d), " +
+			"node capacityPressure %d -> %d (%d -> %d spoken for)"
 
-		var podBuffer string
+		var oldPodBuffer string
 		var oldNodeBuffer string
 		var newNodeBuffer string
 		if r.pod.Buffer != 0 {
-			podBuffer = fmt.Sprintf(" [buffer %d]", r.pod.Buffer)
+			oldPodBuffer = fmt.Sprintf(" [buffer %d]", oldState.pod.Buffer)
 			oldNodeBuffer = fmt.Sprintf(" [buffer %d]", oldState.node.Buffer)
+			newNodeBuffer = fmt.Sprintf(" [buffer %d]", newState.node.Buffer)
+		}
 
-			r.node.Buffer -= r.pod.Buffer
-			r.pod.Buffer = 0
-
-			newNodeBuffer = fmt.Sprintf(" [buffer %d]", r.node.Buffer)
+		var wanted string
+		if newState.pod.Reserved != requested {
+			wanted = fmt.Sprintf(" (wanted %d)", requested)
 		}
 
-		totalReservable := r.node.Total
-		verdict = fmt.Sprintf(
-			"pod reserved %d%s -> %d, "+
-				"node reserved %d%s -> %d%s (of %d)",
-			oldState.pod.Reserved, podBuffer, r.pod.Reserved,
-			oldState.node.Reserved, oldNodeBuffer, r.node.Reserved, newNodeBuffer, totalReservable,
+		return fmt.Sprintf(
+			fmtString,
+			// Register %d%s -> %d%s (pressure %d -> %d)
+			oldState.pod.Reserved, oldPodBuffer, newState.pod.Reserved, wanted, oldState.pod.CapacityPressure, newState.pod.CapacityPressure,
+			// node reserved %d%s -> %d%s (of %d)
+			oldState.node.Reserved, oldNodeBuffer, newState.node.Reserved, newNodeBuffer, oldState.node.Total,
+			// node capacityPressure %d -> %d (%d -> %d spoken for)
+			oldState.node.CapacityPressure, newState.node.CapacityPressure, oldState.node.PressureAccountedFor, newState.node.PressureAccountedFor,
 		)
-	} else {
-		// This is an unexpected case that possible to happen in some unlikely scenarios such as:
-		// 1. Agent receives a permit from scheduler (let’s say it’s equal to `a` for a specific resource)
-		// 2. scheduler dies
-		// 3. vm bounds decrease (the max value is `b` and we have `b < a`).
-		// 4. new scheduler reads the cluster state and sets up the buffer values
-		// => agent’s last permit is greater (`a`) than plugin’s reserved value (`b`)
-		// This might also happen in case of processing a stale request from an agent
-
-		verdict = fmt.Sprintf(
-			"unexpected last permit, no changes: last permit (%v) is greater than pod reserved (%v)",
-			lastPermit, r.pod.Reserved,
+	}
+
+	migrationVerdictCallback := func(oldState, newState resourceState[T]) string {
+		fmtString := "Denying increase %d -> %d because the pod is starting migration; " +
+			"node capacityPressure %d -> %d (%d -> %d spoken for)"
+
+		return fmt.Sprintf(
+			fmtString,
+			// Denying increase %d -> %d because ...
+			oldState.pod.Reserved, requested,
+			// node capacityPressure %d -> %d (%d -> %d spoken for)
+			oldState.node.CapacityPressure, newState.node.CapacityPressure, oldState.node.PressureAccountedFor, newState.node.PressureAccountedFor,
 		)
 	}
 
-	return
-}
-
-// handleRequested updates r.pod and r.node with changes to match the requested resources, within
-// what's possible given the remaining resources.
-//
-// Any permitted increases are required to be a multiple of factor.
-//
-// A pretty-formatted summary of the outcome is returned as the verdict, for logging.
-func (r resourceTransitioner[T]) handleRequested(requested T, startingMigration bool, factor T) (verdict string) {
-	oldState := r.snapshotState()
+	var forceApprovalMinimum T
+	if lastPermit != nil {
+		forceApprovalMinimum = *lastPermit
+	}
 
-	totalReservable := r.node.Total
-	// note: it's possible to temporarily have reserved > totalReservable, after loading state or
-	// config change; we have to use SaturatingSub here to account for that.
-	remainingReservable := util.SaturatingSub(totalReservable, oldState.node.Reserved)
+	return r.handleRequestedGeneric(
+		requested,
+		requestedOptions[T]{
+			factor:               factor,
+			forceApprovalMinimum: forceApprovalMinimum,
+			// Can't increase during migrations.
+			//
+			// But we _will_ add the pod's request to the node's pressure, noting that its migration
+			// will resolve it.
+			convertIncreaseIntoPressure: startingMigration,
+			// don't add buffer to the node; autoscaler-agent requests should reset it.
+			addBuffer: false,
+
+			callbackNoChange:                  normalVerdictCallback,
+			callbackDecreaseAutoApproved:      normalVerdictCallback,
+			callbackIncreaseTurnedToPressure:  migrationVerdictCallback,
+			callbackIncreaseRejected:          normalVerdictCallback,
+			callbackIncreasePartiallyApproved: normalVerdictCallback,
+			callbackIncreaseFullyApproved:     normalVerdictCallback,
+		},
+	)
+}
 
-	// Note: The correctness of this function depends on the autoscaler-agents and previous
-	// scheduler being well-behaved. This function will fail to prevent overcommitting when:
+type requestedOptions[T constraints.Unsigned] struct {
+	// factor provides a multiple binding the result of any increases from handleRequestedGeneric()
 	//
-	// 1. A previous scheduler allowed more than it should have, and the autoscaler-agents are
-	//    sticking to those resource levels; or
-	// 2. autoscaler-agents are making an initial communication that requests an increase from
-	//    their current resource allocation
+	// For handling autoscaler-agent requests, this is the value of a compute unit's worth of that
+	// resource (e.g. 0.25 CPU or 1 GiB memory).
+	// For initially reserving a Pod, factor is set equal to the total additional resources, which
+	// turns handleRequestedGeneric() into a binary function that either grants the entire request,
+	// or none of it.
+	factor T
+
+	// forceApprovalMinimum sets the threshold above which handleRequestedGeneric() is allowed to
+	// reject the request - i.e. if the request is less than or equal to forceApprovalMinimum, it
+	// must be approved.
 	//
-	// So long as neither of the above happen, we should be ok, because we'll inevitably lower the
-	// "buffered" amount until all pods for this node have contacted us, and the reserved amount
-	// should then be back under r.node.total. It _may_ still be above totalReservable, but that's
-	// expected to happen sometimes!
+	// This is typically set to a non-zero value when reserving resources for a Pod that has already
+	// been scheduled (so there's nothing we can do about it), or when handling an autoscaler-agent
+	// request that provides what a previous scheduler approved (via lastPermit).
+	forceApprovalMinimum T
+
+	// convertIncreaseIntoPressure causes handleRequestedGeneric() to reject any requested increases
+	// in reserved resources, and instead add the amount of the increase to the CapacityPressure of
+	// the Pod and Node.
+	convertIncreaseIntoPressure bool
+
+	// addBuffer causes handleRequestedGeneric() to additionally add the pod's Buffer field to the
+	// node, under the assumption that the Buffer is completely new.
+	//
+	// Note that if addBuffer is true, buffer will be added *even if the reservation is rejected*.
+	addBuffer bool
+
+	callbackNoChange                  verdictCallback[T]
+	callbackDecreaseAutoApproved      verdictCallback[T]
+	callbackIncreaseTurnedToPressure  verdictCallback[T]
+	callbackIncreaseRejected          verdictCallback[T]
+	callbackIncreasePartiallyApproved verdictCallback[T]
+	callbackIncreaseFullyApproved     verdictCallback[T]
+}
+
+type verdictCallback[T constraints.Unsigned] func(oldState, newState resourceState[T]) string
+
+func (r resourceTransitioner[T]) handleRequestedGeneric(
+	requested T,
+	opts requestedOptions[T],
+) (verdict string) {
+	oldState := r.snapshotState()
+
+	var verdictGenerator verdictCallback[T]
 
 	if requested <= r.pod.Reserved {
 		// Decrease "requests" are actually just notifications it's already happened
@@ -152,34 +272,18 @@ func (r resourceTransitioner[T]) handleRequested(requested T, startingMigration
 		r.pod.CapacityPressure = 0
 		r.node.CapacityPressure -= oldState.pod.CapacityPressure
 
-		// use shared verdict below.
-
-	} else if startingMigration /* implied: && requested > r.pod.reserved */ {
-		// Can't increase during migrations.
-		//
-		// But we _will_ add the pod's request to the node's pressure, noting that its migration
-		// will resolve it.
+		if requested == r.pod.Reserved {
+			verdictGenerator = opts.callbackNoChange
+		} else /* requested < r.pod.Reserved */ {
+			verdictGenerator = opts.callbackDecreaseAutoApproved
+		}
+	} else if opts.convertIncreaseIntoPressure /* implied: requested > pod.Reserved */ {
 		r.pod.CapacityPressure = requested - r.pod.Reserved
 		r.node.CapacityPressure = r.node.CapacityPressure + r.pod.CapacityPressure - oldState.pod.CapacityPressure
 
-		// note: we don't need to handle buffer here because migration is never started as the first
-		// communication, so buffers will be zero already.
-		if r.pod.Buffer != 0 {
-			panic(errors.New("r.pod.buffer != 0"))
-		}
-
-		fmtString := "Denying increase %d -> %d because the pod is starting migration; " +
-			"node capacityPressure %d -> %d (%d -> %d spoken for)"
-		verdict = fmt.Sprintf(
-			fmtString,
-			// Denying increase %d -> %d because ...
-			oldState.pod.Reserved, requested,
-			// node capacityPressure %d -> %d (%d -> %d spoken for)
-			oldState.node.CapacityPressure, r.node.CapacityPressure, oldState.node.PressureAccountedFor, r.node.PressureAccountedFor,
-		)
-		return verdict
-	} else /* typical "request for increase" */ {
-		// The following comment was made 2022-11-28 (updated 2023-04-06):
+		verdictGenerator = opts.callbackIncreaseTurnedToPressure
+	} else /* implied: requested > pod.Reserved && !opts.convertIncreaseIntoPressure */ {
+		// The following comment was made 2022-11-28 (updated 2023-04-06, 2024-05-DD): (TODO: set date)
 		//
 		// Note: this function as currently written will actively cause the autoscaler-agent to use
 		// resources that are uneven w.r.t. the number of compute units they represent.
@@ -199,59 +303,48 @@
 		//
 		// Please think carefully before changing this.
 
+		// note: it's entirely possible to have Reserved > Total, under a variety of
+		// undesirable-but-impossible-to-prevent circumstances.
+		remainingReservable := util.SaturatingSub(r.node.Total, r.node.Reserved)
+
 		increase := requested - r.pod.Reserved
+
 		// Increases are bounded by what's left in the node, rounded down to the nearest multiple of
 		// the factor.
-		maxIncrease := (remainingReservable / factor) * factor
+		maxIncrease := (remainingReservable / opts.factor) * opts.factor
+		// ... but we must allow at least opts.forceApprovalMinimum
+		maxIncrease = util.Max(maxIncrease, opts.forceApprovalMinimum)
+
 		if increase > maxIncrease /* increases are bound by what's left in the node */ {
 			r.pod.CapacityPressure = increase - maxIncrease
 			// adjust node pressure accordingly. We can have old < new or new > old, so we shouldn't
 			// directly += or -= (implicitly relying on overflow).
 			r.node.CapacityPressure = r.node.CapacityPressure - oldState.pod.CapacityPressure + r.pod.CapacityPressure
 			increase = maxIncrease // cap at maxIncrease.
+
+			verdictGenerator = opts.callbackIncreasePartiallyApproved
 		} else {
 			// If we're not capped by maxIncrease, relieve pressure coming from this pod
 			r.node.CapacityPressure -= r.pod.CapacityPressure
 			r.pod.CapacityPressure = 0
+
+			verdictGenerator = opts.callbackIncreaseFullyApproved
 		}
 		r.pod.Reserved += increase
 		r.node.Reserved += increase
-
-		// use shared verdict below.
 	}
 
-	fmtString := "Register %d%s -> %d%s (pressure %d -> %d); " +
-		"node reserved %d%s -> %d%s (of %d), " +
-		"node capacityPressure %d -> %d (%d -> %d spoken for)"
-
-	var podBuffer string
-	var oldNodeBuffer string
-	var newNodeBuffer string
 	if r.pod.Buffer != 0 {
-		podBuffer = fmt.Sprintf(" [buffer %d]", r.pod.Buffer)
-		oldNodeBuffer = fmt.Sprintf(" [buffer %d]", oldState.node.Buffer)
-
-		r.node.Buffer -= r.pod.Buffer
-		r.pod.Buffer = 0
-
-		newNodeBuffer = fmt.Sprintf(" [buffer %d]", r.node.Buffer)
-	}
-
-	var wanted string
-	if r.pod.Reserved != requested {
-		wanted = fmt.Sprintf(" (wanted %d)", requested)
+		if opts.addBuffer {
+			r.node.Buffer += r.pod.Buffer
+		} else /* !opts.addBuffer - buffer is only needed until the first request, so we can reset it */ {
+			r.node.Buffer -= r.pod.Buffer
+			r.pod.Buffer = 0
+		}
 	}
 
-	verdict = fmt.Sprintf(
-		fmtString,
-		// Register %d%s -> %d%s (pressure %d -> %d)
-		oldState.pod.Reserved, podBuffer, r.pod.Reserved, wanted, oldState.pod.CapacityPressure, r.pod.CapacityPressure,
-		// node reserved %d%s -> %d%s (of %d)
-		oldState.node.Reserved, oldNodeBuffer, r.node.Reserved, newNodeBuffer, totalReservable,
-		// node capacityPressure %d -> %d (%d -> %d spoken for)
-		oldState.node.CapacityPressure, r.node.CapacityPressure, oldState.node.PressureAccountedFor, r.node.PressureAccountedFor,
-	)
-	return verdict
+	newState := r.snapshotState()
+	return verdictGenerator(oldState, newState)
 }
 
 // handleDeleted updates r.node with changes to match the removal of r.pod
diff --git a/pkg/util/xact/xact.go b/pkg/util/xact/xact.go
new file mode 100644
index 000000000..ea64bffb0
--- /dev/null
+++ b/pkg/util/xact/xact.go
@@ -0,0 +1,34 @@
+package xact
+
+// Xact represents a single in-memory transaction, to aid with separating calculations from their
+// application.
+type Xact[T any] struct {
+	tmp  T
+	base *T
+}
+
+// New returns a new transaction object (called Xact) operating on the given pointer
+//
+// NOTE: Any copying is shallow -- if T contains pointers, any changes to the values behind those
+// will NOT be delayed until (*Xact[T]).Commit().
+func New[T any](ptr *T) *Xact[T] {
+	return &Xact[T]{
+		tmp:  *ptr,
+		base: ptr,
+	}
+}
+
+// Value returns a pointer to the temporary value stored in the Xact
+//
+// The returned value can be freely modified; it will have no effect until the transaction is
+// committed with Commit().
+func (x *Xact[T]) Value() *T {
+	return &x.tmp
+}
+
+// Commit assigns the temporary value back to the original pointer that the Xact was created with
+//
+// A transaction can be committed multiple times, if it's useful to reuse it.
+func (x *Xact[T]) Commit() {
+	*x.base = x.tmp
+}
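
Reviewer note (not part of the diff): since speculativeReserve leans entirely on the new xact package, here is a minimal usage sketch of the stage-check-commit flow it enables. Only New, Value, and Commit from the file above are used; the counts struct and the numbers are invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/neondatabase/autoscaling/pkg/util/xact"
)

type counts struct {
	Reserved int
	Total    int
}

func main() {
	node := counts{Reserved: 3, Total: 4}

	// Stage a change against a copy; 'node' itself is untouched until Commit().
	tx := xact.New(&node)
	tx.Value().Reserved += 2

	// Decide after the fact whether to keep the change, mirroring how
	// speculativeReserve only commits when accept() returns true.
	overBudget := tx.Value().Reserved > tx.Value().Total
	if !overBudget {
		tx.Commit()
	}

	fmt.Println(node.Reserved) // prints 3: the increase was over budget, so it was never committed
}
```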
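Also for reviewers: the interaction between factor, forceApprovalMinimum, and the capacity-pressure fallback in handleRequestedGeneric() can be non-obvious. The sketch below simply replays that arithmetic with made-up numbers (900m CPU remaining, a 250m compute unit, a 750m lastPermit carried into forceApprovalMinimum, and a 1000m requested increase); it is standalone illustration, not code from the plugin:

```go
package main

import "fmt"

func main() {
	var (
		remaining            uint = 900  // remaining reservable on the node
		factor               uint = 250  // one compute unit's worth of the resource
		forceApprovalMinimum uint = 750  // e.g. what a previous scheduler already approved
		increase             uint = 1000 // what the autoscaler-agent asked for
	)

	// Round the grantable amount down to a whole number of compute units...
	maxIncrease := (remaining / factor) * factor // 750
	// ...but never below what must be force-approved.
	if forceApprovalMinimum > maxIncrease {
		maxIncrease = forceApprovalMinimum // still 750 here
	}

	granted := increase
	pressure := uint(0)
	if increase > maxIncrease {
		granted = maxIncrease             // 750 granted
		pressure = increase - maxIncrease // 250 recorded as capacity pressure instead
	}

	fmt.Println(granted, pressure) // 750 250
}
```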