Patches based implementation for DRA snapshot. #8090

Merged
merged 5 commits on Jun 10, 2025
@@ -26,7 +26,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -111,7 +110,7 @@ func TestFilterOutExpendable(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
- err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
+ err := snapshot.SetClusterState(tc.nodes, nil, nil)
assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
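The test updates in this PR all follow the same convention: where a test previously seeded the snapshot with the zero value drasnapshot.Snapshot{}, it now passes nil, because the DRA snapshot argument is a pointer. A minimal, hypothetical test sketching that convention (the test name and the empty node slice are illustrative, not part of this PR):

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
)

// TestSetClusterStateWithoutDRA is a hypothetical test illustrating the new calling
// convention: tests that do not exercise DRA objects pass nil instead of the old
// zero value drasnapshot.Snapshot{}.
func TestSetClusterStateWithoutDRA(t *testing.T) {
	snapshot := testsnapshot.NewTestSnapshotOrDie(t)
	var nodes []*apiv1.Node // an empty cluster is enough for the illustration
	err := snapshot.SetClusterState(nodes, nil, nil)
	assert.NoError(t, err)
}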
@@ -27,7 +27,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -281,7 +280,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
}

clusterSnapshot := snapshotFactory()
- if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{}); err != nil {
+ if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, nil); err != nil {
assert.NoError(b, err)
}

2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -406,7 +406,7 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

- var draSnapshot drasnapshot.Snapshot
+ var draSnapshot *drasnapshot.Snapshot
if a.ctx.DynamicResourceAllocationEnabled && a.ctx.DraProvider != nil {
draSnapshot, err = a.ctx.DraProvider.Snapshot()
if err != nil {
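For the production callers (the scale-down Actuator here and StaticAutoscaler.RunOnce below), the local variable becomes a *drasnapshot.Snapshot that stays nil unless DRA is enabled and a provider is configured. A hedged sketch of that pattern as a standalone helper; the helper name and the exact wiring into SetClusterState are assumptions, not code from this PR:

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// buildSnapshot is a hypothetical helper mirroring the pattern above: fetch a DRA
// snapshot only when the feature is enabled, otherwise leave the pointer nil and
// let SetClusterState treat nil as "no DRA objects tracked".
func buildSnapshot(ctx *context.AutoscalingContext, snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, pods []*apiv1.Pod) error {
	var draSnapshot *drasnapshot.Snapshot
	if ctx.DynamicResourceAllocationEnabled && ctx.DraProvider != nil {
		var err error
		draSnapshot, err = ctx.DraProvider.Snapshot()
		if err != nil {
			return err
		}
	}
	return snapshot.SetClusterState(nodes, pods, draSnapshot)
}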
@@ -42,7 +42,7 @@ type testCase struct {
desc string
nodes []*apiv1.Node
pods []*apiv1.Pod
- draSnapshot drasnapshot.Snapshot
+ draSnapshot *drasnapshot.Snapshot
draEnabled bool
wantUnneeded []string
wantUnremovable []*simulator.UnremovableNode
@@ -46,7 +46,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1044,7 +1043,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
// build orchestrator
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
- err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), nil)
assert.NoError(t, err)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1154,7 +1153,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
- err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, pods, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1197,7 +1196,7 @@ func TestBinpackingLimiter(t *testing.T) {

context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
- err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1257,7 +1256,7 @@ func TestScaleUpNoHelp(t *testing.T) {
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
- err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, pods, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1412,7 +1411,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
- err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1496,7 +1495,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
- err = context.ClusterSnapshot.SetClusterState(nodes, podList, drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, podList, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1672,7 +1671,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1, n2}
- err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
processors := processorstest.NewTestProcessors(&context)
9 changes: 4 additions & 5 deletions cluster-autoscaler/core/scaleup/resource/manager_test.go
@@ -32,7 +32,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -72,7 +71,7 @@ func TestDeltaForNode(t *testing.T) {

ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
- err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

@@ -115,7 +114,7 @@ func TestResourcesLeft(t *testing.T) {

ng := testCase.nodeGroupConfig
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
- err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

@@ -168,7 +167,7 @@ func TestApplyLimits(t *testing.T) {

ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
- err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

@@ -235,7 +234,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
assert.NoError(t, err)

nodes := []*corev1.Node{n1}
- err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -335,7 +335,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)

- var draSnapshot drasnapshot.Snapshot
+ var draSnapshot *drasnapshot.Snapshot
if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.AutoscalingContext.DraProvider != nil {
draSnapshot, err = a.AutoscalingContext.DraProvider.Snapshot()
if err != nil {
@@ -28,7 +28,6 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -86,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {

nodes := []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1, ready7, readyToBeDeleted6}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
- err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err := snapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)

ctx := context.AutoscalingContext{
@@ -173,7 +172,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {

nodes := []*apiv1.Node{unready4, unready3, ready2, ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
- err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err := snapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)

// Fill cache
@@ -264,7 +263,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {

nodes := []*apiv1.Node{ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
- err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err := snapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)

ctx := context.AutoscalingContext{
@@ -77,7 +77,7 @@ type ClusterSnapshotStore interface {

// SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
- SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error
+ SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot *drasnapshot.Snapshot) error

// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot without checking scheduler predicates.
ForceAddPod(pod *apiv1.Pod, nodeName string) error
@@ -93,7 +93,7 @@ type ClusterSnapshotStore interface {
RemoveSchedulerNodeInfo(nodeName string) error

// DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot.
- DraSnapshot() drasnapshot.Snapshot
+ DraSnapshot() *drasnapshot.Snapshot

// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert().
// Use WithForkedSnapshot() helper function instead if possible.
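With this interface change, both the setter and the accessor deal in pointers: SetClusterState accepts a possibly-nil *drasnapshot.Snapshot, and DraSnapshot returns one. A brief sketch of a consumer of the updated ClusterSnapshotStore contract; the helper is hypothetical, and whether DraSnapshot returns nil or an empty snapshot when no DRA state was set is left to the store implementation:

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// seedStore is a hypothetical helper exercising the updated signatures: it seeds the
// store (dra may be nil when DRA is unused) and reads the DRA state back as a pointer.
func seedStore(store clustersnapshot.ClusterSnapshotStore, nodes []*apiv1.Node, pods []*apiv1.Pod, dra *drasnapshot.Snapshot) (*drasnapshot.Snapshot, error) {
	if err := store.SetClusterState(nodes, pods, dra); err != nil {
		return nil, err
	}
	return store.DraSnapshot(), nil
}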
@@ -56,6 +56,7 @@ func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, e
if err != nil {
return nil, err
}
+
if s.draEnabled {
return s.ClusterSnapshotStore.DraSnapshot().WrapSchedulerNodeInfo(schedNodeInfo)
}
@@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/assert"

"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -40,7 +39,7 @@ func BenchmarkAddNodeInfo(b *testing.B) {
b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
- assert.NoError(b, clusterSnapshot.SetClusterState(nil, nil, drasnapshot.Snapshot{}))
+ assert.NoError(b, clusterSnapshot.SetClusterState(nil, nil, nil))
b.StartTimer()
for _, node := range nodes {
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
@@ -62,7 +61,7 @@ func BenchmarkListNodeInfos(b *testing.B) {
nodes := clustersnapshot.CreateTestNodes(tc)
clusterSnapshot, err := snapshotFactory()
assert.NoError(b, err)
- err = clusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = clusterSnapshot.SetClusterState(nodes, nil, nil)
if err != nil {
assert.NoError(b, err)
}
@@ -92,14 +91,14 @@ func BenchmarkAddPods(b *testing.B) {
clustersnapshot.AssignTestPodsToNodes(pods, nodes)
clusterSnapshot, err := snapshotFactory()
assert.NoError(b, err)
- err = clusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = clusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(b, err)
b.ResetTimer()
b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

- err = clusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
+ err = clusterSnapshot.SetClusterState(nodes, nil, nil)
if err != nil {
assert.NoError(b, err)
}
@@ -128,7 +127,7 @@ func BenchmarkForkAddRevert(b *testing.B) {
clustersnapshot.AssignTestPodsToNodes(pods, nodes)
clusterSnapshot, err := snapshotFactory()
assert.NoError(b, err)
- err = clusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
+ err = clusterSnapshot.SetClusterState(nodes, pods, nil)
assert.NoError(b, err)
tmpNode1 := BuildTestNode("tmp-1", 2000, 2000000)
tmpNode2 := BuildTestNode("tmp-2", 2000, 2000000)