Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Postings fetching optimizations #2294

Merged
merged 19 commits into from
Mar 23, 2020
Merged
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
218 changes: 131 additions & 87 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,10 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR
func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) {
var postingGroups []*postingGroup
pstibrany marked this conversation as resolved.
Show resolved Hide resolved

allRequested := false
hasAdds := false
keys := []labels.Label(nil)

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
// Each group is separate to tell later what postings are intersecting with what.
Expand All @@ -1320,23 +1324,72 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
return nil, errors.Wrap(err, "toPostingGroup")
}

// Intersection with empty postings would return no postings anyway.
if pg.alwaysEmptyPostings() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if pg.alwaysEmptyPostings() {
if pg.IsEmptyPostings() {

Copy link
Contributor Author

@pstibrany pstibrany Mar 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsEmptyPostings is a different guarantee... there can be labels that are not present in the block, and will also return empty postings. IsEmptyPostings name would suggest that it returns true in such case, but it doesn't. On the other hand, IsAlwaysEmptyPostings (new suggested name) says that this will be empty no matter the block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this method in favor of checking for specific emptyPostingsGroup sentinel value.

Copy link
Contributor Author

@pstibrany pstibrany Mar 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And now removed check for sentinel value again, since it's incorrect. We really need to check if group is empty. There are other ways how group can be empty, e.g. by using label="non-existant-value" matcher.

return nil, nil
}

postingGroups = append(postingGroups, pg)
allRequested = allRequested || pg.addAll
hasAdds = hasAdds || len(pg.addKeys) > 0

// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later,
// since we don't build any label -> keys index map.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// since we don't build any label -> keys index map.
// to avoid building label -> keys index map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to say that "because we don't build label -> keys map, we must iterate in the same order".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've modified the comment a bit.

keys = append(keys, pg.addKeys...)
keys = append(keys, pg.removeKeys...)
}

if len(postingGroups) == 0 {
return nil, nil
}

if err := r.fetchPostings(postingGroups); err != nil {
allKeyIndex := -1
// we only need All postings if there are no other adds. If there are, we can skip fetching ALL postings completely.
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
if allRequested && !hasAdds {
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
// Remember the index (will be used later as a flag, and also to access postings),
// and ask fetchPostings to fetch ALL postings too.
allKeyIndex = len(keys)
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
keys = append(keys, getAllPostingsKeyLabel())
}

fetchedPostings, err := r.fetchPostings(keys)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}

pstibrany marked this conversation as resolved.
Show resolved Hide resolved
var postings []index.Postings
// get add/remove postings from groups. While we iterate over postingGroups and their keys
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
// again, this is exactly the same order as before, when building the groups, so we can simply
// use one incrementing index to fetch postings from returned slice.
postingIndex := 0

var groupAdds, removals []index.Postings
for _, g := range postingGroups {
postings = append(postings, g.Postings())
// We cannot add empty set to groupAdds, since they are intersected.
if len(g.addKeys) > 0 {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
var toMerge []index.Postings
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
for _, l := range g.addKeys {
toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex]))
postingIndex++
}

groupAdds = append(groupAdds, index.Merge(toMerge...))
}

for _, l := range g.removeKeys {
removals = append(removals, checkNilPosting(l, fetchedPostings[postingIndex]))
postingIndex++
}
}

ps, err := index.ExpandPostings(index.Intersect(postings...))
if allKeyIndex >= 0 {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
// If we have fetched "ALL" postings, add it.
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
groupAdds = append(groupAdds, checkNilPosting(getAllPostingsKeyLabel(), fetchedPostings[allKeyIndex]))
}

result := index.Without(index.Intersect(groupAdds...), index.Merge(removals...))

ps, err := index.ExpandPostings(result)
if err != nil {
return nil, errors.Wrap(err, "expand")
}
Expand All @@ -1352,150 +1405,141 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
return ps, nil
}

