Skip to content

Commit

Permalink
Add Reconciler
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Sep 1, 2020
1 parent e14bba0 commit d88230b
Show file tree
Hide file tree
Showing 25 changed files with 572 additions and 285 deletions.
3 changes: 2 additions & 1 deletion control-plane/cmd/kafka-broker-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/sharedmain"

brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger"
)
Expand All @@ -34,7 +35,7 @@ const (
)

func main() {
brokerEnvConfigs := broker.EnvConfigs{}
brokerEnvConfigs := brokerbase.EnvConfigs{}

if err := envconfig.Process("", &brokerEnvConfigs); err != nil {
log.Fatal("cannot process environment variables", err)
Expand Down
15 changes: 15 additions & 0 deletions control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,18 @@ var conditionSet = apis.NewLivingConditionSet(
func (ks *KafkaSink) GetConditionSet() apis.ConditionSet {
return conditionSet
}

func (ks *KafkaSinkStatus) GetConditionSet() apis.ConditionSet {
return conditionSet
}

// SetAddress makes this Kafka Sink addressable by setting the URI. It also
// sets the ConditionAddressable to true.
func (ks *KafkaSinkStatus) SetAddress(url *apis.URL) {
ks.Address.URL = url
if url != nil {
ks.GetConditionSet().Manage(ks).MarkTrue(ConditionAddressable)
} else {
ks.GetConditionSet().Manage(ks).MarkFalse(ConditionAddressable, "nil URL", "URL is nil")
}
}
20 changes: 20 additions & 0 deletions control-plane/pkg/reconciler/base/broker/broker_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package broker

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func PathFromObject(obj metav1.Object) string {
return Path(obj.GetNamespace(), obj.GetName())
}

func Path(namespace, name string) string {
return fmt.Sprintf("/%s/%s", namespace, name)
}

func Topic(prefix string, obj metav1.Object) string {
return fmt.Sprintf("%s%s-%s", prefix, obj.GetNamespace(), obj.GetName())
}

167 changes: 167 additions & 0 deletions control-plane/pkg/reconciler/base/broker/condition_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package broker

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"knative.dev/eventing/pkg/reconciler/names"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/reconciler"
)

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionTopicReady apis.ConditionType = "TopicReady"
ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated"
ConditionConfigParsed apis.ConditionType = "ConfigParsed"
)

var ConditionSet = apis.NewLivingConditionSet(
ConditionAddressable,
ConditionTopicReady,
ConditionConfigMapUpdated,
ConditionConfigParsed,
)

type Object interface {
duckv1.KRShaped
runtime.Object
}

type StatusConditionManager struct {
Object Object

SetAddress func(u *apis.URL)

Configs *EnvConfigs

Recorder record.EventRecorder
}

func (manager *StatusConditionManager) FailedToGetBrokersTriggersConfigMap(err error) reconciler.Event {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionConfigMapUpdated,
fmt.Sprintf(
"Failed to get ConfigMap: %s",
manager.Configs.DataPlaneConfigMapAsString(),
),
"%v",
err,
)

return fmt.Errorf("failed to get brokers and triggers config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err)
}

func (manager *StatusConditionManager) FailedToGetBrokersTriggersDataFromConfigMap(err error) reconciler.Event {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionConfigMapUpdated,
fmt.Sprintf(
"Failed to get brokers and trigger data from ConfigMap: %s",
manager.Configs.DataPlaneConfigMapAsString(),
),
"%v",
err,
)

return fmt.Errorf("failed to get broker and triggers data from config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err)
}

func (manager *StatusConditionManager) FailedToUpdateBrokersTriggersConfigMap(err error) reconciler.Event {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionConfigMapUpdated,
fmt.Sprintf("Failed to update ConfigMap: %s", manager.Configs.DataPlaneConfigMapAsString()),
"%s",
err,
)

return fmt.Errorf("failed to update brokers and triggers config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err)
}

func (manager *StatusConditionManager) BrokersTriggersConfigMapUpdated() {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason(
ConditionConfigMapUpdated,
fmt.Sprintf("Config map %s updated", manager.Configs.DataPlaneConfigMapAsString()),
"",
)
}

func (manager *StatusConditionManager) FailedToCreateTopic(topic string, err error) reconciler.Event {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionTopicReady,
fmt.Sprintf("Failed to create topic: %s", topic),
"%v",
err,
)

return fmt.Errorf("failed to create topic: %s: %w", topic, err)
}

