Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Nats queue subscriptions #11621

Merged
merged 23 commits into from Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,6 +1,7 @@
package v1alpha1

import (
"github.com/kyma-project/kyma/components/eventing-controller/pkg/env"
"github.com/mitchellh/hashstructure/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -104,6 +105,27 @@ func (bf *BebFilters) Deduplicate() (*BebFilters, error) {
return result, nil
}

type SubscriptionConfig struct {
// +optional
// +kubebuilder:validation:Minimum=1
MaxInFlightMessages int `json:"maxInFlightMessages,omitempty"`
}

// MergeSubsConfigs returns a valid subscription config object based on the provided config,
// complemented with default values, if necessary
func MergeSubsConfigs(config *SubscriptionConfig, defaults *env.DefaultSubscriptionConfig) *SubscriptionConfig {
merged := &SubscriptionConfig{
MaxInFlightMessages: defaults.MaxInFlightMessages,
}
if config == nil {
return merged
}
if config.MaxInFlightMessages >= 1 {
merged.MaxInFlightMessages = config.MaxInFlightMessages
}
return merged
}

// SubscriptionSpec defines the desired state of Subscription
type SubscriptionSpec struct {
// ID is the unique identifier of Subscription, read-only.
Expand All @@ -123,6 +145,10 @@ type SubscriptionSpec struct {

// Filter defines the list of filters
Filter *BebFilters `json:"filter"`

// Config defines the configurations that can be applied to the eventing backend when creating this subscription
// +optional
Config *SubscriptionConfig `json:"config,omitempty"`
}

type EmsSubscriptionStatus struct {
Expand Down Expand Up @@ -180,6 +206,10 @@ type SubscriptionStatus struct {
// EmsSubscriptionStatus defines the status of Subscription in BEB
// +optional
EmsSubscriptionStatus EmsSubscriptionStatus `json:"emsSubscriptionStatus,omitempty"`

// Config defines the configurations that have been applied to the eventing backend when creating this subscription
// +optional
Config *SubscriptionConfig `json:"config,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
Expand Up @@ -3,6 +3,8 @@ package v1alpha1
import (
"reflect"
"testing"

"github.com/kyma-project/kyma/components/eventing-controller/pkg/env"
)

const (
Expand Down Expand Up @@ -85,3 +87,41 @@ func TestBebFilters_Deduplicate(t *testing.T) {
})
}
}

func TestMergeSubsConfigs(t *testing.T) {
defaultConf := &env.DefaultSubscriptionConfig{MaxInFlightMessages: 4}
tests := []struct {
caseName string
inputConf *SubscriptionConfig
inputDefaults *env.DefaultSubscriptionConfig
expectedOutput *SubscriptionConfig
}{
{
caseName: "nil input config",
inputConf: nil,
inputDefaults: defaultConf,
expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 4},
},
{
caseName: "default is overridden",
inputConf: &SubscriptionConfig{MaxInFlightMessages: 10},
inputDefaults: defaultConf,
expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 10},
},
{
caseName: "provided input is invalid",
inputConf: &SubscriptionConfig{MaxInFlightMessages: 0},
inputDefaults: defaultConf,
expectedOutput: &SubscriptionConfig{MaxInFlightMessages: 4},
},
}

for _, tt := range tests {
t.Run(tt.caseName, func(t *testing.T) {
got := MergeSubsConfigs(tt.inputConf, tt.inputDefaults)
if !reflect.DeepEqual(got, tt.expectedOutput) {
t.Errorf("MergeSubsConfigs() got = %v, want = %v", got, tt.expectedOutput)
}
})
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.0
controller-gen.kubebuilder.io/version: v0.6.0
creationTimestamp: null
name: eventingbackends.eventing.kyma-project.io
spec:
Expand Down
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.0
controller-gen.kubebuilder.io/version: v0.6.0
creationTimestamp: null
name: subscriptions.eventing.kyma-project.io
spec:
Expand Down Expand Up @@ -43,6 +43,14 @@ spec:
spec:
description: SubscriptionSpec defines the desired state of Subscription
properties:
config:
description: Config defines the configurations that can be applied
to the eventing backend when creating this subscription
properties:
maxInFlightMessages:
minimum: 1
type: integer
type: object
filter:
description: Filter defines the list of filters
properties:
Expand Down Expand Up @@ -179,6 +187,14 @@ spec:
- status
type: object
type: array
config:
description: Config defines the configurations that have been applied
to the eventing backend when creating this subscription
properties:
maxInFlightMessages:
minimum: 1
type: integer
type: object
emsSubscriptionStatus:
description: EmsSubscriptionStatus defines the status of Subscription
in BEB
Expand Down
2 changes: 1 addition & 1 deletion components/eventing-controller/pkg/commander/beb/beb.go
Expand Up @@ -82,7 +82,7 @@ func (c *Commander) Init(mgr manager.Manager) error {
}

// Start implements the Commander interface and starts the manager.
func (c *Commander) Start(params commander.Params) error {
func (c *Commander) Start(_ env.DefaultSubscriptionConfig, params commander.Params) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
dynamicClient := dynamic.NewForConfigOrDie(c.restCfg)
Expand Down
3 changes: 2 additions & 1 deletion components/eventing-controller/pkg/commander/commander.go
Expand Up @@ -3,6 +3,7 @@
package commander

import (
"github.com/kyma-project/kyma/components/eventing-controller/pkg/env"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

Expand All @@ -14,7 +15,7 @@ type Commander interface {
Init(mgr manager.Manager) error

// Start runs the initialized commander instance.
Start(params Params) error
Start(defaultSubsConfig env.DefaultSubscriptionConfig, params Params) error

// Stop tells the commander instance to shutdown and clean-up.
Stop() error
Expand Down
Expand Up @@ -2,6 +2,7 @@ package fake

import (
"github.com/kyma-project/kyma/components/eventing-controller/pkg/commander"
"github.com/kyma-project/kyma/components/eventing-controller/pkg/env"
"github.com/kyma-project/kyma/components/eventing-controller/pkg/handlers"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -16,7 +17,7 @@ func (c *Commander) Init(mgr manager.Manager) error {
return nil
}

func (c *Commander) Start(_ commander.Params) error {
func (c *Commander) Start(_ env.DefaultSubscriptionConfig, _ commander.Params) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion components/eventing-controller/pkg/commander/nats/nats.go
Expand Up @@ -73,7 +73,7 @@ func (c *Commander) Init(mgr manager.Manager) error {
}

// Start implements the Commander interface and starts the commander.
func (c *Commander) Start(_ commander.Params) error {
func (c *Commander) Start(defaultSubsConfig env.DefaultSubscriptionConfig, _ commander.Params) error {
ctx, cancel := context.WithCancel(context.Background())

c.cancel = cancel
Expand All @@ -87,6 +87,7 @@ func (c *Commander) Start(_ commander.Params) error {
c.logger,
c.mgr.GetEventRecorderFor("eventing-controller-nats"),
c.envCfg,
defaultSubsConfig,
)
c.backend = natsReconciler.Backend
if err := natsReconciler.SetupUnmanaged(c.mgr); err != nil {
Expand Down
Expand Up @@ -56,7 +56,8 @@ func TestCleanup(t *testing.T) {
ReconnectWait: time.Second,
EventTypePrefix: controllertesting.EventTypePrefix,
}
natsBackend := handlers.NewNats(envConf, defaultLogger)
subsConfig := env.DefaultSubscriptionConfig{MaxInFlightMessages: 9}
natsBackend := handlers.NewNats(envConf, subsConfig, defaultLogger)
natsCommander.Backend = natsBackend
err = natsCommander.Backend.Initialize(env.Config{})
g.Expect(err).To(gomega.BeNil())
Expand Down
6 changes: 6 additions & 0 deletions components/eventing-controller/pkg/env/backend_config.go
Expand Up @@ -12,6 +12,8 @@ type BackendConfig struct {

BackendCRNamespace string `envconfig:"BACKEND_CR_NAMESPACE" default:"kyma-system"`
BackendCRName string `envconfig:"BACKEND_CR_NAME" default:"eventing-backend"`

DefaultSubscriptionConfig DefaultSubscriptionConfig
}

type PublisherConfig struct {
Expand All @@ -27,6 +29,10 @@ type PublisherConfig struct {
LimitsMemory string `envconfig:"PUBLISHER_LIMITS_MEMORY" default:"128Mi"`
}

type DefaultSubscriptionConfig struct {
MaxInFlightMessages int `envconfig:"DEFAULT_MAX_IN_FLIGHT_MESSAGES" default:"10"`
}

func GetBackendConfig() BackendConfig {
cfg := BackendConfig{}
if err := envconfig.Process("", &cfg); err != nil {
Expand Down