From 9dfbb4db143a5bfbbd9b6e7887b0d509be2e267c Mon Sep 17 00:00:00 2001 From: Rui Costa Date: Fri, 10 May 2024 14:28:45 -0500 Subject: [PATCH] fix: Handle placeholder instance decommission safely in AWS ASGs This merge resolves the issue where the Kubernetes Cluster Autoscaler incorrectly decommissions actual instances instead of placeholders within AWS ASGs. The fix ensures that only placeholders are considered for scaling down when recent scaling activities fail, thereby preventing the accidental removal of active nodes. Enhanced unit tests and checks are included to ensure robustness. Fixes #5829 --- .../cloudprovider/aws/auto_scaling_groups.go | 136 +++++++++--------- .../aws/auto_scaling_groups_test.go | 95 ++++++------ .../aws/aws_cloud_provider_test.go | 48 +++---- 3 files changed, 139 insertions(+), 140 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 30662cab059e..4f01ee591fae 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -308,37 +308,37 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } } - // Initialize the success flag for recent scaling activity. - var recentScalingActivitySuccess = false - var err error - - // Check if there are any placeholder instances in the list. - if m.HasPlaceholder(instances) { - // Log the check for placeholders in the ASG. - klog.V(4).Infof("Detected a placeholder instance, checking recent scaling activity for ASG %s", commonAsg.Name) - - // Retrieve the most recent scaling activity to determine its success state. - recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg) - - // Handle errors from retrieving scaling activity. - if err != nil { - // Log the error if the scaling activity check fails and return the error. - klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err) - return err // Return error to prevent further processing with uncertain state information. - } - } - - for _, instance := range instances { - if m.isPlaceholderInstance(instance) { - if !recentScalingActivitySuccess { - // Log that scaling down due to unsuccessful recent activity - klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name) - m.decreaseAsgSizeByOneNoLock(commonAsg) - continue // Continue to the next iteration after handling placeholder - } - klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name) - continue - } else { + // Initialize the success flag for recent scaling activity. + var recentScalingActivitySuccess = false + var err error + + // Check if there are any placeholder instances in the list. + if m.HasPlaceholder(instances) { + // Log the check for placeholders in the ASG. + klog.V(4).Infof("Detected a placeholder instance, checking recent scaling activity for ASG %s", commonAsg.Name) + + // Retrieve the most recent scaling activity to determine its success state. + recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg) + + // Handle errors from retrieving scaling activity. + if err != nil { + // Log the error if the scaling activity check fails and return the error. + klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err) + return err // Return error to prevent further processing with uncertain state information. + } + } + + for _, instance := range instances { + if m.isPlaceholderInstance(instance) { + if !recentScalingActivitySuccess { + // Log that scaling down due to unsuccessful recent activity + klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name) + m.decreaseAsgSizeByOneNoLock(commonAsg) + continue // Continue to the next iteration after handling placeholder + } + klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name) + continue + } else { // check if the instance is already terminating - if it is, don't bother terminating again // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement // unnecessarily. @@ -376,42 +376,42 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) { - input := &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String(asg.Name), - MaxRecords: aws.Int64(1), - } - - var response *autoscaling.DescribeScalingActivitiesOutput - var err error - attempts := 3 - - for i := 0; i < attempts; i++ { - response, err = m.awsService.DescribeScalingActivities(input) - if err == nil { - break - } - klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err) - time.Sleep(time.Second * 2) - } - - if err != nil { - klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err) - return false, err - } - - if len(response.Activities) == 0 { - klog.Info("No scaling activities found for ASG:", asg.Name) - return false, nil - } - - lastActivity := response.Activities[0] - if *lastActivity.StatusCode == "Successful" { - klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name) - return true, nil - } else { - klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) - return false, fmt.Errorf("most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) - } + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String(asg.Name), + MaxRecords: aws.Int64(1), + } + + var response *autoscaling.DescribeScalingActivitiesOutput + var err error + attempts := 3 + + for i := 0; i < attempts; i++ { + response, err = m.awsService.DescribeScalingActivities(input) + if err == nil { + break + } + klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err) + time.Sleep(time.Second * 2) + } + + if err != nil { + klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err) + return false, err + } + + if len(response.Activities) == 0 { + klog.Info("No scaling activities found for ASG:", asg.Name) + return false, nil + } + + lastActivity := response.Activities[0] + if *lastActivity.StatusCode == "Successful" { + klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name) + return true, nil + } else { + klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) + return false, fmt.Errorf("most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) + } } // isPlaceholderInstance checks if the given instance is only a placeholder @@ -697,4 +697,4 @@ func (m *asgCache) HasPlaceholder(instances []*AwsInstanceRef) bool { // Cleanup closes the channel to signal the go routine to stop that is handling the cache func (m *asgCache) Cleanup() { close(m.interrupt) -} \ No newline at end of file +} diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index 525bc073a83e..76652d565007 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -168,52 +168,51 @@ func TestCreatePlaceholders(t *testing.T) { } func TestGetMostRecentScalingActivity(t *testing.T) { - a := &autoScalingMock{} - asgCache := &asgCache{ - awsService: &awsWrapper{ - autoScalingI: a, - }, - } - - asg := &asg{AwsRef: AwsRef{Name: "test-asg"}} - - // Test case: Successful scaling activity - a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), - }).Return(&autoscaling.DescribeScalingActivitiesOutput{ - Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}}, - }, nil).Once() - - success, err := asgCache.getMostRecentScalingActivity(asg) - assert.NoError(t, err) - assert.True(t, success) - - // Test case: Failed scaling activity - a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), - }).Return(&autoscaling.DescribeScalingActivitiesOutput{ - Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}}, - }, nil).Once() - - success, err = asgCache.getMostRecentScalingActivity(asg) - assert.NoError(t, err) - assert.False(t, success) - - // Test case: No activities found - a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), - }).Return(&autoscaling.DescribeScalingActivitiesOutput{ - Activities: []*autoscaling.Activity{}, - }, nil).Once() - - success, err = asgCache.getMostRecentScalingActivity(asg) - assert.NoError(t, err) - assert.False(t, success) - - // Verify that all expectations are met - a.AssertExpectations(t) -} + a := &autoScalingMock{} + asgCache := &asgCache{ + awsService: &awsWrapper{ + autoScalingI: a, + }, + } + + asg := &asg{AwsRef: AwsRef{Name: "test-asg"}} + + // Test case: Successful scaling activity + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }).Return(&autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}}, + }, nil).Once() + + success, err := asgCache.getMostRecentScalingActivity(asg) + assert.NoError(t, err) + assert.True(t, success) + // Test case: Failed scaling activity + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }).Return(&autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}}, + }, nil).Once() + + success, err = asgCache.getMostRecentScalingActivity(asg) + assert.NoError(t, err) + assert.False(t, success) + + // Test case: No activities found + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }).Return(&autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{}, + }, nil).Once() + + success, err = asgCache.getMostRecentScalingActivity(asg) + assert.NoError(t, err) + assert.False(t, success) + + // Verify that all expectations are met + a.AssertExpectations(t) +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index c4f5ffcb9cc5..6bba0a9de75a 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -589,24 +589,24 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) - a.On("DescribeScalingActivities", - &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), - }, - ).Return( - &autoscaling.DescribeScalingActivitiesOutput{ - Activities: []*autoscaling.Activity{ - { - StatusCode: aws.String("Successful"), - StartTime: aws.Time(time.Now().Add(-10 * time.Minute)), - }, - { - StatusCode: aws.String("Failed"), - StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), - }, - }, - }, nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }, + ).Return( + &autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Successful"), + StartTime: aws.Time(time.Now().Add(-10 * time.Minute)), + }, + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), + }, + }, + }, nil) provider.Refresh() @@ -620,12 +620,12 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { } err = asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) - a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) - a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) - newSize, err := asgs[0].TargetSize() - assert.NoError(t, err) - assert.Equal(t, 1, newSize) + newSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 1, newSize) } @@ -757,4 +757,4 @@ func TestHasInstance(t *testing.T) { present, err = provider.HasInstance(node4) assert.NoError(t, err) assert.False(t, present) -} \ No newline at end of file +}