Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing support to in-memory dispatcher. #1199

Merged
merged 3 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 23 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion cmd/in_memory/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,24 @@ import (
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/channelwatcher"
"github.com/knative/eventing/pkg/provisioners/swappable"
"github.com/knative/eventing/pkg/tracing"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/system"
pkgtracing "github.com/knative/pkg/tracing"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

const (
NAMESPACE = "NAMESPACE"
)

var (
readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
Expand Down Expand Up @@ -68,9 +77,15 @@ func main() {
logger.Fatal("Unable to create channel watcher.", zap.Error(err))
}

kc := kubernetes.NewForConfigOrDie(mgr.GetConfig())
configMapWatcher := configmap.NewInformedWatcher(kc, system.Namespace())
if err = tracing.SetupDynamicZipkinPublishing(logger.Sugar(), configMapWatcher, "in-memory-dispatcher"); err != nil {
logger.Fatal("Error setting up Zipkin publishing", zap.Error(err))
}

s := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: sh,
Handler: pkgtracing.HTTPSpanMiddleware(sh),
ErrorLog: zap.NewStdLog(logger),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Expand All @@ -86,6 +101,12 @@ func main() {

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

// configMapWatcher does not block, so start it first.
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatal("Failed to start ConfigMap watcher", zap.Error(err))
}

// Start blocks forever.
if err = mgr.Start(stopCh); err != nil {
logger.Error("manager.Start() returned an error", zap.Error(err))
Expand Down
48 changes: 48 additions & 0 deletions config/config-tracing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-tracing
namespace: knative-eventing
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################

# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
#
# If true we enable adding spans within our applications.
enable: "false"

# URL to zipkin collector where traces are sent.
zipkin-endpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"

# Enable zipkin debug mode. This allows all spans to be sent to the server
# bypassing sampling.
debug: "false"

# Percentage (0-1) of requests to trace
sample-rate: "0.1"
33 changes: 33 additions & 0 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,39 @@ roleRef:

---

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: in-memory-channel-dispatcher
namespace: knative-eventing
rules:
- apiGroups:
- ""
resources:
- "configmaps"
verbs:
- get
- list
- watch

---

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: in-memory-channel-dispatcher
namespace: knative-eventing
subjects:
- kind: ServiceAccount
name: in-memory-channel-dispatcher
namespace: knative-eventing
roleRef:
kind: Role
name: in-memory-channel-dispatcher
apiGroup: rbac.authorization.k8s.io

---

apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
13 changes: 7 additions & 6 deletions pkg/provisioners/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
"net/http"
"strings"

"github.com/knative/pkg/tracing"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
)

// MessageReceiver starts a server to receive new messages for the channel dispatcher. The new
// message is emitted via the receiver function.
const (
// MessageReceiverPort is the port that MessageReceiver opens an HTTP server on.
MessageReceiverPort = 8080
)

// MessageReceiver receives messages.
// MessageReceiver starts a server to receive new messages for the channel dispatcher. The new
// message is emitted via the receiver function.
type MessageReceiver struct {
receiverFunc func(ChannelReference, *Message) error
forwardHeaders sets.String
Expand Down Expand Up @@ -113,7 +114,7 @@ func (r *MessageReceiver) stop(srv *http.Server) {

// handler creates the http.Handler used by the http.Server started in MessageReceiver.Run.
func (r *MessageReceiver) handler() http.Handler {
return http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
return tracing.HTTPSpanMiddleware(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/" {
res.WriteHeader(http.StatusNotFound)
return
Expand All @@ -124,7 +125,7 @@ func (r *MessageReceiver) handler() http.Handler {
}

r.HandleRequest(res, req)
})
}))
}

// HandleRequest is an http.Handler function. The request is converted to a
Expand All @@ -139,7 +140,7 @@ func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Reque
r.logger.Infof("Received request for %s", host)
channel, err := r.hostToChannelFunc(host)
if err != nil {
r.logger.Info("Could not extract channel", zap.Error(err))
r.logger.Infow("Could not extract channel", zap.Error(err))
res.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
113 changes: 113 additions & 0 deletions pkg/tracing/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"fmt"

"github.com/knative/pkg/configmap"
"github.com/knative/pkg/tracing"
tracingconfig "github.com/knative/pkg/tracing/config"
"github.com/openzipkin/zipkin-go"
"go.uber.org/zap"
)

// TODO Move this to knative/pkg.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdemirhan I'm intending to move this up to knative/pkg once I prove it is useful in eventing. Would you mind reviewing this file?


var (
// DebugCfg is a configuration to use to record all traces.
DebugCfg = &tracingconfig.Config{
Enable: true,
Debug: true,
SampleRate: 1,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
// EnabledZeroSampling is a configuration that enables tracing, but has a sampling rate of zero.
// The intention is that this allows this to record traces that other components started, but
// will not start traces itself.
EnabledZeroSampling = &tracingconfig.Config{
Enable: true,
Debug: false,
SampleRate: 0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
)

// setupZipkinPublishing sets up Zipkin trace publishing for the process. Note that other pieces
// still need to generate the traces, this just ensures that if generated, they are collected
// appropriately. This is normally done by using tracing.HTTPSpanMiddleware as a middleware HTTP
// handler.
func setupZipkinPublishing(serviceName string) (*tracing.OpenCensusTracer, error) {
// TODO Should we fill in the hostPort?
zipkinEndpoint, err := zipkin.NewEndpoint(serviceName, "")
if err != nil {
return nil, fmt.Errorf("unable to create tracing endpoint: %v", err)
}
oct := tracing.NewOpenCensusTracer(tracing.WithZipkinExporter(tracing.CreateZipkinReporter, zipkinEndpoint))
return oct, nil
}

// SetupStaticZipkinPublishing sets up Zipkin trace publishing for the process. Note that other
// pieces still need to generate the traces, this just ensures that if generated, they are collected
// appropriately. This is normally done by using tracing.HTTPSpanMiddleware as a middleware HTTP
// handler. The configuration will not be dynamically updated.
func SetupStaticZipkinPublishing(serviceName string, cfg *tracingconfig.Config) error {
oct, err := setupZipkinPublishing(serviceName)
if err != nil {
return err
}
err = oct.ApplyConfig(cfg)
if err != nil {
return fmt.Errorf("unable to set OpenCensusTracing config: %v", err)
}
return nil
}

// SetupDynamicZipkinPublishing sets up Zipkin trace publishing for the process, by watching a
// ConfigMap for the configuration. Note that other pieces still need to generate the traces, this
// just ensures that if generated, they are collected appropriately. This is normally done by using
// tracing.HTTPSpanMiddleware as a middleware HTTP handler. The configuration will be dynamically
// updated when the ConfigMap is updated.
func SetupDynamicZipkinPublishing(logger *zap.SugaredLogger, configMapWatcher configmap.Watcher, serviceName string) error {
oct, err := setupZipkinPublishing(serviceName)
if err != nil {
return err
}

tracerUpdater := func(name string, value interface{}) {
if name == tracingconfig.ConfigName {
cfg := value.(*tracingconfig.Config)
logger.Debugw("Updating tracing config", zap.Any("cfg", cfg))
err = oct.ApplyConfig(cfg)
if err != nil {
logger.Errorw("Unable to apply open census tracer config", zap.Error(err))
return
}
}
}

// Set up our config store.
configStore := configmap.NewUntypedStore(
"tracing-config",
logger,
configmap.Constructors{
tracingconfig.ConfigName: tracingconfig.NewTracingConfigFromConfigMap,
},
tracerUpdater)
configStore.WatchConfigs(configMapWatcher)
return nil
}