From d88230b26813f4f9cdd229f88d1f90255386753a Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 1 Sep 2020 12:40:36 +0200 Subject: [PATCH] Add Reconciler Signed-off-by: Pierangelo Di Pilato --- .../cmd/kafka-broker-controller/main.go | 3 +- .../eventing/v1alpha1/kafka_sink_lifecycle.go | 15 ++ .../pkg/reconciler/base/broker/broker_base.go | 20 ++ .../reconciler/base/broker/condition_set.go | 167 ++++++++++++++++ .../pkg/reconciler/base/broker/config.go | 16 ++ .../pkg/reconciler/base/reconciler.go | 13 +- control-plane/pkg/reconciler/broker/broker.go | 60 +++--- .../pkg/reconciler/broker/broker_config.go | 2 +- .../pkg/reconciler/broker/broker_lifecycle.go | 180 ----------------- .../pkg/reconciler/broker/broker_test.go | 51 ++--- .../pkg/reconciler/broker/controller.go | 5 +- .../pkg/reconciler/broker/controller_test.go | 4 +- .../reconciler/broker/reconciler_config.go | 17 +- control-plane/pkg/reconciler/broker/topic.go | 14 +- .../pkg/reconciler/broker/topic_test.go | 3 +- .../pkg/reconciler/sink/controller.go | 49 +++++ .../pkg/reconciler/sink/kafka_sink.go | 182 ++++++++++++++++++ .../pkg/reconciler/testing/factory.go | 3 +- .../pkg/reconciler/testing/objects.go | 21 +- .../pkg/reconciler/trigger/controller.go | 4 +- .../pkg/reconciler/trigger/controller_test.go | 4 +- .../pkg/reconciler/trigger/trigger.go | 11 +- .../reconciler/trigger/trigger_lifecycle.go | 4 +- .../pkg/reconciler/trigger/trigger_test.go | 5 +- test/cmd/watch-cm/main.go | 4 +- 25 files changed, 572 insertions(+), 285 deletions(-) create mode 100644 control-plane/pkg/reconciler/base/broker/broker_base.go create mode 100644 control-plane/pkg/reconciler/base/broker/condition_set.go create mode 100644 control-plane/pkg/reconciler/base/broker/config.go delete mode 100644 control-plane/pkg/reconciler/broker/broker_lifecycle.go create mode 100644 control-plane/pkg/reconciler/sink/controller.go create mode 100644 control-plane/pkg/reconciler/sink/kafka_sink.go diff --git a/control-plane/cmd/kafka-broker-controller/main.go b/control-plane/cmd/kafka-broker-controller/main.go index 705394b35e..8f8af3ade1 100644 --- a/control-plane/cmd/kafka-broker-controller/main.go +++ b/control-plane/cmd/kafka-broker-controller/main.go @@ -25,6 +25,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/injection/sharedmain" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger" ) @@ -34,7 +35,7 @@ const ( ) func main() { - brokerEnvConfigs := broker.EnvConfigs{} + brokerEnvConfigs := brokerbase.EnvConfigs{} if err := envconfig.Process("", &brokerEnvConfigs); err != nil { log.Fatal("cannot process environment variables", err) diff --git a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go index 8e888bd618..217b2fbdbd 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go @@ -33,3 +33,18 @@ var conditionSet = apis.NewLivingConditionSet( func (ks *KafkaSink) GetConditionSet() apis.ConditionSet { return conditionSet } + +func (ks *KafkaSinkStatus) GetConditionSet() apis.ConditionSet { + return conditionSet +} + +// SetAddress makes this Kafka Sink addressable by setting the URI. It also +// sets the ConditionAddressable to true. +func (ks *KafkaSinkStatus) SetAddress(url *apis.URL) { + ks.Address.URL = url + if url != nil { + ks.GetConditionSet().Manage(ks).MarkTrue(ConditionAddressable) + } else { + ks.GetConditionSet().Manage(ks).MarkFalse(ConditionAddressable, "nil URL", "URL is nil") + } +} diff --git a/control-plane/pkg/reconciler/base/broker/broker_base.go b/control-plane/pkg/reconciler/base/broker/broker_base.go new file mode 100644 index 0000000000..6d0a103672 --- /dev/null +++ b/control-plane/pkg/reconciler/base/broker/broker_base.go @@ -0,0 +1,20 @@ +package broker + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func PathFromObject(obj metav1.Object) string { + return Path(obj.GetNamespace(), obj.GetName()) +} + +func Path(namespace, name string) string { + return fmt.Sprintf("/%s/%s", namespace, name) +} + +func Topic(prefix string, obj metav1.Object) string { + return fmt.Sprintf("%s%s-%s", prefix, obj.GetNamespace(), obj.GetName()) +} + diff --git a/control-plane/pkg/reconciler/base/broker/condition_set.go b/control-plane/pkg/reconciler/base/broker/condition_set.go new file mode 100644 index 0000000000..f4be028034 --- /dev/null +++ b/control-plane/pkg/reconciler/base/broker/condition_set.go @@ -0,0 +1,167 @@ +package broker + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/reconciler" +) + +const ( + ConditionAddressable apis.ConditionType = "Addressable" + ConditionTopicReady apis.ConditionType = "TopicReady" + ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated" + ConditionConfigParsed apis.ConditionType = "ConfigParsed" +) + +var ConditionSet = apis.NewLivingConditionSet( + ConditionAddressable, + ConditionTopicReady, + ConditionConfigMapUpdated, + ConditionConfigParsed, +) + +type Object interface { + duckv1.KRShaped + runtime.Object +} + +type StatusConditionManager struct { + Object Object + + SetAddress func(u *apis.URL) + + Configs *EnvConfigs + + Recorder record.EventRecorder +} + +func (manager *StatusConditionManager) FailedToGetBrokersTriggersConfigMap(err error) reconciler.Event { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( + ConditionConfigMapUpdated, + fmt.Sprintf( + "Failed to get ConfigMap: %s", + manager.Configs.DataPlaneConfigMapAsString(), + ), + "%v", + err, + ) + + return fmt.Errorf("failed to get brokers and triggers config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err) +} + +func (manager *StatusConditionManager) FailedToGetBrokersTriggersDataFromConfigMap(err error) reconciler.Event { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( + ConditionConfigMapUpdated, + fmt.Sprintf( + "Failed to get brokers and trigger data from ConfigMap: %s", + manager.Configs.DataPlaneConfigMapAsString(), + ), + "%v", + err, + ) + + return fmt.Errorf("failed to get broker and triggers data from config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err) +} + +func (manager *StatusConditionManager) FailedToUpdateBrokersTriggersConfigMap(err error) reconciler.Event { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( + ConditionConfigMapUpdated, + fmt.Sprintf("Failed to update ConfigMap: %s", manager.Configs.DataPlaneConfigMapAsString()), + "%s", + err, + ) + + return fmt.Errorf("failed to update brokers and triggers config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err) +} + +func (manager *StatusConditionManager) BrokersTriggersConfigMapUpdated() { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason( + ConditionConfigMapUpdated, + fmt.Sprintf("Config map %s updated", manager.Configs.DataPlaneConfigMapAsString()), + "", + ) +} + +func (manager *StatusConditionManager) FailedToCreateTopic(topic string, err error) reconciler.Event { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( + ConditionTopicReady, + fmt.Sprintf("Failed to create topic: %s", topic), + "%v", + err, + ) + + return fmt.Errorf("failed to create topic: %s: %w", topic, err) +} + +func (manager *StatusConditionManager) TopicCreated(topic string) { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason( + ConditionTopicReady, + fmt.Sprintf("Topic %s created", topic), + "", + ) +} + +func (manager *StatusConditionManager) Reconciled() reconciler.Event { + + object := manager.Object + + manager.SetAddress(&apis.URL{ + Scheme: "http", + Host: names.ServiceHostName(manager.Configs.BrokerIngressName, manager.Configs.SystemNamespace), + Path: fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName()), + }) + object.GetConditionSet().Manage(object.GetStatus()).MarkTrue(ConditionAddressable) + + return nil +} + +func (manager *StatusConditionManager) FailedToUpdateDispatcherPodsAnnotation(err error) { + + // We don't set status conditions for dispatcher pods updates. + + // Record the event. + manager.Recorder.Eventf( + manager.Object, + corev1.EventTypeWarning, + "failed to update dispatcher pods annotation", + "%v", + err, + ) +} + +func (manager *StatusConditionManager) FailedToUpdateReceiverPodsAnnotation(err error) reconciler.Event { + + return fmt.Errorf("failed to update receiver pods annotation: %w", err) +} + +func (manager *StatusConditionManager) FailedToGetBrokerConfig(err error) reconciler.Event { + + return fmt.Errorf("failed to get broker configuration: %w", err) +} + +func (manager *StatusConditionManager) FailedToResolveBrokerConfig(err error) reconciler.Event { + + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( + ConditionConfigParsed, + fmt.Sprintf("%v", err), + "", + ) + + return fmt.Errorf("failed to get broker configuration: %w", err) +} + +func (manager *StatusConditionManager) BrokerConfigResolved() { + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrue(ConditionConfigParsed) +} diff --git a/control-plane/pkg/reconciler/base/broker/config.go b/control-plane/pkg/reconciler/base/broker/config.go new file mode 100644 index 0000000000..838672e8bb --- /dev/null +++ b/control-plane/pkg/reconciler/base/broker/config.go @@ -0,0 +1,16 @@ +package broker + +import "fmt" + +type EnvConfigs struct { + DataPlaneConfigMapNamespace string `required:"true" split_words:"true"` + DataPlaneConfigMapName string `required:"true" split_words:"true"` + GeneralConfigMapName string `required:"true" split_words:"true"` + BrokerIngressName string `required:"true" split_words:"true"` + SystemNamespace string `required:"true" split_words:"true"` + DataPlaneConfigFormat string `required:"true" split_words:"true"` +} + +func (c *EnvConfigs) DataPlaneConfigMapAsString() string { + return fmt.Sprintf("%s/%s", c.DataPlaneConfigMapNamespace, c.DataPlaneConfigMapName) +} diff --git a/control-plane/pkg/reconciler/base/reconciler.go b/control-plane/pkg/reconciler/base/reconciler.go index b27d14176e..a2736481a2 100644 --- a/control-plane/pkg/reconciler/base/reconciler.go +++ b/control-plane/pkg/reconciler/base/reconciler.go @@ -25,9 +25,11 @@ const ( ConfigMapDataKey = "data" // label for selecting dispatcher pods. - DispatcherLabel = "kafka-broker-dispatcher" + BrokerDispatcherLabel = "kafka-broker-dispatcher" // label for selecting receiver pods. - ReceiverLabel = "kafka-broker-receiver" + BrokerReceiverLabel = "kafka-broker-receiver" + // label for selecting receiver pods. + SinkReceiverLabel = "kafka-sink-receiver" // volume generation annotation data plane pods. VolumeGenerationAnnotationKey = "volumeGeneration" @@ -46,6 +48,9 @@ type Reconciler struct { DataPlaneConfigMapName string DataPlaneConfigFormat string SystemNamespace string + + DispatcherLabel string + ReceiverLabel string } func (r *Reconciler) GetOrCreateDataPlaneConfigMap() (*corev1.ConfigMap, error) { @@ -151,7 +156,7 @@ func (r *Reconciler) UpdateDispatcherPodsAnnotation(logger *zap.Logger, volumeGe return retry.RetryOnConflict(retry.DefaultRetry, func() error { - labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel}) + labelSelector := labels.SelectorFromSet(map[string]string{"app": r.DispatcherLabel}) pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) if errors != nil { return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors) @@ -165,7 +170,7 @@ func (r *Reconciler) UpdateReceiverPodsAnnotation(logger *zap.Logger, volumeGene return retry.RetryOnConflict(retry.DefaultRetry, func() error { - labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel}) + labelSelector := labels.SelectorFromSet(map[string]string{"app": r.ReceiverLabel}) pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) if errors != nil { return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors) diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 0f03180d31..09a812f11d 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -26,6 +26,7 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/retry" eventing "knative.dev/eventing/pkg/apis/eventing/v1" @@ -37,6 +38,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/log" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) const ( @@ -60,11 +62,14 @@ type Reconciler struct { // NewClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can // mock the function used during the reconciliation loop. - NewClusterAdmin func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) + NewClusterAdmin NewClusterAdminFunc Configs *Configs } +// NewClusterAdmin creates new sarama ClusterAdmin. +type NewClusterAdminFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) + func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { return retry.RetryOnConflict(retry.DefaultRetry, func() error { return r.reconcileKind(ctx, broker) @@ -75,32 +80,33 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger := log.Logger(ctx, "reconcile", broker) - statusConditionManager := statusConditionManager{ - Broker: broker, - configs: r.Configs, - recorder: controller.GetEventRecorder(ctx), + statusConditionManager := brokerbase.StatusConditionManager{ + Object: broker, + SetAddress: broker.Status.SetAddress, + Configs: &r.Configs.EnvConfigs, + Recorder: controller.GetEventRecorder(ctx), } config, err := r.resolveBrokerConfig(logger, broker) if err != nil { - return statusConditionManager.failedToResolveBrokerConfig(err) + return statusConditionManager.FailedToResolveBrokerConfig(err) } - statusConditionManager.brokerConfigResolved() + statusConditionManager.BrokerConfigResolved() logger.Debug("config resolved", zap.Any("config", config)) - topic, err := r.CreateTopic(logger, Topic(broker), config) + topic, err := r.CreateTopic(logger, brokerbase.Topic(TopicPrefix, broker), config) if err != nil { - return statusConditionManager.failedToCreateTopic(topic, err) + return statusConditionManager.FailedToCreateTopic(topic, err) } - statusConditionManager.topicCreated(topic) + statusConditionManager.TopicCreated(topic) logger.Debug("Topic created", zap.Any("topic", topic)) // Get brokers and triggers config map. brokersTriggersConfigMap, err := r.GetOrCreateDataPlaneConfigMap() if err != nil { - return statusConditionManager.failedToGetBrokersTriggersConfigMap(err) + return statusConditionManager.FailedToGetBrokersTriggersConfigMap(err) } logger.Debug("Got brokers and triggers config map") @@ -108,7 +114,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) // Get brokersTriggers data. brokersTriggers, err := r.GetDataPlaneConfigMapData(logger, brokersTriggersConfigMap) if err != nil && brokersTriggers == nil { - return statusConditionManager.failedToGetBrokersTriggersDataFromConfigMap(err) + return statusConditionManager.FailedToGetBrokersTriggersDataFromConfigMap(err) } logger.Debug( @@ -116,12 +122,12 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), ) - brokerIndex := FindBroker(brokersTriggers, broker) + brokerIndex := FindBroker(brokersTriggers, broker.UID) // Get broker configuration. brokerConfig, err := r.getBrokerConfig(topic, broker, config) if err != nil { - return statusConditionManager.failedToGetBrokerConfig(err) + return statusConditionManager.FailedToGetBrokerConfig(err) } // Update brokersTriggers data with the new broker configuration if brokerIndex != NoBroker { @@ -143,7 +149,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil { return err } - statusConditionManager.brokersTriggersConfigMapUpdated() + statusConditionManager.BrokersTriggersConfigMapUpdated() logger.Debug("Brokers and triggers config map updated") @@ -170,12 +176,12 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) zap.Error(err), ) - statusConditionManager.failedToUpdateDispatcherPodsAnnotation(err) + statusConditionManager.FailedToUpdateDispatcherPodsAnnotation(err) } else { logger.Debug("Updated dispatcher pod annotation") } - return statusConditionManager.reconciled() + return statusConditionManager.Reconciled() } func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -207,7 +213,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), ) - brokerIndex := FindBroker(brokersTriggers, broker) + brokerIndex := FindBroker(brokersTriggers, broker.UID) if brokerIndex != NoBroker { deleteBroker(brokersTriggers, brokerIndex) @@ -230,7 +236,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) } bootstrapServers := config.BootstrapServers - topic, err := r.deleteTopic(Topic(broker), bootstrapServers) + topic, err := r.deleteTopic(brokerbase.Topic(TopicPrefix, broker), bootstrapServers) if err != nil { return fmt.Errorf("failed to delete topic %s: %w", topic, err) } @@ -346,7 +352,7 @@ func (r *Reconciler) SetBootstrapServers(servers string) { return } - addrs := bootstrapServersArray(servers) + addrs := BootstrapServersArray(servers) r.bootstrapServersLock.Lock() r.bootstrapServers = addrs @@ -354,10 +360,16 @@ func (r *Reconciler) SetBootstrapServers(servers string) { } func (r *Reconciler) getKafkaClusterAdmin(bootstrapServers []string) (sarama.ClusterAdmin, error) { + return GetClusterAdmin(r.NewClusterAdmin, bootstrapServers) +} + +// GetClusterAdmin create a new sarama.ClusterAdmin. +// The caller is responsible for closing the sarama.ClusterAdmin. +func GetClusterAdmin(adminFunc NewClusterAdminFunc, bootstrapServers []string) (sarama.ClusterAdmin, error) { config := sarama.NewConfig() config.Version = sarama.MaxVersion - kafkaClusterAdmin, err := r.NewClusterAdmin(bootstrapServers, config) + kafkaClusterAdmin, err := adminFunc(bootstrapServers, config) if err != nil { return nil, fmt.Errorf("failed to create cluster admin: %w", err) } @@ -372,11 +384,11 @@ func (r *Reconciler) SetDefaultTopicDetails(topicDetail sarama.TopicDetail) { r.KafkaDefaultTopicDetails = topicDetail } -func FindBroker(brokersTriggers *coreconfig.Brokers, broker *eventing.Broker) int { +func FindBroker(brokersTriggers *coreconfig.Brokers, broker types.UID) int { // Find broker in brokersTriggers. brokerIndex := NoBroker for i, b := range brokersTriggers.Brokers { - if b.Id == string(broker.UID) { + if b.Id == string(broker) { brokerIndex = i break } @@ -409,7 +421,7 @@ func (r *Reconciler) getDefaultBootstrapServersOrFail() ([]string, error) { return r.bootstrapServers, nil } -func bootstrapServersArray(bootstrapServers string) []string { +func BootstrapServersArray(bootstrapServers string) []string { return strings.Split(bootstrapServers, ",") } diff --git a/control-plane/pkg/reconciler/broker/broker_config.go b/control-plane/pkg/reconciler/broker/broker_config.go index ff744ae991..1050240f13 100644 --- a/control-plane/pkg/reconciler/broker/broker_config.go +++ b/control-plane/pkg/reconciler/broker/broker_config.go @@ -59,7 +59,7 @@ func configFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*Config, err config := &Config{ TopicDetail: topicDetail, - BootstrapServers: bootstrapServersArray(bootstrapServers), + BootstrapServers: BootstrapServersArray(bootstrapServers), } logger.Debug("got broker config from config map", zap.Any("config", config)) diff --git a/control-plane/pkg/reconciler/broker/broker_lifecycle.go b/control-plane/pkg/reconciler/broker/broker_lifecycle.go deleted file mode 100644 index 82c3664515..0000000000 --- a/control-plane/pkg/reconciler/broker/broker_lifecycle.go +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package broker - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/record" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" - "knative.dev/eventing/pkg/reconciler/names" - "knative.dev/pkg/apis" - "knative.dev/pkg/reconciler" -) - -const ( - ConditionAddressable apis.ConditionType = "Addressable" - ConditionTopicReady apis.ConditionType = "TopicReady" - ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated" - ConditionConfigParsed apis.ConditionType = "ConfigParsed" -) - -var ConditionSet = apis.NewLivingConditionSet( - ConditionAddressable, - ConditionTopicReady, - ConditionConfigMapUpdated, - ConditionConfigParsed, -) - -const ( - Broker = "Broker" - Reconciled = Broker + "Reconciled" -) - -type statusConditionManager struct { - Broker *eventing.Broker - - configs *Configs - - recorder record.EventRecorder -} - -func (manager *statusConditionManager) failedToGetBrokersTriggersConfigMap(err error) reconciler.Event { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( - ConditionConfigMapUpdated, - fmt.Sprintf( - "Failed to get ConfigMap: %s", - manager.configs.DataPlaneConfigMapAsString(), - ), - "%v", - err, - ) - - return fmt.Errorf("failed to get brokers and triggers config map %s: %w", manager.configs.DataPlaneConfigMapAsString(), err) -} - -func (manager *statusConditionManager) failedToGetBrokersTriggersDataFromConfigMap(err error) reconciler.Event { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( - ConditionConfigMapUpdated, - fmt.Sprintf( - "Failed to get brokers and trigger data from ConfigMap: %s", - manager.configs.DataPlaneConfigMapAsString(), - ), - "%v", - err, - ) - - return fmt.Errorf("failed to get broker and triggers data from config map %s: %w", manager.configs.DataPlaneConfigMapAsString(), err) -} - -func (manager *statusConditionManager) failedToUpdateBrokersTriggersConfigMap(err error) reconciler.Event { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( - ConditionConfigMapUpdated, - fmt.Sprintf("Failed to update ConfigMap: %s", manager.configs.DataPlaneConfigMapAsString()), - "%s", - err, - ) - - return fmt.Errorf("failed to update brokers and triggers config map %s: %w", manager.configs.DataPlaneConfigMapAsString(), err) -} - -func (manager *statusConditionManager) brokersTriggersConfigMapUpdated() { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkTrueWithReason( - ConditionConfigMapUpdated, - fmt.Sprintf("Config map %s updated", manager.configs.DataPlaneConfigMapAsString()), - "", - ) -} - -func (manager *statusConditionManager) failedToCreateTopic(topic string, err error) reconciler.Event { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( - ConditionTopicReady, - fmt.Sprintf("Failed to create topic: %s", topic), - "%v", - err, - ) - - return fmt.Errorf("failed to create topic: %s: %w", topic, err) -} - -func (manager *statusConditionManager) topicCreated(topic string) { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkTrueWithReason( - ConditionTopicReady, - fmt.Sprintf("Topic %s created", topic), - "", - ) -} - -func (manager *statusConditionManager) reconciled() reconciler.Event { - - broker := manager.Broker - - broker.Status.Address.URL = &apis.URL{ - Scheme: "http", - Host: names.ServiceHostName(manager.configs.BrokerIngressName, manager.configs.SystemNamespace), - Path: fmt.Sprintf("/%s/%s", broker.Namespace, broker.Name), - } - broker.GetConditionSet().Manage(&broker.Status).MarkTrue(ConditionAddressable) - - return nil -} - -func (manager *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err error) { - - // We don't set status conditions for dispatcher pods updates. - - // Record the event. - manager.recorder.Eventf( - manager.Broker, - corev1.EventTypeWarning, - "failed to update dispatcher pods annotation", - "%v", - err, - ) -} - -func (manager *statusConditionManager) failedToUpdateReceiverPodsAnnotation(err error) reconciler.Event { - - return fmt.Errorf("failed to update receiver pods annotation: %w", err) -} - -func (manager *statusConditionManager) failedToGetBrokerConfig(err error) reconciler.Event { - - return fmt.Errorf("failed to get broker configuration: %w", err) -} - -func (manager *statusConditionManager) failedToResolveBrokerConfig(err error) reconciler.Event { - - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( - ConditionConfigParsed, - fmt.Sprintf("%v", err), - "", - ) - - return fmt.Errorf("failed to get broker configuration: %w", err) -} - -func (manager *statusConditionManager) brokerConfigResolved() { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkTrue(ConditionConfigParsed) -} diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 08d7949f37..c8ca128ada 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -47,6 +47,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" @@ -76,7 +77,7 @@ var ( ) func TestBrokerReconciler(t *testing.T) { - eventing.RegisterAlternateBrokerConditionSet(ConditionSet) + eventing.RegisterAlternateBrokerConditionSet(brokerbase.ConditionSet) t.Parallel() @@ -117,7 +118,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -174,7 +175,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://test-service.test-service-namespace.svc.cluster.local/", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -270,7 +271,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://test-service.test-service-namespace.svc.cluster.local/", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -323,7 +324,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -364,7 +365,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44a", @@ -392,7 +393,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44a", @@ -402,7 +403,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -487,7 +488,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://www.my-sink.com/api", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -542,13 +543,13 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, }, &configs), @@ -571,12 +572,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -620,12 +621,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -649,12 +650,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -701,12 +702,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -812,7 +813,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -975,7 +976,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: TriggerUUID + "b", }, }, - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -1018,7 +1019,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { }, }, BootstrapServers: bootstrapServers, - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 2, @@ -1081,7 +1082,7 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -1110,7 +1111,7 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://test-service.test-service-namespace.svc.cluster.local/", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -1343,6 +1344,8 @@ func useTable(t *testing.T, table TableTest, configs *Configs) { DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, KafkaDefaultTopicDetails: defaultTopicDetail, KafkaDefaultTopicDetailsLock: sync.RWMutex{}, diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index ca8e94253c..c8d8a3b050 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -37,6 +37,7 @@ import ( podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) @@ -51,7 +52,7 @@ const ( func NewController(ctx context.Context, watcher configmap.Watcher, configs *Configs) *controller.Impl { - eventing.RegisterAlternateBrokerConditionSet(ConditionSet) + eventing.RegisterAlternateBrokerConditionSet(brokerbase.ConditionSet) configmapInformer := configmapinformer.Get(ctx) @@ -63,6 +64,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *Conf DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, NewClusterAdmin: sarama.NewClusterAdmin, KafkaDefaultTopicDetails: sarama.TopicDetail{ diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index cd827ffbf3..b8eae45d63 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -32,13 +32,15 @@ import ( "knative.dev/pkg/configmap" dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" reconcilertesting "knative.dev/pkg/reconciler/testing" + + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) func TestNewController(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) configs := &Configs{ - EnvConfigs: EnvConfigs{ + EnvConfigs: brokerbase.EnvConfigs{ SystemNamespace: "cm", GeneralConfigMapName: "cm", }, diff --git a/control-plane/pkg/reconciler/broker/reconciler_config.go b/control-plane/pkg/reconciler/broker/reconciler_config.go index a1d784507e..04544a3e2e 100644 --- a/control-plane/pkg/reconciler/broker/reconciler_config.go +++ b/control-plane/pkg/reconciler/broker/reconciler_config.go @@ -17,24 +17,11 @@ package broker import ( - "fmt" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) type Configs struct { - EnvConfigs + brokerbase.EnvConfigs BootstrapServers string } - -type EnvConfigs struct { - DataPlaneConfigMapNamespace string `required:"true" split_words:"true"` - DataPlaneConfigMapName string `required:"true" split_words:"true"` - GeneralConfigMapName string `required:"true" split_words:"true"` - BrokerIngressName string `required:"true" split_words:"true"` - SystemNamespace string `required:"true" split_words:"true"` - DataPlaneConfigFormat string `required:"true" split_words:"true"` -} - -func (c *EnvConfigs) DataPlaneConfigMapAsString() string { - return fmt.Sprintf("%s/%s", c.DataPlaneConfigMapNamespace, c.DataPlaneConfigMapName) -} diff --git a/control-plane/pkg/reconciler/broker/topic.go b/control-plane/pkg/reconciler/broker/topic.go index 2a3438c18b..499799d306 100644 --- a/control-plane/pkg/reconciler/broker/topic.go +++ b/control-plane/pkg/reconciler/broker/topic.go @@ -17,11 +17,8 @@ package broker import ( - "fmt" - "github.com/Shopify/sarama" "go.uber.org/zap" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" ) func (r *Reconciler) CreateTopic(logger *zap.Logger, topic string, config *Config) (string, error) { @@ -32,6 +29,11 @@ func (r *Reconciler) CreateTopic(logger *zap.Logger, topic string, config *Confi } defer kafkaClusterAdmin.Close() + return CreateTopic(kafkaClusterAdmin, logger, topic, config) +} + +func CreateTopic(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *Config) (string, error) { + topicDetail := &sarama.TopicDetail{ NumPartitions: config.TopicDetail.NumPartitions, ReplicationFactor: config.TopicDetail.ReplicationFactor, @@ -43,7 +45,7 @@ func (r *Reconciler) CreateTopic(logger *zap.Logger, topic string, config *Confi zap.Int32("numPartitions", topicDetail.NumPartitions), ) - createTopicError := kafkaClusterAdmin.CreateTopic(topic, topicDetail, false) + createTopicError := admin.CreateTopic(topic, topicDetail, false) if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists { return topic, nil } @@ -68,7 +70,3 @@ func (r *Reconciler) deleteTopic(topic string, bootstrapServers []string) (strin return topic, nil } - -func Topic(broker *eventing.Broker) string { - return fmt.Sprintf("%s%s-%s", TopicPrefix, broker.Namespace, broker.Name) -} diff --git a/control-plane/pkg/reconciler/broker/topic_test.go b/control-plane/pkg/reconciler/broker/topic_test.go index 51eea319e8..1d7da3bd8e 100644 --- a/control-plane/pkg/reconciler/broker/topic_test.go +++ b/control-plane/pkg/reconciler/broker/topic_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventing "knative.dev/eventing/pkg/apis/eventing/v1" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" reconcilertesting "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" ) @@ -37,7 +38,7 @@ func TestCreateTopicTopicAlreadyExists(t *testing.T) { Namespace: "bnamespace", }, } - topic := broker.Topic(b) + topic := brokerbase.Topic(broker.TopicPrefix, b) errMsg := "topic already exists" r := broker.Reconciler{ diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go new file mode 100644 index 0000000000..bbdf39c589 --- /dev/null +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -0,0 +1,49 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sink + +import ( + "context" + + kubeclient "knative.dev/pkg/client/injection/kube/client" + podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + + sinkreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/eventing/v1alpha1/kafkasink" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" +) + +func NewController(ctx context.Context, _ configmap.Watcher, configs *brokerbase.EnvConfigs) *controller.Impl { + + reconciler := &Reconciler{ + Reconciler: &base.Reconciler{ + KubeClient: kubeclient.Get(ctx), + PodLister: podinformer.Get(ctx).Lister(), + DataPlaneConfigMapNamespace: configs.DataPlaneConfigMapNamespace, + DataPlaneConfigMapName: configs.DataPlaneConfigMapName, + DataPlaneConfigFormat: configs.DataPlaneConfigFormat, + SystemNamespace: configs.SystemNamespace, + ReceiverLabel: base.SinkReceiverLabel, + }, + } + + impl := sinkreconciler.NewImpl(ctx, reconciler) + + return impl +} diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go new file mode 100644 index 0000000000..e2bf2c2003 --- /dev/null +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -0,0 +1,182 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sink + +import ( + "context" + "math" + + "github.com/Shopify/sarama" + "go.uber.org/zap" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/retry" + "knative.dev/pkg/controller" + "knative.dev/pkg/reconciler" + + eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" + coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" + "knative.dev/eventing-kafka-broker/control-plane/pkg/log" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" +) + +const ( + // topic prefix - (topic name: knative-sink--) + TopicPrefix = "knative-sink-" +) + +type Reconciler struct { + *base.Reconciler + + ConfigMapLister corelisters.ConfigMapLister + + // NewClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can + // mock the function used during the reconciliation loop. + NewClusterAdmin func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) + + Configs *brokerbase.EnvConfigs +} + +func (r *Reconciler) ReconcileKind(ctx context.Context, ks *eventing.KafkaSink) reconciler.Event { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.reconcileKind(ctx, ks) + }) +} + +func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) error { + + logger := log.Logger(ctx, "reconcile", ks) + + config := &broker.Config{ + TopicDetail: sarama.TopicDetail{ + NumPartitions: ks.Spec.NumPartitions, + ReplicationFactor: ks.Spec.ReplicationFactor, + }, + BootstrapServers: broker.BootstrapServersArray(ks.Spec.BootstrapServers), + } + + statusConditionManager := brokerbase.StatusConditionManager{ + Object: ks, + SetAddress: ks.Status.SetAddress, + Configs: r.Configs, + Recorder: controller.GetEventRecorder(ctx), + } + + kafkaClusterAdmin, err := broker.GetClusterAdmin(r.NewClusterAdmin, config.BootstrapServers) + if err != nil { + return statusConditionManager.FailedToCreateTopic("", err) + } + defer kafkaClusterAdmin.Close() + + topic, err := broker.CreateTopic(kafkaClusterAdmin, logger, brokerbase.Topic(TopicPrefix, ks), config) + if err != nil { + return statusConditionManager.FailedToCreateTopic(topic, err) + } + statusConditionManager.TopicCreated(topic) + + logger.Debug("Topic created", zap.Any("topic", topic)) + + // Get brokers and triggers config map. + brokersTriggersConfigMap, err := r.GetOrCreateDataPlaneConfigMap() + if err != nil { + return statusConditionManager.FailedToGetBrokersTriggersConfigMap(err) + } + + logger.Debug("Got brokers and triggers config map") + + // Get brokersTriggers data. + brokersTriggers, err := r.GetDataPlaneConfigMapData(logger, brokersTriggersConfigMap) + if err != nil && brokersTriggers == nil { + return statusConditionManager.FailedToGetBrokersTriggersDataFromConfigMap(err) + } + if brokersTriggers == nil { + brokersTriggers = &coreconfig.Brokers{} + } + + logger.Debug( + "Got brokers and triggers data from config map", + zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), + ) + + brokerIndex := broker.FindBroker(brokersTriggers, ks.UID) + + // Get broker configuration. + brokerConfig := &coreconfig.Broker{ + Id: string(ks.UID), + Topic: topic, + Path: broker.Path(ks.GetNamespace(), ks.GetName()), + BootstrapServers: ks.Spec.BootstrapServers, + } + + // Update brokersTriggers data with the new broker configuration + if brokerIndex != broker.NoBroker { + brokerConfig.Triggers = brokersTriggers.Brokers[brokerIndex].Triggers + brokersTriggers.Brokers[brokerIndex] = brokerConfig + + logger.Debug("Broker exists", zap.Int("index", brokerIndex)) + + } else { + brokersTriggers.Brokers = append(brokersTriggers.Brokers, brokerConfig) + + logger.Debug("Broker doesn't exist") + } + + // Increment volumeGeneration + brokersTriggers.VolumeGeneration = incrementVolumeGeneration(brokersTriggers.VolumeGeneration) + + // Update the configuration map with the new brokersTriggers data. + if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil { + return err + } + statusConditionManager.BrokersTriggersConfigMapUpdated() + + logger.Debug("Brokers and triggers config map updated") + + // After #37 we reject events to a non-existing Broker, which means that we cannot consider a Broker Ready if all + // receivers haven't got the Broker, so update failures to receiver pods is a hard failure. + // On the other side, dispatcher pods care about Triggers, and the Broker object is used as a configuration + // prototype for all associated Triggers, so we consider that it's fine on the dispatcher side to receive eventually + // the update even if here eventually means seconds or minutes after the actual update. + + // Update volume generation annotation of receiver pods + if err := r.UpdateReceiverPodsAnnotation(logger, brokersTriggers.VolumeGeneration); err != nil { + return err + } + + logger.Debug("Updated receiver pod annotation") + + return statusConditionManager.Reconciled() +} + +func (r *Reconciler) FinalizeKind(ctx context.Context, ks *eventing.KafkaSink) reconciler.Event { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.finalizeKind(ctx, ks) + }) +} + +func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) error { + + // logger := log.Logger(ctx, "finalize", ks) + + // TODO implement finalizer + panic("implement me") +} + +func incrementVolumeGeneration(generation uint64) uint64 { + return (generation + 1) % (math.MaxUint64 - 1) +} diff --git a/control-plane/pkg/reconciler/testing/factory.go b/control-plane/pkg/reconciler/testing/factory.go index 90dac4c60d..a90df40c9f 100644 --- a/control-plane/pkg/reconciler/testing/factory.go +++ b/control-plane/pkg/reconciler/testing/factory.go @@ -34,6 +34,7 @@ import ( . "knative.dev/pkg/reconciler/testing" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" ) @@ -45,7 +46,7 @@ const ( var DefaultConfigs = &broker.Configs{ - EnvConfigs: broker.EnvConfigs{ + EnvConfigs: brokerbase.EnvConfigs{ DataPlaneConfigMapName: "kafka-broker-brokers-triggers", DataPlaneConfigMapNamespace: "knative-eventing", BrokerIngressName: "kafka-broker-receiver", diff --git a/control-plane/pkg/reconciler/testing/objects.go b/control-plane/pkg/reconciler/testing/objects.go index 8eb064bc15..2ac4e81804 100644 --- a/control-plane/pkg/reconciler/testing/objects.go +++ b/control-plane/pkg/reconciler/testing/objects.go @@ -36,6 +36,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) @@ -73,7 +74,7 @@ func NewDispatcherPod(namespace string, annotations map[string]string) runtime.O Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.DispatcherLabel, + "app": base.BrokerDispatcherLabel, }, }, } @@ -90,7 +91,7 @@ func NewReceiverPod(namespace string, annotations map[string]string) runtime.Obj Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.ReceiverLabel, + "app": base.BrokerReceiverLabel, }, }, } @@ -260,7 +261,7 @@ func BrokerReady(broker *eventing.Broker) { func ConfigMapUpdatedReady(configs *Configs) func(broker *eventing.Broker) { return func(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrueWithReason( - ConditionConfigMapUpdated, + brokerbase.ConditionConfigMapUpdated, fmt.Sprintf("Config map %s updated", configs.DataPlaneConfigMapAsString()), "", ) @@ -269,19 +270,19 @@ func ConfigMapUpdatedReady(configs *Configs) func(broker *eventing.Broker) { func TopicReady(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrueWithReason( - ConditionTopicReady, - fmt.Sprintf("Topic %s created", Topic(broker)), + brokerbase.ConditionTopicReady, + fmt.Sprintf("Topic %s created", brokerbase.Topic(TopicPrefix, broker)), "", ) } func ConfigParsed(broker *eventing.Broker) { - broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(ConditionConfigParsed) + broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(brokerbase.ConditionConfigParsed) } func ConfigNotParsed(reason string) func(broker *eventing.Broker) { return func(broker *eventing.Broker) { - broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse(ConditionConfigParsed, reason, "") + broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse(brokerbase.ConditionConfigParsed, reason, "") } } @@ -295,14 +296,14 @@ func Addressable(configs *Configs) func(broker *eventing.Broker) { Path: fmt.Sprintf("/%s/%s", broker.Namespace, broker.Name), } - broker.GetConditionSet().Manage(&broker.Status).MarkTrue(ConditionAddressable) + broker.GetConditionSet().Manage(&broker.Status).MarkTrue(brokerbase.ConditionAddressable) } } func FailedToCreateTopic(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse( - ConditionTopicReady, + brokerbase.ConditionTopicReady, fmt.Sprintf("Failed to create topic: %s", GetTopic()), "%v", fmt.Errorf("failed to create topic"), @@ -315,7 +316,7 @@ func FailedToGetConfigMap(configs *Configs) func(broker *eventing.Broker) { return func(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse( - ConditionConfigMapUpdated, + brokerbase.ConditionConfigMapUpdated, fmt.Sprintf( "Failed to get ConfigMap: %s", configs.DataPlaneConfigMapAsString(), diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index bbea60a807..29bf2a9856 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -38,7 +38,7 @@ import ( eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) @@ -48,7 +48,7 @@ const ( FinalizerName = "kafka.triggers.eventing.knative.dev" ) -func NewController(ctx context.Context, _ configmap.Watcher, configs *broker.EnvConfigs) *controller.Impl { +func NewController(ctx context.Context, _ configmap.Watcher, configs *brokerbase.EnvConfigs) *controller.Impl { logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/trigger/controller_test.go b/control-plane/pkg/reconciler/trigger/controller_test.go index 3c3c1f901c..2548963209 100644 --- a/control-plane/pkg/reconciler/trigger/controller_test.go +++ b/control-plane/pkg/reconciler/trigger/controller_test.go @@ -31,13 +31,13 @@ import ( _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" - brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) func TestNewController(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) - controller := NewController(ctx, configmap.NewStaticWatcher(), &brokerreconciler.EnvConfigs{}) + controller := NewController(ctx, configmap.NewStaticWatcher(), &brokerbase.EnvConfigs{}) if controller == nil { t.Error("failed to create controller: ") } diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index e02bcd93f6..0965a0bfb9 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -35,6 +35,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/log" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" ) @@ -49,7 +50,7 @@ type Reconciler struct { EventingClient eventingclientset.Interface Resolver *resolver.URIResolver - Configs *brokerreconciler.EnvConfigs + Configs *brokerbase.EnvConfigs } func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { @@ -97,7 +98,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), ) - brokerIndex := brokerreconciler.FindBroker(brokersTriggers, broker) + brokerIndex := brokerreconciler.FindBroker(brokersTriggers, broker.UID) if brokerIndex == brokerreconciler.NoBroker { // If the broker is not there, resources associated with the Trigger are deleted accordingly. return nil @@ -206,8 +207,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge // Actually check if the broker doesn't exist. broker, err = r.EventingClient.EventingV1(). // Note: do not introduce another `broker` variable with `:` - Brokers(trigger.Namespace). - Get(trigger.Spec.Broker, metav1.GetOptions{}) + Brokers(trigger.Namespace). + Get(trigger.Spec.Broker, metav1.GetOptions{}) if apierrors.IsNotFound(err) { @@ -246,7 +247,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: dataPlaneConfig}), ) - brokerIndex := brokerreconciler.FindBroker(dataPlaneConfig, broker) + brokerIndex := brokerreconciler.FindBroker(dataPlaneConfig, broker.UID) if brokerIndex == brokerreconciler.NoBroker { return statusConditionManager.brokerNotFoundInDataPlaneConfigMap() } diff --git a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go index 9d9f968890..2b767b1e31 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go +++ b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go @@ -25,13 +25,13 @@ import ( "knative.dev/pkg/apis" "knative.dev/pkg/reconciler" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) type statusConditionManager struct { Trigger *eventing.Trigger - Configs *broker.EnvConfigs + Configs *brokerbase.EnvConfigs Recorder record.EventRecorder } diff --git a/control-plane/pkg/reconciler/trigger/trigger_test.go b/control-plane/pkg/reconciler/trigger/trigger_test.go index 7953b4b8a2..7d2c7bd4a1 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/trigger_test.go @@ -40,6 +40,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" ) @@ -60,7 +61,7 @@ var ( ) func TestTriggerReconciler(t *testing.T) { - eventing.RegisterAlternateBrokerConditionSet(broker.ConditionSet) + eventing.RegisterAlternateBrokerConditionSet(brokerbase.ConditionSet) t.Parallel() @@ -1758,6 +1759,8 @@ func useTable(t *testing.T, table TableTest, configs *broker.Configs) { DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, BrokerLister: listers.GetBrokerLister(), EventingClient: eventingclient.Get(ctx), diff --git a/test/cmd/watch-cm/main.go b/test/cmd/watch-cm/main.go index 3124531c84..48e0d04c97 100644 --- a/test/cmd/watch-cm/main.go +++ b/test/cmd/watch-cm/main.go @@ -22,13 +22,13 @@ import ( "github.com/kelseyhightower/envconfig" "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" testobservability "knative.dev/eventing-kafka-broker/test/pkg/observability" ) func main() { - envConfig := &broker.EnvConfigs{} + envConfig := &brokerbase.EnvConfigs{} if err := envconfig.Process("", envConfig); err != nil { log.Fatal("failed to process env config", err)