Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Using the new source.ConfigWatcher
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed May 8, 2020
1 parent 51c4f79 commit f856300
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 96 deletions.
13 changes: 5 additions & 8 deletions kafka/source/pkg/reconciler/source/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@ import (
"os"

"k8s.io/client-go/tools/cache"
tracingconfig "knative.dev/pkg/tracing/config"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/source"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/resolver"

"knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
Expand All @@ -59,6 +58,7 @@ func NewController(
deploymentLister: deploymentInformer.Lister(),
receiveAdapterImage: raImage,
loggingContext: ctx,
configs: source.StartWatchingSourceConfigurations(ctx, component, cmw),
}

impl := kafkasource.NewImpl(ctx, c)
Expand All @@ -73,8 +73,5 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

cmw.Watch(logging.ConfigMapName(), c.UpdateFromLoggingConfigMap)
cmw.Watch(metrics.ConfigMapName(), c.UpdateFromMetricsConfigMap)
cmw.Watch(tracingconfig.ConfigName, c.UpdateFromTracingConfigMap)
return impl
}
80 changes: 10 additions & 70 deletions kafka/source/pkg/reconciler/source/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"fmt"
"strings"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/metrics"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -35,6 +31,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing/pkg/reconciler/source"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
"knative.dev/eventing-contrib/kafka/source/pkg/reconciler/source/resources"
Expand All @@ -44,7 +43,6 @@ import (
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
tracingconfig "knative.dev/pkg/tracing/config"

"knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
Expand Down Expand Up @@ -95,11 +93,10 @@ type Reconciler struct {

kafkaClientSet versioned.Interface
loggingContext context.Context
loggingConfig *logging.Config
metricsConfig *metrics.ExporterOptions
tracingCfg *tracingconfig.Config

sinkResolver *resolver.URIResolver

configs *source.ConfigWatcher
}

// Check that our Reconciler implements Interface
Expand Down Expand Up @@ -190,28 +187,12 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf
return nil, err
}

loggingConfig, err := logging.LoggingConfigToJson(r.loggingConfig)
if err != nil {
logging.FromContext(ctx).Error("error while converting logging config to JSON", zap.Any("receiveAdapter", err))
}
metricsConfig, err := metrics.MetricsOptionsToJson(r.metricsConfig)
if err != nil {
logging.FromContext(ctx).Error("error while converting metrics config to JSON", zap.Any("receiveAdapter", err))
}

tracingCfg, err := tracingconfig.TracingConfigToJson(r.tracingCfg)
if err != nil {
logging.FromContext(ctx).Error("error while converting tracing config to json", zap.Any("receiveAdapter", err))
}

raArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
Labels: resources.GetLabels(src.Name),
LoggingConfig: loggingConfig,
MetricsConfig: metricsConfig,
TracingConfig: tracingCfg,
SinkURI: sinkURI.String(),
Image: r.receiveAdapterImage,
Source: src,
Labels: resources.GetLabels(src.Name),
SinkURI: sinkURI.String(),
AdditionalEnvs: r.configs.ToEnvVars(),
}
expected := resources.MakeReceiveAdapter(&raArgs)

Expand Down Expand Up @@ -275,47 +256,6 @@ func (r *Reconciler) getLabelSelector(src *v1alpha1.KafkaSource) labels.Selector
return labels.SelectorFromSet(resources.GetLabels(src.Name))
}

func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) {
if cfg != nil {
delete(cfg.Data, "_example")
}

logcfg, err := logging.NewConfigFromConfigMap(cfg)
if err != nil {
logging.FromContext(r.loggingContext).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name))
return
}
r.loggingConfig = logcfg
logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg))
}

func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) {
if cfg != nil {
delete(cfg.Data, "_example")
}

r.metricsConfig = &metrics.ExporterOptions{
Domain: metrics.Domain(),
Component: component,
ConfigMap: cfg.Data,
}
logging.FromContext(r.loggingContext).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg))
}

func (r *Reconciler) UpdateFromTracingConfigMap(cfg *corev1.ConfigMap) {
if cfg != nil {
delete(cfg.Data, "_example")
}

tracingCfg, err := tracingconfig.NewTracingConfigFromMap(cfg.Data)
if err != nil {
logging.FromContext(r.loggingContext).Warn("failed to create tracing config from configmap", zap.String("cfg.Name", cfg.Name))
return
}

r.tracingCfg = tracingCfg
}

func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics))
for i := range src.Spec.Topics {
Expand Down
25 changes: 7 additions & 18 deletions kafka/source/pkg/reconciler/source/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,17 @@ import (
)

type ReceiveAdapterArgs struct {
Image string
Source *v1alpha1.KafkaSource
Labels map[string]string
SinkURI string
MetricsConfig string
LoggingConfig string
TracingConfig string
Image string
Source *v1alpha1.KafkaSource
Labels map[string]string
SinkURI string
AdditionalEnvs []corev1.EnvVar
}

func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
replicas := int32(1)

env := []corev1.EnvVar{{
env := append([]corev1.EnvVar{{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: strings.Join(args.Source.Spec.BootstrapServers, ","),
}, {
Expand All @@ -67,16 +65,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
}, {
Name: "NAMESPACE",
Value: args.Source.Namespace,
}, {
Name: "K_LOGGING_CONFIG",
Value: args.LoggingConfig,
}, {
Name: "K_METRICS_CONFIG",
Value: args.MetricsConfig,
}, {
Name: "K_TRACING_CONFIG",
Value: args.TracingConfig,
}}
}}, args.AdditionalEnvs...)

if val, ok := args.Source.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok {
env = append(env, corev1.EnvVar{
Expand Down

0 comments on commit f856300

Please sign in to comment.