-
Notifications
You must be signed in to change notification settings - Fork 37
/
server.go
147 lines (124 loc) · 4.12 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2020
//go:build go1.19
// +build go1.19
package instagrpc
import (
"context"
"net"
instana "github.com/instana/go-sensor"
ot "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// UnaryServerInterceptor returns a tracing interceptor to be used in grpc.NewServer() calls.
// This interceptor is responsible for extracting the Instana OpenTracing headers from incoming requests
// and staring a new span that can later be accessed inside the handler:
//
// if parent, ok := instana.SpanFromContext(ctx); ok {
// sp := parent.Tracer().StartSpan("child-span")
// defer sp.Finish()
// }
//
// If the handler returns an error or panics, the error message is then attached to the span logs.
func UnaryServerInterceptor(sensor instana.TracerLogger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
sp := startServerSpan(ctx, info.FullMethod, "unary", sensor)
defer sp.Finish()
// log request in case handler panics
defer func() {
if err := recover(); err != nil {
addRPCError(sp, err)
// re-throw
panic(err)
}
}()
m, err := handler(instana.ContextWithSpan(ctx, sp), req)
if err != nil {
addRPCError(sp, err)
}
return m, err
}
}
// StreamServerInterceptor returns a tracing interceptor to be used in grpc.NewServer() calls.
// This interceptor is responsible for extracting the Instana OpenTracing headers from incoming streaming
// requests and starting a new span that can later be accessed inside the handler:
//
// if parent, ok := instana.SpanFromContext(srv.Context()); ok {
// sp := parent.Tracer().StartSpan("child-span")
// defer sp.Finish()
// }
//
// If the handler returns an error or panics, the error message is then attached to the span logs.
func StreamServerInterceptor(sensor instana.TracerLogger) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
sp := startServerSpan(ss.Context(), info.FullMethod, "stream", sensor)
defer sp.Finish()
// log request in case handler panics
defer func() {
if err := recover(); err != nil {
addRPCError(sp, err)
// re-throw
panic(err)
}
}()
if err := handler(srv, &wrappedServerStream{ss, sp}); err != nil {
addRPCError(sp, err)
return err
}
return nil
}
}
func startServerSpan(ctx context.Context, method, callType string, sensor instana.TracerLogger) ot.Span {
tracer := sensor.Tracer()
opts := []ot.StartSpanOption{
ext.SpanKindRPCServer,
ot.Tags{
"rpc.flavor": "grpc",
"rpc.call": method,
"rpc.call_type": callType,
},
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return tracer.StartSpan("rpc-server", opts...)
}
host, port := extractServerAddr(md)
if host != "" {
opts = append(opts, ot.Tags{
"rpc.host": host,
"rpc.port": port,
})
}
switch wireContext, err := tracer.Extract(ot.HTTPHeaders, ot.HTTPHeadersCarrier(md)); err {
case nil:
opts = append(opts, ext.RPCServerOption(wireContext))
case ot.ErrSpanContextNotFound:
sensor.Logger().Debug("no tracing context found in request to ", method, ", starting a new trace")
case ot.ErrUnsupportedFormat:
sensor.Logger().Warn("unsupported grpc request context format for ", method)
default:
sensor.Logger().Error("failed to extract request context for ", method, ": ", err)
}
return tracer.StartSpan("rpc-server", opts...)
}
func extractServerAddr(md metadata.MD) (string, string) {
authority := md.Get(":authority")
if len(authority) == 0 {
return "", ""
}
host, port, err := net.SplitHostPort(authority[0])
if err != nil {
// take our best guess and use :authority as a host if the net.SplitHostPort() fails to parse
return authority[0], ""
}
return host, port
}
type wrappedServerStream struct {
grpc.ServerStream
Span ot.Span
}
func (ss wrappedServerStream) Context() context.Context {
return instana.ContextWithSpan(ss.ServerStream.Context(), ss.Span)
}