Skip to content

Commit

Permalink
feat(span-customizer): adds support for span customizer.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcchavezs committed Sep 20, 2020
1 parent 1c762e0 commit 92e7337
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 42 deletions.
30 changes: 23 additions & 7 deletions middleware/grpc/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,25 +42,41 @@ func WithRemoteServiceName(name string) ClientOption {
}
}

// WithClientOutPayloadParser adds a parser for the stats.OutPayload to be able to access
// the outgoing request payload
func WithClientOutPayloadParser(parser func(*stats.OutPayload, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.outPayload = parser
}
}

// WithClientOutHeaderParser adds a parser for the stats.OutHeader to be able to access
// the outgoing request payload
func WithClientOutHeaderParser(parser func(*stats.OutHeader, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.outHeader = parser
}
}

// WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access
// the request payload
func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ClientOption {
// the incoming response payload
func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ClientOption {
// the incoming response trailer
func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inTrailer = parser
}
}

// WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientOption {
// the incoming response header
func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inHeader = parser
}
Expand Down
105 changes: 105 additions & 0 deletions middleware/grpc/client_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_test

import (
"context"
"testing"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
service "github.com/openzipkin/zipkin-go/proto/testing"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

func TestGRPCClientCanAccessToPayloadAndMetadata(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer()
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{
responseHeader: metadata.Pairs("test_key", "test_value_1"),
responseTrailer: metadata.Pairs("test_key", "test_value_2"),
})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
grpc.WithStatsHandler(zipkingrpc.NewClientHandler(
tracer,
zipkingrpc.WithClientOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.SpanCustomizer) {
m, ok := outPayload.Payload.(*service.HelloRequest)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloResponse")
}
if want, have := "Hello", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.SpanCustomizer) {
if want, have := "test_value", outHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInPayloadParser(func(inPayload *stats.InPayload, span zipkin.SpanCustomizer) {
m, ok := inPayload.Payload.(*service.HelloResponse)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloRequest")
}
if want, have := "World", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInHeaderParser(func(inHeader *stats.InHeader, span zipkin.SpanCustomizer) {
if want, have := "test_value_1", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.SpanCustomizer) {
if want, have := "test_value_2", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
)),
)

if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}
}
7 changes: 6 additions & 1 deletion middleware/grpc/grpc_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -125,6 +125,8 @@ func (g *sequentialIdGenerator) reset() {

type TestHelloService struct {
service.UnimplementedHelloServiceServer
responseHeader metadata.MD
responseTrailer metadata.MD
}

func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) {
Expand Down Expand Up @@ -158,6 +160,9 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest)
}
}

grpc.SetTrailer(ctx, s.responseTrailer)
grpc.SendHeader(ctx, s.responseHeader)

return resp, nil
}

Expand Down
38 changes: 27 additions & 11 deletions middleware/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,26 +41,42 @@ func ServerTags(tags map[string]string) ServerOption {
}

// WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access
// the request payload
func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ServerOption {
// the incoming request payload
func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption {
// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the incoming request header
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inTrailer = parser
h.handleRPCParser.inHeader = parser
}
}

// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption {
// WithServerOutPayloadParser adds a parser for the stats.OutPayload to be able to access
// the outgoing response payload
func WithServerOutPayloadParser(parser func(*stats.OutPayload, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inHeader = parser
h.handleRPCParser.outPayload = parser
}
}

// WithServerOutTrailerParser adds a parser for the stats.OutTrailer to be able to access
// the outgoing response trailer
func WithServerOutTrailerParser(parser func(*stats.OutTrailer, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outTrailer = parser
}
}

// WithServerOutHeaderParser adds a parser for the stats.OutHeader to be able to access
// the outgoing response payload
func WithServerOutHeaderParser(parser func(*stats.OutHeader, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outHeader = parser
}
}

Expand Down
49 changes: 35 additions & 14 deletions middleware/grpc/server_parser_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,30 +77,56 @@ func TestGRPCServerCreatesASpanAndContext(t *testing.T) {
}
}

func TestGRPCServerCanAccessToHeaders(t *testing.T) {
func TestGRPCServerCanAccessToPayloadAndMetadata(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer(
grpc.StatsHandler(
zipkingrpc.NewServerHandler(
tracer,
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) {
zipkingrpc.WithServerInPayloadParser(func(inPayload *stats.InPayload, span zipkin.SpanCustomizer) {
m, ok := inPayload.Payload.(*service.HelloRequest)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloRequest")
}
if want, have := "Hello", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.SpanCustomizer) {
if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.SpanCustomizer) {
m, ok := outPayload.Payload.(*service.HelloResponse)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloResponse")
}
if want, have := "World", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.SpanCustomizer) {
if want, have := "test_value_1", outHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) {
if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
zipkingrpc.WithServerOutTrailerParser(func(outTrailer *stats.OutTrailer, span zipkin.SpanCustomizer) {
if want, have := "test_value_2", outTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("incorrect trailer value, want %q, have %q", want, have)
}
}),
),
),
)
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{})
service.RegisterHelloServiceServer(s, &TestHelloService{
responseHeader: metadata.Pairs("test_key", "test_value_1"),
responseTrailer: metadata.Pairs("test_key", "test_value_2"),
})

dialer := initListener(s)

Expand All @@ -118,7 +144,7 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) {

client := service.NewHelloServiceClient(conn)

ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value")
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
Expand All @@ -130,9 +156,4 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) {
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := model.Server, span.Kind; want != have {
t.Errorf("unexpected kind, want %q, have %q", want, have)
}
}
Loading

0 comments on commit 92e7337

Please sign in to comment.