Skip to content

Commit

Permalink
Merge pull request #9162 from influxdata/sgc-walkkeys
Browse files Browse the repository at this point in the history
improve startup performance
  • Loading branch information
stuartcarnie committed Nov 27, 2017
2 parents 3c9eb9d + c31fa86 commit f23dc15
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
- [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
- [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.

### Bugfixes

Expand Down
45 changes: 6 additions & 39 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,49 +303,16 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) e
return nil
}

readers := make([]chan seriesKey, 0, len(f.files))
done := make(chan struct{})
for _, f := range f.files {
ch := make(chan seriesKey, 1)
readers = append(readers, ch)

go func(c chan seriesKey, r TSMFile) {

start := 0
if len(seek) > 0 {
start = r.Seek(seek)
}
n := r.KeyCount()
for i := start; i < n; i++ {

key, typ := r.KeyAt(i)
select {
case <-done:
// Abort iteration
break
case c <- seriesKey{key, typ}:
}

}
close(ch)
}(ch, f)
}
ki := newMergeKeyIterator(f.files, seek)
f.mu.RUnlock()

merged := merge(readers...)
var err error
for v := range merged {
// Drain the remaining values so goroutines can exit
if err != nil {
continue
}
if err = fn(v.key, v.typ); err != nil {
// Signal that we should stop iterating
close(done)
for ki.Next() {
key, typ := ki.Read()
if err := fn(key, typ); err != nil {
return err
}
}

return err
return nil
}

// Keys returns all keys and types for all files in the file store.
Expand Down
112 changes: 112 additions & 0 deletions tsdb/engine/tsm1/file_store_key_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package tsm1

import (
"bytes"
"container/heap"
)

// keyIterator walks the keys of a single TSMFile by index. key and typ hold
// the entry most recently loaded by next.
type keyIterator struct {
// f is the file whose keys are read through KeyAt.
f TSMFile
c int // index of the key the next call to next() will load
n int // key count
// key and typ are the values returned by the last KeyAt call made by next.
key []byte
typ byte
}

// newKeyIterator builds an iterator over the keys of f and loads its first
// key. Iteration starts at index 0, or at the index returned by f.Seek(seek)
// when seek is non-empty. It returns nil when the starting index is at or
// past f.KeyCount(), meaning there is nothing to iterate.
func newKeyIterator(f TSMFile, seek []byte) *keyIterator {
	total := f.KeyCount()

	start := 0
	if len(seek) != 0 {
		start = f.Seek(seek)
	}
	if start >= total {
		return nil
	}

	// Prime the iterator so key/typ are valid as soon as it is returned.
	itr := &keyIterator{f: f, c: start, n: total}
	itr.next()
	return itr
}

// next loads the key and type at the current index into k.key / k.typ and
// moves the index forward. It returns false, leaving key and typ untouched,
// once every key has been consumed.
func (k *keyIterator) next() bool {
	if k.c >= k.n {
		return false
	}

	k.key, k.typ = k.f.KeyAt(k.c)
	k.c++
	return true
}

// mergeKeyIterator merges the keys of several keyIterators. key and typ hold
// the entry most recently selected by Next and are what Read returns.
type mergeKeyIterator struct {
// itrs is maintained with container/heap, ordered by each iterator's current key.
itrs keyIterators
key []byte
typ byte
}

// newMergeKeyIterator creates a merging iterator over the keys of files.
// Each file gets its own keyIterator (started at seek when seek is
// non-empty); files for which newKeyIterator returns nil are left out. The
// surviving iterators are arranged as a heap keyed on their current key.
func newMergeKeyIterator(files []TSMFile, seek []byte) *mergeKeyIterator {
	m := &mergeKeyIterator{itrs: make(keyIterators, 0, len(files))}

	for i := range files {
		ki := newKeyIterator(files[i], seek)
		if ki == nil {
			continue
		}
		m.itrs = append(m.itrs, ki)
	}

	heap.Init(&m.itrs)
	return m
}

// Next advances to the next distinct key across all iterators, in ascending
// key order, and reports whether one was found. The selected key and type
// are then available from Read.
//
// A key held by several iterators is reported once, with the type taken from
// whichever of them is at the top of the heap. Every other iterator sitting
// on that key is advanced past it before Next returns, so no duplicate can
// survive into a later call. (Previously de-duplication compared against the
// last returned key and was switched off whenever a call started with a
// single iterator left, which re-emitted a key such as "b" for the files
// ["a","b"] and ["b"]; it also dropped an empty first key when merging.)
//
// Assumes each iterator yields its keys sorted and without repeats — TODO
// confirm against the TSMFile index contract.
func (m *mergeKeyIterator) Next() bool {
	if len(m.itrs) == 0 {
		return false
	}

	// With a single iterator at entry there is nothing to de-duplicate
	// against, so the key comparison below is skipped entirely.
	merging := len(m.itrs) > 1

	key, typ := m.itrs[0].key, m.itrs[0].typ
	for {
		// Step the iterator at the top of the heap: re-establish heap order
		// if it has more keys, otherwise remove it.
		if m.itrs[0].next() {
			heap.Fix(&m.itrs, 0)
		} else {
			heap.Pop(&m.itrs)
		}

		// Keep stepping while the new top still sits on the selected key.
		if !merging || len(m.itrs) == 0 || !bytes.Equal(m.itrs[0].key, key) {
			break
		}
	}

	if len(m.itrs) == 0 {
		// Release the backing array once every iterator is exhausted.
		m.itrs = nil
	}

	m.key, m.typ = key, typ
	return true
}

// Read returns the key and type stored by the most recent call to Next.
func (m *mergeKeyIterator) Read() ([]byte, byte) {
	key, typ := m.key, m.typ
	return key, typ
}

// keyIterators is a slice of key iterators ordered, through Less, by each
// iterator's current key. Together with Pop it provides the methods
// container/heap needs to treat the slice as a min-heap.
type keyIterators []*keyIterator

// Len reports the number of iterators held.
func (k keyIterators) Len() int {
	return len(k)
}

// Less reports whether the current key of iterator i sorts before that of
// iterator j in byte-wise order.
func (k keyIterators) Less(i, j int) bool {
	return bytes.Compare(k[i].key, k[j].key) < 0
}

// Swap exchanges the iterators at positions i and j.
func (k keyIterators) Swap(i, j int) {
	k[i], k[j] = k[j], k[i]
}

// Push appends x, which must be a *keyIterator, to the end of the slice.
func (k *keyIterators) Push(x interface{}) {
	itr := x.(*keyIterator)
	*k = append(*k, itr)
}

// Pop removes the last iterator from the slice and returns it. container/heap
// calls it after moving the element being removed to the end.
func (k *keyIterators) Pop() interface{} {
	old := *k
	last := len(old) - 1
	x := old[last]
	// Clear the vacated slot so the backing array does not keep the popped
	// iterator, and the file it references, reachable.
	old[last] = nil
	*k = old[:last]
	return x
}
198 changes: 198 additions & 0 deletions tsdb/engine/tsm1/file_store_key_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package tsm1

import (
"sort"
"testing"

"github.com/google/go-cmp/cmp"
)

// TestNewMergeKeyIterator feeds sets of mock TSM files through
// newMergeKeyIterator and checks that the keys it yields, optionally starting
// from a seek key, match the expected merged sequence.
func TestNewMergeKeyIterator(t *testing.T) {
	tests := []struct {
		name  string
		seek  string
		files []TSMFile

		exp []string
	}{
		{
			name: "mixed",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "cccc", "dddd"},
				[]string{"eeee", "ffff", "gggg"},
				[]string{"aaaa"},
				[]string{"dddd"},
			),
			exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff", "gggg"},
		},

		{
			name: "similar keys",
			files: newTSMFiles(
				[]string{"a", "aaa"},
				[]string{"aa", "aaaa"},
			),
			exp: []string{"a", "aa", "aaa", "aaaa"},
		},

		{
			name: "seek skips some files",
			seek: "eeee",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "cccc", "dddd"},
				[]string{"eeee", "ffff", "gggg"},
				[]string{"aaaa"},
				[]string{"dddd"},
			),
			exp: []string{"eeee", "ffff", "gggg"},
		},

		{
			name: "keys same across all files",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
			),
			exp: []string{"aaaa", "bbbb", "cccc", "dddd"},
		},

		{
			name: "keys same across all files with extra",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
			),
			exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
		},

		{
			name: "seek skips all files",
			seek: "eeee",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
			),
			exp: nil,
		},

		{
			name: "keys sequential across all files",
			files: newTSMFiles(
				[]string{"a", "b", "c", "d"},
				[]string{"e", "f", "g", "h"},
				[]string{"i", "j", "k", "l"},
			),
			exp: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"},
		},

		{
			name: "seek past one file",
			seek: "e",
			files: newTSMFiles(
				[]string{"a", "b", "c", "d"},
				[]string{"e", "f", "g", "h"},
				[]string{"i", "j", "k", "l"},
			),
			exp: []string{"e", "f", "g", "h", "i", "j", "k", "l"},
		},
	}

	for i := range tests {
		tc := tests[i]
		t.Run(tc.name, func(t *testing.T) {
			// Drain the iterator, keeping only the keys.
			var got []string
			for itr := newMergeKeyIterator(tc.files, []byte(tc.seek)); itr.Next(); {
				key, _ := itr.Read()
				got = append(got, string(key))
			}

			// cmp.Diff is empty exactly when cmp.Equal reports true.
			if diff := cmp.Diff(tc.exp, got); diff != "" {
				t.Error(diff)
			}
		})
	}
}

// newTSMFiles builds one mock TSM file per key list, preserving argument
// order. With no arguments it returns a nil slice.
func newTSMFiles(keys ...[]string) []TSMFile {
	var out []TSMFile
	for i := range keys {
		out = append(out, newMockTSMFile(keys[i]...))
	}
	return out
}

// mockTSMFile is a test double whose key-related methods (KeyCount, Seek,
// KeyAt) are backed by an in-memory list of keys.
type mockTSMFile struct {
// keys is sorted by newMockTSMFile; Seek binary-searches it.
keys []string
}

// newMockTSMFile returns a mock TSM file holding the given keys in sorted
// order. The keys are copied before sorting, so a slice passed as keys... is
// not reordered for the caller.
func newMockTSMFile(keys ...string) *mockTSMFile {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)
	return &mockTSMFile{keys: sorted}
}

// KeyCount reports how many keys the mock holds.
func (t *mockTSMFile) KeyCount() int {
	return len(t.keys)
}

// Seek returns the index of the first stored key that is >= key, or the
// number of keys when every stored key is smaller. It relies on t.keys being
// sorted.
func (t *mockTSMFile) Seek(key []byte) int {
	return sort.SearchStrings(t.keys, string(key))
}

// KeyAt returns a fresh byte copy of the key stored at idx; the reported
// block type is always BlockFloat64.
func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) {
	key := []byte(t.keys[idx])
	return key, BlockFloat64
}

// Every method below is an unimplemented stub that panics if called; only
// KeyCount, Seek and KeyAt have working implementations on mockTSMFile.
// Presumably the stubs exist so that *mockTSMFile satisfies TSMFile
// (newTSMFiles stores it as one) — confirm against the interface definition.
func (*mockTSMFile) Path() string { panic("implement me") }
func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") }
func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") }
func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") }
func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") }
func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") }
func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") }
func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") }
func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") }
func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") }
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
func (*mockTSMFile) Remove() error { panic("implement me") }
func (*mockTSMFile) InUse() bool { panic("implement me") }
func (*mockTSMFile) Ref() { panic("implement me") }
func (*mockTSMFile) Unref() { panic("implement me") }
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
func (*mockTSMFile) Free() error { panic("implement me") }

// Typed block readers: likewise unimplemented, each panics if called.
func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
panic("implement me")
}

func (*mockTSMFile) ReadIntegerBlockAt(*IndexEntry, *[]IntegerValue) ([]IntegerValue, error) {
panic("implement me")
}

func (*mockTSMFile) ReadUnsignedBlockAt(*IndexEntry, *[]UnsignedValue) ([]UnsignedValue, error) {
panic("implement me")
}

func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValue, error) {
panic("implement me")
}

func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
panic("implement me")
}

0 comments on commit f23dc15

Please sign in to comment.