Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scaler/prometheus): support authentication for Google Managed Prometheus #4675

Merged
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repos:
hooks:
- id: trailing-whitespace
- id: detect-private-key
exclude: pkg/scalers/prometheus_scaler_test.go
- id: end-of-file-fixer
- id: check-merge-conflict
- id: mixed-line-ending
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Security:** Enable secret scanning in GitHub repo
- **RabbitMQ Scaler**: Add support for `unsafeSsl` in trigger metadata ([#4448](https://github.com/kedacore/keda/issues/4448))
- **Prometheus Metrics**: Add new metric with KEDA build info ([#4647](https://github.com/kedacore/keda/issues/4647))
- **Prometheus Scaler**: Add support for Google Managed Prometheus ([#4675](https://github.com/kedacore/keda/pull/4675))

### Fixes

Expand Down
95 changes: 74 additions & 21 deletions pkg/scalers/gcp_common.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,90 @@
package scalers

import (
"fmt"
"context"
"errors"
"net/http"
"os"

"golang.org/x/oauth2"
"golang.org/x/oauth2/google"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

var (
gcpScopeMonitoringRead = "https://www.googleapis.com/auth/monitoring.read"

errGoogleApplicationCrendentialsNotFound = errors.New("google application credentials not found")
)

type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
GoogleApplicationCredentialsFile string
podIdentityProviderEnabled bool
}

func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
metadata := config.TriggerMetadata
authParams := config.AuthParams
meta := gcpAuthorizationMetadata{}

switch {
case config.PodIdentity.Provider == kedav1alpha1.PodIdentityProviderGCP:
// do nothing, rely on underneath metadata google
meta.podIdentityProviderEnabled = true
case authParams["GoogleApplicationCredentials"] != "":
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
default:
switch {
case metadata["credentialsFromEnv"] != "":
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
case metadata["credentialsFromEnvFile"] != "":
meta.GoogleApplicationCredentialsFile = resolvedEnv[metadata["credentialsFromEnvFile"]]
default:
return nil, fmt.Errorf("GoogleApplicationCredentials not found")
func (a *gcpAuthorizationMetadata) tokenSource(ctx context.Context, scopes ...string) (oauth2.TokenSource, error) {
if a.podIdentityProviderEnabled {
return google.DefaultTokenSource(ctx, scopes...)
}

if a.GoogleApplicationCredentials != "" {
creds, err := google.CredentialsFromJSON(ctx, []byte(a.GoogleApplicationCredentials), scopes...)
if err != nil {
return nil, err
}

return creds.TokenSource, nil
}

if a.GoogleApplicationCredentialsFile != "" {
data, err := os.ReadFile(a.GoogleApplicationCredentialsFile)
if err != nil {
return nil, err
}

creds, err := google.CredentialsFromJSON(ctx, data, scopes...)
if err != nil {
return nil, err
}

return creds.TokenSource, nil
}

return nil, errGoogleApplicationCrendentialsNotFound
}

func getGCPAuthorization(config *ScalerConfig) (*gcpAuthorizationMetadata, error) {
if config.PodIdentity.Provider == kedav1alpha1.PodIdentityProviderGCP {
return &gcpAuthorizationMetadata{podIdentityProviderEnabled: true}, nil
}

if creds := config.AuthParams["GoogleApplicationCredentials"]; creds != "" {
return &gcpAuthorizationMetadata{GoogleApplicationCredentials: creds}, nil
}

if creds := config.TriggerMetadata["credentialsFromEnv"]; creds != "" {
return &gcpAuthorizationMetadata{GoogleApplicationCredentials: config.ResolvedEnv[creds]}, nil
}

if credsFile := config.TriggerMetadata["credentialsFromEnvFile"]; credsFile != "" {
return &gcpAuthorizationMetadata{GoogleApplicationCredentialsFile: config.ResolvedEnv[credsFile]}, nil
}
return &meta, nil

return nil, errGoogleApplicationCrendentialsNotFound
}

func getGCPOAuth2HTTPTransport(config *ScalerConfig, base http.RoundTripper, scopes ...string) (http.RoundTripper, error) {
a, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}

ts, err := a.tokenSource(context.Background(), scopes...)
if err != nil {
return nil, err
}

return &oauth2.Transport{Source: ts, Base: base}, nil
}
2 changes: 1 addition & 1 deletion pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
meta.activationValue = activationValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd
meta.activationTargetValue = activationTargetValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/gcp_storage_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func parseGcsMetadata(config *ScalerConfig, logger logr.Logger) (*gcsMetadata, e
meta.blobPrefix = val
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand Down
28 changes: 19 additions & 9 deletions pkg/scalers/prometheus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -67,12 +68,12 @@ type prometheusMetadata struct {

type promQueryResult struct {
Status string `json:"status"`
Data struct {

Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric struct {
} `json:"metric"`
Value []interface{} `json:"value"`
Metric struct{} `json:"metric"`
Value []interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
}
Expand Down Expand Up @@ -109,16 +110,25 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) {
} else {
// could be the case of azure managed prometheus. Try and get the roundtripper.
// If its not the case of azure managed prometheus, we will get both transport and err as nil and proceed assuming no auth.
transport, err := azure.TryAndGetAzureManagedPrometheusHTTPRoundTripper(config.PodIdentity, config.TriggerMetadata)

azureTransport, err := azure.TryAndGetAzureManagedPrometheusHTTPRoundTripper(config.PodIdentity, config.TriggerMetadata)
if err != nil {
logger.V(1).Error(err, "error while init Azure Managed Prometheus client http transport")
return nil, err
}

// transport should not be nil if its a case of azure managed prometheus
if transport != nil {
httpClient.Transport = transport
if azureTransport != nil {
httpClient.Transport = azureTransport
}

gcpTransport, err := getGCPOAuth2HTTPTransport(config, httpClient.Transport, gcpScopeMonitoringRead)
if err != nil && !errors.Is(err, errGoogleApplicationCrendentialsNotFound) {
logger.V(1).Error(err, "failed to get GCP client HTTP transport (either using Google application credentials or workload identity)")
return nil, err
}

if err == nil && gcpTransport != nil {
httpClient.Transport = gcpTransport
}
}

Expand Down Expand Up @@ -292,7 +302,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
if err != nil {
return -1, err
}
_ = r.Body.Close()
defer r.Body.Close()

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
err := fmt.Errorf("prometheus query api returned error. status: %d response: %s", r.StatusCode, string(b))
Expand Down
132 changes: 132 additions & 0 deletions pkg/scalers/prometheus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package scalers

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/oauth2"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)
Expand Down Expand Up @@ -363,3 +369,129 @@ func TestPrometheusScalerCustomHeaders(t *testing.T) {

assert.NoError(t, err)
}

func TestPrometheusScaler_ExecutePromQuery_WithGCPNativeAuthentication(t *testing.T) {
fakeGoogleOAuthServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"token_type": "Bearer", "access_token": "fake_access_token"}`)
}))
defer fakeGoogleOAuthServer.Close()

fakeGCPCredsJSON, err := json.Marshal(map[string]string{
"type": "service_account",
"private_key": `-----BEGIN RSA PRIVATE KEY-----
MIIBOwIBAAJBAOfgBHLEOcXo2X+8SSzF1rEsTewRzZIOZAak4XRULY+dBd1bsGBM
+dOb9a65cJbDuL3zmTZnfAxjmh2ueNTZvOcCAwEAAQJBAMwwibpG8llF48KInCfB
UH5U9YmdY9nqskrnh2JZfoWnpBbGxtqg0vbdmvEL2bcbeUnudF25mPpoONw1F6G6
5IECIQD0ouUBttDMacs5XqQppYCb8eAmiMkJxwgtJfPb9iGm0wIhAPKlXzNgIsMP
v3sqXcOO3tNjEohptOpEyLWyCt3Htm0dAiB9w/CvfOjC7fCIQdtrfaYshaCSrueL
m0Lc0xIXFuYd+QIgZ9DpkomnVd3/BytxQqJ2I+tXmpXfmfwkA9lRXOJ94uECIQC8
IisErx3ap2o99Zn+Yotv/TGZkS+lfMLdbcOBr8a57Q==
-----END RSA PRIVATE KEY-----`,
"token_uri": fakeGoogleOAuthServer.URL,
})
require.NoError(t, err)

fakeGCPCredsPath := filepath.Join(t.TempDir(), "fake_application_default_credentials.json")

f, err := os.Create(fakeGCPCredsPath)
require.NoError(t, err)
_, err = f.Write(fakeGCPCredsJSON)
require.NoError(t, err)
require.NoError(t, f.Close())

newFakeServer := func(t *testing.T) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/projects/my-fake-project/location/global/prometheus/api/v1/query", r.URL.Path)
assert.True(t, r.URL.Query().Has("time"))
assert.Equal(t, "sum(rate(http_requests_total{instance=\"my-instance\"}[5m]))", r.URL.Query().Get("query"))

if !assert.Equal(t, "Bearer fake_access_token", r.Header.Get("Authorization")) {
w.WriteHeader(http.StatusUnauthorized)
}

assert.NoError(t, json.NewEncoder(w).Encode(map[string]any{
"status": "success",
"data": map[string]any{
"resultType": "vector",
"result": []map[string]any{
{"metric": map[string]string{}, "value": []any{1686063687, "777"}},
},
},
}))
}))
}

tests := map[string]struct {
config func(*testing.T, *ScalerConfig) *ScalerConfig
}{
"using GCP workload identity": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", fakeGCPCredsPath)
config.PodIdentity = kedav1alpha1.AuthPodIdentity{
Provider: kedav1alpha1.PodIdentityProviderGCP,
}
return config
},
},

"with Google app credentials on auth params": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
config.AuthParams = map[string]string{
"GoogleApplicationCredentials": string(fakeGCPCredsJSON),
}
return config
},
},

"with Google app credentials on envs": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
config.TriggerMetadata["credentialsFromEnv"] = "GCP_APP_CREDENTIALS"
config.ResolvedEnv = map[string]string{
"GCP_APP_CREDENTIALS": string(fakeGCPCredsJSON),
}
return config
},
},

"with Google app credentials file on auth params": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
config.TriggerMetadata["credentialsFromEnvFile"] = "GCP_APP_CREDENTIALS"
config.ResolvedEnv = map[string]string{
"GCP_APP_CREDENTIALS": fakeGCPCredsPath,
}
return config
},
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
server := newFakeServer(t)
defer server.Close()

baseConfig := &ScalerConfig{
TriggerMetadata: map[string]string{
"serverAddress": server.URL + "/v1/projects/my-fake-project/location/global/prometheus",
"query": "sum(rate(http_requests_total{instance=\"my-instance\"}[5m]))",
"threshold": "100",
},
}

require.NotNil(t, tt.config, "you must provide a config generator func")
config := tt.config(t, baseConfig)

scaler, err := NewPrometheusScaler(config)
require.NoError(t, err)

s, ok := scaler.(*prometheusScaler)
require.True(t, ok, "Scaler must be a Prometheus Scaler")
_, ok = s.httpClient.Transport.(*oauth2.Transport)
require.True(t, ok, "HTTP transport must be Google OAuth2")

got, err := s.ExecutePromQuery(context.TODO())
require.NoError(t, err)
assert.Equal(t, float64(777), got)
})
}
}