Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement IterReverse for tikvSnapshot and use desc scan to get latest N ddl history jobs. #10152

Merged
merged 31 commits into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1ffab4d
add reverse scan for mocktikv
crazycs520 Mar 13, 2019
f05174b
Merge branch 'master' of https://github.com/pingcap/tidb into reverse…
crazycs520 Apr 15, 2019
3b26e64
todo: fix tikv reverse
crazycs520 Apr 15, 2019
971c2fa
fix mockTiKV handle rpc reverse scan bug
crazycs520 Apr 15, 2019
eefc6ce
refactor code and remove desc_scan.go file
crazycs520 Apr 15, 2019
3b7cda0
merge desc_scan and scan function
crazycs520 Apr 15, 2019
d52ba8e
refine code
crazycs520 Apr 15, 2019
26e8806
remove blank line
crazycs520 Apr 15, 2019
1369842
Merge branch 'master' into reverse-kv-scan
crazycs520 Apr 15, 2019
80672d8
add comment
crazycs520 Apr 15, 2019
3c4a6b5
Merge branch 'reverse-kv-scan' of https://github.com/crazycs520/tidb …
crazycs520 Apr 15, 2019
a3583dd
Merge branch 'master' of https://github.com/pingcap/tidb into reverse…
crazycs520 May 24, 2019
7a54a84
use LocateEndKey to located the region end with the endKey
crazycs520 May 24, 2019
d2dcfe4
address comment and add test
crazycs520 May 27, 2019
1b992df
Merge branch 'master' into reverse-kv-scan
crazycs520 May 27, 2019
341617e
test for history ddl job with multiple region
crazycs520 May 27, 2019
b7b62d4
fix test
crazycs520 May 28, 2019
ce3d64b
refine comment
crazycs520 May 28, 2019
ff0cee9
rename test
crazycs520 May 28, 2019
04f4c1a
Merge branch 'master' into reverse-kv-scan
crazycs520 May 28, 2019
87482e1
Update meta/meta.go
crazycs520 May 28, 2019
aef7e7c
add test in meta_test
crazycs520 May 28, 2019
363de0e
fix test
crazycs520 May 28, 2019
d4a3305
fix test
crazycs520 May 30, 2019
20ca3f5
Merge branch 'master' of https://github.com/pingcap/tidb into reverse…
crazycs520 May 31, 2019
dd1653b
refine code:
crazycs520 May 31, 2019
b5aaaff
refine test
crazycs520 May 31, 2019
be97fe1
address comment
crazycs520 May 31, 2019
b2554c1
refine code
crazycs520 May 31, 2019
b2de4c3
Merge branch 'master' into reverse-kv-scan
crazycs520 May 31, 2019
dc22826
Merge branch 'master' into reverse-kv-scan
crazycs520 May 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,26 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
return jobs, nil
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
var jobs []*model.Job
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
for _, pair := range pairs {
job := &model.Job{}
err = job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}
sorter := &jobsSorter{jobs: jobs}
sort.Sort(sorter)
return jobs, nil
}

// jobsSorter implements the sort.Interface interface.
type jobsSorter struct {
jobs []*model.Job
Expand Down
17 changes: 14 additions & 3 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,21 @@ func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanRespons
panic("KvScan: startKey not in region")
}
endKey := h.endKey
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) {
endKey = req.EndKey
var pairs []Pair
if !req.Reverse {
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) {
endKey = req.EndKey
}
pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
} else {
// TiKV use range [end_key, start_key) for reverse scan.
// So the req.StartKey actually is the end_key.
if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.StartKey, endKey) < 0) {
endKey = req.StartKey
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
}
pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)

