Skip to content

Commit

Permalink
feat(scaler): make GCP PubSub more flexible for metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Chung <ken8203@gmail.com>
  • Loading branch information
ken8203 committed Mar 27, 2023
1 parent 078c51b commit 336b6b6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### Improvements

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **GCP PubSub Scaler**: Make it more flexible for metrics ([#4243](https://github.com/kedacore/keda/issues/4243))

### Fixes

Expand Down
67 changes: 32 additions & 35 deletions pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ import (
)

const (
compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*"
defaultTargetSubscriptionSize = 5
defaultTargetOldestUnackedMessageAge = 10
pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
pubSubStackDriverOldestUnackedMessageAgeMetricName = "pubsub.googleapis.com/subscription/oldest_unacked_message_age"

pubsubModeSubscriptionSize = "SubscriptionSize"
pubsubModeOldestUnackedMessageAge = "OldestUnackedMessageAge"
compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*"
prefixPubSubStackDriverSubscription = "pubsub.googleapis.com/subscription/"

pubSubModeSubscriptionSize = "SubscriptionSize"
pubSubDefaultValue = 10
)

var regexpCompositeSubscriptionIDPrefix = regexp.MustCompile(compositeSubscriptionIDPrefix)
Expand Down Expand Up @@ -67,8 +64,8 @@ func NewPubSubScaler(config *ScalerConfig) (Scaler, error) {
}

func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetadata, error) {
meta := pubsubMetadata{}
meta.mode = pubsubModeSubscriptionSize
// set subscription size to the default mode
meta := pubsubMetadata{mode: pubSubModeSubscriptionSize}

mode, modePresent := config.TriggerMetadata["mode"]
value, valuePresent := config.TriggerMetadata["value"]
Expand All @@ -78,7 +75,6 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
return nil, errors.New("you can use either mode and value fields or subscriptionSize field")
}
logger.Info("subscriptionSize field is deprecated. Use mode and value fields instead")
meta.mode = pubsubModeSubscriptionSize
subSizeValue, err := strconv.ParseFloat(subSize, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %w", err)
Expand All @@ -87,15 +83,7 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
} else {
if modePresent {
meta.mode = mode
}

switch meta.mode {
case pubsubModeSubscriptionSize:
meta.value = defaultTargetSubscriptionSize
case pubsubModeOldestUnackedMessageAge:
meta.value = defaultTargetOldestUnackedMessageAge
default:
return nil, fmt.Errorf("trigger mode %s must be one of %s, %s", meta.mode, pubsubModeSubscriptionSize, pubsubModeOldestUnackedMessageAge)
meta.value = pubSubDefaultValue
}

if valuePresent {
Expand Down Expand Up @@ -167,22 +155,20 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec

// GetMetricsAndActivity connects to Stack Driver and finds the size of the pub sub subscription
func (s *pubsubScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var value float64
var err error
mode := s.metadata.mode

switch s.metadata.mode {
case pubsubModeSubscriptionSize:
value, err = s.getMetrics(ctx, pubSubStackDriverSubscriptionSizeMetricName)
if err != nil {
s.logger.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, false, err
}
case pubsubModeOldestUnackedMessageAge:
value, err = s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
if err != nil {
s.logger.Error(err, "error getting oldest unacked message age")
return []external_metrics.ExternalMetricValue{}, false, err
}
// SubscriptionSize is actually NumUndeliveredMessages in GCP PubSub.
// Considering backward compatibility, fallback "SubscriptionSize" to "NumUndeliveredMessages"
if mode == pubSubModeSubscriptionSize {
mode = "NumUndeliveredMessages"
}

metricType := prefixPubSubStackDriverSubscription + snakeCase(mode)

value, err := s.getMetrics(ctx, metricType)
if err != nil {
s.logger.Error(err, "error getting metric", metricType)
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, value)
Expand Down Expand Up @@ -234,3 +220,14 @@ func getSubscriptionData(s *pubsubScaler) (string, string) {
}
return subscriptionID, projectID
}

var (
regexpFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
regexpAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
)

func snakeCase(camelCase string) string {
snake := regexpFirstCap.ReplaceAllString(camelCase, "${1}_${2}")
snake = regexpAllCap.ReplaceAllString(snake, "${1}_${2}")
return strings.ToLower(snake)
}
23 changes: 22 additions & 1 deletion pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
// all properly formed
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "5"}, false},
// all properly formed with oldest unacked message age mode
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": pubsubModeOldestUnackedMessageAge, "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": "OldestUnackedMessageAge", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing subscriptionName
{nil, map[string]string{"subscriptionName": "", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
Expand Down Expand Up @@ -112,3 +112,24 @@ func TestGcpPubSubSubscriptionName(t *testing.T) {
}
}
}

func TestGcpPubSubSnakeCase(t *testing.T) {
testCases := []struct {
input string
want string
}{
{"PullAckRequestCount", "pull_ack_request_count"},
{"AckLatencies", "ack_latencies"},
{"AckMessageCount", "ack_message_count"},
{"BacklogBytes", "backlog_bytes"},
{"NumOutstandingMessages", "num_outstanding_messages"},
{"NumUndeliveredMessages", "num_undelivered_messages"},
}

for _, tc := range testCases {
got := snakeCase(tc.input)
if got != tc.want {
t.Fatalf(`want "%s" but got "%s"`, tc.want, got)
}
}
}

0 comments on commit 336b6b6

Please sign in to comment.