Skip to content

Commit

Permalink
Add tracing support to in-memory dispatcher. (#1199)
Browse files Browse the repository at this point in the history
* Add tracing support to in-memory dispatcher.

* Remove ClusterRole configmaps.
  • Loading branch information
Harwayne authored and knative-prow-robot committed May 9, 2019
1 parent fd45b55 commit 64df1c3
Show file tree
Hide file tree
Showing 38 changed files with 3,005 additions and 9 deletions.
25 changes: 23 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion cmd/in_memory/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,24 @@ import (
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/channelwatcher"
"github.com/knative/eventing/pkg/provisioners/swappable"
"github.com/knative/eventing/pkg/tracing"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/system"
pkgtracing "github.com/knative/pkg/tracing"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

const (
NAMESPACE = "NAMESPACE"
)

var (
readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
Expand Down Expand Up @@ -68,9 +77,15 @@ func main() {
logger.Fatal("Unable to create channel watcher.", zap.Error(err))
}

kc := kubernetes.NewForConfigOrDie(mgr.GetConfig())
configMapWatcher := configmap.NewInformedWatcher(kc, system.Namespace())
if err = tracing.SetupDynamicZipkinPublishing(logger.Sugar(), configMapWatcher, "in-memory-dispatcher"); err != nil {
logger.Fatal("Error setting up Zipkin publishing", zap.Error(err))
}

s := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: sh,
Handler: pkgtracing.HTTPSpanMiddleware(sh),
ErrorLog: zap.NewStdLog(logger),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Expand All @@ -86,6 +101,12 @@ func main() {

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

// configMapWatcher does not block, so start it first.
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatal("Failed to start ConfigMap watcher", zap.Error(err))
}

// Start blocks forever.
if err = mgr.Start(stopCh); err != nil {
logger.Error("manager.Start() returned an error", zap.Error(err))
Expand Down
48 changes: 48 additions & 0 deletions config/config-tracing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-tracing
namespace: knative-eventing
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################
# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
#
# If true we enable adding spans within our applications.
enable: "false"
# URL to zipkin collector where traces are sent.
zipkin-endpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"
# Enable zipkin debug mode. This allows all spans to be sent to the server
# bypassing sampling.
debug: "false"
# Percentage (0-1) of requests to trace
sample-rate: "0.1"
33 changes: 33 additions & 0 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,39 @@ roleRef:

---

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: in-memory-channel-dispatcher
namespace: knative-eventing
rules:
- apiGroups:
- ""
resources:
- "configmaps"
verbs:
- get
- list
- watch

---

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: in-memory-channel-dispatcher
namespace: knative-eventing
subjects:
- kind: ServiceAccount
name: in-memory-channel-dispatcher
namespace: knative-eventing
roleRef:
kind: Role
name: in-memory-channel-dispatcher
apiGroup: rbac.authorization.k8s.io

---

apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
13 changes: 7 additions & 6 deletions pkg/provisioners/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
"net/http"
"strings"

"github.com/knative/pkg/tracing"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
)

// MessageReceiver starts a server to receive new messages for the channel dispatcher. The new
// message is emitted via the receiver function.
const (
// MessageReceiverPort is the port that MessageReceiver opens an HTTP server on.
MessageReceiverPort = 8080
)

// MessageReceiver receives messages.
// MessageReceiver starts a server to receive new messages for the channel dispatcher. The new
// message is emitted via the receiver function.
type MessageReceiver struct {
receiverFunc func(ChannelReference, *Message) error
forwardHeaders sets.String
Expand Down Expand Up @@ -113,7 +114,7 @@ func (r *MessageReceiver) stop(srv *http.Server) {

// handler creates the http.Handler used by the http.Server started in MessageReceiver.Run.
func (r *MessageReceiver) handler() http.Handler {
return http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
return tracing.HTTPSpanMiddleware(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/" {
res.WriteHeader(http.StatusNotFound)
return
Expand All @@ -124,7 +125,7 @@ func (r *MessageReceiver) handler() http.Handler {
}

r.HandleRequest(res, req)
})
}))
}

// HandleRequest is an http.Handler function. The request is converted to a
Expand All @@ -139,7 +140,7 @@ func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Reque
r.logger.Infof("Received request for %s", host)
channel, err := r.hostToChannelFunc(host)
if err != nil {
r.logger.Info("Could not extract channel", zap.Error(err))
r.logger.Infow("Could not extract channel", zap.Error(err))
res.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
113 changes: 113 additions & 0 deletions pkg/tracing/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"fmt"

"github.com/knative/pkg/configmap"
"github.com/knative/pkg/tracing"
tracingconfig "github.com/knative/pkg/tracing/config"
"github.com/openzipkin/zipkin-go"
"go.uber.org/zap"
)

// TODO Move this to knative/pkg.

var (
// DebugCfg is a configuration to use to record all traces.
DebugCfg = &tracingconfig.Config{
Enable: true,
Debug: true,
SampleRate: 1,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
// EnabledZeroSampling is a configuration that enables tracing, but has a sampling rate of zero.
// The intention is that this allows this to record traces that other components started, but
// will not start traces itself.
EnabledZeroSampling = &tracingconfig.Config{
Enable: true,
Debug: false,
SampleRate: 0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
)

// setupZipkinPublishing sets up Zipkin trace publishing for the process. Note that other pieces
// still need to generate the traces, this just ensures that if generated, they are collected
// appropriately. This is normally done by using tracing.HTTPSpanMiddleware as a middleware HTTP
// handler.
func setupZipkinPublishing(serviceName string) (*tracing.OpenCensusTracer, error) {
// TODO Should we fill in the hostPort?
zipkinEndpoint, err := zipkin.NewEndpoint(serviceName, "")
if err != nil {
return nil, fmt.Errorf("unable to create tracing endpoint: %v", err)
}
oct := tracing.NewOpenCensusTracer(tracing.WithZipkinExporter(tracing.CreateZipkinReporter, zipkinEndpoint))
return oct, nil
}

// SetupStaticZipkinPublishing sets up Zipkin trace publishing for the process. Note that other
// pieces still need to generate the traces, this just ensures that if generated, they are collected
// appropriately. This is normally done by using tracing.HTTPSpanMiddleware as a middleware HTTP
// handler. The configuration will not be dynamically updated.
func SetupStaticZipkinPublishing(serviceName string, cfg *tracingconfig.Config) error {
oct, err := setupZipkinPublishing(serviceName)
if err != nil {
return err
}
err = oct.ApplyConfig(cfg)
if err != nil {
return fmt.Errorf("unable to set OpenCensusTracing config: %v", err)
}
return nil
}

// SetupDynamicZipkinPublishing sets up Zipkin trace publishing for the process, by watching a
// ConfigMap for the configuration. Note that other pieces still need to generate the traces, this
// just ensures that if generated, they are collected appropriately. This is normally done by using
// tracing.HTTPSpanMiddleware as a middleware HTTP handler. The configuration will be dynamically
// updated when the ConfigMap is updated.
func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.Watcher, serviceName string) error {
oct, err := setupZipkinPublishing(serviceName)
if err != nil {
return err
}

tracerUpdater := func(name string, value interface{}) {
if name == tracingconfig.ConfigName {
cfg := value.(*tracingconfig.Config)
logger.Debugw("Updating tracing config", zap.Any("cfg", cfg))
err = oct.ApplyConfig(cfg)
if err != nil {
logger.Errorw("Unable to apply open census tracer config", zap.Error(err))
return
}
}
}

// Set up our config store.
configStore := configmap.NewUntypedStore(
"tracing-config",
logger,
configmap.Constructors{
tracingconfig.ConfigName: tracingconfig.NewTracingConfigFromConfigMap,
},
tracerUpdater)
configStore.WatchConfigs(configMapWatcher)
return nil
}

0 comments on commit 64df1c3

Please sign in to comment.