-
Notifications
You must be signed in to change notification settings - Fork 568
/
promutil.go
120 lines (104 loc) · 3.79 KB
/
promutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Package promutil contains utilities for collecting Prometheus metrics.
package promutil
import (
"io"
"net/http"
"time"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
var (
inFlightMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "http_client_in_flight_requests",
Help: "A gauge of in-flight requests being made against an HTTP API, by client.",
}, []string{"client"})
requestCountMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "http_client_requests_total",
Help: "A summary of requests made against an HTTP API, by client, status code, and request method.",
}, []string{"client", "code", "method"})
requestTimeMetric = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "http_client_request_duration_seconds",
Help: "A histogram of request timing against an HTTP API, by client and request method.",
Buckets: []float64{0.001, 0.005, 0.01, 0.1, 1, 10, 30, 60, 300, 600, 1800, 3600},
}, []string{"client", "method"})
)
type loggingRT struct {
name string
underlying http.RoundTripper
}
func (rt *loggingRT) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
ctx := pctx.Child(req.Context(), "outgoingHttp", pctx.WithFields([]log.Field{
zap.String("name", rt.name),
zap.String("method", req.Method),
zap.String("uri", req.URL.String()),
}...))
// Log the start of long HTTP requests.
timer := time.AfterFunc(10*time.Second, func() {
var ff []log.Field
if dl, ok := req.Context().Deadline(); ok {
ff = append(ff, zap.Duration("deadline", time.Until(dl)))
}
ff = append(ff, zap.Duration("duration", time.Since(start)))
log.Info(ctx, "ongoing long http request", ff...)
})
defer timer.Stop()
res, err := rt.underlying.RoundTrip(req)
if err != nil {
log.Info(ctx, "outgoing http request completed with error", zap.Error(err))
return res, errors.EnsureStack(err)
}
if res != nil {
log.Debug(ctx, "outgoing http request complete",
zap.Duration("duration", time.Since(start)), zap.String("status", res.Status))
}
return res, errors.EnsureStack(err)
}
// InstrumentRoundTripper returns an http.RoundTripper that collects Prometheus metrics; delegating
// to the underlying RoundTripper to actually make requests.
func InstrumentRoundTripper(name string, rt http.RoundTripper) http.RoundTripper {
if rt == nil {
rt = http.DefaultTransport
}
ls := prometheus.Labels{"client": name}
return promhttp.InstrumentRoundTripperInFlight(
inFlightMetric.With(ls),
promhttp.InstrumentRoundTripperDuration(
requestTimeMetric.MustCurryWith(ls),
promhttp.InstrumentRoundTripperCounter(
requestCountMetric.MustCurryWith(ls),
&loggingRT{name: name, underlying: rt})))
}
// Adder is something that can be added to.
type Adder interface {
Add(float64) // Implemented by prometheus.Counter.
}
// In the event that Prometheus changes their API, you'll be reading this comment.
var _ Adder = prometheus.NewCounter(prometheus.CounterOpts{})
// CountingReader exports a count of bytes read from an underlying io.Reader.
type CountingReader struct {
io.Reader
Counter Adder
}
// Read implements io.Reader.
func (r *CountingReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.Counter.Add(float64(n))
return
}
// CountingWriter exports a count of bytes written to an underlying io.Writer.
type CountingWriter struct {
io.Writer
Counter Adder
}
// Write implements io.Writer.
func (w *CountingWriter) Write(p []byte) (n int, err error) {
n, err = w.Writer.Write(p)
w.Counter.Add(float64(n))
return
}