Skip to content

Commit

Permalink
Fix the metadata API flaky tests (#4013)
Browse files Browse the repository at this point in the history
* update the metadata API e2e test

Signed-off-by: yeya24 <yb532204897@gmail.com>

* address comment

Signed-off-by: yeya24 <yb532204897@gmail.com>

* add error messages to assert failure

Signed-off-by: yeya24 <yb532204897@gmail.com>

* wait till metadata ready

Signed-off-by: yeya24 <yb532204897@gmail.com>

* fix the metadata unit test

Signed-off-by: yeya24 <yb532204897@gmail.com>

exhaust response body

Signed-off-by: yeya24 <yb532204897@gmail.com>

* change POST /-/reload to SIGHUP

Signed-off-by: yeya24 <yb532204897@gmail.com>

* remove lifecycle flag

Signed-off-by: yeya24 <yb532204897@gmail.com>

* address comments

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update config write

Signed-off-by: yeya24 <yb532204897@gmail.com>

* change testutil.Assert back to testutil.Equal

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Apr 9, 2021
1 parent f1d0172 commit ebda62e
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 33 deletions.
51 changes: 38 additions & 13 deletions pkg/metadata/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,32 @@ package metadata

import (
"context"
"fmt"
"net/http"
"net/url"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
testutil.TolerantVerifyLeakMain(m)
}

func TestPrometheus_Metadata_e2e(t *testing.T) {
p, err := e2eutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

testutil.Ok(t, p.SetConfig(`
p.SetConfig(fmt.Sprintf(`
global:
external_labels:
region: eu-west
Expand All @@ -36,17 +40,36 @@ scrape_configs:
scrape_interval: 1s
scrape_timeout: 1s
static_configs:
- targets: ['localhost:9090','localhost:80']
`))
- targets: ['%s']
`, e2eutil.PromAddrPlaceHolder))
testutil.Ok(t, p.Start())

time.Sleep(10 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

upctx, upcancel := context.WithTimeout(ctx, 10*time.Second)
defer upcancel()

logger := log.NewNopLogger()
err = p.WaitPrometheusUp(upctx, logger)
testutil.Ok(t, err)

u, err := url.Parse("http://" + p.Addr())
testutil.Ok(t, err)

prom := NewPrometheus(u, promclient.NewDefaultClient())
c := promclient.NewClient(http.DefaultClient, logger, "")

// Wait metadata response to be ready as Prometheus gets metadata after scrape.
testutil.Ok(t, runutil.Retry(3*time.Second, ctx.Done(), func() error {
meta, err := c.MetadataInGRPC(ctx, u, "", -1)
testutil.Ok(t, err)
if len(meta) > 0 {
return nil
}
return errors.New("empty metadata response from Prometheus")
}))

grpcClient := NewGRPCClient(NewPrometheus(u, c))
for _, tcase := range []struct {
name string
metric string
Expand All @@ -56,8 +79,9 @@ scrape_configs:
{
name: "all metadata return",
limit: -1,
// We just check two metrics here.
expectedFunc: func(m map[string][]metadatapb.Meta) bool {
return len(m["thanos_build_info"]) > 0 && len(m["prometheus_build_info"]) > 0
return len(m["prometheus_build_info"]) > 0 && len(m["prometheus_engine_query_duration_seconds"]) > 0
},
},
{
Expand All @@ -75,22 +99,23 @@ scrape_configs:
},
},
{
name: "only thanos_build_info metadata return",
metric: "thanos_build_info",
name: "only prometheus_build_info metadata return",
metric: "prometheus_build_info",
limit: 1,
expectedFunc: func(m map[string][]metadatapb.Meta) bool {
return len(m) == 1 && len(m["thanos_build_info"]) > 0
return len(m) == 1 && len(m["prometheus_build_info"]) > 0
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
meta, w, err := NewGRPCClient(prom).Metadata(context.Background(), &metadatapb.MetadataRequest{
meta, w, err := grpcClient.Metadata(ctx, &metadatapb.MetadataRequest{
Metric: tcase.metric,
Limit: tcase.limit,
})

testutil.Equals(t, storage.Warnings(nil), w)
testutil.Ok(t, err)
testutil.Assert(t, true, tcase.expectedFunc(meta))
testutil.Assert(t, tcase.expectedFunc(meta))
})
}
}
4 changes: 2 additions & 2 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func TestIsWALFileAccessible_e2e(t *testing.T) {

func TestExternalLabels_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
testutil.Ok(t, p.SetConfig(`
p.SetConfig(`
global:
external_labels:
region: eu-west
az: 1
`))
`)

testutil.Ok(t, p.Start())

Expand Down
4 changes: 2 additions & 2 deletions pkg/rules/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ func TestPrometheus_Rules_e2e(t *testing.T) {
testutil.Ok(t, err)
root := filepath.Join(curr, "../../")

testutil.Ok(t, p.SetConfig(fmt.Sprintf(`
p.SetConfig(fmt.Sprintf(`
global:
external_labels:
region: eu-west
rule_files:
- %s/examples/alerts/alerts.yaml
- %s/examples/alerts/rules.yaml
`, root, root)))
`, root, root))
testutil.Ok(t, p.Start())

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
Expand Down
5 changes: 3 additions & 2 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,17 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {

testutil.Ok(t, p.Start())

logger := log.NewNopLogger()
upctx, upcancel := context.WithTimeout(ctx, 10*time.Second)
defer upcancel()
testutil.Ok(t, p.WaitPrometheusUp(upctx))
testutil.Ok(t, p.WaitPrometheusUp(upctx, logger))

p.DisableCompaction()
testutil.Ok(t, p.Restart())

upctx2, upcancel2 := context.WithTimeout(ctx, 10*time.Second)
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2))
testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger))

shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc)

Expand Down
4 changes: 2 additions & 2 deletions pkg/targets/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestPrometheus_Targets_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

testutil.Ok(t, p.SetConfig(`
p.SetConfig(`
global:
external_labels:
region: eu-west
Expand All @@ -41,7 +41,7 @@ scrape_configs:
- source_labels: ['__address__']
regex: '^.+:80$'
action: drop
`))
`)
testutil.Ok(t, p.Start())

// For some reason it's better to wait much more than a few scrape intervals.
Expand Down
25 changes: 20 additions & 5 deletions pkg/testutil/e2eutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (
promPathsEnvVar = "THANOS_TEST_PROMETHEUS_PATHS"
alertmanagerBinEnvVar = "THANOS_TEST_ALERTMANAGER_PATH"
minioBinEnvVar = "THANOS_TEST_MINIO_PATH"

// A placeholder for actual Prometheus instance address in the scrape config.
PromAddrPlaceHolder = "PROMETHEUS_ADDRESS"
)

func PrometheusBinary() string {
Expand Down Expand Up @@ -80,6 +83,8 @@ type Prometheus struct {
cmd *exec.Cmd
disabledCompaction bool
addr string

config string
}

func NewTSDB() (*tsdb.DB, error) {
Expand Down Expand Up @@ -176,6 +181,11 @@ func (p *Prometheus) start() error {
)
}
p.addr = fmt.Sprintf("localhost:%d", port)
// Write the final config to the config file.
// The address placeholder will be replaced with the actual address.
if err := p.writeConfig(strings.ReplaceAll(p.config, PromAddrPlaceHolder, p.addr)); err != nil {
return err
}
args := append([]string{
"--storage.tsdb.retention=2d", // Pass retention cause prometheus since 2.8.0 don't show default value for that flags in web/api: https://github.com/prometheus/prometheus/pull/5433.
"--storage.tsdb.path=" + p.db.Dir(),
Expand All @@ -199,7 +209,7 @@ func (p *Prometheus) start() error {
return nil
}

func (p *Prometheus) WaitPrometheusUp(ctx context.Context) error {
func (p *Prometheus) WaitPrometheusUp(ctx context.Context, logger log.Logger) error {
if !p.running {
return errors.New("method Start was not invoked.")
}
Expand All @@ -208,6 +218,7 @@ func (p *Prometheus) WaitPrometheusUp(ctx context.Context) error {
if err != nil {
return err
}
defer runutil.ExhaustCloseWithLogOnErr(logger, r.Body, "failed to exhaust and close body")

if r.StatusCode != 200 {
return errors.Errorf("Got non 200 response: %v", r.StatusCode)
Expand Down Expand Up @@ -238,15 +249,19 @@ func (p *Prometheus) DisableCompaction() {
p.disabledCompaction = true
}

// SetConfig updates the contents of the config file. By default it is empty.
func (p *Prometheus) SetConfig(s string) (err error) {
// SetConfig updates the contents of the config.
func (p *Prometheus) SetConfig(s string) {
p.config = s
}

// writeConfig writes the Prometheus config to the config file.
func (p *Prometheus) writeConfig(config string) (err error) {
f, err := os.Create(filepath.Join(p.dir, "prometheus.yml"))
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(&err, f, "prometheus config")

_, err = f.Write([]byte(s))
_, err = f.Write([]byte(config))
return err
}

Expand Down
47 changes: 40 additions & 7 deletions test/e2e/metadata_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ package e2e_test

import (
"context"
"fmt"
"sort"
"testing"
"time"

"github.com/cortexproject/cortex/integration/e2e"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)
Expand Down Expand Up @@ -66,32 +70,61 @@ func TestMetadataAPI_Fanout(t *testing.T) {
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_query_metadata_apis_dns_provider_results"}, e2e.WaitMissingMetrics))

promMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)
var promMeta map[string][]metadatapb.Meta
// Wait metadata response to be ready as Prometheus gets metadata after scrape.
testutil.Ok(t, runutil.Retry(3*time.Second, ctx.Done(), func() error {
promMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)
if len(promMeta) > 0 {
return nil
}
return fmt.Errorf("empty metadata response from Prometheus")
}))

thanosMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)
testutil.Assert(t, len(thanosMeta) > 0, "got empty metadata response from Thanos")

// Metadata response from Prometheus and Thanos Querier should be the same after deduplication.
testutil.Equals(t, thanosMeta, promMeta)
metadataEqual(t, thanosMeta, promMeta)

// We only expect to see one metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 1)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 1)
testutil.Equals(t, len(thanosMeta), 1)

// We only expect to see ten metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 10)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 10)
testutil.Equals(t, len(thanosMeta), 10)

// No metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 0)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 0)
testutil.Equals(t, len(thanosMeta), 0)

// Only prometheus_build_info metric will be returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "prometheus_build_info", -1)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 1 && len(thanosMeta["prometheus_build_info"]) > 0)
testutil.Assert(t, len(thanosMeta) == 1 && len(thanosMeta["prometheus_build_info"]) > 0, "expected one prometheus_build_info metadata from Thanos, got %v", thanosMeta)
}

func metadataEqual(t *testing.T, meta1, meta2 map[string][]metadatapb.Meta) {
// The two responses should have equal # of entries.
testutil.Equals(t, len(meta1), len(meta2))

for metric := range meta1 {
// Get metadata for the metric.
meta1MetricMeta := meta1[metric]
meta2MetricMeta, ok := meta2[metric]
testutil.Assert(t, ok)

sort.Slice(meta1MetricMeta, func(i, j int) bool {
return meta1MetricMeta[i].Help < meta1MetricMeta[j].Help
})
sort.Slice(meta2MetricMeta, func(i, j int) bool {
return meta2MetricMeta[i].Help < meta2MetricMeta[j].Help
})
testutil.Equals(t, meta1MetricMeta, meta2MetricMeta)
}
}

0 comments on commit ebda62e

Please sign in to comment.