Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Kafka source tracing
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed May 8, 2020
1 parent ac9b94e commit f71f616
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 127 deletions.
46 changes: 46 additions & 0 deletions kafka/source/config/config-observability.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,49 @@ data:
# flag to "true" could cause extra Stackdriver charge.
# If metrics.backend-destination is not Stackdriver, this is ignored.
metrics.allow-stackdriver-custom-metrics: "false"
---

apiVersion: v1
kind: ConfigMap
metadata:
name: config-tracing
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
knative.dev/config-propagation: original
knative.dev/config-category: eventing
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################
# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
#
# This may be "zipkin" or "stackdriver", the default is "none"
backend: "none"
# URL to zipkin collector where traces are sent.
# This must be specified when backend is "zipkin"
zipkin-endpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"
# The GCP project into which stackdriver metrics will be written
# when backend is "stackdriver". If unspecified, the project-id
# is read from GCP metadata when running on GCP.
stackdriver-project-id: "my-project"
# Enable zipkin debug mode. This allows all spans to be sent to the server
# bypassing sampling.
debug: "false"
# Percentage (0-1) of requests to trace
sample-rate: "0.1"
6 changes: 5 additions & 1 deletion kafka/source/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"strings"

"go.opencensus.io/trace"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/source"
Expand Down Expand Up @@ -116,12 +117,15 @@ func (a *Adapter) Start(stopCh <-chan struct{}) error {
// --------------------------------------------------------------------

func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool, error) {
ctx, span := trace.StartSpan(ctx, "kafka-source")
defer span.End()

req, err := a.httpMessageSender.NewCloudEventRequest(ctx)
if err != nil {
return false, err
}

err = a.ConsumerMessageToHttpRequest(ctx, msg, req, a.logger)
err = a.ConsumerMessageToHttpRequest(ctx, span, msg, req, a.logger)
if err != nil {
return true, err
}
Expand Down
132 changes: 81 additions & 51 deletions kafka/source/pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,18 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/v2/types"
"go.uber.org/zap"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/source"

"go.uber.org/zap"

"github.com/Shopify/sarama"

"knative.dev/eventing/pkg/kncloudevents"

sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
)

func TestPostMessage_ServeHTTP(t *testing.T) {
func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
aTimestamp := time.Now()

testCases := map[string]struct {
Expand All @@ -60,12 +58,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -81,12 +80,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "-16771305",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "-16771305",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -103,12 +103,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "0.00000000000000000000000000000000000002536316309005082",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "0.00000000000000000000000000000000000002536316309005082",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -125,12 +126,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "AQoXFw==",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "AQoXFw==",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -155,6 +157,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
Expand Down Expand Up @@ -186,6 +189,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
Expand Down Expand Up @@ -226,24 +230,20 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
},
Timestamp: aTimestamp,
},
// Because we need to write the distributed tracing extension
expectedHeaders: map[string]string{
"content-type": "application/cloudevents+json",
"ce-specversion": "1.0",
"ce-id": "A234-1234-1234",
"ce-time": "2018-04-05T17:31:00Z",
"ce-type": "com.github.pull.create",
"ce-subject": "123",
"ce-source": "https://github.com/cloudevents/spec/pull",
"ce-comexampleextension1": "value",
"ce-comexampleothervalue": "5",
"content-type": "application/json",
},
expectedBody: string(mustJsonMarshal(t, map[string]interface{}{
"specversion": "1.0",
"type": "com.github.pull.create",
"source": "https://github.com/cloudevents/spec/pull",
"subject": "123",
"id": "A234-1234-1234",
"time": "2018-04-05T17:31:00Z",
"comexampleextension1": "value",
"comexampleothervalue": 5,
"datacontenttype": "application/json",
"data": map[string]string{
"hello": "Francesco",
},
})),
error: false,
expectedBody: `{"hello":"Francesco"}`,
error: false,
},
"accepted_binary": {
sink: sinkAccepted,
Expand Down Expand Up @@ -301,12 +301,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
expectedBody: `{"key":"value"}`,
error: true,
Expand All @@ -323,6 +324,15 @@ func TestPostMessage_ServeHTTP(t *testing.T) {

statsReporter, _ := source.NewStatsReporter()

// If you wanna test tracing using a local zipkin server, uncomment this
//tracing.SetupStaticPublishing(zap.L().Sugar(), "localhost", &tracingconfig.Config{
// Backend: tracingconfig.Zipkin,
// Debug: true,
// SampleRate: 1.0,
// ZipkinEndpoint: "http://localhost:9411/api/v2/spans",
//})
//defer time.Sleep(1 * time.Second)

s, err := kncloudevents.NewHttpMessageSender(nil, sinkServer.URL)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -350,12 +360,32 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
t.Errorf("expected error, but got %v", err)
}

// Remove headers we aren't interested to test
h.header.Del("user-agent")
h.header.Del("accept-encoding")
h.header.Del("content-length")

// Check headers
for k, expected := range tc.expectedHeaders {
actual := h.header.Get(k)
if actual != expected {
t.Errorf("Expected header with key %s: '%q', but got '%q'", k, expected, actual)
}
h.header.Del(k)
}

// Check tracing headers
if h.header.Get("traceparent") == "" {
t.Errorf("Expected traceparent header")
}
h.header.Del("traceparent")
if h.header.Get("ce-traceparent") == "" {
t.Errorf("Expected ce-traceparent header")
}
h.header.Del("ce-traceparent")

if len(h.header) != 0 {
t.Errorf("Unexpected headers: %v", h.header)
}

// Check body
Expand Down
11 changes: 8 additions & 3 deletions kafka/source/pkg/adapter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import (
"github.com/Shopify/sarama"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/extensions"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/protocol/kafka_sarama"
"go.opencensus.io/trace"
"go.uber.org/zap"

sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
)

func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error {
func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.Span, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error {
msg := kafka_sarama.NewMessageFromConsumerMessage(cm)

defer func() {
Expand All @@ -44,9 +46,12 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, cm *sarama.C
}
}()

// Build tracing ext to write it as output
tracingExt := extensions.FromSpanContext(span.SpanContext())

if msg.ReadEncoding() != binding.EncodingUnknown {
// Message is a CloudEvent -> Encode directly to HTTP
return http.WriteRequest(ctx, msg, req)
return http.WriteRequest(ctx, msg, req, tracingExt.WriteTransformer())
}

// Message is not a CloudEvent -> We need to translate it to a valid CloudEvent
Expand All @@ -67,7 +72,7 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, cm *sarama.C
return err
}

return http.WriteRequest(ctx, binding.ToMessage(&event), req)
return http.WriteRequest(ctx, binding.ToMessage(&event), req, tracingExt.WriteTransformer())
}

func makeEventId(partition int32, offset int64) string {
Expand Down
11 changes: 5 additions & 6 deletions kafka/source/pkg/reconciler/source/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (
"os"

"k8s.io/client-go/tools/cache"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/source"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/resolver"

"knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
Expand All @@ -58,6 +58,7 @@ func NewController(
deploymentLister: deploymentInformer.Lister(),
receiveAdapterImage: raImage,
loggingContext: ctx,
configs: source.StartWatchingSourceConfigurations(ctx, component, cmw),
}

impl := kafkasource.NewImpl(ctx, c)
Expand All @@ -72,7 +73,5 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

cmw.Watch(logging.ConfigMapName(), c.UpdateFromLoggingConfigMap)
cmw.Watch(metrics.ConfigMapName(), c.UpdateFromMetricsConfigMap)
return impl
}

0 comments on commit f71f616

Please sign in to comment.