Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added initial benchmarks for deduplicate filter. #2118

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 136 additions & 15 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"io/ioutil"
"os"
"path"
Expand Down Expand Up @@ -418,49 +419,78 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync
// DeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data.
type DeduplicateFilter struct {
// DuplicateIDs accumulates the IDs that filter removed from the metas map
// while they were still present in it. It is only ever appended to in the
// code shown here.
DuplicateIDs []ulid.ULID

// Micro optimizations: root and metaSlice to reuse.
// Both are truncated to length zero at the end of filter so that their
// backing storage can be reused by the next run.
root *Node
metaSlice []*metadata.Meta
}

// NewDeduplicateFilter creates DeduplicateFilter.
//
// NOTE(review): this span comes from a diff view, so two return statements
// appear back to back. The first looks like the removed pre-change line and the
// second like its replacement — confirm against the actual file.
func NewDeduplicateFilter() *DeduplicateFilter {
return &DeduplicateFilter{}
// Pre-build the reusable root node; its ULID is created with
// ulid.MustNew(0, nil).
return &DeduplicateFilter{
root: NewNode(&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}),
}
}

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks whose sources fully match the source blocks of the older blocks.
// It starts one goroutine per listed resolution value, each calling f.filter
// on the same metas map, and waits for all of them before returning.
//
// NOTE(review): all goroutines share the receiver f and the metas map. filter
// appends to f.metaSlice and f.DuplicateIDs, modifies f.root and deletes from
// metas, and no locking is visible in this section — this looks like a data
// race; confirm (e.g. with `go test -race`).
// TODO(bwplotka): change arg to have sorted meta instead?
// TODO(bwplotka): Explore what we can improve here - it's a bit slow.
func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
// NOTE(review): root is not referenced again in this function as shown;
// these lines look like the removed pre-change version from the diff view.
root := NewNode(&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
})
var wg sync.WaitGroup

// Presumably the raw, 5-minute and 1-hour downsampling resolutions in
// milliseconds — TODO confirm.
for _, res := range []int64{
int64(0), int64(5 * 60 * 1000), int64(60 * 60 * 1000),
} {
wg.Add(1)
// res is passed as an argument so each goroutine gets its own copy.
go func(res int64) {
defer wg.Done()
f.filter(metas, res, synced)
}(res)
}

wg.Wait()
}

