Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: use loser trees #7304

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions pkg/losertree/tree.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Original version copyright Bryan Boreham, 2024.
// https://github.com/bboreham/go-loser/tree/any.
// Loser tree, from https://en.wikipedia.org/wiki/K-way_merge_algorithm#Tournament_Tree
yeya24 marked this conversation as resolved.
Show resolved Hide resolved

package losertree

type Sequence interface {
Next() bool // Advances and returns true if there is a value at this new position.
}

func New[E any, S Sequence](sequences []S, maxVal E, at func(S) E, less func(E, E) bool, close func(S)) *Tree[E, S] {
nSequences := len(sequences)
t := Tree[E, S]{
maxVal: maxVal,
at: at,
less: less,
close: close,
nodes: make([]node[E, S], nSequences*2),
}
for i, s := range sequences {
t.nodes[i+nSequences].items = s
t.moveNext(i + nSequences) // Must call Next on each item so that At() has a value.
}
if nSequences > 0 {
t.nodes[0].index = -1 // flag to be initialized on first call to Next().
}
return &t
}

// Call the close function on all sequences that are still open.
func (t *Tree[E, S]) Close() {
for _, e := range t.nodes[len(t.nodes)/2 : len(t.nodes)] {
if e.index == -1 {
continue
}
t.close(e.items)
}
}

// A loser tree is a binary tree laid out such that nodes N and N+1 have parent N/2.
// We store M leaf nodes in positions M...2M-1, and M-1 internal nodes in positions 1..M-1.
// Node 0 is a special node, containing the winner of the contest.
type Tree[E any, S Sequence] struct {
maxVal E
at func(S) E
less func(E, E) bool
close func(S) // Called when Next() returns false.
nodes []node[E, S]
}

type node[E any, S Sequence] struct {
index int // This is the loser for all nodes except the 0th, where it is the winner.
value E // Value copied from the loser node, or winner for node 0.
items S // Only populated for leaf nodes.
}

func (t *Tree[E, S]) moveNext(index int) bool {
n := &t.nodes[index]
if n.items.Next() {
n.value = t.at(n.items)
return true
}
t.close(n.items) // Next() returned false; close it and mark as finished.
n.value = t.maxVal
n.index = -1
return false
}

func (t *Tree[E, S]) Winner() S {
return t.nodes[t.nodes[0].index].items
}

func (t *Tree[E, S]) At() E {
return t.nodes[0].value
}

func (t *Tree[E, S]) Next() bool {
nodes := t.nodes
if len(nodes) == 0 {
return false
}
if nodes[0].index == -1 { // If tree has not been initialized yet, do that.
t.initialize()
return nodes[nodes[0].index].index != -1
}
if nodes[nodes[0].index].index == -1 { // already exhausted.
return false
}
t.moveNext(nodes[0].index)
t.replayGames(nodes[0].index)
return nodes[nodes[0].index].index != -1
}

// Current winner has been advanced independently; fix up the loser tree.
func (t *Tree[E, S]) Fix(closed bool) {
nodes := t.nodes
cur := &nodes[nodes[0].index]
if closed {
cur.value = t.maxVal
cur.index = -1
} else {
cur.value = t.at(cur.items)
}
t.replayGames(nodes[0].index)
}

func (t *Tree[E, S]) IsEmpty() bool {
nodes := t.nodes
if nodes[0].index == -1 { // If tree has not been initialized yet, do that.
t.initialize()
}
return nodes[nodes[0].index].index == -1
}

func (t *Tree[E, S]) initialize() {
winner := t.playGame(1)
t.nodes[0].index = winner
t.nodes[0].value = t.nodes[winner].value
}

// Find the winner at position pos; if it is a non-leaf node, store the loser.
// pos must be >= 1 and < len(t.nodes).
func (t *Tree[E, S]) playGame(pos int) int {
nodes := t.nodes
if pos >= len(nodes)/2 {
return pos
}
left := t.playGame(pos * 2)
right := t.playGame(pos*2 + 1)
var loser, winner int
if t.less(nodes[left].value, nodes[right].value) {
loser, winner = right, left
} else {
loser, winner = left, right
}
nodes[pos].index = loser
nodes[pos].value = nodes[loser].value
return winner
}

