Skip to content

Commit

Permalink
Add new AWS podIdentity
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>
  • Loading branch information
JorTurFer committed Dec 31, 2023
1 parent 3c08404 commit 66de066
Show file tree
Hide file tree
Showing 72 changed files with 3,977 additions and 469 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-e2e.yml
Expand Up @@ -22,7 +22,7 @@ jobs:
id: checkUserMember
with:
username: ${{ github.actor }}
team: 'keda-e2e-test-executors'
team: "keda-e2e-test-executors"
GITHUB_TOKEN: ${{ secrets.GH_CHECKING_USER_AUTH }}

- name: Update comment with the execution url
Expand Down Expand Up @@ -221,5 +221,5 @@ jobs:
uses: actions/upload-artifact@c7d193f32edcb7bfad88892161225aeda64e9392 # v4
with:
name: e2e-test-logs
path: '${{ github.workspace }}/tests/**/*.log'
path: "${{ github.workspace }}/**/*.log"
if-no-files-found: ignore
2 changes: 1 addition & 1 deletion .github/workflows/template-main-e2e-test.yml
Expand Up @@ -51,5 +51,5 @@ jobs:
if: ${{ always() }}
with:
name: e2e-test-logs
path: '${{ github.workspace }}/tests/**/*.log'
path: "${{ github.workspace }}/**/*.log"
if-no-files-found: ignore
2 changes: 1 addition & 1 deletion .github/workflows/template-smoke-tests.yml
Expand Up @@ -48,5 +48,5 @@ jobs:
if: ${{ always() }}
with:
name: smoke-test-logs ${{ inputs.runs-on }}-${{ inputs.kubernetesVersion }}
path: "${{ github.workspace }}/tests/**/*.log"
path: "${{ github.workspace }}/**/*.log"
if-no-files-found: ignore
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -45,3 +45,6 @@ __debug_bin

# GO Test result
report.xml

# KEDA Certs
certs/*
10 changes: 8 additions & 2 deletions CHANGELOG.md
Expand Up @@ -51,7 +51,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Introduce new AWS Authentication ([#4134](https://github.com/kedacore/keda/issues/4134))

#### Experimental

Expand All @@ -61,8 +61,13 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Add CloudEventSource metrics in Prometheus & OpenTelemetry ([#3531](https://github.com/kedacore/keda/issues/3531))
- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Add validations for replica counts when creating ScaledObjects ([#5288](https://github.com/kedacore/keda/issues/5288))
- **General**: Bubble up AuthRef TriggerAuthentication errors as ScaledObject events ([#5190](https://github.com/kedacore/keda/issues/5190))
- **General**: Enhance podIdentity Role Assumption in AWS by Direct Integration with OIDC/Federation ([#5178](https://github.com/kedacore/keda/issues/5178))
- **General**: Fix issue where paused annotation being set to false still leads to scaled objects/jobs being paused ([#5215](https://github.com/kedacore/keda/issues/5215))
- **General**: Implement Credentials Cache for AWS Roles to reduce AWS API calls ([#5297](https://github.com/kedacore/keda/issues/5297))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224))
- **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070))
Expand All @@ -81,6 +86,7 @@ Here is an overview of all new **experimental** features:
- **General**: Fix otelgrpc DoS vulnerability ([#5208](https://github.com/kedacore/keda/issues/5208))
- **General**: Prevented memory leak generated by not correctly cleaning http connections ([#5248](https://github.com/kedacore/keda/issues/5248))
- **General**: Prevented stuck status due to timeouts during scalers generation ([#5083](https://github.com/kedacore/keda/issues/5083))
- **General**: ScaledObject Validating Webhook should support dry-run=server requests ([#5306](https://github.com/kedacore/keda/issues/5306))
- **AWS Scalers**: Ensure session tokens are included when instantiating AWS credentials ([#5156](https://github.com/kedacore/keda/issues/5156))
- **Azure Pipelines**: No more HTTP 400 errors produced by poolName with spaces ([#5107](https://github.com/kedacore/keda/issues/5107))
- **GCP pubsub scaler**: Added `project_id` to filter for metrics queries ([#5256](https://github.com/kedacore/keda/issues/5256))
Expand All @@ -103,7 +109,7 @@ New deprecation(s):
- **General**: Clean up previously deprecated code in Azure Data Explorer Scaler about clientSecret for 2.13 release ([#5051](https://github.com/kedacore/keda/issues/5051))

### Other

- **General**: Create a common utility function to get parameter value from config ([#5037](https://github.com/kedacore/keda/issues/5037))
- **General**: Fix CVE-2023-45142 in Opentelemetry ([#5089](https://github.com/kedacore/keda/issues/5089))
- **General**: Fix logger in Opentelemetry collector ([#5094](https://github.com/kedacore/keda/issues/5094))
- **General**: Reduce amount of gauge creations for OpenTelemetry metrics ([#5101](https://github.com/kedacore/keda/issues/5101))
Expand Down
1 change: 0 additions & 1 deletion apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions apis/keda/v1alpha1/scaledobject_types.go
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
"fmt"
"reflect"
"strconv"

autoscalingv2 "k8s.io/api/autoscaling/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -193,6 +194,11 @@ func (so *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", so.Namespace, so.Name)
}

func (so *ScaledObject) HasPausedReplicaAnnotation() bool {
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
return pausedReplicasAnnotationFound
}

// HasPausedAnnotition returns whether this ScaledObject has PausedAnnotation or PausedReplicasAnnotation
func (so *ScaledObject) HasPausedAnnotation() bool {
_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
Expand All @@ -207,8 +213,16 @@ func (so *ScaledObject) NeedToBePausedByAnnotation() bool {
return so.Status.PausedReplicaCount != nil
}

_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
return pausedAnnotationFound
pausedAnnotationValue, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
if !pausedAnnotationFound {
return false
}
shouldPause, err := strconv.ParseBool(pausedAnnotationValue)
if err != nil {
// if annotation value is not a boolean, we assume user wants to pause the ScaledObject
return true
}
return shouldPause
}

// IsUsingModifiers determines whether scalingModifiers are defined or not
Expand Down
64 changes: 50 additions & 14 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Expand Up @@ -53,22 +53,54 @@ func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager) error {
kc = mgr.GetClient()
restMapper = mgr.GetRESTMapper()
return ctrl.NewWebhookManagedBy(mgr).
WithValidator(&ScaledObjectCustomValidator{}).
For(so).
Complete()
}

// +kubebuilder:webhook:path=/validate-keda-sh-v1alpha1-scaledobject,mutating=false,failurePolicy=ignore,sideEffects=None,groups=keda.sh,resources=scaledobjects,verbs=create;update,versions=v1alpha1,name=vscaledobject.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &ScaledObject{}
// ScaledObjectCustomValidator is a custom validator for ScaledObject objects
type ScaledObjectCustomValidator struct{}

func (socv ScaledObjectCustomValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (warnings admission.Warnings, err error) {
request, err := admission.RequestFromContext(ctx)
if err != nil {
return nil, err
}
so := obj.(*ScaledObject)
return so.ValidateCreate(request.DryRun)
}

func (socv ScaledObjectCustomValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (warnings admission.Warnings, err error) {
request, err := admission.RequestFromContext(ctx)
if err != nil {
return nil, err
}
so := newObj.(*ScaledObject)
old := oldObj.(*ScaledObject)
return so.ValidateUpdate(old, request.DryRun)
}

func (socv ScaledObjectCustomValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (warnings admission.Warnings, err error) {
request, err := admission.RequestFromContext(ctx)
if err != nil {
return nil, err
}
so := obj.(*ScaledObject)
return so.ValidateDelete(request.DryRun)
}

var _ webhook.CustomValidator = &ScaledObjectCustomValidator{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (so *ScaledObject) ValidateCreate() (admission.Warnings, error) {
func (so *ScaledObject) ValidateCreate(dryRun *bool) (admission.Warnings, error) {
val, _ := json.MarshalIndent(so, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("validating scaledobject creation for %s", string(val)))
return validateWorkload(so, "create")
return validateWorkload(so, "create", *dryRun)
}

func (so *ScaledObject) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
func (so *ScaledObject) ValidateUpdate(old runtime.Object, dryRun *bool) (admission.Warnings, error) {
val, _ := json.MarshalIndent(so, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("validating scaledobject update for %s", string(val)))

Expand All @@ -77,10 +109,10 @@ func (so *ScaledObject) ValidateUpdate(old runtime.Object) (admission.Warnings,
return nil, nil
}

return validateWorkload(so, "update")
return validateWorkload(so, "update", *dryRun)
}

func (so *ScaledObject) ValidateDelete() (admission.Warnings, error) {
func (so *ScaledObject) ValidateDelete(_ *bool) (admission.Warnings, error) {
return nil, nil
}

Expand All @@ -95,10 +127,10 @@ func isRemovingFinalizer(so *ScaledObject, old runtime.Object) bool {
return len(so.ObjectMeta.Finalizers) == 0 && len(oldSo.ObjectMeta.Finalizers) == 1 && soSpecString == oldSoSpecString
}

func validateWorkload(so *ScaledObject, action string) (admission.Warnings, error) {
func validateWorkload(so *ScaledObject, action string, dryRun bool) (admission.Warnings, error) {
metricscollector.RecordScaledObjectValidatingTotal(so.Namespace, action)

verifyFunctions := []func(*ScaledObject, string) error{
verifyFunctions := []func(*ScaledObject, string, bool) error{
verifyCPUMemoryScalers,
verifyTriggers,
verifyScaledObjects,
Expand All @@ -107,7 +139,7 @@ func validateWorkload(so *ScaledObject, action string) (admission.Warnings, erro
}

for i := range verifyFunctions {
err := verifyFunctions[i](so, action)
err := verifyFunctions[i](so, action, dryRun)
if err != nil {
return nil, err
}
Expand All @@ -117,7 +149,7 @@ func validateWorkload(so *ScaledObject, action string) (admission.Warnings, erro
return nil, nil
}

func verifyReplicaCount(incomingSo *ScaledObject, action string) error {
func verifyReplicaCount(incomingSo *ScaledObject, action string, _ bool) error {
err := CheckReplicaCountBoundsAreValid(incomingSo)
if err != nil {
scaledobjectlog.WithValues("name", incomingSo.Name).Error(err, "validation error")
Expand All @@ -126,7 +158,7 @@ func verifyReplicaCount(incomingSo *ScaledObject, action string) error {
return nil
}

func verifyTriggers(incomingSo *ScaledObject, action string) error {
func verifyTriggers(incomingSo *ScaledObject, action string, _ bool) error {
err := ValidateTriggers(incomingSo.Spec.Triggers)
if err != nil {
scaledobjectlog.WithValues("name", incomingSo.Name).Error(err, "validation error")
Expand All @@ -135,7 +167,7 @@ func verifyTriggers(incomingSo *ScaledObject, action string) error {
return err
}

func verifyHpas(incomingSo *ScaledObject, action string) error {
func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
hpaList := &autoscalingv2.HorizontalPodAutoscalerList{}
opt := &client.ListOptions{
Namespace: incomingSo.Namespace,
Expand Down Expand Up @@ -190,7 +222,7 @@ func verifyHpas(incomingSo *ScaledObject, action string) error {
return nil
}

func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error {
soList := &ScaledObjectList{}
opt := &client.ListOptions{
Namespace: incomingSo.Namespace,
Expand Down Expand Up @@ -241,7 +273,11 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
return nil
}

func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool) error {
if dryRun {
return nil
}

var podSpec *corev1.PodSpec
for _, trigger := range incomingSo.Spec.Triggers {
if trigger.Type == cpuString || trigger.Type == memoryString {
Expand Down
29 changes: 29 additions & 0 deletions apis/keda/v1alpha1/scaledobject_webhook_test.go
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = It("should validate the so creation when there isn't any hpa", func() {
Expand Down Expand Up @@ -232,6 +233,34 @@ var _ = It("should validate the so creation with cpu and memory when deployment
}).ShouldNot(HaveOccurred())
})

var _ = It("shouldn't validate the creation with cpu and memory when deployment is missing", func() {

namespaceName := "deployment-missing"
namespace := createNamespace(namespaceName)
so := createScaledObject(soName, namespaceName, workloadName, "apps/v1", "Deployment", true, map[string]string{}, "")

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())

Eventually(func() error {
return k8sClient.Create(context.Background(), so)
}).Should(HaveOccurred())
})

var _ = It("should validate the creation with cpu and memory when deployment is missing and dry-run is true", func() {

namespaceName := "deployment-missing-dry-run"
namespace := createNamespace(namespaceName)
so := createScaledObject(soName, namespaceName, workloadName, "apps/v1", "Deployment", true, map[string]string{}, "")

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())

Eventually(func() error {
return k8sClient.Create(context.Background(), so, client.DryRunAll)
}).ShouldNot(HaveOccurred())
})

var _ = It("shouldn't validate the so creation with cpu and memory when deployment hasn't got memory request", func() {

namespaceName := "deployment-no-memory-request"
Expand Down
17 changes: 16 additions & 1 deletion apis/keda/v1alpha1/triggerauthentication_types.go
Expand Up @@ -118,9 +118,9 @@ const (
PodIdentityProviderAzure PodIdentityProvider = "azure"
PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload"
PodIdentityProviderGCP PodIdentityProvider = "gcp"
PodIdentityProviderSpiffe PodIdentityProvider = "spiffe"
PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks"
PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam"
PodIdentityProviderAws PodIdentityProvider = "aws"
)

// PodIdentityAnnotationEKS specifies aws role arn for aws-eks Identity Provider
Expand All @@ -133,9 +133,17 @@ const (
// AuthPodIdentity allows users to select the platform native identity
// mechanism
type AuthPodIdentity struct {
// +kubebuilder:validation:Enum=azure;azure-workload;gcp;aws;aws-eks;aws-kiam
Provider PodIdentityProvider `json:"provider"`
// +optional
IdentityID *string `json:"identityId"`
// +optional
// RoleArn sets the AWS RoleArn to be used. Mutually exclusive with IdentityOwner
RoleArn string `json:"roleArn"`
// +kubebuilder:validation:Enum=keda;workload
// +optional
// IdentityOwner configures which identity has to be used during auto discovery, keda or the scaled workload. Mutually exclusive with roleArn
IdentityOwner *string `json:"identityOwner"`
}

func (a *AuthPodIdentity) GetIdentityID() string {
Expand All @@ -145,6 +153,13 @@ func (a *AuthPodIdentity) GetIdentityID() string {
return *a.IdentityID
}

func (a *AuthPodIdentity) IsWorkloadIdentityOwner() bool {
if a.IdentityOwner == nil {
return false
}
return *a.IdentityOwner == workloadString
}

// AuthConfigMapTargetRef is used to authenticate using a reference to a config map
type AuthConfigMapTargetRef AuthTargetRef

Expand Down
9 changes: 9 additions & 0 deletions apis/keda/v1alpha1/triggerauthentication_webhook.go
Expand Up @@ -28,6 +28,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

const (
kedaString = "keda"
workloadString = "workload"
)

var triggerauthenticationlog = logf.Log.WithName("triggerauthentication-validation-webhook")

func (ta *TriggerAuthentication) SetupWebhookWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -113,6 +118,10 @@ func validateSpec(spec *TriggerAuthenticationSpec) (admission.Warnings, error) {
if spec.PodIdentity.IdentityID != nil && *spec.PodIdentity.IdentityID == "" {
return nil, fmt.Errorf("identityid of PodIdentity should not be empty. If it's set, identityId has to be different than \"\"")
}
case PodIdentityProviderAws:
if spec.PodIdentity.RoleArn != "" && spec.PodIdentity.IsWorkloadIdentityOwner() {
return nil, fmt.Errorf("roleArn of PodIdentity can't be set if KEDA isn't identityOwner")
}
default:
return nil, nil
}
Expand Down

0 comments on commit 66de066

Please sign in to comment.