Skip to content

Commit

Permalink
store: Fixed critical bug, when certain not-existing value queried wa…
Browse files Browse the repository at this point in the history
…s causing "invalid size" error.

Reason why we could not reproduce it locally was that for most of non-existing value
we were lucky that buffer was still long enough and we could read and decode some (malformed) variadic type.
For certain rare cases, buffer was not long enough.

Fixed and spotted thanks to amazing @mkabischev!

* Added more regression tests for binary header.

Without the fix it fails with:
```
            header_test.go:154: header_test.go:154:

                	exp: range not found

                	got: get postings offset entry: invalid size
```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 8, 2020
1 parent 7b9d72d commit 00f0184
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 45 deletions.
111 changes: 77 additions & 34 deletions pkg/block/indexheader/binary_reader.go
Expand Up @@ -647,6 +647,18 @@ func (r BinaryReader) PostingsOffset(name string, value string) (index.Range, er
return rngs[0], nil
}

func skipNAndName(d *encoding.Decbuf, buf *int) {
if *buf == 0 {
// Keycount+LabelName are always the same number of bytes,
// and it's faster to skip than parse.
*buf = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
*buf -= d.Len()
return
}
d.Skip(*buf)
}
func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
rngs := make([]index.Range, 0, len(values))
if r.indexVersion == index.FormatV1 {
Expand All @@ -673,76 +685,107 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
return nil, nil
}

skip := 0
buf := 0
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
valueIndex++
}

var tmpRngs []index.Range // The start, end offsets in the postings table in the original index file.
var newSameRngs []index.Range // The start, end offsets in the postings table in the original index file.
for valueIndex < len(values) {
value := values[valueIndex]
wantedValue := values[valueIndex]

i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= value })
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
if i == len(e.offsets) {
// We're past the end.
break
}
if i > 0 && e.offsets[i].value != value {
if i > 0 && e.offsets[i].value != wantedValue {
// Need to look from previous entry.
i--
}

// Don't Crc32 the entire postings offset table, this is very slow
// so hope any issues were caught at startup.
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e.offsets[i].tableOff)

tmpRngs = tmpRngs[:0]
// Iterate on the offset table.
newSameRngs = newSameRngs[:0]
for d.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
// Posting format entry is as follows:
// │ ┌────────────────────────────────────────┐ │
// │ │ n = 2 <1b> │ │
// │ ├──────────────────────┬─────────────────┤ │
// │ │ len(name) <uvarint> │ name <bytes> │ │
// │ ├──────────────────────┼─────────────────┤ │
// │ │ len(value) <uvarint> │ value <bytes> │ │
// │ ├──────────────────────┴─────────────────┤ │
// │ │ offset <uvarint64> │ │
// │ └────────────────────────────────────────┘ │
// First, let's skip n and name.
skipNAndName(&d, &buf)
value := d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64())

if len(newSameRngs) > 0 {
// We added some ranges in previous iteration. Use next posting offset as end of all our new ranges.
for j := range newSameRngs {
newSameRngs[j].End = postingOffset - crc32.Size
}
rngs = append(rngs, newSameRngs...)
newSameRngs = newSameRngs[:0]
}
v := d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64()) // Offset.
for string(v) >= value {
if string(v) == value {
tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + postingLengthFieldSize})

for string(value) >= wantedValue {
// If wantedValue is equals of greater than current value, loop over all given wanted values in the values until
// this is no longer true or there are no more values wanted.
// This ensures we cover case when someone asks for postingsOffset(name, value1, value1, value1).

// Record on the way if wanted value is equal to the current value.
if string(value) == wantedValue {
newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
}
valueIndex++
if valueIndex == len(values) {
break
}
value = values[valueIndex]
wantedValue = values[valueIndex]
}