type postingGroup struct {
keys labels.Labels
postings []index.Postings
func getAllPostingsKeyLabel() labels.Label {
name, value := index.AllPostingsKey()
return labels.Label{Name: name, Value: value}
}

aggregate func(postings []index.Postings) index.Postings
// Logical result of each individual group is:
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
// If addAll is set: ALL postings minus postings for removeKeys labels. (No need to merge postings for addKeys in this case)
// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels
// This happens in ExpandedPostings.
type postingGroup struct {
addAll bool
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
addKeys []labels.Label
removeKeys []labels.Label
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
}

func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup {
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
return &postingGroup{
keys: keys,
postings: make([]index.Postings, len(keys)),
aggregate: aggr,
addAll: addAll,
addKeys: addKeys,
removeKeys: removeKeys,
}
}

func (p *postingGroup) Fill(i int, posting index.Postings) {
p.postings[i] = posting
// returns true, if this postingGroup will always return empty postings.
func (p *postingGroup) alwaysEmptyPostings() bool {
return !p.addAll && len(p.addKeys) == 0
}

func (p *postingGroup) Postings() index.Postings {
if len(p.keys) == 0 {
return index.EmptyPostings()
func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
if p == nil {
// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
return index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l))
}

for i, posting := range p.postings {
if posting == nil {
// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i]))
}
}

return p.aggregate(p.postings)
}

func merge(p []index.Postings) index.Postings {
return index.Merge(p...)
}

func allWithout(p []index.Postings) index.Postings {
return index.Without(p[0], index.Merge(p[1:]...))
return p
}

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
var matchingLabels labels.Labels
// This matches all values, including no value. If it is the only matcher, it will return all postings.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

matches all values including no value??? (:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code line makes sense but I don't get the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about now?

if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") {
return newPostingGroup(true, nil, nil), nil
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
}
pstibrany marked this conversation as resolved.
Show resolved Hide resolved

// NOT matching any value = match nothing. We can shortcut this easily.
if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") {
return newPostingGroup(false, nil, nil), nil
}
pstibrany marked this conversation as resolved.
Show resolved Hide resolved

// If the matcher selects an empty value, it selects all the series which don't
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
if m.Matches("") {
allName, allValue := index.AllPostingsKey()

matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
vals, err := lvalsFn(m.Name)
if err != nil {
return nil, err
}

var toRemove []labels.Label
for _, val := range vals {
if !m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
}
}

if len(matchingLabels) == 1 {
// This is known hack to return all series.
// Ask for x != <not existing value>. Allow for that as Prometheus does,
// even though it is expensive.
return newPostingGroup(matchingLabels, merge), nil
}

return newPostingGroup(matchingLabels, allWithout), nil
return newPostingGroup(true, nil, toRemove), nil
}

// Fast-path for equal matching.
if m.Type == labels.MatchEqual {
return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil
return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil
}

vals, err := lvalsFn(m.Name)
if err != nil {
return nil, err
}

var toAdd []labels.Label
for _, val := range vals {
if m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
}
}

return newPostingGroup(matchingLabels, merge), nil
return newPostingGroup(false, toAdd, nil), nil
}

type postingPtr struct {
groupID int
keyID int
ptr index.Range
keyID int
ptr index.Range
}

// fetchPostings fill postings requested by posting groups.
func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
var ptrs []postingPtr

// Fetch postings from the cache with a single call.
keys := make([]labels.Label, 0)
for _, g := range groups {
keys = append(keys, g.keys...)
}
output := make([]index.Postings, len(keys))

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys)

// Iterate over all groups and fetch posting from cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
for i, g := range groups {
for j, key := range g.keys {
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

_, l, err := r.dec.Postings(b)
if err != nil {
return errors.Wrap(err, "decode postings")
}
for ix, key := range keys {
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

g.Fill(j, l)
continue
_, l, err := r.dec.Postings(b)
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}

// Cache miss; save pointer for actual posting in index stored in object store.
ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
if err == indexheader.NotFoundRangeErr {
// This block does not have any posting for given key.
g.Fill(j, index.EmptyPostings())
continue
}
output[ix] = l
continue
}

if err != nil {
return errors.Wrap(err, "index header PostingsOffset")
}
// Cache miss; save pointer for actual posting in index stored in object store.
ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
if err == indexheader.NotFoundRangeErr {
// This block does not have any posting for given key.
output[ix] = index.EmptyPostings()
continue
}

r.stats.postingsToFetch++
ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
if err != nil {
return nil, errors.Wrap(err, "index header PostingsOffset")
}

r.stats.postingsToFetch++
ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: ix})
}

sort.Slice(ptrs, func(i, j int) bool {
Expand Down Expand Up @@ -1543,8 +1587,8 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)
output[p.keyID] = newBigEndianPostings(pBytes[4:])
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
Expand All @@ -1555,7 +1599,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
})
}

return g.Wait()
return output, g.Wait()
}

func resizePostings(b []byte) ([]byte, error) {
Expand Down