diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
index 4667e285b153..4f01ee591fae 100644
--- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
+++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -308,13 +308,36 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
 		}
 	}
 
+	// Initialize the success flag for recent scaling activity.
+	var recentScalingActivitySuccess = false
+	var err error
+
+	// Check if there are any placeholder instances in the list.
+	if m.HasPlaceholder(instances) {
+		// Log the check for placeholders in the ASG.
+		klog.V(4).Infof("Detected a placeholder instance, checking recent scaling activity for ASG %s", commonAsg.Name)
+
+		// Retrieve the most recent scaling activity to determine its success state.
+		recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)
+
+		// Handle errors from retrieving scaling activity.
+		if err != nil {
+			// Log the error if the scaling activity check fails and return the error.
+			klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
+			return err // Return error to prevent further processing with uncertain state information.
+		}
+	}
+
 	for _, instance := range instances {
-		// check if the instance is a placeholder - a requested instance that was never created by the node group
-		// if it is, just decrease the size of the node group, as there's no specific instance we can remove
 		if m.isPlaceholderInstance(instance) {
-			klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
-				"of deleting instance", instance.Name)
-			m.decreaseAsgSizeByOneNoLock(commonAsg)
+			if !recentScalingActivitySuccess {
+				// Log that scaling down due to unsuccessful recent activity
+				klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name)
+				m.decreaseAsgSizeByOneNoLock(commonAsg)
+				continue // Continue to the next iteration after handling placeholder
+			}
+			klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name)
+			continue
 		} else {
 			// check if the instance is already terminating - if it is, don't bother terminating again
 			// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
@@ -352,6 +375,56 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
 	return nil
 }
 
+// getMostRecentScalingActivity reports whether the most recent scaling activity
+// recorded for asg completed with status "Successful".
+//
+// It returns (false, nil) when the ASG has no scaling activity or when the latest
+// activity did not succeed, so DeleteInstances can still shrink the ASG to drop a
+// placeholder. A non-nil error means DescribeScalingActivities failed on every attempt.
+func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
+	const (
+		attempts   = 3
+		retryDelay = 2 * time.Second
+	)
+
+	input := &autoscaling.DescribeScalingActivitiesInput{
+		AutoScalingGroupName: aws.String(asg.Name),
+		MaxRecords:           aws.Int64(1),
+	}
+
+	var response *autoscaling.DescribeScalingActivitiesOutput
+	var err error
+	for i := 1; i <= attempts; i++ {
+		response, err = m.awsService.DescribeScalingActivities(input)
+		if err == nil {
+			break
+		}
+		klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i, attempts, err)
+		// Back off only when another attempt will follow.
+		if i < attempts {
+			time.Sleep(retryDelay)
+		}
+	}
+	if err != nil {
+		return false, fmt.Errorf("describing scaling activities for ASG %s: %w", asg.Name, err)
+	}
+
+	if len(response.Activities) == 0 {
+		klog.Infof("No scaling activities found for ASG %s", asg.Name)
+		return false, nil
+	}
+
+	lastActivity := response.Activities[0]
+	// StatusCode and StatusMessage are pointers that may be nil; aws.StringValue
+	// maps nil to "" instead of panicking.
+	if aws.StringValue(lastActivity.StatusCode) == "Successful" {
+		klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
+		return true, nil
+	}
+	klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, aws.StringValue(lastActivity.StatusMessage))
+	return false, nil
+}
+
 // isPlaceholderInstance checks if the given instance is only a placeholder
 func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
 	return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
@@ -620,6 +693,17 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
 	}
 }
 
+// HasPlaceholder reports whether any of the given instances is a placeholder,
+// i.e. a requested instance that was never created by the node group.
+func (m *asgCache) HasPlaceholder(instances []*AwsInstanceRef) bool {
+	for _, instance := range instances {
+		if m.isPlaceholderInstance(instance) {
+			return true
+		}
+	}
+	return false
+}
+
 // Cleanup closes the channel to signal the go routine to stop that is handling the cache
 func (m *asgCache) Cleanup() {
 	close(m.interrupt)
diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
index b536194fdf64..76652d565007 100644
--- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
@@ -166,3 +166,53 @@ func TestCreatePlaceholders(t *testing.T) {
 		})
 	}
 }
+
+func TestGetMostRecentScalingActivity(t *testing.T) {
+	a := &autoScalingMock{}
+	asgCache := &asgCache{
+		awsService: &awsWrapper{
+			autoScalingI: a,
+		},
+	}
+
+	asg := &asg{AwsRef: AwsRef{Name: "test-asg"}}
+
+	// Test case: Successful scaling activity
+	a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
+		AutoScalingGroupName: aws.String("test-asg"),
+		MaxRecords:           aws.Int64(1),
+	}).Return(&autoscaling.DescribeScalingActivitiesOutput{
+		Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}},
+	}, nil).Once()
+
+	success, err := asgCache.getMostRecentScalingActivity(asg)
+	assert.NoError(t, err)
+	assert.True(t, success)
+
+	// Test case: Failed scaling activity
+	a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
+		AutoScalingGroupName: aws.String("test-asg"),
+		MaxRecords:           aws.Int64(1),
+	}).Return(&autoscaling.DescribeScalingActivitiesOutput{
+		Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}},
+	}, nil).Once()
+
+	success, err = asgCache.getMostRecentScalingActivity(asg)
+	assert.NoError(t, err)
+	assert.False(t, success)
+
+	// Test case: No activities found
+	a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
+		AutoScalingGroupName: aws.String("test-asg"),
+		MaxRecords:           aws.Int64(1),
+	}).Return(&autoscaling.DescribeScalingActivitiesOutput{
+		Activities: []*autoscaling.Activity{},
+	}, nil).Once()
+
+	success, err = asgCache.getMostRecentScalingActivity(asg)
+	assert.NoError(t, err)
+	assert.False(t, success)
+
+	// Verify that all expectations are met
+	a.AssertExpectations(t)
+}
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
index 0033d27c68ea..6bba0a9de75a 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package aws
 
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
@@ -589,12 +590,30 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
 		},
 	).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)
 
+	a.On("DescribeScalingActivities",
+		&autoscaling.DescribeScalingActivitiesInput{
+			AutoScalingGroupName: aws.String("test-asg"),
+			MaxRecords:           aws.Int64(1),
+		},
+	).Return(
+		&autoscaling.DescribeScalingActivitiesOutput{
+			Activities: []*autoscaling.Activity{
+				{
+					StatusCode: aws.String("Successful"),
+					StartTime:  aws.Time(time.Now().Add(-10 * time.Minute)),
+				},
+				{
+					StatusCode: aws.String("Failed"),
+					StartTime:  aws.Time(time.Now().Add(-30 * time.Minute)),
+				},
+			},
+		}, nil)
+
 	provider.Refresh()
 
 	initialSize, err := asgs[0].TargetSize()
 	assert.NoError(t, err)
 	assert.Equal(t, 2, initialSize)
-
 	node := &apiv1.Node{
 		Spec: apiv1.NodeSpec{
 			ProviderID: "aws:///us-east-1a/i-placeholder-test-asg-1",
@@ -608,6 +627,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
 	newSize, err := asgs[0].TargetSize()
 	assert.NoError(t, err)
 	assert.Equal(t, 1, newSize)
+
 }
 
 func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {