forked from thanos-io/thanos
/
prometheus.go
381 lines (328 loc) · 9.12 KB
/
prometheus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
package testutil
import (
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"syscall"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"
)
// Default versions of the external binaries used by the integration tests.
// Each one is used when the corresponding environment variable below is unset or empty.
const (
	defaultPrometheusVersion   = "v2.13.0"
	defaultAlertmanagerVersion = "v0.20.0"
	defaultMinioVersion        = "RELEASE.2018-10-06T00-15-16Z"
)

// Environment variables that override the defaults above.
const (
	// promVersionsEnvVar holds a space delimited list of Prometheus versions to test against.
	promVersionsEnvVar = "THANOS_TEST_PROMETHEUS_VERSIONS"
	// alertmanagerBinEnvVar, when non-empty, is used verbatim as the Alertmanager binary.
	alertmanagerBinEnvVar = "THANOS_TEST_ALERTMANAGER_PATH"
	// minioBinEnvVar, when non-empty, is used verbatim as the Minio binary.
	minioBinEnvVar = "THANOS_TEST_MINIO_PATH"
)
func PrometheusBinary() string {
return prometheusBin(defaultPrometheusVersion)
}
// prometheusBin maps a Prometheus version tag to the binary name handed to
// exec.Command, e.g. "v2.13.0" -> "prometheus-v2.13.0".
func prometheusBin(version string) string {
	name := fmt.Sprintf("prometheus-%s", version)
	return name
}
// AlertmanagerBinary returns the Alertmanager binary to run: the value of the
// THANOS_TEST_ALERTMANAGER_PATH environment variable when it is non-empty,
// otherwise "alertmanager-<defaultAlertmanagerVersion>".
func AlertmanagerBinary() string {
	if override := os.Getenv(alertmanagerBinEnvVar); override != "" {
		return override
	}
	return fmt.Sprintf("alertmanager-%s", defaultAlertmanagerVersion)
}
// MinioBinary returns the Minio binary to run: the value of the
// THANOS_TEST_MINIO_PATH environment variable when it is non-empty,
// otherwise "minio-<defaultMinioVersion>".
func MinioBinary() string {
	if override := os.Getenv(minioBinEnvVar); override != "" {
		return override
	}
	return fmt.Sprintf("minio-%s", defaultMinioVersion)
}
// Prometheus represents a test instance for integration testing.
// It can be populated with data (through Appender) before being started. Start
// closes the in-process TSDB and launches the Prometheus binary with its
// --storage.tsdb.path pointing at that same directory.
type Prometheus struct {
// dir is the TSDB data directory; the prometheus.yml config file is created inside it.
dir string
// db is the in-process TSDB used to pre-populate data before Start, which closes it.
db *tsdb.DB
// prefix is passed as --web.route-prefix and appended to the address by Addr.
prefix string
// version selects the binary "prometheus-<version>".
version string
// running is set by start and cleared by cleanup; it gates Appender, Stop and WaitPrometheusUp.
running bool
// cmd is the most recently launched Prometheus process.
cmd *exec.Cmd
// disabledCompaction makes start pass equal min/max block durations (2h) to the binary.
disabledCompaction bool
// addr is the host:port the process listens on; a placeholder until start assigns it.
addr string
}
// NewTSDB opens a fresh TSDB in a new temporary directory with a single 2h block
// range and effectively unlimited retention.
//
// The caller owns the returned DB and is responsible for closing it and removing
// db.Dir(). If opening the DB fails, the temporary directory is removed before
// the error is returned.
func NewTSDB() (*tsdb.DB, error) {
	dir, err := ioutil.TempDir("", "prometheus-test")
	if err != nil {
		return nil, err
	}
	db, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
		BlockRanges:       []int64{2 * 3600 * 1000}, // 2h in milliseconds.
		RetentionDuration: math.MaxInt64,
	})
	if err != nil {
		// The caller never learns the directory's path on failure, so clean it up here.
		// A removal error is secondary to the Open error being returned.
		_ = os.RemoveAll(dir)
		return nil, err
	}
	return db, nil
}
// ForeachPrometheus runs testFn as a subtest once per Prometheus version.
//
// Versions are read from the space delimited THANOS_TEST_PROMETHEUS_VERSIONS
// environment variable, falling back to defaultPrometheusVersion when it is unset
// or blank. Each subtest receives a fresh, not yet started instance. p.Stop is
// called when the subtest ends, even if testFn aborts it via t.FailNow/t.Fatal or
// panics. Iteration stops at the first failing version.
func ForeachPrometheus(t *testing.T, testFn func(t testing.TB, p *Prometheus)) {
	// Fields (rather than splitting on a single space) ignores repeated, leading and
	// trailing whitespace, which would otherwise yield empty version names.
	vers := strings.Fields(os.Getenv(promVersionsEnvVar))
	if len(vers) == 0 {
		vers = []string{defaultPrometheusVersion}
	}
	for _, ver := range vers {
		ver := ver // The closure may outlive this iteration if testFn calls t.Parallel.
		if ok := t.Run(ver, func(t *testing.T) {
			p, err := newPrometheus(ver, "")
			Ok(t, err)
			// Deferred so that the process is stopped even when testFn ends the
			// subtest early through runtime.Goexit (t.FailNow) or a panic.
			defer func() {
				if err := p.Stop(); err != nil {
					t.Errorf("stop Prometheus %s: %v", ver, err)
				}
			}()
			testFn(t, p)
		}); !ok {
			return
		}
	}
}
// NewPrometheus creates a new test Prometheus instance for defaultPrometheusVersion
// that will listen on a local address with no route prefix. The instance is not
// started; call Start after optionally populating it through Appender.
//
// Deprecated: Use ForeachPrometheus instead.
func NewPrometheus() (*Prometheus, error) {
	return newPrometheus("", "")
}
// NewPrometheusOnPath creates a new, not yet started, test Prometheus instance for
// defaultPrometheusVersion that will listen on a local address and serve its
// HTTP endpoints under the given route prefix (passed as --web.route-prefix).
func NewPrometheusOnPath(prefix string) (*Prometheus, error) {
	const useDefaultVersion = "" // newPrometheus substitutes defaultPrometheusVersion for "".
	return newPrometheus(useDefaultVersion, prefix)
}
// newPrometheus prepares, but does not start, a test Prometheus instance backed by
// a fresh TSDB in a temporary directory, with an empty prometheus.yml inside it.
//
// An empty version selects defaultPrometheusVersion; prefix becomes the web route
// prefix. If creating the config file fails, the TSDB is closed and its directory
// removed so nothing is leaked.
func newPrometheus(version string, prefix string) (*Prometheus, error) {
	if version == "" {
		version = defaultPrometheusVersion
	}
	db, err := NewTSDB()
	if err != nil {
		return nil, err
	}
	// Just touch an empty config file. We don't need to actually scrape anything.
	// The handle is closed right away so no file descriptor is left open.
	f, err := os.Create(filepath.Join(db.Dir(), "prometheus.yml"))
	if err == nil {
		err = f.Close()
	}
	if err != nil {
		// Best-effort cleanup; the config-file error is the one worth reporting.
		_ = db.Close()
		_ = os.RemoveAll(db.Dir())
		return nil, err
	}
	return &Prometheus{
		dir:     db.Dir(),
		db:      db,
		prefix:  prefix,
		version: version,
		addr:    "<prometheus-not-started>",
	}, nil
}
// Start closes the in-process TSDB used for pre-population and launches the
// Prometheus process on that same data directory. It fails with "Already started"
// when the instance is already running.
func (p *Prometheus) Start() error {
	if p.running {
		return errors.New("Already started")
	}
	// The binary is pointed at db.Dir(), so release the in-process DB first.
	closeErr := p.db.Close()
	if closeErr != nil {
		return closeErr
	}
	return p.start()
}
// start launches the Prometheus binary for p.version on a free local port, serving
// the TSDB directory and prometheus.yml prepared by newPrometheus, and marks the
// instance as running.
//
// The process is started synchronously, for two reasons:
//   - exec failures (e.g. a missing binary) are returned to the caller;
//   - p.cmd.Process is set before start returns, so later readers don't race with it.
//
// A background goroutine waits for the process and prints its combined
// stdout/stderr to os.Stderr if it exits with an error. start then sleeps a fixed
// two seconds; use WaitPrometheusUp to wait for readiness.
func (p *Prometheus) start() error {
	port, err := FreePort()
	if err != nil {
		return err
	}

	var extra []string
	if p.disabledCompaction {
		// Equal min and max block durations: this is how compaction is disabled here.
		extra = append(extra,
			"--storage.tsdb.min-block-duration=2h",
			"--storage.tsdb.max-block-duration=2h",
		)
	}

	p.addr = fmt.Sprintf("localhost:%d", port)
	args := append([]string{
		"--storage.tsdb.retention=2d", // Pass retention cause prometheus since 2.8.0 don't show default value for that flags in web/api: https://github.com/prometheus/prometheus/pull/5433.
		"--storage.tsdb.path=" + p.db.Dir(),
		"--web.listen-address=" + p.addr,
		"--web.route-prefix=" + p.prefix,
		"--web.enable-admin-api",
		"--config.file=" + filepath.Join(p.db.Dir(), "prometheus.yml"),
	}, extra...)

	cmd := exec.Command(prometheusBin(p.version), args...)
	cmd.SysProcAttr = SysProcAttr()
	// Stdout and Stderr share one comparable writer, so os/exec copies both through
	// a single goroutine and the builder is never written concurrently.
	out := &strings.Builder{}
	cmd.Stdout = out
	cmd.Stderr = out
	if err := cmd.Start(); err != nil {
		return errors.Wrapf(err, "start Prometheus binary %q", prometheusBin(p.version))
	}
	p.cmd = cmd
	p.running = true

	// The goroutine uses the local cmd, not p.cmd, because Restart reassigns p.cmd.
	go func() {
		// Wait returns only after output copying is done, so reading out is safe here.
		if err := cmd.Wait(); err != nil {
			fmt.Fprintln(os.Stderr, "running Prometheus failed", err)
			fmt.Fprintln(os.Stderr, out.String())
		}
	}()

	time.Sleep(2 * time.Second)
	return nil
}
// WaitPrometheusUp polls the instance's /-/ready endpoint, retrying every second
// through runutil.Retry, until it answers 200 or ctx is done. It returns an error
// if Start was not called.
func (p *Prometheus) WaitPrometheusUp(ctx context.Context) error {
	if !p.running {
		return errors.New("method Start was not invoked.")
	}
	return runutil.Retry(time.Second, ctx.Done(), func() error {
		req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/-/ready", p.addr), nil)
		if err != nil {
			return err
		}
		// Bind the probe to ctx so a hung connection cannot outlive the caller's deadline.
		r, err := http.DefaultClient.Do(req.WithContext(ctx))
		if err != nil {
			return err
		}
		// Only the status code matters. Closing the body releases the connection instead
		// of leaking one per retry; a close error has no bearing on readiness.
		_ = r.Body.Close()
		if r.StatusCode != http.StatusOK {
			return errors.Errorf("Got non 200 response: %v", r.StatusCode)
		}
		return nil
	})
}
// Restart sends SIGTERM to the running Prometheus process, waits up to 30 seconds
// for it to exit, and then launches a new process on the same data directory
// (on a newly chosen free port). It returns an error if the instance is not
// running or the old process does not exit in time.
func (p *Prometheus) Restart() error {
	if !p.running || p.cmd == nil || p.cmd.Process == nil {
		return errors.New("Prometheus is not running")
	}
	if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return errors.Wrap(err, "failed to kill Prometheus. Kill it manually")
	}
	// The goroutine launched by start already calls Wait on this command, so a second
	// Wait here would return immediately instead of blocking. Poll with signal 0 until
	// the process has been reaped; otherwise the new instance could race the old one
	// for the data directory.
	const exitTimeout = 30 * time.Second
	deadline := time.Now().Add(exitTimeout)
	for p.cmd.Process.Signal(syscall.Signal(0)) == nil {
		if time.Now().After(deadline) {
			return errors.Errorf("Prometheus (pid %d) did not exit within %v of SIGTERM", p.cmd.Process.Pid, exitTimeout)
		}
		time.Sleep(100 * time.Millisecond)
	}
	return p.start()
}
// Dir returns the TSDB data directory, which also holds prometheus.yml.
func (p *Prometheus) Dir() string { return p.dir }

