From 3d7fa4858201de78f19903853bb4147164855838 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 21 Nov 2019 18:01:10 +0100 Subject: [PATCH] pkg/rule: retain original path for rule files This change ensures that the /api/v1/rules endpoint returns the original path of the rule file instead of the path of the temporary file generated by Thanos ruler. Signed-off-by: Simon Pasquier --- cmd/thanos/rule.go | 21 +++++----- pkg/rule/api/v1.go | 2 +- pkg/rule/api/v1_test.go | 14 +++++-- pkg/rule/rule.go | 78 +++++++++++++++++++++++++----------- pkg/rule/rule_test.go | 88 +++++++++++++++++++++++++---------------- pkg/ui/rule.go | 23 ++++++----- test/e2e/rule_test.go | 74 ++++++++++++++++++++++++++-------- test/e2e/spinup_test.go | 54 +++++++++---------------- 8 files changed, 217 insertions(+), 137 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index b34016376fc..9660dbd275d 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -291,7 +291,7 @@ func runRule( var ( alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver)) alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - ruleMgrs = thanosrule.Managers{} + ruleMgr = thanosrule.NewManager(dataDir) ) { notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { @@ -338,15 +338,16 @@ func runRule( opts.Context = ctx opts.QueryFunc = queryFunc(logger, dnsProvider, duplicatedQuery, ruleEvalWarnings, s) - ruleMgrs[s] = rules.NewManager(&opts) + mgr := rules.NewManager(&opts) + ruleMgr.SetRuleManager(s, mgr) g.Add(func() error { - ruleMgrs[s].Run() + mgr.Run() <-ctx.Done() return nil }, func(error) { cancel() - ruleMgrs[s].Stop() + mgr.Stop() }) } } @@ -447,7 +448,7 @@ func runRule( level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files)) - if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil { + if err := ruleMgr.Update(evalInterval, files); err != nil { configSuccess.Set(0) level.Error(logger).Log("msg", "reloading rules failed", "err", err) continue @@ -457,10 +458,8 @@ func runRule( configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9) rulesLoaded.Reset() - for s, mgr := range ruleMgrs { - for _, group := range mgr.RuleGroups() { - rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules()))) - } + for _, group := range ruleMgr.RuleGroups() { + rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules()))) } } @@ -547,9 +546,9 @@ func runRule( ins := extpromhttp.NewInstrumentationMiddleware(reg) - ui.NewRuleUI(logger, reg, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) + ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) - api := v1.NewAPI(logger, reg, ruleMgrs) + api := v1.NewAPI(logger, reg, ruleMgr) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. diff --git a/pkg/rule/api/v1.go b/pkg/rule/api/v1.go index f49eb722913..bacdb5570a8 100644 --- a/pkg/rule/api/v1.go +++ b/pkg/rule/api/v1.go @@ -69,7 +69,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { for _, grp := range api.ruleRetriever.RuleGroups() { apiRuleGroup := &RuleGroup{ Name: grp.Name(), - File: grp.File(), + File: grp.OriginalFile(), Interval: grp.Interval().Seconds(), Rules: []rule{}, PartialResponseStrategy: grp.PartialResponseStrategy.String(), diff --git a/pkg/rule/api/v1_test.go b/pkg/rule/api/v1_test.go index d949441740c..ce71f5d16b9 100644 --- a/pkg/rule/api/v1_test.go +++ b/pkg/rule/api/v1_test.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/prometheus/storage/tsdb" qapi "github.com/thanos-io/thanos/pkg/query/api" thanosrule "github.com/thanos-io/thanos/pkg/rule" + "github.com/thanos-io/thanos/pkg/store/storepb" ) // NewStorage returns a new storage for testing purposes @@ -92,8 +93,12 @@ func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group { recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{}) r = append(r, recordingRule) - group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts) - return []thanosrule.Group{thanosrule.Group{Group: group}} + return []thanosrule.Group{ + thanosrule.Group{ + Group: rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts), + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + } } func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule { @@ -189,7 +194,7 @@ func testEndpoints(t *testing.T, api *API) { RuleGroups: []*RuleGroup{ { Name: "grp", - File: "/path/to/file", + File: "", Interval: 1, PartialResponseStrategy: "WARN", Rules: []rule{ @@ -263,13 +268,14 @@ func testEndpoints(t *testing.T, api *API) { } func assertAPIError(t *testing.T, got *qapi.ApiError) { + t.Helper() if got != nil { t.Fatalf("Unexpected error: %s", got) - return } } func assertAPIResponse(t *testing.T, got interface{}, exp interface{}) { + t.Helper() if !reflect.DeepEqual(exp, got) { respJSON, err := json.Marshal(got) if err != nil { diff --git a/pkg/rule/rule.go b/pkg/rule/rule.go index 5bad87e4c18..970ffbe8bca 100644 --- a/pkg/rule/rule.go +++ b/pkg/rule/rule.go @@ -4,9 +4,9 @@ import ( "fmt" "io/ioutil" "os" - "path" "path/filepath" "strings" + "sync" "time" "github.com/pkg/errors" @@ -21,9 +21,14 @@ const tmpRuleDir = ".tmp-rules" type Group struct { *rules.Group + originalFile string PartialResponseStrategy storepb.PartialResponseStrategy } +func (g Group) OriginalFile() string { + return g.originalFile +} + type AlertingRule struct { *rules.AlertingRule PartialResponseStrategy storepb.PartialResponseStrategy @@ -38,21 +43,45 @@ type RuleGroup struct { PartialResponseStrategy *storepb.PartialResponseStrategy } -type Managers map[storepb.PartialResponseStrategy]*rules.Manager +type Manager struct { + workDir string + mgrs map[storepb.PartialResponseStrategy]*rules.Manager + + mtx sync.RWMutex + ruleFiles map[string]string +} -func (m Managers) RuleGroups() []Group { +func NewManager(dataDir string) *Manager { + return &Manager{ + workDir: filepath.Join(dataDir, tmpRuleDir), + mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager), + ruleFiles: make(map[string]string), + } +} + +func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.Manager) { + m.mgrs[s] = mgr +} + +func (m *Manager) RuleGroups() []Group { + m.mtx.RLock() + defer m.mtx.RUnlock() var res []Group - for s, r := range m { + for s, r := range m.mgrs { for _, group := range r.RuleGroups() { - res = append(res, Group{Group: group, PartialResponseStrategy: s}) + res = append(res, Group{ + Group: group, + PartialResponseStrategy: s, + originalFile: m.ruleFiles[group.File()], + }) } } return res } -func (m Managers) AlertingRules() []AlertingRule { +func (m *Manager) AlertingRules() []AlertingRule { var res []AlertingRule - for s, r := range m { + for s, r := range m.mgrs { for _, r := range r.AlertingRules() { res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s}) } @@ -67,12 +96,12 @@ func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ",")) if err := unmarshal(&rs); err != nil { - return errors.Wrapf(err, errMsg) + return errors.Wrap(err, errMsg) } rg := rulefmt.RuleGroup{} if err := unmarshal(&rg); err != nil { - return errors.Wrapf(err, errMsg) + return errors.Wrap(err, "failed to unmarshal rulefmt.RuleGroup") } p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)] @@ -110,17 +139,18 @@ func (r RuleGroup) MarshalYAML() (interface{}, error) { // Update updates rules from given files to all managers we hold. We decide which groups should go where, based on // special field in RuleGroup file. -func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []string) error { +func (m *Manager) Update(evalInterval time.Duration, files []string) error { var ( - errs = tsdberrors.MultiError{} + errs tsdberrors.MultiError filesByStrategy = map[storepb.PartialResponseStrategy][]string{} + ruleFiles = map[string]string{} ) - if err := os.RemoveAll(path.Join(dataDir, tmpRuleDir)); err != nil { - return errors.Wrapf(err, "rm %s", path.Join(dataDir, tmpRuleDir)) + if err := os.RemoveAll(m.workDir); err != nil { + return errors.Wrapf(err, "failed to remove %s", m.workDir) } - if err := os.MkdirAll(path.Join(dataDir, tmpRuleDir), os.ModePerm); err != nil { - return errors.Wrapf(err, "mkdir %s", path.Join(dataDir, tmpRuleDir)) + if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil { + return errors.Wrapf(err, "failed to create %s", m.workDir) } for _, fn := range files { @@ -132,7 +162,7 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st var rg RuleGroups if err := yaml.Unmarshal(b, &rg); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrap(err, fn)) continue } @@ -153,33 +183,37 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st for s, rg := range groupsByStrategy { b, err := yaml.Marshal(rg) if err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) continue } - newFn := path.Join(dataDir, tmpRuleDir, filepath.Base(fn)+"."+s.String()) + newFn := filepath.Join(m.workDir, filepath.Base(fn)+"."+s.String()) if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrap(err, newFn)) continue } filesByStrategy[s] = append(filesByStrategy[s], newFn) + ruleFiles[newFn] = fn } } + m.mtx.Lock() for s, fs := range filesByStrategy { - mgr, ok := (*m)[s] + mgr, ok := m.mgrs[s] if !ok { - errs = append(errs, errors.Errorf("no updater found for %v", s)) + errs = append(errs, errors.Errorf("no manager found for %v", s)) continue } // We add external labels in `pkg/alert.Queue`. // TODO(bwplotka): Investigate if we should put ext labels here or not. if err := mgr.Update(evalInterval, fs, nil); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrapf(err, "strategy %s", s)) continue } } + m.ruleFiles = ruleFiles + m.mtx.Unlock() return errs.Err() } diff --git a/pkg/rule/rule_test.go b/pkg/rule/rule_test.go index 5353bb35037..8b9864a17e0 100644 --- a/pkg/rule/rule_test.go +++ b/pkg/rule/rule_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "sort" "strings" "testing" @@ -70,59 +71,76 @@ groups: - alert: "some" expr: "up" `), os.ModePerm)) - testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "combined-wrong.yaml"), []byte(` -groups: -- name: "something8" - partial_response_strategy: "warn" - rules: - - alert: "some" - expr: "up" -- name: "something9" - partial_response_strategy: "adad" # Err 2 - rules: - - alert: "some" - expr: "up" -`), os.ModePerm)) opts := rules.ManagerOptions{ Logger: log.NewLogfmtLogger(os.Stderr), } - m := Managers{ - storepb.PartialResponseStrategy_ABORT: rules.NewManager(&opts), - storepb.PartialResponseStrategy_WARN: rules.NewManager(&opts), - } + m := NewManager(dir) + m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, rules.NewManager(&opts)) + m.SetRuleManager(storepb.PartialResponseStrategy_WARN, rules.NewManager(&opts)) - err = m.Update(dir, 10*time.Second, []string{ + err = m.Update(10*time.Second, []string{ path.Join(dir, "no_strategy.yaml"), path.Join(dir, "abort.yaml"), path.Join(dir, "warn.yaml"), path.Join(dir, "wrong.yaml"), path.Join(dir, "combined.yaml"), - path.Join(dir, "combined_wrong.yaml"), + path.Join(dir, "non_existing.yaml"), }) testutil.NotOk(t, err) - testutil.Assert(t, strings.HasPrefix(err.Error(), "2 errors: failed to unmarshal 'partial_response_strategy'"), err.Error()) - - g := m[storepb.PartialResponseStrategy_WARN].RuleGroups() - testutil.Equals(t, 2, len(g)) + testutil.Assert(t, strings.Contains(err.Error(), "wrong.yaml: failed to unmarshal 'partial_response_strategy'"), err.Error()) + testutil.Assert(t, strings.Contains(err.Error(), "non_existing.yaml: no such file or directory"), err.Error()) + g := m.RuleGroups() sort.Slice(g, func(i, j int) bool { return g[i].Name() < g[j].Name() }) - testutil.Equals(t, "something3", g[0].Name()) - testutil.Equals(t, "something5", g[1].Name()) - - g = m[storepb.PartialResponseStrategy_ABORT].RuleGroups() - testutil.Equals(t, 4, len(g)) - sort.Slice(g, func(i, j int) bool { - return g[i].Name() < g[j].Name() - }) - testutil.Equals(t, "something1", g[0].Name()) - testutil.Equals(t, "something2", g[1].Name()) - testutil.Equals(t, "something6", g[2].Name()) - testutil.Equals(t, "something7", g[3].Name()) + exp := []struct { + name string + file string + strategy storepb.PartialResponseStrategy + }{ + { + name: "something1", + file: filepath.Join(dir, "no_strategy.yaml"), + strategy: storepb.PartialResponseStrategy_ABORT, + }, + { + name: "something2", + file: filepath.Join(dir, "abort.yaml"), + strategy: storepb.PartialResponseStrategy_ABORT, + }, + { + name: "something3", + file: filepath.Join(dir, "warn.yaml"), + strategy: storepb.PartialResponseStrategy_WARN, + }, + { + name: "something5", + file: filepath.Join(dir, "combined.yaml"), + strategy: storepb.PartialResponseStrategy_WARN, + }, + { + name: "something6", + file: filepath.Join(dir, "combined.yaml"), + strategy: storepb.PartialResponseStrategy_ABORT, + }, + { + name: "something7", + file: filepath.Join(dir, "combined.yaml"), + strategy: storepb.PartialResponseStrategy_ABORT, + }, + } + testutil.Equals(t, len(exp), len(g)) + for i := range exp { + t.Run(exp[i].name, func(t *testing.T) { + testutil.Equals(t, exp[i].strategy, g[i].PartialResponseStrategy) + testutil.Equals(t, exp[i].name, g[i].Name()) + testutil.Equals(t, exp[i].file, g[i].OriginalFile()) + }) + } } func TestRuleGroupMarshalYAML(t *testing.T) { diff --git a/pkg/ui/rule.go b/pkg/ui/rule.go index 39b753d726b..12188df9d91 100644 --- a/pkg/ui/rule.go +++ b/pkg/ui/rule.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/prometheus/rules" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" thanosrule "github.com/thanos-io/thanos/pkg/rule" - "github.com/thanos-io/thanos/pkg/store/storepb" ) type Rule struct { @@ -24,18 +23,18 @@ type Rule struct { flagsMap map[string]string - ruleManagers thanosrule.Managers - queryURL string - reg prometheus.Registerer + ruleManager *thanosrule.Manager + queryURL string + reg prometheus.Registerer } -func NewRuleUI(logger log.Logger, reg prometheus.Registerer, ruleManagers map[storepb.PartialResponseStrategy]*rules.Manager, queryURL string, flagsMap map[string]string) *Rule { +func NewRuleUI(logger log.Logger, reg prometheus.Registerer, ruleManager *thanosrule.Manager, queryURL string, flagsMap map[string]string) *Rule { return &Rule{ - BaseUI: NewBaseUI(logger, "rule_menu.html", ruleTmplFuncs(queryURL)), - flagsMap: flagsMap, - ruleManagers: ruleManagers, - queryURL: queryURL, - reg: reg, + BaseUI: NewBaseUI(logger, "rule_menu.html", ruleTmplFuncs(queryURL)), + flagsMap: flagsMap, + ruleManager: ruleManager, + queryURL: queryURL, + reg: reg, } } @@ -115,7 +114,7 @@ func ruleTmplFuncs(queryURL string) template.FuncMap { } func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { - alerts := ru.ruleManagers.AlertingRules() + alerts := ru.ruleManager.AlertingRules() alertsSorter := byAlertStateAndNameSorter{alerts: alerts} sort.Sort(alertsSorter) @@ -138,7 +137,7 @@ func (ru *Rule) rules(w http.ResponseWriter, r *http.Request) { prefix := GetWebPrefix(ru.logger, ru.flagsMap, r) // TODO(bwplotka): Update HTML to include partial response. - ru.executeTemplate(w, "rules.html", prefix, ru.ruleManagers) + ru.executeTemplate(w, "rules.html", prefix, ru.ruleManager) } // Root redirects / requests to /graph, taking into account the path prefix value. diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 272a3ed98a7..5784938aa85 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -8,7 +8,7 @@ import ( "math" "net/http" "os" - "path" + "path/filepath" "sort" "testing" "time" @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/thanos-io/thanos/pkg/promclient" + rapi "github.com/thanos-io/thanos/pkg/rule/api" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" @@ -52,9 +53,14 @@ groups: ` ) -var ( - alertsToTest = []string{testAlertRuleAbortOnPartialResponse, testAlertRuleWarnOnPartialResponse} -) +func createRuleFiles(t *testing.T, dir string) { + t.Helper() + + for i, rule := range []string{testAlertRuleAbortOnPartialResponse, testAlertRuleWarnOnPartialResponse} { + err := ioutil.WriteFile(filepath.Join(dir, fmt.Sprintf("rules-%d.yaml", i)), []byte(rule), 0666) + testutil.Ok(t, err) + } +} func TestRule(t *testing.T) { a := newLocalAddresser() @@ -62,8 +68,13 @@ func TestRule(t *testing.T) { am := alertManager(a.New()) qAddr := a.New() - r1 := rule(a.New(), a.New(), alertsToTest, am.HTTP, []address{qAddr}, nil) - r2 := rule(a.New(), a.New(), alertsToTest, am.HTTP, nil, []address{qAddr}) + rulesDir, err := ioutil.TempDir("", "rules") + defer os.RemoveAll(rulesDir) + testutil.Ok(t, err) + createRuleFiles(t, rulesDir) + + r1 := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil) + r2 := rule(a.New(), a.New(), rulesDir, am.HTTP, nil, []address{qAddr}) q := querier(qAddr, a.New(), []address{r1.GRPC, r2.GRPC}, nil) @@ -205,14 +216,23 @@ func TestRule(t *testing.T) { })) testutil.Equals(t, 2, checks) - // Verify API endpoints. - for _, endpoint := range []string{"/api/v1/rules", "/api/v1/alerts"} { - for _, r := range []*serverScheduler{r1, r2} { - code, _, err := getAPIEndpoint(ctx, r.HTTP.URL()+endpoint) - testutil.Ok(t, err) - testutil.Equals(t, 200, code) + // Verify the rules API endpoint. + for _, r := range []*serverScheduler{r1, r2} { + rgs, err := queryRules(ctx, r.HTTP.URL()) + testutil.Ok(t, err) + testutil.Equals(t, 2, len(rgs)) + for i := range rgs { + testutil.Equals(t, filepath.Join(rulesDir, fmt.Sprintf("rules-%d.yaml", i)), rgs[i].File) + testutil.Equals(t, "example", rgs[i].Name) } } + + // Verify the alerts API endpoint. + for _, r := range []*serverScheduler{r1, r2} { + code, _, err := getAPIEndpoint(ctx, r.HTTP.URL()+"/api/v1/alerts") + testutil.Ok(t, err) + testutil.Equals(t, 200, code) + } } type failingStoreAPI struct{} @@ -271,7 +291,10 @@ func TestRulePartialResponse(t *testing.T) { f := fakeStoreAPI(a.New(), &failingStoreAPI{}) am := alertManager(a.New()) - r := ruleWithDir(a.New(), a.New(), dir, nil, am.HTTP, []address{qAddr}, nil) + rulesDir, err := ioutil.TempDir("", "rules") + defer os.RemoveAll(rulesDir) + testutil.Ok(t, err) + r := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil) q := querier(qAddr, a.New(), []address{r.GRPC, f.GRPC}, nil) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) @@ -316,9 +339,7 @@ func TestRulePartialResponse(t *testing.T) { })) // Add alerts to ruler, we want to add it only when Querier is rdy, otherwise we will get "no store match the query". - for i, rule := range alertsToTest { - testutil.Ok(t, ioutil.WriteFile(path.Join(dir, fmt.Sprintf("rules-%d.yaml", i)), []byte(rule), 0666)) - } + createRuleFiles(t, rulesDir) resp, err := http.Post(r.HTTP.URL()+"/-/reload", "", nil) testutil.Ok(t, err) @@ -468,6 +489,27 @@ func queryAlertmanagerAlerts(ctx context.Context, url string) ([]*model.Alert, e return v.Data, nil } +func queryRules(ctx context.Context, url string) ([]*rapi.RuleGroup, error) { + code, body, err := getAPIEndpoint(ctx, url+"/api/v1/rules") + if err != nil { + return nil, err + } + if code != 200 { + return nil, errors.Errorf("expected 200 response, got %d", code) + } + + var resp struct { + Data rapi.RuleDiscovery + } + if err = json.Unmarshal(body, &resp); err != nil { + return nil, err + } + sort.Slice(resp.Data.RuleGroups, func(i, j int) bool { + return resp.Data.RuleGroups[i].File < resp.Data.RuleGroups[j].File + }) + return resp.Data.RuleGroups, nil +} + func getAPIEndpoint(ctx context.Context, url string) (int, []byte, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 9c6b3b09ad7..6827f4a4732 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -10,7 +10,7 @@ import ( "net" "os" "os/exec" - "path" + "path/filepath" "strconv" "syscall" "testing" @@ -120,13 +120,13 @@ type prometheusScheduler struct { func prometheus(http address, config string) *prometheusScheduler { s := &prometheusScheduler{ - RelDir: path.Join("data", "prom", http.Port), + RelDir: filepath.Join("data", "prom", http.Port), } s.serverScheduler = serverScheduler{ HTTP: http, schedule: func(workDir string) (execs Exec, e error) { - promDir := path.Join(workDir, s.RelDir) + promDir := filepath.Join(workDir, s.RelDir) if err := os.MkdirAll(promDir, 0777); err != nil { return nil, errors.Wrap(err, "create prom dir failed") } @@ -152,7 +152,7 @@ func sidecar(http, grpc address, prom *prometheusScheduler) *serverScheduler { HTTP: http, GRPC: grpc, schedule: func(workDir string) (Exec, error) { - promDir := path.Join(workDir, prom.RelDir) + promDir := filepath.Join(workDir, prom.RelDir) return newCmdExec(exec.Command("thanos", "sidecar", "--debug.name", fmt.Sprintf("sidecar-%s", http.Port), "--grpc-address", grpc.HostPort(), @@ -174,7 +174,7 @@ func receiver(http, grpc, metric address, replicationFactor int, hashring ...rec HTTP: http, GRPC: grpc, schedule: func(workDir string) (Exec, error) { - receiveDir := path.Join(workDir, "data", "receive", http.Port) + receiveDir := filepath.Join(workDir, "data", "receive", http.Port) if err := os.MkdirAll(receiveDir, 0777); err != nil { return nil, errors.Wrap(err, "create receive dir") } @@ -184,7 +184,7 @@ func receiver(http, grpc, metric address, replicationFactor int, hashring ...rec return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) } - if err := ioutil.WriteFile(path.Join(receiveDir, "hashrings.json"), b, 0666); err != nil { + if err := ioutil.WriteFile(filepath.Join(receiveDir, "hashrings.json"), b, 0666); err != nil { return nil, errors.Wrap(err, "creating receive config") } @@ -195,11 +195,11 @@ func receiver(http, grpc, metric address, replicationFactor int, hashring ...rec "--http-address", metric.HostPort(), "--remote-write.address", http.HostPort(), "--label", fmt.Sprintf(`receive="%s"`, http.Port), - "--tsdb.path", path.Join(receiveDir, "tsdb"), + "--tsdb.path", filepath.Join(receiveDir, "tsdb"), "--log.level", "debug", "--receive.replication-factor", strconv.Itoa(replicationFactor), "--receive.local-endpoint", remoteWriteEndpoint(http), - "--receive.hashrings-file", path.Join(receiveDir, "hashrings.json"), + "--receive.hashrings-file", filepath.Join(receiveDir, "hashrings.json"), "--receive.hashrings-file-refresh-interval", "5s")), nil }, } @@ -226,7 +226,7 @@ func querier(http, grpc address, storeAddresses []address, fileSDStoreAddresses } if len(fileSDStoreAddresses) > 0 { - queryFileSDDir := path.Join(workDir, "data", "querier", http.Port) + queryFileSDDir := filepath.Join(workDir, "data", "querier", http.Port) if err := os.MkdirAll(queryFileSDDir, 0777); err != nil { return nil, errors.Wrap(err, "create query dir failed") } @@ -236,7 +236,7 @@ func querier(http, grpc address, storeAddresses []address, fileSDStoreAddresses } args = append(args, - "--store.sd-files", path.Join(queryFileSDDir, "filesd.json"), + "--store.sd-files", filepath.Join(queryFileSDDir, "filesd.json"), "--store.sd-interval", "5s", ) } @@ -251,7 +251,7 @@ func storeGateway(http, grpc address, bucketConfig []byte, relabelConfig []byte) HTTP: http, GRPC: grpc, schedule: func(workDir string) (Exec, error) { - dbDir := path.Join(workDir, "data", "store-gateway", http.Port) + dbDir := filepath.Join(workDir, "data", "store-gateway", http.Port) if err := os.MkdirAll(dbDir, 0777); err != nil { return nil, errors.Wrap(err, "creating store gateway dir failed") @@ -278,7 +278,7 @@ func alertManager(http address) *serverScheduler { return &serverScheduler{ HTTP: http, schedule: func(workDir string) (Exec, error) { - dir := path.Join(workDir, "data", "alertmanager", http.Port) + dir := filepath.Join(workDir, "data", "alertmanager", http.Port) if err := os.MkdirAll(dir, 0777); err != nil { return nil, errors.Wrap(err, "creating alertmanager dir failed") @@ -304,35 +304,17 @@ receivers: } } -func rule(http, grpc address, rules []string, am address, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { - return ruleWithDir(http, grpc, "", rules, am, queryAddresses, queryFileSDAddresses) -} - -func ruleWithDir(http, grpc address, dir string, rules []string, am address, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { +func rule(http, grpc address, ruleDir string, am address, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { return &serverScheduler{ HTTP: http, GRPC: grpc, schedule: func(workDir string) (Exec, error) { - ruleDir := path.Join(workDir, "data", "rule", http.Port) - if dir != "" { - ruleDir = dir - } - - if err := os.MkdirAll(ruleDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir") - } - for i, rule := range rules { - if err := ioutil.WriteFile(path.Join(ruleDir, fmt.Sprintf("/rules-%d.yaml", i)), []byte(rule), 0666); err != nil { - return nil, errors.Wrapf(err, "writing rule %s", path.Join(ruleDir, fmt.Sprintf("/rules-%d.yaml", i))) - } - } - args := []string{ "rule", "--debug.name", fmt.Sprintf("rule-%s", http.Port), "--label", fmt.Sprintf(`replica="%s"`, http.Port), - "--data-dir", path.Join(ruleDir, "data"), - "--rule-file", path.Join(ruleDir, "*.yaml"), + "--data-dir", filepath.Join(workDir, "data"), + "--rule-file", filepath.Join(ruleDir, "*.yaml"), "--eval-interval", "1s", "--alertmanagers.url", am.URL(), "--grpc-address", grpc.HostPort(), @@ -347,10 +329,10 @@ func ruleWithDir(http, grpc address, dir string, rules []string, am address, que } if len(queryFileSDAddresses) > 0 { - if err := ioutil.WriteFile(path.Join(ruleDir, "filesd.json"), []byte(generateFileSD(queryFileSDAddresses)), 0666); err != nil { + if err := ioutil.WriteFile(filepath.Join(workDir, "filesd.json"), []byte(generateFileSD(queryFileSDAddresses)), 0666); err != nil { return nil, errors.Wrap(err, "creating ruler filesd config") } - args = append(args, "--query.sd-files", path.Join(ruleDir, "filesd.json")) + args = append(args, "--query.sd-files", filepath.Join(workDir, "filesd.json")) } return newCmdExec(exec.Command("thanos", args...)), nil }, @@ -430,7 +412,7 @@ func minio(http address, config s3.Config) *serverScheduler { return &serverScheduler{ HTTP: http, schedule: func(workDir string) (Exec, error) { - dbDir := path.Join(workDir, "data", "minio", http.Port) + dbDir := filepath.Join(workDir, "data", "minio", http.Port) if err := os.MkdirAll(dbDir, 0777); err != nil { return nil, errors.Wrap(err, "creating minio dir failed") }