Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Provide scaler for Amazon managed service for Prometheus #5373

Merged
merged 37 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d5bdd71
chore: add large lag threshold test for Kafka scalers (#5361)
dttung2905 Jan 12, 2024
dd24cc2
Add support aws secretmanager authentication (#5162)
geoffrey1330 Jan 12, 2024
a37620e
chore(deps): update actions/upload-artifact digest to 1eb3cb2 (#5369)
renovate[bot] Jan 12, 2024
f58f9a6
chore: fix jetstream flaky test (#5372)
JorTurFer Jan 13, 2024
ccccc67
chore: bump Go to 1.21.6 (#5370)
zroubalik Jan 13, 2024
5c7c7f2
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
332e51e
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
1ad5671
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
ba8f6ac
Update pkg/scalers/aws_sigv4.go
sguruvar Jan 15, 2024
2034dba
Update tests/scalers/aws/aws_managed_prometheus_pod_identity/aws_mana…
sguruvar Jan 15, 2024
c29fb15
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
4b53d8e
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
68e52fe
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
ebaa539
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
4bb9d44
Update pkg/scalers/aws_sigv4.go
sguruvar Jan 15, 2024
f1a3f72
Update tests/scalers/aws/aws_managed_prometheus_pod_identity/aws_mana…
sguruvar Jan 15, 2024
8c6a39a
https://github.com/kedacore/keda/issues/2214
Jan 15, 2024
12b1817
Merge branch 'main' of https://github.com/sguruvar/keda
sguruvar Jan 15, 2024
2ba1144
Update CHANGELOG.md
sguruvar Jan 15, 2024
5c6efaf
Update pkg/scalers/aws_sigv4.go
sguruvar Jan 15, 2024
8813dad
https://github.com/kedacore/keda/issues/2214
sguruvar Jan 16, 2024
f76045b
Merge branch 'main' into main
tomkerkhove Jan 16, 2024
f805d13
Merge remote-tracking branch 'upstream/main'
sguruvar Jan 16, 2024
90f50e5
https://github.com/kedacore/keda/issues/2214
sguruvar Jan 16, 2024
ee28386
Update pkg/scalers/aws/aws_sigv4.go
sguruvar Jan 16, 2024
147186e
Update pkg/scalers/aws/aws_sigv4.go
sguruvar Jan 16, 2024
a31fa7c
Merge remote-tracking branch 'upstream/main'
sguruvar Jan 16, 2024
f73c7aa
https://github.com/kedacore/keda/issues/2214
sguruvar Jan 16, 2024
c3bb6ab
Apply suggestions from code review
JorTurFer Jan 17, 2024
268fe43
Apply suggestions from code review
JorTurFer Jan 17, 2024
bcafe6f
https://github.com/kedacore/keda/issues/2214
sguruvar Jan 17, 2024
5a5b8b2
Merge branch 'main' of https://github.com/sguruvar/keda
sguruvar Jan 17, 2024
b16e7fc
Update pkg/scalers/aws/aws_sigv4.go
sguruvar Jan 18, 2024
d86d3e5
Update pkg/scalers/aws/aws_sigv4.go
sguruvar Jan 18, 2024
a564efd
Update pkg/scalers/aws/aws_sigv4.go
sguruvar Jan 18, 2024
3376b82
https://github.com/kedacore/keda/issues/2214
sguruvar Jan 18, 2024
31d2afa
Merge branch 'main' into main
zroubalik Jan 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ Here is an overview of all new **experimental** features:
- **Kafka Scaler**: Add more logging to check Sarama DescribeTopics method ([#5102](https://github.com/kedacore/keda/issues/5102))
- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))
- **Prometheus Metrics**: Introduce paused ScaledObjects in Prometheus metrics ([#4430](https://github.com/kedacore/keda/issues/4430))
- **Prometheus Scaler**: Provide scaler for Amazon managed service for Prometheus ([#2214](https://github.com/kedacore/keda/issues/2214))
- **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069))

### Fixes
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.24.1
github.com/aws/aws-sdk-go-v2/config v1.26.2
github.com/aws/aws-sdk-go-v2/credentials v1.16.13
github.com/aws/aws-sdk-go-v2/service/amp v1.22.1
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.32.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.26.7
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.18.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,8 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.10/go.mod h1:6UV4SZkVvmO
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.20/go.mod h1:bfTcsThj5a9P5pIGRy0QudJ8k4+issxXX+O6Djnd5Cs=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 h1:GrSw8s0Gs/5zZ0SX+gX4zQjRnRsMJDJ2sLur1gRBhEM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/service/amp v1.22.1 h1:09O7NJKub+PsLAi1S+j/melSkjQROVV2RsDGqt3i34k=
github.com/aws/aws-sdk-go-v2/service/amp v1.22.1/go.mod h1:zXysWREb7sWv3Mr80IBeQmbbWtBD4OvA5r/W+E+aSyA=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.32.1 h1:IQ+uLXwS5Eelikc5ZdR0P55XPo+tqWh+k872KdpAjFA=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.32.1/go.mod h1:G63GKqSBLpBmO3tN1/PwM2NC65XvSd00zJWTZk202bc=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.26.7 h1:X60rMbnylU1xmmhv4+/N78t+lKOCC4ELst5eR25dyqg=
Expand Down
90 changes: 90 additions & 0 deletions pkg/scalers/aws/aws_sigv4.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package aws
sguruvar marked this conversation as resolved.
Show resolved Hide resolved

sguruvar marked this conversation as resolved.
Show resolved Hide resolved
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"net/http"
"time"

v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/service/amp"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
httputils "github.com/kedacore/keda/v2/pkg/util"
)

// Custom round tripper to sign requests
sguruvar marked this conversation as resolved.
Show resolved Hide resolved
type roundTripper struct {
client *amp.Client
region string
}

var (
// ErrAwsAMPNoAwsRegion is returned when "awsRegion" is missing from the config.
ErrAwsAMPNoAwsRegion = errors.New("no awsRegion given")
)

func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
sguruvar marked this conversation as resolved.
Show resolved Hide resolved
cred, err := rt.client.Options().Credentials.Retrieve(req.Context())
if err != nil {
return nil, err
}
// Sign request
hasher := sha256.New()
reqCxt := v4.SetPayloadHash(req.Context(), hex.EncodeToString(hasher.Sum([]byte{})))
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
reqHash := v4.GetPayloadHash(reqCxt)
err = rt.client.Options().HTTPSignerV4.SignHTTP(req.Context(), cred, req, reqHash, "aps", rt.region, time.Now())
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
// Create default transport
transport := httputils.CreateHTTPTransport(false)

// Send signed request
return transport.RoundTrip(req)
}

func parseAwsAMPMetadata(config *scalersconfig.ScalerConfig) (*awsConfigMetadata, error) {
sguruvar marked this conversation as resolved.
Show resolved Hide resolved
meta := awsConfigMetadata{}

auth, err := GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth
return &meta, nil
}

// NewSigV4RoundTripper returns a new http.RoundTripper that will sign requests
// using Amazon's Signature Verification V4 signing procedure. The request will
// then be handed off to the next RoundTripper provided by next. If next is nil,
// http.DefaultTransport will be used.
//
// Credentials for signing are retrieving used the default AWS credential chain.
// If credentials could not be found, an error will be returned.
func NewSigV4RoundTripper(config *scalersconfig.ScalerConfig) (http.RoundTripper, error) {
// parseAwsAMPMetadata can return an error if AWS info is missing
// but this can happen if we check for them on not AWS scalers
// which is probably the reason to create a SigV4RoundTripper.
// To prevent failures we check if the metadata is nil
// (missing AWS info) and we hide the error
metadata, _ := parseAwsAMPMetadata(config)
if metadata == nil {
return nil, nil
}
awsCfg, err := GetAwsConfig(context.Background(), metadata.awsRegion, metadata.awsAuthorization)
if err != nil {
return nil, err
}

client := amp.NewFromConfig(*awsCfg, func(o *amp.Options) {})
rt := &roundTripper{
client: client,
region: metadata.awsRegion,
}

return rt, nil
}
26 changes: 26 additions & 0 deletions pkg/scalers/aws/aws_sigv4_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package aws
sguruvar marked this conversation as resolved.
Show resolved Hide resolved

import (
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/kedacore/keda/v2/pkg/util"
)

func TestSigV4RoundTripper(t *testing.T) {
transport := util.CreateHTTPTransport(false)

cli := &http.Client{Transport: transport}

req, err := http.NewRequest(http.MethodGet, "https://aps-workspaces.us-west-2.amazonaws.com/workspaces/ws-38377ca8-8db3-4b58-812d-b65a81837bb8/api/v1/query?query=vector(10)", strings.NewReader("Hello, world!"))
require.NoError(t, err)
r, err := cli.Do(req)
require.NotEmpty(t, r)
require.NoError(t, err)
defer r.Body.Close()

require.NotNil(t, req)
}
11 changes: 11 additions & 0 deletions pkg/scalers/prometheus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/authentication"
"github.com/kedacore/keda/v2/pkg/scalers/aws"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
"github.com/kedacore/keda/v2/pkg/scalers/gcp"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -130,6 +131,16 @@ func NewPrometheusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
if err == nil && gcpTransport != nil {
httpClient.Transport = gcpTransport
}

awsTransport, err := aws.NewSigV4RoundTripper(config)
if err != nil {
logger.V(1).Error(err, "failed to get AWS client HTTP transport ")
return nil, err
}

if err == nil && awsTransport != nil {
httpClient.Transport = awsTransport
}
}

return &prometheusScaler{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
//go:build e2e
// +build e2e

package aws_managed_prometheus_test

import (
"context"
"encoding/base64"
"fmt"
"os"
"testing"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/amp"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

. "github.com/kedacore/keda/v2/tests/helper"
)

// Load environment variables from .env file
var _ = godotenv.Load("../../../.env")

const (
testName = "aws-prometheus-test"
)

type templateData struct {
TestNamespace string
DeploymentName string
ScaledObjectName string
SecretName string
AwsAccessKeyID string
AwsSecretAccessKey string
AwsRegion string
WorkspaceID string
}

const (
secretTemplate = `apiVersion: v1
kind: Secret
metadata:
name: {{.SecretName}}
namespace: {{.TestNamespace}}
data:
AWS_ACCESS_KEY_ID: {{.AwsAccessKeyID}}
AWS_SECRET_ACCESS_KEY: {{.AwsSecretAccessKey}}
`

triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: keda-trigger-auth-aws-credentials
namespace: {{.TestNamespace}}
spec:
secretTargetRef:
- parameter: awsAccessKeyID # Required.
name: {{.SecretName}} # Required.
key: AWS_ACCESS_KEY_ID # Required.
- parameter: awsSecretAccessKey # Required.
name: {{.SecretName}} # Required.
key: AWS_SECRET_ACCESS_KEY # Required.
`

deploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
replicas: 0
selector:
matchLabels:
app: {{.DeploymentName}}
template:
metadata:
labels:
app: {{.DeploymentName}}
spec:
containers:
- name: nginx
image: nginxinc/nginx-unprivileged
ports:
- containerPort: 80
`

scaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
maxReplicaCount: 2
minReplicaCount: 0
cooldownPeriod: 1
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 15
triggers:
- type: prometheus
authenticationRef:
name: keda-trigger-auth-aws-credentials
metadata:
awsRegion: {{.AwsRegion}}
serverAddress: "https://aps-workspaces.{{.AwsRegion}}.amazonaws.com/workspaces/{{.WorkspaceID}}"
query: "vector(100)"
threshold: "50.0"
`
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
secretName = fmt.Sprintf("%s-secret", testName)
workspaceID = fmt.Sprintf("workspace-%d", GetRandomNumber())
awsAccessKeyID = os.Getenv("TF_AWS_ACCESS_KEY")
awsSecretAccessKey = os.Getenv("TF_AWS_SECRET_KEY")
awsRegion = os.Getenv("TF_AWS_REGION")
)

func TestScaler(t *testing.T) {
require.NotEmpty(t, awsAccessKeyID, "AwsAccessKeyID env variable is required for AWS e2e test")
require.NotEmpty(t, awsSecretAccessKey, "awsSecretAccessKey env variable is required for AWS e2e test")

t.Log("--- setting up ---")

ampClient := createAMPClient()
workspaceOutput, _ := ampClient.CreateWorkspace(context.Background(), nil)
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
workspaceID = *workspaceOutput.WorkspaceId

kc := GetKubernetesClient(t)

data, templates := getTemplateData()
CreateKubernetesResources(t, kc, testNamespace, data, templates)
t.Log(secretTemplate)
t.Log("--- assert ---")
expectedReplicaCountNumber := 2 // as mentioned above, as the AMP returns 100 and the threshold set to 50, the expected replica count is 100 / 50 = 2
assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1),
"replica count should be %d after a minute", expectedReplicaCountNumber)

t.Log("--- cleaning up ---")
deleteWSInput := amp.DeleteWorkspaceInput{
WorkspaceId: &workspaceID,
}
input := &deleteWSInput
_, err := ampClient.DeleteWorkspace(context.Background(), input)
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Log("Unable to delete AMP workspace", err)
}
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
DeleteKubernetesResources(t, testNamespace, data, templates)
}

func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
SecretName: secretName,
AwsAccessKeyID: base64.StdEncoding.EncodeToString([]byte(awsAccessKeyID)),
AwsSecretAccessKey: base64.StdEncoding.EncodeToString([]byte(awsSecretAccessKey)),
AwsRegion: awsRegion,
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
WorkspaceID: workspaceID,
}, []Template{
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
}
}

func createAMPClient() *amp.Client {
configOptions := make([]func(*config.LoadOptions) error, 0)
configOptions = append(configOptions, config.WithRegion(awsRegion))
cfg, _ := config.LoadDefaultConfig(context.TODO(), configOptions...)
cfg.Credentials = credentials.NewStaticCredentialsProvider(awsAccessKeyID, awsSecretAccessKey, "")
return amp.NewFromConfig(cfg)
}
Loading
Loading