Skip to content

Commit

Permalink
feat: add Seek method to keyvalue.Iterator interface (#3211)
Browse files Browse the repository at this point in the history
This is necessary to provide an efficient paging implementation for the
columnar serving formats (#2930).
  • Loading branch information
schroederc committed Nov 1, 2018
1 parent c3373a2 commit 753c91a
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 0 deletions.
23 changes: 23 additions & 0 deletions kythe/go/storage/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package inmemory

import (
"context"
"fmt"
"io"
"sort"
"strings"
Expand Down Expand Up @@ -159,6 +160,17 @@ func (p *kvPrefixIterator) Next() (key, val []byte, err error) {
return []byte(k), []byte(v), nil
}

// Seek implements part of the keyvalue.Iterator interface.
func (p *kvPrefixIterator) Seek(k []byte) error {
s := string(k)
i := sort.Search(len(p.db.keys), func(i int) bool { return strings.Compare(p.db.keys[i], s) >= 0 })
if i < p.idx {
return fmt.Errorf("given key before current iterator position: %q", k)
}
p.idx = i
return nil
}

// Close implements part of the keyvalue.Iterator interface.
func (p *kvPrefixIterator) Close() error {
p.db.mu.RUnlock()
Expand Down Expand Up @@ -191,6 +203,17 @@ func (p *kvRangeIterator) Next() (key, val []byte, err error) {
return []byte(k), []byte(v), nil
}

// Seek implements part of the keyvalue.Iterator interface.
func (p *kvRangeIterator) Seek(k []byte) error {
s := string(k)
i := sort.Search(len(p.db.keys), func(i int) bool { return strings.Compare(p.db.keys[i], s) >= 0 })
if i < p.idx {
return fmt.Errorf("given key before current iterator position: %q", k)
}
p.idx = i
return nil
}

// Close implements part of the keyvalue.Iterator interface.
func (p *kvRangeIterator) Close() error {
p.db.mu.RUnlock()
Expand Down
103 changes: 103 additions & 0 deletions kythe/go/storage/inmemory/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,109 @@ func TestKeyValueDB_scanRange(t *testing.T) {
}
}

func TestKeyValueDB_scanPrefixSeek(t *testing.T) {
db := NewKeyValueDB()

entries := []entry{
{"k4", "val4"},
{"k3", "val3"},
{"k2", "val2"},
}
writeEntries(t, db, entries)
writeEntries(t, db, []entry{
{"k", "val"},
{"k0", "val0"},
{"k1", "val1"},
{"j1", "val1"},
{"j0", "val0"},
})

it, err := db.ScanPrefix(ctx, []byte("k"), nil)
if err != nil {
t.Fatalf("ScanPrefix error: %v", err)
}

// Seek past k1 key
if err := it.Seek([]byte("k10")); err != nil {
t.Fatalf("Seek error: %v", err)
}

var found []entry
for {
k, v, err := it.Next()
if err == io.EOF {
break
}
found = append(found, entry{string(k), string(v)})
}

if err := it.Close(); err != nil {
t.Fatalf("Iterator close error: %v", err)
}

sort.Slice(entries, func(i, j int) bool { return entries[i].Key < entries[j].Key })
if diff := cmp.Diff(entries, found); diff != "" {
t.Fatalf("Found entry differences: (- expected; + found)\n%s", diff)
}

if err := db.Close(ctx); err != nil {
t.Fatalf("DB close error: %v", err)
}
}

func TestKeyValueDB_scanRangeSeek(t *testing.T) {
db := NewKeyValueDB()

entries := []entry{
{"k1", "val1"},
{"k2", "val2"},
}
writeEntries(t, db, entries)
writeEntries(t, db, []entry{
{"k0", "val0"},
{"k3", "val3"},
{"k", "val"},
{"j1", "val1"},
{"k4", "val4"},
{"j0", "val0"},
})

it, err := db.ScanRange(ctx, &keyvalue.Range{
Start: []byte("k0"),
End: []byte("k3"),
}, nil)
if err != nil {
t.Fatalf("ScanRange error: %v", err)
}

// Seek past k0 key
if err := it.Seek([]byte("k1")); err != nil {
t.Fatalf("Seek error: %v", err)
}

var found []entry
for {
k, v, err := it.Next()
if err == io.EOF {
break
}
found = append(found, entry{string(k), string(v)})
}

if err := it.Close(); err != nil {
t.Fatalf("Iterator close error: %v", err)
}

sort.Slice(entries, func(i, j int) bool { return entries[i].Key < entries[j].Key })
if diff := cmp.Diff(entries, found); diff != "" {
t.Fatalf("Found entry differences: (- expected; + found)\n%s", diff)
}

if err := db.Close(ctx); err != nil {
t.Fatalf("DB close error: %v", err)
}
}

func writeEntries(t *testing.T, db *KeyValueDB, entries []entry) {
for _, e := range entries {
write(t, db, e.Key, e.Value)
Expand Down
6 changes: 6 additions & 0 deletions kythe/go/storage/keyvalue/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type Iterator interface {
// entry. If there is no key-value entry to return, an io.EOF error is
// returned.
Next() (key, val []byte, err error)

// Seeks positions the Iterator to the given key. The key must be further
// than the current Iterator's position. If the key does not exist, the
// Iterator is positioned at the next existing key. If no such key exists,
// io.EOF is returned.
Seek(key []byte) error
}

// Writer provides write access to a DB. Writes must be Closed when no longer
Expand Down
12 changes: 12 additions & 0 deletions kythe/go/storage/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,15 @@ func (i iterator) Next() ([]byte, []byte, error) {
i.it.Next()
return key, val, nil
}

// Seek implements part of the keyvalue.Iterator interface.
func (i *iterator) Seek(k []byte) error {
i.it.Seek(k)
if !i.it.Valid() {
if err := i.it.GetError(); err != nil {
return err
}
return io.EOF
}
return nil
}

0 comments on commit 753c91a

Please sign in to comment.