// filter collects all metas into a slice, sorts them so that blocks with more
// compaction sources come first (ties broken by ascending ULID), adds each one
// as a node under the root via addNodeBySources, and then deletes every ID
// returned by getNonRootIDs from metas. The synced gauge labelled duplicateMeta
// is incremented once per returned ID.
//
// NOTE(review): resolution is not read anywhere in the body shown here.
// NOTE(review): this span comes from a diff view; lines using the local
// metaSlice / root appear to be the removed pre-change version, interleaved
// with their f.metaSlice / f.root replacements — confirm against the actual file.
func (f *DeduplicateFilter) filter(metas map[ulid.ULID]*metadata.Meta, resolution int64, synced GaugeLabeled) {
// Allocate a new reusable slice only when the current capacity is too small.
if cap(f.metaSlice) < len(metas) {
f.metaSlice = make([]*metadata.Meta, 0, len(metas))
}

metaSlice := []*metadata.Meta{}
for _, meta := range metas {
metaSlice = append(metaSlice, meta)
f.metaSlice = append(f.metaSlice, meta)
}
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
sort.Slice(f.metaSlice, func(i, j int) bool {
ilen := len(f.metaSlice[i].Compaction.Sources)
jlen := len(f.metaSlice[j].Compaction.Sources)

// Equal source counts: order by ascending ULID.
if ilen == jlen {
return metaSlice[i].ULID.Compare(metaSlice[j].ULID) < 0
return f.metaSlice[i].ULID.Compare(f.metaSlice[j].ULID) < 0
}

// Otherwise the block with more sources sorts first.
return ilen-jlen > 0
})

for _, meta := range metaSlice {
addNodeBySources(root, NewNode(meta))
for _, meta := range f.metaSlice {
addNodeBySources(f.root, NewNode(meta))
}

duplicateULIDs := getNonRootIDs(root)
duplicateULIDs := getNonRootIDs(f.root)
for _, id := range duplicateULIDs {
// Record the ID only if it is still present in metas; the gauge increment
// and the delete below run for every ID regardless.
if metas[id] != nil {
f.DuplicateIDs = append(f.DuplicateIDs, id)
}
synced.WithLabelValues(duplicateMeta).Inc()
delete(metas, id)
}

// Clean the children, while preserving underlying allocated array.
f.root.Children = f.root.Children[:0]
f.metaSlice = f.metaSlice[:0]
}

func addNodeBySources(root *Node, add *Node) bool {
Expand Down Expand Up @@ -491,7 +521,38 @@ func addNodeBySources(root *Node, add *Node) bool {
return addNodeBySources(rootNode, add)
}

// contains reports whether every element of s2 is present in s1.
// Both slices are expected to be sorted in ascending ULID order; s1 is walked
// once with a shared cursor, so each element of s1 is compared at most once.
// An empty s2 is contained in any s1.
//
// Benchmark shows literally NO improvement over contains2. Maybe this is not the bottleneck?
// TODO(bwplotka): Explore more. Check CPU profiling if that is really the bottleneck.
func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
	// Quick path: a longer slice can never be contained in a shorter one.
	if len(s2) > len(s1) {
		return false
	}

	i := 0
	for _, uid2 := range s2 {
		found := false
		for i < len(s1) {
			cmp := uid2.Compare(s1[i])
			if cmp < 0 {
				// s1 is sorted, so everything from s1[i] on is greater than uid2.
				return false
			}
			i++
			if cmp == 0 {
				found = true
				break
			}
		}
		// Either s1 ran out while searching for uid2, or it was already
		// exhausted by earlier matches; in both cases uid2 is missing.
		if !found {
			return false
		}
	}
	return true
}

func contains2(s1 []ulid.ULID, s2 []ulid.ULID) bool {
for _, a := range s2 {
found := false
for _, e := range s1 {
Expand All @@ -506,3 +567,63 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
}
return true
}

// DeduplicateFilterV0 is the deduplication logic that existed before the deduplicate block filter.
// TODO: Explore if this is better than what we have now; right now allocations are too huge.
// TODO: Extend to support all test cases.
type DeduplicateFilterV0 struct {
// DuplicateIDs mirrors the field of the same name on DeduplicateFilter;
// presumably meant to hold the IDs of removed duplicate blocks — confirm.
DuplicateIDs []ulid.ULID
}

// NewDeduplicateFilterV0 returns a pointer to a zero-valued DeduplicateFilterV0.
func NewDeduplicateFilterV0() *DeduplicateFilterV0 {
	return new(DeduplicateFilterV0)
}

// Filter deletes from metas every block that is not the highest priority parent
// of at least one source block. For each removed block the ID is appended to
// f.DuplicateIDs (in map iteration order, i.e. unspecified) and the synced
// gauge labelled duplicateMeta is incremented. The third argument is ignored.
//
// Blocks whose Compaction.Sources is empty never become a parent and are
// therefore removed as well.
func (f *DeduplicateFilterV0) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
	// Map each source block to its highest priority parent. Initial blocks have themselves
	// in their source section, i.e. are their own parent.
	parents := make(map[ulid.ULID]ulid.ULID, len(metas))

	for id, meta := range metas {
		// For each source block we contain, check whether we are the highest priority parent block.
		for _, sid := range meta.Compaction.Sources {
			pid, ok := parents[sid]
			// No parents for the source block so far.
			if !ok {
				parents[sid] = id
				continue
			}
			// Every value in parents is a key of metas and metas is not modified
			// in this loop, so a miss here means a broken invariant.
			pmeta, ok := metas[pid]
			if !ok {
				panic(fmt.Sprintf("previous parent block %s not found", pid))
			}
			// The current block is the higher priority parent for the source if its
			// compaction level is higher than that of the previously set parent.
			// If compaction levels are equal, the more recent ULID wins.
			//
			// The ULID recency alone is not sufficient since races, e.g. induced
			// by downtime of garbage collection, may re-compact blocks that
			// were already compacted into higher-level blocks multiple times.
			level, plevel := meta.Compaction.Level, pmeta.Compaction.Level

			if level > plevel || (level == plevel && id.Compare(pid) > 0) {
				parents[sid] = id
			}
		}
	}

	// A block can safely be deleted if it is not the highest priority parent for
	// any source block.
	topParents := make(map[ulid.ULID]struct{}, len(parents))
	for _, pid := range parents {
		topParents[pid] = struct{}{}
	}

	// Deleting the current key while ranging over a map is permitted in Go.
	for id := range metas {
		if _, ok := topParents[id]; ok {
			continue
		}
		// Record the removed block, consistent with DeduplicateFilter.
		f.DuplicateIDs = append(f.DuplicateIDs, id)
		synced.WithLabelValues(duplicateMeta).Inc()
		delete(metas, id)
	}
}
Loading