forked from DataDog/dd-trace-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
121 lines (110 loc) · 3.22 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2019 Datadog, Inc.
package grpc
import (
context "golang.org/x/net/context"
"google.golang.org/grpc"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
)
// serverStream wraps a grpc.ServerStream so that the stream handed to the
// handler exposes the interceptor's context and can optionally trace every
// message that is sent or received.
type serverStream struct {
	// ServerStream is the original stream; any method not overridden by
	// serverStream is promoted from it.
	grpc.ServerStream

	// cfg is the interceptor configuration; one config is created per
	// interceptor and shared by every stream that interceptor wraps.
	cfg *config
	// method is the full gRPC method name, passed to startSpanFromContext
	// for each message span.
	method string
	// ctx is the context returned by Context and used as the parent when
	// starting message spans.
	ctx context.Context
}
// Context returns the context stored on the wrapper rather than delegating to
// the embedded grpc.ServerStream.
//
// The client and server streams build their contexts in opposite orders. On
// the client side the span context is passed to the streamer function, so
// ClientStream.Context() already derives from the span context and can be
// returned as is. On the server side it is the other way around: the span
// context is derived from ServerStream.Context(), so the embedded stream's
// own context does not carry the span. Returning the stored context is what
// lets the handler see it.
func (ss *serverStream) Context() context.Context {
	return ss.ctx
}
// RecvMsg receives the next message from the underlying stream into m. When
// message tracing is enabled in the configuration, the receive is wrapped in
// a "grpc.message" span started from the stream's stored context and finished
// with whatever error the receive produced.
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	// Fast path: message tracing is off, just delegate.
	if !ss.cfg.traceStreamMessages {
		return ss.ServerStream.RecvMsg(m)
	}

	span, _ := startSpanFromContext(
		ss.ctx,
		ss.method,
		"grpc.message",
		ss.cfg.serverServiceName(),
		ss.cfg.analyticsRate,
	)
	// err is a named result, so the deferred closure observes the value
	// set by the return statement below.
	defer func() { finishWithError(span, err, ss.cfg) }()

	return ss.ServerStream.RecvMsg(m)
}
// SendMsg sends m on the underlying stream. When message tracing is enabled
// in the configuration, the send is wrapped in a "grpc.message" span started
// from the stream's stored context and finished with whatever error the send
// produced.
func (ss *serverStream) SendMsg(m interface{}) (err error) {
	// Fast path: message tracing is off, just delegate.
	if !ss.cfg.traceStreamMessages {
		return ss.ServerStream.SendMsg(m)
	}

	span, _ := startSpanFromContext(
		ss.ctx,
		ss.method,
		"grpc.message",
		ss.cfg.serverServiceName(),
		ss.cfg.analyticsRate,
	)
	// err is a named result, so the deferred closure observes the value
	// set by the return statement below.
	defer func() { finishWithError(span, err, ss.cfg) }()

	return ss.ServerStream.SendMsg(m)
}
// StreamServerInterceptor returns a grpc.StreamServerInterceptor that traces
// streaming requests handled by the gRPC server it is installed on.
//
// The configuration is built once from the defaults and the given options; if
// no service name was set it falls back to "grpc.server". For every incoming
// stream the interceptor optionally starts a "grpc.server" span covering the
// whole call (when call tracing is enabled), and always hands the handler a
// wrapped stream that traces individual messages when message tracing is
// enabled.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
	cfg := new(config)
	defaults(cfg)
	for _, opt := range opts {
		opt(cfg)
	}
	if cfg.serviceName == "" {
		cfg.serviceName = "grpc.server"
	}

	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
		streamCtx := ss.Context()

		// With call tracing enabled, open a span for the whole stream and
		// continue with the context that carries it.
		if cfg.traceStreamCalls {
			var callSpan ddtrace.Span
			callSpan, streamCtx = startSpanFromContext(
				streamCtx,
				info.FullMethod,
				"grpc.server",
				cfg.serviceName,
				cfg.analyticsRate,
			)
			// err is a named result, so the span is finished with the
			// handler's final error.
			defer func() { finishWithError(callSpan, err, cfg) }()
		}

		// Run the original handler against a wrapper stream, which exposes
		// streamCtx and traces each send and recv if message tracing is
		// enabled.
		wrapped := &serverStream{
			ServerStream: ss,
			cfg:          cfg,
			method:       info.FullMethod,
			ctx:          streamCtx,
		}
		return handler(srv, wrapped)
	}
}
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor that traces
// unary requests handled by the gRPC server it is installed on.
//
// The configuration is built once from the defaults and the given options.
// For every request a "grpc.server" span is started from the incoming
// context, the handler is invoked with the context returned alongside that
// span, and the span is finished with the handler's error.
//
// The span is finished in a deferred call, matching StreamServerInterceptor,
// so it is not left unfinished if the handler panics. In that case the panic
// keeps propagating and the span is finished with a nil error.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
	cfg := new(config)
	defaults(cfg)
	for _, opt := range opts {
		opt(cfg)
	}

	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		span, ctx := startSpanFromContext(
			ctx,
			info.FullMethod,
			"grpc.server",
			cfg.serverServiceName(),
			cfg.analyticsRate,
		)
		// err is a named result, so the deferred closure observes the value
		// set by the return statement below.
		defer func() { finishWithError(span, err, cfg) }()

		return handler(ctx, req)
	}
}