store: Fixed critical bug where querying certain non-existing values was causing an "invalid size" error.

The reason we could not reproduce it locally was that for most non-existing values we were lucky: the buffer was still long enough, so we could read and decode some (malformed) varint-encoded value. For certain rare cases the buffer was not long enough.

Spotted and fixed thanks to the amazing @mkabischev!

* Added more regression tests for binary header.

Without the fix it fails with:
```
            header_test.go:154: header_test.go:154:

                	exp: range not found

                	got: get postings offset entry: invalid size
```
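
For context (not part of this change): the "invalid size" message comes from the Prometheus tsdb `encoding.Decbuf` that the binary reader decodes with. When a length-prefixed field claims more bytes than remain in the buffer, the decoder records its invalid-size error. Below is a minimal, self-contained sketch of that failure mode, assuming the `github.com/prometheus/prometheus/tsdb/encoding` package used by `binary_reader.go`:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/encoding"
)

func main() {
	// The leading 0x05 declares a 5-byte value, but only 2 bytes follow,
	// mimicking a read that starts too close to the end of the postings
	// offset table.
	d := encoding.Decbuf{B: []byte{0x05, 'a', 'b'}}

	d.UvarintBytes()     // Tries to take 5 bytes, finds only 2.
	fmt.Println(d.Err()) // "invalid size"
}
```

Wrapped by the reader (`errors.Wrap(d.Err(), "get postings offset entry")`), this surfaces as the `get postings offset entry: invalid size` error shown above.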

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Apr 8, 2020
1 parent c0b9179 commit c5f4081
Showing 3 changed files with 118 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2319](https://github.com/thanos-io/thanos/pull/2319) Query: fixed inconsistent naming of metrics.
- [#2390](https://github.com/thanos-io/thanos/pull/2390) Store: Fixed bug which was causing all posting offsets to be used instead of 1/32 as it was meant.
Added hidden flag to control this behavior.
- [#2393](https://github.com/thanos-io/thanos/pull/2393) Store: Fixed bug causing queries for certain non-existing label values to fail with an "invalid size" error from the binary header.

### Added

111 changes: 77 additions & 34 deletions pkg/block/indexheader/binary_reader.go
@@ -653,6 +653,18 @@ func (r BinaryReader) PostingsOffset(name string, value string) (index.Range, er
return rngs[0], nil
}

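// skipNAndName skips over the key count and label name at the start of a
// postings offset table entry. They are always the same number of bytes, so
// the size is measured once, cached in *buf, and skipped directly afterwards.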
func skipNAndName(d *encoding.Decbuf, buf *int) {
if *buf == 0 {
// Keycount+LabelName are always the same number of bytes,
// and it's faster to skip than parse.
*buf = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
*buf -= d.Len()
return
}
d.Skip(*buf)
}

func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
rngs := make([]index.Range, 0, len(values))
if r.indexVersion == index.FormatV1 {
@@ -679,76 +691,107 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
return nil, nil
}

skip := 0
buf := 0
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
valueIndex++
}

var tmpRngs []index.Range // The start, end offsets in the postings table in the original index file.
var newSameRngs []index.Range // The start, end offsets in the postings table in the original index file.
for valueIndex < len(values) {
value := values[valueIndex]
wantedValue := values[valueIndex]

i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= value })
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
if i == len(e.offsets) {
// We're past the end.
break
}
if i > 0 && e.offsets[i].value != value {
if i > 0 && e.offsets[i].value != wantedValue {
// Need to look from previous entry.
i--
}

// Don't Crc32 the entire postings offset table, this is very slow
// so hope any issues were caught at startup.
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e.offsets[i].tableOff)

tmpRngs = tmpRngs[:0]
// Iterate on the offset table.
newSameRngs = newSameRngs[:0]
for d.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
// Posting format entry is as follows:
// │ ┌────────────────────────────────────────┐ │
// │ │ n = 2 <1b> │ │
// │ ├──────────────────────┬─────────────────┤ │
// │ │ len(name) <uvarint> │ name <bytes> │ │
// │ ├──────────────────────┼─────────────────┤ │
// │ │ len(value) <uvarint> │ value <bytes> │ │
// │ ├──────────────────────┴─────────────────┤ │
// │ │ offset <uvarint64> │ │
// │ └────────────────────────────────────────┘ │
// First, let's skip n and name.
skipNAndName(&d, &buf)
value := d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64())

if len(newSameRngs) > 0 {
// We added some ranges in previous iteration. Use next posting offset as end of all our new ranges.
for j := range newSameRngs {
newSameRngs[j].End = postingOffset - crc32.Size
}
rngs = append(rngs, newSameRngs...)
newSameRngs = newSameRngs[:0]
}
v := d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64()) // Offset.
for string(v) >= value {
if string(v) == value {
tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + postingLengthFieldSize})

for string(value) >= wantedValue {
// As long as the current value is equal to or greater than wantedValue, loop over all given wanted
// values until this is no longer true or there are no more values wanted.
// This ensures we cover the case when someone asks for postingsOffset(name, value1, value1, value1).

// Record on the way if wanted value is equal to the current value.
if string(value) == wantedValue {
newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
}
valueIndex++
if valueIndex == len(values) {
break
}
value = values[valueIndex]
wantedValue = values[valueIndex]
}

if i+1 == len(e.offsets) {
for i := range tmpRngs {
tmpRngs[i].End = e.lastValOffset
// No more offsets for this name.
// Break this loop and record lastOffset on the way for ranges we just added if any.
for j := range newSameRngs {
newSameRngs[j].End = e.lastValOffset
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
rngs = append(rngs, newSameRngs...)
break
}

if value >= e.offsets[i+1].value || valueIndex == len(values) {
d.Skip(skip)
d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64()) // Offset.
for j := range tmpRngs {
tmpRngs[j].End = postingOffset - crc32.Size
if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
// wantedValue is smaller than or equal to the next offset we know about, so iterate further to add those.
continue
}

// Nothing more is wanted, or wantedValue is larger than the next offset we know about.
// Exit and do the binary search again, or exit entirely if nothing more is wanted.

if len(newSameRngs) > 0 {
// We added some ranges in this iteration. Use next posting offset as the end of our ranges.
// We know it exists as we never go further in this loop than e.offsets[i, i+1].

skipNAndName(&d, &buf)
d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64())

for j := range newSameRngs {
newSameRngs[j].End = postingOffset - crc32.Size
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
break
rngs = append(rngs, newSameRngs...)
}
break
}
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "get postings offset entry")
57 changes: 40 additions & 17 deletions pkg/block/indexheader/header_test.go
@@ -55,6 +55,7 @@ func TestReaders(t *testing.T) {
{{Name: "a", Value: "12"}},
{{Name: "a", Value: "13"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
testutil.Ok(t, err)

@@ -110,12 +111,14 @@ func TestReaders(t *testing.T) {
testutil.Equals(t, 1, br.version)
testutil.Equals(t, 2, br.indexVersion)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 69}, br.toc)
testutil.Equals(t, int64(666), br.indexLastPostingEnd)
testutil.Equals(t, int64(710), br.indexLastPostingEnd)
testutil.Equals(t, 8, br.symbols.Size())
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))
testutil.Equals(t, map[string]*postingValueOffsets{
"": {
offsets: []postingOffset{{value: "", tableOff: 4}},
lastValOffset: 416,
lastValOffset: 440,
},
"a": {
offsets: []postingOffset{
@@ -125,15 +128,44 @@ func TestReaders(t *testing.T) {
{value: "7", tableOff: 75},
{value: "9", tableOff: 89},
},
lastValOffset: 612,
lastValOffset: 640,
},
"longer-string": {
offsets: []postingOffset{{value: "1", tableOff: 96}},
lastValOffset: 662,
offsets: []postingOffset{
{value: "1", tableOff: 96},
{value: "2", tableOff: 115},
},
lastValOffset: 706,
},
}, br.postings)
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))

vals, err := br.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Most non-existing values worked despite the bug, except in certain unlucky cases
// where it caused "invalid size" errors.
_, err = br.PostingsOffset("not-existing", "1")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "0")
testutil.Equals(t, NotFoundRangeErr, err)
// Unlucky case: the bug caused an unnecessary read & decode requiring more bytes than
// were available. For the remaining cases the read was wrong but effectively a no-op, so at least it did not fail.
_, err = br.PostingsOffset("a", "10")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "121")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "131")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "91")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("longer-string", "0")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("longer-string", "11")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("longer-string", "21")
testutil.Equals(t, NotFoundRangeErr, err)
}

compareIndexToHeader(t, b, br)
Expand All @@ -151,7 +183,7 @@ func TestReaders(t *testing.T) {
if id == id1 {
testutil.Equals(t, 14, len(jr.symbols))
testutil.Equals(t, 2, len(jr.lvals))
testutil.Equals(t, 14, len(jr.postings))
testutil.Equals(t, 15, len(jr.postings))
}

compareIndexToHeader(t, b, jr)
@@ -251,15 +283,6 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe
testutil.Ok(t, err)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)

// Check not existing.
vals, err := indexReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)
_, err = headerReader.PostingsOffset("not-existing", "1")
testutil.NotOk(t, err)
_, err = headerReader.PostingsOffset("a", "10")
testutil.NotOk(t, err)
}

func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta {
