Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support float histogram in store gateway #6925

Merged
merged 5 commits into from Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Expand Up @@ -1251,7 +1251,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
hasher := hashPool.Get().(hash.Hash64)
defer hashPool.Put(hasher)

if in.Encoding() == chunkenc.EncXOR || in.Encoding() == chunkenc.EncHistogram {
if in.Encoding() == chunkenc.EncXOR || in.Encoding() == chunkenc.EncHistogram || in.Encoding() == chunkenc.EncFloatHistogram {
b, err := save(in.Bytes())
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/bucket_test.go
Expand Up @@ -1301,6 +1301,13 @@ func TestBucketHistogramSeries(t *testing.T) {
})
}

// TestBucketFloatHistogramSeries runs the bucket-store Series benchmark cases
// over blocks whose samples are float native histograms.
func TestBucketFloatHistogramSeries(t *testing.T) {
	tb := testutil.NewTB(t)
	run := func(t testutil.TB, samplesPerSeries, series int) {
		benchBucketSeries(t, chunkenc.ValFloatHistogram, false, false, samplesPerSeries, series, 1)
	}
	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, run)
}

func TestBucketSkipChunksSeries(t *testing.T) {
tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/store/storepb/testutil/series.go
Expand Up @@ -120,6 +120,8 @@
appendFloatSamples(t, app, tsLabel, opts)
case chunkenc.ValHistogram:
appendHistogramSamples(t, app, tsLabel, opts)
case chunkenc.ValFloatHistogram:
appendFloatHistogramSamples(t, app, tsLabel, opts)
}
}
testutil.Ok(t, app.Commit())
Expand Down Expand Up @@ -222,6 +224,33 @@
}
}

func appendFloatHistogramSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) {
sample := &histogram.FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 15,
Sum: 11.5,
PositiveSpans: []histogram.Span{{-2, 2}, {1, 3}},

Check failure on line 233 in pkg/store/storepb/testutil/series.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

composites: github.com/prometheus/prometheus/model/histogram.Span struct literal uses unkeyed fields (govet)
PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5},
NegativeSpans: []histogram.Span{{3, 2}, {3, 2}},

Check failure on line 235 in pkg/store/storepb/testutil/series.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

composites: github.com/prometheus/prometheus/model/histogram.Span struct literal uses unkeyed fields (govet)
NegativeBuckets: []float64{1.5, 0.5, 2.5, 3},
}

ref, err := app.AppendHistogram(
0,
labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix), "j", fmt.Sprintf("%v", tsLabel)),
int64(tsLabel)*opts.ScrapeInterval.Milliseconds(),
nil,
sample,
)
testutil.Ok(t, err)

for is := 1; is < opts.SamplesPerSeries; is++ {
_, err := app.AppendHistogram(ref, labels.EmptyLabels(), int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), nil, sample)
testutil.Ok(t, err)
}
}

// SeriesServer is test gRPC storeAPI series server.
type SeriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
Expand Down
55 changes: 43 additions & 12 deletions pkg/testutil/e2eutil/prometheus.go
Expand Up @@ -57,18 +57,31 @@
PromAddrPlaceHolder = "PROMETHEUS_ADDRESS"
)

var histogramSample = histogram.Histogram{
Schema: 0,
Count: 20,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1},
}
var (
histogramSample = histogram.Histogram{
Schema: 0,
Count: 20,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1},
}

floatHistogramSample = histogram.FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 15,
Sum: 11.5,
PositiveSpans: []histogram.Span{{-2, 2}, {1, 3}},

Check failure on line 79 in pkg/testutil/e2eutil/prometheus.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

composites: github.com/prometheus/prometheus/model/histogram.Span struct literal uses unkeyed fields (govet)
PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5},
NegativeSpans: []histogram.Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{1.5, 0.5, 2.5, 3},
}
)

func PrometheusBinary() string {
return "prometheus-" + defaultPrometheusVersion
Expand Down Expand Up @@ -463,6 +476,22 @@
return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValHistogram)
}

