fix: Handle placeholder instance decommission safely in AWS ASGs
This change fixes an issue where the Kubernetes Cluster Autoscaler could decommission real instances instead of placeholders within AWS ASGs. When the most recent scaling activity for an ASG has failed, only placeholder instances are now considered for scale-down, preventing the accidental removal of active nodes. Unit tests and additional checks are included to ensure robustness.

Fixes #5829
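
In outline, the deletion path now separates placeholder instances from real ones before touching capacity. A minimal sketch of that flow, simplified from the diff below (the "i-placeholder" name prefix is an assumption about how this provider names its placeholders; the other identifiers appear in the diff):

for _, instance := range instances {
	if strings.HasPrefix(instance.Name, "i-placeholder") { // assumed placeholder naming
		if !recentScalingActivitySuccess {
			// The scale-up that created this placeholder never completed;
			// shrink the ASG instead of terminating a node that does not exist.
			m.decreaseAsgSizeByOneNoLock(commonAsg)
		}
		// After a successful scaling activity the placeholder should resolve
		// to a real instance on its own, so leave it untouched.
		continue
	}
	// Real instance: fall through to the normal termination path.
}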
ruiscosta committed May 10, 2024
1 parent fb69323 commit 9dfbb4d
Showing 3 changed files with 139 additions and 140 deletions.
136 changes: 68 additions & 68 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -308,37 +308,37 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

// Initialize the success flag for recent scaling activity.
var recentScalingActivitySuccess = false
var err error

// Check if there are any placeholder instances in the list.
if m.HasPlaceholder(instances) {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected a placeholder instance, checking recent scaling activity for ASG %s", commonAsg.Name)

// Retrieve the most recent scaling activity to determine its success state.
recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)

// Handle errors from retrieving scaling activity.
if err != nil {
// Log the error if the scaling activity check fails and return the error.
klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
return err // Return error to prevent further processing with uncertain state information.
}
}

for _, instance := range instances {
if m.isPlaceholderInstance(instance) {
if !recentScalingActivitySuccess {
// The most recent scaling activity failed, so the placeholder never became a real node; shrink the ASG instead.
klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
continue // Continue to the next iteration after handling placeholder
}
klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name)
continue
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
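// (The guard itself is collapsed in this diff; a minimal sketch of the idea,
// assuming a hypothetical lifecycleState helper rather than the provider's
// actual lookup:
//
//	if strings.HasPrefix(m.lifecycleState(instance), "Terminating") {
//		klog.V(4).Infof("Skipping instance %s: already terminating", instance.Name)
//		continue
//	}
// )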
@@ -376,42 +376,42 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}

func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String(asg.Name),
MaxRecords: aws.Int64(1),
}

var response *autoscaling.DescribeScalingActivitiesOutput
var err error
attempts := 3

for i := 0; i < attempts; i++ {
response, err = m.awsService.DescribeScalingActivities(input)
if err == nil {
break
}
klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err)
time.Sleep(time.Second * 2)
}

if err != nil {
klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err)
return false, err
}

if len(response.Activities) == 0 {
klog.Info("No scaling activities found for ASG:", asg.Name)
return false, nil
}

lastActivity := response.Activities[0]
if aws.StringValue(lastActivity.StatusCode) == "Successful" {
klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
return true, nil
}
// A failed last activity is reported as (false, nil): it is the expected
// signal for placeholder cleanup rather than an error, and aws.StringValue
// guards against a nil StatusMessage.
klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, aws.StringValue(lastActivity.StatusMessage))
return false, nil
}

// isPlaceholderInstance checks if the given instance is only a placeholder
@@ -697,4 +697,4 @@ func (m *asgCache) HasPlaceholder(instances []*AwsInstanceRef) bool {
// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
}
95 changes: 47 additions & 48 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
@@ -168,52 +168,51 @@ func TestCreatePlaceholders(t *testing.T) {
}

func TestGetMostRecentScalingActivity(t *testing.T) {
a := &autoScalingMock{}
asgCache := &asgCache{
awsService: &awsWrapper{
autoScalingI: a,
},
}

asg := &asg{AwsRef: AwsRef{Name: "test-asg"}}

// Test case: Successful scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}},
}, nil).Once()

success, err := asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.True(t, success)

// Test case: Failed scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Test case: No activities found
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Verify that all expectations are met
a.AssertExpectations(t)
}
48 changes: 24 additions & 24 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
@@ -589,24 +589,24 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-10 * time.Minute)),
},
{
StatusCode: aws.String("Failed"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)
a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-10 * time.Minute)),
},
{
StatusCode: aws.String("Failed"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

provider.Refresh()

@@ -620,12 +620,12 @@
}
}
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, newSize)

}

@@ -757,4 +757,4 @@ func TestHasInstance(t *testing.T) {
present, err = provider.HasInstance(node4)
assert.NoError(t, err)
assert.False(t, present)
}