return &kvrpcpb.ScanResponse{
Pairs: convertToPbPairs(pairs),
}
Expand Down
59 changes: 48 additions & 11 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ type Scanner struct {
nextStartKey []byte
endKey []byte
eof bool

// Use for reverse scan.
desc bool
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
nextEndKey []byte
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, desc bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand All @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz
valid: true,
nextStartKey: startKey,
endKey: endKey,
desc: desc,
nextEndKey: endKey,
}
err := scanner.Next()
if kv.IsErrNotFound(err) {
Expand Down Expand Up @@ -83,14 +89,15 @@ func (s *Scanner) Next() error {
if !s.valid {
return errors.New("scanner iterator is invalid")
}
var err error
for {
s.idx++
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return nil
}
err := s.getData(bo)
err = s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
Expand All @@ -101,7 +108,8 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
if (!s.desc && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) ||
(s.desc && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) {
s.eof = true
s.Close()
return nil
Expand Down Expand Up @@ -147,18 +155,33 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
func (s *Scanner) getData(bo *Backoffer) error {
logutil.Logger(context.Background()).Debug("txn getData",
zap.Binary("nextStartKey", s.nextStartKey),
zap.Binary("nextEndKey", s.nextEndKey),
zap.Bool("desc", s.desc),
zap.Uint64("txnStartTS", s.startTS()))
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)

var locKey, reqEndKey, reqStartKey []byte
for {
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
if !s.desc {
locKey = s.nextStartKey
} else {
locKey = s.nextEndKey
}
loc, err := s.snapshot.store.regionCache.LocateKey(bo, locKey)
if err != nil {
return errors.Trace(err)
}

reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
if !s.desc {
reqEndKey = s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}
} else {
reqStartKey = s.nextStartKey
if len(reqStartKey) == 0 ||
(len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) {
reqStartKey = loc.StartKey
}
}

req := &tikvrpc.Request{
Expand All @@ -175,6 +198,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
NotFillCache: s.snapshot.notFillCache,
},
}
if s.desc {
req.Scan.StartKey = s.nextEndKey
req.Scan.EndKey = reqStartKey
req.Scan.Reverse = true
}
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -218,8 +246,13 @@ func (s *Scanner) getData(bo *Backoffer) error {
if len(kvPairs) < s.batchSize {
// No more data in current Region. Next getData() starts
// from current Region's endKey.
s.nextStartKey = loc.EndKey
if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) {
if !s.desc {
s.nextStartKey = loc.EndKey
} else {
s.nextEndKey = reqStartKey
}
if (!s.desc && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) ||
(s.desc && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) {
// Current Region is the last one.
s.eof = true
}
Expand All @@ -230,7 +263,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
// may get an empty response if the Region in fact does not have
// more data.
lastKey := kvPairs[len(kvPairs)-1].GetKey()
s.nextStartKey = kv.Key(lastKey).Next()
if !s.desc {
s.nextStartKey = kv.Key(lastKey).Next()
} else {
s.nextEndKey = kv.Key(lastKey)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
}
37 changes: 35 additions & 2 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,52 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, []byte("a"), nil, 10)
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10)
scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('h'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}

func (s *testScanMockSuite) TestDescScan(c *C) {
store := NewTestStore(c).(*tikvStore)
defer store.Close()

txn, err := store.Begin()
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, true)
c.Assert(err, IsNil)
for ch := byte('z'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true)
c.Assert(err, IsNil)
for ch := byte('h'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}
7 changes: 4 additions & 3 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {

// Iter return a list of key-value pair after `k`.
func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize)
scanner, err := newScanner(s, k, upperBound, scanBatchSize, false)
return scanner, errors.Trace(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
return nil, kv.ErrNotImplemented
func (s *tikvSnapshot) IterReverse(endKey kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, nil, endKey, scanBatchSize, true)
return scanner, errors.Trace(err)
}

func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
Expand Down
48 changes: 48 additions & 0 deletions structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,23 @@ func (t *TxStructure) HGetAll(key []byte) ([]HashPair, error) {
return res, errors.Trace(err)
}

// HGetLastN gets latest N fields and values in hash.
func (t *TxStructure) HGetLastN(key []byte, num int) ([]HashPair, error) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
res := make([]HashPair, 0, num)
err := t.iterReverseHash(key, func(field []byte, value []byte) (bool, error) {
pair := HashPair{
Field: append([]byte{}, field...),
Value: append([]byte{}, value...),
}
res = append(res, pair)
if len(res) >= num {
return false, nil
}
return true, nil
})
return res, errors.Trace(err)
}

// HClear removes the hash value of the key.
func (t *TxStructure) HClear(key []byte) error {
metaKey := t.encodeHashMetaKey(key)
Expand Down Expand Up @@ -268,6 +285,37 @@ func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error)
return nil
}

func (t *TxStructure) iterReverseHash(key []byte, fn func(k []byte, v []byte) (bool, error)) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.reader.IterReverse(dataPrefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}

var field []byte
for it.Valid() {
if !it.Key().HasPrefix(dataPrefix) {
break
}

_, field, err = t.decodeHashDataKey(it.Key())
if err != nil {
return errors.Trace(err)
}

more, err := fn(field, it.Value())
if !more || err != nil {
return errors.Trace(err)
}

err = it.Next()
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) {
v, err := t.reader.Get(metaKey)
if kv.ErrNotExist.Equal(err) {
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ const DefNumHistoryJobs = 10
// The maximum count of history jobs is num.
func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) {
t := meta.NewMeta(txn)
jobs, err := t.GetAllHistoryDDLJobs()
jobs, err := t.GetLastNHistoryDDLJobs(maxNumJobs)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down