Skip to content

Commit

Permalink
*: implement IterReverse for tikvSnapshot and use desc scan to get la…
Browse files Browse the repository at this point in the history
…test N ddl history jobs. (#10152)
  • Loading branch information
crazycs520 authored and winkyao committed May 31, 2019
1 parent 03bb568 commit 8599fee
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 29 deletions.
23 changes: 20 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -226,9 +227,9 @@ func (s *testSuite) TestAdmin(c *C) {
result.Check(testkit.Rows())
result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`)
result.Check(testkit.Rows())
historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID))
result.Check(testkit.Rows(historyJob[0].Query))
historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID))
result.Check(testkit.Rows(historyJobs[0].Query))
c.Assert(err, IsNil)

// check table test
Expand Down Expand Up @@ -282,6 +283,22 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("ALTER TABLE t1 ADD COLUMN c4 bit(10) default 127;")
tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);")
tk.MustExec("admin check table t1;")

// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
historyJobs, err = admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)

// Split region for history ddl job queues.
m := meta.NewMeta(txn)
startKey := meta.DDLJobHistoryKey(m, 0)
endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID)
s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/5))

historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)
c.Assert(historyJobs, DeepEquals, historyJobs2)
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
Expand Down
24 changes: 21 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (m *Meta) tableKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID))
}

// DDLJobHistoryKey is only used for testing.
func DDLJobHistoryKey(m *Meta, jobID int64) []byte {
return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID))
}

// GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID.
func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) {
dbKey := m.dbKey(dbID)
Expand Down Expand Up @@ -637,10 +642,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
jobs := make([]*model.Job, 0, len(pairs))
for _, pair := range pairs {
return decodeAndSortJob(pairs)
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
return decodeAndSortJob(pairs)
}

func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) {
jobs := make([]*model.Job, 0, len(jobPairs))
for _, pair := range jobPairs {
job := &model.Job{}
err = job.Decode(pair.Value)
err := job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
11 changes: 11 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (s *testSuite) TestMeta(c *C) {

err = txn.Commit(context.Background())
c.Assert(err, IsNil)

// Test for DDLJobHistoryKey.
key = meta.DDLJobHistoryKey(t, 888)
c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7})
}

func (s *testSuite) TestSnapshot(c *C) {
Expand Down Expand Up @@ -356,6 +360,13 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}

// Test for get last N history ddl jobs.
historyJobs, err := t.GetLastNHistoryDDLJobs(2)
c.Assert(err, IsNil)
c.Assert(len(historyJobs), Equals, 2)
c.Assert(historyJobs[0].ID == 123, IsTrue)
c.Assert(historyJobs[1].ID == 1234, IsTrue)

// Test GetAllDDLJobsInQueue.
err = t.EnQueueDDLJob(job)
c.Assert(err, IsNil)
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
)

Expand Down Expand Up @@ -340,6 +341,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count
c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count)
}

// SplitKeys evenly splits the start, end key into "count" regions.
// Only works for single store.
func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) {
c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count)
}

func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
c.Lock()
defer c.Unlock()
Expand Down
30 changes: 24 additions & 6 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,32 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
}

func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
endKey := MvccKey(h.endKey).Raw()
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) {
endKey = req.EndKey
var pairs []Pair
if !req.Reverse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) {
endKey = req.EndKey
}
pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
} else {
// TiKV use range [end_key, start_key) for reverse scan.
// Should use the req.EndKey to check in region.
if !h.checkKeyInRegion(req.GetEndKey()) {
panic("KvScan: startKey not in region")
}

// TiKV use range [end_key, start_key) for reverse scan.
// So the req.StartKey actually is the end_key.
if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.StartKey), h.endKey) < 0) {
endKey = req.StartKey
}

pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
}
pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)

return &kvrpcpb.ScanResponse{
Pairs: convertToPbPairs(pairs),
}
Expand Down
60 changes: 49 additions & 11 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ type Scanner struct {
nextStartKey []byte
endKey []byte
eof bool

// Use for reverse scan.
reverse bool
nextEndKey []byte
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand All @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz
valid: true,
nextStartKey: startKey,
endKey: endKey,
reverse: reverse,
nextEndKey: endKey,
}
err := scanner.Next()
if kv.IsErrNotFound(err) {
Expand Down Expand Up @@ -83,14 +89,15 @@ func (s *Scanner) Next() error {
if !s.valid {
return errors.New("scanner iterator is invalid")
}
var err error
for {
s.idx++
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return nil
}
err := s.getData(bo)
err = s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
Expand All @@ -101,7 +108,8 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) ||
(s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) {
s.eof = true
s.Close()
return nil
Expand Down Expand Up @@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
func (s *Scanner) getData(bo *Backoffer) error {
logutil.Logger(context.Background()).Debug("txn getData",
zap.Binary("nextStartKey", s.nextStartKey),
zap.Binary("nextEndKey", s.nextEndKey),
zap.Bool("reverse", s.reverse),
zap.Uint64("txnStartTS", s.startTS()))
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)

var reqEndKey, reqStartKey []byte
var loc *KeyLocation
var err error
for {
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
if !s.reverse {
loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
} else {
loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey)
}
if err != nil {
return errors.Trace(err)
}

reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
if !s.reverse {
reqEndKey = s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}
} else {
reqStartKey = s.nextStartKey
if len(reqStartKey) == 0 ||
(len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) {
reqStartKey = loc.StartKey
}
}

req := &tikvrpc.Request{
Expand All @@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
NotFillCache: s.snapshot.notFillCache,
},
}
if s.reverse {
req.Scan.StartKey = s.nextEndKey
req.Scan.EndKey = reqStartKey
req.Scan.Reverse = true
}
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error {
if len(kvPairs) < s.batchSize {
// No more data in current Region. Next getData() starts
// from current Region's endKey.
s.nextStartKey = loc.EndKey
if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) {
if !s.reverse {
s.nextStartKey = loc.EndKey
} else {
s.nextEndKey = reqStartKey
}
if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) ||
(s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) {
// Current Region is the last one.
s.eof = true
}
Expand All @@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
// may get an empty response if the Region in fact does not have
// more data.
lastKey := kvPairs[len(kvPairs)-1].GetKey()
s.nextStartKey = kv.Key(lastKey).Next()
if !s.reverse {
s.nextStartKey = kv.Key(lastKey).Next()
} else {
s.nextEndKey = kv.Key(lastKey)
}
return nil
}
}
37 changes: 35 additions & 2 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,52 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, []byte("a"), nil, 10)
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10)
scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('h'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}

func (s *testScanMockSuite) TestReverseScan(c *C) {
store := NewTestStore(c).(*tikvStore)
defer store.Close()

txn, err := store.Begin()
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true)
c.Assert(err, IsNil)
for ch := byte('y'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true)
c.Assert(err, IsNil)
for ch := byte('h'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}
5 changes: 3 additions & 2 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {

// Iter return a list of key-value pair after `k`.
func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize)
scanner, err := newScanner(s, k, upperBound, scanBatchSize, false)
return scanner, errors.Trace(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
return nil, kv.ErrNotImplemented
scanner, err := newScanner(s, nil, k, scanBatchSize, true)
return scanner, errors.Trace(err)
}

func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
if logFreq%10 == 0 {
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID),
zap.String("desc", string(resp.Desc)),
zap.String("reverse", string(resp.Desc)),
zap.String("status", pdpb.OperatorStatus_name[int32(resp.Status)]))
}
logFreq++
Expand Down
Loading

0 comments on commit 8599fee

Please sign in to comment.