forked from cortexproject/cortex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinstrumentation.go
111 lines (95 loc) · 3.17 KB
/
instrumentation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package gcp
import (
"io"
"time"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// bigtableRequestDuration is a histogram of Bigtable request latencies in
// seconds, partitioned by the gRPC method ("operation") and the resulting
// status code ("status_code").
var bigtableRequestDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "bigtable_request_duration_seconds",
		Help:      "Time spent doing Bigtable requests.",

		// Bigtable latencies that matter span roughly a few milliseconds up
		// to a few hundred milliseconds. Six exponential buckets, starting at
		// 1ms and growing by a factor of 4, cover about 1ms through 1s.
		Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
	},
	[]string{"operation", "status_code"},
)
// init registers the Bigtable latency histogram with the default Prometheus
// registry. MustRegister panics if registration fails.
func init() {
	prometheus.MustRegister(
		bigtableRequestDuration,
	)
}
// instrumentation returns the client options that attach tracing and latency
// metrics to a Bigtable client's gRPC connection.
//
// Unary calls go through a chain with the OpenTracing interceptor (using the
// global tracer) listed first, followed by grpcUnaryInstrumentation. Streaming
// calls are handled by grpcStreamInstrumentation alone.
func instrumentation() []option.ClientOption {
	tracing := otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())
	unaryChain := grpc_middleware.ChainUnaryClient(tracing, grpcUnaryInstrumentation)

	unaryOpt := option.WithGRPCDialOption(grpc.WithUnaryInterceptor(unaryChain))
	streamOpt := option.WithGRPCDialOption(grpc.WithStreamInterceptor(grpcStreamInstrumentation))

	return []option.ClientOption{unaryOpt, streamOpt}
}
// grpcUnaryInstrumentation is a unary client interceptor that times the
// wrapped call and records the elapsed seconds in bigtableRequestDuration,
// labelled with the method name and the status code derived from the call's
// error. The invoker's error is returned to the caller unchanged.
func grpcUnaryInstrumentation(
	ctx context.Context, method string, req, resp interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
	begin := time.Now()
	callErr := invoker(ctx, method, req, resp, cc, opts...)

	observer := bigtableRequestDuration.WithLabelValues(method, instrument.ErrorCode(callErr))
	observer.Observe(time.Since(begin).Seconds())

	return callErr
}
// grpcStreamInstrumentation is a stream client interceptor that times
// streaming Bigtable calls.
//
// On success the returned stream is wrapped in an instrumentedClientStream,
// which records the duration (measured from the moment this interceptor was
// entered) when the stream later reports an error or end-of-stream.
//
// If the stream cannot be created, the failure is recorded immediately and a
// nil stream is returned together with the error.
func grpcStreamInstrumentation(
	ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
	streamer grpc.Streamer, opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
	start := time.Now()
	stream, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		// No stream exists, so none of the wrapper's methods will ever run to
		// account for this attempt; observe the failure here instead.
		bigtableRequestDuration.
			WithLabelValues(method, instrument.ErrorCode(err)).
			Observe(time.Since(start).Seconds())

		// Return a nil stream rather than a wrapper around a nil ClientStream:
		// such a wrapper is a non-nil interface value whose every method call
		// would dereference nil.
		return nil, err
	}

	return &instrumentedClientStream{
		start:        start,
		method:       method,
		ClientStream: stream,
	}, nil
}
// instrumentedClientStream decorates a grpc.ClientStream so that the duration
// of the streaming call can be recorded in bigtableRequestDuration. Methods
// that are not overridden are forwarded to the embedded stream.
type instrumentedClientStream struct {
	// ClientStream is the underlying stream being measured.
	grpc.ClientStream

	// method is the gRPC method name, used as the "operation" label.
	method string
	// start is the instant from which the call's duration is measured.
	start time.Time
}
// SendMsg forwards m to the underlying stream. A successful send records
// nothing. Any error causes the time elapsed since the stream was started to
// be observed: io.EOF is recorded under the status code of a nil error, every
// other error under its own status code. The underlying error is returned
// unchanged.
//
// NOTE(review): grpc-go documents io.EOF from SendMsg as "stream aborted, real
// status available via RecvMsg" - confirm that counting it as a success here
// is intended.
func (s *instrumentedClientStream) SendMsg(m interface{}) error {
	sendErr := s.ClientStream.SendMsg(m)
	if sendErr == nil {
		return nil
	}

	// Pick the error used for the status label; io.EOF maps to "no error".
	labelErr := sendErr
	if sendErr == io.EOF {
		labelErr = nil
	}

	observer := bigtableRequestDuration.WithLabelValues(s.method, instrument.ErrorCode(labelErr))
	observer.Observe(time.Since(s.start).Seconds())

	return sendErr
}
// RecvMsg reads the next message from the underlying stream into m. A
// successful receive records nothing. When the stream ends with io.EOF the
// elapsed time since the stream was started is observed under the status code
// of a nil error; any other error is observed under its own status code. The
// underlying error is returned unchanged.
func (s *instrumentedClientStream) RecvMsg(m interface{}) error {
	recvErr := s.ClientStream.RecvMsg(m)

	var statusCode string
	switch recvErr {
	case nil:
		// Stream is still in progress; nothing to record yet.
		return nil
	case io.EOF:
		statusCode = instrument.ErrorCode(nil)
	default:
		statusCode = instrument.ErrorCode(recvErr)
	}

	observer := bigtableRequestDuration.WithLabelValues(s.method, statusCode)
	observer.Observe(time.Since(s.start).Seconds())

	return recvErr
}
// Header returns the header metadata from the underlying stream. If fetching
// the header fails, the time elapsed since the stream was started is observed
// under the error's status code before the result is handed back. Both return
// values come straight from the underlying stream.
func (s *instrumentedClientStream) Header() (metadata.MD, error) {
	header, headerErr := s.ClientStream.Header()
	if headerErr == nil {
		return header, nil
	}

	observer := bigtableRequestDuration.WithLabelValues(s.method, instrument.ErrorCode(headerErr))
	observer.Observe(time.Since(s.start).Seconds())

	return header, headerErr
}