// Starting at pos, which is a winner, re-consider all values up to the root.
func (t *Tree[E, S]) replayGames(pos int) {
nodes := t.nodes
winningValue := nodes[pos].value
for n := parent(pos); n != 0; n = parent(n) {
node := &nodes[n]
if t.less(node.value, winningValue) {
// Record pos as the loser here, and the old loser is the new winner.
node.index, pos = pos, node.index
node.value, winningValue = winningValue, node.value
}
}
// pos is now the winner; store it in node 0.
nodes[0].index = pos
nodes[0].value = winningValue
}

func parent(i int) int { return i >> 1 }
124 changes: 124 additions & 0 deletions pkg/losertree/tree_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Original version copyright Bryan Boreham, 2024.
// https://github.com/bboreham/go-loser/tree/any.
package losertree

import (
"math"
"testing"
)

type List struct {
list []uint64
cur uint64
}

func NewList(list ...uint64) *List {
return &List{list: list}
}

func (it *List) At() uint64 {
return it.cur
}

func (it *List) Next() bool {
if len(it.list) > 0 {
it.cur = it.list[0]
it.list = it.list[1:]
return true
}
it.cur = 0
return false
}

func (it *List) Seek(val uint64) bool {
for it.cur < val && len(it.list) > 0 {
it.cur = it.list[0]
it.list = it.list[1:]
}
return len(it.list) > 0
}

func checkIterablesEqual[E any, S1 Sequence, S2 Sequence](t *testing.T, a S1, b S2, at1 func(S1) E, at2 func(S2) E, less func(E, E) bool) {
t.Helper()
count := 0
for a.Next() {
count++
if !b.Next() {
t.Fatalf("b ended before a after %d elements", count)
}
if less(at1(a), at2(b)) || less(at2(b), at1(a)) {
t.Fatalf("position %d: %v != %v", count, at1(a), at2(b))
}
}
if b.Next() {
t.Fatalf("a ended before b after %d elements", count)
}
}

var testCases = []struct {
name string
args []*List
want *List
}{
{
name: "empty input",
want: NewList(),
},
{
name: "one list",
args: []*List{NewList(1, 2, 3, 4)},
want: NewList(1, 2, 3, 4),
},
{
name: "two lists",
args: []*List{NewList(3, 4, 5), NewList(1, 2)},
want: NewList(1, 2, 3, 4, 5),
},
{
name: "two lists, first empty",
args: []*List{NewList(), NewList(1, 2)},
want: NewList(1, 2),
},
{
name: "two lists, second empty",
args: []*List{NewList(1, 2), NewList()},
want: NewList(1, 2),
},
{
name: "two lists b",
args: []*List{NewList(1, 2), NewList(3, 4, 5)},
want: NewList(1, 2, 3, 4, 5),
},
{
name: "two lists c",
args: []*List{NewList(1, 3), NewList(2, 4, 5)},
want: NewList(1, 2, 3, 4, 5),
},
{
name: "three lists",
args: []*List{NewList(1, 3), NewList(2, 4), NewList(5)},
want: NewList(1, 2, 3, 4, 5),
},
}

func TestMerge(t *testing.T) {
at := func(s *List) uint64 { return s.At() }
less := func(a, b uint64) bool { return a < b }
at2 := func(s *Tree[uint64, *List]) uint64 { return s.Winner().At() }
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
numCloses := 0
closeFn := func(_ *List) {
numCloses++
}
lt := New(tt.args, math.MaxUint64, at, less, closeFn)
checkIterablesEqual(t, tt.want, lt, at, at2, less)
if numCloses != len(tt.args) {
t.Errorf("Expected %d closes, got %d", len(tt.args), numCloses)
}
})
}
}
7 changes: 1 addition & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,13 +1640,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store

// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
defer func() {
for _, resp := range respSets {
resp.Close()
}
}()
begin := time.Now()
set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...))
set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...))
for set.Next() {
at := set.At()
warn := at.GetWarning()
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.

level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))

respHeap := NewDedupResponseHeap(NewProxyResponseHeap(storeResponses...))
respHeap := NewResponseDeduplicator(NewProxyResponseLoserTree(storeResponses...))
for respHeap.Next() {
resp := respHeap.At()

Expand Down
Loading
Loading