forked from DataDog/dd-trace-go
/
server.go
93 lines (83 loc) · 2.89 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package grpc
import (
context "golang.org/x/net/context"
"google.golang.org/grpc"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
// serverStream wraps a grpc.ServerStream so that the tracing interceptor can
// hand the handler its own context and trace individual message sends and
// receives.
type serverStream struct {
// ServerStream is the wrapped stream; RecvMsg and SendMsg delegate to it.
grpc.ServerStream
// cfg is the interceptor configuration; it decides whether messages are
// traced and supplies the service name for message spans.
cfg *interceptorConfig
// method is the full gRPC method name, passed to startSpanFromContext when
// a message span is started.
method string
// ctx is the context returned by Context and used as the parent context
// for message spans.
ctx context.Context
}
// Context returns the context stored on the wrapper, not the context of the
// wrapped ServerStream.
//
// One subtle difference between the server stream and the client stream is the
// order in which the contexts are created. In the client stream we pass the
// context to the streamer function, which means the ClientStream.Context()
// derives from the span context, so there we return the ClientStream's own
// context. With the ServerStream it is the other way around: the span context
// derives from ServerStream.Context(), so here we return the span context
// instead. When call tracing is disabled no span is started and the stored
// context is simply the wrapped stream's context.
func (ss *serverStream) Context() context.Context {
return ss.ctx
}
// RecvMsg receives a message from the wrapped stream into m and returns the
// wrapped stream's error unchanged. When message tracing is enabled the call
// is wrapped in a "grpc.message" span that is finished with that error.
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	if ss.cfg.traceStreamMessages {
		span, _ := startSpanFromContext(ss.ctx, ss.method, "grpc.message", ss.cfg.serverServiceName())
		// The closure is required: arguments of a deferred call are evaluated
		// when the defer statement executes, so deferring
		// span.Finish(withStreamError(err)) directly would always see a nil
		// err. The closure reads the named result at return time instead.
		defer func() { span.Finish(withStreamError(err)) }()
	}
	err = ss.ServerStream.RecvMsg(m)
	return err
}
// SendMsg sends m on the wrapped stream and returns the wrapped stream's
// error unchanged. When message tracing is enabled the call is wrapped in a
// "grpc.message" span that is finished with that error.
func (ss *serverStream) SendMsg(m interface{}) (err error) {
	if ss.cfg.traceStreamMessages {
		span, _ := startSpanFromContext(ss.ctx, ss.method, "grpc.message", ss.cfg.serverServiceName())
		// The closure is required: arguments of a deferred call are evaluated
		// when the defer statement executes, so deferring
		// span.Finish(withStreamError(err)) directly would always see a nil
		// err. The closure reads the named result at return time instead.
		defer func() { span.Finish(withStreamError(err)) }()
	}
	err = ss.ServerStream.SendMsg(m)
	return err
}
// StreamServerInterceptor will trace streaming requests to the given gRPC server.
//
// The configuration is built once from the defaults and the supplied options,
// falling back to the service name "grpc.server" when none is set. The
// returned interceptor starts a "grpc.server" span around the handler when
// call tracing is enabled, and always passes the handler a wrapped stream
// that traces each send and receive when message tracing is enabled.
func StreamServerInterceptor(opts ...InterceptorOption) grpc.StreamServerInterceptor {
	cfg := new(interceptorConfig)
	defaults(cfg)
	for _, fn := range opts {
		fn(cfg)
	}
	if cfg.serviceName == "" {
		cfg.serviceName = "grpc.server"
	}
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
		ctx := ss.Context()
		// if we've enabled call tracing, create a span
		if cfg.traceStreamCalls {
			var span ddtrace.Span
			span, ctx = startSpanFromContext(ctx, info.FullMethod, "grpc.server", cfg.serviceName)
			// Finish inside a closure so that err is read when the handler
			// has returned. Deferring span.Finish(withStreamError(err))
			// directly would evaluate the argument now, while err is nil,
			// and the handler's error would never reach the span.
			defer func() { span.Finish(withStreamError(err)) }()
		}
		// call the original handler with a new stream, which traces each send
		// and recv if message tracing is enabled
		err = handler(srv, &serverStream{
			ServerStream: ss,
			cfg:          cfg,
			method:       info.FullMethod,
			ctx:          ctx,
		})
		return err
	}
}
// UnaryServerInterceptor will trace requests to the given grpc server.
//
// The configuration is built once from the defaults and the supplied options.
// For every call the returned interceptor starts a "grpc.server" span, runs
// the handler with the span's context, finishes the span with the handler's
// error and returns the handler's result unchanged.
func UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor {
	cfg := new(interceptorConfig)
	defaults(cfg)
	for i := range opts {
		opts[i](cfg)
	}
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		span, spanCtx := startSpanFromContext(ctx, info.FullMethod, "grpc.server", cfg.serverServiceName())
		resp, err := handler(spanCtx, req)
		// err is already known here, so it can be passed to Finish directly.
		span.Finish(tracer.WithError(err))
		return resp, err
	}
}