
Commit

Deduplicate chunk dups on proxy StoreAPI level. Recommend chunk sorting for StoreAPI.

Also: merge the same series together at the proxy level instead of at select time. This allows better dedup efficiency.

Partially fixes: #2303

Cases like overlapping data from the store gateway and sidecar, and 1:1 duplicates, are optimized as early as possible.
This case was highly visible in the GitLab repro data and exists in most Thanos setups.
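
As an illustration of the exact-duplicate removal described above, here is a minimal, self-contained Go sketch. The `chunk` type and `mergeDedup` helper are hypothetical stand-ins, not the Thanos `storepb` API: chunks from two sources are interleaved by min time, and identical duplicates are kept only once.

```go
package main

import "fmt"

// chunk is a simplified stand-in for an encoded chunk and the start of its time range.
type chunk struct {
	MinTime int64
	Data    string // stand-in for the raw chunk bytes
}

// mergeDedup interleaves two chunk slices that are assumed to be sorted by
// MinTime, dropping exact duplicates (same MinTime and same data), e.g. the
// same chunk seen via both a store gateway and a sidecar.
func mergeDedup(a, b []chunk) []chunk {
	out := make([]chunk, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].MinTime < b[j].MinTime:
			out = append(out, a[i])
			i++
		case a[i].MinTime > b[j].MinTime:
			out = append(out, b[j])
			j++
		case a[i] == b[j]:
			// Exact duplicate: keep a single copy.
			out = append(out, a[i])
			i++
			j++
		default:
			// Same MinTime but different content: keep both, order does not matter.
			out = append(out, b[j])
			j++
		}
	}
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

func main() {
	store := []chunk{{MinTime: 0, Data: "c0"}, {MinTime: 10, Data: "c1"}}
	sidecar := []chunk{{MinTime: 0, Data: "c0"}, {MinTime: 10, Data: "c1"}, {MinTime: 20, Data: "c2"}}
	fmt.Println(mergeDedup(store, sidecar)) // [{0 c0} {10 c1} {20 c2}]
}
```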

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed May 13, 2020
1 parent 34859f1 commit 4f3f53d
Showing 7 changed files with 304 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,6 +31,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [2513](https://github.com/thanos-io/thanos/pull/2513) Tools: Moved `thanos bucket` commands to `thanos tools bucket`, also
moved `thanos check rules` to `thanos tools rules-check`. `thanos tools rules-check` also takes rules by `--rules` repeated flag not argument
anymore.
- [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks return exactly overlapping chunks (e.g. Store GW and sidecar, or brute-force Store Gateway HA).

## [v0.12.2](https://github.com/thanos-io/thanos/releases/tag/v0.12.2) - 2020.04.30

5 changes: 2 additions & 3 deletions pkg/store/bucket.go
@@ -981,9 +981,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()

// Merge series set into an union of all block sets. This exposes all blocks are single seriesSet.
// Chunks of returned series might be out of order w.r.t to their time range.
// This must be accounted for later by clients.
// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
// blockSeries method. In worst case deduplication logic won't deduplicate correctly, which will be accounted later.
set := storepb.MergeSeriesSets(res...)
for set.Next() {
var series storepb.Series
1 change: 1 addition & 0 deletions pkg/store/proxy.go
@@ -473,6 +473,7 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
}
return s.currSeries.Labels, s.currSeries.Chunks
}

func (s *streamSeriesSet) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
154 changes: 109 additions & 45 deletions pkg/store/storepb/custom.go
@@ -73,7 +73,19 @@ func EmptySeriesSet() SeriesSet {
return emptySeriesSet{}
}

// MergeSeriesSets returns a new series set that is the union of the input sets.
// MergeSeriesSets takes all series sets and returns them as a single, unioned series set.
// It assumes series are sorted by labels within each SeriesSet, similar to remote read guarantees.
// However, they can be partial: in such a case, if a single SeriesSet returns the same series within many iterations,
// MergeSeriesSets will merge those into one.
//
// It also assumes, in a "best effort" way, that chunks are sorted by min time. This is done as an optimization only, so if the input
// series' chunks are NOT sorted, the only consequence is that duplicates might not be removed correctly. Deduplication is double-checked
// just before the PromQL level as well, so the only consequence is increased network bandwidth.
// If all input chunks were sorted, MergeSeriesSets ALSO returns chunks sorted by min time.
//
// Chunks within the same series can also overlap (across all SeriesSets
// as well as within a single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed
// (except one), and any other overlaps will be appended into one chunks slice.
func MergeSeriesSets(all ...SeriesSet) SeriesSet {
switch len(all) {
case 0:
@@ -100,29 +112,48 @@ type SeriesSet interface {
// mergedSeriesSet takes two series sets as a single series set.
type mergedSeriesSet struct {
a, b SeriesSet
// A nil peek means not set: the iterator is exhausted or was not started.
apeek, bpeek *Series
aok, bok bool

lset []Label
chunks []AggrChunk
adone, bdone bool
// Current.
curr *Series
}

// newMergedSeriesSet takes two series sets as a single series set.
// Series that occur in both sets should have disjoint time ranges.
// If the ranges overlap b samples are appended to a samples.
// If the single SeriesSet returns same series within many iterations,
// merge series set will not try to merge those.
func (s *mergedSeriesSet) adone() bool { return s.apeek == nil }
func (s *mergedSeriesSet) bdone() bool { return s.bpeek == nil }

func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
s.adone = !s.a.Next()
s.bdone = !s.b.Next()

s.apeek, s.aok = peekSameLset(s.a, s.a.Next())
s.bpeek, s.bok = peekSameLset(s.b, s.b.Next())
return s
}

func peekSameLset(ss SeriesSet, sok bool) (peek *Series, _ bool) {
if !sok {
return nil, false
}

peek = &Series{}
peek.Labels, peek.Chunks = ss.At()
for ss.Next() {
lset, chks := ss.At()
if CompareLabels(lset, peek.Labels) != 0 {
return peek, true
}

// TODO: Slice reuse is not generally safe with nested merge iterators?
peek.Chunks = append(peek.Chunks, chks...)
}
return peek, false
}

func (s *mergedSeriesSet) At() ([]Label, []AggrChunk) {
return s.lset, s.chunks
if s.curr == nil {
return nil, nil
}
return s.curr.Labels, s.curr.Chunks
}

func (s *mergedSeriesSet) Err() error {
@@ -132,48 +163,81 @@ func (s *mergedSeriesSet) Err() error {
return s.b.Err()
}

func (s *mergedSeriesSet) compare() int {
if s.adone {
func (s *mergedSeriesSet) comparePeeks() int {
if s.adone() {
return 1
}
if s.bdone {
if s.bdone() {
return -1
}
lsetA, _ := s.a.At()
lsetB, _ := s.b.At()
return CompareLabels(lsetA, lsetB)
return CompareLabels(s.apeek.Labels, s.bpeek.Labels)
}

func (s *mergedSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil {
if (s.adone() && s.bdone()) || s.Err() != nil {
return false
}

d := s.compare()

// Both sets contain the current series. Chain them into a single one.
d := s.comparePeeks()
if d > 0 {
s.lset, s.chunks = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
s.lset, s.chunks = s.a.At()
s.adone = !s.a.Next()
} else {
// Concatenate chunks from both series sets. They may be expected of order
// w.r.t to their time range. This must be accounted for later.
lset, chksA := s.a.At()
_, chksB := s.b.At()

s.lset = lset
// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side an create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))
s.chunks = append(s.chunks, chksA...)
s.chunks = append(s.chunks, chksB...)

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
s.curr = s.bpeek
s.bpeek, s.bok = peekSameLset(s.b, s.bok)
return true
}
if d < 0 {
s.curr = s.apeek
s.apeek, s.aok = peekSameLset(s.a, s.aok)
return true
}
// Both a and b contain the same series. Go through all chunks, remove duplicates, and concatenate chunks from both
// series sets. We assume, on a best-effort basis, that chunks are sorted by min time. If not, we will not detect all duplicates,
// which will be accounted for at the select layer anyway. We still do it here as an early optimization.

s.curr = &Series{
Labels: s.apeek.Labels,
// Slice reuse is not generally safe with nested merge iterators. We are on the safe side creating a new slice.
Chunks: make([]AggrChunk, 0, len(s.apeek.Chunks)+len(s.bpeek.Chunks)),
}

b := 0
Outer:
for a := range s.apeek.Chunks {
for {
if b >= len(s.bpeek.Chunks) {
// No more b chunks.
s.curr.Chunks = append(s.curr.Chunks, s.apeek.Chunks[a:]...)
break Outer
}

if s.apeek.Chunks[a].MinTime < s.bpeek.Chunks[b].MinTime {
s.curr.Chunks = append(s.curr.Chunks, s.apeek.Chunks[a])
break
}

if s.apeek.Chunks[a].MinTime > s.bpeek.Chunks[b].MinTime {
s.curr.Chunks = append(s.curr.Chunks, s.bpeek.Chunks[b])
b++
continue
}

if strings.Compare(s.apeek.Chunks[a].String(), s.bpeek.Chunks[b].String()) == 0 {
// Exact duplicate chunks; discard the one from b.
b++
continue
}

// Same min time, but not a duplicate, so the order does not matter. Take b (and advance the inner loop).
s.curr.Chunks = append(s.curr.Chunks, s.bpeek.Chunks[b])
b++
}
}

if b < len(s.bpeek.Chunks) {
s.curr.Chunks = append(s.curr.Chunks, s.bpeek.Chunks[b:]...)
}

s.apeek, s.aok = peekSameLset(s.a, s.aok)
s.bpeek, s.bok = peekSameLset(s.b, s.bok)
return true
}
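
To complement the MergeSeriesSets documentation in this diff, here is a minimal, self-contained Go sketch of the one-series look-ahead. The `series`, `seriesSet`, and `peekSameLabels` names are hypothetical stand-ins, not the actual `storepb` types: while a single set keeps returning the same label set across iterations (e.g. from partial responses), the chunks are concatenated into one peeked series, which mirrors what `peekSameLset` does.

```go
package main

import "fmt"

// series and seriesSet are toy stand-ins for storepb.Series and SeriesSet.
type series struct {
	labels string // stand-in for a sorted label set
	chunks []int  // stand-in for AggrChunk min times
}

type seriesSet struct {
	all []series
	i   int
}

func (s *seriesSet) Next() bool { s.i++; return s.i <= len(s.all) }
func (s *seriesSet) At() series { return s.all[s.i-1] }

// peekSameLabels reads the series at the current position and keeps consuming
// the iterator while the label set repeats, merging chunks into one peeked
// series. It returns the peeked series and whether the iterator has more data.
func peekSameLabels(ss *seriesSet, ok bool) (*series, bool) {
	if !ok {
		return nil, false
	}
	cur := ss.At()
	// Copy the chunk slice so we never mutate the iterator's backing array.
	peek := series{labels: cur.labels, chunks: append([]int(nil), cur.chunks...)}
	for ss.Next() {
		next := ss.At()
		if next.labels != peek.labels {
			// A different series: it stays buffered at the iterator's position.
			return &peek, true
		}
		peek.chunks = append(peek.chunks, next.chunks...)
	}
	return &peek, false
}

func main() {
	ss := &seriesSet{all: []series{
		{"a=1", []int{0, 10}},
		{"a=1", []int{20}}, // same label set repeated, e.g. by a partial response
		{"a=2", []int{0}},
	}}
	ok := ss.Next()
	for p, more := peekSameLabels(ss, ok); p != nil; p, more = peekSameLabels(ss, more) {
		fmt.Println(p.labels, p.chunks)
	}
	// Prints:
	// a=1 [0 10 20]
	// a=2 [0]
}
```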

