Skip to content

Commit

Permalink
fix: trigger labels are set properly with cn event links enabled (#8066)
Browse files Browse the repository at this point in the history
* fix: trigger labels are set properly with cn event links enabled

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: trigger default tests expect namespace label to be defaulted

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: triggers are properly filtered to match the correct broker

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Jul 5, 2024
1 parent 5f6713a commit 2f1f866
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 16 deletions.
13 changes: 10 additions & 3 deletions pkg/apis/eventing/v1/trigger_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package v1
import (
"context"

"knative.dev/eventing/pkg/apis/feature"

"knative.dev/pkg/apis"
)

Expand All @@ -29,7 +31,7 @@ const (
func (t *Trigger) SetDefaults(ctx context.Context) {
withNS := apis.WithinParent(ctx, t.ObjectMeta)
t.Spec.SetDefaults(withNS)
setLabels(t)
setLabels(ctx, t)
}

func (ts *TriggerSpec) SetDefaults(ctx context.Context) {
Expand All @@ -42,8 +44,13 @@ func (ts *TriggerSpec) SetDefaults(ctx context.Context) {
ts.Delivery.SetDefaults(ctx)
}

func setLabels(t *Trigger) {
if t.Spec.Broker != "" {
func setLabels(ctx context.Context, t *Trigger) {
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && t.Spec.BrokerRef != nil {
if len(t.Labels) == 0 {
t.Labels = map[string]string{}
}
t.Labels[brokerLabel] = t.Spec.BrokerRef.Name
} else if t.Spec.Broker != "" {
if len(t.Labels) == 0 {
t.Labels = map[string]string{}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/apis/eventing/v1/trigger_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ func TestTriggerDefaults(t *testing.T) {
expected: defaultTrigger,
},
"nil filter": {
initial: Trigger{Spec: TriggerSpec{Broker: otherBroker}},
initial: Trigger{Spec: TriggerSpec{Broker: otherBroker}, ObjectMeta: metav1.ObjectMeta{Namespace: namespace}},
expected: Trigger{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{brokerLabel: otherBroker},
Labels: map[string]string{brokerLabel: otherBroker},
Namespace: namespace,
},
Spec: TriggerSpec{Broker: otherBroker, Filter: emptyTriggerFilter}},
},
Expand Down Expand Up @@ -133,11 +134,13 @@ func TestTriggerDefaults(t *testing.T) {
"with broker and label": {
initial: Trigger{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{"otherLabel": "my-other-label"},
Namespace: namespace,
Labels: map[string]string{"otherLabel": "my-other-label"},
},
Spec: TriggerSpec{Broker: defaultBroker}},
expected: Trigger{
ObjectMeta: v1.ObjectMeta{
Namespace: namespace,
Labels: map[string]string{
"otherLabel": "my-other-label",
brokerLabel: defaultBroker},
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(b),
},
Labels: TriggerChannelLabels(b.Name),
Labels: TriggerChannelLabels(b.Name, b.Namespace),
Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster},
},
ducklib.WithChannelableSpec(tmpChannelableSpec),
Expand Down Expand Up @@ -430,7 +430,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf

// TriggerChannelLabels are all the labels placed on the Trigger Channel for the given brokerName. This
// should only be used by Broker and Trigger code.
func TriggerChannelLabels(brokerName string) map[string]string {
func TriggerChannelLabels(brokerName, brokerNamespace string) map[string]string {
return map[string]string{
eventing.BrokerLabelKey: brokerName,
"eventing.knative.dev/brokerEverything": "true",
Expand Down
16 changes: 12 additions & 4 deletions pkg/reconciler/broker/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewController(
FilterFunc: brokerFilter,
Handler: controller.HandleAll(func(obj interface{}) {
if broker, ok := obj.(*eventing.Broker); ok {
for _, t := range getTriggersForBroker(logger, triggerLister, broker) {
for _, t := range getTriggersForBroker(ctx, logger, triggerLister, broker) {
impl.Enqueue(t)
}
}
Expand Down Expand Up @@ -185,13 +185,21 @@ func filterTriggers(featureStore *feature.Store, lister eventinglisters.BrokerLi
// the Triggers belonging to it. As there is no way to return failures in the
// Informers EventHandler, errors are logged, and an empty array is returned in case
// of failures.
func getTriggersForBroker(logger *zap.SugaredLogger, triggerLister eventinglisters.TriggerLister, broker *eventing.Broker) []*eventing.Trigger {
func getTriggersForBroker(ctx context.Context, logger *zap.SugaredLogger, triggerLister eventinglisters.TriggerLister, broker *eventing.Broker) []*eventing.Trigger {
r := make([]*eventing.Trigger, 0)
selector := labels.SelectorFromSet(map[string]string{apiseventing.BrokerLabelKey: broker.Name})
triggers, err := triggerLister.Triggers(broker.Namespace).List(selector)
triggers, err := triggerLister.Triggers(metav1.NamespaceAll).List(selector)
if err != nil {
logger.Warn("Failed to list triggers", zap.Any("broker", broker), zap.Error(err))
return r
}
return append(r, triggers...)
for _, t := range triggers {
if feature.FromContext(ctx).IsCrossNamespaceEventLinks() && t.Spec.BrokerRef != nil && t.Spec.BrokerRef.Namespace == broker.Namespace {
r = append(r, t)
}
if t.Namespace == broker.Namespace {
r = append(r, t)
}
}
return r
}
6 changes: 4 additions & 2 deletions pkg/reconciler/broker/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ func TestGetTriggersForBroker(t *testing.T) {
ls := testingv1.NewListers(tt.in)
logger := logtesting.TestLogger(t)
triggerLister := ls.GetTriggerLister()
triggers := getTriggersForBroker(logger, triggerLister, ReadyBroker())
ctx := feature.ToContext(context.TODO(), feature.FromContextOrDefaults(context.TODO()))
triggers := getTriggersForBroker(ctx, logger, triggerLister, ReadyBroker())
var found []string
for _, want := range tt.out {
for _, got := range triggers {
Expand Down Expand Up @@ -347,7 +348,8 @@ func (failer *TriggerNamespaceListerFailer) Get(name string) (*eventing.Trigger,
func TestListFailure(t *testing.T) {
logger := logtesting.TestLogger(t)
triggerListerFailer := &TriggerListerFailer{}
if len(getTriggersForBroker(logger, triggerListerFailer, ReadyBroker())) != 0 {
ctx := feature.ToContext(context.TODO(), feature.FromContextOrDefaults(context.TODO()))
if len(getTriggersForBroker(ctx, logger, triggerListerFailer, ReadyBroker())) != 0 {
t.Fatalf("Got back triggers when not expecting any")
}
}
9 changes: 7 additions & 2 deletions pkg/reconciler/sugar/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
sugarconfig "knative.dev/eventing/pkg/apis/sugar"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
Expand Down Expand Up @@ -66,13 +67,17 @@ func NewController(
// Watch brokers.
brokerInformer.Informer().AddEventHandler(controller.HandleAll(func(obj interface{}) {
if b, ok := obj.(*v1.Broker); ok {
triggers, err := triggerInformer.Lister().Triggers(b.Namespace).List(labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: b.Name}))
triggers, err := triggerInformer.Lister().Triggers("").List(labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: b.Name}))
if err != nil {
logging.FromContext(ctx).Warnw("Failed to list triggers", zap.String("Namespace", b.Namespace), zap.String("Broker", b.Name))
return
}
for _, t := range triggers {
impl.Enqueue(t)
if feature.FromContext(ctx).IsCrossNamespaceEventLinks() && t.Spec.BrokerRef != nil && t.Spec.BrokerRef.Namespace == b.Namespace {
impl.Enqueue(t)
} else if t.Namespace == b.Namespace {
impl.Enqueue(t)
}
}
}
}))
Expand Down
10 changes: 10 additions & 0 deletions pkg/reconciler/testing/v1/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"knative.dev/pkg/ptr"

eventingv1 "knative.dev/eventing/pkg/apis/duck/v1"
apiseventing "knative.dev/eventing/pkg/apis/eventing"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
)
Expand Down Expand Up @@ -77,6 +78,15 @@ func WithTriggerSubscriberURI(rawurl string) TriggerOption {
}
}

func WithBrokerLabels(brokerName, brokerNamespace string) TriggerOption {
return func(t *v1.Trigger) {
if t.Labels == nil {
t.Labels = make(map[string]string)
}
t.Labels[apiseventing.BrokerLabelKey] = brokerName
}
}

func WithTriggerSubscriber(sub duckv1.Destination) TriggerOption {
if err := sub.Validate(context.Background()).Filter(apis.ErrorLevel); err != nil {
panic(err)
Expand Down

0 comments on commit 2f1f866

Please sign in to comment.