Skip to content

Commit

Permalink
feat(scaler/prometheus): support authentication for Google Managed Pr…
Browse files Browse the repository at this point in the history
…ometheus (#4675)
  • Loading branch information
nettoclaudio committed Jun 13, 2023
1 parent 2234a6f commit b8a8d57
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 33 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repos:
hooks:
- id: trailing-whitespace
- id: detect-private-key
exclude: pkg/scalers/prometheus_scaler_test.go
- id: end-of-file-fixer
- id: check-merge-conflict
- id: mixed-line-ending
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Security:** Enable secret scanning in GitHub repo
- **RabbitMQ Scaler**: Add support for `unsafeSsl` in trigger metadata ([#4448](https://github.com/kedacore/keda/issues/4448))
- **Prometheus Metrics**: Add new metric with KEDA build info ([#4647](https://github.com/kedacore/keda/issues/4647))
- **Prometheus Scaler**: Add support for Google Managed Prometheus ([#4675](https://github.com/kedacore/keda/pull/4675))

### Fixes

Expand Down
95 changes: 74 additions & 21 deletions pkg/scalers/gcp_common.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,90 @@
package scalers

import (
"fmt"
"context"
"errors"
"net/http"
"os"

"golang.org/x/oauth2"
"golang.org/x/oauth2/google"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

var (
gcpScopeMonitoringRead = "https://www.googleapis.com/auth/monitoring.read"

errGoogleApplicationCrendentialsNotFound = errors.New("google application credentials not found")
)

type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
GoogleApplicationCredentialsFile string
podIdentityProviderEnabled bool
}

func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
metadata := config.TriggerMetadata
authParams := config.AuthParams
meta := gcpAuthorizationMetadata{}

switch {
case config.PodIdentity.Provider == kedav1alpha1.PodIdentityProviderGCP:
// do nothing, rely on underneath metadata google
meta.podIdentityProviderEnabled = true
case authParams["GoogleApplicationCredentials"] != "":
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
default:
switch {
case metadata["credentialsFromEnv"] != "":
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
case metadata["credentialsFromEnvFile"] != "":
meta.GoogleApplicationCredentialsFile = resolvedEnv[metadata["credentialsFromEnvFile"]]
default:
return nil, fmt.Errorf("GoogleApplicationCredentials not found")
func (a *gcpAuthorizationMetadata) tokenSource(ctx context.Context, scopes ...string) (oauth2.TokenSource, error) {
if a.podIdentityProviderEnabled {
return google.DefaultTokenSource(ctx, scopes...)
}

if a.GoogleApplicationCredentials != "" {
creds, err := google.CredentialsFromJSON(ctx, []byte(a.GoogleApplicationCredentials), scopes...)
if err != nil {
return nil, err
}

return creds.TokenSource, nil
}

if a.GoogleApplicationCredentialsFile != "" {
data, err := os.ReadFile(a.GoogleApplicationCredentialsFile)
if err != nil {
return nil, err
}

creds, err := google.CredentialsFromJSON(ctx, data, scopes...)
if err != nil {
return nil, err
}

return creds.TokenSource, nil
}

return nil, errGoogleApplicationCrendentialsNotFound
}

func getGCPAuthorization(config *ScalerConfig) (*gcpAuthorizationMetadata, error) {
if config.PodIdentity.Provider == kedav1alpha1.PodIdentityProviderGCP {
return &gcpAuthorizationMetadata{podIdentityProviderEnabled: true}, nil
}

if creds := config.AuthParams["GoogleApplicationCredentials"]; creds != "" {
return &gcpAuthorizationMetadata{GoogleApplicationCredentials: creds}, nil
}

if creds := config.TriggerMetadata["credentialsFromEnv"]; creds != "" {
return &gcpAuthorizationMetadata{GoogleApplicationCredentials: config.ResolvedEnv[creds]}, nil
}

if credsFile := config.TriggerMetadata["credentialsFromEnvFile"]; credsFile != "" {
return &gcpAuthorizationMetadata{GoogleApplicationCredentialsFile: config.ResolvedEnv[credsFile]}, nil
}
return &meta, nil

return nil, errGoogleApplicationCrendentialsNotFound
}

func getGCPOAuth2HTTPTransport(config *ScalerConfig, base http.RoundTripper, scopes ...string) (http.RoundTripper, error) {
a, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}

ts, err := a.tokenSource(context.Background(), scopes...)
if err != nil {
return nil, err
}

return &oauth2.Transport{Source: ts, Base: base}, nil
}
2 changes: 1 addition & 1 deletion pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
meta.activationValue = activationValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd
meta.activationTargetValue = activationTargetValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/gcp_storage_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func parseGcsMetadata(config *ScalerConfig, logger logr.Logger) (*gcsMetadata, e
meta.blobPrefix = val
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand Down
28 changes: 19 additions & 9 deletions pkg/scalers/prometheus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -67,12 +68,12 @@ type prometheusMetadata struct {

type promQueryResult struct {
Status string `json:"status"`
Data struct {

Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric struct {
} `json:"metric"`
Value []interface{} `json:"value"`
Metric struct{} `json:"metric"`
Value []interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
}
Expand Down Expand Up @@ -109,16 +110,25 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) {
} else {
// could be the case of azure managed prometheus. Try and get the roundtripper.
// If its not the case of azure managed prometheus, we will get both transport and err as nil and proceed assuming no auth.
transport, err := azure.TryAndGetAzureManagedPrometheusHTTPRoundTripper(config.PodIdentity, config.TriggerMetadata)

azureTransport, err := azure.TryAndGetAzureManagedPrometheusHTTPRoundTripper(config.PodIdentity, config.TriggerMetadata)
if err != nil {
logger.V(1).Error(err, "error while init Azure Managed Prometheus client http transport")
return nil, err
}

// transport should not be nil if its a case of azure managed prometheus
if transport != nil {
httpClient.Transport = transport
if azureTransport != nil {
httpClient.Transport = azureTransport
}

gcpTransport, err := getGCPOAuth2HTTPTransport(config, httpClient.Transport, gcpScopeMonitoringRead)
if err != nil && !errors.Is(err, errGoogleApplicationCrendentialsNotFound) {
logger.V(1).Error(err, "failed to get GCP client HTTP transport (either using Google application credentials or workload identity)")
return nil, err
}

if err == nil && gcpTransport != nil {
httpClient.Transport = gcpTransport
}
}

Expand Down Expand Up @@ -292,7 +302,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
if err != nil {
return -1, err
}
_ = r.Body.Close()
defer r.Body.Close()

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
err := fmt.Errorf("prometheus query api returned error. status: %d response: %s", r.StatusCode, string(b))
Expand Down
132 changes: 132 additions & 0 deletions pkg/scalers/prometheus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package scalers

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/oauth2"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)
Expand Down Expand Up @@ -363,3 +369,129 @@ func TestPrometheusScalerCustomHeaders(t *testing.T) {

assert.NoError(t, err)
}

func TestPrometheusScaler_ExecutePromQuery_WithGCPNativeAuthentication(t *testing.T) {
fakeGoogleOAuthServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"token_type": "Bearer", "access_token": "fake_access_token"}`)
}))
defer fakeGoogleOAuthServer.Close()

fakeGCPCredsJSON, err := json.Marshal(map[string]string{
"type": "service_account",
"private_key": `-----BEGIN RSA PRIVATE KEY-----
MIIBOwIBAAJBAOfgBHLEOcXo2X+8SSzF1rEsTewRzZIOZAak4XRULY+dBd1bsGBM
+dOb9a65cJbDuL3zmTZnfAxjmh2ueNTZvOcCAwEAAQJBAMwwibpG8llF48KInCfB
UH5U9YmdY9nqskrnh2JZfoWnpBbGxtqg0vbdmvEL2bcbeUnudF25mPpoONw1F6G6
5IECIQD0ouUBttDMacs5XqQppYCb8eAmiMkJxwgtJfPb9iGm0wIhAPKlXzNgIsMP
v3sqXcOO3tNjEohptOpEyLWyCt3Htm0dAiB9w/CvfOjC7fCIQdtrfaYshaCSrueL
m0Lc0xIXFuYd+QIgZ9DpkomnVd3/BytxQqJ2I+tXmpXfmfwkA9lRXOJ94uECIQC8
IisErx3ap2o99Zn+Yotv/TGZkS+lfMLdbcOBr8a57Q==
-----END RSA PRIVATE KEY-----`,
"token_uri": fakeGoogleOAuthServer.URL,
})
require.NoError(t, err)

fakeGCPCredsPath := filepath.Join(t.TempDir(), "fake_application_default_credentials.json")

f, err := os.Create(fakeGCPCredsPath)
require.NoError(t, err)
_, err = f.Write(fakeGCPCredsJSON)
require.NoError(t, err)
require.NoError(t, f.Close())

newFakeServer := func(t *testing.T) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/projects/my-fake-project/location/global/prometheus/api/v1/query", r.URL.Path)
assert.True(t, r.URL.Query().Has("time"))
assert.Equal(t, "sum(rate(http_requests_total{instance=\"my-instance\"}[5m]))", r.URL.Query().Get("query"))

if !assert.Equal(t, "Bearer fake_access_token", r.Header.Get("Authorization")) {
w.WriteHeader(http.StatusUnauthorized)
}

assert.NoError(t, json.NewEncoder(w).Encode(map[string]any{
"status": "success",
"data": map[string]any{
"resultType": "vector",
"result": []map[string]any{
{"metric": map[string]string{}, "value": []any{1686063687, "777"}},
},
},
}))
}))
}

tests := map[string]struct {
config func(*testing.T, *ScalerConfig) *ScalerConfig
}{
"using GCP workload identity": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", fakeGCPCredsPath)
config.PodIdentity = kedav1alpha1.AuthPodIdentity{
Provider: kedav1alpha1.PodIdentityProviderGCP,
}
return config
},
},

"with Google app credentials on auth params": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
config.AuthParams = map[string]string{
"GoogleApplicationCredentials": string(fakeGCPCredsJSON),
}
return config
},
},

"with Google app credentials on envs": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
config.TriggerMetadata["credentialsFromEnv"] = "GCP_APP_CREDENTIALS"
config.ResolvedEnv = map[string]string{
"GCP_APP_CREDENTIALS": string(fakeGCPCredsJSON),
}
return config
},
},

"with Google app credentials file on auth params": {
config: func(t *testing.T, config *ScalerConfig) *ScalerConfig {
config.TriggerMetadata["credentialsFromEnvFile"] = "GCP_APP_CREDENTIALS"
config.ResolvedEnv = map[string]string{
"GCP_APP_CREDENTIALS": fakeGCPCredsPath,
}
return config
},
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
server := newFakeServer(t)
defer server.Close()

baseConfig := &ScalerConfig{
TriggerMetadata: map[string]string{
"serverAddress": server.URL + "/v1/projects/my-fake-project/location/global/prometheus",
"query": "sum(rate(http_requests_total{instance=\"my-instance\"}[5m]))",
"threshold": "100",
},
}

require.NotNil(t, tt.config, "you must provide a config generator func")
config := tt.config(t, baseConfig)

scaler, err := NewPrometheusScaler(config)
require.NoError(t, err)

s, ok := scaler.(*prometheusScaler)
require.True(t, ok, "Scaler must be a Prometheus Scaler")
_, ok = s.httpClient.Transport.(*oauth2.Transport)
require.True(t, ok, "HTTP transport must be Google OAuth2")

got, err := s.ExecutePromQuery(context.TODO())
require.NoError(t, err)
assert.Equal(t, float64(777), got)
})
}
}

0 comments on commit b8a8d57

Please sign in to comment.