/
config.go
308 lines (267 loc) · 9.15 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package receive
import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"io"
"os"
"path/filepath"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"gopkg.in/fsnotify.v1"
)
var (
// An errParseConfigurationFile is returned by the ConfigWatcher when parsing failed.
errParseConfigurationFile = errors.New("configuration file is not parsable")
// An errEmptyConfigurationFile is returned by the ConfigWatcher when attempting to load an empty configuration file.
errEmptyConfigurationFile = errors.New("configuration file is empty")
)
type ReceiverMode string
const (
RouterOnly ReceiverMode = "RouterOnly"
IngestorOnly ReceiverMode = "IngestorOnly"
RouterIngestor ReceiverMode = "RouterIngestor"
)
// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
ExternalLabels []string `json:"external_labels"`
}
// ConfigWatcher is able to watch a file containing a hashring configuration
// for updates.
type ConfigWatcher struct {
ch chan []HashringConfig
path string
interval time.Duration
logger log.Logger
watcher *fsnotify.Watcher
hashGauge prometheus.Gauge
successGauge prometheus.Gauge
lastSuccessTimeGauge prometheus.Gauge
changesCounter prometheus.Counter
errorCounter prometheus.Counter
refreshCounter prometheus.Counter
hashringNodesGauge *prometheus.GaugeVec
hashringTenantsGauge *prometheus.GaugeVec
// lastLoadedConfigHash is the hash of the last successfully loaded configuration.
lastLoadedConfigHash float64
}
// NewConfigWatcher creates a new ConfigWatcher.
func NewConfigWatcher(logger log.Logger, reg prometheus.Registerer, path string, interval model.Duration) (*ConfigWatcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, errors.Wrap(err, "creating file watcher")
}
if err := watcher.Add(path); err != nil {
return nil, errors.Wrapf(err, "adding path %s to file watcher", path)
}
c := &ConfigWatcher{
ch: make(chan []HashringConfig),
path: path,
interval: time.Duration(interval),
logger: logger,
watcher: watcher,
hashGauge: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_hash",
Help: "Hash of the currently loaded hashring configuration file.",
}),
successGauge: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_successful",
Help: "Whether the last hashring configuration file reload attempt was successful.",
}),
lastSuccessTimeGauge: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful hashring configuration file reload.",
}),
changesCounter: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_hashrings_file_changes_total",
Help: "The number of times the hashrings configuration file has changed.",
}),
errorCounter: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_hashrings_file_errors_total",
Help: "The number of errors watching the hashrings configuration file.",
}),
refreshCounter: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_hashrings_file_refreshes_total",
Help: "The number of refreshes of the hashrings configuration file.",
}),
hashringNodesGauge: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Name: "thanos_receive_hashring_nodes",
Help: "The number of nodes per hashring.",
},
[]string{"name"}),
hashringTenantsGauge: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Name: "thanos_receive_hashring_tenants",
Help: "The number of tenants per hashring.",
},
[]string{"name"}),
}
return c, nil
}
// Run starts the ConfigWatcher until the given context is canceled.
func (cw *ConfigWatcher) Run(ctx context.Context) {
defer cw.Stop()
cw.refresh(ctx)
ticker := time.NewTicker(cw.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case event := <-cw.watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out.
if event.Name == "" {
break
}
// Everything but a CHMOD requires rereading.
// If the file was removed, we can't read it, so skip.
if event.Op^(fsnotify.Chmod|fsnotify.Remove) == 0 {
break
}
// Changes to a file can spawn various sequences of events with
// different combinations of operations. For all practical purposes
// this is inaccurate.
// The most reliable solution is to reload everything if anything happens.
cw.refresh(ctx)
case <-ticker.C:
// Setting a new watch after an update might fail. Make sure we don't lose
// those files forever.
cw.refresh(ctx)
case err := <-cw.watcher.Errors:
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "error watching file", "err", err)
}
}
}
}
// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
}
// ValidateConfig returns an error if the configuration that's being watched is not valid.
func (cw *ConfigWatcher) ValidateConfig() ([]HashringConfig, error) {
config, _, err := loadConfig(cw.logger, cw.path)
return config, err
}
// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)
done := make(chan struct{})
defer close(done)
// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
}
close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}
// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()
config, cfgHash, err := loadConfig(cw.logger, cw.path)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
return
}
// If there was no change to the configuration, return early.
if cw.lastLoadedConfigHash == cfgHash {
return
}
cw.changesCounter.Inc()
// Save the last known configuration.
cw.lastLoadedConfigHash = cfgHash
cw.hashGauge.Set(cfgHash)
cw.successGauge.Set(1)
cw.lastSuccessTimeGauge.SetToCurrentTime()
for _, c := range config {
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Endpoints)))
cw.hashringTenantsGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Tenants)))
}
level.Debug(cw.logger).Log("msg", "refreshed hashring config")
select {
case <-ctx.Done():
return
case cw.ch <- config:
return
}
}
// loadConfig loads raw configuration content and returns a configuration.
func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) {
cfgContent, err := readFile(logger, path)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}
config, err := parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", path)
}
return config, hashAsMetricValue(cfgContent), nil
}
// readFile reads the configuration file and returns content of configuration file.
func readFile(logger log.Logger, path string) ([]byte, error) {
fd, err := os.Open(filepath.Clean(path))
if err != nil {
return nil, err
}
defer func() {
if err := fd.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close file", "err", err, "path", path)
}
}()
return io.ReadAll(fd)
}
// parseConfig parses the raw configuration content and returns a HashringConfig.
func parseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
}
// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
var bytes = make([]byte, 8)
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}