/
wrapper.go
119 lines (102 loc) · 3.12 KB
/
wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package wrapper
import (
"context"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"github.com/Cloudera-Sz/golang-micro/tracing"
"strings"
)
// MDReaderWriter wraps gRPC metadata.MD so that it can act as an OpenTracing
// text-map carrier: the ForeachKey and Set methods defined on it read and
// write the entries of the embedded metadata map.
type MDReaderWriter struct {
metadata.MD
}
// ForeachKey satisfies opentracing.TextMapReader. It calls handler once for
// every (key, value) pair in the metadata, visiting each value of a
// multi-valued key separately. Iteration stops at the first error handler
// returns, and that error is handed back to the caller; otherwise nil is
// returned.
func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
	for name := range c.MD {
		values := c.MD[name]
		for i := 0; i < len(values); i++ {
			if callErr := handler(name, values[i]); callErr != nil {
				return callErr
			}
		}
	}
	return nil
}
// Set satisfies opentracing.TextMapWriter. The key is lower-cased before use
// and val is appended to whatever values are already stored under that key.
func (c MDReaderWriter) Set(key, val string) {
	lowered := strings.ToLower(key)
	existing := c.MD[lowered]
	c.MD[lowered] = append(existing, val)
}
// DialOption builds the gRPC client dial option that installs
// ClientInterceptor(tracer) as the connection's unary client interceptor.
func DialOption(tracer opentracing.Tracer) grpc.DialOption {
	interceptor := ClientInterceptor(tracer)
	return grpc.WithUnaryInterceptor(interceptor)
}
// ServerOption builds the gRPC server option that installs
// ServerInterceptor(tracer) as the server's unary interceptor.
func ServerOption(tracer opentracing.Tracer) grpc.ServerOption {
	interceptor := ServerInterceptor(tracer)
	return grpc.UnaryInterceptor(interceptor)
}
// ClientInterceptor returns a grpc.UnaryClientInterceptor that traces every
// outgoing unary call with tracer.
//
// For each call it starts a client span named after the gRPC method, injects
// the span context into a copy of the outgoing metadata, and invokes the call
// with that metadata attached. The span is finished when the call returns.
//
// A failure to inject the span context is logged on the span under
// "inject-error" and does not abort the call. A failure of the call itself is
// logged under "call-error", the span is marked with the standard "error" tag,
// and the error is returned to the caller unchanged.
func ClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string,
		req, reply interface{}, cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		// NOTE(review): tracing.GetSpanID is defined outside this file;
		// presumably it yields the parent span context carried by ctx — confirm.
		var parentCtx opentracing.SpanContext = tracing.GetSpanID(ctx)
		span := tracer.StartSpan(
			method,
			opentracing.ChildOf(parentCtx),
			opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
			ext.SpanKindRPCClient,
		)
		defer span.Finish()

		md, ok := metadata.FromOutgoingContext(ctx)
		if !ok {
			md = metadata.New(nil)
		} else {
			// Work on a copy so the injection below does not modify the
			// metadata already attached to the caller's context.
			md = md.Copy()
		}

		// Injection is best-effort: the call proceeds even if it fails.
		if err := tracer.Inject(span.Context(), opentracing.TextMap, MDReaderWriter{md}); err != nil {
			span.LogFields(log.String("inject-error", err.Error()))
		}

		err := invoker(metadata.NewOutgoingContext(ctx, md), method, req, reply, cc, opts...)
		if err != nil {
			// Tag the span as failed so tracing backends can flag it, in
			// addition to keeping the error text as a log field.
			ext.Error.Set(span, true)
			span.LogFields(log.String("call-error", err.Error()))
		}
		return err
	}
}
// ServerInterceptor returns a grpc.UnaryServerInterceptor that traces every
// incoming unary call with tracer.
//
// The span context is extracted from the incoming metadata. When extraction
// succeeds, or fails only with opentracing.ErrSpanContextNotFound, a server
// span named after info.FullMethod is started, stored in the context passed to
// handler, and finished when the handler returns. If the handler returns an
// error, the span is marked with the standard "error" tag and the error text
// is logged on it under "call-error".
//
// Any other extraction error is reported through grpclog and the handler is
// invoked without a span. The handler's response and error are always
// returned unchanged.
func ServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
	return func(ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (resp interface{}, err error) {
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			md = metadata.New(nil)
		}

		// Keep the extraction error in its own variable so it is never
		// confused with the handler's result.
		spanContext, extractErr := tracer.Extract(opentracing.TextMap, MDReaderWriter{md})
		if extractErr != nil && extractErr != opentracing.ErrSpanContextNotFound {
			grpclog.Errorf("extract from metadata err: %v", extractErr)
			return handler(ctx, req)
		}

		span := tracer.StartSpan(
			info.FullMethod,
			ext.RPCServerOption(spanContext),
			opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
			ext.SpanKindRPCServer,
		)
		defer span.Finish()

		resp, err = handler(opentracing.ContextWithSpan(ctx, span), req)
		if err != nil {
			// Record the failure on the span, matching what the client
			// interceptor does for failed calls.
			ext.Error.Set(span, true)
			span.LogFields(log.String("call-error", err.Error()))
		}
		return resp, err
	}
}