Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier: Adjust deduplication for counters when querying for PromQL rates. #2548

Merged
merged 4 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Fixed

- [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS
- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed
- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: Gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: Does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed
- [#2615](https://github.com/thanos-io/thanos/pull/2615) Rule: Fix bugs where rules were out of sync.
- [#2548](https://github.com/thanos-io/thanos/pull/2548) Query: Fixed rare cases of double counter reset accounting when querying `rate` with deduplication enabled.

### Added

Expand All @@ -33,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [2513](https://github.com/thanos-io/thanos/pull/2513) Tools: Moved `thanos bucket` commands to `thanos tools bucket`, also
moved `thanos check rules` to `thanos tools rules-check`. `thanos tools rules-check` also takes rules by `--rules` repeated flag not argument
anymore.
- [#2548](https://github.com/thanos-io/thanos/pull/2548/commits/53e69bd89b2b08c18df298eed7d90cb7179cc0ec) Store, Querier: remove duplicated chunks on StoreAPI.

## [v0.12.2](https://github.com/thanos-io/thanos/releases/tag/v0.12.2) - 2020.04.30

Expand Down
27 changes: 14 additions & 13 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,12 @@ func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.

ab := newAggrChunkBuilder()

// Encode first raw value; see CounterSeriesIterator.
// Encode first raw value; see ApplyCounterResetsSeriesIterator.
ab.apps[AggrCounter].Append(batch[0].t, batch[0].v)

lastT := downsampleBatch(batch, resolution, ab.add)

// Encode last raw value; see CounterSeriesIterator.
// Encode last raw value; see ApplyCounterResetsSeriesIterator.
ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v)

chks = append(chks, ab.encode())
Expand Down Expand Up @@ -525,7 +525,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
acs = append(acs, c.Iterator(reuseIt))
}
*buf = (*buf)[:0]
it := NewCounterSeriesIterator(acs...)
it := NewApplyCounterResetsIterator(acs...)

if err := expandChunkIterator(it, buf); err != nil {
return chk, err
Expand All @@ -538,7 +538,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()

// Retain first raw value; see CounterSeriesIterator.
// Retain first raw value; see ApplyCounterResetsSeriesIterator.
ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v)

lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
Expand All @@ -550,7 +550,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.apps[AggrCounter].Append(t, a.counter)
})

// Retain last raw value; see CounterSeriesIterator.
// Retain last raw value; see ApplyCounterResetsSeriesIterator.
ab.apps[AggrCounter].Append(lastT, it.lastV)

ab.mint = mint
Expand All @@ -563,7 +563,7 @@ type sample struct {
v float64
}

// CounterSeriesIterator generates monotonically increasing values by iterating
// ApplyCounterResetsSeriesIterator generates monotonically increasing values by iterating
// over an ordered sequence of chunks, which should be raw or aggregated chunks
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values. Stale Markers
Expand All @@ -580,7 +580,7 @@ type sample struct {
// It handles overlapped chunks (removes overlaps).
// NOTE: It is important to deduplicate with care ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
type CounterSeriesIterator struct {
type ApplyCounterResetsSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
total int // Total number of processed samples.
Expand All @@ -589,11 +589,11 @@ type CounterSeriesIterator struct {
totalV float64 // Total counter state since beginning of series.
}

func NewCounterSeriesIterator(chks ...chunkenc.Iterator) *CounterSeriesIterator {
return &CounterSeriesIterator{chks: chks}
func NewApplyCounterResetsIterator(chks ...chunkenc.Iterator) *ApplyCounterResetsSeriesIterator {
return &ApplyCounterResetsSeriesIterator{chks: chks}
}

func (it *CounterSeriesIterator) Next() bool {
func (it *ApplyCounterResetsSeriesIterator) Next() bool {
for {
if it.i >= len(it.chks) {
return false
Expand Down Expand Up @@ -637,11 +637,12 @@ func (it *CounterSeriesIterator) Next() bool {
}
}

func (it *CounterSeriesIterator) At() (t int64, v float64) {
func (it *ApplyCounterResetsSeriesIterator) At() (t int64, v float64) {
return it.lastT, it.totalV
}

func (it *CounterSeriesIterator) Seek(x int64) bool {
func (it *ApplyCounterResetsSeriesIterator) Seek(x int64) bool {
// Don't use underlying Seek, but iterate over next to not miss counter resets.
for {
if t, _ := it.At(); t >= x {
return true
Expand All @@ -654,7 +655,7 @@ func (it *CounterSeriesIterator) Seek(x int64) bool {
}
}

func (it *CounterSeriesIterator) Err() error {
func (it *ApplyCounterResetsSeriesIterator) Err() error {
if it.i >= len(it.chks) {
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestDownsampleCounterBoundaryReset(t *testing.T) {
iters = append(iters, chk.Iterator(nil))
}

citer := NewCounterSeriesIterator(iters...)
citer := NewApplyCounterResetsIterator(iters...)
for citer.Next() {
t, v := citer.At()
res = append(res, sample{t: t, v: v})
Expand Down Expand Up @@ -592,7 +592,7 @@ var (
}
)

func TestCounterAggegationIterator(t *testing.T) {
func TestApplyCounterResetsIterator(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

for _, tcase := range []struct {
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestCounterAggegationIterator(t *testing.T) {
its = append(its, newSampleIterator(c))
}

x := NewCounterSeriesIterator(its...)
x := NewApplyCounterResetsIterator(its...)

var res []sample
for x.Next() {
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestCounterSeriesIteratorSeek(t *testing.T) {
}

var res []sample
x := NewCounterSeriesIterator(its...)
x := NewApplyCounterResetsIterator(its...)

ok := x.Seek(150)
testutil.Assert(t, ok, "Seek should return true")
Expand All @@ -718,7 +718,7 @@ func TestCounterSeriesIteratorSeekExtendTs(t *testing.T) {
its = append(its, newSampleIterator(c))
}

x := NewCounterSeriesIterator(its...)
x := NewApplyCounterResetsIterator(its...)

ok := x.Seek(500)
testutil.Assert(t, !ok, "Seek should return false")
Expand All @@ -738,7 +738,7 @@ func TestCounterSeriesIteratorSeekAfterNext(t *testing.T) {
}

var res []sample
x := NewCounterSeriesIterator(its...)
x := NewApplyCounterResetsIterator(its...)

x.Next()

Expand Down
Loading