Skip to content

Commit

Permalink
Added file sd to rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Valkov committed Oct 4, 2018
1 parent f295159 commit 575c9b3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
64 changes: 62 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/improbable-eng/thanos/pkg/discovery"
)

// registerRule registers a rule command.
Expand Down Expand Up @@ -76,6 +77,9 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
bucketConfFile := cmd.Flag("objstore.config-file", "The object store configuration file path.").
PlaceHolder("<bucket.config.path>").String()

filesToWatch := cmd.Flag("filesd", "Path to file that contain addresses of query peers (repeatable).").
PlaceHolder("<path>").Strings()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand All @@ -97,6 +101,16 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
NoLockfile: true,
WALFlushInterval: 30 * time.Second,
}

var filesd *discovery.FileDiscoverer
if len(*filesToWatch) > 0 {
conf := &discovery.SDConfig{
Files: *filesToWatch,
RefreshInterval: 5 * time.Second,
}
filesd = discovery.NewFileDiscoverer(conf, logger)
}

return runRule(g,
logger,
reg,
Expand All @@ -116,6 +130,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
tsdbOpts,
name,
alertQueryURL,
filesd,
)
}
}
Expand All @@ -142,6 +157,7 @@ func runRule(
tsdbOpts *tsdb.Options,
component string,
alertQueryURL *url.URL,
fileSD *discovery.FileDiscoverer,
) error {
configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_config_last_reload_successful",
Expand Down Expand Up @@ -169,6 +185,9 @@ func runRule(
})
}

// FileSD query addresses
addrFromFileSD := newFileSDAddrs()

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
Expand All @@ -181,8 +200,18 @@ func runRule(
return strings.Compare(ids[i], ids[j]) < 0
})

for _, i := range rand.Perm(len(ids)) {
vec, err := queryPrometheusInstant(ctx, logger, peers[ids[i]].QueryAPIAddr, q, t)
var addrs []string
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}
addrFromFileSD.mtx.Lock()
for _, fsdAddrs := range addrFromFileSD.addrs {
addrs = append(addrs, fsdAddrs...)
}
addrFromFileSD.mtx.Unlock()

for _, i := range rand.Perm(len(addrs)) {
vec, err := queryPrometheusInstant(ctx, logger, peers[addrs[i]].QueryAPIAddr, q, t)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,6 +331,37 @@ func runRule(
cancel()
})
}
// Run File Service Discovery and update the query addresses when the files are modified
{
if fileSD != nil {
var fileSDUpdates chan *discovery.Discoverable
ctx, cancel := context.WithCancel(context.Background())

fileSDUpdates = make(chan *discovery.Discoverable)

g.Add(func() error {
fileSD.Run(ctx, fileSDUpdates)
return nil
}, func(error) {
cancel()
})

g.Add(func() error {
for {
select {
case update := <-fileSDUpdates:
// TODO(ivan): resolve dns here maybe?
addrFromFileSD.update(update.Source, update.Services)
case <-ctx.Done():
return nil
}
}
}, func(error) {
cancel()
close(fileSDUpdates)
})
}
}

// Handle reload and termination interrupts.
reload := make(chan struct{}, 1)
Expand Down
2 changes: 2 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,7 @@ Flags:
in all alerts 'Source' field
--objstore.config-file=<bucket.config.path>
The object store configuration file path.
--filesd=<path> ... Path to file that contain addresses of query
peers (repeatable).

```

0 comments on commit 575c9b3

Please sign in to comment.