Skip to content

Commit

Permalink
Implement scan request
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Cameron <nrc@ncameron.org>
  • Loading branch information
nrc committed Feb 13, 2020
1 parent e0d3499 commit 8a3ba39
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 18 deletions.
4 changes: 3 additions & 1 deletion kv/tikv/server.go
Expand Up @@ -77,7 +77,9 @@ func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb
}

func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
return nil, nil
cmd := commands.NewScan(req)
resp := <-svr.scheduler.Run(&cmd)
return resp.scanResponse()
}

func (svr *Server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
Expand Down
63 changes: 63 additions & 0 deletions kv/tikv/storage/commands/scan.go
@@ -0,0 +1,63 @@
package commands

import (
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
)

type Scan struct {
request *kvrpcpb.ScanRequest
response *kvrpcpb.ScanResponse
}

func NewScan(request *kvrpcpb.ScanRequest) Scan {
return Scan{request, &kvrpcpb.ScanResponse{}}
}

func (s *Scan) BuildTxn(txn *kvstore.MvccTxn) error {
txn.StartTS = &s.request.Version

iter := kvstore.NewScanner(s.request.StartKey, txn)
limit := s.request.Limit
for {
if limit == 0 {
// We've scanned up to the requested limit.
return nil
}
limit -= 1
key, value, err := iter.Next()
if err != nil {
return err
}
if key == nil {
// Reached the end of the DB
return nil
}

pair := kvrpcpb.KvPair{}
pair.Key = key
pair.Value = value
s.response.Pairs = append(s.response.Pairs, &pair)
}
}

func (s *Scan) Context() *kvrpcpb.Context {
return s.request.Context
}

func (s *Scan) Response() interface{} {
return s.response
}

func (s *Scan) HandleError(err error) interface{} {
if err == nil {
return nil
}

if regionErr := extractRegionError(err); regionErr != nil {
s.response.RegionError = regionErr
return &s.response
}

return nil
}
71 changes: 71 additions & 0 deletions kv/tikv/storage/kvstore/scanner.go
@@ -0,0 +1,71 @@
package kvstore

import (
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
)

// Scanner is used for reading multiple sequential key/value pairs from the storage layer. It is aware of the implementation
// of the storage layer and returns results suitable for users.
// Invariant: either the scanner is finished and can not be used, or it is ready to return a value immediately.
type Scanner struct {
writeIter engine_util.DBIterator
txn *MvccTxn
}

// NewScanner creates a new scanner ready to read from the snapshot in txn.
func NewScanner(startKey []byte, txn *MvccTxn) *Scanner {
writeIter := txn.Reader.IterCF(engine_util.CfWrite)
writeIter.Seek(EncodeKey(startKey, TsMax))
return &Scanner{
writeIter: writeIter,
txn: txn,
}
}

// Next returns the next key/value pair from the scanner. If the scanner is exhausted, then it will return `nil, nil, nil`.
func (scan *Scanner) Next() ([]byte, []byte, error) {
// Search for the next relevant key/value.
for {
if !scan.writeIter.Valid() {
// The underlying iterator is exhausted - we've reached the end of the DB.
return nil, nil, nil
}

item := scan.writeIter.Item()
userKey := decodeUserKey(item.Key())
commitTs := decodeTimestamp(item.Key())

if commitTs >= *scan.txn.StartTS {
// The key was not committed before our transaction started, find an earlier key.
scan.writeIter.Seek(EncodeKey(userKey, commitTs-1))
continue
}

// Note: we might check if userKey is locked (since we should not read an uncommitted transaction). However,
// because we are iterating over writes, we are guaranteed never to get a locked key at our timestamp (i.e., if
// the key were locked, then we would use the older value, which is what we will get via the write in any case).

writeValue, err := item.Value()
if err != nil {
return nil, nil, err
}
write, err := ParseWrite(writeValue)
if err != nil {
return nil, nil, err
}
if write.Kind != WriteKindPut {
// Key is removed, go to next key.
scan.writeIter.Seek(EncodeKey(userKey, 0))
continue
}

value, err := scan.txn.GetValue(userKey, write.StartTS)
if err != nil {
return nil, nil, err
}

scan.writeIter.Next()

return userKey, value, nil
}
}
24 changes: 14 additions & 10 deletions kv/tikv/storage/kvstore/transaction.go
Expand Up @@ -13,7 +13,6 @@ import (
// MvccTxn represents an mvcc transaction (see tikv/storage/doc.go for a definition). It permits reading from a snapshot
// and stores writes in a buffer for atomic writing.
type MvccTxn struct {
// TODO: is reader a snapshot or not?
Reader dbreader.DBReader
Writes []inner_server.Modify
StartTS *uint64
Expand All @@ -35,8 +34,8 @@ func (txn *MvccTxn) SeekWrite(key []byte, ts uint64) (*Write, uint64, error) {
return nil, 0, nil
}
item := iter.Item()
commitTs := DecodeTimestamp(item.Key())
if bytes.Compare(DecodeUserKey(item.Key()), key) != 0 {
commitTs := decodeTimestamp(item.Key())
if bytes.Compare(decodeUserKey(item.Key()), key) != 0 {
return nil, 0, nil
}
value, err := item.Value()
Expand All @@ -60,7 +59,7 @@ func (txn *MvccTxn) FindWrittenValue(key []byte, ts uint64) ([]byte, error) {
for iter.Seek(EncodeKey(key, ts)); iter.Valid(); iter.Next() {
item := iter.Item()
// If the user key part of the combined key has changed, then we've got to the next key without finding a put write.
if bytes.Compare(DecodeUserKey(item.Key()), key) != 0 {
if bytes.Compare(decodeUserKey(item.Key()), key) != 0 {
return nil, nil
}
value, err := item.Value()
Expand Down Expand Up @@ -104,7 +103,7 @@ func (txn *MvccTxn) FindWrite(key []byte, startTs uint64) (*Write, error) {
}
}

// GetWrite gets the value at precisely the given key and ts, without searching.
// GetWrite gets the write at precisely the given key and ts, without searching.
func (txn *MvccTxn) GetWrite(key []byte, ts uint64) (*Write, error) {
value, err := txn.Reader.GetCF(engine_util.CfWrite, EncodeKey(key, ts))
if err != nil {
Expand All @@ -125,7 +124,7 @@ func (txn *MvccTxn) PutWrite(key []byte, write *Write, ts uint64) {
})
}

// GetLock returns a lock if key is currently locked. It will return (nil, nil) if there is no lock on key, and (nil, err)
// GetLock returns a lock if key is locked. It will return (nil, nil) if there is no lock on key, and (nil, err)
// if an error occurs during lookup.
func (txn *MvccTxn) GetLock(key []byte) (*Lock, error) {
bytes, err := txn.Reader.GetCF(engine_util.CfLock, key)
Expand Down Expand Up @@ -167,6 +166,11 @@ func (txn *MvccTxn) DeleteLock(key []byte) {
})
}

// GetValue gets the value at precisely the given key and ts, without searching.
func (txn *MvccTxn) GetValue(key []byte, ts uint64) ([]byte, error) {
return txn.Reader.GetCF(engine_util.CfDefault, EncodeKey(key, ts))
}

// PutValue adds a key/value write to this transaction.
func (txn *MvccTxn) PutValue(key []byte, value []byte) {
txn.Writes = append(txn.Writes, inner_server.Modify{
Expand Down Expand Up @@ -200,17 +204,17 @@ func EncodeKey(key []byte, ts uint64) []byte {
return newKey
}

// DecodeUserKey takes a key + timestamp and returns the key part.
func DecodeUserKey(key []byte) []byte {
// decodeUserKey takes a key + timestamp and returns the key part.
func decodeUserKey(key []byte) []byte {
_, userKey, err := codec.DecodeBytes(key)
if err != nil {
panic(err)
}
return userKey
}

// DecodeTimestamp takes a key + timestamp and returns the timestamp part.
func DecodeTimestamp(key []byte) uint64 {
// decodeTimestamp takes a key + timestamp and returns the timestamp part.
func decodeTimestamp(key []byte) uint64 {
left, _, err := codec.DecodeBytes(key)
if err != nil {
panic(err)
Expand Down
10 changes: 5 additions & 5 deletions kv/tikv/storage/kvstore/transaction_test.go
Expand Up @@ -20,9 +20,9 @@ func TestEncodeKey(t *testing.T) {
}

func TestDecodeKey(t *testing.T) {
assert.Equal(t, []byte{}, DecodeUserKey(EncodeKey([]byte{}, 0)))
assert.Equal(t, []byte{42}, DecodeUserKey(EncodeKey([]byte{42}, 0)))
assert.Equal(t, []byte{42, 0, 5}, DecodeUserKey(EncodeKey([]byte{42, 0, 5}, 0)))
assert.Equal(t, []byte{42}, DecodeUserKey(EncodeKey([]byte{42}, 2342342355436234)))
assert.Equal(t, []byte{42, 0, 5}, DecodeUserKey(EncodeKey([]byte{42, 0, 5}, 234234)))
assert.Equal(t, []byte{}, decodeUserKey(EncodeKey([]byte{}, 0)))
assert.Equal(t, []byte{42}, decodeUserKey(EncodeKey([]byte{42}, 0)))
assert.Equal(t, []byte{42, 0, 5}, decodeUserKey(EncodeKey([]byte{42, 0, 5}, 0)))
assert.Equal(t, []byte{42}, decodeUserKey(EncodeKey([]byte{42}, 2342342355436234)))
assert.Equal(t, []byte{42, 0, 5}, decodeUserKey(EncodeKey([]byte{42, 0, 5}, 234234)))
}
2 changes: 1 addition & 1 deletion kv/tikv/storage/rollback_test.go
Expand Up @@ -4,10 +4,10 @@ import (
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"testing"

"github.com/pingcap-incubator/tinykv/kv/engine_util"
"github.com/pingcap-incubator/tinykv/kv/tikv/inner_server"
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/commands"
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/exec"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/stretchr/testify/assert"
)
Expand Down

0 comments on commit 8a3ba39

Please sign in to comment.