diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 4de113feddd..e8c6b107491 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -12,6 +12,7 @@ import ( "runtime" "runtime/debug" "sort" + "strings" "testing" "github.com/davecgh/go-spew/spew" @@ -363,3 +364,18 @@ func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { return iw.Close() } + +// ErrorContainsString fails if error is nil or does not contain the given string. +func ErrorContainsString(tb testing.TB, err error, str string) { + tb.Helper() + + _, file, line, _ := runtime.Caller(1) + + if err == nil { + tb.Fatalf("\033[31m%s:%d: Expected error containing string %q, but error is nil.\n\n", filepath.Base(file), line, str) + } + + if !strings.Contains(err.Error(), str) { + tb.Fatalf("\033[31m%s:%d: Expected error containing string %q, but got error %q.\n\n", filepath.Base(file), line, str, err.Error()) + } +} diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go new file mode 100644 index 00000000000..54118aabd95 --- /dev/null +++ b/test/e2e/native_histograms_test.go @@ -0,0 +1,141 @@ +package e2e_test + +import ( + "context" + "net/url" + "testing" + "time" + + "github.com/efficientgo/e2e" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" +) + +func TestQueryNativeHistograms(t *testing.T) { + e, err := e2e.NewDockerEnvironment("nat-hist-query") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "prom", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget), "", "quay.io/prometheus/prometheus:v2.40.5", "", "native-histograms", "remote-write-receiver") + testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + + querier := e2ethanos.NewQuerierBuilder(e, "querier", sidecar.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(querier)) + + rawRemoteWriteURL := "http://" + prom.Endpoint("http") + "/api/v1/write" + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + ts := time.Now() + + writeRequest(t, ctx, rawRemoteWriteURL, nativeHistogramWriteRequest(ts)) + + // Make sure we can query native histogram directly from Prometheus. + queryAndAssertSeries(t, ctx, prom.Endpoint("http"), func() string { return "test_histogram" }, time.Now, promclient.QueryOptions{}, []model.Metric{ + { + "__name__": "test_histogram", + "foo": "bar", + }, + }) + + // Querying from querier should fail. + _, _, err = promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+querier.Endpoint("http")), "test_histogram", ts, promclient.QueryOptions{}) + testutil.ErrorContainsString(t, err, "invalid chunk encoding") +} + +func TestWriteNativeHistograms(t *testing.T) { + e, err := e2e.NewDockerEnvironment("nat-hist-write") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + receiver := e2ethanos.NewReceiveBuilder(e, "receiver").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(receiver)) + + querier := e2ethanos.NewQuerierBuilder(e, "querier", receiver.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(querier)) + + rawRemoteWriteURL := "http://" + receiver.Endpoint("remote-write") + "/api/v1/receive" + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + writeRequest(t, ctx, rawRemoteWriteURL, nativeHistogramWriteRequest(time.Now())) + + queryAndAssertSeries(t, ctx, querier.Endpoint("http"), func() string { return "test_histogram" }, time.Now, promclient.QueryOptions{}, []model.Metric{}) + queryAndAssertSeries(t, ctx, querier.Endpoint("http"), func() string { return "test_sample" }, time.Now, promclient.QueryOptions{}, []model.Metric{ + { + "__name__": "test_sample", + "bar": "foo", + "receive": "receive-receiver", + "tenant_id": "default-tenant", + }, + }) +} + +func writeRequest(t *testing.T, ctx context.Context, rawRemoteWriteURL string, req *prompb.WriteRequest) { + t.Helper() + + remoteWriteURL, err := url.Parse(rawRemoteWriteURL) + testutil.Ok(t, err) + + client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{ + URL: &config_util.URL{URL: remoteWriteURL}, + Timeout: model.Duration(30 * time.Second), + }) + testutil.Ok(t, err) + + var buf []byte + pBuf := proto.NewBuffer(nil) + err = pBuf.Marshal(req) + testutil.Ok(t, err) + + compressed := snappy.Encode(buf, pBuf.Bytes()) + + err = client.Store(ctx, compressed) + testutil.Ok(t, err) +} + +func nativeHistogramWriteRequest(ts time.Time) *prompb.WriteRequest { + return &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_histogram"}, + {Name: "foo", Value: "bar"}, + }, + Histograms: []prompb.Histogram{ + remote.HistogramToHistogramProto(ts.UnixMilli(), &histogram.Histogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-100, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) + }), + }, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_sample"}, + {Name: "bar", Value: "foo"}, + }, + Samples: []prompb.Sample{ + {Timestamp: ts.UnixMilli(), Value: 1.2}, + }, + }, + }, + } +} diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 872e32b67cc..04b425061a4 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -31,7 +31,6 @@ import ( "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" @@ -1701,70 +1700,3 @@ func TestConnectedQueriesWithLazyProxy(t *testing.T) { }, time.Now, promclient.QueryOptions{}, 0) } - -func TestQueryNativeHistograms(t *testing.T) { - e, err := e2e.NewDockerEnvironment("native-hist") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - receiver := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().Init() - testutil.Ok(t, e2e.StartAndWaitReady(receiver)) - - prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget), "", "quay.io/prometheus/prometheus:v2.40.5", "", "native-histograms", "remote-write-receiver") - testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) - - // Querier. Both fileSD and directly by flags. - querier := e2ethanos.NewQuerierBuilder(e, "1", sidecar1.InternalEndpoint("grpc"), receiver.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(querier)) - - remoteWriteURL, err := url.Parse("http://" + prom1.Endpoint("http") + "/api/v1/write") - testutil.Ok(t, err) - - client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{ - URL: &config_util.URL{URL: remoteWriteURL}, - Timeout: model.Duration(30 * time.Second), - }) - testutil.Ok(t, err) - - testHistogram := remote.HistogramToHistogramProto(time.Now().UnixMilli(), &histogram.Histogram{ - Count: 5, - ZeroCount: 2, - Sum: 18.4, - ZeroThreshold: 1e-100, - Schema: 1, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 2}, - {Offset: 1, Length: 2}, - }, - PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) - }) - - samplespb := []prompb.TimeSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_histogram"}, - {Name: "foo", Value: "bar"}, - }, - Histograms: []prompb.Histogram{ - testHistogram, - }, - }, - } - - sample := &prompb.WriteRequest{ - Timeseries: samplespb, - } - - var buf []byte - pBuf := proto.NewBuffer(nil) - err = pBuf.Marshal(sample) - testutil.Ok(t, err) - - compressed := snappy.Encode(buf, pBuf.Bytes()) - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - t.Cleanup(cancel) - - err = client.Store(ctx, compressed) - testutil.Ok(t, err) -}