-
Notifications
You must be signed in to change notification settings - Fork 2k
/
gate.go
271 lines (233 loc) · 6.98 KB
/
gate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package gate
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promgate "github.com/prometheus/prometheus/util/gate"
)
// Gate controls the maximum number of concurrently running and waiting queries.
//
// Example of use:
//
// g := gate.New(r, 5)
//
// if err := g.Start(ctx); err != nil {
// return
// }
// defer g.Done()
type Gate interface {
// Start initiates a new request and waits until it's our turn to fulfill a request.
Start(ctx context.Context) error
// Done finishes a query.
Done()
}
// Keeper is used to create multiple gates sharing the same metrics.
//
// Deprecated: when Keeper is used to create several gates, the metric tracking
// the number of in-flight metric isn't meaningful because it is hard to say
// whether requests are being blocked or not. For clients that call
// gate.(*Keeper).NewGate only once, it is recommended to use gate.New()
// instead. Otherwise it is recommended to use the
// github.com/prometheus/prometheus/util/gate package directly and wrap the
// returned gate with gate.InstrumentGateDuration().
type Keeper struct {
reg prometheus.Registerer
}
// NewKeeper creates a new Keeper.
//
// Deprecated: see Keeper.
func NewKeeper(reg prometheus.Registerer) *Keeper {
return &Keeper{
reg: reg,
}
}
// NewGate returns a new Gate ready for use.
//
// Deprecated: see Keeper.
func (k *Keeper) NewGate(maxConcurrent int) Gate {
return New(k.reg, maxConcurrent, Queries)
}
type OperationName string
const (
Queries OperationName = "queries"
Selects OperationName = "selects"
Gets OperationName = "gets"
Sets OperationName = "sets"
WriteRequests OperationName = "write_requests"
)
type GateFactory interface {
New() Gate
}
type gateProducer struct {
reg prometheus.Registerer
opName OperationName
maxConcurrent int
durationHist prometheus.Histogram
total prometheus.Counter
inflight prometheus.Gauge
}
func (g *gateProducer) New() Gate {
var gate Gate
if g.maxConcurrent <= 0 {
gate = NewNoop()
} else {
gate = promgate.New(g.maxConcurrent)
}
return InstrumentGateDuration(
g.durationHist,
InstrumentGateTotal(
g.total,
InstrumentGateInFlight(
g.inflight,
gate,
),
),
)
}
var (
maxGaugeOpts = func(opName OperationName) prometheus.GaugeOpts {
return prometheus.GaugeOpts{
Name: fmt.Sprintf("gate_%s_max", opName),
Help: fmt.Sprintf("Maximum number of concurrent %s.", opName),
}
}
inFlightGaugeOpts = func(opName OperationName) prometheus.GaugeOpts {
return prometheus.GaugeOpts{
Name: fmt.Sprintf("gate_%s_in_flight", opName),
Help: fmt.Sprintf("Number of %s that are currently in flight.", opName),
}
}
totalCounterOpts = func(opName OperationName) prometheus.CounterOpts {
return prometheus.CounterOpts{
Name: fmt.Sprintf("gate_%s_total", opName),
Help: fmt.Sprintf("Total number of %s.", opName),
}
}
durationHistogramOpts = func(opName OperationName) prometheus.HistogramOpts {
return prometheus.HistogramOpts{
Name: fmt.Sprintf("gate_%s_duration_seconds", opName),
Help: fmt.Sprintf("How many seconds it took for %s to wait at the gate.", opName),
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}
}
)
// NewGateFactory creates a Gate factory. They act like Gate but each produced Gate
// acts individually in terms of the limit and they have unified metrics.
func NewGateFactory(reg prometheus.Registerer, maxConcurrent int, opName OperationName) GateFactory {
promauto.With(reg).NewGauge(maxGaugeOpts(opName)).Set(float64(maxConcurrent))
return &gateProducer{
reg: reg,
opName: opName,
maxConcurrent: maxConcurrent,
durationHist: promauto.With(reg).NewHistogram(durationHistogramOpts(opName)),
total: promauto.With(reg).NewCounter(totalCounterOpts(opName)),
inflight: promauto.With(reg).NewGauge(inFlightGaugeOpts(opName)),
}
}
// New returns an instrumented gate limiting the number of requests being
// executed concurrently.
//
// The gate implementation is based on the
// github.com/prometheus/prometheus/util/gate package.
//
// It can be called several times but not with the same registerer otherwise it
// will panic when trying to register the same metric multiple times.
func New(reg prometheus.Registerer, maxConcurrent int, opName OperationName) Gate {
promauto.With(reg).NewGauge(maxGaugeOpts(opName)).Set(float64(maxConcurrent))
var gate Gate
if maxConcurrent <= 0 {
gate = NewNoop()
} else {
gate = promgate.New(maxConcurrent)
}
return InstrumentGateDuration(
promauto.With(reg).NewHistogram(durationHistogramOpts(opName)),
InstrumentGateTotal(
promauto.With(reg).NewCounter(totalCounterOpts(opName)),
InstrumentGateInFlight(
promauto.With(reg).NewGauge(inFlightGaugeOpts(opName)),
gate,
),
),
)
}
type noopGate struct{}
func (noopGate) Start(context.Context) error { return nil }
func (noopGate) Done() {}
func NewNoop() Gate { return noopGate{} }
type instrumentedDurationGate struct {
g Gate
duration prometheus.Observer
}
// InstrumentGateDuration instruments the provided Gate to track how much time
// the request has been waiting in the gate.
func InstrumentGateDuration(duration prometheus.Observer, g Gate) Gate {
return &instrumentedDurationGate{
g: g,
duration: duration,
}
}
// Start implements the Gate interface.
func (g *instrumentedDurationGate) Start(ctx context.Context) error {
start := time.Now()
defer func() {
g.duration.Observe(time.Since(start).Seconds())
}()
return g.g.Start(ctx)
}
// Done implements the Gate interface.
func (g *instrumentedDurationGate) Done() {
g.g.Done()
}
type instrumentedInFlightGate struct {
g Gate
inflight prometheus.Gauge
}
// InstrumentGateInFlight instruments the provided Gate to track how many
// requests are currently in flight.
func InstrumentGateInFlight(inflight prometheus.Gauge, g Gate) Gate {
return &instrumentedInFlightGate{
g: g,
inflight: inflight,
}
}
// Start implements the Gate interface.
func (g *instrumentedInFlightGate) Start(ctx context.Context) error {
if err := g.g.Start(ctx); err != nil {
return err
}
g.inflight.Inc()
return nil
}
// Done implements the Gate interface.
func (g *instrumentedInFlightGate) Done() {
g.inflight.Dec()
g.g.Done()
}
type instrumentedTotalGate struct {
g Gate
total prometheus.Counter
}
// InstrumentGateTotal instruments the provided Gate to track total requests.
func InstrumentGateTotal(total prometheus.Counter, g Gate) Gate {
return &instrumentedTotalGate{
g: g,
total: total,
}
}
// Start implements the Gate interface.
func (g *instrumentedTotalGate) Start(ctx context.Context) error {
g.total.Inc()
if err := g.g.Start(ctx); err != nil {
return err
}
return nil
}
// Done implements the Gate interface.
func (g *instrumentedTotalGate) Done() {
g.g.Done()
}