/
grpc_stats.go
126 lines (106 loc) · 3.36 KB
/
grpc_stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package grpcutil
import (
"context"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/syncutil"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/stats"
)
type GRPCStats struct {
Duration clock.Duration
Method string
Failed float64
Success float64
}
type contextKey struct{}
var statsContextKey = contextKey{}
// GRPCStatsHandler Implements the Prometheus collector interface. Such that when the /metrics handler is
// called this collector pulls all the stats from
type GRPCStatsHandler struct {
reqCh chan *GRPCStats
wg syncutil.WaitGroup
grpcRequestCount *prometheus.CounterVec
grpcRequestDuration *prometheus.SummaryVec
}
func NewGRPCStatsHandler() *GRPCStatsHandler {
c := &GRPCStatsHandler{
grpcRequestCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "gubernator_grpc_request_counts",
Help: "GRPC requests by status.",
}, []string{"status", "method"}),
grpcRequestDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "gubernator_grpc_request_duration",
Help: "GRPC request durations in seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
}, []string{"method"}),
}
c.run()
return c
}
func (c *GRPCStatsHandler) run() {
c.reqCh = make(chan *GRPCStats, 10000)
c.wg.Until(func(done chan struct{}) bool {
select {
case stat := <-c.reqCh:
c.grpcRequestCount.With(prometheus.Labels{"status": "failed", "method": stat.Method}).Add(stat.Failed)
c.grpcRequestCount.With(prometheus.Labels{"status": "success", "method": stat.Method}).Add(stat.Success)
c.grpcRequestDuration.With(prometheus.Labels{"method": stat.Method}).Observe(stat.Duration.Seconds())
case <-done:
return false
}
return true
})
}
func (c *GRPCStatsHandler) Describe(ch chan<- *prometheus.Desc) {
c.grpcRequestCount.Describe(ch)
c.grpcRequestDuration.Describe(ch)
}
func (c *GRPCStatsHandler) Collect(ch chan<- prometheus.Metric) {
c.grpcRequestCount.Collect(ch)
c.grpcRequestDuration.Collect(ch)
}
func (c *GRPCStatsHandler) Close() {
c.wg.Stop()
}
func (c *GRPCStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
rs := StatsFromContext(ctx)
if rs == nil {
return
}
switch t := s.(type) {
// case *stats.Begin:
// case *stats.InPayload:
// case *stats.InHeader:
// case *stats.InTrailer:
// case *stats.OutPayload:
// case *stats.OutHeader:
// case *stats.OutTrailer:
case *stats.End:
rs.Duration = t.EndTime.Sub(t.BeginTime)
if t.Error != nil {
rs.Failed = 1
} else {
rs.Success = 1
}
c.reqCh <- rs
}
}
func (c *GRPCStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {}
func (c *GRPCStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (c *GRPCStatsHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
return ContextWithStats(ctx, &GRPCStats{Method: tagInfo.FullMethodName})
}
// ContextWithStats Returns a new `context.Context` that holds a reference to `GRPCStats`.
func ContextWithStats(ctx context.Context, stats *GRPCStats) context.Context {
return context.WithValue(ctx, statsContextKey, stats)
}
// StatsFromContext Returns the `GRPCStats` previously associated with `ctx`.
func StatsFromContext(ctx context.Context) *GRPCStats {
val := ctx.Value(statsContextKey)
if rs, ok := val.(*GRPCStats); ok {
return rs
}
return nil
}