Skip to content

Commit

Permalink
tikvclient: Add endKey param to Scanner (#8178)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored and tiancaiamao committed Nov 8, 2018
1 parent 81d2fff commit d125958
Show file tree
Hide file tree
Showing 34 changed files with 194 additions and 163 deletions.
10 changes: 5 additions & 5 deletions ddl/db_test.go
Expand Up @@ -1891,7 +1891,7 @@ func (s *testDBSuite) TestTruncateTable(c *C) {
hasOldTableData := true
for i := 0; i < waitForCleanDataRound; i++ {
err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err1 := txn.Seek(tablePrefix)
it, err1 := txn.Iter(tablePrefix, nil)
if err1 != nil {
return err1
}
Expand Down Expand Up @@ -2817,7 +2817,7 @@ func (s *testDBSuite) TestAlterTableDropPartition(c *C) {

s.tk.MustExec("drop table if exists tr;")
s.tk.MustExec(` create table tr(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -2907,7 +2907,7 @@ func checkPartitionDelRangeDone(c *C, s *testDBSuite, partitionPrefix kv.Key) bo
hasOldPartitionData := true
for i := 0; i < waitForCleanDataRound; i++ {
err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err := txn.Seek(partitionPrefix)
it, err := txn.Iter(partitionPrefix, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -2957,7 +2957,7 @@ func (s *testDBSuite) TestTruncatePartitionAndDropTable(c *C) {
// Test truncate table partition.
s.tk.MustExec("drop table if exists t3;")
s.tk.MustExec(`create table t3(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -2995,7 +2995,7 @@ func (s *testDBSuite) TestTruncatePartitionAndDropTable(c *C) {
// Test drop table partition.
s.tk.MustExec("drop table if exists t4;")
s.tk.MustExec(`create table t4(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down
2 changes: 1 addition & 1 deletion ddl/delete_range.go
Expand Up @@ -154,7 +154,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error {
iter, err := txn.Seek(oldStartKey)
iter, err := txn.Iter(oldStartKey, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Expand Up @@ -1216,7 +1216,7 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
it, err := snap.Iter(firstKey, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 // indirect
github.com/pingcap/errors v0.11.0
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20181028030329-855d2192cdc7
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26
github.com/pingcap/parser v0.0.0-20181102070703-4acd198f5092
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v0.0.0-20181101090416-cfac1096162e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -189,6 +189,8 @@ github.com/pingcap/kvproto v0.0.0-20180930052200-fae11119f066 h1:ulo0ph8sxCzY3GY
github.com/pingcap/kvproto v0.0.0-20180930052200-fae11119f066/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20181028030329-855d2192cdc7 h1:CYssSnPvf90ZSbFdZpsZGSI7y+drG1EfKxqTOnKnHb0=
github.com/pingcap/kvproto v0.0.0-20181028030329-855d2192cdc7/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26 h1:JK4VLNYbSn36QSbCnqALi2ySXdH0DfcMssT/zmLf4Ls=
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/parser v0.0.0-20181024082006-53ac409ed043 h1:P9Osi8lei5j2fiRgsBi2Wch7qe4a3yWUOsS5vSan/JU=
github.com/pingcap/parser v0.0.0-20181024082006-53ac409ed043/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20181102070703-4acd198f5092 h1:vGjjf7fhuaO9udn6QEFzvsNJDwVxFmdJvIJhCdCNe/E=
Expand Down
16 changes: 8 additions & 8 deletions kv/buffer_store.go
Expand Up @@ -74,26 +74,26 @@ func (s *BufferStore) Get(k Key) ([]byte, error) {
return val, nil
}

// Seek implements the Retriever interface.
func (s *BufferStore) Seek(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Seek(k)
// Iter implements the Retriever interface.
func (s *BufferStore) Iter(k Key, upperBound Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.Seek(k)
retrieverIt, err := s.r.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
return NewUnionIter(bufferIt, retrieverIt, false)
}

// SeekReverse implements the Retriever interface.
func (s *BufferStore) SeekReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.SeekReverse(k)
// IterReverse implements the Retriever interface.
func (s *BufferStore) IterReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.SeekReverse(k)
retrieverIt, err := s.r.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion kv/buffer_store_test.go
Expand Up @@ -53,7 +53,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

iter, err := mutator.Seek(nil)
iter, err := mutator.Iter(nil, nil)
c.Check(err, IsNil)
for iter.Valid() {
cmp := bytes.Compare(iter.Key(), iter.Value())
Expand Down
10 changes: 6 additions & 4 deletions kv/kv.go
Expand Up @@ -79,15 +79,17 @@ type Retriever interface {
// Get gets the value for key k from kv store.
// If corresponding kv pair does not exist, it returns nil and ErrNotExist.
Get(k Key) ([]byte, error)
// Seek creates an Iterator positioned on the first entry that k <= entry's key.
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
Seek(k Key) (Iterator, error)
Iter(k Key, upperBound Key) (Iterator, error)

// SeekReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
SeekReverse(k Key) (Iterator, error)
// TODO: Add lower bound limit
IterReverse(k Key) (Iterator, error)
}

// Mutator is the interface wraps the basic Set and Delete methods.
Expand Down
20 changes: 10 additions & 10 deletions kv/mem_buffer_test.go
Expand Up @@ -76,7 +76,7 @@ func valToStr(c *C, iter Iterator) string {
func checkNewIterator(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep)
Expand All @@ -86,7 +86,7 @@ func checkNewIterator(c *C, buffer MemBuffer) {
// Test iterator Next()
for i := startIndex; i < testCount-1; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(valToStr(c, iter), Equals, string(val))
Expand All @@ -102,15 +102,15 @@ func checkNewIterator(c *C, buffer MemBuffer) {
}

// Non exist and beyond maximum seek test
iter, err := buffer.Seek(encodeInt(testCount * indexStep))
iter, err := buffer.Iter(encodeInt(testCount*indexStep), nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

// Non exist but between existing keys seek test,
// it returns the smallest key that larger than the one we are seeking
inBetween := encodeInt((testCount-1)*indexStep - 1)
last := encodeInt((testCount - 1) * indexStep)
iter, err = buffer.Seek(inBetween)
iter, err = buffer.Iter(inBetween, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
c.Assert([]byte(iter.Key()), Not(BytesEquals), inBetween)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *testKVSuite) TestNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
// should be invalid
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

Expand All @@ -155,7 +155,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
insertData(c, buffer)

iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)

err = NextUntil(iter, func(k Key) bool {
Expand All @@ -168,7 +168,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
func (s *testKVSuite) TestBasicNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
it, err := buffer.Seek([]byte("2"))
it, err := buffer.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsFalse)
}
Expand All @@ -193,15 +193,15 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}

cnt := 0
it, err := buffer.Seek(nil)
it, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
for it.Valid() {
cnt++
it.Next()
}
c.Assert(cnt, Equals, 6)

it, err = buffer.Seek([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"))
it, err = buffer.Iter([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"), nil)
c.Assert(err, IsNil)
c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001")
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func benchIterator(b *testing.B, buffer MemBuffer) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
if err != nil {
b.Error(err)
}
Expand Down
16 changes: 6 additions & 10 deletions kv/memdb_buffer.go
Expand Up @@ -50,14 +50,10 @@ func NewMemDbBuffer(cap int) MemBuffer {
}
}

// Seek creates an Iterator.
func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
var i Iterator
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: false}
} else {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k)}), reverse: false}
}
// Iter creates an Iterator.
func (m *memDbBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
i := &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k), Limit: []byte(upperBound)}), reverse: false}

err := i.Next()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -69,7 +65,7 @@ func (m *memDbBuffer) SetCap(cap int) {

}

func (m *memDbBuffer) SeekReverse(k Key) (Iterator, error) {
func (m *memDbBuffer) IterReverse(k Key) (Iterator, error) {
var i *memDbIter
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: true}
Expand Down Expand Up @@ -161,7 +157,7 @@ func (i *memDbIter) Close() {

// WalkMemBuffer iterates all buffered kv pairs in memBuf
func WalkMemBuffer(memBuf MemBuffer, f func(k Key, v []byte) error) error {
iter, err := memBuf.Seek(nil)
iter, err := memBuf.Iter(nil, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions kv/mock.go
Expand Up @@ -68,11 +68,11 @@ func (t *mockTxn) Get(k Key) ([]byte, error) {
return nil, nil
}

func (t *mockTxn) Seek(k Key) (Iterator, error) {
func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}

func (t *mockTxn) SeekReverse(k Key) (Iterator, error) {
func (t *mockTxn) IterReverse(k Key) (Iterator, error) {
return nil, nil
}

Expand Down Expand Up @@ -211,10 +211,10 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
return m, nil
}

func (s *mockSnapshot) Seek(k Key) (Iterator, error) {
return s.store.Seek(k)
func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
return s.store.Iter(k, upperBound)
}

func (s *mockSnapshot) SeekReverse(k Key) (Iterator, error) {
return s.store.SeekReverse(k)
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}
4 changes: 2 additions & 2 deletions kv/mock_test.go
Expand Up @@ -46,8 +46,8 @@ func (s testMockSuite) TestInterface(c *C) {
if transaction.IsReadOnly() {
transaction.Get(Key("lock"))
transaction.Set(Key("lock"), []byte{})
transaction.Seek(Key("lock"))
transaction.SeekReverse(Key("lock"))
transaction.Iter(Key("lock"), nil)
transaction.IterReverse(Key("lock"))
}
transaction.Commit(context.Background())

Expand Down
8 changes: 4 additions & 4 deletions kv/union_store.go
Expand Up @@ -127,18 +127,18 @@ func (lmb *lazyMemBuffer) Delete(k Key) error {
return lmb.mb.Delete(k)
}

func (lmb *lazyMemBuffer) Seek(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.Seek(k)
return lmb.mb.Iter(k, upperBound)
}

func (lmb *lazyMemBuffer) SeekReverse(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) IterReverse(k Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.SeekReverse(k)
return lmb.mb.IterReverse(k)
}

func (lmb *lazyMemBuffer) Size() int {
Expand Down
18 changes: 9 additions & 9 deletions kv/union_store_test.go
Expand Up @@ -63,46 +63,46 @@ func (s *testUnionStoreSuite) TestSeek(c *C) {
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))

iter, err := s.us.Seek(nil)
iter, err := s.us.Iter(nil, nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("1"), []byte("2"), []byte("3")}, [][]byte{[]byte("1"), []byte("2"), []byte("3")})

iter, err = s.us.Seek([]byte("2"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("3")}, [][]byte{[]byte("2"), []byte("3")})

s.us.Set([]byte("4"), []byte("4"))
iter, err = s.us.Seek([]byte("2"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("3"), []byte("4")}, [][]byte{[]byte("2"), []byte("3"), []byte("4")})

s.us.Delete([]byte("3"))
iter, err = s.us.Seek([]byte("2"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("4")}, [][]byte{[]byte("2"), []byte("4")})
}

func (s *testUnionStoreSuite) TestSeekReverse(c *C) {
func (s *testUnionStoreSuite) TestIterReverse(c *C) {
defer testleak.AfterTest(c)()
s.store.Set([]byte("1"), []byte("1"))
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))

iter, err := s.us.SeekReverse(nil)
iter, err := s.us.IterReverse(nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("3"), []byte("2"), []byte("1")}, [][]byte{[]byte("3"), []byte("2"), []byte("1")})

iter, err = s.us.SeekReverse([]byte("3"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")})

s.us.Set([]byte("0"), []byte("0"))
iter, err = s.us.SeekReverse([]byte("3"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("1"), []byte("0")}, [][]byte{[]byte("2"), []byte("1"), []byte("0")})

s.us.Delete([]byte("1"))
iter, err = s.us.SeekReverse([]byte("3"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("0")}, [][]byte{[]byte("2"), []byte("0")})
}
Expand Down

0 comments on commit d125958

Please sign in to comment.