forked from thanos-io/thanos
/
prometheus.go
217 lines (186 loc) · 5.11 KB
/
prometheus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package testutil
import (
	"bytes"
	"fmt"
	"io/ioutil"
	"math"
	"math/rand"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"syscall"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/improbable-eng/thanos/pkg/runutil"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
	"golang.org/x/sync/errgroup"
)
// Prometheus represents a test instance for integration testing.
// It can be populated with data (via Appender) before being started.
type Prometheus struct {
// dir is the data directory; it is taken from db.Dir() at construction
// and also holds the prometheus.yml config file.
dir string
// db is the TSDB used to pre-populate data; Start closes it before
// launching the Prometheus process.
db *tsdb.DB
// running is set by Start; Appender panics once it is true.
running bool
// cmd is the prometheus command built by Start; it is nil until then.
cmd *exec.Cmd
// addr is the listen address; it is a placeholder string until Start
// assigns "localhost:<port>".
addr string
}
// NewTSDB opens a TSDB in a newly created temporary directory (prefix
// "prometheus-test"). The database is configured with a single block range of
// 2 * 3600 * 1000 (two hours, assuming the unit is milliseconds) and the
// largest possible retention duration.
//
// If the database cannot be opened, the temporary directory is removed again
// so failed calls do not leave stray directories behind.
func NewTSDB() (*tsdb.DB, error) {
	dir, err := ioutil.TempDir("", "prometheus-test")
	if err != nil {
		return nil, err
	}

	db, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
		BlockRanges:       []int64{2 * 3600 * 1000},
		RetentionDuration: math.MaxInt64,
	})
	if err != nil {
		// Best-effort cleanup: the open error is the one the caller needs,
		// so a failure to remove the directory is deliberately ignored.
		_ = os.RemoveAll(dir)
		return nil, err
	}
	return db, nil
}
// NewPrometheus creates a new test Prometheus instance backed by a fresh TSDB
// (see NewTSDB) and an empty prometheus.yml placed in the TSDB directory.
// The instance is not running yet: its address is the placeholder
// "<prometheus-not-started>" until Start assigns a real one.
func NewPrometheus() (*Prometheus, error) {
	db, err := NewTSDB()
	if err != nil {
		return nil, err
	}

	// Just touch an empty config file. We don't need to actually scrape anything.
	f, err := os.Create(filepath.Join(db.Dir(), "prometheus.yml"))
	if err != nil {
		return nil, err
	}
	// Close the handle immediately; only the file's existence matters and
	// leaving it open would leak a descriptor per instance.
	if err := f.Close(); err != nil {
		return nil, err
	}

	return &Prometheus{
		dir:  db.Dir(),
		db:   db,
		addr: "<prometheus-not-started>",
	}, nil
}
// Start closes the population TSDB, picks a free port and launches the
// "prometheus" binary (resolved via PATH by os/exec) against the instance's
// data directory and config file. It then sleeps for two seconds to give the
// process time to come up before returning.
//
// The instance is marked as running before anything else, so Appender must
// not be used once Start has been called, even if Start fails.
//
// An error is returned if the TSDB cannot be closed, no free port is
// available, or the process cannot be launched. If the process later exits
// with an error, that error and the combined stdout/stderr are printed to
// os.Stderr.
func (p *Prometheus) Start() error {
	p.running = true
	if err := p.db.Close(); err != nil {
		return err
	}

	port, err := FreePort()
	if err != nil {
		return err
	}
	p.addr = fmt.Sprintf("localhost:%d", port)

	cmd := exec.Command(
		"prometheus",
		"--storage.tsdb.path="+p.db.Dir(),
		"--web.listen-address="+p.addr,
		"--config.file="+filepath.Join(p.db.Dir(), "prometheus.yml"),
	)
	// Collect stdout and stderr together so they can be reported on failure.
	var out bytes.Buffer
	cmd.Stdout = &out
	cmd.Stderr = &out
	p.cmd = cmd

	// Launch synchronously: a launch failure (e.g. binary not found) is
	// reported to the caller, and cmd.Process is guaranteed to be set before
	// Stop can be called.
	if err := cmd.Start(); err != nil {
		return errors.Wrap(err, "start Prometheus")
	}

	go func() {
		if err := cmd.Wait(); err != nil {
			fmt.Fprintln(os.Stderr, "running Prometheus failed", err)
			fmt.Fprintln(os.Stderr, out.String())
		}
	}()
	time.Sleep(2 * time.Second)

	return nil
}
// Addr returns the address stored on the instance. It is only meaningful
// after Start, which sets it to "localhost:<port>"; a freshly constructed
// instance carries the placeholder "<prometheus-not-started>".
func (p *Prometheus) Addr() string {
return p.addr
}
// SetConfig replaces the contents of the instance's prometheus.yml with s.
// The file is created (or truncated) in the instance directory; an error from
// closing the file is folded into the returned error by the deferred
// runutil.CloseWithErrCapture call.
func (p *Prometheus) SetConfig(s string) (err error) {
	cfgPath := filepath.Join(p.dir, "prometheus.yml")

	f, err := os.Create(cfgPath)
	if err != nil {
		return err
	}
	defer runutil.CloseWithErrCapture(nil, &err, f, "prometheus config")

	if _, err = f.WriteString(s); err != nil {
		return err
	}
	return nil
}
// Stop terminates Prometheus and cleans up its data directory.
//
// It sends SIGTERM to the process, waits half a second for it to exit and
// then removes the data directory. If the signal cannot be delivered, the
// directory is left in place and the returned error names it so it can be
// cleaned manually.
//
// If no process was ever launched (Start was not called or did not get as far
// as launching), there is nothing to signal and only the directory is removed.
func (p *Prometheus) Stop() error {
	// Guard against a nil command/process instead of panicking.
	if p.cmd == nil || p.cmd.Process == nil {
		return p.cleanup()
	}

	if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return errors.Wrapf(err, "failed to terminate Prometheus. Kill it manually and clean %s dir", p.db.Dir())
	}
	// Give the process a moment to shut down before deleting its files.
	time.Sleep(time.Second / 2)

	return p.cleanup()
}
// cleanup removes the TSDB directory and everything beneath it.
func (p *Prometheus) cleanup() error {
	dataDir := p.db.Dir()
	return os.RemoveAll(dataDir)
}
// Appender hands out a fresh appender on the underlying TSDB so the instance
// can be filled with data. Every appender has to be closed before Start is
// called, and none may be requested afterwards: doing so panics.
func (p *Prometheus) Appender() tsdb.Appender {
	if !p.running {
		return p.db.Appender()
	}
	panic("Appender must not be called after start")
}
// CreateBlock writes a block into dir containing the given series, each with
// numSamples random-valued samples, and returns the new block's ULID.
// Sample timestamps start at mint and advance by (maxt-mint)/(numSamples+1),
// so they fall in the range [mint, maxt).
//
// The series are split into batches that are appended concurrently to an
// in-memory head, which is then written out by a leveled compactor. Afterwards
// the Thanos metadata (extLset as labels, the downsampling resolution and the
// test source marker) is injected and the block's "tombstones" file is removed.
func CreateBlock(
	dir string,
	series []labels.Labels,
	numSamples int,
	mint, maxt int64,
	extLset labels.Labels,
	resolution int64,
) (id ulid.ULID, err error) {
	head, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000)
	if err != nil {
		return id, errors.Wrap(err, "create head block")
	}
	defer runutil.CloseWithErrCapture(log.NewNopLogger(), &err, head, "TSDB Head")

	step := (maxt - mint) / int64(numSamples+1)
	perBatch := len(series) / runtime.GOMAXPROCS(0)

	// appendBatch writes numSamples samples for every series in batch, using
	// one appender (and one commit) per timestamp.
	appendBatch := func(batch []labels.Labels) error {
		for i, ts := 0, mint; i < numSamples; i, ts = i+1, ts+step {
			app := head.Appender()

			for _, lset := range batch {
				if _, addErr := app.Add(lset, ts, rand.Float64()); addErr != nil {
					if rerr := app.Rollback(); rerr != nil {
						addErr = errors.Wrapf(addErr, "rollback failed: %v", rerr)
					}
					return errors.Wrap(addErr, "add sample")
				}
			}
			if commitErr := app.Commit(); commitErr != nil {
				return errors.Wrap(commitErr, "commit")
			}
		}
		return nil
	}

	var workers errgroup.Group
	for len(series) > 0 {
		n := perBatch
		// Once fewer than 1000 series remain, take them all in one batch.
		if len(series) < 1000 {
			n = len(series)
		}
		batch := series[:n]
		series = series[n:]

		workers.Go(func() error {
			return appendBatch(batch)
		})
	}
	if err := workers.Wait(); err != nil {
		return id, err
	}

	compactor, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{maxt - mint}, nil)
	if err != nil {
		return id, errors.Wrap(err, "create compactor")
	}

	id, err = compactor.Write(dir, head, mint, maxt)
	if err != nil {
		return id, errors.Wrap(err, "write block")
	}

	blockDir := filepath.Join(dir, id.String())
	meta := block.ThanosMeta{
		Labels:     extLset.Map(),
		Downsample: block.ThanosDownsampleMeta{Resolution: resolution},
		Source:     block.TestSource,
	}
	if _, err = block.InjectThanosMeta(log.NewNopLogger(), blockDir, meta, nil); err != nil {
		return id, errors.Wrap(err, "finalize block")
	}

	if err = os.Remove(filepath.Join(blockDir, "tombstones")); err != nil {
		return id, errors.Wrap(err, "remove tombstones")
	}

	return id, nil
}