feat: support --scale-down-delay-after-* per nodegroup #5729

Merged
27 changes: 17 additions & 10 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -188,13 +188,8 @@ func (csr *ClusterStateRegistry) Stop() {
close(csr.interrupt)
}

// RegisterOrUpdateScaleUp registers scale-up for give node group or changes requested node increase
// count.
// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime
// are reset.
// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are
// left intact.
func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
// RegisterScaleUp registers scale-up for a given node group
func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
@@ -246,7 +241,14 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr
}

// RegisterScaleDown registers node scale down.
func (csr *ClusterStateRegistry) RegisterScaleDown(request *ScaleDownRequest) {
func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
request := &ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: nodeName,
Time: currentTime,
ExpectedDeleteTime: expectedDeleteTime,
}
csr.Lock()
defer csr.Unlock()
csr.scaleDownRequests = append(csr.scaleDownRequests, request)
@@ -310,16 +312,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
// RegisterFailedScaleUp should be called after getting error from cloudprovider
// when trying to scale-up node group. It will mark this group as not safe to autoscale
// for some time.
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerFailedScaleUpNoLock(nodeGroup, reason, cloudprovider.InstanceErrorInfo{
csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: string(reason),
ErrorMessage: errorMessage,
}, gpuResourceName, gpuType, currentTime)
}

// RegisterFailedScaleDown records a failed scale-down for a node group.
// We don't need to implement this function for the cluster state registry.
func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
}

func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
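Taken together, the renamed RegisterScaleUp, the flattened RegisterScaleDown, the string-typed reason in RegisterFailedScaleUp, and the new no-op RegisterFailedScaleDown line the registry up with an observer-style interface. That interface lives in observers/nodegroupchange rather than in this file, so the sketch below is an assumption reconstructed from the signatures above, not a copy of the real definition:

```go
package nodegroupchange

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// NodeGroupChangeObserver is the assumed shape of the observer interface that the
// ClusterStateRegistry methods above now satisfy; the actual definition may differ in detail.
type NodeGroupChangeObserver interface {
	RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time)
	RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time)
	RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time)
	RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, reason string, currentTime time.Time)
}
```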
41 changes: 19 additions & 22 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -31,6 +31,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"

"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
@@ -75,7 +76,7 @@ func TestOKWithScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -122,7 +123,7 @@ func TestEmptyOK(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))

provider.AddNodeGroup("ng1", 0, 10, 3)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
// clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second)
// clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
@@ -161,7 +162,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))

provider.AddNodeGroup("ng1", 0, 5, tc.initialSize+tc.delta)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1"))
@@ -450,7 +451,7 @@ func TestExpiredScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -476,13 +477,7 @@ func TestRegisterScaleDown(t *testing.T) {
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
now := time.Now()

clusterstate.RegisterScaleDown(&ScaleDownRequest{
NodeGroup: provider.GetNodeGroup("ng1"),
NodeName: "ng1-1",
ExpectedDeleteTime: now.Add(time.Minute),
Time: now,
})
clusterstate.RegisterScaleDown(provider.GetNodeGroup("ng1"), "ng1-1", now, now.Add(time.Minute))
assert.Equal(t, 1, len(clusterstate.scaleDownRequests))
clusterstate.updateScaleRequests(now.Add(5 * time.Minute))
assert.Equal(t, 0, len(clusterstate.scaleDownRequests))
@@ -794,7 +789,7 @@ func TestScaleUpBackoff(t *testing.T) {
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 120 * time.Second}))

// After failed scale-up, node group should be still healthy, but should backoff from scale-ups
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -826,7 +821,7 @@
assert.Equal(t, NodeGroupScalingSafety{SafeToScale: true, Healthy: true}, clusterstate.NodeGroupScaleUpSafety(ng1, now))

// Another failed scale up should cause longer backoff
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))

err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
@@ -860,7 +855,7 @@ func TestScaleUpBackoff(t *testing.T) {
}, clusterstate.NodeGroupScaleUpSafety(ng1, now))

// The backoff should be cleared after a successful scale-up
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now)
ng1_4 := BuildTestNode("ng1-4", 1000, 1000)
SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute))
provider.AddNode("ng1", ng1_4)
@@ -935,6 +930,7 @@ func TestUpdateScaleUp(t *testing.T) {

provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 5)
provider.AddNodeGroup("ng2", 1, 10, 5)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(
@@ -948,29 +944,30 @@
nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}),
)

clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now)
// Test cases for `RegisterScaleUp`
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))

// expect no change of times on negative delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))

// update times on positive delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))

// if we get below 0, the scale-up request is deleted
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])

// If a new scale-up is registered with a negative delta, nothing should happen
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])
}

@@ -986,9 +983,9 @@ func TestScaleUpFailures(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))

clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.Timeout, "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), metrics.Timeout, "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.APIError, "", "", "", now.Add(time.Minute))
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.Timeout), "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), string(metrics.Timeout), "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.APIError), "", "", "", now.Add(time.Minute))

failures := clusterstate.GetScaleUpFailures()
assert.Equal(t, map[string][]ScaleUpFailure{
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -158,6 +158,9 @@ type AutoscalingOptions struct {
ScaleDownDelayAfterDelete time.Duration
// ScaleDownDelayAfterFailure sets the duration before the next scale down attempt if scale down results in an error
ScaleDownDelayAfterFailure time.Duration
// ScaleDownDelayTypeLocal controls whether the --scale-down-delay-after-* flags are applied locally per nodegroup
// or globally across all nodegroups
ScaleDownDelayTypeLocal bool
// ScaleDownNonEmptyCandidatesCount is the maximum number of non empty nodes
// considered at once as candidates for scale down.
ScaleDownNonEmptyCandidatesCount int
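To make the new option concrete, here is a minimal sketch of the decision it toggles. The helper and the lastScaleUp bookkeeping are hypothetical illustrations, not code from this PR; only ScaleDownDelayTypeLocal (added here) and the pre-existing ScaleDownDelayAfterAdd duration are assumed to come from AutoscalingOptions:

```go
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

// scaleDownDelayed is a hypothetical helper sketching what ScaleDownDelayTypeLocal changes:
// whether the post-scale-up cooldown is tracked per node group or once for the whole cluster.
func scaleDownDelayed(opts config.AutoscalingOptions, lastScaleUp map[string]time.Time, nodeGroup string, now time.Time) bool {
	if opts.ScaleDownDelayTypeLocal {
		// Local mode: only this node group's own last scale-up delays its scale-down.
		return now.Sub(lastScaleUp[nodeGroup]) < opts.ScaleDownDelayAfterAdd
	}
	// Global mode (previous behaviour): a recent scale-up anywhere delays scale-down everywhere.
	for _, t := range lastScaleUp {
		if now.Sub(t) < opts.ScaleDownDelayAfterAdd {
			return true
		}
	}
	return false
}
```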
7 changes: 6 additions & 1 deletion cluster-autoscaler/config/const.go
@@ -40,12 +40,17 @@ const (
DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime"
// DefaultIgnoreDaemonSetsUtilizationKey identifies IgnoreDaemonSetsUtilization autoscaling option
DefaultIgnoreDaemonSetsUtilizationKey = "ignoredaemonsetsutilization"
// DefaultScaleDownUnneededTime identifies ScaleDownUnneededTime autoscaling option

// DefaultScaleDownUnneededTime is the default time duration for which CA waits before deleting an unneeded node
DefaultScaleDownUnneededTime = 10 * time.Minute
// DefaultScaleDownUnreadyTime identifies ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTime = 20 * time.Minute
// DefaultScaleDownUtilizationThreshold identifies ScaleDownUtilizationThreshold autoscaling option
DefaultScaleDownUtilizationThreshold = 0.5
// DefaultScaleDownGpuUtilizationThreshold identifies ScaleDownGpuUtilizationThreshold autoscaling option
DefaultScaleDownGpuUtilizationThreshold = 0.5
// DefaultScaleDownDelayAfterFailure is the default value for ScaleDownDelayAfterFailure autoscaling option
DefaultScaleDownDelayAfterFailure = 3 * time.Minute
// DefaultScanInterval is the default scan interval for CA
DefaultScanInterval = 10 * time.Second
)
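DefaultScaleDownUnneededTime and DefaultScaleDownDelayAfterFailure presumably replace literal values at the flag definitions in main.go; that wiring is not part of this diff, so the snippet below is only an assumed illustration (descriptions paraphrased):

```go
package main

import (
	"flag"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

// Assumed flag wiring; the real definitions live in cluster-autoscaler's main.go and may differ in wording.
var (
	scaleDownUnneededTime = flag.Duration("scale-down-unneeded-time", config.DefaultScaleDownUnneededTime,
		"How long a node should be unneeded before it is eligible for scale down")
	scaleDownDelayAfterFailure = flag.Duration("scale-down-delay-after-failure", config.DefaultScaleDownDelayAfterFailure,
		"How long after scale down failure that scale down evaluation resumes")
)
```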
10 changes: 4 additions & 6 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -22,7 +22,6 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
@@ -31,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -45,7 +45,6 @@ import (
// Actuator is responsible for draining and deleting nodes.
type Actuator struct {
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionScheduler *GroupDeletionScheduler
deleteOptions options.NodeDeleteOptions
@@ -66,8 +65,8 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
if len(ctx.DrainPriorityConfig) > 0 {
@@ -77,7 +76,6 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
}
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
@@ -102,7 +100,7 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
a.nodeDeletionScheduler.ReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Now().Sub(deletionStartTime)) }()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()

results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}
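The net effect of this file's changes is that the Actuator no longer holds the ClusterStateRegistry; deletions are reported through a NodeGroupChangeObserver instead. Based on the test setup in the next file, the wiring looks roughly like this (construction of ctx, csr, ndt, deleteOptions, drainabilityRules and configGetter elided):

```go
// Sketch of the new wiring, mirroring the test setup below.
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr) // the registry still receives scale-down events, now as one observer among many

actuator := actuation.NewActuator(ctx, scaleStateNotifier, ndt, deleteOptions, drainabilityRules, configGetter)
```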
14 changes: 10 additions & 4 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -40,6 +40,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1186,13 +1187,16 @@ func TestStartDeletion(t *testing.T) {
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
}

scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr)

// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, 0*time.Second)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig, fullDsEviction: false}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
@@ -1424,12 +1428,14 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr)
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
}