From 8139bd73d98ae6c42e2d4999cc3c69aba4bdd21d Mon Sep 17 00:00:00 2001 From: Radu Fantaziu Date: Thu, 1 Jul 2021 16:55:43 +0200 Subject: [PATCH 01/19] Initial commit --- components/eventing-controller/Makefile | 3 ++- .../eventing-controller/pkg/handlers/nats.go | 17 ++++++++++++----- .../pkg/handlers/nats_test.go | 18 +++++++++++------- .../controller/templates/deployment.yaml | 4 ++-- 4 files changed, 27 insertions(+), 15 deletions(-) diff --git a/components/eventing-controller/Makefile b/components/eventing-controller/Makefile index 0640d9c81116..93736a5a0bd6 100644 --- a/components/eventing-controller/Makefile +++ b/components/eventing-controller/Makefile @@ -175,7 +175,8 @@ resolve_clean: licenses_clean: rm -rf licenses -build-local: $(cmds) test-local ; +#build-local: $(cmds) test-local ; +build-local: $(cmds); .PHONY: $(cmds_images) $(cmds_images_push) diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index 88db33827f43..680fae7565f5 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "strings" "time" @@ -40,6 +41,7 @@ func NewNats(config env.NatsConfig, log logr.Logger) *Nats { const ( period = time.Minute maxTries = 5 + queueSubscribers = 10 ) // Initialize creates a connection to Nats @@ -116,12 +118,15 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even } } - natsSub, subscribeErr := n.connection.Subscribe(subject, callback) - if subscribeErr != nil { - n.log.Error(subscribeErr, "failed to create a Nats subscription") - return false, subscribeErr + //natsSub, subscribeErr := n.connection.Subscribe(subject, callback) + for i:=0; i Date: Fri, 2 Jul 2021 15:09:24 +0200 Subject: [PATCH 02/19] Extend naming convention for NATS subscriptions map --- components/eventing-controller/pkg/handlers/nats.go | 8 +++----- components/eventing-controller/pkg/handlers/nats_test.go | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index 680fae7565f5..4d57f42000d7 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -125,7 +125,7 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even n.log.Error(subscribeErr, "failed to create a Nats subscription") return false, subscribeErr } - n.subscriptions[createKey(sub, subject+strconv.Itoa(i))] = natsSub + n.subscriptions[createKey(sub, subject + string(types.Separator) + strconv.Itoa(i))] = natsSub } } return false, nil @@ -231,11 +231,9 @@ func createSubject(filter *eventingv1alpha1.BebFilter, cleaner eventtype.Cleaner } func createKymaSubscriptionNamespacedName(key string, sub *nats.Subscription) types.NamespacedName { - // hack - key = key[:len(key)-1] nsn := types.NamespacedName{} - nnvalues := strings.Split(strings.TrimSuffix(strings.TrimSuffix(key, sub.Subject), "."), string(types.Separator)) + nnvalues := strings.Split(key, string(types.Separator)) nsn.Namespace = nnvalues[0] - nsn.Name = nnvalues[1] + nsn.Name = strings.TrimSuffix(strings.TrimSuffix(nnvalues[1], sub.Subject), ".") return nsn } diff --git a/components/eventing-controller/pkg/handlers/nats_test.go b/components/eventing-controller/pkg/handlers/nats_test.go index a8a63214cffd..c3b7848751db 100644 --- a/components/eventing-controller/pkg/handlers/nats_test.go +++ b/components/eventing-controller/pkg/handlers/nats_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "k8s.io/apimachinery/pkg/types" "strconv" "strings" "testing" @@ -300,7 +301,7 @@ func TestIsValidSubscription(t *testing.T) { var key string var natsSub *nats.Subscription for i:=0; i Date: Fri, 2 Jul 2021 16:17:35 +0200 Subject: [PATCH 03/19] Remove temp changes --- components/eventing-controller/Makefile | 3 +-- .../eventing/charts/controller/templates/deployment.yaml | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/components/eventing-controller/Makefile b/components/eventing-controller/Makefile index 93736a5a0bd6..0640d9c81116 100644 --- a/components/eventing-controller/Makefile +++ b/components/eventing-controller/Makefile @@ -175,8 +175,7 @@ resolve_clean: licenses_clean: rm -rf licenses -#build-local: $(cmds) test-local ; -build-local: $(cmds); +build-local: $(cmds) test-local ; .PHONY: $(cmds_images) $(cmds_images_push) diff --git a/resources/eventing/charts/controller/templates/deployment.yaml b/resources/eventing/charts/controller/templates/deployment.yaml index f312647635fd..5212cff270d4 100644 --- a/resources/eventing/charts/controller/templates/deployment.yaml +++ b/resources/eventing/charts/controller/templates/deployment.yaml @@ -17,8 +17,8 @@ spec: terminationGracePeriodSeconds: 10 securityContext: {{- toYaml .Values.podSecurityContext | nindent 8 }} containers: - - image: "radu57/eventing-controller" - imagePullPolicy: "Always" + - image: "{{ .Values.global.image.repository }}/{{ .Values.image.name }}:{{ .Values.image.tag }}" + imagePullPolicy: "{{ .Values.image.pullPolicy }}" name: controller env: - name: NATS_URL From 7f222bafbe2b2f410ebbdb117715d85d7f8de017 Mon Sep 17 00:00:00 2001 From: Radu Fantaziu Date: Fri, 2 Jul 2021 16:25:54 +0200 Subject: [PATCH 04/19] Solve check-imports --- components/eventing-controller/pkg/handlers/nats.go | 8 ++++---- components/eventing-controller/pkg/handlers/nats_test.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index 4d57f42000d7..c0b74f01e683 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -39,8 +39,8 @@ func NewNats(config env.NatsConfig, log logr.Logger) *Nats { } const ( - period = time.Minute - maxTries = 5 + period = time.Minute + maxTries = 5 queueSubscribers = 10 ) @@ -119,13 +119,13 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even } //natsSub, subscribeErr := n.connection.Subscribe(subject, callback) - for i:=0; i Date: Thu, 15 Jul 2021 13:02:59 +0200 Subject: [PATCH 05/19] add subscription config --- .../api/v1alpha1/subscription_types.go | 29 ++++++++++++ .../api/v1alpha1/zz_generated.deepcopy.go | 25 +++++++++++ ...venting.kyma-project.io_subscriptions.yaml | 16 +++++++ .../pkg/commander/beb/beb.go | 2 +- .../pkg/commander/commander.go | 3 +- .../pkg/commander/fake/commander.go | 3 +- .../pkg/commander/nats/nats.go | 3 +- .../pkg/env/backend_config.go | 6 +++ .../eventing-controller/pkg/handlers/nats.go | 2 +- .../reconciler/backend/reconciler.go | 14 +++--- .../reconciler/backend/reconciler_test.go | 2 +- .../subscription-nats/reconciler.go | 45 +++++++++++-------- .../subscription-nats/reconciler_test.go | 5 +++ .../controller/templates/deployment.yaml | 2 + .../eventing/charts/controller/values.yaml | 2 + 15 files changed, 130 insertions(+), 29 deletions(-) diff --git a/components/eventing-controller/api/v1alpha1/subscription_types.go b/components/eventing-controller/api/v1alpha1/subscription_types.go index 0b4ccd860d45..cc724a187a6d 100644 --- a/components/eventing-controller/api/v1alpha1/subscription_types.go +++ b/components/eventing-controller/api/v1alpha1/subscription_types.go @@ -104,6 +104,27 @@ func (bf *BebFilters) Deduplicate() (*BebFilters, error) { return result, nil } +type SubscriptionConfig struct { + // +optional + // +kubebuilder:validation:Minimum=1 + MaxInFlightMessages int `json:"maxInFlightMessages,omitempty"` +} + +// MergeSubsConfigs returns a valid subscription config object based on the provided config, +// complemented with default values, if necessary +func MergeSubsConfigs(config, defaults *SubscriptionConfig) *SubscriptionConfig { + merged := &SubscriptionConfig{ + MaxInFlightMessages: defaults.MaxInFlightMessages, + } + if config == nil { + return merged + } + if config.MaxInFlightMessages >= 1 { + merged.MaxInFlightMessages = config.MaxInFlightMessages + } + return merged +} + // SubscriptionSpec defines the desired state of Subscription type SubscriptionSpec struct { // ID is the unique identifier of Subscription, read-only. @@ -123,6 +144,10 @@ type SubscriptionSpec struct { // Filter defines the list of filters Filter *BebFilters `json:"filter"` + + // Config defines the configurations that can be applied to the eventing backend when creating this subscription + // +optional + Config *SubscriptionConfig `json:"config,omitempty"` } type EmsSubscriptionStatus struct { @@ -180,6 +205,10 @@ type SubscriptionStatus struct { // EmsSubscriptionStatus defines the status of Subscription in BEB // +optional EmsSubscriptionStatus EmsSubscriptionStatus `json:"emsSubscriptionStatus,omitempty"` + + // Config defines the configurations that have been applied to the eventing backend when creating this subscription + // +optional + Config *SubscriptionConfig `json:"config,omitempty"` } // +kubebuilder:object:root=true diff --git a/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go b/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go index 697705c3185d..7e0a72823b7e 100644 --- a/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go +++ b/components/eventing-controller/api/v1alpha1/zz_generated.deepcopy.go @@ -271,6 +271,21 @@ func (in *Subscription) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SubscriptionConfig) DeepCopyInto(out *SubscriptionConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionConfig. +func (in *SubscriptionConfig) DeepCopy() *SubscriptionConfig { + if in == nil { + return nil + } + out := new(SubscriptionConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SubscriptionList) DeepCopyInto(out *SubscriptionList) { *out = *in @@ -316,6 +331,11 @@ func (in *SubscriptionSpec) DeepCopyInto(out *SubscriptionSpec) { *out = new(BebFilters) (*in).DeepCopyInto(*out) } + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = new(SubscriptionConfig) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionSpec. @@ -339,6 +359,11 @@ func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) { } } out.EmsSubscriptionStatus = in.EmsSubscriptionStatus + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = new(SubscriptionConfig) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionStatus. diff --git a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml index b24def2199e5..98cc06e429ff 100644 --- a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml +++ b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml @@ -43,6 +43,14 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + config: + description: Config defines the configurations that can be applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object filter: description: Filter defines the list of filters properties: @@ -179,6 +187,14 @@ spec: - status type: object type: array + config: + description: Config defines the configurations that have been applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object emsSubscriptionStatus: description: EmsSubscriptionStatus defines the status of Subscription in BEB diff --git a/components/eventing-controller/pkg/commander/beb/beb.go b/components/eventing-controller/pkg/commander/beb/beb.go index 3a19a9d62ccd..72e0e1867a33 100644 --- a/components/eventing-controller/pkg/commander/beb/beb.go +++ b/components/eventing-controller/pkg/commander/beb/beb.go @@ -78,7 +78,7 @@ func (c *Commander) Init(mgr manager.Manager) error { } // Start implements the Commander interface and starts the manager. -func (c *Commander) Start(params commander.Params) error { +func (c *Commander) Start(_ *eventingv1alpha1.SubscriptionConfig, params commander.Params) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel dynamicClient := dynamic.NewForConfigOrDie(c.restCfg) diff --git a/components/eventing-controller/pkg/commander/commander.go b/components/eventing-controller/pkg/commander/commander.go index 642a5aeacf92..3570f209e58f 100644 --- a/components/eventing-controller/pkg/commander/commander.go +++ b/components/eventing-controller/pkg/commander/commander.go @@ -3,6 +3,7 @@ package commander import ( + "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -14,7 +15,7 @@ type Commander interface { Init(mgr manager.Manager) error // Start runs the initialized commander instance. - Start(params Params) error + Start(defaultSubscriptionConfig *v1alpha1.SubscriptionConfig, params Params) error // Stop tells the commander instance to shutdown and clean-up. Stop() error diff --git a/components/eventing-controller/pkg/commander/fake/commander.go b/components/eventing-controller/pkg/commander/fake/commander.go index 462fb499892b..5aedc4cb3a50 100644 --- a/components/eventing-controller/pkg/commander/fake/commander.go +++ b/components/eventing-controller/pkg/commander/fake/commander.go @@ -1,6 +1,7 @@ package fake import ( + "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" "github.com/kyma-project/kyma/components/eventing-controller/pkg/commander" "github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers" "k8s.io/client-go/dynamic" @@ -16,7 +17,7 @@ func (c *Commander) Init(mgr manager.Manager) error { return nil } -func (c *Commander) Start(_ commander.Params) error { +func (c *Commander) Start(_ *v1alpha1.SubscriptionConfig, _ commander.Params) error { return nil } diff --git a/components/eventing-controller/pkg/commander/nats/nats.go b/components/eventing-controller/pkg/commander/nats/nats.go index 025d0a145dd1..2fcf6cbb8180 100644 --- a/components/eventing-controller/pkg/commander/nats/nats.go +++ b/components/eventing-controller/pkg/commander/nats/nats.go @@ -67,7 +67,7 @@ func (c *Commander) Init(mgr manager.Manager) error { } // Start implements the Commander interface and starts the commander. -func (c *Commander) Start(_ commander.Params) error { +func (c *Commander) Start(defaultSubscriptionConfig *eventingv1alpha1.SubscriptionConfig, _ commander.Params) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel @@ -81,6 +81,7 @@ func (c *Commander) Start(_ commander.Params) error { ctrl.Log.WithName("reconciler").WithName("Subscription"), c.mgr.GetEventRecorderFor("eventing-controller-nats"), c.envCfg, + defaultSubscriptionConfig, ) c.backend = natsReconciler.Backend if err := natsReconciler.SetupUnmanaged(c.mgr); err != nil { diff --git a/components/eventing-controller/pkg/env/backend_config.go b/components/eventing-controller/pkg/env/backend_config.go index 4852a1355b9a..8acf6c3e435a 100644 --- a/components/eventing-controller/pkg/env/backend_config.go +++ b/components/eventing-controller/pkg/env/backend_config.go @@ -12,6 +12,8 @@ type BackendConfig struct { BackendCRNamespace string `envconfig:"BACKEND_CR_NAMESPACE" default:"kyma-system"` BackendCRName string `envconfig:"BACKEND_CR_NAME" default:"eventing-backend"` + + DefaultSubscriptionConfig DefaultSubscriptionConfig } type PublisherConfig struct { @@ -27,6 +29,10 @@ type PublisherConfig struct { LimitsMemory string `envconfig:"PUBLISHER_LIMITS_MEMORY" default:"128Mi"` } +type DefaultSubscriptionConfig struct { + MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"9"` +} + func GetBackendConfig() BackendConfig { cfg := BackendConfig{} if err := envconfig.Process("", &cfg); err != nil { diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index c0b74f01e683..15ab5445e206 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -119,7 +119,7 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even } //natsSub, subscribeErr := n.connection.Subscribe(subject, callback) - for i := 0; i < queueSubscribers; i++ { + for i := 0; i < sub.Spec.Config.MaxInFlightMessages; i++ { natsSub, subscribeErr := n.connection.QueueSubscribe(subject, subject, callback) if subscribeErr != nil { n.log.Error(subscribeErr, "failed to create a Nats subscription") diff --git a/components/eventing-controller/reconciler/backend/reconciler.go b/components/eventing-controller/reconciler/backend/reconciler.go index 41aa91de35da..3fb89560550a 100644 --- a/components/eventing-controller/reconciler/backend/reconciler.go +++ b/components/eventing-controller/reconciler/backend/reconciler.go @@ -61,9 +61,10 @@ type Reconciler struct { // TODO: Do we need to explicitly pass and use a cache here? The default client that we get from manager // already uses a cache internally (check manager.DefaultNewClient) cache.Cache - logger *logger.Logger - record record.EventRecorder - cfg env.BackendConfig + logger *logger.Logger + record record.EventRecorder + cfg env.BackendConfig + defaultSubscriptionConfig *eventingv1alpha1.SubscriptionConfig // backendType is the type of the backend which the reconciler detects at runtime backendType eventingv1alpha1.BackendType @@ -80,6 +81,9 @@ func NewReconciler(ctx context.Context, natsCommander, bebCommander commander.Co logger: logger, record: recorder, cfg: cfg, + defaultSubscriptionConfig: &eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: cfg.DefaultSubscriptionConfig.MaxInFlightMessages, + }, } } @@ -723,7 +727,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *Reconciler) startNATSController() error { if !r.natsCommanderStarted { - if err := r.natsCommander.Start(commander.Params{}); err != nil { + if err := r.natsCommander.Start(r.defaultSubscriptionConfig, commander.Params{}); err != nil { r.namedLogger().Errorw("failed to start the NATS commander", "error", err) return err } @@ -748,7 +752,7 @@ func (r *Reconciler) stopNATSController() error { func (r *Reconciler) startBEBController(clientID, clientSecret string) error { if !r.bebCommanderStarted { bebCommanderParams := commander.Params{"client_id": clientID, "client_secret": clientSecret} - if err := r.bebCommander.Start(bebCommanderParams); err != nil { + if err := r.bebCommander.Start(r.defaultSubscriptionConfig, bebCommanderParams); err != nil { r.namedLogger().Errorw("failed to start the BEB commander", "error", err) return err } diff --git a/components/eventing-controller/reconciler/backend/reconciler_test.go b/components/eventing-controller/reconciler/backend/reconciler_test.go index 7be7b66094ea..0b5a8c5813ca 100644 --- a/components/eventing-controller/reconciler/backend/reconciler_test.go +++ b/components/eventing-controller/reconciler/backend/reconciler_test.go @@ -635,7 +635,7 @@ func (t *TestCommander) Init(_ manager.Manager) error { return nil } -func (t *TestCommander) Start(_ commander.Params) error { +func (t *TestCommander) Start(_ *eventingv1alpha1.SubscriptionConfig, _ commander.Params) error { return t.startErr } diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler.go b/components/eventing-controller/reconciler/subscription-nats/reconciler.go index fb5304c396f8..969e7d3004cf 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler.go @@ -32,10 +32,11 @@ type Reconciler struct { ctx context.Context client.Client cache.Cache - Backend handlers.MessagingBackend - Log logr.Logger - recorder record.EventRecorder - eventTypeCleaner eventtype.Cleaner + Backend handlers.MessagingBackend + Log logr.Logger + recorder record.EventRecorder + eventTypeCleaner eventtype.Cleaner + defaultSubscriptionConfig *eventingv1alpha1.SubscriptionConfig } var ( @@ -51,7 +52,7 @@ const ( ) func NewReconciler(ctx context.Context, client client.Client, applicationLister *application.Lister, cache cache.Cache, - log logr.Logger, recorder record.EventRecorder, cfg env.NatsConfig) *Reconciler { + log logr.Logger, recorder record.EventRecorder, cfg env.NatsConfig, defaultSubConf *eventingv1alpha1.SubscriptionConfig) *Reconciler { natsHandler := handlers.NewNats(cfg, log) err := natsHandler.Initialize(env.Config{}) if err != nil { @@ -59,13 +60,14 @@ func NewReconciler(ctx context.Context, client client.Client, applicationLister panic(err) } return &Reconciler{ - ctx: ctx, - Client: client, - Cache: cache, - Backend: natsHandler, - Log: log, - recorder: recorder, - eventTypeCleaner: eventtype.NewCleaner(cfg.EventTypePrefix, applicationLister, log), + ctx: ctx, + Client: client, + Cache: cache, + Backend: natsHandler, + Log: log, + recorder: recorder, + eventTypeCleaner: eventtype.NewCleaner(cfg.EventTypePrefix, applicationLister, log), + defaultSubscriptionConfig: defaultSubConf, } } @@ -176,7 +178,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Check for valid sink if err := r.assertSinkValidity(actualSubscription.Spec.Sink); err != nil { r.Log.Error(err, "failed to parse sink URL") - if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error()); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { return ctrl.Result{}, err } // No point in reconciling as the sink is invalid @@ -187,7 +189,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu err = r.Backend.DeleteSubscription(desiredSubscription) if err != nil { log.Error(err, "failed to delete subscriptions") - if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error()); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, err @@ -209,10 +211,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return result, nil } + appliedSubsConfig := eventingv1alpha1.MergeSubsConfigs(desiredSubscription.Spec.Config, r.defaultSubscriptionConfig) + desiredSubscription.Spec.Config = appliedSubsConfig _, err = r.Backend.SyncSubscription(desiredSubscription, r.eventTypeCleaner) if err != nil { r.Log.Error(err, "failed to sync subscription") - if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error()); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, err @@ -221,7 +225,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu log.Info("successfully created Nats subscriptions") // Update status - if err := r.syncSubscriptionStatus(ctx, actualSubscription, true, ""); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, true, "", appliedSubsConfig); err != nil { return ctrl.Result{}, err } @@ -229,8 +233,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } // syncSubscriptionStatus syncs Subscription status +// subsConfig is the subscription configuration that was applied to the subscription. It is set only if the +// isNatsSubReady is true. func (r Reconciler) syncSubscriptionStatus(ctx context.Context, sub *eventingv1alpha1.Subscription, - isNatsSubReady bool, message string) error { + isNatsSubReady bool, message string, subsConfig *eventingv1alpha1.SubscriptionConfig) error { desiredSubscription := sub.DeepCopy() desiredConditions := make([]eventingv1alpha1.Condition, 0) conditionAdded := false @@ -257,6 +263,9 @@ func (r Reconciler) syncSubscriptionStatus(ctx context.Context, sub *eventingv1a } desiredSubscription.Status.Conditions = desiredConditions desiredSubscription.Status.Ready = isNatsSubReady + if isNatsSubReady { + desiredSubscription.Status.Config = subsConfig + } if !reflect.DeepEqual(sub.Status, desiredSubscription.Status) { err := r.Client.Status().Update(ctx, desiredSubscription, &client.UpdateOptions{}) @@ -292,7 +301,7 @@ func (r Reconciler) syncInvalidSubscriptions(ctx context.Context) (ctrl.Result, continue } // mark the subscription to be not ready, it will throw a new reconcile call - if err := r.syncSubscriptionStatus(ctx, sub, false, "invalid subscription"); err != nil { + if err := r.syncSubscriptionStatus(ctx, sub, false, "invalid subscription", nil); err != nil { r.Log.Error(err, "failed to save status for invalid subscription", "namespace", v.Namespace, "name", v.Name) return ctrl.Result{}, err } diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index fc85be77964f..1e612fe718f2 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -3,6 +3,7 @@ package subscription_nats import ( "context" "fmt" + "github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/nats" "io/ioutil" "log" "net/http" @@ -323,6 +324,9 @@ var _ = BeforeSuite(func(done Done) { app := applicationtest.NewApplication(reconcilertesting.ApplicationNameNotClean, nil) applicationLister := fake.NewApplicationListerOrDie(context.Background(), app) + defaultSubsConfig := &eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: 1, + } err = NewReconciler( context.Background(), k8sManager.GetClient(), @@ -331,6 +335,7 @@ var _ = BeforeSuite(func(done Done) { ctrl.Log.WithName("nats-reconciler").WithName("Subscription"), k8sManager.GetEventRecorderFor("eventing-controller-nats"), envConf, + defaultSubsConfig, ).SetupUnmanaged(k8sManager) Expect(err).ToNot(HaveOccurred()) diff --git a/resources/eventing/charts/controller/templates/deployment.yaml b/resources/eventing/charts/controller/templates/deployment.yaml index 5212cff270d4..6ad733cdcfde 100644 --- a/resources/eventing/charts/controller/templates/deployment.yaml +++ b/resources/eventing/charts/controller/templates/deployment.yaml @@ -43,6 +43,8 @@ spec: value: "{{ .Values.publisherProxy.image.pullPolicy }}" - name: PUBLISHER_REPLICAS value: "{{ .Values.publisherProxy.replicas }}" + - name: DEFAULT_MAX_IN_FLIGHT_MESSAGES + value: "{{ .Values.eventingBackend.defaultMaxInflightMessages }}" - name: APP_LOG_FORMAT value: {{ .Values.global.log.format | quote }} - name: APP_LOG_LEVEL diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index 0cba0ad82941..f69de1917089 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -61,6 +61,8 @@ clusterRoleBindingSuffix: "" eventingBackend: name: eventing-backend + # Using the current EventMesh default value + defaultMaxInflightMessages: 9 healthProbe: port: 8081 From 20a836623e80458606e05e0ab24fdc28256ecdab Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Thu, 15 Jul 2021 16:31:54 +0200 Subject: [PATCH 06/19] fix tests, etc --- .../api/v1alpha1/subscription_types.go | 3 +- .../pkg/commander/beb/beb.go | 2 +- .../pkg/commander/beb/beb_test.go | 2 +- .../pkg/commander/commander.go | 4 +- .../pkg/commander/fake/commander.go | 4 +- .../pkg/commander/nats/nats.go | 4 +- .../pkg/commander/nats/nats_test.go | 5 +- .../eventing-controller/pkg/handlers/beb.go | 18 ++--- .../pkg/handlers/beb_test.go | 2 +- .../eventing-controller/pkg/handlers/nats.go | 48 +++++++------ .../pkg/handlers/nats_test.go | 69 +++++++++---------- .../eventing-controller/pkg/handlers/utils.go | 2 +- .../reconciler/backend/reconciler.go | 14 ++-- .../reconciler/backend/reconciler_test.go | 4 +- .../subscription-nats/reconciler.go | 10 +-- .../subscription-nats/reconciler_test.go | 3 +- .../reconciler/subscription/reconciler.go | 2 +- .../eventing/charts/controller/values.yaml | 2 +- 18 files changed, 100 insertions(+), 98 deletions(-) diff --git a/components/eventing-controller/api/v1alpha1/subscription_types.go b/components/eventing-controller/api/v1alpha1/subscription_types.go index cc724a187a6d..9f31b7bfd159 100644 --- a/components/eventing-controller/api/v1alpha1/subscription_types.go +++ b/components/eventing-controller/api/v1alpha1/subscription_types.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/mitchellh/hashstructure/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -112,7 +113,7 @@ type SubscriptionConfig struct { // MergeSubsConfigs returns a valid subscription config object based on the provided config, // complemented with default values, if necessary -func MergeSubsConfigs(config, defaults *SubscriptionConfig) *SubscriptionConfig { +func MergeSubsConfigs(config *SubscriptionConfig, defaults *env.DefaultSubscriptionConfig) *SubscriptionConfig { merged := &SubscriptionConfig{ MaxInFlightMessages: defaults.MaxInFlightMessages, } diff --git a/components/eventing-controller/pkg/commander/beb/beb.go b/components/eventing-controller/pkg/commander/beb/beb.go index a2bc0ef452de..c4b4f741d604 100644 --- a/components/eventing-controller/pkg/commander/beb/beb.go +++ b/components/eventing-controller/pkg/commander/beb/beb.go @@ -82,7 +82,7 @@ func (c *Commander) Init(mgr manager.Manager) error { } // Start implements the Commander interface and starts the manager. -func (c *Commander) Start(_ *eventingv1alpha1.SubscriptionConfig, params commander.Params) error { +func (c *Commander) Start(_ env.DefaultSubscriptionConfig, params commander.Params) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel dynamicClient := dynamic.NewForConfigOrDie(c.restCfg) diff --git a/components/eventing-controller/pkg/commander/beb/beb_test.go b/components/eventing-controller/pkg/commander/beb/beb_test.go index d95fc308ee0a..be92cb7cd831 100644 --- a/components/eventing-controller/pkg/commander/beb/beb_test.go +++ b/components/eventing-controller/pkg/commander/beb/beb_test.go @@ -81,7 +81,7 @@ func TestCleanup(t *testing.T) { // create a BEB subscription from Kyma subscription fakeCleaner := fake.Cleaner{} - _, err = bebCommander.Backend.SyncSubscription(subscription, &fakeCleaner, apiRule) + _, _, err = bebCommander.Backend.SyncSubscription(subscription, &fakeCleaner, apiRule) g.Expect(err).To(gomega.BeNil()) // check that the susbcription exist in bebMock diff --git a/components/eventing-controller/pkg/commander/commander.go b/components/eventing-controller/pkg/commander/commander.go index 3570f209e58f..71001434535a 100644 --- a/components/eventing-controller/pkg/commander/commander.go +++ b/components/eventing-controller/pkg/commander/commander.go @@ -3,7 +3,7 @@ package commander import ( - "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -15,7 +15,7 @@ type Commander interface { Init(mgr manager.Manager) error // Start runs the initialized commander instance. - Start(defaultSubscriptionConfig *v1alpha1.SubscriptionConfig, params Params) error + Start(defaultSubsConfig env.DefaultSubscriptionConfig, params Params) error // Stop tells the commander instance to shutdown and clean-up. Stop() error diff --git a/components/eventing-controller/pkg/commander/fake/commander.go b/components/eventing-controller/pkg/commander/fake/commander.go index 5aedc4cb3a50..96190c02ef29 100644 --- a/components/eventing-controller/pkg/commander/fake/commander.go +++ b/components/eventing-controller/pkg/commander/fake/commander.go @@ -1,8 +1,8 @@ package fake import ( - "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" "github.com/kyma-project/kyma/components/eventing-controller/pkg/commander" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers" "k8s.io/client-go/dynamic" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -17,7 +17,7 @@ func (c *Commander) Init(mgr manager.Manager) error { return nil } -func (c *Commander) Start(_ *v1alpha1.SubscriptionConfig, _ commander.Params) error { +func (c *Commander) Start(_ env.DefaultSubscriptionConfig, _ commander.Params) error { return nil } diff --git a/components/eventing-controller/pkg/commander/nats/nats.go b/components/eventing-controller/pkg/commander/nats/nats.go index 8ebecaf11eef..7713ab61312b 100644 --- a/components/eventing-controller/pkg/commander/nats/nats.go +++ b/components/eventing-controller/pkg/commander/nats/nats.go @@ -73,7 +73,7 @@ func (c *Commander) Init(mgr manager.Manager) error { } // Start implements the Commander interface and starts the commander. -func (c *Commander) Start(defaultSubscriptionConfig *eventingv1alpha1.SubscriptionConfig, _ commander.Params) error { +func (c *Commander) Start(defaultSubsConfig env.DefaultSubscriptionConfig, _ commander.Params) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel @@ -87,7 +87,7 @@ func (c *Commander) Start(defaultSubscriptionConfig *eventingv1alpha1.Subscripti c.logger, c.mgr.GetEventRecorderFor("eventing-controller-nats"), c.envCfg, - defaultSubscriptionConfig, + defaultSubsConfig, ) c.backend = natsReconciler.Backend if err := natsReconciler.SetupUnmanaged(c.mgr); err != nil { diff --git a/components/eventing-controller/pkg/commander/nats/nats_test.go b/components/eventing-controller/pkg/commander/nats/nats_test.go index 10bf78a8c5a2..0ef69473f0a6 100644 --- a/components/eventing-controller/pkg/commander/nats/nats_test.go +++ b/components/eventing-controller/pkg/commander/nats/nats_test.go @@ -56,7 +56,8 @@ func TestCleanup(t *testing.T) { ReconnectWait: time.Second, EventTypePrefix: controllertesting.EventTypePrefix, } - natsBackend := handlers.NewNats(envConf, defaultLogger) + subsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 9} + natsBackend := handlers.NewNats(envConf, subsConfig, defaultLogger) natsCommander.Backend = natsBackend err = natsCommander.Backend.Initialize(env.Config{}) g.Expect(err).To(gomega.BeNil()) @@ -69,7 +70,7 @@ func TestCleanup(t *testing.T) { fakeCleaner := fake.Cleaner{} // Create NATS subscription - _, err = natsCommander.Backend.SyncSubscription(testSub, &fakeCleaner) + _, _, err = natsCommander.Backend.SyncSubscription(testSub, &fakeCleaner) g.Expect(err).To(gomega.BeNil()) // Make sure subscriber works diff --git a/components/eventing-controller/pkg/handlers/beb.go b/components/eventing-controller/pkg/handlers/beb.go index ca939b117c30..cb907675de1b 100644 --- a/components/eventing-controller/pkg/handlers/beb.go +++ b/components/eventing-controller/pkg/handlers/beb.go @@ -76,7 +76,7 @@ func getWebHookAuth(cfg env.Config, credentials *OAuth2ClientCredentials) *types } // SyncSubscription synchronize the EV2 subscription with the EMS subscription. It returns true, if the EV2 subscription status was changed -func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, error) { +func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) { // Format logger log := utils.LoggerWithSubscription(b.namedLogger(), subscription) @@ -91,13 +91,13 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea sEv2, err := getInternalView4Ev2(subscription, apiRule, b.WebhookAuth, b.ProtocolSettings, b.Namespace) if err != nil { log.Errorw("get Kyma subscription internal view failed", "error", err) - return false, err + return false, nil, err } newEv2Hash, err := getHash(sEv2) if err != nil { log.Errorw("get Kyma subscription hash failed", "error", err) - return false, err + return false, nil, err } var bebSubscription *types.Subscription @@ -107,7 +107,7 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea var newEMSHash int64 bebSubscription, newEMSHash, err = b.deleteCreateAndHashSubscription(sEv2, cleaner, log) if err != nil { - return false, err + return false, nil, err } subscription.Status.Ev2hash = newEv2Hash subscription.Status.Emshash = newEMSHash @@ -117,24 +117,24 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea bebSubscription, err = b.getSubscription(sEv2.Name) if err != nil { log.Errorw("get BEB subscription failed", "error", err) - return false, err + return false, nil, err } // get the internal view for the EMS subscription sEms, err := getInternalView4Ems(bebSubscription) if err != nil { log.Errorw("get BEB subscription internal view failed", "error", err) - return false, err + return false, nil, err } newEmsHash, err := getHash(sEms) if err != nil { log.Errorw("get BEB subscription hash failed", "error", err) - return false, err + return false, nil, err } if newEmsHash != subscription.Status.Emshash { // delete & create a new EMS subscription bebSubscription, newEmsHash, err = b.deleteCreateAndHashSubscription(sEv2, cleaner, log) if err != nil { - return false, err + return false, nil, err } subscription.Status.Emshash = newEmsHash statusChanged = true @@ -143,7 +143,7 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea // set the status of bebSubscription in ev2Subscription statusChanged = b.setEmsSubscriptionStatus(subscription, bebSubscription) || statusChanged - return statusChanged, nil + return statusChanged, nil, nil } // DeleteSubscription deletes the corresponding EMS subscription diff --git a/components/eventing-controller/pkg/handlers/beb_test.go b/components/eventing-controller/pkg/handlers/beb_test.go index a8d5fdf7769c..625a79bc2133 100644 --- a/components/eventing-controller/pkg/handlers/beb_test.go +++ b/components/eventing-controller/pkg/handlers/beb_test.go @@ -51,7 +51,7 @@ func Test_SyncBebSubscription(t *testing.T) { controllertesting.WithService("foo-host", "foo-svc", apiRule) // then - changed, err := beb.SyncSubscription(subscription, nil, apiRule) + changed, _, err := beb.SyncSubscription(subscription, nil, apiRule) g.Expect(err).To(Not(BeNil())) g.Expect(changed).To(BeFalse()) } diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index 0d645b880067..37c7051981e7 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -10,6 +10,7 @@ import ( "time" "github.com/kyma-project/kyma/components/eventing-controller/pkg/tracing" + "github.com/kyma-project/kyma/components/eventing-controller/utils" "k8s.io/apimachinery/pkg/types" @@ -29,28 +30,31 @@ import ( var _ MessagingBackend = &Nats{} type Nats struct { - config env.NatsConfig - logger *logger.Logger - client cev2.Client - connection *nats.Conn - subscriptions map[string]*nats.Subscription + config env.NatsConfig + defaultSubsConfig env.DefaultSubscriptionConfig + logger *logger.Logger + client cev2.Client + connection *nats.Conn + subscriptions map[string]*nats.Subscription } -func NewNats(config env.NatsConfig, logger *logger.Logger) *Nats { - return &Nats{config: config, logger: logger, subscriptions: make(map[string]*nats.Subscription)} +func NewNats(config env.NatsConfig, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger) *Nats { + return &Nats{ + config: config, + logger: logger, + subscriptions: make(map[string]*nats.Subscription), + defaultSubsConfig: subsConfig, + } } const ( - period = time.Minute - maxTries = 5 - queueSubscribers = 10 + period = time.Minute + maxTries = 5 natsHandlerName = "nats-handler" ) // Initialize creates a connection to NATS. -func (n *Nats) Initialize(cfg env.Config) error { - n.log.Info("Initialize NATS connection") - var err error +func (n *Nats) Initialize(env.Config) (err error) { if n.connection == nil || n.connection.Status() != nats.CONNECTED { n.connection, err = nats.Connect(n.config.Url, nats.RetryOnFailedConnect(true), nats.MaxReconnects(n.config.MaxReconnects), nats.ReconnectWait(n.config.ReconnectWait)) @@ -86,12 +90,13 @@ func newCloudeventClient(config env.NatsConfig) (cev2.Client, error) { // SyncSubscription synchronizes the given Kyma subscription to NATS subscription. // note: the returned bool should be ignored now. It should act as a marker for changed subscription status. -func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, _ ...interface{}) (bool, error) { +func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, _ ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) { + var subsConfig *eventingv1alpha1.SubscriptionConfig var filters []*eventingv1alpha1.BebFilter if sub.Spec.Filter != nil { uniqueFilters, err := sub.Spec.Filter.Deduplicate() if err != nil { - return false, errors.Wrap(err, "deduplicate subscription filters failed") + return false, subsConfig, errors.Wrap(err, "deduplicate subscription filters failed") } filters = uniqueFilters.Filters } @@ -104,28 +109,31 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even subject, err := createSubject(filter, cleaner) if err != nil { log.Errorw("create NATS subject failed", "error", err) - return false, err + return false, subsConfig, err } + callback := n.getCallback(sub.Spec.Sink) + if n.connection.Status() != nats.CONNECTED { if err := n.Initialize(env.Config{}); err != nil { log.Errorw("reset NATS connection failed", "status", n.connection.Stats(), "error", err) - return false, err + return false, subsConfig, err } } //natsSub, subscribeErr := n.connection.Subscribe(subject, callback) - for i := 0; i < sub.Spec.Config.MaxInFlightMessages; i++ { + subsConfig = eventingv1alpha1.MergeSubsConfigs(sub.Spec.Config, &n.defaultSubsConfig) + for i := 0; i < subsConfig.MaxInFlightMessages; i++ { natsSub, subscribeErr := n.connection.QueueSubscribe(subject, subject, callback) if subscribeErr != nil { log.Errorw("create NATS subscription failed", "error", err) - return false, subscribeErr + return false, subsConfig, subscribeErr } n.subscriptions[createKey(sub, subject+string(types.Separator)+strconv.Itoa(i))] = natsSub } } - return false, nil + return false, subsConfig, nil } // DeleteSubscription deletes all NATS subscriptions corresponding to a Kyma subscription diff --git a/components/eventing-controller/pkg/handlers/nats_test.go b/components/eventing-controller/pkg/handlers/nats_test.go index 69810b37b21f..7b0d885cc1d6 100644 --- a/components/eventing-controller/pkg/handlers/nats_test.go +++ b/components/eventing-controller/pkg/handlers/nats_test.go @@ -12,6 +12,7 @@ import ( "github.com/avast/retry-go" "github.com/nats-io/nats.go" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" cev2event "github.com/cloudevents/sdk-go/v2/event" kymalogger "github.com/kyma-project/kyma/common/logging/logger" @@ -87,6 +88,7 @@ func TestConvertMsgToCE(t *testing.T) { } func TestSubscription(t *testing.T) { + g := NewWithT(t) natsPort := 5222 subscriberPort := 8080 subscriberReceiveURL := fmt.Sprintf("http://127.0.0.1:%d/store", subscriberPort) @@ -101,16 +103,13 @@ func TestSubscription(t *testing.T) { t.Fatalf("initialize logger failed: %v", err) } - natsURL := natsServer.ClientURL() - natsClient := Nats{ - subscriptions: make(map[string]*nats.Subscription), - config: env.NatsConfig{ - Url: natsURL, - MaxReconnects: 2, - ReconnectWait: time.Second, - }, - logger: defaultLogger, + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, } + defaultMaxInflight := 9 + natsClient := NewNats(natsConfig, env.DefaultSubscriptionConfig{MaxInFlightMessages: defaultMaxInflight}, defaultLogger) if err := natsClient.Initialize(env.Config{}); err != nil { t.Fatalf("connect to Nats server failed: %v", err) @@ -137,14 +136,16 @@ func TestSubscription(t *testing.T) { // Create a subscription sub := eventingtesting.NewSubscription("sub", "foo", eventingtesting.WithNotCleanEventTypeFilter) sub.Spec.Sink = subscriberReceiveURL - _, err = natsClient.SyncSubscription(sub, cleaner) + _, appliedSubConfig, err := natsClient.SyncSubscription(sub, cleaner) if err != nil { t.Fatalf("sync subscription failed: %v", err) } + g.Expect(appliedSubConfig).NotTo(BeNil()) + g.Expect(appliedSubConfig.MaxInFlightMessages).To(Equal(defaultMaxInflight)) data := "sampledata" // Send an event - err = SendEventToNATS(&natsClient, data) + err = SendEventToNATS(natsClient, data) if err != nil { t.Fatalf("publish event failed: %v", err) } @@ -164,7 +165,7 @@ func TestSubscription(t *testing.T) { newData := "test-data" // Send an event - err = SendEventToNATS(&natsClient, newData) + err = SendEventToNATS(natsClient, newData) if err != nil { t.Fatalf("publish event failed: %v", err) } @@ -198,15 +199,13 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { t.Fatalf("initialize logger failed: %v", err) } - natsClient := Nats{ - subscriptions: map[string]*nats.Subscription{}, - config: env.NatsConfig{ - Url: natsServer.ClientURL(), - MaxReconnects: 2, - ReconnectWait: time.Second, - }, - logger: defaultLogger, + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, } + natsClient := NewNats(natsConfig, env.DefaultSubscriptionConfig{MaxInFlightMessages: 9}, defaultLogger) + if err := natsClient.Initialize(env.Config{}); err != nil { t.Fatalf("start NATS eventing backend failed: %s", err.Error()) } @@ -239,13 +238,13 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { } sub.Spec.Sink = subscriberReceiveURL idFunc := func(et string) (string, error) { return et, nil } - if _, err := natsClient.SyncSubscription(sub, eventtype.CleanerFunc(idFunc)); err != nil { + if _, _, err := natsClient.SyncSubscription(sub, eventtype.CleanerFunc(idFunc)); err != nil { t.Fatalf("sync subscription failed: %s", err.Error()) } data := "sampledata" // Send an event - if err := SendEventToNATS(&natsClient, data); err != nil { + if err := SendEventToNATS(natsClient, data); err != nil { t.Fatalf("publish event failed: %v", err) } @@ -275,16 +274,13 @@ func TestIsValidSubscription(t *testing.T) { } // Create NATS client - natsURL := natsServer.ClientURL() - natsClient := Nats{ - subscriptions: make(map[string]*nats.Subscription), - config: env.NatsConfig{ - Url: natsURL, - MaxReconnects: 2, - ReconnectWait: time.Second, - }, - logger: defaultLogger, + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, } + defaultSubsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 9} + natsClient := NewNats(natsConfig, defaultSubsConfig, defaultLogger) if err := natsClient.Initialize(env.Config{}); err != nil { t.Fatalf("connect to NATS server failed: %v", err) @@ -298,7 +294,7 @@ func TestIsValidSubscription(t *testing.T) { // Create a subscription sub := eventingtesting.NewSubscription("sub", "foo", eventingtesting.WithNotCleanEventTypeFilter) sub.Spec.Sink = subscriberReceiveURL - _, err = natsClient.SyncSubscription(sub, cleaner) + _, appliedConfig, err := natsClient.SyncSubscription(sub, cleaner) if err != nil { t.Fatalf("sync subscription failed: %v", err) } @@ -309,10 +305,13 @@ func TestIsValidSubscription(t *testing.T) { g.Expect(err).ShouldNot(HaveOccurred()) g.Expect(subject).To(Not(BeEmpty())) + g.Expect(appliedConfig).NotTo(BeNil()) + g.Expect(appliedConfig.MaxInFlightMessages).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + // get internal key var key string var natsSub *nats.Subscription - for i := 0; i < queueSubscribers; i++ { + for i := 0; i < appliedConfig.MaxInFlightMessages; i++ { key = createKey(sub, subject+string(types.Separator)+strconv.Itoa(i)) g.Expect(key).To(Not(BeEmpty())) natsSub = natsClient.subscriptions[key] @@ -345,7 +344,7 @@ func TestIsValidSubscription(t *testing.T) { // check that only one invalid subscription exist invalidNsn = natsClient.GetInvalidSubscriptions() - g.Expect(len(*invalidNsn)).To(BeIdenticalTo(queueSubscribers)) + g.Expect(len(*invalidNsn)).To(BeIdenticalTo(appliedConfig.MaxInFlightMessages)) // restart NATS server natsServer = eventingtesting.RunNatsServerOnPort(natsPort) @@ -353,7 +352,7 @@ func TestIsValidSubscription(t *testing.T) { // check that only one invalid subscription still exist, the controller is not running... invalidNsn = natsClient.GetInvalidSubscriptions() - g.Expect(len(*invalidNsn)).To(BeIdenticalTo(queueSubscribers)) + g.Expect(len(*invalidNsn)).To(BeIdenticalTo(appliedConfig.MaxInFlightMessages)) } diff --git a/components/eventing-controller/pkg/handlers/utils.go b/components/eventing-controller/pkg/handlers/utils.go index aabd43d0f78b..28b5b99167ff 100644 --- a/components/eventing-controller/pkg/handlers/utils.go +++ b/components/eventing-controller/pkg/handlers/utils.go @@ -35,7 +35,7 @@ type MessagingBackend interface { // It should return true if Kyma eventing subscription status was changed during this synchronization process. // TODO: Give up the usage of variadic parameters in the favor of using only subscription as input parameter. // TODO: This should contain all the infos necessary for the handler to do its job. - SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, error) + SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) // DeleteSubscription should delete the corresponding subscriber data of messaging backend DeleteSubscription(subscription *eventingv1alpha1.Subscription) error diff --git a/components/eventing-controller/reconciler/backend/reconciler.go b/components/eventing-controller/reconciler/backend/reconciler.go index 2a6b01dbf2ef..6d533209dfc1 100644 --- a/components/eventing-controller/reconciler/backend/reconciler.go +++ b/components/eventing-controller/reconciler/backend/reconciler.go @@ -61,10 +61,9 @@ type Reconciler struct { // TODO: Do we need to explicitly pass and use a cache here? The default client that we get from manager // already uses a cache internally (check manager.DefaultNewClient) cache.Cache - logger *logger.Logger - record record.EventRecorder - cfg env.BackendConfig - defaultSubscriptionConfig *eventingv1alpha1.SubscriptionConfig + logger *logger.Logger + record record.EventRecorder + cfg env.BackendConfig // backendType is the type of the backend which the reconciler detects at runtime backendType eventingv1alpha1.BackendType @@ -81,9 +80,6 @@ func NewReconciler(ctx context.Context, natsCommander, bebCommander commander.Co logger: logger, record: recorder, cfg: cfg, - defaultSubscriptionConfig: &eventingv1alpha1.SubscriptionConfig{ - MaxInFlightMessages: cfg.DefaultSubscriptionConfig.MaxInFlightMessages, - }, } } @@ -710,7 +706,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *Reconciler) startNATSController() error { if !r.natsCommanderStarted { - if err := r.natsCommander.Start(r.defaultSubscriptionConfig, commander.Params{}); err != nil { + if err := r.natsCommander.Start(r.cfg.DefaultSubscriptionConfig, commander.Params{}); err != nil { r.namedLogger().Errorw("start NATS commander failed", "error", err) return err } @@ -735,7 +731,7 @@ func (r *Reconciler) stopNATSController() error { func (r *Reconciler) startBEBController(clientID, clientSecret string) error { if !r.bebCommanderStarted { bebCommanderParams := commander.Params{"client_id": clientID, "client_secret": clientSecret} - if err := r.bebCommander.Start(r.defaultSubscriptionConfig, bebCommanderParams); err != nil { + if err := r.bebCommander.Start(r.cfg.DefaultSubscriptionConfig, bebCommanderParams); err != nil { r.namedLogger().Errorw("start BEB commander failed", "error", err) return err } diff --git a/components/eventing-controller/reconciler/backend/reconciler_test.go b/components/eventing-controller/reconciler/backend/reconciler_test.go index 0d07cba61212..c1281d728c14 100644 --- a/components/eventing-controller/reconciler/backend/reconciler_test.go +++ b/components/eventing-controller/reconciler/backend/reconciler_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" + "github.com/go-logr/zapr" hydrav1alpha1 "github.com/ory/hydra-maester/api/v1alpha1" "github.com/stretchr/testify/assert" @@ -635,7 +637,7 @@ func (t *TestCommander) Init(_ manager.Manager) error { return nil } -func (t *TestCommander) Start(_ *eventingv1alpha1.SubscriptionConfig, _ commander.Params) error { +func (t *TestCommander) Start(_ env.DefaultSubscriptionConfig, _ commander.Params) error { return t.startErr } diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler.go b/components/eventing-controller/reconciler/subscription-nats/reconciler.go index 6618f013f2ca..0e52825a624e 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler.go @@ -39,7 +39,6 @@ type Reconciler struct { logger *logger.Logger recorder record.EventRecorder eventTypeCleaner eventtype.Cleaner - defaultSubscriptionConfig *eventingv1alpha1.SubscriptionConfig } var ( @@ -56,8 +55,8 @@ const ( reconcilerName = "nats-subscription-reconciler" ) -func NewReconciler(ctx context.Context, client client.Client, applicationLister *application.Lister, cache cache.Cache, logger *logger.Logger, recorder record.EventRecorder, cfg env.NatsConfig, defaultSubConf *eventingv1alpha1.SubscriptionConfig) *Reconciler { - natsHandler := handlers.NewNats(cfg, logger) +func NewReconciler(ctx context.Context, client client.Client, applicationLister *application.Lister, cache cache.Cache, logger *logger.Logger, recorder record.EventRecorder, cfg env.NatsConfig, subsCfg env.DefaultSubscriptionConfig) *Reconciler { + natsHandler := handlers.NewNats(cfg, subsCfg, logger) if err := natsHandler.Initialize(env.Config{}); err != nil { logger.WithContext().Errorw("start reconciler failed", "name", reconcilerName, "error", err) panic(err) @@ -71,7 +70,6 @@ func NewReconciler(ctx context.Context, client client.Client, applicationLister logger: logger, recorder: recorder, eventTypeCleaner: eventtype.NewCleaner(cfg.EventTypePrefix, applicationLister, logger), - defaultSubscriptionConfig: defaultSubConf, } } @@ -208,9 +206,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return result, nil } - appliedSubsConfig := eventingv1alpha1.MergeSubsConfigs(desiredSubscription.Spec.Config, r.defaultSubscriptionConfig) - desiredSubscription.Spec.Config = appliedSubsConfig - _, err = r.Backend.SyncSubscription(desiredSubscription, r.eventTypeCleaner) + _, appliedSubsConfig, err := r.Backend.SyncSubscription(desiredSubscription, r.eventTypeCleaner) if err != nil { log.Errorw("sync subscription failed", "error", err) if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 7a72e6df67b2..694f61fe74b0 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -3,7 +3,6 @@ package subscription_nats import ( "context" "fmt" - "github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/nats" "io/ioutil" "log" "net/http" @@ -329,7 +328,7 @@ var _ = BeforeSuite(func(done Done) { defaultLogger, err := logger.New(string(kymalogger.JSON), string(kymalogger.INFO)) Expect(err).To(BeNil()) - defaultSubsConfig := &eventingv1alpha1.SubscriptionConfig{ + defaultSubsConfig := env.DefaultSubscriptionConfig{ MaxInFlightMessages: 1, } err = NewReconciler( diff --git a/components/eventing-controller/reconciler/subscription/reconciler.go b/components/eventing-controller/reconciler/subscription/reconciler.go index 63b3a5bb4ef6..39f0fb6bbe01 100644 --- a/components/eventing-controller/reconciler/subscription/reconciler.go +++ b/components/eventing-controller/reconciler/subscription/reconciler.go @@ -238,7 +238,7 @@ func (r *Reconciler) syncBEBSubscription(subscription *eventingv1alpha1.Subscrip var statusChanged bool var err error - if statusChanged, err = r.Backend.SyncSubscription(subscription, r.eventTypeCleaner, apiRule); err != nil { + if statusChanged, _, err = r.Backend.SyncSubscription(subscription, r.eventTypeCleaner, apiRule); err != nil { logger.Errorw("update BEB subscription failed", "error", err) condition := eventingv1alpha1.MakeCondition(eventingv1alpha1.ConditionSubscribed, eventingv1alpha1.ConditionReasonSubscriptionCreationFailed, corev1.ConditionFalse, "") if err := r.updateCondition(subscription, condition, ctx); err != nil { diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index 88753914616e..8e8b65b4eed6 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -36,7 +36,7 @@ publisherProxy: image: # name is the name of the container image for the eventing-publisher-proxy name: "event-publisher-proxy" - tag: "PR-11649" + tag: "PR-11621" pullPolicy: IfNotPresent replicas: 1 resources: From 90890175d2eb0d7d272312a145e5a2bb93e5c5a7 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Thu, 15 Jul 2021 16:48:13 +0200 Subject: [PATCH 07/19] copy crds --- ...nting.kyma-project.io_eventingbackends.yaml | 2 +- ...eventing.kyma-project.io_subscriptions.yaml | 2 +- ...gbackends.eventing.kyma-project.io.crd.yaml | 2 +- ...criptions.eventing.kyma-project.io.crd.yaml | 18 +++++++++++++++++- ...gbackends.eventing.kyma-project.io.crd.yaml | 2 +- ...criptions.eventing.kyma-project.io.crd.yaml | 18 +++++++++++++++++- 6 files changed, 38 insertions(+), 6 deletions(-) diff --git a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml index bdaebfbe16c7..cc423aadcffa 100644 --- a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml +++ b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_eventingbackends.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: eventingbackends.eventing.kyma-project.io spec: diff --git a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml index 98cc06e429ff..46a56e86cff1 100644 --- a/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml +++ b/components/eventing-controller/config/crd/bases/eventing.kyma-project.io_subscriptions.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: subscriptions.eventing.kyma-project.io spec: diff --git a/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml b/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml index bdaebfbe16c7..cc423aadcffa 100644 --- a/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml +++ b/installation/resources/crds/eventing/eventingbackends.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: eventingbackends.eventing.kyma-project.io spec: diff --git a/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml b/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml index b24def2199e5..46a56e86cff1 100644 --- a/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml +++ b/installation/resources/crds/eventing/subscriptions.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: subscriptions.eventing.kyma-project.io spec: @@ -43,6 +43,14 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + config: + description: Config defines the configurations that can be applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object filter: description: Filter defines the list of filters properties: @@ -179,6 +187,14 @@ spec: - status type: object type: array + config: + description: Config defines the configurations that have been applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object emsSubscriptionStatus: description: EmsSubscriptionStatus defines the status of Subscription in BEB diff --git a/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml b/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml index bdaebfbe16c7..cc423aadcffa 100644 --- a/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml +++ b/resources/cluster-essentials/files/eventingbackends.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: eventingbackends.eventing.kyma-project.io spec: diff --git a/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml b/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml index b24def2199e5..46a56e86cff1 100644 --- a/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml +++ b/resources/cluster-essentials/files/subscriptions.eventing.kyma-project.io.crd.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.0 + controller-gen.kubebuilder.io/version: v0.6.0 creationTimestamp: null name: subscriptions.eventing.kyma-project.io spec: @@ -43,6 +43,14 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + config: + description: Config defines the configurations that can be applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object filter: description: Filter defines the list of filters properties: @@ -179,6 +187,14 @@ spec: - status type: object type: array + config: + description: Config defines the configurations that have been applied + to the eventing backend when creating this subscription + properties: + maxInFlightMessages: + minimum: 1 + type: integer + type: object emsSubscriptionStatus: description: EmsSubscriptionStatus defines the status of Subscription in BEB From 1d91d7f8fdf2844dcee437a5712273f1a0b380a3 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Fri, 16 Jul 2021 10:59:43 +0200 Subject: [PATCH 08/19] set default maxInFlight to 1 --- components/eventing-controller/pkg/env/backend_config.go | 2 +- resources/eventing/charts/controller/values.yaml | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/components/eventing-controller/pkg/env/backend_config.go b/components/eventing-controller/pkg/env/backend_config.go index 8acf6c3e435a..9cae25c227f1 100644 --- a/components/eventing-controller/pkg/env/backend_config.go +++ b/components/eventing-controller/pkg/env/backend_config.go @@ -30,7 +30,7 @@ type PublisherConfig struct { } type DefaultSubscriptionConfig struct { - MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"9"` + MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"1"` } func GetBackendConfig() BackendConfig { diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index 8e8b65b4eed6..d9230ad68dbe 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -61,8 +61,7 @@ clusterRoleBindingSuffix: "" eventingBackend: name: eventing-backend - # Using the current EventMesh default value - defaultMaxInflightMessages: 9 + defaultMaxInflightMessages: 1 healthProbe: port: 8081 From 156df2b06495ac0f6814bfee225516d6a6f8a342 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Fri, 16 Jul 2021 11:46:51 +0200 Subject: [PATCH 09/19] set default maxInFlight to 10 --- components/eventing-controller/pkg/env/backend_config.go | 2 +- resources/eventing/charts/controller/values.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/components/eventing-controller/pkg/env/backend_config.go b/components/eventing-controller/pkg/env/backend_config.go index 9cae25c227f1..e445149f6603 100644 --- a/components/eventing-controller/pkg/env/backend_config.go +++ b/components/eventing-controller/pkg/env/backend_config.go @@ -30,7 +30,7 @@ type PublisherConfig struct { } type DefaultSubscriptionConfig struct { - MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"1"` + MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"10"` } func GetBackendConfig() BackendConfig { diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index d9230ad68dbe..6dd95c58f45e 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -61,7 +61,7 @@ clusterRoleBindingSuffix: "" eventingBackend: name: eventing-backend - defaultMaxInflightMessages: 1 + defaultMaxInflightMessages: 10 healthProbe: port: 8081 From c02dd5699ee4bc64d50ba68ea028fc0531396881 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Fri, 16 Jul 2021 13:53:43 +0200 Subject: [PATCH 10/19] some comments --- components/eventing-controller/pkg/handlers/utils.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/components/eventing-controller/pkg/handlers/utils.go b/components/eventing-controller/pkg/handlers/utils.go index 28b5b99167ff..19e0d21fcfb4 100644 --- a/components/eventing-controller/pkg/handlers/utils.go +++ b/components/eventing-controller/pkg/handlers/utils.go @@ -33,6 +33,10 @@ type MessagingBackend interface { // SyncSubscription should synchronize the Kyma eventing susbscription with the susbcriber infrastructure of messaging backend system. // It should return true if Kyma eventing subscription status was changed during this synchronization process. + // It returns the subscription configurations that were applied on the messaging backend. The returned configuration + // is the set of configs that were successfully applied to the subscription in the backend system. The distinction between the provided subscription + // config (sub.spec.config) and what is actually applied in the backend (sub.status.config) is necessary, since not all configs specified + // in the subscription spec might be available on the messaging backend system, and also some configs might be applied based on defaults and not based on the spec. // TODO: Give up the usage of variadic parameters in the favor of using only subscription as input parameter. // TODO: This should contain all the infos necessary for the handler to do its job. SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) From a024612f27115901514bb9468a70285f37615707 Mon Sep 17 00:00:00 2001 From: Radu Fantaziu Date: Fri, 16 Jul 2021 15:01:20 +0200 Subject: [PATCH 11/19] Add a key suffix based on NATS queue group instance no --- components/eventing-controller/pkg/handlers/nats.go | 11 +++++++---- .../eventing-controller/pkg/handlers/nats_test.go | 9 +++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index 37c7051981e7..afd8eb56b2d5 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -121,7 +121,6 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even } } - //natsSub, subscribeErr := n.connection.Subscribe(subject, callback) subsConfig = eventingv1alpha1.MergeSubsConfigs(sub.Spec.Config, &n.defaultSubsConfig) for i := 0; i < subsConfig.MaxInFlightMessages; i++ { natsSub, subscribeErr := n.connection.QueueSubscribe(subject, subject, callback) @@ -129,7 +128,7 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even log.Errorw("create NATS subscription failed", "error", err) return false, subsConfig, subscribeErr } - n.subscriptions[createKey(sub, subject+string(types.Separator)+strconv.Itoa(i))] = natsSub + n.subscriptions[createKey(sub, subject, i)] = natsSub } } @@ -236,8 +235,12 @@ func createKeyPrefix(sub *eventingv1alpha1.Subscription) string { return fmt.Sprintf("%s", namespacedName.String()) } -func createKey(sub *eventingv1alpha1.Subscription, subject string) string { - return fmt.Sprintf("%s.%s", createKeyPrefix(sub), subject) +func createKeySuffix(subject string, queueGoupInstanceNo int) string { + return subject + string(types.Separator) + strconv.Itoa(queueGoupInstanceNo) +} + +func createKey(sub *eventingv1alpha1.Subscription, subject string, queueGoupInstanceNo int) string { + return fmt.Sprintf("%s.%s", createKeyPrefix(sub), createKeySuffix(subject, queueGoupInstanceNo)) } func createSubject(filter *eventingv1alpha1.BebFilter, cleaner eventtype.Cleaner) (string, error) { diff --git a/components/eventing-controller/pkg/handlers/nats_test.go b/components/eventing-controller/pkg/handlers/nats_test.go index 7b0d885cc1d6..61d3deeff90a 100644 --- a/components/eventing-controller/pkg/handlers/nats_test.go +++ b/components/eventing-controller/pkg/handlers/nats_test.go @@ -4,16 +4,11 @@ import ( "context" "errors" "fmt" - "strconv" "strings" "testing" "time" "github.com/avast/retry-go" - "github.com/nats-io/nats.go" - . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/types" - cev2event "github.com/cloudevents/sdk-go/v2/event" kymalogger "github.com/kyma-project/kyma/common/logging/logger" eventingv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" @@ -23,6 +18,8 @@ import ( "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers/eventtype" eventingtesting "github.com/kyma-project/kyma/components/eventing-controller/testing" + "github.com/nats-io/nats.go" + . "github.com/onsi/gomega" ) func TestConvertMsgToCE(t *testing.T) { @@ -312,7 +309,7 @@ func TestIsValidSubscription(t *testing.T) { var key string var natsSub *nats.Subscription for i := 0; i < appliedConfig.MaxInFlightMessages; i++ { - key = createKey(sub, subject+string(types.Separator)+strconv.Itoa(i)) + key = createKey(sub, subject, i) g.Expect(key).To(Not(BeEmpty())) natsSub = natsClient.subscriptions[key] g.Expect(natsSub).To(Not(BeNil())) From 30d854accdc50b73d50684377aa837ea6410be15 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Mon, 19 Jul 2021 13:17:29 +0200 Subject: [PATCH 12/19] get rid of the return value in SyncSubscription --- .../pkg/commander/beb/beb_test.go | 2 +- .../pkg/commander/nats/nats_test.go | 2 +- .../eventing-controller/pkg/handlers/beb.go | 18 ++++++++--------- .../pkg/handlers/beb_test.go | 2 +- .../eventing-controller/pkg/handlers/nats.go | 16 +++++++-------- .../pkg/handlers/nats_test.go | 20 +++++++++---------- .../eventing-controller/pkg/handlers/utils.go | 7 ++----- .../subscription-nats/reconciler.go | 18 ++++++++--------- .../reconciler/subscription/reconciler.go | 2 +- 9 files changed, 41 insertions(+), 46 deletions(-) diff --git a/components/eventing-controller/pkg/commander/beb/beb_test.go b/components/eventing-controller/pkg/commander/beb/beb_test.go index be92cb7cd831..d95fc308ee0a 100644 --- a/components/eventing-controller/pkg/commander/beb/beb_test.go +++ b/components/eventing-controller/pkg/commander/beb/beb_test.go @@ -81,7 +81,7 @@ func TestCleanup(t *testing.T) { // create a BEB subscription from Kyma subscription fakeCleaner := fake.Cleaner{} - _, _, err = bebCommander.Backend.SyncSubscription(subscription, &fakeCleaner, apiRule) + _, err = bebCommander.Backend.SyncSubscription(subscription, &fakeCleaner, apiRule) g.Expect(err).To(gomega.BeNil()) // check that the susbcription exist in bebMock diff --git a/components/eventing-controller/pkg/commander/nats/nats_test.go b/components/eventing-controller/pkg/commander/nats/nats_test.go index 0ef69473f0a6..b5b8d520ca52 100644 --- a/components/eventing-controller/pkg/commander/nats/nats_test.go +++ b/components/eventing-controller/pkg/commander/nats/nats_test.go @@ -70,7 +70,7 @@ func TestCleanup(t *testing.T) { fakeCleaner := fake.Cleaner{} // Create NATS subscription - _, _, err = natsCommander.Backend.SyncSubscription(testSub, &fakeCleaner) + _, err = natsCommander.Backend.SyncSubscription(testSub, &fakeCleaner) g.Expect(err).To(gomega.BeNil()) // Make sure subscriber works diff --git a/components/eventing-controller/pkg/handlers/beb.go b/components/eventing-controller/pkg/handlers/beb.go index cb907675de1b..ca939b117c30 100644 --- a/components/eventing-controller/pkg/handlers/beb.go +++ b/components/eventing-controller/pkg/handlers/beb.go @@ -76,7 +76,7 @@ func getWebHookAuth(cfg env.Config, credentials *OAuth2ClientCredentials) *types } // SyncSubscription synchronize the EV2 subscription with the EMS subscription. It returns true, if the EV2 subscription status was changed -func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) { +func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, error) { // Format logger log := utils.LoggerWithSubscription(b.namedLogger(), subscription) @@ -91,13 +91,13 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea sEv2, err := getInternalView4Ev2(subscription, apiRule, b.WebhookAuth, b.ProtocolSettings, b.Namespace) if err != nil { log.Errorw("get Kyma subscription internal view failed", "error", err) - return false, nil, err + return false, err } newEv2Hash, err := getHash(sEv2) if err != nil { log.Errorw("get Kyma subscription hash failed", "error", err) - return false, nil, err + return false, err } var bebSubscription *types.Subscription @@ -107,7 +107,7 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea var newEMSHash int64 bebSubscription, newEMSHash, err = b.deleteCreateAndHashSubscription(sEv2, cleaner, log) if err != nil { - return false, nil, err + return false, err } subscription.Status.Ev2hash = newEv2Hash subscription.Status.Emshash = newEMSHash @@ -117,24 +117,24 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea bebSubscription, err = b.getSubscription(sEv2.Name) if err != nil { log.Errorw("get BEB subscription failed", "error", err) - return false, nil, err + return false, err } // get the internal view for the EMS subscription sEms, err := getInternalView4Ems(bebSubscription) if err != nil { log.Errorw("get BEB subscription internal view failed", "error", err) - return false, nil, err + return false, err } newEmsHash, err := getHash(sEms) if err != nil { log.Errorw("get BEB subscription hash failed", "error", err) - return false, nil, err + return false, err } if newEmsHash != subscription.Status.Emshash { // delete & create a new EMS subscription bebSubscription, newEmsHash, err = b.deleteCreateAndHashSubscription(sEv2, cleaner, log) if err != nil { - return false, nil, err + return false, err } subscription.Status.Emshash = newEmsHash statusChanged = true @@ -143,7 +143,7 @@ func (b *Beb) SyncSubscription(subscription *eventingv1alpha1.Subscription, clea // set the status of bebSubscription in ev2Subscription statusChanged = b.setEmsSubscriptionStatus(subscription, bebSubscription) || statusChanged - return statusChanged, nil, nil + return statusChanged, nil } // DeleteSubscription deletes the corresponding EMS subscription diff --git a/components/eventing-controller/pkg/handlers/beb_test.go b/components/eventing-controller/pkg/handlers/beb_test.go index 625a79bc2133..a8d5fdf7769c 100644 --- a/components/eventing-controller/pkg/handlers/beb_test.go +++ b/components/eventing-controller/pkg/handlers/beb_test.go @@ -51,7 +51,7 @@ func Test_SyncBebSubscription(t *testing.T) { controllertesting.WithService("foo-host", "foo-svc", apiRule) // then - changed, _, err := beb.SyncSubscription(subscription, nil, apiRule) + changed, err := beb.SyncSubscription(subscription, nil, apiRule) g.Expect(err).To(Not(BeNil())) g.Expect(changed).To(BeFalse()) } diff --git a/components/eventing-controller/pkg/handlers/nats.go b/components/eventing-controller/pkg/handlers/nats.go index afd8eb56b2d5..cd407fca16c6 100644 --- a/components/eventing-controller/pkg/handlers/nats.go +++ b/components/eventing-controller/pkg/handlers/nats.go @@ -90,13 +90,12 @@ func newCloudeventClient(config env.NatsConfig) (cev2.Client, error) { // SyncSubscription synchronizes the given Kyma subscription to NATS subscription. // note: the returned bool should be ignored now. It should act as a marker for changed subscription status. -func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, _ ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) { - var subsConfig *eventingv1alpha1.SubscriptionConfig +func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, _ ...interface{}) (bool, error) { var filters []*eventingv1alpha1.BebFilter if sub.Spec.Filter != nil { uniqueFilters, err := sub.Spec.Filter.Deduplicate() if err != nil { - return false, subsConfig, errors.Wrap(err, "deduplicate subscription filters failed") + return false, errors.Wrap(err, "deduplicate subscription filters failed") } filters = uniqueFilters.Filters } @@ -104,12 +103,13 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even // Format logger log := utils.LoggerWithSubscription(n.namedLogger(), sub) + subsConfig := eventingv1alpha1.MergeSubsConfigs(sub.Spec.Config, &n.defaultSubsConfig) // Create subscriptions in NATS for _, filter := range filters { subject, err := createSubject(filter, cleaner) if err != nil { log.Errorw("create NATS subject failed", "error", err) - return false, subsConfig, err + return false, err } callback := n.getCallback(sub.Spec.Sink) @@ -117,22 +117,22 @@ func (n *Nats) SyncSubscription(sub *eventingv1alpha1.Subscription, cleaner even if n.connection.Status() != nats.CONNECTED { if err := n.Initialize(env.Config{}); err != nil { log.Errorw("reset NATS connection failed", "status", n.connection.Stats(), "error", err) - return false, subsConfig, err + return false, err } } - subsConfig = eventingv1alpha1.MergeSubsConfigs(sub.Spec.Config, &n.defaultSubsConfig) for i := 0; i < subsConfig.MaxInFlightMessages; i++ { natsSub, subscribeErr := n.connection.QueueSubscribe(subject, subject, callback) if subscribeErr != nil { log.Errorw("create NATS subscription failed", "error", err) - return false, subsConfig, subscribeErr + return false, subscribeErr } n.subscriptions[createKey(sub, subject, i)] = natsSub } } + sub.Status.Config = subsConfig - return false, subsConfig, nil + return false, nil } // DeleteSubscription deletes all NATS subscriptions corresponding to a Kyma subscription diff --git a/components/eventing-controller/pkg/handlers/nats_test.go b/components/eventing-controller/pkg/handlers/nats_test.go index 61d3deeff90a..f010b737da26 100644 --- a/components/eventing-controller/pkg/handlers/nats_test.go +++ b/components/eventing-controller/pkg/handlers/nats_test.go @@ -133,12 +133,12 @@ func TestSubscription(t *testing.T) { // Create a subscription sub := eventingtesting.NewSubscription("sub", "foo", eventingtesting.WithNotCleanEventTypeFilter) sub.Spec.Sink = subscriberReceiveURL - _, appliedSubConfig, err := natsClient.SyncSubscription(sub, cleaner) + _, err = natsClient.SyncSubscription(sub, cleaner) if err != nil { t.Fatalf("sync subscription failed: %v", err) } - g.Expect(appliedSubConfig).NotTo(BeNil()) - g.Expect(appliedSubConfig.MaxInFlightMessages).To(Equal(defaultMaxInflight)) + g.Expect(sub.Status.Config).NotTo(BeNil()) // It should apply the defaults + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(defaultMaxInflight)) data := "sampledata" // Send an event @@ -235,7 +235,7 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { } sub.Spec.Sink = subscriberReceiveURL idFunc := func(et string) (string, error) { return et, nil } - if _, _, err := natsClient.SyncSubscription(sub, eventtype.CleanerFunc(idFunc)); err != nil { + if _, err := natsClient.SyncSubscription(sub, eventtype.CleanerFunc(idFunc)); err != nil { t.Fatalf("sync subscription failed: %s", err.Error()) } @@ -291,7 +291,7 @@ func TestIsValidSubscription(t *testing.T) { // Create a subscription sub := eventingtesting.NewSubscription("sub", "foo", eventingtesting.WithNotCleanEventTypeFilter) sub.Spec.Sink = subscriberReceiveURL - _, appliedConfig, err := natsClient.SyncSubscription(sub, cleaner) + _, err = natsClient.SyncSubscription(sub, cleaner) if err != nil { t.Fatalf("sync subscription failed: %v", err) } @@ -302,13 +302,13 @@ func TestIsValidSubscription(t *testing.T) { g.Expect(err).ShouldNot(HaveOccurred()) g.Expect(subject).To(Not(BeEmpty())) - g.Expect(appliedConfig).NotTo(BeNil()) - g.Expect(appliedConfig.MaxInFlightMessages).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + g.Expect(sub.Status.Config).NotTo(BeNil()) + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(defaultSubsConfig.MaxInFlightMessages)) // get internal key var key string var natsSub *nats.Subscription - for i := 0; i < appliedConfig.MaxInFlightMessages; i++ { + for i := 0; i < sub.Status.Config.MaxInFlightMessages; i++ { key = createKey(sub, subject, i) g.Expect(key).To(Not(BeEmpty())) natsSub = natsClient.subscriptions[key] @@ -341,7 +341,7 @@ func TestIsValidSubscription(t *testing.T) { // check that only one invalid subscription exist invalidNsn = natsClient.GetInvalidSubscriptions() - g.Expect(len(*invalidNsn)).To(BeIdenticalTo(appliedConfig.MaxInFlightMessages)) + g.Expect(len(*invalidNsn)).To(BeIdenticalTo(sub.Status.Config.MaxInFlightMessages)) // restart NATS server natsServer = eventingtesting.RunNatsServerOnPort(natsPort) @@ -349,7 +349,7 @@ func TestIsValidSubscription(t *testing.T) { // check that only one invalid subscription still exist, the controller is not running... invalidNsn = natsClient.GetInvalidSubscriptions() - g.Expect(len(*invalidNsn)).To(BeIdenticalTo(appliedConfig.MaxInFlightMessages)) + g.Expect(len(*invalidNsn)).To(BeIdenticalTo(sub.Status.Config.MaxInFlightMessages)) } diff --git a/components/eventing-controller/pkg/handlers/utils.go b/components/eventing-controller/pkg/handlers/utils.go index 19e0d21fcfb4..05ae86adcc12 100644 --- a/components/eventing-controller/pkg/handlers/utils.go +++ b/components/eventing-controller/pkg/handlers/utils.go @@ -33,13 +33,10 @@ type MessagingBackend interface { // SyncSubscription should synchronize the Kyma eventing susbscription with the susbcriber infrastructure of messaging backend system. // It should return true if Kyma eventing subscription status was changed during this synchronization process. - // It returns the subscription configurations that were applied on the messaging backend. The returned configuration - // is the set of configs that were successfully applied to the subscription in the backend system. The distinction between the provided subscription - // config (sub.spec.config) and what is actually applied in the backend (sub.status.config) is necessary, since not all configs specified - // in the subscription spec might be available on the messaging backend system, and also some configs might be applied based on defaults and not based on the spec. + // It sets subscription.status.config with configurations that were applied on the messaging backend when creating the subscription. // TODO: Give up the usage of variadic parameters in the favor of using only subscription as input parameter. // TODO: This should contain all the infos necessary for the handler to do its job. - SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, *eventingv1alpha1.SubscriptionConfig, error) + SyncSubscription(subscription *eventingv1alpha1.Subscription, cleaner eventtype.Cleaner, params ...interface{}) (bool, error) // DeleteSubscription should delete the corresponding subscriber data of messaging backend DeleteSubscription(subscription *eventingv1alpha1.Subscription) error diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler.go b/components/eventing-controller/reconciler/subscription-nats/reconciler.go index 0e52825a624e..49febd255206 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler.go @@ -173,7 +173,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Check for valid sink if err := r.assertSinkValidity(actualSubscription.Spec.Sink); err != nil { log.Errorw("parse sink URL failed", "error", err) - if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error()); err != nil { return ctrl.Result{}, err } // No point in reconciling as the sink is invalid @@ -183,7 +183,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Clean up the old subscriptions if err := r.Backend.DeleteSubscription(desiredSubscription); err != nil { log.Errorw("delete subscription failed", "error", err) - if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error()); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, err @@ -206,18 +206,19 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return result, nil } - _, appliedSubsConfig, err := r.Backend.SyncSubscription(desiredSubscription, r.eventTypeCleaner) + _, err = r.Backend.SyncSubscription(desiredSubscription, r.eventTypeCleaner) if err != nil { log.Errorw("sync subscription failed", "error", err) - if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error(), nil); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, false, err.Error()); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, err } log.Debug("create NATS subscriptions succeeded") + actualSubscription.Status.Config = desiredSubscription.Status.Config // Update status - if err := r.syncSubscriptionStatus(ctx, actualSubscription, true, "", appliedSubsConfig); err != nil { + if err := r.syncSubscriptionStatus(ctx, actualSubscription, true, ""); err != nil { return ctrl.Result{}, err } @@ -227,7 +228,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // syncSubscriptionStatus syncs Subscription status // subsConfig is the subscription configuration that was applied to the subscription. It is set only if the // isNatsSubReady is true. -func (r *Reconciler) syncSubscriptionStatus(ctx context.Context, sub *eventingv1alpha1.Subscription, isNatsSubReady bool, message string, subsConfig *eventingv1alpha1.SubscriptionConfig) error { +func (r *Reconciler) syncSubscriptionStatus(ctx context.Context, sub *eventingv1alpha1.Subscription, isNatsSubReady bool, message string) error { desiredSubscription := sub.DeepCopy() desiredConditions := make([]eventingv1alpha1.Condition, 0) conditionAdded := false @@ -254,9 +255,6 @@ func (r *Reconciler) syncSubscriptionStatus(ctx context.Context, sub *eventingv1 } desiredSubscription.Status.Conditions = desiredConditions desiredSubscription.Status.Ready = isNatsSubReady - if isNatsSubReady { - desiredSubscription.Status.Config = subsConfig - } if !reflect.DeepEqual(sub.Status, desiredSubscription.Status) { err := r.Client.Status().Update(ctx, desiredSubscription, &client.UpdateOptions{}) @@ -290,7 +288,7 @@ func (r *Reconciler) syncInvalidSubscriptions(ctx context.Context) (ctrl.Result, continue } // mark the subscription to be not ready, it will throw a new reconcile call - if err := r.syncSubscriptionStatus(ctx, sub, false, "invalid subscription", nil); err != nil { + if err := r.syncSubscriptionStatus(ctx, sub, false, "invalid subscription"); err != nil { r.namedLogger().Errorw("sync status for invalid subscription failed", "namespace", v.Namespace, "name", v.Name, "error", err) return ctrl.Result{}, err } diff --git a/components/eventing-controller/reconciler/subscription/reconciler.go b/components/eventing-controller/reconciler/subscription/reconciler.go index 39f0fb6bbe01..63b3a5bb4ef6 100644 --- a/components/eventing-controller/reconciler/subscription/reconciler.go +++ b/components/eventing-controller/reconciler/subscription/reconciler.go @@ -238,7 +238,7 @@ func (r *Reconciler) syncBEBSubscription(subscription *eventingv1alpha1.Subscrip var statusChanged bool var err error - if statusChanged, _, err = r.Backend.SyncSubscription(subscription, r.eventTypeCleaner, apiRule); err != nil { + if statusChanged, err = r.Backend.SyncSubscription(subscription, r.eventTypeCleaner, apiRule); err != nil { logger.Errorw("update BEB subscription failed", "error", err) condition := eventingv1alpha1.MakeCondition(eventingv1alpha1.ConditionSubscribed, eventingv1alpha1.ConditionReasonSubscriptionCreationFailed, corev1.ConditionFalse, "") if err := r.updateCondition(subscription, condition, ctx); err != nil { From b1c051e46c8844a7982bee80ce0993ff06392c89 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Mon, 19 Jul 2021 13:48:50 +0200 Subject: [PATCH 13/19] add test for maxInFlight change --- .../pkg/handlers/nats_test.go | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/components/eventing-controller/pkg/handlers/nats_test.go b/components/eventing-controller/pkg/handlers/nats_test.go index f010b737da26..ff239cdc478a 100644 --- a/components/eventing-controller/pkg/handlers/nats_test.go +++ b/components/eventing-controller/pkg/handlers/nats_test.go @@ -255,6 +255,90 @@ func TestSubscriptionWithDuplicateFilters(t *testing.T) { g.Expect(err).Should(HaveOccurred()) } +func TestSubscriptionWithMaxInFlightChange(t *testing.T) { + g := NewWithT(t) + + natsPort := 5223 + subscriberPort := 8080 + subscriberReceiveURL := fmt.Sprintf("http://127.0.0.1:%d/store", subscriberPort) + + // Start NATS server + natsServer := eventingtesting.RunNatsServerOnPort(natsPort) + defer eventingtesting.ShutDownNATSServer(natsServer) + + defaultLogger, err := logger.New(string(kymalogger.JSON), string(kymalogger.INFO)) + if err != nil { + t.Fatalf("initialize logger failed: %v", err) + } + + natsConfig := env.NatsConfig{ + Url: natsServer.ClientURL(), + MaxReconnects: 2, + ReconnectWait: time.Second, + } + defaultSubsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 5} + natsBackend := NewNats(natsConfig, defaultSubsConfig, defaultLogger) + + if err := natsBackend.Initialize(env.Config{}); err != nil { + t.Fatalf("connect to NATS server failed: %v", err) + } + + // Prepare event-type cleaner + application := applicationtest.NewApplication(eventingtesting.ApplicationNameNotClean, nil) + applicationLister := fake.NewApplicationListerOrDie(context.Background(), application) + cleaner := eventtype.NewCleaner(eventingtesting.EventTypePrefix, applicationLister, defaultLogger) + + // Create a subscription + sub := eventingtesting.NewSubscription("sub", "foo", eventingtesting.WithNotCleanEventTypeFilter) + sub.Spec.Sink = subscriberReceiveURL + _, err = natsBackend.SyncSubscription(sub, cleaner) + if err != nil { + t.Fatalf("sync subscription failed: %v", err) + } + + filter := sub.Spec.Filter.Filters[0] + subject, err := createSubject(filter, cleaner) + g.Expect(err).ShouldNot(HaveOccurred()) + g.Expect(subject).To(Not(BeEmpty())) + + g.Expect(sub.Status.Config).NotTo(BeNil()) + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + + // get internal key + var key string + var natsSub *nats.Subscription + for i := 0; i < sub.Status.Config.MaxInFlightMessages; i++ { + key = createKey(sub, subject, i) + natsSub = natsBackend.subscriptions[key] + g.Expect(natsSub).To(Not(BeNil())) + g.Expect(natsSub.IsValid()).To(BeTrue()) + } + g.Expect(len(natsBackend.subscriptions)).To(Equal(defaultSubsConfig.MaxInFlightMessages)) + + // check that no invalid subscriptions exist + invalidNsn := natsBackend.GetInvalidSubscriptions() + g.Expect(len(*invalidNsn)).To(BeZero()) + + sub.Spec.Config = &eventingv1alpha1.SubscriptionConfig{MaxInFlightMessages: 7} + _, err = natsBackend.SyncSubscription(sub, cleaner) + if err != nil { + t.Fatalf("sync subscription failed: %v", err) + } + + g.Expect(sub.Status.Config).NotTo(BeNil()) + g.Expect(sub.Status.Config.MaxInFlightMessages).To(Equal(sub.Spec.Config.MaxInFlightMessages)) + for i := 0; i < sub.Status.Config.MaxInFlightMessages; i++ { + key = createKey(sub, subject, i) + natsSub = natsBackend.subscriptions[key] + g.Expect(natsSub).To(Not(BeNil())) + g.Expect(natsSub.IsValid()).To(BeTrue()) + } + g.Expect(len(natsBackend.subscriptions)).To(Equal(sub.Spec.Config.MaxInFlightMessages)) + // check that no invalid subscriptions exist + invalidNsn = natsBackend.GetInvalidSubscriptions() + g.Expect(len(*invalidNsn)).To(BeZero()) +} + func TestIsValidSubscription(t *testing.T) { g := NewWithT(t) From ef89cf9ece967ad65510d48b9992587f5676719f Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Mon, 19 Jul 2021 15:52:57 +0200 Subject: [PATCH 14/19] use correct image tags --- resources/eventing/charts/controller/values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index 6dd95c58f45e..ec5e692205a9 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -15,7 +15,7 @@ image: # name is the name of the container image for the eventing-controller name: "eventing-controller" # tag is the container tag of the eventing-controller image - tag: "PR-11649" + tag: "PR-11621" # pullPolicy is the pullPolicy for the eventing-controller image pullPolicy: "IfNotPresent" @@ -36,7 +36,7 @@ publisherProxy: image: # name is the name of the container image for the eventing-publisher-proxy name: "event-publisher-proxy" - tag: "PR-11621" + tag: "PR-11649" pullPolicy: IfNotPresent replicas: 1 resources: From 75e9ea4c1216427eab893e7213dfcf32b0bb6804 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Mon, 19 Jul 2021 17:03:14 +0200 Subject: [PATCH 15/19] more tests --- .../api/v1alpha1/subscription_types_test.go | 40 +++++++++++ .../subscription-nats/reconciler_test.go | 72 ++++++++++++++++++- .../eventing-controller/testing/matchers.go | 6 ++ .../eventing/charts/controller/values.yaml | 2 +- 4 files changed, 116 insertions(+), 4 deletions(-) diff --git a/components/eventing-controller/api/v1alpha1/subscription_types_test.go b/components/eventing-controller/api/v1alpha1/subscription_types_test.go index 90cb29c65c40..c117ec46a2fc 100644 --- a/components/eventing-controller/api/v1alpha1/subscription_types_test.go +++ b/components/eventing-controller/api/v1alpha1/subscription_types_test.go @@ -3,6 +3,8 @@ package v1alpha1 import ( "reflect" "testing" + + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" ) const ( @@ -85,3 +87,41 @@ func TestBebFilters_Deduplicate(t *testing.T) { }) } } + +func TestMergeSubsConfigs(t *testing.T) { + defaultConf := &env.DefaultSubscriptionConfig{MaxInFlightMessages: 4} + tests := []struct { + caseName string + inputConf *SubscriptionConfig + inputDefaults *env.DefaultSubscriptionConfig + expectedOutput *SubscriptionConfig + }{ + { + caseName: "nil input config", + inputConf: nil, + inputDefaults: defaultConf, + expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 4}, + }, + { + caseName: "default is overridden", + inputConf: &SubscriptionConfig{MaxInFlightMessages: 10}, + inputDefaults: defaultConf, + expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 10}, + }, + { + caseName: "provided input is invalid", + inputConf: &SubscriptionConfig{MaxInFlightMessages: 0}, + inputDefaults: defaultConf, + expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 4}, + }, + } + + for _, tt := range tests { + t.Run(tt.caseName, func(t *testing.T) { + got := MergeSubsConfigs(tt.inputConf, tt.inputDefaults) + if !reflect.DeepEqual(got, tt.expectedOutput) { + t.Errorf("MergeSubsConfigs() got = %v, want = %v", got, tt.expectedOutput) + } + }) + } +} diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 694f61fe74b0..5d76265e8868 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -83,6 +83,9 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { eventingv1alpha1.ConditionSubscriptionActive, eventingv1alpha1.ConditionReasonNATSSubscriptionActive, v1.ConditionTrue, "")), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, + }), )) // publish a message @@ -162,6 +165,71 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { }) }) + When("Changing subscription configuration", func() { + It("Should reflect the new config in the subscription status", func() { + By("Creating the subscription using the default config") + ctx := context.Background() + subscriptionName := fmt.Sprintf("sub-%d", testId) + + // create subscriber + result := make(chan []byte) + url, shutdown := newSubscriber(result) + defer shutdown() + + // create subscription + givenSubscription := reconcilertesting.NewSubscription(subscriptionName, namespaceName, reconcilertesting.WithNotCleanEventTypeFilter, reconcilertesting.WithWebhookForNats) + givenSubscription.Spec.Sink = url + ensureSubscriptionCreated(givenSubscription, ctx) + + getSubscription(givenSubscription, ctx).Should(And( + reconcilertesting.HaveSubscriptionName(subscriptionName), + reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + eventingv1alpha1.ConditionSubscriptionActive, + eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + v1.ConditionTrue, "")), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, + }), + )) + + By("Updating the subscription configuration in the spec") + + newMaxInFlight := defaultSubsConfig.MaxInFlightMessages + 1 + givenSubscription.Spec.Config = &eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: newMaxInFlight, + } + err := k8sClient.Update(ctx, givenSubscription) + Expect(err).NotTo(HaveOccurred()) + + getSubscription(givenSubscription, ctx).Should(And( + reconcilertesting.HaveSubscriptionName(subscriptionName), + reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + eventingv1alpha1.ConditionSubscriptionActive, + eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + v1.ConditionTrue, "")), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: newMaxInFlight, + }), + )) + + // publish a message + connection, err := connectToNats(natsUrl) + Expect(err).ShouldNot(HaveOccurred()) + err = connection.Publish(reconcilertesting.OrderCreatedEventType, []byte(reconcilertesting.StructuredCloudEvent)) + Expect(err).ShouldNot(HaveOccurred()) + + // make sure that the subscriber received the message + Eventually(func() bool { + sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) + received := string(<-result) + return sent == received + }).Should(BeTrue()) + + Expect(k8sClient.Delete(ctx, givenSubscription)).Should(BeNil()) + isSubscriptionDeleted(givenSubscription, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) + }) + }) + When("Creating a Subscription with empty event type", func() { It("Should mark the subscription as not ready", func() { ctx := context.Background() @@ -270,6 +338,7 @@ var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment var natsServer *natsserver.Server +var defaultSubsConfig = env.DefaultSubscriptionConfig{MaxInFlightMessages: 1} func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -328,9 +397,6 @@ var _ = BeforeSuite(func(done Done) { defaultLogger, err := logger.New(string(kymalogger.JSON), string(kymalogger.INFO)) Expect(err).To(BeNil()) - defaultSubsConfig := env.DefaultSubscriptionConfig{ - MaxInFlightMessages: 1, - } err = NewReconciler( context.Background(), k8sManager.GetClient(), diff --git a/components/eventing-controller/testing/matchers.go b/components/eventing-controller/testing/matchers.go index 4b1196ba802a..db680c8d7080 100644 --- a/components/eventing-controller/testing/matchers.go +++ b/components/eventing-controller/testing/matchers.go @@ -40,6 +40,12 @@ func HaveNotFoundSubscription(isReallyDeleted bool) GomegaMatcher { return WithTransform(func(isDeleted bool) bool { return isDeleted }, Equal(isReallyDeleted)) } +func HaveSubsConfiguration(subsConf *eventingv1alpha1.SubscriptionConfig) GomegaMatcher { + return WithTransform(func(s *eventingv1alpha1.Subscription) *eventingv1alpha1.SubscriptionConfig { + return s.Status.Config + }, Equal(subsConf)) +} + func IsAnEmptySubscription() GomegaMatcher { return WithTransform(func(s *eventingv1alpha1.Subscription) bool { emptySub := eventingv1alpha1.Subscription{} diff --git a/resources/eventing/charts/controller/values.yaml b/resources/eventing/charts/controller/values.yaml index ec5e692205a9..8f8c54916742 100644 --- a/resources/eventing/charts/controller/values.yaml +++ b/resources/eventing/charts/controller/values.yaml @@ -36,7 +36,7 @@ publisherProxy: image: # name is the name of the container image for the eventing-publisher-proxy name: "event-publisher-proxy" - tag: "PR-11649" + tag: "947fb1cd" pullPolicy: IfNotPresent replicas: 1 resources: From d3a9534dea63fde325c5724cdbf294048d0369ca Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Tue, 20 Jul 2021 10:16:30 +0200 Subject: [PATCH 16/19] disable test --- .../subscription-nats/reconciler_test.go | 131 +++++++++--------- 1 file changed, 67 insertions(+), 64 deletions(-) diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 5d76265e8868..6b3468e89cb4 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -41,6 +41,9 @@ import ( const ( smallTimeOut = 5 * time.Second smallPollingInterval = 1 * time.Second + + //testTimeOut = 60 * time.Second + //testPollingInterval = 3 * time.Second ) var _ = Describe("NATS Subscription Reconciliation Tests", func() { @@ -165,70 +168,70 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { }) }) - When("Changing subscription configuration", func() { - It("Should reflect the new config in the subscription status", func() { - By("Creating the subscription using the default config") - ctx := context.Background() - subscriptionName := fmt.Sprintf("sub-%d", testId) - - // create subscriber - result := make(chan []byte) - url, shutdown := newSubscriber(result) - defer shutdown() - - // create subscription - givenSubscription := reconcilertesting.NewSubscription(subscriptionName, namespaceName, reconcilertesting.WithNotCleanEventTypeFilter, reconcilertesting.WithWebhookForNats) - givenSubscription.Spec.Sink = url - ensureSubscriptionCreated(givenSubscription, ctx) - - getSubscription(givenSubscription, ctx).Should(And( - reconcilertesting.HaveSubscriptionName(subscriptionName), - reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( - eventingv1alpha1.ConditionSubscriptionActive, - eventingv1alpha1.ConditionReasonNATSSubscriptionActive, - v1.ConditionTrue, "")), - reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ - MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, - }), - )) - - By("Updating the subscription configuration in the spec") - - newMaxInFlight := defaultSubsConfig.MaxInFlightMessages + 1 - givenSubscription.Spec.Config = &eventingv1alpha1.SubscriptionConfig{ - MaxInFlightMessages: newMaxInFlight, - } - err := k8sClient.Update(ctx, givenSubscription) - Expect(err).NotTo(HaveOccurred()) - - getSubscription(givenSubscription, ctx).Should(And( - reconcilertesting.HaveSubscriptionName(subscriptionName), - reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( - eventingv1alpha1.ConditionSubscriptionActive, - eventingv1alpha1.ConditionReasonNATSSubscriptionActive, - v1.ConditionTrue, "")), - reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ - MaxInFlightMessages: newMaxInFlight, - }), - )) - - // publish a message - connection, err := connectToNats(natsUrl) - Expect(err).ShouldNot(HaveOccurred()) - err = connection.Publish(reconcilertesting.OrderCreatedEventType, []byte(reconcilertesting.StructuredCloudEvent)) - Expect(err).ShouldNot(HaveOccurred()) - - // make sure that the subscriber received the message - Eventually(func() bool { - sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) - received := string(<-result) - return sent == received - }).Should(BeTrue()) - - Expect(k8sClient.Delete(ctx, givenSubscription)).Should(BeNil()) - isSubscriptionDeleted(givenSubscription, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) - }) - }) + //When("Changing subscription configuration", func() { + // It("Should reflect the new config in the subscription status", func() { + // By("Creating the subscription using the default config") + // ctx := context.Background() + // subscriptionName := fmt.Sprintf("sub-%d", testId) + // + // // create subscriber + // result := make(chan []byte) + // url, shutdown := newSubscriber(result) + // defer shutdown() + // + // // create subscription + // givenSubscription := reconcilertesting.NewSubscription(subscriptionName, namespaceName, reconcilertesting.WithNotCleanEventTypeFilter, reconcilertesting.WithWebhookForNats) + // givenSubscription.Spec.Sink = url + // ensureSubscriptionCreated(givenSubscription, ctx) + // + // getSubscription(givenSubscription, ctx).Should(And( + // reconcilertesting.HaveSubscriptionName(subscriptionName), + // reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + // eventingv1alpha1.ConditionSubscriptionActive, + // eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + // v1.ConditionTrue, "")), + // reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + // MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, + // }), + // )) + // + // By("Updating the subscription configuration in the spec") + // + // newMaxInFlight := defaultSubsConfig.MaxInFlightMessages + 1 + // givenSubscription.Spec.Config = &eventingv1alpha1.SubscriptionConfig{ + // MaxInFlightMessages: newMaxInFlight, + // } + // err := k8sClient.Update(ctx, givenSubscription) + // Expect(err).NotTo(HaveOccurred()) + // + // getSubscription(givenSubscription, ctx).Should(And( + // reconcilertesting.HaveSubscriptionName(subscriptionName), + // reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + // eventingv1alpha1.ConditionSubscriptionActive, + // eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + // v1.ConditionTrue, "")), + // reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + // MaxInFlightMessages: newMaxInFlight, + // }), + // )) + // + // // publish a message + // connection, err := connectToNats(natsUrl) + // Expect(err).ShouldNot(HaveOccurred()) + // err = connection.Publish(reconcilertesting.OrderCreatedEventType, []byte(reconcilertesting.StructuredCloudEvent)) + // Expect(err).ShouldNot(HaveOccurred()) + // + // // make sure that the subscriber received the message + // Eventually(func() bool { + // sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) + // received := string(<-result) + // return sent == received + // }).Should(BeTrue()) + // + // Expect(k8sClient.Delete(ctx, givenSubscription)).Should(BeNil()) + // isSubscriptionDeleted(givenSubscription, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) + // }) + //}) When("Creating a Subscription with empty event type", func() { It("Should mark the subscription as not ready", func() { From f395c7b5253c1aca556c35105b4d80e9ef9612b7 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Tue, 20 Jul 2021 13:56:35 +0200 Subject: [PATCH 17/19] try fixing the test --- .../subscription-nats/reconciler_test.go | 155 ++++++++++-------- 1 file changed, 87 insertions(+), 68 deletions(-) diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 6b3468e89cb4..422c75a4f724 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -42,8 +42,8 @@ const ( smallTimeOut = 5 * time.Second smallPollingInterval = 1 * time.Second - //testTimeOut = 60 * time.Second - //testPollingInterval = 3 * time.Second + timeOut = 60 * time.Second + pollingInterval = 3 * time.Second ) var _ = Describe("NATS Subscription Reconciliation Tests", func() { @@ -102,7 +102,7 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) received := string(<-result) return sent == received - }).Should(BeTrue()) + }, timeOut, pollingInterval).Should(BeTrue()) Expect(k8sClient.Delete(ctx, givenSubscription)).Should(BeNil()) isSubscriptionDeleted(givenSubscription, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) @@ -164,74 +164,76 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) received := string(<-result) return sent == received - }).Should(BeTrue()) + }, timeOut, pollingInterval).Should(BeTrue()) }) }) - //When("Changing subscription configuration", func() { - // It("Should reflect the new config in the subscription status", func() { - // By("Creating the subscription using the default config") - // ctx := context.Background() - // subscriptionName := fmt.Sprintf("sub-%d", testId) - // - // // create subscriber - // result := make(chan []byte) - // url, shutdown := newSubscriber(result) - // defer shutdown() - // - // // create subscription - // givenSubscription := reconcilertesting.NewSubscription(subscriptionName, namespaceName, reconcilertesting.WithNotCleanEventTypeFilter, reconcilertesting.WithWebhookForNats) - // givenSubscription.Spec.Sink = url - // ensureSubscriptionCreated(givenSubscription, ctx) - // - // getSubscription(givenSubscription, ctx).Should(And( - // reconcilertesting.HaveSubscriptionName(subscriptionName), - // reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( - // eventingv1alpha1.ConditionSubscriptionActive, - // eventingv1alpha1.ConditionReasonNATSSubscriptionActive, - // v1.ConditionTrue, "")), - // reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ - // MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, - // }), - // )) - // - // By("Updating the subscription configuration in the spec") - // - // newMaxInFlight := defaultSubsConfig.MaxInFlightMessages + 1 - // givenSubscription.Spec.Config = &eventingv1alpha1.SubscriptionConfig{ - // MaxInFlightMessages: newMaxInFlight, - // } - // err := k8sClient.Update(ctx, givenSubscription) - // Expect(err).NotTo(HaveOccurred()) - // - // getSubscription(givenSubscription, ctx).Should(And( - // reconcilertesting.HaveSubscriptionName(subscriptionName), - // reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( - // eventingv1alpha1.ConditionSubscriptionActive, - // eventingv1alpha1.ConditionReasonNATSSubscriptionActive, - // v1.ConditionTrue, "")), - // reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ - // MaxInFlightMessages: newMaxInFlight, - // }), - // )) - // - // // publish a message - // connection, err := connectToNats(natsUrl) - // Expect(err).ShouldNot(HaveOccurred()) - // err = connection.Publish(reconcilertesting.OrderCreatedEventType, []byte(reconcilertesting.StructuredCloudEvent)) - // Expect(err).ShouldNot(HaveOccurred()) - // - // // make sure that the subscriber received the message - // Eventually(func() bool { - // sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) - // received := string(<-result) - // return sent == received - // }).Should(BeTrue()) - // - // Expect(k8sClient.Delete(ctx, givenSubscription)).Should(BeNil()) - // isSubscriptionDeleted(givenSubscription, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) - // }) - //}) + When("Changing subscription configuration", func() { + It("Should reflect the new config in the subscription status", func() { + By("Creating the subscription using the default config") + ctx := context.Background() + subscriptionName := fmt.Sprintf("sub-%d", testId) + + // create subscriber + result := make(chan []byte) + url, shutdown := newSubscriber(result) + defer shutdown() + + // create subscription + sub := reconcilertesting.NewSubscription(subscriptionName, namespaceName, reconcilertesting.WithEventTypeFilter, reconcilertesting.WithWebhookForNats) + sub.Spec.Sink = url + ensureSubscriptionCreated(sub, ctx) + + getSubscription(sub, ctx).Should(And( + reconcilertesting.HaveSubscriptionName(subscriptionName), + reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + eventingv1alpha1.ConditionSubscriptionActive, + eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + v1.ConditionTrue, "")), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: defaultSubsConfig.MaxInFlightMessages, + }), + )) + + By("Updating the subscription configuration in the spec") + + newMaxInFlight := defaultSubsConfig.MaxInFlightMessages + 1 + changedSub := sub.DeepCopy() + changedSub.Spec.Config = &eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: newMaxInFlight, + } + Expect(k8sClient.Update(ctx, changedSub)).Should(BeNil()) + + Eventually(subscriptionGetter(ctx, sub.Name, sub.Namespace), timeOut, pollingInterval). + Should(And( + reconcilertesting.HaveSubscriptionName(subscriptionName), + reconcilertesting.HaveCondition(eventingv1alpha1.MakeCondition( + eventingv1alpha1.ConditionSubscriptionActive, + eventingv1alpha1.ConditionReasonNATSSubscriptionActive, + v1.ConditionTrue, ""), + ), + reconcilertesting.HaveSubsConfiguration(&eventingv1alpha1.SubscriptionConfig{ + MaxInFlightMessages: newMaxInFlight, + }), + )) + + // publish a message + connection, err := connectToNats(natsUrl) + Expect(err).ShouldNot(HaveOccurred()) + err = connection.Publish(reconcilertesting.OrderCreatedEventType, []byte(reconcilertesting.StructuredCloudEvent)) + Expect(err).ShouldNot(HaveOccurred()) + + // make sure that the subscriber received the message + Eventually(func() bool { + sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) + received := string(<-result) + return sent == received + }).Should(BeTrue()) + + Expect(k8sClient.Delete(ctx, sub)).Should(BeNil()) + isSubscriptionDeleted(sub, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) + }) + }) When("Creating a Subscription with empty event type", func() { It("Should mark the subscription as not ready", func() { @@ -289,6 +291,23 @@ func fixtureNamespace(name string) *v1.Namespace { return &namespace } +func subscriptionGetter(ctx context.Context, name, namespace string) func() (*eventingv1alpha1.Subscription, error) { + return func() (*eventingv1alpha1.Subscription, error) { + lookupKey := types.NamespacedName{ + Namespace: namespace, + Name: name, + } + subscription := &eventingv1alpha1.Subscription{} + if err := k8sClient.Get(ctx, lookupKey, subscription); err != nil { + log.Printf("fetch subscription %s failed: %v", lookupKey.String(), err) + return &eventingv1alpha1.Subscription{}, err + } + log.Printf("[Subscription] name:%s ns:%s status:%v", subscription.Name, subscription.Namespace, + subscription.Status) + return subscription, nil + } +} + // getSubscription fetches a subscription using the lookupKey and allows to make assertions on it func getSubscription(subscription *eventingv1alpha1.Subscription, ctx context.Context) AsyncAssertion { return Eventually(func() *eventingv1alpha1.Subscription { From 1b8a8e7fe80d994ce57841441cd6f8fe7bc942f9 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Tue, 20 Jul 2021 14:12:09 +0200 Subject: [PATCH 18/19] make sure all Eventually blocks have a timeout --- .../reconciler/subscription-nats/reconciler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 422c75a4f724..0af7a22b8c86 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -228,7 +228,7 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) received := string(<-result) return sent == received - }).Should(BeTrue()) + }, timeOut, pollingInterval).Should(BeTrue()) Expect(k8sClient.Delete(ctx, sub)).Should(BeNil()) isSubscriptionDeleted(sub, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true)) From a3fcf34de9bb3e9b623912df576a30592d72f32f Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Tue, 20 Jul 2021 14:50:28 +0200 Subject: [PATCH 19/19] publish and subscribe in an Eventually-block --- .../subscription-nats/reconciler_test.go | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go index 0af7a22b8c86..ebf0c9f00386 100644 --- a/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go +++ b/components/eventing-controller/reconciler/subscription-nats/reconciler_test.go @@ -43,7 +43,7 @@ const ( smallPollingInterval = 1 * time.Second timeOut = 60 * time.Second - pollingInterval = 3 * time.Second + pollingInterval = 5 * time.Second ) var _ = Describe("NATS Subscription Reconciliation Tests", func() { @@ -217,18 +217,23 @@ var _ = Describe("NATS Subscription Reconciliation Tests", func() { }), )) - // publish a message connection, err := connectToNats(natsUrl) Expect(err).ShouldNot(HaveOccurred()) - err = connection.Publish(reconcilertesting.OrderCreatedEventType, []byte(reconcilertesting.StructuredCloudEvent)) - Expect(err).ShouldNot(HaveOccurred()) - - // make sure that the subscriber received the message - Eventually(func() bool { - sent := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) - received := string(<-result) - return sent == received - }, timeOut, pollingInterval).Should(BeTrue()) + toSend := fmt.Sprintf(`"%s"`, reconcilertesting.EventData) + msgData := []byte(reconcilertesting.StructuredCloudEvent) + Eventually(func() (string, error) { + // publish the message + if err = connection.Publish(reconcilertesting.OrderCreatedEventType, msgData); err != nil { + return "", err + } + // make sure that the subscriber received the message + select { + case received := <-result: + return string(received), nil + case <-time.After(smallPollingInterval): + return "", fmt.Errorf("no message received") + } + }, timeOut, pollingInterval).Should(Equal(toSend)) Expect(k8sClient.Delete(ctx, sub)).Should(BeNil()) isSubscriptionDeleted(sub, ctx).Should(reconcilertesting.HaveNotFoundSubscription(true))