Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker Ingress Tracing #1290

Merged
merged 19 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 35 additions & 5 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,41 @@ import (
"sync"
"time"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

cloudevents "github.com/cloudevents/sdk-go"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/kelseyhightower/envconfig"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/broker"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/tracing"
"github.com/knative/eventing/pkg/utils"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/signals"
pkgtracing "github.com/knative/pkg/tracing"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
crlog "sigs.k8s.io/controller-runtime/pkg/runtime/log"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

const (
NAMESPACE = "NAMESPACE"
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
)

type envConfig struct {
Broker string `envconfig:"BROKER" required:"true"`
Channel string `envconfig:"CHANNEL" required:"true"`
Broker string `envconfig:"BROKER" required:"true"`
Channel string `envconfig:"CHANNEL" required:"true"`
Namespace string `envconfig:"NAMESPACE" required:"true"`

ZipkinServiceName string `envconfig:"ZIPKIN_SERVICE_NAME" required:"true"`
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
}

var (
Expand Down Expand Up @@ -92,7 +105,18 @@ func main() {
Path: "/",
}

ceClient, err := cloudevents.NewDefaultClient()
kc := kubernetes.NewForConfigOrDie(mgr.GetConfig())
configMapWatcher := configmap.NewInformedWatcher(kc, env.Namespace)

if err = tracing.SetupDynamicZipkinPublishing(logger.Sugar(), configMapWatcher, env.ZipkinServiceName); err != nil {
logger.Fatal("Error setting up Zipkin publishing", zap.Error(err))
}

httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(pkgtracing.HTTPSpanMiddleware))
if err != nil {
logger.Fatal("Unable to create CE transport", zap.Error(err))
}
ceClient, err := cloudevents.NewClient(httpTransport, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
logger.Fatal("Unable to create CE client", zap.Error(err))
}
Expand Down Expand Up @@ -135,6 +159,12 @@ func main() {

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()

// configMapWatcher does not block, so start it first.
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatal("Failed to start ConfigMap watcher", zap.Error(err))
}

// Start blocks forever.
if err = mgr.Start(stopCh); err != nil {
logger.Error("manager.Start() returned an error", zap.Error(err))
Expand Down
10 changes: 9 additions & 1 deletion config/200-broker-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,12 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: eventing-broker-ingress
rules: []
rules:
- apiGroups:
- ""
resources:
- "configmaps"
verbs:
- "get"
- "list"
- "watch"
43 changes: 43 additions & 0 deletions pkg/configmap/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package configmap
Harwayne marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/knative/pkg/configmap"
v1 "k8s.io/api/core/v1"
)

// TODO Move this to knative/pkg.

// DefaultConstructors are something.
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
type DefaultConstructors map[*v1.ConfigMap]interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a map? What are the semantics of a map keyed by struct pointers?

I think this would make more sense to me as:

type DefaultConstructorFunc func() *v1.ConfigMap
type DefaultConstructors map[string]DefaultConstructorFunc

I think that would simplify this implementation since there's no need to store default configmaps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is based on knative/pkg's configmap.Constructor:

type Constructors map[string]interface{}

The suggestion drops the 'constructor' portion of this map. Essentially, what I am doing is changing the key from the name of the ConfigMap, to the entire ConfigMap.

As for pointer as key semantics, "Pointer values are comparable. Two pointer values are equal if they point to the same variable or if both have value nil. Pointers to distinct zero-size variables may or may not be equal.". So essentially, the pointer must be to the same place. The equality of what is being pointed at does not matter.

I tried to use non-pointers, but v1.ConfigMap's are "invalid map key type".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand why this needs to be a map with a pointer key. This would be equivalent, right?

type DefaultConstructor struct {
  ConfigMap *v1.ConfigMap
  Constructor func(*v1.ConfigMap) (interface{}, error)
}

func NewDefaultUntypedStore(..., defaultConstructors []DefaultConstructor, ...)

I still like my version above better, but the array of structs makes more sense to me than the map.

WDYT @Harwayne? Do you like pkg's configmap.Constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to slice of structs.

The reason I was using a map to begin with was to protect against the same thing being registered twice. But that protection didn't actually occur. Both because it was a pointer and because I really wanted the key to just be the name of the ConfigMap, not its whole value.


// DefaultUntypedStore is an UntypedStore with default values for ConfigMaps that do not exist.
type DefaultUntypedStore struct {
store *configmap.UntypedStore
defaultCMs []v1.ConfigMap
}

// NewDefaultUntypedStore creates a new DefaultUntypedStore.
func NewDefaultUntypedStore(
name string,
logger configmap.Logger,
defaultConstructors DefaultConstructors,
onAfterStore ...func(name string, value interface{})) *DefaultUntypedStore {
constructors := configmap.Constructors{}
defaultCMs := make([]v1.ConfigMap, 0, len(defaultConstructors))
for cm, ctor := range defaultConstructors {
constructors[cm.Name] = ctor
defaultCMs = append(defaultCMs, *cm)
}
return &DefaultUntypedStore{
store: configmap.NewUntypedStore(name, logger, constructors, onAfterStore...),
defaultCMs: defaultCMs,
}
}

// WatchConfigs uses the provided configmap.DefaultingWatcher to setup watches for the config maps
// provided in defaultCMs.
func (s *DefaultUntypedStore) WatchConfigs(w configmap.DefaultingWatcher) {
for _, cm := range s.defaultCMs {
w.WatchWithDefault(cm, s.store.OnConfigChanged)
}
}
12 changes: 12 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,14 @@ func envVars(containerName string) []corev1.EnvVar {
}
case ingressContainerName:
return []corev1.EnvVar{
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "FILTER",
Value: "",
Expand All @@ -942,6 +950,10 @@ func envVars(containerName string) []corev1.EnvVar {
Name: "BROKER",
Value: brokerName,
},
{
Name: "ZIPKIN_SERVICE_NAME",
Value: fmt.Sprintf("%s-broker-ingress.%s", brokerName, testNS),
},
}
}
return []corev1.EnvVar{}
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/broker/resources/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
Image: args.Image,
Name: "ingress",
Env: []corev1.EnvVar{
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "FILTER",
Value: "", // TODO Add one.
Expand All @@ -75,6 +83,10 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
Name: "BROKER",
Value: args.Broker.Name,
},
{
Name: "ZIPKIN_SERVICE_NAME",
Value: fmt.Sprintf("%s-broker-ingress.%s", args.Broker.Name, args.Broker.Namespace),
},
},
Ports: []corev1.ContainerPort{
{
Expand Down
25 changes: 20 additions & 5 deletions pkg/tracing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package tracing
import (
"fmt"

eventingconfigmap "github.com/knative/eventing/pkg/configmap"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/tracing"
tracingconfig "github.com/knative/pkg/tracing/config"
zipkin "github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TODO Move this to knative/pkg.
Expand All @@ -45,6 +48,18 @@ var (
SampleRate: 0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}

enableZeroSamplingCM = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: tracingconfig.ConfigName,
},
Data: map[string]string{
"enable": "True",
"debug": "False",
"sample-rate": "0",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
},
}
)

// setupZipkinPublishing sets up Zipkin trace publishing for the process. Note that other pieces
Expand Down Expand Up @@ -82,7 +97,7 @@ func SetupStaticZipkinPublishing(serviceName string, cfg *tracingconfig.Config)
// just ensures that if generated, they are collected appropriately. This is normally done by using
// tracing.HTTPSpanMiddleware as a middleware HTTP handler. The configuration will be dynamically
// updated when the ConfigMap is updated.
func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.Watcher, serviceName string) error {
func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.DefaultingWatcher, serviceName string) error {
oct, err := setupZipkinPublishing(serviceName)
if err != nil {
return err
Expand All @@ -101,11 +116,11 @@ func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher co
}

// Set up our config store.
configStore := configmap.NewUntypedStore(
configStore := eventingconfigmap.NewDefaultUntypedStore(
"tracing-config",
logger,
configmap.Constructors{
tracingconfig.ConfigName: tracingconfig.NewTracingConfigFromConfigMap,
eventingconfigmap.DefaultConstructors{
enableZeroSamplingCM: tracingconfig.NewTracingConfigFromConfigMap,
},
tracerUpdater)
configStore.WatchConfigs(configMapWatcher)
Expand Down