Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(aws-sqs): Respect scaleOnInFlight value #4358

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issue/4276))

### Deprecations

Expand Down
19 changes: 12 additions & 7 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ const (
defaultScaleOnInFlight = true
)

var awsSqsQueueMetricNames = []string{
var awsSqsQueueMetricNamesForScalingInFlight = []string{
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
}

var awsSqsQueueMetricNamesForNotScalingInFlight = []string{
"ApproximateNumberOfMessages",
}

type awsSqsQueueScaler struct {
metricType v2.MetricTargetType
metadata *awsSqsQueueMetadata
Expand All @@ -45,6 +49,7 @@ type awsSqsQueueMetadata struct {
awsAuthorization awsAuthorizationMetadata
scalerIndex int
scaleOnInFlight bool
awsSqsQueueMetricNames []string
}

// NewAwsSqsQueueScaler creates a new awsSqsQueueScaler
Expand Down Expand Up @@ -104,10 +109,10 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
}
}

if !meta.scaleOnInFlight {
awsSqsQueueMetricNames = []string{
"ApproximateNumberOfMessages",
}
if meta.scaleOnInFlight {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight
} else {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight
}

if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" {
Expand Down Expand Up @@ -198,7 +203,7 @@ func (s *awsSqsQueueScaler) GetMetricsAndActivity(ctx context.Context, metricNam
// Get SQS Queue Length
func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) {
input := &sqs.GetQueueAttributesInput{
AttributeNames: aws.StringSlice(awsSqsQueueMetricNames),
AttributeNames: aws.StringSlice(s.metadata.awsSqsQueueMetricNames),
QueueUrl: aws.String(s.metadata.queueURL),
}

Expand All @@ -208,7 +213,7 @@ func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) {
}

var approximateNumberOfMessages int64
for _, awsSqsQueueMetric := range awsSqsQueueMetricNames {
for _, awsSqsQueueMetric := range s.metadata.awsSqsQueueMetricNames {
metricValue, err := strconv.ParseInt(*output.Attributes[awsSqsQueueMetric], 10, 32)
if err != nil {
return -1, err
Expand Down
48 changes: 43 additions & 5 deletions pkg/scalers/aws_sqs_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,43 @@ var awsSQSMetricIdentifiers = []awsSQSMetricIdentifier{
{&testAWSSQSMetadata[1], 1, "s1-aws-sqs-DeleteArtifactQ"},
}

var awsSQSGetMetricTestData = []*awsSqsQueueMetadata{
{queueURL: testAWSSQSProperQueueURL},
{queueURL: testAWSSQSErrorQueueURL},
{queueURL: testAWSSQSBadDataQueueURL},
var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnInFlight disabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnInFlight enabled"},
{map[string]string{
"queueURL": testAWSSQSErrorQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"error queue"},
{map[string]string{
"queueURL": testAWSSQSBadDataQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"bad data"},
}

func TestSQSParseMetadata(t *testing.T) {
Expand Down Expand Up @@ -343,8 +376,13 @@ func TestAWSSQSGetMetricSpecForScaling(t *testing.T) {
}

func TestAWSSQSScalerGetMetrics(t *testing.T) {
for _, meta := range awsSQSGetMetricTestData {
for index, testData := range awsSQSGetMetricTestData {
meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, ScalerIndex: index}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
scaler := awsSqsQueueScaler{"", meta, &mockSqs{}, logr.Discard()}

value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName")
switch meta.queueURL {
case testAWSSQSErrorQueueURL:
Expand Down