func (manager *StatusConditionManager) TopicCreated(topic string) {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason(
ConditionTopicReady,
fmt.Sprintf("Topic %s created", topic),
"",
)
}

func (manager *StatusConditionManager) Reconciled() reconciler.Event {

object := manager.Object

manager.SetAddress(&apis.URL{
Scheme: "http",
Host: names.ServiceHostName(manager.Configs.BrokerIngressName, manager.Configs.SystemNamespace),
Path: fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName()),
})
object.GetConditionSet().Manage(object.GetStatus()).MarkTrue(ConditionAddressable)

return nil
}

func (manager *StatusConditionManager) FailedToUpdateDispatcherPodsAnnotation(err error) {

// We don't set status conditions for dispatcher pods updates.

// Record the event.
manager.Recorder.Eventf(
manager.Object,
corev1.EventTypeWarning,
"failed to update dispatcher pods annotation",
"%v",
err,
)
}

func (manager *StatusConditionManager) FailedToUpdateReceiverPodsAnnotation(err error) reconciler.Event {

return fmt.Errorf("failed to update receiver pods annotation: %w", err)
}

func (manager *StatusConditionManager) FailedToGetBrokerConfig(err error) reconciler.Event {

return fmt.Errorf("failed to get broker configuration: %w", err)
}

func (manager *StatusConditionManager) FailedToResolveBrokerConfig(err error) reconciler.Event {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionConfigParsed,
fmt.Sprintf("%v", err),
"",
)

return fmt.Errorf("failed to get broker configuration: %w", err)
}

func (manager *StatusConditionManager) BrokerConfigResolved() {
manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrue(ConditionConfigParsed)
}
16 changes: 16 additions & 0 deletions control-plane/pkg/reconciler/base/broker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package broker

import "fmt"

type EnvConfigs struct {
DataPlaneConfigMapNamespace string `required:"true" split_words:"true"`
DataPlaneConfigMapName string `required:"true" split_words:"true"`
GeneralConfigMapName string `required:"true" split_words:"true"`
BrokerIngressName string `required:"true" split_words:"true"`
SystemNamespace string `required:"true" split_words:"true"`
DataPlaneConfigFormat string `required:"true" split_words:"true"`
}

func (c *EnvConfigs) DataPlaneConfigMapAsString() string {
return fmt.Sprintf("%s/%s", c.DataPlaneConfigMapNamespace, c.DataPlaneConfigMapName)
}
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ const (
ConfigMapDataKey = "data"

// label for selecting dispatcher pods.
DispatcherLabel = "kafka-broker-dispatcher"
BrokerDispatcherLabel = "kafka-broker-dispatcher"
// label for selecting receiver pods.
ReceiverLabel = "kafka-broker-receiver"
BrokerReceiverLabel = "kafka-broker-receiver"
// label for selecting receiver pods.
SinkReceiverLabel = "kafka-sink-receiver"

// volume generation annotation data plane pods.
VolumeGenerationAnnotationKey = "volumeGeneration"
Expand All @@ -46,6 +48,9 @@ type Reconciler struct {
DataPlaneConfigMapName string
DataPlaneConfigFormat string
SystemNamespace string

DispatcherLabel string
ReceiverLabel string
}

func (r *Reconciler) GetOrCreateDataPlaneConfigMap() (*corev1.ConfigMap, error) {
Expand Down Expand Up @@ -151,7 +156,7 @@ func (r *Reconciler) UpdateDispatcherPodsAnnotation(logger *zap.Logger, volumeGe

return retry.RetryOnConflict(retry.DefaultRetry, func() error {

labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel})
labelSelector := labels.SelectorFromSet(map[string]string{"app": r.DispatcherLabel})
pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector)
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors)
Expand All @@ -165,7 +170,7 @@ func (r *Reconciler) UpdateReceiverPodsAnnotation(logger *zap.Logger, volumeGene

return retry.RetryOnConflict(retry.DefaultRetry, func() error {

labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel})
labelSelector := labels.SelectorFromSet(map[string]string{"app": r.ReceiverLabel})
pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector)
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors)
Expand Down
Loading

0 comments on commit d88230b

Please sign in to comment.