pkg/rule: retain original path for rule files
This change ensures that the /api/v1/rules endpoint returns the original
path of the rule file instead of the path of the temporary file
generated by the Thanos ruler.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
simonpasquier committed Nov 25, 2019
1 parent bacbb78 commit 3d7fa48
Showing 8 changed files with 217 additions and 137 deletions.
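The gist of the change: the ruler rewrites every rule file into per-strategy copies under a temporary directory, so the underlying Prometheus rule managers only ever see generated paths. The new thanosrule.Manager records, for each generated file, the original file it came from, and the rules API resolves paths back through that map. A minimal, self-contained sketch of that bookkeeping, independent of the Thanos types (the names here are illustrative, not the real API):

package main

import (
	"fmt"
	"path/filepath"
)

// manager mirrors the bookkeeping introduced in pkg/rule: it remembers,
// for every generated rule file, the original file it came from.
type manager struct {
	workDir   string
	ruleFiles map[string]string // generated path -> original path
}

func (m *manager) update(files []string) {
	m.ruleFiles = make(map[string]string)
	for _, fn := range files {
		// The ruler writes one file per partial-response strategy,
		// e.g. <workDir>/<base>.WARN, and records where it came from.
		gen := filepath.Join(m.workDir, filepath.Base(fn)+".WARN")
		m.ruleFiles[gen] = fn
	}
}

// originalFile resolves a generated path back to the user-supplied one.
func (m *manager) originalFile(gen string) string {
	return m.ruleFiles[gen]
}

func main() {
	m := &manager{workDir: "/data/.tmp-rules"}
	m.update([]string{"/etc/thanos/rules.yaml"})
	fmt.Println(m.originalFile("/data/.tmp-rules/rules.yaml.WARN"))
	// Output: /etc/thanos/rules.yaml
}
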
21 changes: 10 additions & 11 deletions cmd/thanos/rule.go
@@ -291,7 +291,7 @@ func runRule(
var (
alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgrs = thanosrule.Managers{}
ruleMgr = thanosrule.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
@@ -338,15 +338,16 @@ func runRule(
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, dnsProvider, duplicatedQuery, ruleEvalWarnings, s)

ruleMgrs[s] = rules.NewManager(&opts)
mgr := rules.NewManager(&opts)
ruleMgr.SetRuleManager(s, mgr)
g.Add(func() error {
ruleMgrs[s].Run()
mgr.Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
ruleMgrs[s].Stop()
mgr.Stop()
})
}
}
@@ -447,7 +448,7 @@ func runRule(

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))

if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil {
if err := ruleMgr.Update(evalInterval, files); err != nil {
configSuccess.Set(0)
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
continue
@@ -457,10 +458,8 @@ func runRule(
configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

rulesLoaded.Reset()
for s, mgr := range ruleMgrs {
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}
for _, group := range ruleMgr.RuleGroups() {
rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}

}
@@ -547,9 +546,9 @@ func runRule(

ins := extpromhttp.NewInstrumentationMiddleware(reg)

ui.NewRuleUI(logger, reg, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)
ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, ruleMgrs)
api := v1.NewAPI(logger, reg, ruleMgr)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
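A condensed sketch of the new wiring above, with the Prometheus manager options trimmed to a logger (the real code also sets Context, QueryFunc, NotifyFunc, and run-group lifecycle handling; package paths assumed to match the Thanos codebase at the time): one thanosrule.Manager owns a rules.Manager per partial-response strategy.

package main

import (
	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/rules"
	thanosrule "github.com/thanos-io/thanos/pkg/rule"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	logger := log.NewNopLogger()
	ruleMgr := thanosrule.NewManager("/data")
	for _, s := range []storepb.PartialResponseStrategy{
		storepb.PartialResponseStrategy_WARN,
		storepb.PartialResponseStrategy_ABORT,
	} {
		// One Prometheus rule manager per strategy, registered with the
		// Thanos manager so it can route groups and track file origins.
		opts := rules.ManagerOptions{Logger: logger}
		mgr := rules.NewManager(&opts)
		ruleMgr.SetRuleManager(s, mgr)
	}
	// On each reload, ruleMgr.Update(evalInterval, files) splits the files
	// by strategy and hands the generated copies to the matching manager.
}
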
2 changes: 1 addition & 1 deletion pkg/rule/api/v1.go
@@ -69,7 +69,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) {
for _, grp := range api.ruleRetriever.RuleGroups() {
apiRuleGroup := &RuleGroup{
Name: grp.Name(),
File: grp.File(),
File: grp.OriginalFile(),
Interval: grp.Interval().Seconds(),
Rules: []rule{},
PartialResponseStrategy: grp.PartialResponseStrategy.String(),
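The effect on the endpoint: where a group loaded from, say, /etc/thanos/rules.yaml previously reported the generated path (e.g. /data/.tmp-rules/rules.yaml.WARN) in its file field, it now reports the user-supplied one. An illustrative response fragment (hypothetical paths and values, other fields elided):

{
  "status": "success",
  "data": {
    "groups": [
      {
        "name": "grp",
        "file": "/etc/thanos/rules.yaml",
        "interval": 60,
        "rules": [ ... ]
      }
    ]
  }
}
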
14 changes: 10 additions & 4 deletions pkg/rule/api/v1_test.go
@@ -23,6 +23,7 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
qapi "github.com/thanos-io/thanos/pkg/query/api"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

// NewStorage returns a new storage for testing purposes
@@ -92,8 +93,12 @@ func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group {
recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{})
r = append(r, recordingRule)

group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts)
return []thanosrule.Group{thanosrule.Group{Group: group}}
return []thanosrule.Group{
thanosrule.Group{
Group: rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts),
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
},
}
}

func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule {
@@ -189,7 +194,7 @@ func testEndpoints(t *testing.T, api *API) {
RuleGroups: []*RuleGroup{
{
Name: "grp",
File: "/path/to/file",
File: "",
Interval: 1,
PartialResponseStrategy: "WARN",
Rules: []rule{
@@ -263,13 +268,14 @@ func testEndpoints(t *testing.T, api *API) {
}

func assertAPIError(t *testing.T, got *qapi.ApiError) {
t.Helper()
if got != nil {
t.Fatalf("Unexpected error: %s", got)
return
}
}

func assertAPIResponse(t *testing.T, got interface{}, exp interface{}) {
t.Helper()
if !reflect.DeepEqual(exp, got) {
respJSON, err := json.Marshal(got)
if err != nil {
78 changes: 56 additions & 22 deletions pkg/rule/rule.go
@@ -4,9 +4,9 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

"github.com/pkg/errors"
@@ -21,9 +21,14 @@ const tmpRuleDir = ".tmp-rules"

type Group struct {
*rules.Group
originalFile string
PartialResponseStrategy storepb.PartialResponseStrategy
}

func (g Group) OriginalFile() string {
return g.originalFile
}

type AlertingRule struct {
*rules.AlertingRule
PartialResponseStrategy storepb.PartialResponseStrategy
@@ -38,21 +43,45 @@ type RuleGroup struct {
PartialResponseStrategy *storepb.PartialResponseStrategy
}

type Managers map[storepb.PartialResponseStrategy]*rules.Manager
type Manager struct {
workDir string
mgrs map[storepb.PartialResponseStrategy]*rules.Manager

mtx sync.RWMutex
ruleFiles map[string]string
}

func (m Managers) RuleGroups() []Group {
func NewManager(dataDir string) *Manager {
return &Manager{
workDir: filepath.Join(dataDir, tmpRuleDir),
mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager),
ruleFiles: make(map[string]string),
}
}

func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.Manager) {
m.mgrs[s] = mgr
}

func (m *Manager) RuleGroups() []Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
var res []Group
for s, r := range m {
for s, r := range m.mgrs {
for _, group := range r.RuleGroups() {
res = append(res, Group{Group: group, PartialResponseStrategy: s})
res = append(res, Group{
Group: group,
PartialResponseStrategy: s,
originalFile: m.ruleFiles[group.File()],
})
}
}
return res
}

func (m Managers) AlertingRules() []AlertingRule {
func (m *Manager) AlertingRules() []AlertingRule {
var res []AlertingRule
for s, r := range m {
for s, r := range m.mgrs {
for _, r := range r.AlertingRules() {
res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s})
}
@@ -67,12 +96,12 @@ func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error {

errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ","))
if err := unmarshal(&rs); err != nil {
return errors.Wrapf(err, errMsg)
return errors.Wrap(err, errMsg)
}

rg := rulefmt.RuleGroup{}
if err := unmarshal(&rg); err != nil {
return errors.Wrapf(err, errMsg)
return errors.Wrap(err, "failed to unmarshal rulefmt.RuleGroup")
}

p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)]
@@ -110,17 +139,18 @@ func (r RuleGroup) MarshalYAML() (interface{}, error) {

// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on
// special field in RuleGroup file.
func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []string) error {
func (m *Manager) Update(evalInterval time.Duration, files []string) error {
var (
errs = tsdberrors.MultiError{}
errs tsdberrors.MultiError
filesByStrategy = map[storepb.PartialResponseStrategy][]string{}
ruleFiles = map[string]string{}
)

if err := os.RemoveAll(path.Join(dataDir, tmpRuleDir)); err != nil {
return errors.Wrapf(err, "rm %s", path.Join(dataDir, tmpRuleDir))
if err := os.RemoveAll(m.workDir); err != nil {
return errors.Wrapf(err, "failed to remove %s", m.workDir)
}
if err := os.MkdirAll(path.Join(dataDir, tmpRuleDir), os.ModePerm); err != nil {
return errors.Wrapf(err, "mkdir %s", path.Join(dataDir, tmpRuleDir))
if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil {
return errors.Wrapf(err, "failed to create %s", m.workDir)
}

for _, fn := range files {
@@ -132,7 +162,7 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st

var rg RuleGroups
if err := yaml.Unmarshal(b, &rg); err != nil {
errs = append(errs, err)
errs = append(errs, errors.Wrap(err, fn))
continue
}

@@ -153,33 +183,37 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st
for s, rg := range groupsByStrategy {
b, err := yaml.Marshal(rg)
if err != nil {
errs = append(errs, err)
errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn))
continue
}

newFn := path.Join(dataDir, tmpRuleDir, filepath.Base(fn)+"."+s.String())
newFn := filepath.Join(m.workDir, filepath.Base(fn)+"."+s.String())
if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil {
errs = append(errs, err)
errs = append(errs, errors.Wrap(err, newFn))
continue
}

filesByStrategy[s] = append(filesByStrategy[s], newFn)
ruleFiles[newFn] = fn
}
}

m.mtx.Lock()
for s, fs := range filesByStrategy {
mgr, ok := (*m)[s]
mgr, ok := m.mgrs[s]
if !ok {
errs = append(errs, errors.Errorf("no updater found for %v", s))
errs = append(errs, errors.Errorf("no manager found for %v", s))
continue
}
// We add external labels in `pkg/alert.Queue`.
// TODO(bwplotka): Investigate if we should put ext labels here or not.
if err := mgr.Update(evalInterval, fs, nil); err != nil {
errs = append(errs, err)
errs = append(errs, errors.Wrapf(err, "strategy %s", s))
continue
}
}
m.ruleFiles = ruleFiles
m.mtx.Unlock()

return errs.Err()
}
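
For reference, how Update arrives at the generated names: each source file is split into one file per partial_response_strategy under <dataDir>/.tmp-rules, suffixed with the strategy name, and ruleFiles records the reverse mapping that OriginalFile later reads. A hypothetical example, assuming a data directory of /data and a single source file /etc/thanos/rules.yaml (rule contents invented for illustration; the strategy field is case-insensitive, as Update upper-cases it before lookup):

groups:
- name: warn-group
  partial_response_strategy: "warn"
  rules:
  - alert: HighLatency
    expr: job:latency:p99 > 1
- name: abort-group
  partial_response_strategy: "abort"
  rules:
  - record: job:latency:p99
    expr: histogram_quantile(0.99, sum by (job, le) (rate(latency_bucket[5m])))

Update would write two generated files and record their origin:

/data/.tmp-rules/rules.yaml.WARN   (warn-group)   original: /etc/thanos/rules.yaml
/data/.tmp-rules/rules.yaml.ABORT  (abort-group)  original: /etc/thanos/rules.yaml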
