plugin: Add transactions for speculative operations (#936)
This commit adds 'pkg/util/xact' and uses it in the scheduler plugin, so
that every place that previously implemented its own "can this pod fit
on that node?" check now works by speculatively reserving the pod,
checking whether that left the node over-full, and then deciding whether
to accept it.

This is unified through a new method, speculativeReserve(). Internally,
speculativeReserve() calls a new method on resourceTransitioner, which
makes follow-up work easier.
sharnoff committed Jun 16, 2024
1 parent c830bce commit e045b75
Showing 4 changed files with 155 additions and 91 deletions.
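
(Illustrative note, not part of the commit.) A minimal sketch of the pattern the commit message describes, using a simplified stand-in for the plugin's per-node resource state; it assumes only the New, Value, and Commit functions of the new xact package shown at the end of the diff:

package example

import "github.com/neondatabase/autoscaling/pkg/util/xact"

// nodeBudget is an illustrative stand-in for the plugin's per-node resource state.
type nodeBudget struct {
	Reserved int
	Total    int
}

// tryReserve speculatively adds amount to the node's reservation, and keeps the
// change only if it does not leave the node over-full.
func tryReserve(node *nodeBudget, amount int) bool {
	tx := xact.New(node) // work on a shallow copy of *node
	tx.Value().Reserved += amount

	if tx.Value().Reserved > tx.Value().Total {
		return false // over-full: drop the speculative change; *node is untouched
	}
	tx.Commit() // it fits: write the change back to *node
	return true
}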
20 changes: 9 additions & 11 deletions pkg/plugin/plugin.go
@@ -654,19 +654,17 @@ func (e *AutoscaleEnforcer) Score(
return 0, framework.NewStatus(framework.Error, "Error fetching state for node")
}

var resources api.Resources
if vmInfo != nil {
resources = vmInfo.Using()
} else {
resources = extractPodResources(pod)
}

// Special case: return minimum score if we don't have room
noRoom := resources.VCPU > node.remainingReservableCPU() ||
resources.Mem > node.remainingReservableMem()
if noRoom {
overbudget, verdict := e.speculativeReserve(node, vmInfo, pod, false, func(_ verdictSet, overBudget bool) bool {
return overBudget
})
if overbudget {
score := framework.MinNodeScore
logger.Warn("No room on node, giving minimum score (typically handled by Filter method)", zap.Int64("score", score))
logger.Warn(
"No room on node, giving minimum score (typically handled by Filter method)",
zap.Int64("score", score),
zap.Object("verdict", verdict),
)
return score, nil
}

160 changes: 80 additions & 80 deletions pkg/plugin/state.go
@@ -21,6 +21,7 @@ import (
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/watch"
"github.com/neondatabase/autoscaling/pkg/util/xact"
)

// pluginState stores the private state for the plugin, used both within and outside of the
@@ -644,45 +645,50 @@ func (e *AutoscaleEnforcer) reserveResources(
return false, nil, fmt.Errorf("%s: %w", msg, err)
}

var add api.Resources
if vmInfo != nil {
add = vmInfo.Using()
} else {
add = extractPodResources(pod)
}

shouldDeny := add.VCPU > node.remainingReservableCPU() || add.Mem > node.remainingReservableMem()

if shouldDeny {
e.metrics.IncReserveShouldDeny(pod, node)
}

if shouldDeny && opts.allowDeny {
cpuShortVerdict := "NOT ENOUGH"
if add.VCPU <= node.remainingReservableCPU() {
cpuShortVerdict = "OK"
}
memShortVerdict := "NOT ENOUGH"
if add.Mem <= node.remainingReservableMem() {
memShortVerdict = "OK"
accept := func(verdict verdictSet, overBudget bool) bool {
shouldDeny := overBudget
if shouldDeny {
e.metrics.IncReserveShouldDeny(pod, node)
}

verdict := verdictSet{
cpu: fmt.Sprintf(
"need %v, %v of %v used, so %v available (%s)",
add.VCPU, node.cpu.Reserved, node.cpu.Total, node.remainingReservableCPU(), cpuShortVerdict,
),
mem: fmt.Sprintf(
"need %v, %v of %v used, so %v available (%s)",
add.Mem, node.mem.Reserved, node.mem.Total, node.remainingReservableMem(), memShortVerdict,
),
if shouldDeny && opts.allowDeny {
logger.Error(
"Can't reserve resources for Pod (not enough available)",
zap.Object("verdict", verdict),
)
return false
}

logger.Error("Can't reserve resources for Pod (not enough available)", zap.Object("verdict", verdict))
return false, &verdict, nil
if opts.allowDeny {
logger.Info("Allowing reserve resources for Pod", zap.Object("verdict", verdict))
} else if shouldDeny /* want to deny, but can't */ {
logger.Warn("Reserved resources for Pod above totals", zap.Object("verdict", verdict))
} else /* don't want to deny, but also couldn't if we wanted to */ {
logger.Info("Reserved resources for Pod", zap.Object("verdict", verdict))
}
return true
}

// Construct the final state
ok, verdict := e.speculativeReserve(node, vmInfo, pod, opts.includeBuffer, accept)
return ok, &verdict, nil
}

// speculativeReserve reserves the pod, and then calls accept() to see whether the pod should
// actually be added.
//
// If accept() returns false, no changes to the state will be made.
func (e *AutoscaleEnforcer) speculativeReserve(
node *nodeState,
vmInfo *api.VmInfo,
pod *corev1.Pod,
includeBuffer bool,
accept func(verdict verdictSet, overBudget bool) bool,
) (ok bool, _ verdictSet) {

// Construct the speculative state of the pod
//
// We'll pass this into (resourceTransitioner).handleReserve(), but only commit the changes if
// the caller allows us to.

var cpuState podResourceState[vmapi.MilliCPU]
var memState podResourceState[api.Bytes]
@@ -721,88 +727,82 @@
// If scaling is disabled, we don't have to worry about this, and if there's an ongoing
// migration, scaling is forbidden.
migrating := util.TryPodOwnerVirtualMachineMigration(pod) != nil
if !vmInfo.Config.ScalingEnabled || migrating || !opts.includeBuffer {
if !vmInfo.Config.ScalingEnabled || migrating || !includeBuffer {
cpuState.Buffer = 0
cpuState.Reserved = vmInfo.Using().VCPU
memState.Buffer = 0
memState.Reserved = vmInfo.Using().Mem
}
} else {
res := extractPodResources(pod)

cpuState = podResourceState[vmapi.MilliCPU]{
Reserved: add.VCPU,
Reserved: res.VCPU,
Buffer: 0,
CapacityPressure: 0,
Min: add.VCPU,
Max: add.VCPU,
Min: res.VCPU,
Max: res.VCPU,
}
memState = podResourceState[api.Bytes]{
Reserved: add.Mem,
Reserved: res.Mem,
Buffer: 0,
CapacityPressure: 0,
Min: add.Mem,
Max: add.Mem,
Min: res.Mem,
Max: res.Mem,
}
}

podName := util.GetNamespacedName(pod)

ps := &podState{
name: podName,
node: node,
cpu: cpuState,
mem: memState,
vm: vmState,
}
newNodeReservedCPU := node.cpu.Reserved + ps.cpu.Reserved
newNodeReservedMem := node.mem.Reserved + ps.mem.Reserved
newNodeBufferCPU := node.cpu.Buffer + ps.cpu.Buffer
newNodeBufferMem := node.mem.Buffer + ps.mem.Buffer

var verdict verdictSet

if ps.cpu.Buffer != 0 || ps.mem.Buffer != 0 {
verdict = verdictSet{
cpu: fmt.Sprintf(
"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
node.cpu.Reserved, node.cpu.Buffer, ps.cpu.Reserved, ps.cpu.Buffer, newNodeReservedCPU, newNodeBufferCPU, node.cpu.Total,
),
mem: fmt.Sprintf(
"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
node.mem.Reserved, node.mem.Buffer, ps.mem.Reserved, ps.mem.Buffer, newNodeReservedMem, newNodeBufferMem, node.mem.Total,
),

// Speculatively try reserving the pod.
nodeXactCPU := xact.New(&node.cpu)
nodeXactMem := xact.New(&node.mem)

cpuOverBudget, cpuVerdict := makeResourceTransitioner(nodeXactCPU.Value(), &ps.cpu).handleReserve()
memOverBudget, memVerdict := makeResourceTransitioner(nodeXactMem.Value(), &ps.mem).handleReserve()

overBudget := cpuOverBudget || memOverBudget

verdict := verdictSet{
cpu: cpuVerdict,
mem: memVerdict,
}

const verdictNotEnough = "NOT ENOUGH"
const verdictOk = "OK"

if overBudget {
cpuShortVerdict := verdictNotEnough
if !cpuOverBudget {
cpuShortVerdict = verdictOk
}
} else {
verdict = verdictSet{
cpu: fmt.Sprintf(
"node reserved %v + %v -> %v of total %v",
node.cpu.Reserved, ps.cpu.Reserved, newNodeReservedCPU, node.cpu.Total,
),
mem: fmt.Sprintf(
"node reserved %v + %v -> %v of total %v",
node.mem.Reserved, ps.mem.Reserved, newNodeReservedMem, node.mem.Total,
),
verdict.cpu = fmt.Sprintf("%s: %s", cpuShortVerdict, verdict.cpu)
memShortVerdict := verdictNotEnough
if !memOverBudget {
memShortVerdict = verdictOk
}
verdict.mem = fmt.Sprintf("%s: %s", memShortVerdict, verdict.mem)
}

if opts.allowDeny {
logger.Info("Allowing reserve resources for Pod", zap.Object("verdict", verdict))
} else if shouldDeny /* but couldn't */ {
logger.Warn("Reserved resources for Pod above totals", zap.Object("verdict", verdict))
} else {
logger.Info("Reserved resources for Pod", zap.Object("verdict", verdict))
if !accept(verdict, overBudget) {
return false, verdict
}

node.cpu.Reserved = newNodeReservedCPU
node.mem.Reserved = newNodeReservedMem
node.cpu.Buffer = newNodeBufferCPU
node.mem.Buffer = newNodeBufferMem
nodeXactCPU.Commit()
nodeXactMem.Commit()

node.pods[podName] = ps
e.state.pods[podName] = ps

node.updateMetrics(e.metrics)

return true, &verdict, nil
return true, verdict
}

// This method is /basically/ the same as e.Unreserve, but the API is different and it has different
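(Illustrative note, not part of the diff.) The accept callback decides whether speculativeReserve commits its speculative changes: returning true commits them, returning false discards them. A hypothetical fragment modeled on the real callers above, assuming node, vmInfo, pod, and logger are in scope:

ok, verdict := e.speculativeReserve(node, vmInfo, pod, false /* includeBuffer */, func(_ verdictSet, overBudget bool) bool {
	return !overBudget // commit the reservation only if the pod fits
})
if !ok {
	logger.Warn("Pod does not fit on node", zap.Object("verdict", verdict))
}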
32 changes: 32 additions & 0 deletions pkg/plugin/trans.go
@@ -117,11 +117,43 @@ func (r resourceTransitioner[T]) handleLastPermit(lastPermit T) (verdict string)
return
}

// handleReserve adds the resources from the pod to the node, reporting if the node was over-budget
//
// Unlike handleRequested, this method should be called to add a NEW pod to the node.
//
// This is used in combination with Xact to speculatively *try* reserving a pod, and then revert if
// it would result in being over-budget.
func (r resourceTransitioner[T]) handleReserve() (overbudget bool, verdict string) {
oldState := r.snapshotState()

r.node.Reserved += r.pod.Reserved
r.node.Buffer += r.pod.Buffer

if r.pod.Buffer != 0 {
verdict = fmt.Sprintf(
"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
oldState.node.Reserved, oldState.node.Buffer, r.pod.Reserved, r.pod.Buffer, r.node.Reserved, r.node.Buffer, oldState.node.Total,
)
} else {
verdict = fmt.Sprintf(
"node reserved %v + %v -> %v of total %v",
oldState.node.Reserved, r.pod.Reserved, r.node.Reserved, oldState.node.Total,
)
}

overbudget = r.node.Reserved > r.node.Total

return overbudget, verdict
}

// handleRequested updates r.pod and r.node with changes to match the requested resources, within
// what's possible given the remaining resources.
//
// Any permitted increases are required to be a multiple of factor.
//
// Unlike handleReserve, this method should be called to update the resources for a preexisting pod
// on the node.
//
// A pretty-formatted summary of the outcome is returned as the verdict, for logging.
func (r resourceTransitioner[T]) handleRequested(requested T, startingMigration bool, factor T) (verdict string) {
oldState := r.snapshotState()
34 changes: 34 additions & 0 deletions pkg/util/xact/xact.go
@@ -0,0 +1,34 @@
package xact

// Xact represents a single in-memory transaction, to aid with separating calculations from their
// application.
type Xact[T any] struct {
tmp T
base *T
}

// New returns a new transaction object (called Xact) operating on the given pointer
//
// NOTE: Any copying is shallow -- if T contains pointers, any changes to the values behind those
// will NOT be delayed until (*Xact[T]).Commit().
func New[T any](ptr *T) *Xact[T] {
return &Xact[T]{
tmp: *ptr,
base: ptr,
}
}

// Value returns a pointer to the temporary value stored in the Xact
//
// The returned value can be freely modified; it will have no effect until the transaction is
// committed with Commit().
func (x *Xact[T]) Value() *T {
return &x.tmp
}

// Commit assigns the temporary value back to the original pointer that the Xact was created with
//
// A transaction can be committed multiple times, if it's useful to reuse it.
func (x *Xact[T]) Commit() {
*x.base = x.tmp
}
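
(Illustrative note, not part of the commit.) A small, self-contained usage sketch of this API, including the reuse behavior noted on Commit:

package main

import (
	"fmt"

	"github.com/neondatabase/autoscaling/pkg/util/xact"
)

type counter struct{ n int }

func main() {
	c := counter{n: 1}
	tx := xact.New(&c)

	tx.Value().n = 5
	fmt.Println(c.n) // still 1: nothing is applied until Commit()

	tx.Commit()
	fmt.Println(c.n) // now 5

	// The same Xact can be modified and committed again.
	tx.Value().n++
	tx.Commit()
	fmt.Println(c.n) // now 6
}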
