Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker Ingress Tracing #1290

Merged
merged 19 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 32 additions & 14 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,36 @@ import (
"log"
"net/http"
"net/url"
"os"
"reflect"
"sync"
"time"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

cloudevents "github.com/cloudevents/sdk-go"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/broker"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/tracing"
"github.com/knative/eventing/pkg/utils"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/signals"
pkgtracing "github.com/knative/pkg/tracing"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
crlog "sigs.k8s.io/controller-runtime/pkg/runtime/log"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

const (
NAMESPACE = "NAMESPACE"
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
)

var (
Expand Down Expand Up @@ -76,15 +85,26 @@ func main() {
logger.Fatal("Unable to add eventingv1alpha1 scheme", zap.Error(err))
}

brokerName := getRequiredEnv("BROKER")
brokerName := utils.GetRequiredEnvOrFatal("BROKER")
Harwayne marked this conversation as resolved.
Show resolved Hide resolved

channelURI := &url.URL{
Scheme: "http",
Host: getRequiredEnv("CHANNEL"),
Host: utils.GetRequiredEnvOrFatal("CHANNEL"),
Path: "/",
}

ceClient, err := cloudevents.NewDefaultClient()
kc := kubernetes.NewForConfigOrDie(mgr.GetConfig())
configMapWatcher := configmap.NewInformedWatcher(kc, utils.GetRequiredEnvOrFatal(NAMESPACE))

if err = tracing.SetupDynamicZipkinPublishing(logger.Sugar(), configMapWatcher, utils.GetRequiredEnvOrFatal("ZIPKIN_SERVICE_NAME")); err != nil {
logger.Fatal("Error setting up Zipkin publishing", zap.Error(err))
}

httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(pkgtracing.HTTPSpanMiddleware))
if err != nil {
logger.Fatal("Unable to create CE transport", zap.Error(err))
}
ceClient, err := cloudevents.NewClient(httpTransport, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
logger.Fatal("Unable to create CE client", zap.Error(err))
}
Expand Down Expand Up @@ -127,6 +147,12 @@ func main() {

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()

// configMapWatcher does not block, so start it first.
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatal("Failed to start ConfigMap watcher", zap.Error(err))
}

// Start blocks forever.
if err = mgr.Start(stopCh); err != nil {
logger.Error("manager.Start() returned an error", zap.Error(err))
Expand All @@ -146,14 +172,6 @@ func main() {
logger.Info("Done.")
}

func getRequiredEnv(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}

type handler struct {
logger *zap.Logger
ceClient cloudevents.Client
Expand Down
10 changes: 9 additions & 1 deletion config/200-broker-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,12 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: eventing-broker-ingress
rules: []
rules:
- apiGroups:
- ""
resources:
- "configmaps"
verbs:
- "get"
- "list"
- "watch"
43 changes: 43 additions & 0 deletions pkg/configmap/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package configmap
Harwayne marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/knative/pkg/configmap"
v1 "k8s.io/api/core/v1"
)

// TODO Move this to knative/pkg.

// DefaultConstructors are something.
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
type DefaultConstructors map[*v1.ConfigMap]interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a map? What are the semantics of a map keyed by struct pointers?

I think this would make more sense to me as:

type DefaultConstructorFunc func() *v1.ConfigMap
type DefaultConstructors map[string]DefaultConstructorFunc

I think that would simplify this implementation since there's no need to store default configmaps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is based on knative/pkg's configmap.Constructor:

type Constructors map[string]interface{}

The suggestion drops the 'constructor' portion of this map. Essentially, what I am doing is changing the key from the name of the ConfigMap, to the entire ConfigMap.

As for pointer as key semantics, "Pointer values are comparable. Two pointer values are equal if they point to the same variable or if both have value nil. Pointers to distinct zero-size variables may or may not be equal.". So essentially, the pointer must be to the same place. The equality of what is being pointed at does not matter.

I tried to use non-pointers, but v1.ConfigMap's are "invalid map key type".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand why this needs to be a map with a pointer key. This would be equivalent, right?

type DefaultConstructor struct {
  ConfigMap *v1.ConfigMap
  Constructor func(*v1.ConfigMap) (interface{}, error)
}

func NewDefaultUntypedStore(..., defaultConstructors []DefaultConstructor, ...)

I still like my version above better, but the array of structs makes more sense to me than the map.

WDYT @Harwayne? Do you like pkg's configmap.Constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to slice of structs.

The reason I was using a map to begin with was to protect against the same thing being registered twice. But that protection didn't actually occur. Both because it was a pointer and because I really wanted the key to just be the name of the ConfigMap, not its whole value.


// DefaultUntypedStore is an UntypedStore with default values for ConfigMaps that do not exist.
type DefaultUntypedStore struct {
store *configmap.UntypedStore
defaultCMs []v1.ConfigMap
}

// NewDefaultUntypedStore creates a new DefaultUntypedStore.
func NewDefaultUntypedStore(
name string,
logger configmap.Logger,
defaultConstructors DefaultConstructors,
onAfterStore ...func(name string, value interface{})) *DefaultUntypedStore {
constructors := configmap.Constructors{}
defaultCMs := make([]v1.ConfigMap, 0, len(defaultConstructors))
for cm, ctor := range defaultConstructors {
constructors[cm.Name] = ctor
defaultCMs = append(defaultCMs, *cm)
}
return &DefaultUntypedStore{
store: configmap.NewUntypedStore(name, logger, constructors, onAfterStore...),
defaultCMs: defaultCMs,
}
}

// WatchConfigs uses the provided configmap.DefaultingWatcher to setup watches for the config maps
// provided in defaultCMs.
func (s *DefaultUntypedStore) WatchConfigs(w configmap.DefaultingWatcher) {
for _, cm := range s.defaultCMs {
w.WatchWithDefault(cm, s.store.OnConfigChanged)
}
}
12 changes: 12 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,14 @@ func envVars(containerName string) []corev1.EnvVar {
}
case ingressContainerName:
return []corev1.EnvVar{
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "FILTER",
Value: "",
Expand All @@ -942,6 +950,10 @@ func envVars(containerName string) []corev1.EnvVar {
Name: "BROKER",
Value: brokerName,
},
{
Name: "ZIPKIN_SERVICE_NAME",
Value: fmt.Sprintf("%s-broker-ingress.%s", brokerName, testNS),
},
}
}
return []corev1.EnvVar{}
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/broker/resources/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
Image: args.Image,
Name: "ingress",
Env: []corev1.EnvVar{
{
Name: "NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "FILTER",
Value: "", // TODO Add one.
Expand All @@ -75,6 +83,10 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
Name: "BROKER",
Value: args.Broker.Name,
},
{
Name: "ZIPKIN_SERVICE_NAME",
Value: fmt.Sprintf("%s-broker-ingress.%s", args.Broker.Name, args.Broker.Namespace),
},
},
Ports: []corev1.ContainerPort{
{
Expand Down
25 changes: 20 additions & 5 deletions pkg/tracing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package tracing
import (
"fmt"

eventingconfigmap "github.com/knative/eventing/pkg/configmap"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/tracing"
tracingconfig "github.com/knative/pkg/tracing/config"
zipkin "github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TODO Move this to knative/pkg.
Expand All @@ -45,6 +48,18 @@ var (
SampleRate: 0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}

enableZeroSamplingCM = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: tracingconfig.ConfigName,
},
Data: map[string]string{
"enable": "True",
"debug": "False",
"sample-rate": "0",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
},
}
)

// setupZipkinPublishing sets up Zipkin trace publishing for the process. Note that other pieces
Expand Down Expand Up @@ -82,7 +97,7 @@ func SetupStaticZipkinPublishing(serviceName string, cfg *tracingconfig.Config)
// just ensures that if generated, they are collected appropriately. This is normally done by using
// tracing.HTTPSpanMiddleware as a middleware HTTP handler. The configuration will be dynamically
// updated when the ConfigMap is updated.
func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.Watcher, serviceName string) error {
func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.DefaultingWatcher, serviceName string) error {
oct, err := setupZipkinPublishing(serviceName)
if err != nil {
return err
Expand All @@ -101,11 +116,11 @@ func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher co
}

// Set up our config store.
configStore := configmap.NewUntypedStore(
configStore := eventingconfigmap.NewDefaultUntypedStore(
"tracing-config",
logger,
configmap.Constructors{
tracingconfig.ConfigName: tracingconfig.NewTracingConfigFromConfigMap,
eventingconfigmap.DefaultConstructors{
enableZeroSamplingCM: tracingconfig.NewTracingConfigFromConfigMap,
},
tracerUpdater)
configStore.WatchConfigs(configMapWatcher)
Expand Down
29 changes: 29 additions & 0 deletions pkg/utils/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"log"
"os"
)

// GetRequiredEnvOrFatal gets the environment variable's value. If the environment variable is not
// defined, then log.Fatal will be called, exiting the program.
func GetRequiredEnvOrFatal(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}