Commit 1967cd0

Merge pull request #6137 from infracloudio/repair-xor-chunks

Repair non-empty XOR chunks during 1h downsampling

yeya24 committed Feb 22, 2023
2 parents b0da089 + 6fc86a5 commit 1967cd0
Showing 3 changed files with 80 additions and 8 deletions.
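
What the change does: before this commit, Downsample returned an error whenever a block that was already downsampled to 5m resolution still contained a non-empty raw XOR chunk (empty ones were already skipped, see issue 5272 referenced in the diff). With this commit such a chunk is expanded back into raw samples and re-aggregated into a 5m aggregated chunk, so the 1h downsampling pass can proceed. As a minimal, runnable sketch of the expand step (not code from this commit, assuming a recent github.com/prometheus/prometheus/tsdb/chunkenc; timestamps and values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	// Build a small XOR chunk, standing in for a stray raw chunk found
	// inside an already-downsampled block.
	raw := chunkenc.NewXORChunk()
	app, err := raw.Appender()
	if err != nil {
		panic(err)
	}
	app.Append(1587690005794, 42.5)
	app.Append(1587690065794, 43.0)

	// Expand the chunk back into (t, v) samples. This mirrors the
	// expandChunkIterator step in the patch; in Thanos the collected
	// samples are then re-aggregated via DownsampleRaw(all, ResLevel1).
	it := raw.Iterator(nil)
	for it.Next() != chunkenc.ValNone {
		t, v := it.At()
		fmt.Printf("t=%d v=%g\n", t, v)
	}
	if err := it.Err(); err != nil {
		panic(err)
	}
}
```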
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#6098](https://github.com/thanos-io/thanos/pull/6098) Cache/Redis: upgrade `rueidis` to v0.0.93 to fix potential panic when the client-side caching is disabled.
 - [#6103](https://github.com/thanos-io/thanos/pull/6103) Mixins(Rule): Fix query for long rule evaluations.
 - [#6121](https://github.com/thanos-io/thanos/pull/6121) Receive: Deduplicate metamonitoring queries.
+- [#6137](https://github.com/thanos-io/thanos/pull/6137) Downsample: Repair of non-empty XOR chunks during 1h downsampling.
 
 ### Changed
 
14 changes: 13 additions & 1 deletion pkg/compact/downsample/downsample.go
@@ -167,10 +167,22 @@ func Downsample(
 						// https://github.com/thanos-io/thanos/issues/5272
 						level.Warn(logger).Log("msg", fmt.Sprintf("expected downsampled chunk (*downsample.AggrChunk) got an empty %T instead for series: %d", c.Chunk, postings.At()))
 						continue
+					} else {
+						if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil {
+							return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
+						}
+						aggrDataChunks := DownsampleRaw(all, ResLevel1)
+						for _, cn := range aggrDataChunks {
+							ac, ok = cn.Chunk.(*AggrChunk)
+							if !ok {
+								return id, errors.New("Not able to convert non-empty XOR chunks to 5m downsampled aggregated chunks.")
+							}
+						}
 					}
-					return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got a non-empty %T instead for series: %d", c.Chunk, postings.At())
+
 				}
 				aggrChunks = append(aggrChunks, ac)
+
 			}
 			downsampledChunks, err := downsampleAggr(
 				aggrChunks,
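Restated as a standalone helper for readability, the new branch does the following. This is a sketch only, written as if it lived inside pkg/compact/downsample, because the sample type and the expandChunkIterator helper are package-private; repairRawChunk is a hypothetical name, not part of this commit, and errors is github.com/pkg/errors as elsewhere in the file:

```go
// repairRawChunk (hypothetical) mirrors the new else branch above: instead
// of failing on a non-empty raw XOR chunk found in an already-downsampled
// block, expand it to raw samples and re-aggregate them at 5m resolution.
func repairRawChunk(c chunkenc.Chunk, reuseIt chunkenc.Iterator) (*AggrChunk, error) {
	var all []sample
	if err := expandChunkIterator(c.Iterator(reuseIt), &all); err != nil {
		return nil, errors.Wrap(err, "expand chunk")
	}
	var ac *AggrChunk
	for _, cn := range DownsampleRaw(all, ResLevel1) {
		a, ok := cn.Chunk.(*AggrChunk)
		if !ok {
			return nil, errors.New("not an aggregated chunk")
		}
		// As in the patch, ac is overwritten on each iteration, so only the
		// last aggregated chunk produced by DownsampleRaw is kept.
		ac = a
	}
	return ac, nil
}
```

Note the detail called out in the comment: the patch appends a single ac after the loop, so if DownsampleRaw ever returned more than one aggregated chunk, only the last would survive; in the typical case a raw chunk holds few enough samples that a single 5m aggregated chunk is produced.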
73 changes: 66 additions & 7 deletions pkg/compact/downsample/downsample_test.go
@@ -538,21 +538,23 @@ func TestDownsampleAggrAndEmptyXORChunks(t *testing.T) {
 }
 
 func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
 
 	logger := log.NewLogfmtLogger(os.Stderr)
 	dir := t.TempDir()
 
 	ser := &series{lset: labels.FromStrings("__name__", "a")}
 	aggr := map[AggrType][]sample{
-		AggrCount:   {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}, {t: 1587691199999, v: 20}, {t: 1587691499999, v: 20}, {t: 1587691799999, v: 20}, {t: 1587692099999, v: 20}, {t: 1587692399999, v: 20}, {t: 1587692699999, v: 16}, {t: 1587692999999, v: 20}, {t: 1587693299999, v: 20}, {t: 1587693590791, v: 20}},
-		AggrSum:     {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587690899999, v: 9.447457e+06}, {t: 1587691199999, v: 9.542732e+06}, {t: 1587691499999, v: 9.630379e+06}, {t: 1587691799999, v: 9.715631e+06}, {t: 1587692099999, v: 9.799808e+06}, {t: 1587692399999, v: 9.888117e+06}, {t: 1587692699999, v: 2.98928e+06}, {t: 1587692999999, v: 81592}, {t: 1587693299999, v: 163711}, {t: 1587693590791, v: 255746}},
-		AggrMin:     {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}, {t: 1587691499999, v: 479625}, {t: 1587691799999, v: 483709}, {t: 1587692099999, v: 488036}, {t: 1587692399999, v: 492223}, {t: 1587692699999, v: 75}, {t: 1587692999999, v: 2261}, {t: 1587693299999, v: 6210}, {t: 1587693590791, v: 10464}},
-		AggrMax:     {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 496544}, {t: 1587692999999, v: 6010}, {t: 1587693299999, v: 10242}, {t: 1587693590791, v: 14956}},
-		AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 498647}, {t: 1587692999999, v: 502554}, {t: 1587693299999, v: 506786}, {t: 1587693590791, v: 511500}, {t: 1587693590791, v: 14956}},
+		AggrCount:   {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}},
+		AggrSum:     {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587693590791, v: 255746}},
+		AggrMin:     {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}},
+		AggrMax:     {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}},
+		AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}},
 	}
 	raw := chunkenc.NewXORChunk()
 	app, err := raw.Appender()
 	testutil.Ok(t, err)
 
 	app.Append(1587690005794, 42.5)
 
 	ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{
 		MinTime: math.MaxInt64,
 		MaxTime: math.MinInt64,
@@ -566,7 +568,64 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
 	fakeMeta.Thanos.Downsample.Resolution = 300_000
 	id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000)
 	_ = id
-	testutil.NotOk(t, err)
+	testutil.Ok(t, err)
+
+	expected := []map[AggrType][]sample{
+		{
+			AggrCount:   {{1587690005794, 20}, {1587690005794, 20}, {1587690005794, 21}},
+			AggrSum:     {{1587690005794, 9.276972e+06}, {1587690005794, 9.359861e+06}, {1587690005794, 255788.5}},
+			AggrMin:     {{1587690005794, 461968}, {1587690005794, 466070}, {1587690005794, 470131}, {1587690005794, 42.5}},
+			AggrMax:     {{1587690005794, 465870}, {1587690005794, 469951}, {1587690005794, 474726}},
+			AggrCounter: {{1587690005791, 461968}, {1587690599999, 469951}, {1587690599999, 469951}},
+		},
+	}
+
+	_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
+	testutil.Ok(t, err)
+
+	indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, indexr.Close()) }()
+
+	chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), block.ChunksDirname), NewPool())
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, chunkr.Close()) }()
+
+	pall, err := indexr.Postings(index.AllPostingsKey())
+	testutil.Ok(t, err)
+
+	var series []storage.SeriesRef
+	for pall.Next() {
+		series = append(series, pall.At())
+	}
+	testutil.Ok(t, pall.Err())
+	testutil.Equals(t, 1, len(series))
+
+	var builder labels.ScratchBuilder
+	var chks []chunks.Meta
+	testutil.Ok(t, indexr.Series(series[0], &builder, &chks))
+
+	var got []map[AggrType][]sample
+	for _, c := range chks {
+		chk, err := chunkr.Chunk(c)
+		testutil.Ok(t, err)
+
+		m := map[AggrType][]sample{}
+		for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} {
+			c, err := chk.(*AggrChunk).Get(at)
+			if err == ErrAggrNotExist {
+				continue
+			}
+			testutil.Ok(t, err)
+
+			buf := m[at]
+			testutil.Ok(t, expandChunkIterator(c.Iterator(nil), &buf))
+			m[at] = buf
+		}
+		got = append(got, m)
+	}
+	testutil.Equals(t, expected, got)
+
 }
 
 func chunksToSeriesIteratable(t *testing.T, inRaw [][]sample, inAggr []map[AggrType][]sample) *series {
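For completeness, reading the per-aggregate samples back out of an AggrChunk follows the same pattern as the verification loop in the test above. A sketch against the exported API (printAggregates is a hypothetical helper, not code from this commit), assuming a recent Thanos and Prometheus; unlike the test, it iterates the sub-chunks directly because expandChunkIterator is package-private:

```go
package example

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/thanos-io/thanos/pkg/compact/downsample"
)

// printAggregates (hypothetical) mirrors the test's verification loop: it
// pulls each per-aggregate sub-chunk out of an AggrChunk and walks its samples.
func printAggregates(ac downsample.AggrChunk) error {
	for _, at := range []downsample.AggrType{
		downsample.AggrCount, downsample.AggrSum, downsample.AggrMin,
		downsample.AggrMax, downsample.AggrCounter,
	} {
		c, err := ac.Get(at)
		if err == downsample.ErrAggrNotExist {
			continue // this aggregate was not stored for the chunk
		}
		if err != nil {
			return err
		}
		it := c.Iterator(nil)
		for it.Next() != chunkenc.ValNone {
			t, v := it.At()
			fmt.Printf("%v t=%d v=%g\n", at, t, v)
		}
		if err := it.Err(); err != nil {
			return err
		}
	}
	return nil
}
```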
