From 8a3ba39622cd264924697ceff31f61735a51edad Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Mon, 10 Feb 2020 14:06:23 +1300 Subject: [PATCH] Implement scan request Signed-off-by: Nick Cameron --- kv/tikv/server.go | 4 +- kv/tikv/storage/commands/scan.go | 63 ++++++++ kv/tikv/storage/kvstore/scanner.go | 71 +++++++++ kv/tikv/storage/kvstore/transaction.go | 24 +-- kv/tikv/storage/kvstore/transaction_test.go | 10 +- kv/tikv/storage/rollback_test.go | 2 +- kv/tikv/storage/scan_test.go | 167 ++++++++++++++++++++ kv/util/engine_util/cf_iterator.go | 4 +- 8 files changed, 327 insertions(+), 18 deletions(-) create mode 100644 kv/tikv/storage/commands/scan.go create mode 100644 kv/tikv/storage/kvstore/scanner.go create mode 100644 kv/tikv/storage/scan_test.go diff --git a/kv/tikv/server.go b/kv/tikv/server.go index 7f41d61ce..13d3542f1 100644 --- a/kv/tikv/server.go +++ b/kv/tikv/server.go @@ -77,7 +77,9 @@ func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb } func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) { - return nil, nil + cmd := commands.NewScan(req) + resp := <-svr.scheduler.Run(&cmd) + return resp.scanResponse() } func (svr *Server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { diff --git a/kv/tikv/storage/commands/scan.go b/kv/tikv/storage/commands/scan.go new file mode 100644 index 000000000..a2301b8fe --- /dev/null +++ b/kv/tikv/storage/commands/scan.go @@ -0,0 +1,63 @@ +package commands + +import ( + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore" + "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" +) + +type Scan struct { + request *kvrpcpb.ScanRequest + response *kvrpcpb.ScanResponse +} + +func NewScan(request *kvrpcpb.ScanRequest) Scan { + return Scan{request, &kvrpcpb.ScanResponse{}} +} + +func (s *Scan) BuildTxn(txn *kvstore.MvccTxn) error { + txn.StartTS = &s.request.Version + + iter := kvstore.NewScanner(s.request.StartKey, txn) + limit := s.request.Limit + for { + if limit == 0 { + // We've scanned up to the requested limit. + return nil + } + limit -= 1 + key, value, err := iter.Next() + if err != nil { + return err + } + if key == nil { + // Reached the end of the DB + return nil + } + + pair := kvrpcpb.KvPair{} + pair.Key = key + pair.Value = value + s.response.Pairs = append(s.response.Pairs, &pair) + } +} + +func (s *Scan) Context() *kvrpcpb.Context { + return s.request.Context +} + +func (s *Scan) Response() interface{} { + return s.response +} + +func (s *Scan) HandleError(err error) interface{} { + if err == nil { + return nil + } + + if regionErr := extractRegionError(err); regionErr != nil { + s.response.RegionError = regionErr + return &s.response + } + + return nil +} diff --git a/kv/tikv/storage/kvstore/scanner.go b/kv/tikv/storage/kvstore/scanner.go new file mode 100644 index 000000000..ca6ed7f95 --- /dev/null +++ b/kv/tikv/storage/kvstore/scanner.go @@ -0,0 +1,71 @@ +package kvstore + +import ( + "github.com/pingcap-incubator/tinykv/kv/util/engine_util" +) + +// Scanner is used for reading multiple sequential key/value pairs from the storage layer. It is aware of the implementation +// of the storage layer and returns results suitable for users. +// Invariant: either the scanner is finished and can not be used, or it is ready to return a value immediately. +type Scanner struct { + writeIter engine_util.DBIterator + txn *MvccTxn +} + +// NewScanner creates a new scanner ready to read from the snapshot in txn. +func NewScanner(startKey []byte, txn *MvccTxn) *Scanner { + writeIter := txn.Reader.IterCF(engine_util.CfWrite) + writeIter.Seek(EncodeKey(startKey, TsMax)) + return &Scanner{ + writeIter: writeIter, + txn: txn, + } +} + +// Next returns the next key/value pair from the scanner. If the scanner is exhausted, then it will return `nil, nil, nil`. +func (scan *Scanner) Next() ([]byte, []byte, error) { + // Search for the next relevant key/value. + for { + if !scan.writeIter.Valid() { + // The underlying iterator is exhausted - we've reached the end of the DB. + return nil, nil, nil + } + + item := scan.writeIter.Item() + userKey := decodeUserKey(item.Key()) + commitTs := decodeTimestamp(item.Key()) + + if commitTs >= *scan.txn.StartTS { + // The key was not committed before our transaction started, find an earlier key. + scan.writeIter.Seek(EncodeKey(userKey, commitTs-1)) + continue + } + + // Note: we might check if userKey is locked (since we should not read an uncommitted transaction). However, + // because we are iterating over writes, we are guaranteed never to get a locked key at our timestamp (i.e., if + // the key were locked, then we would use the older value, which is what we will get via the write in any case). + + writeValue, err := item.Value() + if err != nil { + return nil, nil, err + } + write, err := ParseWrite(writeValue) + if err != nil { + return nil, nil, err + } + if write.Kind != WriteKindPut { + // Key is removed, go to next key. + scan.writeIter.Seek(EncodeKey(userKey, 0)) + continue + } + + value, err := scan.txn.GetValue(userKey, write.StartTS) + if err != nil { + return nil, nil, err + } + + scan.writeIter.Next() + + return userKey, value, nil + } +} diff --git a/kv/tikv/storage/kvstore/transaction.go b/kv/tikv/storage/kvstore/transaction.go index 0e3fa413c..63fc987de 100644 --- a/kv/tikv/storage/kvstore/transaction.go +++ b/kv/tikv/storage/kvstore/transaction.go @@ -13,7 +13,6 @@ import ( // MvccTxn represents an mvcc transaction (see tikv/storage/doc.go for a definition). It permits reading from a snapshot // and stores writes in a buffer for atomic writing. type MvccTxn struct { - // TODO: is reader a snapshot or not? Reader dbreader.DBReader Writes []inner_server.Modify StartTS *uint64 @@ -35,8 +34,8 @@ func (txn *MvccTxn) SeekWrite(key []byte, ts uint64) (*Write, uint64, error) { return nil, 0, nil } item := iter.Item() - commitTs := DecodeTimestamp(item.Key()) - if bytes.Compare(DecodeUserKey(item.Key()), key) != 0 { + commitTs := decodeTimestamp(item.Key()) + if bytes.Compare(decodeUserKey(item.Key()), key) != 0 { return nil, 0, nil } value, err := item.Value() @@ -60,7 +59,7 @@ func (txn *MvccTxn) FindWrittenValue(key []byte, ts uint64) ([]byte, error) { for iter.Seek(EncodeKey(key, ts)); iter.Valid(); iter.Next() { item := iter.Item() // If the user key part of the combined key has changed, then we've got to the next key without finding a put write. - if bytes.Compare(DecodeUserKey(item.Key()), key) != 0 { + if bytes.Compare(decodeUserKey(item.Key()), key) != 0 { return nil, nil } value, err := item.Value() @@ -104,7 +103,7 @@ func (txn *MvccTxn) FindWrite(key []byte, startTs uint64) (*Write, error) { } } -// GetWrite gets the value at precisely the given key and ts, without searching. +// GetWrite gets the write at precisely the given key and ts, without searching. func (txn *MvccTxn) GetWrite(key []byte, ts uint64) (*Write, error) { value, err := txn.Reader.GetCF(engine_util.CfWrite, EncodeKey(key, ts)) if err != nil { @@ -125,7 +124,7 @@ func (txn *MvccTxn) PutWrite(key []byte, write *Write, ts uint64) { }) } -// GetLock returns a lock if key is currently locked. It will return (nil, nil) if there is no lock on key, and (nil, err) +// GetLock returns a lock if key is locked. It will return (nil, nil) if there is no lock on key, and (nil, err) // if an error occurs during lookup. func (txn *MvccTxn) GetLock(key []byte) (*Lock, error) { bytes, err := txn.Reader.GetCF(engine_util.CfLock, key) @@ -167,6 +166,11 @@ func (txn *MvccTxn) DeleteLock(key []byte) { }) } +// GetValue gets the value at precisely the given key and ts, without searching. +func (txn *MvccTxn) GetValue(key []byte, ts uint64) ([]byte, error) { + return txn.Reader.GetCF(engine_util.CfDefault, EncodeKey(key, ts)) +} + // PutValue adds a key/value write to this transaction. func (txn *MvccTxn) PutValue(key []byte, value []byte) { txn.Writes = append(txn.Writes, inner_server.Modify{ @@ -200,8 +204,8 @@ func EncodeKey(key []byte, ts uint64) []byte { return newKey } -// DecodeUserKey takes a key + timestamp and returns the key part. -func DecodeUserKey(key []byte) []byte { +// decodeUserKey takes a key + timestamp and returns the key part. +func decodeUserKey(key []byte) []byte { _, userKey, err := codec.DecodeBytes(key) if err != nil { panic(err) @@ -209,8 +213,8 @@ func DecodeUserKey(key []byte) []byte { return userKey } -// DecodeTimestamp takes a key + timestamp and returns the timestamp part. -func DecodeTimestamp(key []byte) uint64 { +// decodeTimestamp takes a key + timestamp and returns the timestamp part. +func decodeTimestamp(key []byte) uint64 { left, _, err := codec.DecodeBytes(key) if err != nil { panic(err) diff --git a/kv/tikv/storage/kvstore/transaction_test.go b/kv/tikv/storage/kvstore/transaction_test.go index d85bb5275..350d6e981 100644 --- a/kv/tikv/storage/kvstore/transaction_test.go +++ b/kv/tikv/storage/kvstore/transaction_test.go @@ -20,9 +20,9 @@ func TestEncodeKey(t *testing.T) { } func TestDecodeKey(t *testing.T) { - assert.Equal(t, []byte{}, DecodeUserKey(EncodeKey([]byte{}, 0))) - assert.Equal(t, []byte{42}, DecodeUserKey(EncodeKey([]byte{42}, 0))) - assert.Equal(t, []byte{42, 0, 5}, DecodeUserKey(EncodeKey([]byte{42, 0, 5}, 0))) - assert.Equal(t, []byte{42}, DecodeUserKey(EncodeKey([]byte{42}, 2342342355436234))) - assert.Equal(t, []byte{42, 0, 5}, DecodeUserKey(EncodeKey([]byte{42, 0, 5}, 234234))) + assert.Equal(t, []byte{}, decodeUserKey(EncodeKey([]byte{}, 0))) + assert.Equal(t, []byte{42}, decodeUserKey(EncodeKey([]byte{42}, 0))) + assert.Equal(t, []byte{42, 0, 5}, decodeUserKey(EncodeKey([]byte{42, 0, 5}, 0))) + assert.Equal(t, []byte{42}, decodeUserKey(EncodeKey([]byte{42}, 2342342355436234))) + assert.Equal(t, []byte{42, 0, 5}, decodeUserKey(EncodeKey([]byte{42, 0, 5}, 234234))) } diff --git a/kv/tikv/storage/rollback_test.go b/kv/tikv/storage/rollback_test.go index 974caa39a..d0b7d3cb6 100644 --- a/kv/tikv/storage/rollback_test.go +++ b/kv/tikv/storage/rollback_test.go @@ -4,10 +4,10 @@ import ( "github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore" "testing" - "github.com/pingcap-incubator/tinykv/kv/engine_util" "github.com/pingcap-incubator/tinykv/kv/tikv/inner_server" "github.com/pingcap-incubator/tinykv/kv/tikv/storage/commands" "github.com/pingcap-incubator/tinykv/kv/tikv/storage/exec" + "github.com/pingcap-incubator/tinykv/kv/util/engine_util" "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" "github.com/stretchr/testify/assert" ) diff --git a/kv/tikv/storage/scan_test.go b/kv/tikv/storage/scan_test.go new file mode 100644 index 000000000..ab92a678b --- /dev/null +++ b/kv/tikv/storage/scan_test.go @@ -0,0 +1,167 @@ +package storage + +import ( + "fmt" + "github.com/pingcap-incubator/tinykv/kv/tikv/inner_server" + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/commands" + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/exec" + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore" + "github.com/pingcap-incubator/tinykv/kv/util/engine_util" + "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" + "github.com/stretchr/testify/assert" + "testing" +) + +// TestScanEmpty tests a scan after the end of the DB. +func TestScanEmpty(t *testing.T) { + sched := exec.NewSeqScheduler(initMemServer()) + + builder := newReqBuilder() + cmd := commands.NewScan(builder.scanRequest([]byte{200}, 10000)) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.ScanResponse) + assert.Nil(t, resp.RegionError) + assert.Empty(t, resp.Pairs) +} + +// TestScanLimitZero tests we get nothing if limit is 0. +func TestScanLimitZero(t *testing.T) { + sched := exec.NewSeqScheduler(initMemServer()) + + builder := newReqBuilder() + cmd := commands.NewScan(builder.scanRequest([]byte{3}, 0)) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.ScanResponse) + assert.Nil(t, resp.RegionError) + assert.Empty(t, resp.Pairs) +} + +// TestScanAll start at the beginning of the DB and read all pairs, respecting the timestamp. +func TestScanAll(t *testing.T) { + sched := exec.NewSeqScheduler(initMemServer()) + + builder := newReqBuilder() + cmd := commands.NewScan(builder.scanRequest([]byte{0}, 10000)) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.ScanResponse) + + assert.Nil(t, resp.RegionError) + assert.Equal(t, 11, len(resp.Pairs)) + assert.Equal(t, []byte{1}, resp.Pairs[0].Key) + assert.Equal(t, []byte{50}, resp.Pairs[0].Value) + assert.Equal(t, []byte{199}, resp.Pairs[10].Key) + assert.Equal(t, []byte{54}, resp.Pairs[10].Value) +} + +// TestScanLimit tests that scan takes the limit into account. +func TestScanLimit(t *testing.T) { + sched := exec.NewSeqScheduler(initMemServer()) + + builder := newReqBuilder() + cmd := commands.NewScan(builder.scanRequest([]byte{2}, 6)) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.ScanResponse) + assert.Nil(t, resp.RegionError) + assert.Equal(t, 6, len(resp.Pairs)) + assert.Equal(t, []byte{3}, resp.Pairs[0].Key) + assert.Equal(t, []byte{51}, resp.Pairs[0].Value) + assert.Equal(t, []byte{4}, resp.Pairs[5].Key) + assert.Equal(t, []byte{52}, resp.Pairs[5].Value) + fmt.Printf("%v", resp.Pairs) +} + +// TestScanDeleted scan over a value which is deleted then replaced. +func TestScanDeleted(t *testing.T) { + sched := exec.NewSeqScheduler(initMemServer()) + builder := newReqBuilder() + + req1 := builder.scanRequest([]byte{100}, 10000) + req1.Version = 100 + cmd1 := commands.NewScan(req1) + req2 := builder.scanRequest([]byte{100}, 10000) + req2.Version = 105 + cmd2 := commands.NewScan(req2) + req3 := builder.scanRequest([]byte{100}, 10000) + req3.Version = 120 + cmd3 := commands.NewScan(req3) + + resps := run(t, sched, &cmd1, &cmd2, &cmd3) + + resp1 := resps[0].(*kvrpcpb.ScanResponse) + assert.Nil(t, resp1.RegionError) + assert.Equal(t, 3, len(resp1.Pairs)) + assert.Equal(t, []byte{150}, resp1.Pairs[1].Key) + assert.Equal(t, []byte{42}, resp1.Pairs[1].Value) + + resp2 := resps[1].(*kvrpcpb.ScanResponse) + assert.Nil(t, resp2.RegionError) + assert.Equal(t, 2, len(resp2.Pairs)) + assert.Equal(t, []byte{120}, resp2.Pairs[0].Key) + assert.Equal(t, []byte{199}, resp2.Pairs[1].Key) + + resp3 := resps[2].(*kvrpcpb.ScanResponse) + assert.Nil(t, resp3.RegionError) + assert.Equal(t, 3, len(resp3.Pairs)) + assert.Equal(t, []byte{150}, resp3.Pairs[1].Key) + assert.Equal(t, []byte{64}, resp3.Pairs[1].Value) +} + +func initMemServer() *inner_server.MemInnerServer { + mem := inner_server.NewMemInnerServer() + + // Committed before 100. + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{1}, 80), []byte{50}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{1}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{1, 23}, 80), []byte{55}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{1, 23}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 80), []byte{51}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3, 45}, 80), []byte{56}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3, 45}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3, 46}, 80), []byte{57}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3, 46}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3, 47}, 80), []byte{58}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3, 47}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3, 48}, 80), []byte{59}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3, 48}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{4}, 80), []byte{52}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{4}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{120}, 80), []byte{53}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{120}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{199}, 80), []byte{54}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{199}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + + // Committed after 100. + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{4, 45}, 110), []byte{58}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{4, 45}, 116), []byte{1, 0, 0, 0, 0, 0, 0, 0, 110}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{4, 46}, 110), []byte{57}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{4, 46}, 116), []byte{1, 0, 0, 0, 0, 0, 0, 0, 110}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{4, 47}, 110), []byte{58}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{4, 47}, 116), []byte{1, 0, 0, 0, 0, 0, 0, 0, 110}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{4, 48}, 110), []byte{59}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{4, 48}, 116), []byte{1, 0, 0, 0, 0, 0, 0, 0, 110}) + + // Committed after 100, but started before. + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{5, 45}, 97), []byte{60}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{5, 45}, 101), []byte{1, 0, 0, 0, 0, 0, 0, 0, 97}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{5, 46}, 97), []byte{61}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{5, 46}, 101), []byte{1, 0, 0, 0, 0, 0, 0, 0, 97}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{5, 47}, 97), []byte{62}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{5, 47}, 101), []byte{1, 0, 0, 0, 0, 0, 0, 0, 97}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{5, 48}, 97), []byte{63}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{5, 48}, 101), []byte{1, 0, 0, 0, 0, 0, 0, 0, 97}) + + // A deleted value and replaced value. + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{150}, 80), []byte{42}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{150}, 99), []byte{1, 0, 0, 0, 0, 0, 0, 0, 80}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{150}, 101), []byte{2, 0, 0, 0, 0, 0, 0, 0, 97}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{150}, 110), []byte{64}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{150}, 116), []byte{1, 0, 0, 0, 0, 0, 0, 0, 110}) + + return mem +} + +func (builder *requestBuilder) scanRequest(startKey []byte, limit uint32) *kvrpcpb.ScanRequest { + var req kvrpcpb.ScanRequest + req.StartKey = startKey + req.Limit = limit + req.Version = builder.nextTS + builder.nextTS++ + return &req +} diff --git a/kv/util/engine_util/cf_iterator.go b/kv/util/engine_util/cf_iterator.go index daa7cc82a..863a7f10c 100644 --- a/kv/util/engine_util/cf_iterator.go +++ b/kv/util/engine_util/cf_iterator.go @@ -1,6 +1,8 @@ package engine_util -import "github.com/coocood/badger" +import ( + "github.com/coocood/badger" +) type CFItem struct { item *badger.Item