if i+1 == len(e.offsets) {
for i := range tmpRngs {
tmpRngs[i].End = e.lastValOffset
// No more offsets for this name.
// Break this loop and record lastOffset on the way for ranges we just added if any.
for j := range newSameRngs {
newSameRngs[j].End = e.lastValOffset
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
rngs = append(rngs, newSameRngs...)
break
}

if value >= e.offsets[i+1].value || valueIndex == len(values) {
d.Skip(skip)
d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64()) // Offset.
for j := range tmpRngs {
tmpRngs[j].End = postingOffset - crc32.Size
if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
continue
}

// Nothing wanted or wantedValue is larger than next offset we know about.
// Let's exit and do binary search again / exit if nothing wanted.

if len(newSameRngs) > 0 {
// We added some ranges in this iteration. Use next posting offset as the end of our ranges.
// We know it exists as we never go further in this loop than e.offsets[i, i+1].

skipNAndName(&d, &buf)
d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64())

for j := range newSameRngs {
newSameRngs[j].End = postingOffset - crc32.Size
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
break
rngs = append(rngs, newSameRngs...)
}
break
}
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "get postings offset entry")
Expand Down
75 changes: 64 additions & 11 deletions pkg/block/indexheader/header_test.go
Expand Up @@ -45,7 +45,17 @@ func TestReaders(t *testing.T) {
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "a", Value: "5"}},
{{Name: "a", Value: "6"}},
{{Name: "a", Value: "7"}},
{{Name: "a", Value: "8"}},
{{Name: "a", Value: "9"}},
// Missing 10 on purpose.
{{Name: "a", Value: "11"}},
{{Name: "a", Value: "12"}},
{{Name: "a", Value: "13"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
testutil.Ok(t, err)

Expand Down Expand Up @@ -100,12 +110,62 @@ func TestReaders(t *testing.T) {
if id == id1 {
testutil.Equals(t, 1, br.version)
testutil.Equals(t, 2, br.indexVersion)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 50}, br.toc)
testutil.Equals(t, int64(330), br.indexLastPostingEnd)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 69}, br.toc)
testutil.Equals(t, int64(710), br.indexLastPostingEnd)
testutil.Equals(t, 8, br.symbols.Size())
testutil.Equals(t, 3, len(br.postings))
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))
testutil.Equals(t, map[string]*postingValueOffsets{
"": {
offsets: []postingOffset{{value: "", tableOff: 4}},
lastValOffset: 440,
},
"a": {
offsets: []postingOffset{
{value: "1", tableOff: 9},
{value: "13", tableOff: 32},
{value: "4", tableOff: 54},
{value: "7", tableOff: 75},
{value: "9", tableOff: 89},
},
lastValOffset: 640,
},
"longer-string": {
offsets: []postingOffset{
{value: "1", tableOff: 96},
{value: "2", tableOff: 115},
},
lastValOffset: 706,
},
}, br.postings)

vals, err := br.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Most of not existing value was working despite bug, except in certain unlucky cases
// it was causing "invalid size" errors.
_, err = br.PostingsOffset("not-existing", "1")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "0")
testutil.Equals(t, NotFoundRangeErr, err)
// Unlucky case, because the bug was causing unnecessary read & decode requiring more bytes than
// available. For rest cases read was noop wrong, but at least not failing.
_, err = br.PostingsOffset("a", "10")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "121")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "131")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("a", "91")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("longer-string", "0")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("longer-string", "11")
testutil.Equals(t, NotFoundRangeErr, err)
_, err = br.PostingsOffset("longer-string", "21")
testutil.Equals(t, NotFoundRangeErr, err)
}

compareIndexToHeader(t, b, br)
Expand All @@ -123,7 +183,7 @@ func TestReaders(t *testing.T) {
if id == id1 {
testutil.Equals(t, 6, len(jr.symbols))
testutil.Equals(t, 2, len(jr.lvals))
testutil.Equals(t, 6, len(jr.postings))
testutil.Equals(t, 15, len(jr.postings))
}

compareIndexToHeader(t, b, jr)
Expand Down Expand Up @@ -223,13 +283,6 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe
testutil.Ok(t, err)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)

vals, err := indexReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

_, err = headerReader.PostingsOffset("not-existing", "1")
testutil.NotOk(t, err)
}

func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta {
Expand Down

0 comments on commit 00f0184

Please sign in to comment.