store: Postings fetching optimizations #2294

Merged 19 commits on Mar 23, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
Since some Object Storage providers offer no consistency guarantees, this PR adds a consistent, lock-free way of dealing with Object Storage regardless of the provider chosen. To achieve this coordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading a `deletion-mark.json` file for the block chosen to be deleted. This file contains the unix time of when the block was marked for deletion.

- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.
- [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most.

## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02

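To illustrate the new changelog entry, here is a hedged sketch of the matcher shapes it calls out. `labels.MustNewMatcher` is from the Prometheus labels package; the label names and values are invented for the example:

```go
package main

import "github.com/prometheus/prometheus/pkg/labels"

func main() {
	// Matcher shapes that benefit most from this PR:
	// =~".*" (matches every series, including ones without the label),
	// plus != and !~ negations.
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchRegexp, "pod", ".*"),        // =~".*"
		labels.MustNewMatcher(labels.MatchNotEqual, "env", "prod"),    // !=
		labels.MustNewMatcher(labels.MatchNotRegexp, "job", "test.*"), // !~
	}
	_ = ms // would reach ExpandedPostings via the store's Series call
}
```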
217 changes: 128 additions & 89 deletions pkg/store/bucket.go
@@ -1310,7 +1310,12 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) {
var postingGroups []*postingGroup
var (
postingGroups []*postingGroup
allRequested = false
hasAdds = false
keys []labels.Label
)

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
@@ -1320,23 +1325,71 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
return nil, errors.Wrap(err, "toPostingGroup")
}

// If this group adds nothing, it's an empty group. We can shortcut this, since intersection with empty
// postings would return no postings anyway.
// E.g. label="non-existing-value" returns empty group.
if !pg.addAll && len(pg.addKeys) == 0 {
return nil, nil
}

postingGroups = append(postingGroups, pg)
allRequested = allRequested || pg.addAll
hasAdds = hasAdds || len(pg.addKeys) > 0

// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later.
// We don't have any other way of pairing keys and fetched postings.
keys = append(keys, pg.addKeys...)
keys = append(keys, pg.removeKeys...)
}

if len(postingGroups) == 0 {
return nil, nil
}

if err := r.fetchPostings(postingGroups); err != nil {
// We only need special All postings if there are no other adds. If there are, we can skip fetching
// special All postings completely.
if allRequested && !hasAdds {
// add group with label to fetch "special All postings".
Review comment (Member): Confused why lint did not fail for this:
Suggested change:
- // add group with label to fetch "special All postings".
+ // Add group with label to fetch "special All postings".
name, value := index.AllPostingsKey()
allPostingsLabel := labels.Label{Name: name, Value: value}

postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil))
keys = append(keys, allPostingsLabel)
}

fetchedPostings, err := r.fetchPostings(keys)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}

var postings []index.Postings
// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
// again, and this is exactly the same order as before (when building the groups), so we can simply
// use one incrementing index to fetch postings from returned slice.
postingIndex := 0

var groupAdds, groupRemovals []index.Postings
for _, g := range postingGroups {
postings = append(postings, g.Postings())
// We cannot add an empty set to groupAdds, since these are intersected.
if len(g.addKeys) > 0 {
toMerge := make([]index.Postings, 0, len(g.addKeys))
for _, l := range g.addKeys {
toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex]))
postingIndex++
}

groupAdds = append(groupAdds, index.Merge(toMerge...))
}

for _, l := range g.removeKeys {
groupRemovals = append(groupRemovals, checkNilPosting(l, fetchedPostings[postingIndex]))
postingIndex++
}
}

ps, err := index.ExpandPostings(index.Intersect(postings...))
result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))

ps, err := index.ExpandPostings(result)
if err != nil {
return nil, errors.Wrap(err, "expand")
}
@@ -1352,150 +1405,136 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
return ps, nil
}
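To make the combination step concrete, here is a small self-contained sketch of the same `index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))` shape, using the Prometheus TSDB index package as vendored around the time of this PR; the series IDs are made up:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// Two "add" groups to intersect, one "remove" group to subtract.
	groupAdds := []index.Postings{
		index.NewListPostings([]uint64{1, 2, 3, 4}),
		index.NewListPostings([]uint64{2, 3, 4, 5}),
	}
	groupRemovals := []index.Postings{
		index.NewListPostings([]uint64{3}),
	}

	// Intersect the adds, then subtract the merged removals.
	result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))

	ids, err := index.ExpandPostings(result)
	fmt.Println(ids, err) // [2 4] <nil>
}
```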

// postingGroup keeps posting keys for a single matcher. The logical result of the group is:
// If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case.
// If addAll is not set: merge of postings for addKeys labels minus postings for removeKeys labels.
// This computation happens in ExpandedPostings.
type postingGroup struct {
keys labels.Labels
postings []index.Postings

aggregate func(postings []index.Postings) index.Postings
addAll bool
addKeys []labels.Label
removeKeys []labels.Label
}

func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup {
return &postingGroup{
keys: keys,
postings: make([]index.Postings, len(keys)),
aggregate: aggr,
addAll: addAll,
addKeys: addKeys,
removeKeys: removeKeys,
}
}

func (p *postingGroup) Fill(i int, posting index.Postings) {
p.postings[i] = posting
}

func (p *postingGroup) Postings() index.Postings {
if len(p.keys) == 0 {
return index.EmptyPostings()
func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
if p == nil {
// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
return index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l))
}

for i, posting := range p.postings {
if posting == nil {
// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i]))
}
}

return p.aggregate(p.postings)
}

func merge(p []index.Postings) index.Postings {
return index.Merge(p...)
return p
}

func allWithout(p []index.Postings) index.Postings {
return index.Without(p[0], index.Merge(p[1:]...))
}
var (
allPostingsGroup = newPostingGroup(true, nil, nil)
emptyPostingsGroup = newPostingGroup(false, nil, nil)
)

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
var matchingLabels labels.Labels
// This matches any label value, and also series that don't have this label at all.
if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") {
return allPostingsGroup, nil
}

// NOT matching any value = match nothing. We can shortcut this easily.
if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") {
return emptyPostingsGroup, nil
}

// If the matcher selects an empty value, it selects all the series which don't
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
if m.Matches("") {
allName, allValue := index.AllPostingsKey()

matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
vals, err := lvalsFn(m.Name)
if err != nil {
return nil, err
}

var toRemove []labels.Label
for _, val := range vals {
if !m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
}
}

if len(matchingLabels) == 1 {
// This is known hack to return all series.
// Ask for x != <not existing value>. Allow for that as Prometheus does,
// even though it is expensive.
return newPostingGroup(matchingLabels, merge), nil
}

return newPostingGroup(matchingLabels, allWithout), nil
return newPostingGroup(true, nil, toRemove), nil
}

// Fast-path for equal matching.
if m.Type == labels.MatchEqual {
return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil
return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil
}

vals, err := lvalsFn(m.Name)
if err != nil {
return nil, err
}

var toAdd []labels.Label
for _, val := range vals {
if m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
}
}

return newPostingGroup(matchingLabels, merge), nil
return newPostingGroup(false, toAdd, nil), nil
}
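A minimal sketch of how the negation path behaves, assuming a toy `lvalsFn` that serves label values from a fixed map (label names and values invented; `toPostingGroup` is unexported, so this only illustrates the call shape from within the package):

```go
// Hypothetical label-values lookup, for illustration only.
lvalsFn := func(name string) ([]string, error) {
	vals := map[string][]string{"env": {"dev", "prod", "staging"}}
	return vals[name], nil
}

// env!="prod" matches the empty value, so the resulting group has
// addAll=true and removeKeys holding only the values that do NOT match.
m := labels.MustNewMatcher(labels.MatchNotEqual, "env", "prod")
pg, err := toPostingGroup(lvalsFn, m)
// => pg.addAll == true, pg.addKeys == nil,
//    pg.removeKeys == []labels.Label{{Name: "env", Value: "prod"}}, err == nil
```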

type postingPtr struct {
groupID int
keyID int
ptr index.Range
keyID int
ptr index.Range
}

// fetchPostings fills postings requested by posting groups.
func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
// It returns one postings list for each key, in the same order.
// If postings for a given key were not fetched, the entry at that index will be nil.
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
var ptrs []postingPtr

// Fetch postings from the cache with a single call.
keys := make([]labels.Label, 0)
for _, g := range groups {
keys = append(keys, g.keys...)
}
output := make([]index.Postings, len(keys))

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys)

// Iterate over all keys and fetch postings from the cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
for i, g := range groups {
for j, key := range g.keys {
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

_, l, err := r.dec.Postings(b)
if err != nil {
return errors.Wrap(err, "decode postings")
}
for ix, key := range keys {
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

g.Fill(j, l)
continue
_, l, err := r.dec.Postings(b)
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}

// Cache miss; save pointer for actual posting in index stored in object store.
ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
if err == indexheader.NotFoundRangeErr {
// This block does not have any posting for given key.
g.Fill(j, index.EmptyPostings())
continue
}
output[ix] = l
continue
}

if err != nil {
return errors.Wrap(err, "index header PostingsOffset")
}
// Cache miss; save pointer for actual posting in index stored in object store.
ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
if err == indexheader.NotFoundRangeErr {
// This block does not have any posting for given key.
output[ix] = index.EmptyPostings()
continue
}

r.stats.postingsToFetch++
ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
if err != nil {
return nil, errors.Wrap(err, "index header PostingsOffset")
}

r.stats.postingsToFetch++
ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: ix})
}

sort.Slice(ptrs, func(i, j int) bool {
@@ -1543,8 +1582,8 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)
output[p.keyID] = newBigEndianPostings(pBytes[4:])
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
@@ -1555,7 +1594,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
})
}

return g.Wait()
return output, g.Wait()
}
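The contract relied on by ExpandedPostings above is that the returned slice aligns index-for-index with `keys`; a condensed, abridged view of the caller side (names taken from the diff):

```go
// fetched[i] pairs with keys[i]; nil entries (never fetched) are converted
// into error postings by checkNilPosting before use.
fetched, err := r.fetchPostings(keys)
if err != nil {
	return nil, errors.Wrap(err, "get postings")
}
for i, key := range keys {
	p := checkNilPosting(key, fetched[i])
	_ = p // merged into the owning group's adds or removals, as shown above
}
```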

func resizePostings(b []byte) ([]byte, error) {