// CreateFloatHistogramBlockWithDelay writes a block with the given float
// native histogram series and numSamples samples each, delegating to
// createBlockWithDelay with the chunkenc.ValFloatHistogram sample type.
// Samples will be in the time range [mint, maxt).
func CreateFloatHistogramBlockWithDelay(
	ctx context.Context,
	dir string,
	series []labels.Labels,
	numSamples int,
	mint, maxt int64,
	blockDelay time.Duration,
	extLset labels.Labels,
	resolution int64,
	hashFunc metadata.HashFunc,
) (id ulid.ULID, err error) {
	id, err = createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValFloatHistogram)
	return id, err
}

func createBlockWithDelay(ctx context.Context, dir string, series []labels.Labels, numSamples int, mint int64, maxt int64, blockDelay time.Duration, extLset labels.Labels, resolution int64, hashFunc metadata.HashFunc, samplesType chunkenc.ValueType) (ulid.ULID, error) {
blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc, samplesType)
if err != nil {
Expand Down Expand Up @@ -545,6 +574,8 @@
randMutex.Unlock()
} else if sampleType == chunkenc.ValHistogram {
_, err = app.AppendHistogram(0, lset, t, &histogramSample, nil)
} else if sampleType == chunkenc.ValFloatHistogram {
_, err = app.AppendHistogram(0, lset, t, nil, &floatHistogramSample)
}
if err != nil {
if rerr := app.Rollback(); rerr != nil {
Expand Down
64 changes: 52 additions & 12 deletions test/e2e/store_gateway_test.go
Expand Up @@ -96,11 +96,13 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))

floatSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "2")}
nativeHistogramSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "3")}
floatHistogramSeries := []labels.Labels{labels.FromStrings("a", "2", "b", "3")}
extLset := labels.FromStrings("ext1", "value1", "replica", "1")
extLset2 := labels.FromStrings("ext1", "value1", "replica", "2")
extLset3 := labels.FromStrings("ext1", "value2", "replica", "3")
extLset4 := labels.FromStrings("ext1", "value1", "replica", "3")
extLset5 := labels.FromStrings("ext1", "value3", "replica", "1")
extLset6 := labels.FromStrings("ext1", "value2", "replica", "1")

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)
Expand All @@ -116,6 +118,8 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, err)
id5, err := e2eutil.CreateHistogramBlockWithDelay(ctx, dir, nativeHistogramSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset5, 0, metadata.NoneFunc)
testutil.Ok(t, err)
id6, err := e2eutil.CreateFloatHistogramBlockWithDelay(ctx, dir, floatHistogramSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset6, 0, metadata.NoneFunc)
testutil.Ok(t, err)
l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l,
e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
Expand All @@ -126,13 +130,14 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id4.String()), id4.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id5.String()), id5.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id6.String()), id6.String()))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 2x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

Expand Down Expand Up @@ -161,13 +166,19 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "2",
"b": "3",
"ext1": "value2",
"replica": "1",
},
},
)

// 2 x postings, 3 x series, 2 x chunks.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_bucket_store_series_blocks_queried"))

tenant1Opts := []e2emon.MetricsOption{
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")),
Expand Down Expand Up @@ -195,16 +206,21 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"b": "3",
"ext1": "value3",
},
{
"a": "2",
"b": "3",
"ext1": "value2",
},
},
)

tenant2Opts := []e2emon.MetricsOption{
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-2")),
e2emon.WaitMissingMetrics(),
}
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(18), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3+3), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(24), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(16), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+4), "thanos_bucket_store_series_blocks_queried"))

// Test some tenant-specific store metrics.
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, tenant2Opts...))
Expand All @@ -222,10 +238,10 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

Expand All @@ -247,6 +263,12 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "2",
"b": "3",
"ext1": "value2",
"replica": "1",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+4), "thanos_bucket_store_series_blocks_queried"))
Expand Down Expand Up @@ -288,6 +310,12 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "2",
"b": "3",
"ext1": "value2",
"replica": "1",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(11+2), "thanos_bucket_store_series_blocks_queried"))
Expand All @@ -297,10 +325,10 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1+1), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

Expand All @@ -321,6 +349,12 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "2",
"b": "3",
"ext1": "value2",
"replica": "1",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(14+1), "thanos_bucket_store_series_blocks_queried"))
Expand All @@ -344,6 +378,12 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "2",
"b": "3",
"ext1": "value2",
"replica": "1",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(15+2), "thanos_bucket_store_series_blocks_queried"))
Expand Down