forked from elastic/apm-agent-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
118 lines (108 loc) · 2.95 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package apmgrpc
import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"github.com/elastic/apm-agent-go"
)
// NewUnaryServerInterceptor returns a grpc.UnaryServerInterceptor that
// traces gRPC requests with the given options.
//
// For each incoming request the interceptor starts a transaction from
// info.FullMethod with the "grpc" type. The transaction is added to the
// context passed to the handler, so server methods can use
// elasticapm.StartSpan with the provided context. If the tracer is not
// active, the handler is invoked directly and nothing is traced.
//
// For sampled transactions, the peer address and, when present, the
// peer's auth type are recorded in the transaction's custom context
// under the "grpc" key.
//
// The transaction result is the string form of the gRPC status code of
// the error returned to the caller: codes.OK for a nil error,
// codes.Unknown for an error that carries no gRPC status, and
// codes.Internal for a recovered panic.
//
// By default, the interceptor will trace with elasticapm.DefaultTracer,
// and will not recover any panics. Use WithTracer to specify an
// alternative tracer, and WithRecovery to enable panic recovery.
func NewUnaryServerInterceptor(o ...ServerOption) grpc.UnaryServerInterceptor {
	opts := serverOptions{
		tracer:  elasticapm.DefaultTracer,
		recover: false,
	}
	for _, apply := range o {
		apply(&opts)
	}
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (resp interface{}, err error) {
		if !opts.tracer.Active() {
			return handler(ctx, req)
		}
		tx := opts.tracer.StartTransaction(info.FullMethod, "grpc")
		ctx = elasticapm.ContextWithTransaction(ctx, tx)
		// Deferred first so that it runs last, after the result has been
		// recorded by the deferred function below.
		defer tx.End()

		if tx.Sampled() {
			if p, ok := peer.FromContext(ctx); ok {
				grpcContext := make(map[string]interface{}, 2)
				// Addr is an interface value; guard against nil so that
				// gathering metadata cannot panic before the handler (and
				// the recovery logic below) is even reached.
				if p.Addr != nil {
					grpcContext["peer.address"] = p.Addr.String()
				}
				if p.AuthInfo != nil {
					grpcContext["auth"] = map[string]interface{}{
						"type": p.AuthInfo.AuthType(),
					}
				}
				if len(grpcContext) > 0 {
					tx.Context.SetCustom("grpc", grpcContext)
				}
			}
		}

		defer func() {
			if r := recover(); r != nil {
				e := opts.tracer.Recovered(r, tx)
				e.Handled = opts.recover
				e.Send()
				if !opts.recover {
					// Recovery is disabled: report the panic, then let
					// it continue to propagate.
					panic(r)
				}
				// %v rather than %s: the recovered value may be of any
				// type, and %s renders non-string values as "%!s(...)".
				err = status.Errorf(codes.Internal, "%v", r)
			}

			// Record the result from the final error, so that a recovered
			// panic is reported as Internal instead of leaving the result
			// unset.
			statusCode := codes.OK
			if err != nil {
				statusCode = codes.Unknown
				if s, ok := status.FromError(err); ok {
					statusCode = s.Code()
				}
			}
			tx.Result = statusCode.String()
		}()

		return handler(ctx, req)
	}
}
// serverOptions holds the settings that ServerOption functions modify
// and that NewUnaryServerInterceptor reads when handling requests.
type serverOptions struct {
// tracer starts the per-request transactions and reports recovered
// panics.
tracer *elasticapm.Tracer
// recover selects what happens after a handler panic has been reported:
// when true the panic is turned into a codes.Internal error, when false
// it is re-raised.
recover bool
}
// ServerOption sets options for server-side tracing.
//
// Options are passed to NewUnaryServerInterceptor, which applies them in
// the order given on top of its defaults.
type ServerOption func(*serverOptions)
// WithTracer returns a ServerOption that makes the server interceptor
// trace requests with t rather than the default tracer.
//
// WithTracer panics if t is nil.
func WithTracer(t *elasticapm.Tracer) ServerOption {
	if t == nil {
		panic("t == nil")
	}
	var useTracer ServerOption = func(opts *serverOptions) {
		opts.tracer = t
	}
	return useTracer
}
// WithRecovery returns a ServerOption that turns on panic recovery in
// the gRPC server interceptor.
//
// Panics in a handler are always reported to Elastic APM as errors.
// Without this option the panic is re-raised after being reported, so
// it will still cause the server to be terminated. With recovery
// enabled, the panic is instead translated into a gRPC error with the
// code grpc/codes.Internal.
func WithRecovery() ServerOption {
	var enableRecovery ServerOption = func(opts *serverOptions) {
		opts.recover = true
	}
	return enableRecovery
}