// Addr returns the listen address (host:port) followed by the route prefix.
// The address is only assigned once the instance has been started.
func (p *Prometheus) Addr() string {
	hostPort, prefix := p.addr, p.prefix
	return hostPort + prefix
}

// DisableCompaction makes subsequent launches (Start/Restart) pass equal minimum
// and maximum block durations of 2h to Prometheus. It has no effect on an already
// running process.
func (p *Prometheus) DisableCompaction() {
	p.disabledCompaction = true
}
// SetConfig replaces the contents of prometheus.yml in the instance's directory
// with s. The file starts out empty and is passed to the binary via --config.file
// when it is launched. A failure to close the file is reported through err.
func (p *Prometheus) SetConfig(s string) (err error) {
	cfgPath := filepath.Join(p.dir, "prometheus.yml")
	f, err := os.Create(cfgPath)
	if err != nil {
		return err
	}
	defer runutil.CloseWithErrCapture(&err, f, "prometheus config")
	_, err = f.WriteString(s)
	return err
}
// Stop terminates Prometheus with SIGTERM, waits up to 10 seconds for the process
// to exit, and then removes its data directory. It is a no-op if the instance is
// not running. If the signal cannot be delivered, the directory is left in place
// and an error naming it is returned.
func (p *Prometheus) Stop() error {
	if !p.running {
		return nil
	}
	if p.cmd == nil || p.cmd.Process == nil {
		// Marked running but no process was launched; only the directory needs cleaning.
		return p.cleanup()
	}
	if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return errors.Wrapf(err, "failed to stop Prometheus. Kill it manually and clean %s dir", p.db.Dir())
	}
	// Wait for the process to be reaped (its Wait runs in the goroutine started by
	// start) rather than sleeping a fixed amount. This avoids removing the directory
	// while Prometheus is still writing to it. After the deadline, clean up anyway.
	deadline := time.Now().Add(10 * time.Second)
	for time.Now().Before(deadline) && p.cmd.Process.Signal(syscall.Signal(0)) == nil {
		time.Sleep(100 * time.Millisecond)
	}
	return p.cleanup()
}
// cleanup marks the instance as not running and deletes its TSDB directory,
// including the prometheus.yml created inside it.
func (p *Prometheus) cleanup() error {
	dataDir := p.db.Dir()
	p.running = false
	return os.RemoveAll(dataDir)
}
// Appender returns a new appender on the in-process TSDB so tests can write
// samples before the instance is started. Every appender must be committed or
// rolled back before Start (which closes the DB). Calling Appender once the
// instance is running panics.
func (p *Prometheus) Appender() tsdb.Appender {
	if !p.running {
		return p.db.Appender()
	}
	panic("Appender must not be called after start")
}
// CreateBlock writes a new block into dir and returns its ULID.
//
// The block holds every series in series with numSamples random samples each,
// evenly spaced in the time range [mint, maxt). Thanos metadata is injected with
// extLset as the external labels and the given downsampling resolution. The
// block's tombstones file is removed.
func CreateBlock(
	ctx context.Context,
	dir string,
	series []labels.Labels,
	numSamples int,
	mint, maxt int64,
	extLset labels.Labels,
	resolution int64,
) (id ulid.ULID, err error) {
	const keepTombstones = false
	return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, keepTombstones)
}
// CreateBlockWithTombstone behaves like CreateBlock but keeps the block's
// tombstones file, so the result resembles a block written by Prometheus itself.
func CreateBlockWithTombstone(
	ctx context.Context,
	dir string,
	series []labels.Labels,
	numSamples int,
	mint, maxt int64,
	extLset labels.Labels,
	resolution int64,
) (id ulid.ULID, err error) {
	const keepTombstones = true
	return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, keepTombstones)
}
// createBlock builds an in-memory TSDB head, writes it out as one block, and
// returns the block's ULID. Steps:
//  1. Fill the head with numSamples random samples per series. Timestamps start at
//     mint and are spaced by (maxt-mint)/(numSamples+1), so they stay inside [mint, maxt).
//  2. Write the head as a single block in dir covering [mint, maxt).
//  3. Inject Thanos metadata: external labels extLset, the downsampling
//     resolution, and metadata.TestSource.
//  4. Delete the block's tombstones file unless tombstones is true.
//
// A failure to close the head is reported through err.
func createBlock(
	ctx context.Context,
	dir string,
	series []labels.Labels,
	numSamples int,
	mint, maxt int64,
	extLset labels.Labels,
	resolution int64,
	tombstones bool,
) (id ulid.ULID, err error) {
	h, err := tsdb.NewHead(nil, nil, nil, 10000000000)
	if err != nil {
		return id, errors.Wrap(err, "create head block")
	}
	defer runutil.CloseWithErrCapture(&err, h, "TSDB Head")

	var g errgroup.Group
	var timeStepSize = (maxt - mint) / int64(numSamples+1)
	// Append in parallel, in batches of about len(series)/GOMAXPROCS series. Once
	// fewer than 1000 series remain, they all go into one final batch. batchSize is
	// clamped to at least 1 (it is 0 when len(series) < GOMAXPROCS) so the loop
	// always makes progress.
	var batchSize = len(series) / runtime.GOMAXPROCS(0)
	if batchSize < 1 {
		batchSize = 1
	}
	for len(series) > 0 {
		l := batchSize
		// Also cap l at what is left, so series[:l] can never go out of range.
		if len(series) < 1000 || l > len(series) {
			l = len(series)
		}
		batch := series[:l]
		series = series[l:]
		g.Go(func() error {
			t := mint
			for i := 0; i < numSamples; i++ {
				app := h.Appender()
				for _, lset := range batch {
					if _, addErr := app.Add(lset, t, rand.Float64()); addErr != nil {
						if rerr := app.Rollback(); rerr != nil {
							addErr = errors.Wrapf(addErr, "rollback failed: %v", rerr)
						}
						return errors.Wrap(addErr, "add sample")
					}
				}
				if commitErr := app.Commit(); commitErr != nil {
					return errors.Wrap(commitErr, "commit")
				}
				t += timeStepSize
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return id, err
	}

	// A single compaction range spanning the whole interval yields exactly one block.
	c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil)
	if err != nil {
		return id, errors.Wrap(err, "create compactor")
	}
	id, err = c.Write(dir, h, mint, maxt, nil)
	if err != nil {
		return id, errors.Wrap(err, "write block")
	}
	if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
		Labels:     extLset.Map(),
		Downsample: metadata.ThanosDownsample{Resolution: resolution},
		Source:     metadata.TestSource,
	}, nil); err != nil {
		return id, errors.Wrap(err, "finalize block")
	}

	if !tombstones {
		if err = os.Remove(filepath.Join(dir, id.String(), "tombstones")); err != nil {
			return id, errors.Wrap(err, "remove tombstones")
		}
	}
	return id, nil
}