diff --git a/components/eventing-controller/api/v1alpha1/subscription_types.go b/components/eventing-controller/api/v1alpha1/subscription_types.go index 0b4ccd860d45..9f31b7bfd159 100644 --- a/components/eventing-controller/api/v1alpha1/subscription_types.go +++ b/components/eventing-controller/api/v1alpha1/subscription_types.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/mitchellh/hashstructure/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -104,6 +105,27 @@ func (bf *BebFilters) Deduplicate() (*BebFilters, error) { return result, nil } +type SubscriptionConfig struct { + // +optional + // +kubebuilder:validation:Minimum=1 + MaxInFlightMessages int `json:"maxInFlightMessages,omitempty"` +} + +// MergeSubsConfigs returns a valid subscription config object based on the provided config, +// complemented with default values, if necessary +func MergeSubsConfigs(config *SubscriptionConfig, defaults *env.DefaultSubscriptionConfig) *SubscriptionConfig { + merged := &SubscriptionConfig{ + MaxInFlightMessages: defaults.MaxInFlightMessages, + } + if config == nil { + return merged + } + if config.MaxInFlightMessages >= 1 { + merged.MaxInFlightMessages = config.MaxInFlightMessages + } + return merged +} + // SubscriptionSpec defines the desired state of Subscription type SubscriptionSpec struct { // ID is the unique identifier of Subscription, read-only. @@ -123,6 +145,10 @@ type SubscriptionSpec struct { // Filter defines the list of filters Filter *BebFilters `json:"filter"` + + // Config defines the configurations that can be applied to the eventing backend when creating this subscription + // +optional + Config *SubscriptionConfig `json:"config,omitempty"` } type EmsSubscriptionStatus struct { @@ -180,6 +206,10 @@ type SubscriptionStatus struct { // EmsSubscriptionStatus defines the status of Subscription in BEB // +optional EmsSubscriptionStatus EmsSubscriptionStatus `json:"emsSubscriptionStatus,omitempty"` + + // Config defines the configurations that have been applied to the eventing backend when creating this subscription + // +optional + Config *SubscriptionConfig `json:"config,omitempty"` } // +kubebuilder:object:root=true diff --git a/components/eventing-controller/api/v1alpha1/subscription_types_test.go b/components/eventing-controller/api/v1alpha1/subscription_types_test.go index 90cb29c65c40..c117ec46a2fc 100644 --- a/components/eventing-controller/api/v1alpha1/subscription_types_test.go +++ b/components/eventing-controller/api/v1alpha1/subscription_types_test.go @@ -3,6 +3,8 @@ package v1alpha1 import ( "reflect" "testing" + + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" ) const ( @@ -85,3 +87,41 @@ func TestBebFilters_Deduplicate(t *testing.T) { }) } } + +func TestMergeSubsConfigs(t *testing.T) { + defaultConf := &env.DefaultSubscriptionConfig{MaxInFlightMessages: 4} + tests := []struct { + caseName string + inputConf *SubscriptionConfig + inputDefaults *env.DefaultSubscriptionConfig + expectedOutput *SubscriptionConfig + }{ + { + caseName: "nil input config", + inputConf: nil, + inputDefaults: defaultConf, + expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 4}, + }, + { + caseName: "default is overridden", + inputConf: &SubscriptionConfig{MaxInFlightMessages: 10}, + inputDefaults: defaultConf, + expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 10}, + }, + { + caseName: "provided input is invalid", + inputConf: &SubscriptionConfig{MaxInFlightMessages: 0}, + inputDefaults: defaultConf, + expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 4}, + }, + } + + for _, tt := range tests { + t.Run(tt.caseName, func(t *testing.T) { + got := MergeSubsConfigs(tt.inputConf, tt.inputDefaults) + if !reflect.DeepEqual(got, tt.expectedOutput) { + t.Errorf("MergeSubsConfigs() got = %v, want = %v", got, tt.expectedOutput) + } + }) + } +} diff --git a/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go b/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go index 697705c3185d..7e0a72823b7e 100644 --- a/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go +++ b/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go @@ -271,6 +271,21 @@ func (in *Subscription) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SubscriptionConfig) DeepCopyInto(out *SubscriptionConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionConfig. +func (in *SubscriptionConfig) DeepCopy() *SubscriptionConfig { + if in == nil { + return nil + } + out := new(SubscriptionConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SubscriptionList) DeepCopyInto(out *SubscriptionList) { *out = *in @@ -316,6 +331,11 @@ func (in *SubscriptionSpec) DeepCopyInto(out *SubscriptionSpec) { *out = new(BebFilters) (*in).DeepCopyInto(*out) } + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = new(SubscriptionConfig) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionSpec. @@ -339,6 +359,11 @@ func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) { } } out.EmsSubscriptionStatus = in.EmsSubscriptionStatus + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = new(SubscriptionConfig) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionStatus. diff --git a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml index bdaebfbe16c7..cc423aadcffa 100644 --- a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml +++ b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: eventingbackends.eventing.kyma-project.io spec: diff --git a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml index b24def2199e5..46a56e86cff1 100644 --- a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml +++ b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: subscriptions.eventing.kyma-project.io spec: @@ -43,6 +43,14 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + config: + description: Config defines the configurations that can be applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object filter: description: Filter defines the list of filters properties: @@ -179,6 +187,14 @@ spec: - status type: object type: array + config: + description: Config defines the configurations that have been applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object emsSubscriptionStatus: description: EmsSubscriptionStatus defines the status of Subscription in BEB diff --git a/components/eventing-controller/pkg/commander/beb/beb.go b/components/eventing-controller/pkg/commander/beb/beb.go index d8b52ee1c1e6..c4b4f741d604 100644 --- a/components/eventing-controller/pkg/commander/beb/beb.go +++ b/components/eventing-controller/pkg/commander/beb/beb.go @@ -82,7 +82,7 @@ func (c *Commander) Init(mgr manager.Manager) error { } // Start implements the Commander interface and starts the manager. -func (c *Commander) Start(params commander.Params) error { +func (c *Commander) Start(_ env.DefaultSubscriptionConfig, params commander.Params) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel dynamicClient := dynamic.NewForConfigOrDie(c.restCfg) diff --git a/components/eventing-controller/pkg/commander/commander.go b/components/eventing-controller/pkg/commander/commander.go index 642a5aeacf92..71001434535a 100644 --- a/components/eventing-controller/pkg/commander/commander.go +++ b/components/eventing-controller/pkg/commander/commander.go @@ -3,6 +3,7 @@ package commander import ( + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -14,7 +15,7 @@ type Commander interface { Init(mgr manager.Manager) error // Start runs the initialized commander instance. - Start(params Params) error + Start(defaultSubsConfig env.DefaultSubscriptionConfig, params Params) error // Stop tells the commander instance to shutdown and clean-up. Stop() error diff --git a/components/eventing-controller/pkg/commander/fake/commander.go b/components/eventing-controller/pkg/commander/fake/commander.go index 462fb499892b..96190c02ef29 100644 --- a/components/eventing-controller/pkg/commander/fake/commander.go +++ b/components/eventing-controller/pkg/commander/fake/commander.go @@ -2,6 +2,7 @@ package fake import ( "github.com/kyma-project/kyma/components/eventing-controller/pkg/commander" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers" "k8s.io/client-go/dynamic" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -16,7 +17,7 @@ func (c *Commander) Init(mgr manager.Manager) error { return nil } -func (c *Commander) Start(_ commander.Params) error { +func (c *Commander) Start(_ env.DefaultSubscriptionConfig, _ commander.Params) error { return nil } diff --git a/components/eventing-controller/pkg/commander/nats/nats.go b/components/eventing-controller/pkg/commander/nats/nats.go index ed79caefa0c4..7713ab61312b 100644 --- a/components/eventing-controller/pkg/commander/nats/nats.go +++ b/components/eventing-controller/pkg/commander/nats/nats.go @@ -73,7 +73,7 @@ func (c *Commander) Init(mgr manager.Manager) error { } // Start implements the Commander interface and starts the commander. -func (c *Commander) Start(_ commander.Params) error { +func (c *Commander) Start(defaultSubsConfig env.DefaultSubscriptionConfig, _ commander.Params) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel @@ -87,6 +87,7 @@ func (c *Commander) Start(_ commander.Params) error { c.logger, c.mgr.GetEventRecorderFor("eventing-controller-nats"), c.envCfg, + defaultSubsConfig, ) c.backend = natsReconciler.Backend if err := natsReconciler.SetupUnmanaged(c.mgr); err != nil { diff --git a/components/eventing-controller/pkg/commander/nats/nats_test.go b/components/eventing-controller/pkg/commander/nats/nats_test.go index 10bf78a8c5a2..b5b8d520ca52 100644 --- a/components/eventing-controller/pkg/commander/nats/nats_test.go +++ b/components/eventing-controller/pkg/commander/nats/nats_test.go @@ -56,7 +56,8 @@ func TestCleanup(t *testing.T) { ReconnectWait: time.Second, EventTypePrefix: controllertesting.EventTypePrefix, } - natsBackend := handlers.NewNats(envConf, defaultLogger) + subsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 9} + natsBackend := handlers.NewNats(envConf, subsConfig, defaultLogger) natsCommander.Backend = natsBackend err = natsCommander.Backend.Initialize(env.Config{}) g.Expect(err).To(gomega.BeNil()) diff --git a/components/eventing-controller/pkg/env/backend_config.go b/components/eventing-controller/pkg/env/backend_config.go index 4852a1355b9a..e445149f6603 100644 --- a/components/eventing-controller/pkg/env/backend_config.go +++ b/components/eventing-controller/pkg/env/backend_config.go @@ -12,6 +12,8 @@ type BackendConfig struct { BackendCRNamespace string `envconfig:"BACKEND_CR_NAMESPACE" default:"kyma-system"` BackendCRName string `envconfig:"BACKEND_CR_NAME" default:"eventing-backend"` + + DefaultSubscriptionConfig DefaultSubscriptionConfig } type PublisherConfig struct { @@ -27,6 +29,10 @@ type PublisherConfig struct { LimitsMemory string `envconfig:"PUBLISHER_LIMITS_MEMORY" default:"128Mi"` } +type DefaultSubscriptionConfig struct { + MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"10"` +} + func GetBackendConfig() BackendConfig { cfg := BackendConfig{} if err := envconfig.Process("", &cfg); err != nil { diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index 92a2e4a964b8..cd407fca16c6 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -5,10 +5,12 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "strings" "time" "github.com/kyma-project/kyma/components/eventing-controller/pkg/tracing" + "github.com/kyma-project/kyma/components/eventing-controller/utils" "k8s.io/apimachinery/pkg/types" @@ -22,30 +24,35 @@ import ( "github.com/kyma-project/kyma/components/eventing-controller/logger" "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers/eventtype" - "github.com/kyma-project/kyma/components/eventing-controller/utils" -) - -const ( - period = time.Minute - maxTries = 5 - natsHandlerName = "nats-handler" ) // compile time check var _ MessagingBackend = &Nats{} type Nats struct { - config env.NatsConfig - logger *logger.Logger - client cev2.Client - connection *nats.Conn - subscriptions map[string]*nats.Subscription + config env.NatsConfig + defaultSubsConfig env.DefaultSubscriptionConfig + logger *logger.Logger + client cev2.Client + connection *nats.Conn + subscriptions map[string]*nats.Subscription } -func NewNats(config env.NatsConfig, logger *logger.Logger) *Nats { - return &Nats{config: config, logger: logger, subscriptions: make(map[string]*nats.Subscription)} +func NewNats(config env.NatsConfig, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger) *Nats { + return &Nats{ + config: config, + logger: logger, + subscriptions: make(map[string]*nats.Subscription), + defaultSubsConfig: subsConfig, + } } +const ( + period = time.Minute + maxTries = 5 + natsHandlerName = "nats-handler" +) + // Initialize creates a connection to NATS. func (n *Nats) Initialize(env.Config) (err error) { if n.connection == nil || n.connection.Status() != nats.CONNECTED { @@ -96,6 +103,7 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even // Format logger log := utils.LoggerWithSubscription(n.namedLogger(), sub) + subsConfig := eventingv1alpha1.MergeSubsConfigs(sub.Spec.Config, &n.defaultSubsConfig) // Create subscriptions in NATS for _, filter := range filters { subject, err := createSubject(filter, cleaner) @@ -104,6 +112,8 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even return false, err } + callback := n.getCallback(sub.Spec.Sink) + if n.connection.Status() != nats.CONNECTED { if err := n.Initialize(env.Config{}); err != nil { log.Errorw("reset NATS connection failed", "status", n.connection.Stats(), "error", err) @@ -111,14 +121,16 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even } } - callback := n.getCallback(sub.Spec.Sink) - if natsSub, err := n.connection.Subscribe(subject, callback); err != nil { - log.Errorw("create NATS subscription failed", "error", err) - return false, err - } else { - n.subscriptions[createKey(sub, subject)] = natsSub + for i := 0; i < subsConfig.MaxInFlightMessages; i++ { + natsSub, subscribeErr := n.connection.QueueSubscribe(subject, subject, callback) + if subscribeErr != nil { + log.Errorw("create NATS subscription failed", "error", err) + return false, subscribeErr + } + n.subscriptions[createKey(sub, subject, i)] = natsSub } } + sub.Status.Config = subsConfig return false, nil } @@ -223,8 +235,12 @@ func createKeyPrefix(sub *eventingv1alpha1.Subscription) string { return fmt.Sprintf("%s", namespacedName.String()) } -func createKey(sub *eventingv1alpha1.Subscription, subject string) string { - return fmt.Sprintf("%s.%s", createKeyPrefix(sub), subject) +func createKeySuffix(subject string, queueGoupInstanceNo int) string { + return subject + string(types.Separator) + strconv.Itoa(queueGoupInstanceNo) +} + +func createKey(sub *eventingv1alpha1.Subscription, subject string, queueGoupInstanceNo int) string { + return fmt.Sprintf("%s.%s", createKeyPrefix(sub), createKeySuffix(subject, queueGoupInstanceNo)) } func createSubject(filter *eventingv1alpha1.BebFilter, cleaner eventtype.Cleaner) (string, error) { @@ -239,8 +255,8 @@ func createSubject(filter *eventingv1alpha1.BebFilter, cleaner eventtype.Cleaner func createKymaSubscriptionNamespacedName(key string, sub *nats.Subscription) types.NamespacedName { nsn := types.NamespacedName{} - nnvalues := strings.Split(strings.TrimSuffix(strings.TrimSuffix(key, sub.Subject), "."), string(types.Separator)) + nnvalues := strings.Split(key, string(types.Separator)) nsn.Namespace = nnvalues[0] - nsn.Name = nnvalues[1] + nsn.Name = strings.TrimSuffix(strings.TrimSuffix(nnvalues[1], sub.Subject), ".") return nsn } diff --git a/components/eventing-controller/pkg/handlers/nats_test.go b/components/eventing-controller/pkg/handlers/nats_test.go index 078083402c32..ff239cdc478a 100644 --- a/components/eventing-controller/pkg/handlers/nats_test.go +++ b/components/eventing-controller/pkg/handlers/nats_test.go @@ -9,9 +9,6 @@ import ( "time" "github.com/avast/retry-go" - "github.com/nats-io/nats.go" - . "github.com/onsi/gomega" - cev2event "github.com/cloudevents/sdk-go/v2/event" kymalogger "github.com/kyma-project/kyma/common/logging/logger" eventingv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" @@ -21,6 +18,8 @@ import ( "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers/eventtype" eventingtesting "github.com/kyma-project/kyma/components/eventing-controller/testing" + "github.com/nats-io/nats.go" + . "github.com/onsi/gomega" ) func TestConvertMsgToCE(t *testing.T) { @@ -86,6 +85,7 @@ func TestConvertMsgToCE(t *testing.T) { } func TestSubscription(t *testing.T) { + g := NewWithT(t) natsPort := 5222 subscriberPort := 8080 subscriberReceiveURL := fmt.Sprintf("http://127.0.0.1:%d/store", subscriberPort) @@ -100,16 +100,13 @@ func TestSubscription(t *testing.T) { t.Fatalf("initialize logger failed: %v", err) } - natsURL := natsServer.ClientURL() - natsClient := Nats{ - subscriptions: make(map[string]*nats.Subscription), - config: env.NatsConfig{ - Url: natsURL, - MaxReconnects: 2, - ReconnectWait: time.Second, - }, - logger: defaultLogger, + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, } + defaultMaxInflight := 9 + natsClient := NewNats(natsConfig, env.DefaultSubscriptionConfig{MaxInFlightMessages: defaultMaxInflight}, defaultLogger) if err := natsClient.Initialize(env.Config{}); err != nil { t.Fatalf("connect to Nats server failed: %v", err) @@ -140,10 +137,12 @@ func TestSubscription(t *testing.T) { if err != nil { t.Fatalf("sync subscription failed: %v", err) } + g.Expect(sub.Status.Config).NotTo(BeNil()) // It should apply the defaults + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(defaultMaxInflight)) data := "sampledata" // Send an event - err = SendEventToNATS(&natsClient, data) + err = SendEventToNATS(natsClient, data) if err != nil { t.Fatalf("publish event failed: %v", err) } @@ -163,7 +162,7 @@ func TestSubscription(t *testing.T) { newData := "test-data" // Send an event - err = SendEventToNATS(&natsClient, newData) + err = SendEventToNATS(natsClient, newData) if err != nil { t.Fatalf("publish event failed: %v", err) } @@ -197,15 +196,13 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { t.Fatalf("initialize logger failed: %v", err) } - natsClient := Nats{ - subscriptions: map[string]*nats.Subscription{}, - config: env.NatsConfig{ - Url: natsServer.ClientURL(), - MaxReconnects: 2, - ReconnectWait: time.Second, - }, - logger: defaultLogger, + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, } + natsClient := NewNats(natsConfig, env.DefaultSubscriptionConfig{MaxInFlightMessages: 9}, defaultLogger) + if err := natsClient.Initialize(env.Config{}); err != nil { t.Fatalf("start NATS eventing backend failed: %s", err.Error()) } @@ -244,7 +241,7 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { data := "sampledata" // Send an event - if err := SendEventToNATS(&natsClient, data); err != nil { + if err := SendEventToNATS(natsClient, data); err != nil { t.Fatalf("publish event failed: %v", err) } @@ -258,6 +255,90 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { g.Expect(err).Should(HaveOccurred()) } +func TestSubscriptionWithMaxInFlightChange(t *testing.T) { + g := NewWithT(t) + + natsPort := 5223 + subscriberPort := 8080 + subscriberReceiveURL := fmt.Sprintf("http://127.0.0.1:%d/store", subscriberPort) + + // Start NATS server + natsServer := eventingtesting.RunNatsServerOnPort(natsPort) + defer eventingtesting.ShutDownNATSServer(natsServer) + + defaultLogger, err := logger.New(string(kymalogger.JSON), string(kymalogger.INFO)) + if err != nil { + t.Fatalf("initialize logger failed: %v", err) + } + + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, + } + defaultSubsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 5} + natsBackend := NewNats(natsConfig, defaultSubsConfig, defaultLogger) + + if err := natsBackend.Initialize(env.Config{}); err != nil { + t.Fatalf("connect to NATS server failed: %v", err) + } + + // Prepare event-type cleaner + application := applicationtest.NewApplication(eventingtesting.ApplicationNameNotClean, nil) + applicationLister := fake.NewApplicationListerOrDie(context.Background(), application) + cleaner := eventtype.NewCleaner(eventingtesting.EventTypePrefix, applicationLister, defaultLogger) + + // Create a subscription + sub := eventingtesting.NewSubscription("sub", "foo", eventingtesting.WithNotCleanEventTypeFilter) + sub.Spec.Sink = subscriberReceiveURL + _, err = natsBackend.SyncSubscription(sub, cleaner) + if err != nil { + t.Fatalf("sync subscription failed: %v", err) + } + + filter := sub.Spec.Filter.Filters[0] + subject, err := createSubject(filter, cleaner) + g.Expect(err).ShouldNot(HaveOccurred()) + g.Expect(subject).To(Not(BeEmpty())) + + g.Expect(sub.Status.Config).NotTo(BeNil()) + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + + // get internal key + var key string + var natsSub *nats.Subscription + for i := 0; i < sub.Status.Config.MaxInFlightMessages; i++ { + key = createKey(sub, subject, i) + natsSub = natsBackend.subscriptions[key] + g.Expect(natsSub).To(Not(BeNil())) + g.Expect(natsSub.IsValid()).To(BeTrue()) + } + g.Expect(len(natsBackend.subscriptions)).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + + // check that no invalid subscriptions exist + invalidNsn := natsBackend.GetInvalidSubscriptions() + g.Expect(len(*invalidNsn)).To(BeZero()) + + sub.Spec.Config = &eventingv1alpha1.SubscriptionConfig{MaxInFlightMessages: 7} + _, err = natsBackend.SyncSubscription(sub, cleaner) + if err != nil { + t.Fatalf("sync subscription failed: %v", err) + } + + g.Expect(sub.Status.Config).NotTo(BeNil()) + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(sub.Spec.Config.MaxInFlightMessages)) + for i := 0; i < sub.Status.Config.MaxInFlightMessages; i++ { + key = createKey(sub, subject, i) + natsSub = natsBackend.subscriptions[key] + g.Expect(natsSub).To(Not(BeNil())) + g.Expect(natsSub.IsValid()).To(BeTrue()) + } + g.Expect(len(natsBackend.subscriptions)).To(Equal(sub.Spec.Config.MaxInFlightMessages)) + // check that no invalid subscriptions exist + invalidNsn = natsBackend.GetInvalidSubscriptions() + g.Expect(len(*invalidNsn)).To(BeZero()) +} + func TestIsValidSubscription(t *testing.T) { g := NewWithT(t) @@ -274,16 +355,13 @@ func TestIsValidSubscription(t *testing.T) { } // Create NATS client - natsURL := natsServer.ClientURL() - natsClient := Nats{ - subscriptions: make(map[string]*nats.Subscription), - config: env.NatsConfig{ - Url: natsURL, - MaxReconnects: 2, - ReconnectWait: time.Second, - }, - logger: defaultLogger, + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, } + defaultSubsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 9} + natsClient := NewNats(natsConfig, defaultSubsConfig, defaultLogger) if err := natsClient.Initialize(env.Config{}); err != nil { t.Fatalf("connect to NATS server failed: %v", err) @@ -308,12 +386,18 @@ func TestIsValidSubscription(t *testing.T) { g.Expect(err).ShouldNot(HaveOccurred()) g.Expect(subject).To(Not(BeEmpty())) - // get internal key - key := createKey(sub, subject) - g.Expect(key).To(Not(BeEmpty())) - natsSub := natsClient.subscriptions[key] - g.Expect(natsSub).To(Not(BeNil())) + g.Expect(sub.Status.Config).NotTo(BeNil()) + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + // get internal key + var key string + var natsSub *nats.Subscription + for i := 0; i < sub.Status.Config.MaxInFlightMessages; i++ { + key = createKey(sub, subject, i) + g.Expect(key).To(Not(BeEmpty())) + natsSub = natsClient.subscriptions[key] + g.Expect(natsSub).To(Not(BeNil())) + } // check the mapping of Kyma subscription and Nats subscription nsn := createKymaSubscriptionNamespacedName(key, natsSub) g.Expect(nsn.Namespace).To(BeIdenticalTo(sub.Namespace)) @@ -341,7 +425,7 @@ func TestIsValidSubscription(t *testing.T) { // check that only one invalid subscription exist invalidNsn = natsClient.GetInvalidSubscriptions() - g.Expect(len(*invalidNsn)).To(BeIdenticalTo(1)) + g.Expect(len(*invalidNsn)).To(BeIdenticalTo(sub.Status.Config.MaxInFlightMessages)) // restart NATS server natsServer = eventingtesting.RunNatsServerOnPort(natsPort) @@ -349,7 +433,12 @@ func TestIsValidSubscription(t *testing.T) { // check that only one invalid subscription still exist, the controller is not running... invalidNsn = natsClient.GetInvalidSubscriptions() - g.Expect(len(*invalidNsn)).To(BeIdenticalTo(1)) + g.Expect(len(*invalidNsn)).To(BeIdenticalTo(sub.Status.Config.MaxInFlightMessages)) + +} + +func checkIsValid(sub *nats.Subscription, t *testing.T) error { + return checkValidity(sub, true, t) } func checkIsNotValid(sub *nats.Subscription, t *testing.T) error { diff --git a/components/eventing-controller/pkg/handlers/utils.go b/components/eventing-controller/pkg/handlers/utils.go index aabd43d0f78b..05ae86adcc12 100644 --- a/components/eventing-controller/pkg/handlers/utils.go +++ b/components/eventing-controller/pkg/handlers/utils.go @@ -33,6 +33,7 @@ type MessagingBackend interface { // SyncSubscription should synchronize the Kyma eventing susbscription with the susbcriber infrastructure of messaging backend system. // It should return true if Kyma eventing subscription status was changed during this synchronization process. + // It sets subscription.status.config with configurations that were applied on the messaging backend when creating the subscription. // TODO: Give up the usage of variadic parameters in the favor of using only subscription as input parameter. // TODO: This should contain all the infos necessary for the handler to do its job. SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, error) diff --git a/components/eventing-controller/reconciler/backend/reconciler.go b/components/eventing-controller/reconciler/backend/reconciler.go index 467be6826b5b..6d533209dfc1 100644 --- a/components/eventing-controller/reconciler/backend/reconciler.go +++ b/components/eventing-controller/reconciler/backend/reconciler.go @@ -706,7 +706,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *Reconciler) startNATSController() error { if !r.natsCommanderStarted { - if err := r.natsCommander.Start(commander.Params{}); err != nil { + if err := r.natsCommander.Start(r.cfg.DefaultSubscriptionConfig, commander.Params{}); err != nil { r.namedLogger().Errorw("start NATS commander failed", "error", err) return err } @@ -731,7 +731,7 @@ func (r *Reconciler) stopNATSController() error { func (r *Reconciler) startBEBController(clientID, clientSecret string) error { if !r.bebCommanderStarted { bebCommanderParams := commander.Params{"client_id": clientID, "client_secret": clientSecret} - if err := r.bebCommander.Start(bebCommanderParams); err != nil { + if err := r.bebCommander.Start(r.cfg.DefaultSubscriptionConfig, bebCommanderParams); err != nil { r.namedLogger().Errorw("start BEB commander failed", "error", err) return err } diff --git a/components/eventing-controller/reconciler/backend/reconciler_test.go b/components/eventing-controller/reconciler/backend/reconciler_test.go index ab21eaba619a..c1281d728c14 100644 --- a/components/eventing-controller/reconciler/backend/reconciler_test.go +++ b/components/eventing-controller/reconciler/backend/reconciler_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" + "github.com/go-logr/zapr" hydrav1alpha1 "github.com/ory/hydra-maester/api/v1alpha1" "github.com/stretchr/testify/assert" @@ -635,7 +637,7 @@ func (t *TestCommander) Init(_ manager.Manager) error { return nil } -func (t *TestCommander) Start(_ commander.Params) error { +func (t *TestCommander) Start(_ env.DefaultSubscriptionConfig, _ commander.Params) error { return t.startErr } diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler.go b/components/eventing-controller/reconciler/subscription-nats/reconciler.go index 0c75b1ed6d18..49febd255206 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler.go @@ -55,8 +55,8 @@ const ( reconcilerName = "nats-subscription-reconciler" ) -func NewReconciler(ctx context.Context, client client.Client, applicationLister *application.Lister, cache cache.Cache, logger *logger.Logger, recorder record.EventRecorder, cfg env.NatsConfig) *Reconciler { - natsHandler := handlers.NewNats(cfg, logger) +func NewReconciler(ctx context.Context, client client.Client, applicationLister *application.Lister, cache cache.Cache, logger *logger.Logger, recorder record.EventRecorder, cfg env.NatsConfig, subsCfg env.DefaultSubscriptionConfig) *Reconciler { + natsHandler := handlers.NewNats(cfg, subsCfg, logger) if err := natsHandler.Initialize(env.Config{}); err != nil { logger.WithContext().Errorw("start reconciler failed", "name", reconcilerName, "error", err) panic(err) @@ -216,6 +216,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } log.Debug("create NATS subscriptions succeeded") + actualSubscription.Status.Config = desiredSubscription.Status.Config // Update status if err := r.syncSubscriptionStatus(ctx, actualSubscription, true, ""); err != nil { return ctrl.Result{}, err @@ -225,6 +226,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } // syncSubscriptionStatus syncs Subscription status +// subsConfig is the subscription configuration that was applied to the subscription. It is set only if the +// isNatsSubReady is true. func (r *Reconciler) syncSubscriptionStatus(ctx context.Context, sub *eventingv1alpha1.Subscription, isNatsSubReady bool, message string) error { desiredSubscription := sub.DeepCopy() desiredConditions := make([]eventingv1alpha1.Condition, 0) diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 67bdcc5f53d6..ebf0c9f00386 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -41,6 +41,9 @@ import ( const ( smallTimeOut = 5 * time.Second smallPollingInterval = 1 * time.Second + + timeOut = 60 * time.Second + pollingInterval = 5 * time.Second ) var _ = Describe("NATS Subscription Reconciliation Tests", func() { @@ -83,6 +86,9 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { eventingv1alpha1.ConditionSubscriptionActive, eventingv1alpha1.ConditionReasonNATSSubscriptionActive, v1.ConditionTrue, "")), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, + }), )) // publish a message @@ -96,7 +102,7 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) received := string(<-result) return sent == received - }).Should(BeTrue()) + }, timeOut, pollingInterval).Should(BeTrue()) Expect(k8sClient.Delete(ctx, givenSubscription)).Should(BeNil()) isSubscriptionDeleted(givenSubscription, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) @@ -158,7 +164,79 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) received := string(<-result) return sent == received - }).Should(BeTrue()) + }, timeOut, pollingInterval).Should(BeTrue()) + }) + }) + + When("Changing subscription configuration", func() { + It("Should reflect the new config in the subscription status", func() { + By("Creating the subscription using the default config") + ctx := context.Background() + subscriptionName := fmt.Sprintf("sub-%d", testId) + + // create subscriber + result := make(chan []byte) + url, shutdown := newSubscriber(result) + defer shutdown() + + // create subscription + sub := reconcilertesting.NewSubscription(subscriptionName, namespaceName, reconcilertesting.WithEventTypeFilter, reconcilertesting.WithWebhookForNats) + sub.Spec.Sink = url + ensureSubscriptionCreated(sub, ctx) + + getSubscription(sub, ctx).Should(And( + reconcilertesting.HaveSubscriptionName(subscriptionName), + reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + eventingv1alpha1.ConditionSubscriptionActive, + eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + v1.ConditionTrue, "")), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, + }), + )) + + By("Updating the subscription configuration in the spec") + + newMaxInFlight := defaultSubsConfig.MaxInFlightMessages + 1 + changedSub := sub.DeepCopy() + changedSub.Spec.Config = &eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: newMaxInFlight, + } + Expect(k8sClient.Update(ctx, changedSub)).Should(BeNil()) + + Eventually(subscriptionGetter(ctx, sub.Name, sub.Namespace), timeOut, pollingInterval). + Should(And( + reconcilertesting.HaveSubscriptionName(subscriptionName), + reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + eventingv1alpha1.ConditionSubscriptionActive, + eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + v1.ConditionTrue, ""), + ), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: newMaxInFlight, + }), + )) + + connection, err := connectToNats(natsUrl) + Expect(err).ShouldNot(HaveOccurred()) + toSend := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) + msgData := []byte(reconcilertesting.StructuredCloudEvent) + Eventually(func() (string, error) { + // publish the message + if err = connection.Publish(reconcilertesting.OrderCreatedEventType, msgData); err != nil { + return "", err + } + // make sure that the subscriber received the message + select { + case received := <-result: + return string(received), nil + case <-time.After(smallPollingInterval): + return "", fmt.Errorf("no message received") + } + }, timeOut, pollingInterval).Should(Equal(toSend)) + + Expect(k8sClient.Delete(ctx, sub)).Should(BeNil()) + isSubscriptionDeleted(sub, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) }) }) @@ -218,6 +296,23 @@ func fixtureNamespace(name string) *v1.Namespace { return &namespace } +func subscriptionGetter(ctx context.Context, name, namespace string) func() (*eventingv1alpha1.Subscription, error) { + return func() (*eventingv1alpha1.Subscription, error) { + lookupKey := types.NamespacedName{ + Namespace: namespace, + Name: name, + } + subscription := &eventingv1alpha1.Subscription{} + if err := k8sClient.Get(ctx, lookupKey, subscription); err != nil { + log.Printf("fetch subscription %s failed: %v", lookupKey.String(), err) + return &eventingv1alpha1.Subscription{}, err + } + log.Printf("[Subscription] name:%s ns:%s status:%v", subscription.Name, subscription.Namespace, + subscription.Status) + return subscription, nil + } +} + // getSubscription fetches a subscription using the lookupKey and allows to make assertions on it func getSubscription(subscription *eventingv1alpha1.Subscription, ctx context.Context) AsyncAssertion { return Eventually(func() *eventingv1alpha1.Subscription { @@ -270,6 +365,7 @@ var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment var natsServer *natsserver.Server +var defaultSubsConfig = env.DefaultSubscriptionConfig{MaxInFlightMessages: 1} func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -336,6 +432,7 @@ var _ = BeforeSuite(func(done Done) { defaultLogger, k8sManager.GetEventRecorderFor("eventing-controller-nats"), envConf, + defaultSubsConfig, ).SetupUnmanaged(k8sManager) Expect(err).ToNot(HaveOccurred()) diff --git a/components/eventing-controller/testing/matchers.go b/components/eventing-controller/testing/matchers.go index 4b1196ba802a..db680c8d7080 100644 --- a/components/eventing-controller/testing/matchers.go +++ b/components/eventing-controller/testing/matchers.go @@ -40,6 +40,12 @@ func HaveNotFoundSubscription(isReallyDeleted bool) GomegaMatcher { return WithTransform(func(isDeleted bool) bool { return isDeleted }, Equal(isReallyDeleted)) } +func HaveSubsConfiguration(subsConf *eventingv1alpha1.SubscriptionConfig) GomegaMatcher { + return WithTransform(func(s *eventingv1alpha1.Subscription) *eventingv1alpha1.SubscriptionConfig { + return s.Status.Config + }, Equal(subsConf)) +} + func IsAnEmptySubscription() GomegaMatcher { return WithTransform(func(s *eventingv1alpha1.Subscription) bool { emptySub := eventingv1alpha1.Subscription{} diff --git a/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml b/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml index bdaebfbe16c7..cc423aadcffa 100644 --- a/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml +++ b/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: eventingbackends.eventing.kyma-project.io spec: diff --git a/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml b/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml index b24def2199e5..46a56e86cff1 100644 --- a/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml +++ b/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: subscriptions.eventing.kyma-project.io spec: @@ -43,6 +43,14 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + config: + description: Config defines the configurations that can be applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object filter: description: Filter defines the list of filters properties: @@ -179,6 +187,14 @@ spec: - status type: object type: array + config: + description: Config defines the configurations that have been applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object emsSubscriptionStatus: description: EmsSubscriptionStatus defines the status of Subscription in BEB diff --git a/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml b/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml index bdaebfbe16c7..cc423aadcffa 100644 --- a/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml +++ b/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: eventingbackends.eventing.kyma-project.io spec: diff --git a/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml b/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml index b24def2199e5..46a56e86cff1 100644 --- a/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml +++ b/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: subscriptions.eventing.kyma-project.io spec: @@ -43,6 +43,14 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + config: + description: Config defines the configurations that can be applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object filter: description: Filter defines the list of filters properties: @@ -179,6 +187,14 @@ spec: - status type: object type: array + config: + description: Config defines the configurations that have been applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object emsSubscriptionStatus: description: EmsSubscriptionStatus defines the status of Subscription in BEB diff --git a/resources/eventing/charts/controller/templates/deployment.yaml b/resources/eventing/charts/controller/templates/deployment.yaml index 5212cff270d4..6ad733cdcfde 100644 --- a/resources/eventing/charts/controller/templates/deployment.yaml +++ b/resources/eventing/charts/controller/templates/deployment.yaml @@ -43,6 +43,8 @@ spec: value: "{{ .Values.publisherProxy.image.pullPolicy }}" - name: PUBLISHER_REPLICAS value: "{{ .Values.publisherProxy.replicas }}" + - name: DEFAULT_MAX_IN_FLIGHT_MESSAGES + value: "{{ .Values.eventingBackend.defaultMaxInflightMessages }}" - name: APP_LOG_FORMAT value: {{ .Values.global.log.format | quote }} - name: APP_LOG_LEVEL diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index 8d39ff751491..8f8c54916742 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -15,7 +15,7 @@ image: # name is the name of the container image for the eventing-controller name: "eventing-controller" # tag is the container tag of the eventing-controller image - tag: "PR-11649" + tag: "PR-11621" # pullPolicy is the pullPolicy for the eventing-controller image pullPolicy: "IfNotPresent" @@ -36,7 +36,7 @@ publisherProxy: image: # name is the name of the container image for the eventing-publisher-proxy name: "event-publisher-proxy" - tag: "PR-11649" + tag: "947fb1cd" pullPolicy: IfNotPresent replicas: 1 resources: @@ -61,6 +61,7 @@ clusterRoleBindingSuffix: "" eventingBackend: name: eventing-backend + defaultMaxInflightMessages: 10 healthProbe: port: 8081