Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support csv-format for WATCH_NAMESPACE env var #5631

Merged
merged 9 commits into from
Apr 10, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
Expand Down
2 changes: 2 additions & 0 deletions controllers/eventing/cloudeventsource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/metricscollector"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
"github.com/kedacore/keda/v2/pkg/util"
)

// CloudEventSourceReconciler reconciles a EventSource object
Expand Down Expand Up @@ -115,6 +116,7 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req
func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&eventingv1alpha1.CloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
WithEventFilter(util.IgnoreOtherNamespaces()).
Complete(r)
}

Expand Down
2 changes: 2 additions & 0 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
"github.com/kedacore/keda/v2/pkg/util"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledjobs;scaledjobs/finalizers;scaledjobs/status,verbs="*"
Expand Down Expand Up @@ -91,6 +92,7 @@ func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options control
kedacontrollerutil.PausedPredicate{},
predicate.GenerationChangedPredicate{},
))).
WithEventFilter(util.IgnoreOtherNamespaces()).
Complete(r)
}

Expand Down
2 changes: 2 additions & 0 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
"github.com/kedacore/keda/v2/pkg/util"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs="*"
Expand Down Expand Up @@ -134,6 +135,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
predicate.GenerationChangedPredicate{},
),
)).
WithEventFilter(util.IgnoreOtherNamespaces()).
// Trigger a reconcile only when the HPA spec,label or annotation changes.
// Ignore updates to HPA status
Owns(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(
Expand Down
2 changes: 2 additions & 0 deletions controllers/keda/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/util"
)

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
Expand Down Expand Up @@ -90,6 +91,7 @@ func (r *TriggerAuthenticationReconciler) Reconcile(ctx context.Context, req ctr
func (r *TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
WithEventFilter(util.IgnoreOtherNamespaces()).
Complete(r)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
attribute.Key("scaledObject").String(scaledObject),
)

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
cback := func(_ context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), opt)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/aws/aws_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func GetAwsConfig(ctx context.Context, awsRegion string, awsAuthorization Author

if metadata.awsAuthorization.AwsRoleArn != "" {
stsSvc := sts.NewFromConfig(cfg)
stsCredentialProvider := stscreds.NewAssumeRoleProvider(stsSvc, metadata.awsAuthorization.AwsRoleArn, func(options *stscreds.AssumeRoleOptions) {})
stsCredentialProvider := stscreds.NewAssumeRoleProvider(stsSvc, metadata.awsAuthorization.AwsRoleArn, func(_ *stscreds.AssumeRoleOptions) {})
cfg.Credentials = aws.NewCredentialsCache(stsCredentialProvider)
}
return &cfg, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/aws/aws_sigv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewSigV4RoundTripper(config *scalersconfig.ScalerConfig) (http.RoundTripper
return nil, err
}

client := amp.NewFromConfig(*awsCfg, func(o *amp.Options) {})
client := amp.NewFromConfig(*awsCfg, func(_ *amp.Options) {})
rt := &roundTripper{
client: client,
}
Expand Down
29 changes: 25 additions & 4 deletions pkg/util/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package util
import (
"fmt"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// GetWatchNamespaces returns the namespaces the operator should be watching for changes
Expand All @@ -15,11 +18,29 @@ func GetWatchNamespaces() (map[string]cache.Config, error) {
return map[string]cache.Config{}, fmt.Errorf("%s must be set", WatchNamespaceEnvVar)
}

if ns == "" {
if ns == "" || ns == "\"\"" {
return map[string]cache.Config{}, nil
}
nss := strings.Split(ns, ",")
nssMap := make(map[string]cache.Config)
for _, n := range nss {
nssMap[n] = cache.Config{}
}

return nssMap, nil
}

return map[string]cache.Config{
ns: {},
}, nil
// IgnoreOtherNamespaces returns the predicate for watched events that will filter out those that are not coming
// from a watched namespace (empty namespace or unset env var denotes all)
func IgnoreOtherNamespaces() predicate.Predicate {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
nss, e := GetWatchNamespaces()
if len(nss) == 0 || e != nil {
return predicate.And() // no-op predicate that returns always true
}
return predicate.Funcs{
GenericFunc: func(e event.GenericEvent) bool {
_, ok := nss[e.Object.GetNamespace()]
return ok
},
}
}