plugin: Transaction-based speculative reserve and logic unification #936

Merged (2 commits) on Jun 16, 2024
20 changes: 9 additions & 11 deletions pkg/plugin/plugin.go
@@ -654,19 +654,17 @@ func (e *AutoscaleEnforcer) Score(
return 0, framework.NewStatus(framework.Error, "Error fetching state for node")
}

var resources api.Resources
if vmInfo != nil {
resources = vmInfo.Using()
} else {
resources = extractPodResources(pod)
}

// Special case: return minimum score if we don't have room
noRoom := resources.VCPU > node.remainingReservableCPU() ||
resources.Mem > node.remainingReservableMem()
if noRoom {
overbudget, verdict := e.speculativeReserve(node, vmInfo, pod, false, func(_ verdictSet, overBudget bool) bool {
return overBudget
})
if overbudget {
score := framework.MinNodeScore
logger.Warn("No room on node, giving minimum score (typically handled by Filter method)", zap.Int64("score", score))
logger.Warn(
"No room on node, giving minimum score (typically handled by Filter method)",
zap.Int64("score", score),
zap.Object("verdict", verdict),
)
return score, nil
}

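For context on the plugin.go change above: instead of comparing the pod's resources against the node's remaining capacity by hand, Score now runs the same speculativeReserve logic as the reserve path, passing an accept callback that only inspects the over-budget flag. The following is a minimal, self-contained sketch of that "speculate on a copy, consult a callback, commit only on acceptance" pattern; the names and types (budget, request, tryReserve) are invented for illustration and are not the plugin's real types, and the callback shown rejects over-budget reservations rather than reproducing Score's exact callback.

package main

import "fmt"

// budget and request are hypothetical stand-ins for the plugin's node and pod state.
type budget struct{ reservedCPU, totalCPU int64 }
type request struct{ cpu int64 }

// tryReserve mutates a copy of the budget and only keeps the result if accept()
// approves -- the same shape as speculativeReserve's accept callback.
func tryReserve(b *budget, r request, accept func(overBudget bool) bool) bool {
	speculative := *b // work on a copy; commit only if accepted
	speculative.reservedCPU += r.cpu
	overBudget := speculative.reservedCPU > speculative.totalCPU
	if !accept(overBudget) {
		return false // *b is left untouched
	}
	*b = speculative
	return true
}

func main() {
	node := budget{reservedCPU: 7, totalCPU: 8}

	// Example usage: reject the reservation whenever it would go over budget.
	fits := tryReserve(&node, request{cpu: 2}, func(overBudget bool) bool { return !overBudget })
	fmt.Println("fits:", fits, "reserved:", node.reservedCPU) // fits: false reserved: 7
}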
26 changes: 10 additions & 16 deletions pkg/plugin/run.go
@@ -13,6 +13,7 @@ import (
"github.com/tychoish/fun/srv"
"go.uber.org/zap"

vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
)

@@ -282,30 +283,23 @@ func (e *AutoscaleEnforcer) handleResources(
return api.Resources{VCPU: pod.cpu.Reserved, Mem: pod.mem.Reserved}, 200, nil
}

if lastPermit != nil {
cpuVerdict := makeResourceTransitioner(&node.cpu, &pod.cpu).
handleLastPermit(lastPermit.VCPU)
memVerdict := makeResourceTransitioner(&node.mem, &pod.mem).
handleLastPermit(lastPermit.Mem)
logger.Info(
"Handled last permit info from pod",
zap.Object("verdict", verdictSet{
cpu: cpuVerdict,
mem: memVerdict,
}),
)
}

cpuFactor := cu.VCPU
if !supportsFractionalCPU {
cpuFactor = 1000
}
memFactor := cu.Mem

var lastCPUPermit *vmapi.MilliCPU
var lastMemPermit *api.Bytes
if lastPermit != nil {
lastCPUPermit = &lastPermit.VCPU
lastMemPermit = &lastPermit.Mem
}

cpuVerdict := makeResourceTransitioner(&node.cpu, &pod.cpu).
handleRequested(req.VCPU, startingMigration, cpuFactor)
handleRequested(req.VCPU, lastCPUPermit, startingMigration, cpuFactor)
memVerdict := makeResourceTransitioner(&node.mem, &pod.mem).
handleRequested(req.Mem, startingMigration, memFactor)
handleRequested(req.Mem, lastMemPermit, startingMigration, memFactor)

logger.Info(
"Handled requested resources from pod",
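The run.go change above removes the separate handleLastPermit pass and instead threads the last permit into handleRequested as optional pointers, with nil meaning the pod has no prior permit. As a rough illustration of that nil-pointer-as-optional-argument idiom — not the repository's actual transitioner logic, and with a clamping rule invented purely for this sketch — consider:

package main

import "fmt"

type milliCPU int64 // hypothetical stand-in for vmapi.MilliCPU

// handleRequestedSketch folds an optional "last permit" into the same call that
// handles the new request: a nil pointer simply means "no previous permit".
// The rule applied here (never grant less than the previous permit) is invented
// for illustration only.
func handleRequestedSketch(requested milliCPU, lastPermit *milliCPU) milliCPU {
	granted := requested
	if lastPermit != nil && granted < *lastPermit {
		granted = *lastPermit
	}
	return granted
}

func main() {
	fmt.Println(handleRequestedSketch(500, nil)) // 500: no previous permit to honor

	prev := milliCPU(750)
	fmt.Println(handleRequestedSketch(500, &prev)) // 750: previous permit is honored
}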
160 changes: 80 additions & 80 deletions pkg/plugin/state.go
@@ -21,6 +21,7 @@ import (
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/watch"
"github.com/neondatabase/autoscaling/pkg/util/xact"
)

// pluginState stores the private state for the plugin, used both within and outside of the
@@ -644,45 +645,50 @@ func (e *AutoscaleEnforcer) reserveResources(
return false, nil, fmt.Errorf("%s: %w", msg, err)
}

var add api.Resources
if vmInfo != nil {
add = vmInfo.Using()
} else {
add = extractPodResources(pod)
}

shouldDeny := add.VCPU > node.remainingReservableCPU() || add.Mem > node.remainingReservableMem()

if shouldDeny {
e.metrics.IncReserveShouldDeny(pod, node)
}

if shouldDeny && opts.allowDeny {
cpuShortVerdict := "NOT ENOUGH"
if add.VCPU <= node.remainingReservableCPU() {
cpuShortVerdict = "OK"
}
memShortVerdict := "NOT ENOUGH"
if add.Mem <= node.remainingReservableMem() {
memShortVerdict = "OK"
accept := func(verdict verdictSet, overBudget bool) bool {
shouldDeny := overBudget
if shouldDeny {
e.metrics.IncReserveShouldDeny(pod, node)
}

verdict := verdictSet{
cpu: fmt.Sprintf(
"need %v, %v of %v used, so %v available (%s)",
add.VCPU, node.cpu.Reserved, node.cpu.Total, node.remainingReservableCPU(), cpuShortVerdict,
),
mem: fmt.Sprintf(
"need %v, %v of %v used, so %v available (%s)",
add.Mem, node.mem.Reserved, node.mem.Total, node.remainingReservableMem(), memShortVerdict,
),
if shouldDeny && opts.allowDeny {
logger.Error(
"Can't reserve resources for Pod (not enough available)",
zap.Object("verdict", verdict),
)
return false
}

logger.Error("Can't reserve resources for Pod (not enough available)", zap.Object("verdict", verdict))
return false, &verdict, nil
if opts.allowDeny {
logger.Info("Allowing reserve resources for Pod", zap.Object("verdict", verdict))
} else if shouldDeny /* want to deny, but can't */ {
logger.Warn("Reserved resources for Pod above totals", zap.Object("verdict", verdict))
} else /* don't want to deny, but also couldn't if we wanted to */ {
logger.Info("Reserved resources for Pod", zap.Object("verdict", verdict))
}
return true
}

// Construct the final state
ok, verdict := e.speculativeReserve(node, vmInfo, pod, opts.includeBuffer, accept)
return ok, &verdict, nil
}

// speculativeReserve reserves the pod, and then calls accept() to see whether the pod should
// actually be added.
//
// If accept() returns false, no changes to the state will be made.
func (e *AutoscaleEnforcer) speculativeReserve(
node *nodeState,
vmInfo *api.VmInfo,
pod *corev1.Pod,
includeBuffer bool,
accept func(verdict verdictSet, overBudget bool) bool,
) (ok bool, _ verdictSet) {

// Construct the speculative state of the pod
//
// We'll pass this into (resourceTransitioner).handleReserve(), but only commit the changes if
// the caller allows us to.

var cpuState podResourceState[vmapi.MilliCPU]
var memState podResourceState[api.Bytes]
@@ -721,88 +727,82 @@ func (e *AutoscaleEnforcer) reserveResources(
// If scaling is disabled, we don't have to worry about this, and if there's an ongoing
// migration, scaling is forbidden.
migrating := util.TryPodOwnerVirtualMachineMigration(pod) != nil
if !vmInfo.Config.ScalingEnabled || migrating || !opts.includeBuffer {
if !vmInfo.Config.ScalingEnabled || migrating || !includeBuffer {
cpuState.Buffer = 0
cpuState.Reserved = vmInfo.Using().VCPU
memState.Buffer = 0
memState.Reserved = vmInfo.Using().Mem
}
} else {
res := extractPodResources(pod)

cpuState = podResourceState[vmapi.MilliCPU]{
Reserved: add.VCPU,
Reserved: res.VCPU,
Buffer: 0,
CapacityPressure: 0,
Min: add.VCPU,
Max: add.VCPU,
Min: res.VCPU,
Max: res.VCPU,
}
memState = podResourceState[api.Bytes]{
Reserved: add.Mem,
Reserved: res.Mem,
Buffer: 0,
CapacityPressure: 0,
Min: add.Mem,
Max: add.Mem,
Min: res.Mem,
Max: res.Mem,
}
}

podName := util.GetNamespacedName(pod)

ps := &podState{
name: podName,
node: node,
cpu: cpuState,
mem: memState,
vm: vmState,
}
newNodeReservedCPU := node.cpu.Reserved + ps.cpu.Reserved
newNodeReservedMem := node.mem.Reserved + ps.mem.Reserved
newNodeBufferCPU := node.cpu.Buffer + ps.cpu.Buffer
newNodeBufferMem := node.mem.Buffer + ps.mem.Buffer

var verdict verdictSet

if ps.cpu.Buffer != 0 || ps.mem.Buffer != 0 {
verdict = verdictSet{
cpu: fmt.Sprintf(
"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
node.cpu.Reserved, node.cpu.Buffer, ps.cpu.Reserved, ps.cpu.Buffer, newNodeReservedCPU, newNodeBufferCPU, node.cpu.Total,
),
mem: fmt.Sprintf(
"node reserved %v [buffer %v] + %v [buffer %v] -> %v [buffer %v] of total %v",
node.mem.Reserved, node.mem.Buffer, ps.mem.Reserved, ps.mem.Buffer, newNodeReservedMem, newNodeBufferMem, node.mem.Total,
),

// Speculatively try reserving the pod.
nodeXactCPU := xact.New(&node.cpu)
nodeXactMem := xact.New(&node.mem)

cpuOverBudget, cpuVerdict := makeResourceTransitioner(nodeXactCPU.Value(), &ps.cpu).handleReserve()
memOverBudget, memVerdict := makeResourceTransitioner(nodeXactMem.Value(), &ps.mem).handleReserve()

overBudget := cpuOverBudget || memOverBudget

verdict := verdictSet{
cpu: cpuVerdict,
mem: memVerdict,
}

const verdictNotEnough = "NOT ENOUGH"
const verdictOk = "OK"

if overBudget {
cpuShortVerdict := verdictNotEnough
if !cpuOverBudget {
cpuShortVerdict = verdictOk
}
} else {
verdict = verdictSet{
cpu: fmt.Sprintf(
"node reserved %v + %v -> %v of total %v",
node.cpu.Reserved, ps.cpu.Reserved, newNodeReservedCPU, node.cpu.Total,
),
mem: fmt.Sprintf(
"node reserved %v + %v -> %v of total %v",
node.mem.Reserved, ps.mem.Reserved, newNodeReservedMem, node.mem.Total,
),
verdict.cpu = fmt.Sprintf("%s: %s", cpuShortVerdict, verdict.cpu)
memShortVerdict := verdictNotEnough
if !memOverBudget {
memShortVerdict = verdictOk
}
verdict.mem = fmt.Sprintf("%s: %s", memShortVerdict, verdict.mem)
}

if opts.allowDeny {
logger.Info("Allowing reserve resources for Pod", zap.Object("verdict", verdict))
} else if shouldDeny /* but couldn't */ {
logger.Warn("Reserved resources for Pod above totals", zap.Object("verdict", verdict))
} else {
logger.Info("Reserved resources for Pod", zap.Object("verdict", verdict))
if !accept(verdict, overBudget) {
return false, verdict
}

node.cpu.Reserved = newNodeReservedCPU
node.mem.Reserved = newNodeReservedMem
node.cpu.Buffer = newNodeBufferCPU
node.mem.Buffer = newNodeBufferMem
nodeXactCPU.Commit()
nodeXactMem.Commit()

node.pods[podName] = ps
e.state.pods[podName] = ps

node.updateMetrics(e.metrics)

return true, &verdict, nil
return true, verdict
}

// This method is /basically/ the same as e.Unreserve, but the API is different and it has different
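The state.go rewrite routes every speculative change to node state through the new pkg/util/xact package: handleReserve mutates a scratch copy obtained from xact.New(...).Value(), and the copy is only written back via Commit() once accept() approves. The xact implementation itself is not part of this diff; the generic sketch below is an assumption about what such a copy-on-write transaction could look like, matching only the New/Value/Commit call shapes visible above.

package main

import "fmt"

// Xact is a sketch of a copy-on-write transaction over a value of type T:
// callers mutate a private scratch copy via Value() and publish it with Commit().
type Xact[T any] struct {
	target  *T // where the value actually lives
	scratch T  // the speculative copy being mutated
}

func New[T any](target *T) *Xact[T] {
	return &Xact[T]{target: target, scratch: *target}
}

// Value returns the scratch copy; mutations stay private until Commit.
func (x *Xact[T]) Value() *T { return &x.scratch }

// Commit writes the scratch copy back over the original value.
func (x *Xact[T]) Commit() { *x.target = x.scratch }

type cpuState struct{ Reserved, Total int64 }

func main() {
	node := cpuState{Reserved: 3, Total: 8}

	tx := New(&node)
	tx.Value().Reserved += 2              // speculative reservation
	fmt.Println("before:", node.Reserved) // before: 3 (nothing committed yet)

	tx.Commit()
	fmt.Println("after:", node.Reserved) // after: 5
}

Note that this sketch copies T by value, so it only illustrates the mechanism; whatever the real package does about deep state or rollback is not shown in this diff.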