diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 5276b6f0a5e..ec46c0b3886 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -227,16 +227,24 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - var brokerRef, brokerNamespace string - if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { - brokerRef = trigger.Spec.BrokerRef.Name - brokerNamespace = trigger.Spec.BrokerRef.Namespace + var brokerName, brokerNamespace string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil { + if trigger.Spec.BrokerRef.Name != "" { + brokerName = trigger.Spec.BrokerRef.Name + } else { + brokerName = trigger.Spec.Broker + } + if trigger.Spec.BrokerRef.Namespace != "" { + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerNamespace = trigger.Namespace + } } else { - brokerRef = trigger.Spec.Broker + brokerName = trigger.Spec.Broker brokerNamespace = trigger.Namespace } - broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -248,7 +256,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: brokerRef, + broker: brokerName, requestType: "reply_forward", } @@ -265,16 +273,23 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - var brokerRef, brokerNamespace string - if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { - brokerRef = trigger.Spec.BrokerRef.Name - brokerNamespace = trigger.Spec.BrokerRef.Namespace + var brokerName, brokerNamespace string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil { + if trigger.Spec.BrokerRef.Name != "" { + brokerName = trigger.Spec.BrokerRef.Name + } else { + brokerName = trigger.Spec.Broker + } + if trigger.Spec.BrokerRef.Namespace != "" { + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerNamespace = trigger.Namespace + } } else { - brokerRef = trigger.Spec.Broker + brokerName = trigger.Spec.Broker brokerNamespace = trigger.Namespace } - - broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -299,7 +314,7 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerName, requestType: "dls_forward", } @@ -316,11 +331,15 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - var brokerRef string - if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { - brokerRef = trigger.Spec.BrokerRef.Name + var brokerName string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil { + if trigger.Spec.BrokerRef.Name != "" { + brokerName = trigger.Spec.BrokerRef.Name + } else { + brokerName = trigger.Spec.Broker + } } else { - brokerRef = trigger.Spec.Broker + brokerName = trigger.Spec.Broker } triggerRef := types.NamespacedName{ @@ -346,7 +365,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: brokerRef, + broker: brokerName, filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), requestType: "filter", } diff --git a/pkg/reconciler/broker/trigger/controller.go b/pkg/reconciler/broker/trigger/controller.go index 57067e81929..0bf52aa69ea 100644 --- a/pkg/reconciler/broker/trigger/controller.go +++ b/pkg/reconciler/broker/trigger/controller.go @@ -163,6 +163,8 @@ func filterTriggers(featureStore *feature.Store, lister eventinglisters.BrokerLi return false } + var broker string + var brokerNamespace string if featureStore.IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil { broker = trigger.Spec.BrokerRef.Name brokerNamespace = trigger.Spec.BrokerRef.Namespace diff --git a/pkg/reconciler/broker/trigger/trigger.go b/pkg/reconciler/broker/trigger/trigger.go index 8d9578b5ae1..51166e638f5 100644 --- a/pkg/reconciler/broker/trigger/trigger.go +++ b/pkg/reconciler/broker/trigger/trigger.go @@ -57,8 +57,6 @@ import ( ) var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker") -var brokerNamespace string -var broker string const ( // Name of the corev1.Events emitted from the Trigger reconciliation process. @@ -91,6 +89,8 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) pkgreconciler.Event { logging.FromContext(ctx).Infow("Reconciling", zap.Any("Trigger", t)) + var broker string + var brokerNamespace string if t.Spec.BrokerRef != nil && feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) { broker = t.Spec.BrokerRef.Name brokerNamespace = t.Spec.BrokerRef.Namespace diff --git a/test/config/config-features.yaml b/test/config/config-features.yaml index f5528595dbf..8fbccdd8d6e 100644 --- a/test/config/config-features.yaml +++ b/test/config/config-features.yaml @@ -54,7 +54,7 @@ data: # ALPHA feature: The cross-namespace-event-links flag allows you to use cross-namespace referencing for Eventing. # For more details: https://github.com/knative/eventing/issues/7739 - cross-namespace-event-links: "disabled" + cross-namespace-event-links: "enabled" # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field # in APIServerSource objects with its rich filtering capabilities. diff --git a/test/e2e-common.sh b/test/e2e-common.sh index 2b02dbbe31f..d2e6cf9d4cf 100755 --- a/test/e2e-common.sh +++ b/test/e2e-common.sh @@ -107,7 +107,7 @@ function scale_controlplane() { } function create_knsubscribe_rolebinding() { - kubectl create clusterrolebinding knsubscribe-test-rb --user=$(kubectl auth whoami -ojson | jq .status.userInfo.username -r) --clusterrole=crossnamespace=subscriber + kubectl create clusterrolebinding knsubscribe-test-rb --user=$(kubectl auth whoami -ojson | jq .status.userInfo.username -r) --clusterrole=crossnamespace-subscriber } # Install Knative Monitoring in the current cluster. diff --git a/test/experimental/features/eventtype_autocreate/features.go b/test/experimental/features/eventtype_autocreate/features.go index 1c33d7abc70..f4eaa988ccd 100644 --- a/test/experimental/features/eventtype_autocreate/features.go +++ b/test/experimental/features/eventtype_autocreate/features.go @@ -120,7 +120,7 @@ func AutoCreateEventTypesOnBroker(brokerName string) *feature.Feature { sink := feature.MakeRandomK8sName("sink") f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) - f.Setup("install subscription", trigger.Install(triggerName, brokerName, trigger.WithSubscriber(service.AsKReference(sink), ""))) + f.Setup("install subscription", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), ""))) f.Setup("trigger is ready", trigger.IsReady(triggerName)) f.Setup("broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName)) @@ -158,7 +158,7 @@ func AutoCreateEventTypesOnTrigger(brokerName string) *feature.Feature { replyData := "" f.Setup("install sink", eventshub.Install(sink, eventshub.ReplyWithTransformedEvent(replyType, replySource, replyData), eventshub.StartReceiver)) - f.Setup("install trigger", trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(service.AsDestinationRef(sink)), trigger.WithFilter(map[string]string{ + f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(service.AsDestinationRef(sink)), trigger.WithFilter(map[string]string{ "type": event.Type(), }))) @@ -194,7 +194,7 @@ func AutoCreateEventTypeEventsFromPingSource() *feature.Feature { f.Setup("broker is ready", broker.IsReady(brokerName)) f.Setup("broker is addressable", broker.IsAddressable(brokerName)) f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) - f.Setup("install trigger", trigger.Install(via, brokerName, trigger.WithSubscriber(service.AsKReference(sink), ""))) + f.Setup("install trigger", trigger.Install(via, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), ""))) f.Setup("trigger goes ready", trigger.IsReady(via)) f.Requirement("install pingsource", func(ctx context.Context, t feature.T) { diff --git a/test/rekt/crossnamespace_test.go b/test/rekt/crossnamespace_test.go new file mode 100644 index 00000000000..fda6fe3e556 --- /dev/null +++ b/test/rekt/crossnamespace_test.go @@ -0,0 +1,54 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rekt + +import ( + "testing" + + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" + + "knative.dev/eventing/test/rekt/features/trigger" +) + +func TestBrokerTriggerCrossNamespaceReference(t *testing.T) { + t.Parallel() + + brokerEnvCtx, _ := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + triggerEnvCtx, triggerEnv := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + // brokerEnv.Test(brokerEnvCtx, t, broker.GoesReady(brokerName)) + triggerEnv.Test(triggerEnvCtx, t, trigger.CrossNamespaceEventLinks(brokerEnvCtx)) +} diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go index 40b31c6fa73..f856ca6984d 100644 --- a/test/rekt/features/apiserversource/data_plane.go +++ b/test/rekt/features/apiserversource/data_plane.go @@ -286,7 +286,7 @@ func SendsEventsWithEventTypes() *feature.Feature { f.Setup("broker is ready", broker.IsReady(brokerName)) f.Setup("broker is addressable", broker.IsAddressable(brokerName)) f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) - f.Setup("install trigger", trigger.Install(via, brokerName, trigger.WithSubscriber(service.AsKReference(sink), ""))) + f.Setup("install trigger", trigger.Install(via, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), ""))) f.Setup("trigger goes ready", trigger.IsReady(via)) sacmName := feature.MakeRandomK8sName("apiserversource") @@ -888,7 +888,7 @@ func SendsEventsWithBrokerAsSinkTLS() *feature.Feature { f.Setup("install trigger", func(ctx context.Context, t feature.T) { d := service.AsDestinationRef(sinkName) d.CACerts = eventshub.GetCaCerts(ctx) - trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t) + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d))(ctx, t) }) f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName)) diff --git a/test/rekt/features/broker/control_plane.go b/test/rekt/features/broker/control_plane.go index 3f943e9ed0f..d8e87399bbe 100644 --- a/test/rekt/features/broker/control_plane.go +++ b/test/rekt/features/broker/control_plane.go @@ -114,7 +114,7 @@ func ControlPlaneTrigger_GivenBroker(brokerName string) *feature.Feature { service.WithSelectors(map[string]string{"bad": "svc"}))) triggerName := feature.MakeRandomK8sName("trigger") - f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName, + f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName), triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""), )) @@ -142,7 +142,7 @@ func ControlPlaneTrigger_GivenBrokerTriggerReady(brokerName string) *feature.Fea service.WithSelectors(map[string]string{"bad": "svc"}))) triggerName := feature.MakeRandomK8sName("trigger") - f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName, + f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName), triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""), )) @@ -167,7 +167,7 @@ func ControlPlaneTrigger_WithBrokerLifecycle(brokerOpts ...manifest.CfgFn) *feat brokerName := feature.MakeRandomK8sName("broker") triggerName := feature.MakeRandomK8sName("trigger") - f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName, + f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName), triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""), )) @@ -211,7 +211,7 @@ func ControlPlaneTrigger_WithValidFilters(brokerName string) *feature.Feature { } triggerName := feature.MakeRandomK8sName("trigger") - f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName, + f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName), triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""), triggerresources.WithFilter(filters), )) @@ -261,7 +261,7 @@ func ControlPlaneTrigger_WithInvalidFilters(brokerName string) *feature.Feature } triggerName := feature.MakeRandomK8sName("trigger") - f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName, + f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName), triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""), )) diff --git a/test/rekt/features/broker/crossnamespace.go b/test/rekt/features/broker/crossnamespace.go new file mode 100644 index 00000000000..b1fd3c4e90a --- /dev/null +++ b/test/rekt/features/broker/crossnamespace.go @@ -0,0 +1,41 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broker + +import ( + "fmt" + + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" +) + +func GoesReadyInDifferentNamespace(name, namespace string, cfg ...manifest.CfgFn) *feature.Feature { + f := new(feature.Feature) + f.Prerequisite("Cross Namespace Event Links is enabled", featureflags.CrossEventLinksEnabled()) + + // Add the namespace configuration + namespaceCfg := broker.WithConfigNamespace(namespace) + cfg = append(cfg, namespaceCfg) + + f.Setup(fmt.Sprintf("install broker %q in namespace %q", name, namespace), broker.Install(name, cfg...)) + f.Setup("Broker is ready", broker.IsReady(name)) + f.Setup("Broker is addressable", broker.IsAddressable(name)) + + return f +} diff --git a/test/rekt/features/broker/eventing_tls_feature.go b/test/rekt/features/broker/eventing_tls_feature.go index a1700550605..15d8894be55 100644 --- a/test/rekt/features/broker/eventing_tls_feature.go +++ b/test/rekt/features/broker/eventing_tls_feature.go @@ -72,7 +72,7 @@ func RotateMTChannelBrokerTLSCertificates() *feature.Feature { f.Setup("install trigger", func(ctx context.Context, t feature.T) { d := service.AsDestinationRef(sink) d.CACerts = eventshub.GetCaCerts(ctx) - trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t) + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d))(ctx, t) }) f.Setup("trigger is ready", trigger.IsReady(triggerName)) f.Setup("Broker has HTTPS address", broker.ValidateAddress(brokerName, addressable.AssertHTTPSAddress)) diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 5651422af99..1f0984942bf 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -140,9 +140,10 @@ func ManyTriggers() *feature.FeatureSet { trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithFilter(filter), trigger.WithExtensions(eventFilter.Extensions), + trigger.WithBrokerName(brokerName), } - trigger.Install(sink, brokerName, cfg...)(ctx, t) + trigger.Install(sink, cfg...)(ctx, t) } broker.IsReady(brokerName)(ctx, t) @@ -291,7 +292,7 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere // Install the trigger1 point to Broker and transform the original events to new events f.Setup("install trigger1", trigger.Install( trigger1, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithFilter(filter1), trigger.WithSubscriber(service.AsKReference(sink1), ""), )) @@ -299,7 +300,7 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere // Install the trigger2 point to Broker to filter all the events f.Setup("install trigger2", trigger.Install( trigger2, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithFilter(filter2), trigger.WithSubscriber(service.AsKReference(sink2), ""), )) @@ -321,7 +322,7 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere // Install the trigger3 point to Broker to filter the events after transformation point to channel f.Setup("install trigger3", trigger.Install( trigger3, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithFilter(filter3), trigger.WithSubscriber(channel.AsRef(channelName), ""), )) @@ -431,7 +432,7 @@ func brokerEventTransformationForTrigger() *feature.Feature { // Install the trigger1 point to Broker and transform the original events to new events f.Setup("install trigger1", trigger.Install( trigger1, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithFilter(filter1), trigger.WithSubscriber(service.AsKReference(sink1), ""), )) @@ -439,7 +440,7 @@ func brokerEventTransformationForTrigger() *feature.Feature { // Install the trigger2 point to Broker to filter all the events f.Setup("install trigger2", trigger.Install( trigger2, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithFilter(filter2), trigger.WithSubscriber(service.AsKReference(sink2), ""), )) @@ -499,10 +500,10 @@ func BrokerPreferHeaderCheck() *feature.Feature { f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) // Point the Trigger subscriber to the sink svc. - cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), "")} + cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithBrokerName(brokerName)} // Install the trigger - f.Setup("install trigger", trigger.Install(via, brokerName, cfg...)) + f.Setup("install trigger", trigger.Install(via, cfg...)) f.Setup("trigger goes ready", trigger.IsReady(via)) f.Requirement("install source", eventshub.Install( @@ -560,10 +561,10 @@ func brokerRedeliveryFibonacci(retryNum int32) *feature.Feature { f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) // Point the Trigger subscriber to the sink svc. - cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), "")} + cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithBrokerName(brokerName)} // Install the trigger - f.Setup("install trigger", trigger.Install(via, brokerName, cfg...)) + f.Setup("install trigger", trigger.Install(via, cfg...)) f.Setup("trigger goes ready", trigger.IsReady(via)) f.Requirement("install source", eventshub.Install( @@ -615,10 +616,10 @@ func brokerRedeliveryDropN(retryNum int32, dropNum uint) *feature.Feature { f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) // Point the Trigger subscriber to the sink svc. - cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), "")} + cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithBrokerName(brokerName)} // Install the trigger - f.Setup("install trigger", trigger.Install(via, brokerName, cfg...)) + f.Setup("install trigger", trigger.Install(via, cfg...)) f.Setup("trigger goes ready", trigger.IsReady(via)) f.Requirement("install source", eventshub.Install( @@ -689,7 +690,7 @@ func brokerSubscriberUnreachable() *feature.Feature { // Install the trigger and Point the Trigger subscriber to the sink svc. f.Setup("install trigger", trigger.Install( triggerName, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(nil, subscriberUri), trigger.WithDeadLetterSink(service.AsKReference(sink), ""), )) @@ -748,7 +749,7 @@ func brokerSubscriberErrorNodata() *feature.Feature { // Install the trigger and Point the Trigger subscriber to the sink svc. f.Setup("install trigger", trigger.Install( triggerName, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(failer), ""), trigger.WithDeadLetterSink(service.AsKReference(sink), ""), )) @@ -810,7 +811,7 @@ func brokerSubscriberErrorWithdata() *feature.Feature { // Install the trigger and Point the Trigger subscriber to the sink svc. f.Setup("install trigger", trigger.Install( triggerName, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(failer), ""), trigger.WithDeadLetterSink(service.AsKReference(sink), ""), )) @@ -893,7 +894,7 @@ func brokerSubscriberLongMessage() *feature.Feature { // Install the trigger and Point the Trigger subscriber to the sink svc. f.Setup("install trigger", trigger.Install( triggerName, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), ""), )) f.Setup("trigger goes ready", trigger.IsReady(triggerName)) @@ -968,7 +969,7 @@ func brokerSubscriberLongResponseMessage() *feature.Feature { // Install the Triggers with appropriate Sinks and filters f.Setup("install trigger1", trigger.Install( trigger1, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink1), ""), trigger.WithFilter(map[string]string{"type": eventType1, "source": eventSource1}), )) @@ -976,7 +977,7 @@ func brokerSubscriberLongResponseMessage() *feature.Feature { f.Setup("install trigger2", trigger.Install( trigger2, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink2), ""), trigger.WithFilter(map[string]string{"type": eventType2, "source": eventSource2}), )) diff --git a/test/rekt/features/broker/oidc_feature.go b/test/rekt/features/broker/oidc_feature.go index 270212228e1..c6d688a18bd 100644 --- a/test/rekt/features/broker/oidc_feature.go +++ b/test/rekt/features/broker/oidc_feature.go @@ -77,7 +77,7 @@ func BrokerSendEventWithOIDCTokenToSubscriber() *feature.Feature { d := service.AsDestinationRef(sink) d.CACerts = eventshub.GetCaCerts(ctx) d.Audience = &sinkAudience - trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t) + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d))(ctx, t) }) f.Setup("trigger goes ready", trigger.IsReady(triggerName)) @@ -135,7 +135,7 @@ func BrokerSendEventWithOIDCTokenToDLS() *feature.Feature { d := duckv1.Destination{} d.CACerts = eventshub.GetCaCerts(ctx) d.URI, _ = apis.ParseURL("bad://uri") - trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(&d))(ctx, t) + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(&d))(ctx, t) }) @@ -201,7 +201,7 @@ func BrokerSendEventWithOIDCTokenToReply() *feature.Feature { f.Setup("install the trigger", func(ctx context.Context, t feature.T) { d := service.AsDestinationRef(subscriber) d.CACerts = eventshub.GetCaCerts(ctx) - trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d), trigger.WithFilter(map[string]string{ + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d), trigger.WithFilter(map[string]string{ "type": event.Type(), }))(ctx, t) }) @@ -211,7 +211,7 @@ func BrokerSendEventWithOIDCTokenToReply() *feature.Feature { f.Setup("install the trigger and specify the CA cert of the destination", func(ctx context.Context, t feature.T) { d := service.AsDestinationRef(reply) d.CACerts = eventshub.GetCaCerts(ctx) - trigger.Install(helperTriggerName, brokerName, trigger.WithSubscriberFromDestination(d), trigger.WithFilter(map[string]string{ + trigger.Install(helperTriggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d), trigger.WithFilter(map[string]string{ "type": replyEventType, }))(ctx, t) }) diff --git a/test/rekt/features/broker/readyness.go b/test/rekt/features/broker/readyness.go index bce002cfd02..784bfb8158f 100644 --- a/test/rekt/features/broker/readyness.go +++ b/test/rekt/features/broker/readyness.go @@ -37,10 +37,10 @@ func TriggerGoesReady(name, brokerName string, cfg ...manifest.CfgFn) *feature.F f.Setup("install a service", service.Install(sub, service.WithSelectors(map[string]string{"app": "rekt"}))) // Append user-provided cfg to the end, in case they are providing their own subscriber. - cfg = append([]manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sub), "")}, cfg...) + cfg = append([]manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sub), ""), trigger.WithBrokerName(brokerName)}, cfg...) // Install the trigger - f.Setup(fmt.Sprintf("install trigger %q", name), trigger.Install(name, brokerName, cfg...)) + f.Setup(fmt.Sprintf("install trigger %q", name), trigger.Install(name, cfg...)) // Wait for a ready broker. f.Setup("Broker is ready", broker.IsReady(brokerName)) diff --git a/test/rekt/features/broker/source_to_sink.go b/test/rekt/features/broker/source_to_sink.go index c3a7d50f73d..4c59c150ccb 100644 --- a/test/rekt/features/broker/source_to_sink.go +++ b/test/rekt/features/broker/source_to_sink.go @@ -46,10 +46,10 @@ func SourceToSink(brokerName string) *feature.Feature { f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) // Point the Trigger subscriber to the sink svc. - cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), "")} + cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithBrokerName(brokerName)} // Install the trigger - f.Setup("install trigger", trigger.Install(via, brokerName, cfg...)) + f.Setup("install trigger", trigger.Install(via, cfg...)) f.Setup("trigger goes ready", trigger.IsReady(via)) @@ -85,7 +85,7 @@ func SourceToSinkWithDLQ() *feature.Feature { brokerConfig := append(broker.WithEnvConfig(), delivery.WithDeadLetterSink(service.AsKReference(dls), "")) f.Setup("install broker", broker.Install(brokerName, brokerConfig...)) f.Setup("Broker is ready", broker.IsReady(brokerName)) - f.Setup("install trigger", trigger.Install(triggerName, brokerName, trigger.WithSubscriber(nil, "bad://uri"))) + f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(nil, "bad://uri"))) f.Setup("trigger is ready", trigger.IsReady(triggerName)) ce := FullEvent() @@ -123,7 +123,7 @@ func SourceToSinkWithFlakyDLQ(brokerName string) *feature.Feature { f.Setup("install dlq", eventshub.Install(dlq, eventshub.StartReceiver)) f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver, eventshub.DropFirstN(2))) f.Setup("update broker with DLQ", broker.Install(brokerName, broker.WithDeadLetterSink(service.AsKReference(dlq), ""))) - f.Setup("install trigger", trigger.Install(via, brokerName, trigger.WithSubscriber(service.AsKReference(sink), ""))) + f.Setup("install trigger", trigger.Install(via, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), ""))) f.Setup("trigger goes ready", trigger.IsReady(via)) f.Setup("broker goes ready", broker.IsReady(via)) diff --git a/test/rekt/features/broker/topology.go b/test/rekt/features/broker/topology.go index ef3adbb8ac8..cad9b96c374 100644 --- a/test/rekt/features/broker/topology.go +++ b/test/rekt/features/broker/topology.go @@ -88,7 +88,7 @@ func createBrokerTriggerTopology(f *feature.Feature, brokerName string, brokerDS f.Setup("install recorder for "+name, prober.ReceiverInstall(name, nameOpts...)) f.Setup("install recorder for "+dlqName, prober.ReceiverInstall(dlqName)) - tOpts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference(name), "")} + tOpts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference(name), ""), triggerresources.WithBrokerName(brokerName)} if t.delivery != nil { if t.delivery.DeadLetterSink != nil { @@ -104,7 +104,7 @@ func createBrokerTriggerTopology(f *feature.Feature, brokerName string, brokerDS triggerName := feature.MakeRandomK8sName(name) f.Setup("Create Trigger"+strconv.Itoa(i)+" with recorder", - triggerresources.Install(triggerName, brokerName, tOpts...)) + triggerresources.Install(triggerName, tOpts...)) f.Setup("Trigger"+strconv.Itoa(i)+" is ready", triggerresources.IsReady(triggerName)) diff --git a/test/rekt/features/featureflags/featureflags.go b/test/rekt/features/featureflags/featureflags.go index 8c1a6c5b6b9..042a46692e8 100644 --- a/test/rekt/features/featureflags/featureflags.go +++ b/test/rekt/features/featureflags/featureflags.go @@ -74,6 +74,20 @@ func AuthenticationOIDCEnabled() feature.ShouldRun { } } +func CrossEventLinksEnabled() feature.ShouldRun { + return func(ctx context.Context, t feature.T) (feature.PrerequisiteResult, error) { + flags, err := getFeatureFlags(ctx, "config-features") + if err != nil { + return feature.PrerequisiteResult{}, err + } + + return feature.PrerequisiteResult{ + ShouldRun: flags.IsCrossNamespaceEventLinks(), + Reason: flags.String(), + }, nil + } +} + func IstioDisabled() feature.ShouldRun { return func(ctx context.Context, t feature.T) (feature.PrerequisiteResult, error) { flags, err := getFeatureFlags(ctx, "config-features") diff --git a/test/rekt/features/new_trigger_filters/feature.go b/test/rekt/features/new_trigger_filters/feature.go index ee95bab4e32..f331b39b999 100644 --- a/test/rekt/features/new_trigger_filters/feature.go +++ b/test/rekt/features/new_trigger_filters/feature.go @@ -351,15 +351,17 @@ func MultipleTriggersAndSinksFeature(installBroker InstallBrokerFunc) *feature.F trigger.WithSubscriber(service.AsKReference(subscriberName1), ""), trigger.WithNewFilters(filtersFirstTrigger), trigger.WithFilter(eventingv1.TriggerFilter{}.Attributes), + trigger.WithBrokerName(brokerName), } - trigger.Install(triggerName1, brokerName, triggerCfg1...)(ctx, t) + trigger.Install(triggerName1, triggerCfg1...)(ctx, t) triggerCfg2 := []manifest.CfgFn{ trigger.WithSubscriber(service.AsKReference(subscriberName2), ""), trigger.WithNewFilters(filtersSecondTrigger), trigger.WithFilter(eventingv1.TriggerFilter{}.Attributes), + trigger.WithBrokerName(brokerName), } - trigger.Install(triggerName2, brokerName, triggerCfg2...)(ctx, t) + trigger.Install(triggerName2, triggerCfg2...)(ctx, t) broker.IsReady(brokerName)(ctx, t) broker.IsAddressable(brokerName)(ctx, t) diff --git a/test/rekt/features/new_trigger_filters/filters.go b/test/rekt/features/new_trigger_filters/filters.go index b07d66f8110..0968056821a 100644 --- a/test/rekt/features/new_trigger_filters/filters.go +++ b/test/rekt/features/new_trigger_filters/filters.go @@ -46,9 +46,10 @@ func createNewFiltersFeature(f *feature.Feature, eventContexts []CloudEventsCont trigger.WithSubscriber(service.AsKReference(subscriberName), ""), trigger.WithNewFilters(filters), trigger.WithFilter(filter.Attributes), + trigger.WithBrokerName(brokerName), } - trigger.Install(triggerName, brokerName, triggerCfg...)(ctx, t) + trigger.Install(triggerName, triggerCfg...)(ctx, t) broker.IsReady(brokerName)(ctx, t) broker.IsAddressable(brokerName)(ctx, t) diff --git a/test/rekt/features/pingsource/features.go b/test/rekt/features/pingsource/features.go index f59858d6d8c..2770cd09f4f 100644 --- a/test/rekt/features/pingsource/features.go +++ b/test/rekt/features/pingsource/features.go @@ -204,7 +204,7 @@ func SendsEventsWithEventTypes() *feature.Feature { f.Setup("broker is ready", broker.IsReady(brokerName)) f.Setup("broker is addressable", broker.IsAddressable(brokerName)) f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) - f.Setup("install trigger", trigger.Install(via, brokerName, trigger.WithSubscriber(service.AsKReference(sink), ""))) + f.Setup("install trigger", trigger.Install(via, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), ""))) f.Setup("trigger goes ready", trigger.IsReady(via)) f.Requirement("install pingsource", func(ctx context.Context, t feature.T) { @@ -253,7 +253,7 @@ func SendsEventsWithBrokerAsSinkTLS() *feature.Feature { f.Setup("install trigger", func(ctx context.Context, t feature.T) { d := service.AsDestinationRef(sinkName) d.CACerts = eventshub.GetCaCerts(ctx) - trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t) + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d))(ctx, t) }) f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName)) diff --git a/test/rekt/features/trigger/control_plane.go b/test/rekt/features/trigger/control_plane.go index bd3c70be247..c6ed3819208 100644 --- a/test/rekt/features/trigger/control_plane.go +++ b/test/rekt/features/trigger/control_plane.go @@ -48,7 +48,7 @@ func Defaulting_Filter() *feature.Feature { f.Setup("Set Trigger name", SetTriggerName(resourceName)) f.Setup("Create a Trigger with empty spec.filter", - triggerresources.Install(resourceName, "broker", withSubscriber)) + triggerresources.Install(resourceName, triggerresources.WithBrokerName("broker"), withSubscriber)) f.Stable("Conformance"). Must("Trigger MUST default spec.filter to empty filter", @@ -65,7 +65,7 @@ func Defaulting_SubscriberNamespace() *feature.Feature { f.Setup("Set Trigger name", SetTriggerName(resourceName)) f.Setup("Create a Trigger with empty subscriber namespace", - triggerresources.Install(resourceName, "broker", withSubscriber)) + triggerresources.Install(resourceName, triggerresources.WithBrokerName("broker"), withSubscriber)) f.Stable("Conformance"). Must("Trigger subscriber namespace MUST be defaulted to Trigger namespace", diff --git a/test/rekt/features/trigger/crossnamespace.go b/test/rekt/features/trigger/crossnamespace.go new file mode 100644 index 00000000000..dae0a7a90cb --- /dev/null +++ b/test/rekt/features/trigger/crossnamespace.go @@ -0,0 +1,81 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trigger + +import ( + "context" + + cetest "github.com/cloudevents/sdk-go/v2/test" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/trigger" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/pkg/resources/service" +) + +func CrossNamespaceEventLinks(brokerEnvCtx context.Context) *feature.Feature { + f := feature.NewFeature() + + f.Prerequisite("Cross Namespace Event Links is enabled", featureflags.CrossEventLinksEnabled()) + + sourceName := feature.MakeRandomK8sName("source") + subscriberName := feature.MakeRandomK8sName("subscriber") + + ev := cetest.FullEvent() + + triggerName := feature.MakeRandomK8sName("trigger") + brokerName := feature.MakeRandomK8sName("broker") + brokerNamespace := environment.FromContext(brokerEnvCtx).Namespace() + + brokerRef := &duckv1.KReference{ + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + Name: brokerName, + Namespace: brokerNamespace, + } + + triggerCfg := []manifest.CfgFn{ + trigger.WithSubscriber(service.AsKReference(subscriberName), ""), + trigger.WithBrokerRef(brokerRef), + } + + f.Setup("install broker", broker.Install(brokerName, broker.WithNamespace(brokerNamespace))) + f.Setup("install trigger", trigger.Install(triggerName, triggerCfg...)) + + f.Setup("install subscriber", eventshub.Install(subscriberName, eventshub.StartReceiver)) + + // .IsReady uses the environment in the context to find the resource, hence we can only check the trigger + // However, the trigger being ready implies the broker is ready, so we are okay + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + f.Requirement("install event source", eventshub.Install(sourceName, eventshub.StartSenderToNamespacedResource(broker.GVR(), brokerName, brokerNamespace), eventshub.InputEvent(ev))) + + f.Assert("event is received by subscriber", assert.OnStore(subscriberName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + + f.Teardown("delete trigger", func(ctx context.Context, t feature.T) { + env := environment.FromContext(ctx) + eventingclient.Get(ctx).EventingV1().Triggers(env.Namespace()).Delete(ctx, triggerName, metav1.DeleteOptions{}) + }) + + return f +} diff --git a/test/rekt/features/trigger/feature.go b/test/rekt/features/trigger/feature.go index 219e48535ce..30d7d69872e 100644 --- a/test/rekt/features/trigger/feature.go +++ b/test/rekt/features/trigger/feature.go @@ -64,10 +64,11 @@ func TriggerDependencyAnnotation() *feature.Feature { cfg := []manifest.CfgFn{ trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithAnnotations(annotations), + trigger.WithBrokerName(brokerName), } // Install the trigger - f.Setup("install trigger", trigger.Install(triggerName, brokerName, cfg...)) + f.Setup("install trigger", trigger.Install(triggerName, cfg...)) // trigger won't go ready until after the pingsource exists, because of the dependency annotation f.Requirement("trigger goes ready", trigger.IsReady(triggerName)) @@ -123,7 +124,7 @@ func TriggerWithTLSSubscriber() *feature.Feature { subscriber := service.AsDestinationRef(sinkName) subscriber.CACerts = eventshub.GetCaCerts(ctx) - trigger.Install(triggerName, brokerName, + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(subscriber))(ctx, t) }) f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName)) @@ -133,7 +134,7 @@ func TriggerWithTLSSubscriber() *feature.Feature { dls.CACerts = eventshub.GetCaCerts(ctx) linear := eventingv1.BackoffPolicyLinear - trigger.Install(dlsTriggerName, brokerName, + trigger.Install(dlsTriggerName, trigger.WithBrokerName(brokerName), trigger.WithRetry(2, &linear, pointer.String("PT1S")), trigger.WithDeadLetterSinkFromDestination(dls), trigger.WithSubscriber(nil, "http://127.0.0.1:2468"))(ctx, t) @@ -196,7 +197,7 @@ func TriggerWithTLSSubscriberTrustBundle() *feature.Feature { CACerts: nil, // CA certs are in the trust-bundle } - trigger.Install(triggerName, brokerName, + trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(subscriber))(ctx, t) }) f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName)) @@ -211,7 +212,7 @@ func TriggerWithTLSSubscriberTrustBundle() *feature.Feature { } linear := eventingv1.BackoffPolicyLinear - trigger.Install(dlsTriggerName, brokerName, + trigger.Install(dlsTriggerName, trigger.WithBrokerName(brokerName), trigger.WithRetry(2, &linear, pointer.String("PT1S")), trigger.WithDeadLetterSinkFromDestination(dls), trigger.WithSubscriber(nil, "http://127.0.0.1:2468"))(ctx, t) diff --git a/test/rekt/features/trigger/trigger_sink_resolution.go b/test/rekt/features/trigger/trigger_sink_resolution.go index fbdb89dcfd4..ceb8eacd622 100644 --- a/test/rekt/features/trigger/trigger_sink_resolution.go +++ b/test/rekt/features/trigger/trigger_sink_resolution.go @@ -53,7 +53,7 @@ func SourceToTriggerSinkWithDLS() *feature.Feature { // Setup trigger f.Setup("install trigger", trigger.Install( triggerName, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(nil, "bad://uri"), delivery.WithDeadLetterSink(prober.AsKReference(triggerSinkName), ""))) @@ -105,7 +105,7 @@ func SourceToTriggerSinkWithDLSDontUseBrokers() *feature.Feature { f.Setup("install trigger", trigger.Install( triggerName, - brokerName, + trigger.WithBrokerName(brokerName), trigger.WithSubscriber(nil, "bad://uri"), delivery.WithDeadLetterSink(prober.AsKReference(triggerSinkName), ""))) @@ -154,8 +154,8 @@ func BadTriggerDoesNotAffectOkTrigger() *feature.Feature { f.Setup("Broker is ready", broker.IsReady(brokerName)) prober.SetTargetResource(broker.GVR(), brokerName) - f.Setup("install trigger via1", trigger.Install(via1, brokerName, trigger.WithSubscriber(nil, "bad://uri"))) - f.Setup("install trigger via2", trigger.Install(via2, brokerName, trigger.WithSubscriber(prober.AsKReference(sink), ""))) + f.Setup("install trigger via1", trigger.Install(via1, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(nil, "bad://uri"))) + f.Setup("install trigger via2", trigger.Install(via2, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(prober.AsKReference(sink), ""))) // Resources ready. f.Setup("trigger1 goes ready", trigger.IsReady(via1)) diff --git a/test/rekt/resources/broker/broker.go b/test/rekt/resources/broker/broker.go index f2b84459040..1887f9f26fd 100644 --- a/test/rekt/resources/broker/broker.go +++ b/test/rekt/resources/broker/broker.go @@ -104,6 +104,12 @@ func WithConfig(name string) manifest.CfgFn { } } +func WithNamespace(namespace string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + cfg["namespace"] = namespace + } +} + // WithConfigNamespace adds the specified config map namespace to the Broker spec. func WithConfigNamespace(namespace string) manifest.CfgFn { return func(cfg map[string]interface{}) { diff --git a/test/rekt/resources/trigger/trigger.go b/test/rekt/resources/trigger/trigger.go index e6ea82a71fe..975b5f67395 100644 --- a/test/rekt/resources/trigger/trigger.go +++ b/test/rekt/resources/trigger/trigger.go @@ -157,6 +157,49 @@ func WithExtensions(extensions map[string]interface{}) manifest.CfgFn { } } +func WithBrokerName(brokerName string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if brokerName != "" { + cfg["brokerName"] = brokerName + } + } +} + +// WithBrokerRef adds the brokerRef related config to a Trigger spec. +func WithBrokerRef(ref *duckv1.KReference) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["brokerRef"]; !set { + cfg["brokerRef"] = map[string]interface{}{} + } + brokerRef := cfg["brokerRef"].(map[string]interface{}) + + if ref != nil { + brokerRef["apiVersion"] = ref.APIVersion + brokerRef["kind"] = ref.Kind + brokerRef["name"] = ref.Name + brokerRef["namespace"] = ref.Namespace + } + + cfg["brokerRef"] = brokerRef + } +} + +// func WithBrokerRefName(brokerName string) manifest.CfgFn { +// return func(cfg map[string]interface{}) { +// if brokerName != "" { +// cfg["brokerRefName"] = brokerName +// } +// } +// } + +// func WithBrokerRefNamespace(brokerNamespace string) manifest.CfgFn { +// return func(cfg map[string]interface{}) { +// if brokerNamespace != "" { +// cfg["brokerRefNamespace"] = brokerNamespace +// } +// } +// } + // WithDeadLetterSink adds the dead letter sink related config to a Trigger spec. var WithDeadLetterSink = delivery.WithDeadLetterSink @@ -170,13 +213,10 @@ var WithRetry = delivery.WithRetry var WithTimeout = delivery.WithTimeout // Install will create a Trigger resource, augmented with the config fn options. -func Install(name, brokerName string, opts ...manifest.CfgFn) feature.StepFn { +func Install(name string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ "name": name, } - if len(brokerName) > 0 { - cfg["brokerName"] = brokerName - } for _, fn := range opts { fn(cfg) } diff --git a/test/rekt/resources/trigger/trigger.yaml b/test/rekt/resources/trigger/trigger.yaml index 77d02ad1907..9a27dcdf68b 100644 --- a/test/rekt/resources/trigger/trigger.yaml +++ b/test/rekt/resources/trigger/trigger.yaml @@ -27,6 +27,13 @@ spec: {{ if .brokerName }} broker: {{ .brokerName }} {{ end }} + {{ if .brokerRef }} + brokerRef: + kind: {{ .brokerRef.kind }} + namespace: {{ .brokerRef.namespace }} + name: {{ .brokerRef.name }} + apiVersion: {{ .brokerRef.apiVersion }} + {{ end }} {{ if .filter }} filter: attributes: