forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reloader.go
338 lines (303 loc) · 8.87 KB
/
reloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
// Package reloader contains helpers to trigger reloads of Prometheus instances
// on configuration changes and to substitute environment variables in config files.
//
// Reloader type is useful when you want to:
//
// * Watch for changes to a certain file, e.g. (`cfgFile`).
// * Optionally, specify a different output file for the watched `cfgFile` (`cfgOutputFile`).
// This will also try to decompress the `cfgFile` if needed and substitute ALL the envvars using the Kubernetes substitution format: (`$(var)`).
// * Watch for changes to certain directories (`ruleDirs`).
//
// Once either of those changes, Prometheus at the given `reloadURL` will be notified, causing Prometheus to reload its configuration and rules.
//
// Example of creating a reloader:
//
// u, _ := url.Parse("http://localhost:9090")
// rl := reloader.New(
// nil,
// reloader.ReloadURLFromBase(u),
// "/path/to/cfg",
// "/path/to/cfg.out",
// []string{"/path/to/dirs"},
// )
//
// The url of reloads can be generated with function ReloadURLFromBase().
// It will append the default path of reload into the given url:
//
// u, _ := url.Parse("http://localhost:9090")
// reloader.ReloadURLFromBase(u) // It will return "http://localhost:9090/-/reload"
//
// Start watching for changes; watching stops once the context gets canceled:
//
// ctx, cancel := context.WithCancel(context.Background())
// go func() {
// if err := rl.Watch(ctx); err != nil {
// log.Fatal(err)
// }
// }()
// // ...
// cancel()
//
// By default, the reloader also periodically compares the hash sums of the given config file and rule dirs with the last result,
// even if no change event was received.
//
// A basic example of configuration template with environment variables:
//
// global:
// external_labels:
// replica: '$(HOSTNAME)'
package reloader
import (
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"hash"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/pkg/errors"
)
// Reloader watches a config file and rule directories and triggers reloads of
// a Prometheus server when their content changes.
// It optionally substitutes environment variables in the configuration.
// Referenced environment variables must be of the form `$(var)` (not `$var` or `${var}`).
type Reloader struct {
	logger log.Logger

	// reloadURL is the endpoint that receives the reload POST request.
	reloadURL *url.URL

	// cfgFile is the watched config file; when cfgOutputFile is non-empty the
	// decompressed and env-expanded config is written there.
	cfgFile       string
	cfgOutputFile string
	// ruleDirs are the directories whose files are hashed to detect rule changes.
	ruleDirs []string

	// ruleInterval is the period of the re-check ticker and the time budget
	// for retrying a reload; retryInterval is the pause between retries.
	ruleInterval  time.Duration
	retryInterval time.Duration

	// Hashes recorded at the last successful reload.
	lastCfgHash  []byte
	lastRuleHash []byte
}
// firstGzipBytes is the byte prefix used to detect a gzipped config file:
// the gzip magic number (0x1f, 0x8b) followed by the deflate method byte (0x08).
var firstGzipBytes = []byte{0x1f, 0x8b, 0x08}
// New returns a Reloader that watches the given config file and rule
// directories and triggers a Prometheus reload at reloadURL upon changes.
//
// A nil logger is replaced by a no-op logger.
// If cfgOutputFile is not empty, the config file is decompressed if needed,
// environment variables are substituted and the result is written to that
// path. Prometheus should then use cfgOutputFile as its config file path.
//
// Rules are re-checked every 3 minutes and a failed reload is retried every
// 5 seconds.
func New(logger log.Logger, reloadURL *url.URL, cfgFile string, cfgOutputFile string, ruleDirs []string) *Reloader {
	rl := &Reloader{
		logger:        logger,
		reloadURL:     reloadURL,
		cfgFile:       cfgFile,
		cfgOutputFile: cfgOutputFile,
		ruleDirs:      ruleDirs,
		ruleInterval:  3 * time.Minute,
		retryInterval: 5 * time.Second,
	}
	if rl.logger == nil {
		rl.logger = log.NewNopLogger()
	}
	return rl
}
// Watch watches the config file and the rules and processes them until the
// context gets canceled, in which case it returns nil.
//
// When a config file is set it is applied once up front. Afterwards apply runs
// on every ticker tick (ruleInterval) and on every file system event for the
// config file. The config file gets env expanded if cfgOutputFile is specified
// and a reload is triggered if the config or the rules changed.
// Errors reported by the watcher are only logged; an error from apply is
// treated as critical and returned.
func (r *Reloader) Watch(ctx context.Context) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrap(err, "create watcher")
	}
	defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close")

	if r.cfgFile != "" {
		if err := watcher.Add(r.cfgFile); err != nil {
			return errors.Wrap(err, "add config file watch")
		}
		level.Info(r.logger).Log(
			"msg", "started watching config file for changes",
			"in", r.cfgFile,
			"out", r.cfgOutputFile)

		if err := r.apply(ctx); err != nil {
			return err
		}
	}

	ticker := time.NewTicker(r.ruleInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			// Periodic re-check; fall through to apply.
		case ev := <-watcher.Events:
			// Ignore events that are not about the watched config file.
			if ev.Name != r.cfgFile {
				continue
			}
		case watchErr := <-watcher.Errors:
			level.Error(r.logger).Log("msg", "watch error", "err", watchErr)
			continue
		}

		// Any error from apply is critical and ends the watch loop.
		if err := r.apply(ctx); err != nil {
			return err
		}
	}
}
// apply triggers a Prometheus reload if the rules or the config changed.
//
// If cfgFile is set it is hashed; if cfgOutputFile is set as well, the config
// is read, decompressed when it is gzipped, env expanded and written to
// cfgOutputFile before reloading. All files below ruleDirs are hashed too.
// When both hashes equal the ones recorded at the last successful reload,
// nothing else happens.
//
// The reload is retried every retryInterval for at most ruleInterval. A reload
// that still fails after that is only logged and apply returns nil, so the
// next tick or file event tries again. Errors while hashing, reading, expanding
// or writing files are returned.
func (r *Reloader) apply(ctx context.Context) error {
	var (
		cfgHash  []byte
		ruleHash []byte
	)
	if r.cfgFile != "" {
		h := sha256.New()
		if err := hashFile(h, r.cfgFile); err != nil {
			return errors.Wrap(err, "hash file")
		}
		cfgHash = h.Sum(nil)

		if r.cfgOutputFile != "" {
			b, err := ioutil.ReadFile(r.cfgFile)
			if err != nil {
				return errors.Wrap(err, "read file")
			}

			// Detect and extract a gzipped file. bytes.HasPrefix is safe for
			// content shorter than the magic header (e.g. an empty file),
			// whereas slicing b[0:3] is bounded by capacity, not length.
			if bytes.HasPrefix(b, firstGzipBytes) {
				zr, err := gzip.NewReader(bytes.NewReader(b))
				if err != nil {
					return errors.Wrap(err, "create gzip reader")
				}
				defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")

				b, err = ioutil.ReadAll(zr)
				if err != nil {
					return errors.Wrap(err, "read compressed config file")
				}
			}

			b, err = expandEnv(b)
			if err != nil {
				return errors.Wrap(err, "expand environment variables")
			}

			if err := ioutil.WriteFile(r.cfgOutputFile, b, 0666); err != nil {
				return errors.Wrap(err, "write file")
			}
		}
	}

	// A single hash accumulates the names and contents of all rule files.
	h := sha256.New()
	for _, ruleDir := range r.ruleDirs {
		walkDir, err := filepath.EvalSymlinks(ruleDir)
		if err != nil {
			return errors.Wrap(err, "ruleDir symlink eval")
		}
		err = filepath.Walk(walkDir, func(p string, _ os.FileInfo, err error) error {
			if err != nil {
				return err
			}

			// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
			// follow symlinks. Make sure to follow a symlink before checking
			// if it is a directory.
			target, err := os.Stat(p)
			if err != nil {
				return err
			}
			if target.IsDir() {
				return nil
			}
			return hashFile(h, p)
		})
		if err != nil {
			return errors.Wrap(err, "build hash")
		}
	}
	if len(r.ruleDirs) > 0 {
		ruleHash = h.Sum(nil)
	}

	if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastRuleHash, ruleHash) {
		// Nothing to do.
		return nil
	}

	// Retry trigger reload until it succeeded or next tick is near.
	retryCtx, cancel := context.WithTimeout(ctx, r.ruleInterval)
	defer cancel()

	err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error {
		// Use retryCtx so that a hanging request cannot outlive the retry window.
		if err := r.triggerReload(retryCtx); err != nil {
			return errors.Wrap(err, "trigger reload")
		}

		// Only remember the hashes once Prometheus accepted the reload.
		r.lastCfgHash = cfgHash
		r.lastRuleHash = ruleHash
		level.Info(r.logger).Log(
			"msg", "Prometheus reload triggered",
			"cfg_in", r.cfgFile,
			"cfg_out", r.cfgOutputFile,
			"rule_dirs", strings.Join(r.ruleDirs, ", "))
		return nil
	})
	if err != nil {
		// Not critical: the hashes were not updated, so the next apply retries.
		level.Error(r.logger).Log("msg", "Failed to trigger reload. Retrying.", "err", err)
	}
	return nil
}
// hashFile writes the name and the content of the file fn into h.
//
// The file name is framed by 0xff bytes before the content is streamed in, so
// that a rename of a file changes the resulting hash as well.
// It returns any error from opening or reading the file or from writing to h.
func hashFile(h hash.Hash, fn string) error {
	f, err := os.Open(fn)
	if err != nil {
		return err
	}
	// The file is only read, so a failing Close cannot lose data; without
	// closing, every call would leak a file descriptor.
	defer f.Close()

	if _, err := h.Write([]byte{'\xff'}); err != nil {
		return err
	}
	if _, err := h.Write([]byte(fn)); err != nil {
		return err
	}
	if _, err := h.Write([]byte{'\xff'}); err != nil {
		return err
	}

	_, err = io.Copy(h, f)
	return err
}
// triggerReload sends a POST request to reloadURL, bound to ctx, using the
// default HTTP client. It returns an error if the request cannot be created or
// sent, or if the response status is anything other than 200 OK.
func (r *Reloader) triggerReload(ctx context.Context) error {
	req, err := http.NewRequest(http.MethodPost, r.reloadURL.String(), nil)
	if err != nil {
		return errors.Wrap(err, "create request")
	}
	req = req.WithContext(ctx)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return errors.Wrap(err, "reload request failed")
	}
	defer func() {
		// Read the body to completion before closing it so the transport can
		// reuse the keep-alive connection for the next reload. A drain error
		// is irrelevant for the outcome and deliberately ignored.
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		runutil.CloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")
	}()

	if resp.StatusCode != http.StatusOK {
		return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)
	}
	return nil
}
// ReloadURLFromBase derives the standard Prometheus reload URL from the base
// URL u by joining "/-/reload" onto its path. The result is a copy of u with
// only the path changed; u itself is not modified.
func ReloadURLFromBase(u *url.URL) *url.URL {
	reloadURL := new(url.URL)
	*reloadURL = *u
	reloadURL.Path = path.Join(u.Path, "/-/reload")
	return reloadURL
}
// envRe matches environment variable references of the form `$(var)`, where
// var consists of one or more ASCII letters, digits or underscores.
var envRe = regexp.MustCompile(`\$\(([a-zA-Z_0-9]+)\)`)
// expandEnv replaces every `$(var)` reference in b with the value of the
// environment variable var and returns the expanded content.
// It returns an error naming the first referenced variable that is not set.
func expandEnv(b []byte) (r []byte, err error) {
	r = envRe.ReplaceAllFunc(b, func(ref []byte) []byte {
		// Once a lookup failed, the remaining references are not resolved
		// anymore; the error of the first failure is what gets reported.
		if err != nil {
			return nil
		}

		// Strip the leading "$(" and the trailing ")".
		name := ref[2 : len(ref)-1]

		val, found := os.LookupEnv(string(name))
		if found {
			return []byte(val)
		}
		err = errors.Errorf("found reference to unset environment variable %q", name)
		return nil
	})
	return r, err
}