Skip to content

Commit

Permalink
*: use desc scan to get latest N ddl history jobs. (#10152) (#11789)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and winkyao committed Aug 21, 2019
1 parent f93b392 commit 98714ab
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 32 deletions.
23 changes: 20 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -219,9 +220,9 @@ func (s *testSuite) TestAdmin(c *C) {
result.Check(testkit.Rows())
result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`)
result.Check(testkit.Rows())
historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID))
result.Check(testkit.Rows(historyJob[0].Query))
historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID))
result.Check(testkit.Rows(historyJobs[0].Query))
c.Assert(err, IsNil)

// check table test
Expand Down Expand Up @@ -307,6 +308,22 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("alter table t1 add index idx_i(i);")
tk.MustExec("alter table t1 add index idx_m(a,c,d,e,f,g,h,i,j);")
tk.MustExec("admin check table t1;")

// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
historyJobs, err = admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)

// Split region for history ddl job queues.
m := meta.NewMeta(txn)
startKey := meta.DDLJobHistoryKey(m, 0)
endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID)
s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/5))

historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)
c.Assert(historyJobs, DeepEquals, historyJobs2)
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
Expand Down
2 changes: 2 additions & 0 deletions executor/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (s *testMemoryLeak) SetUpSuite(c *C) {
}

func (s *testMemoryLeak) TestPBMemoryLeak(c *C) {
c.Skip("too slow")

se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "create database test_mem")
Expand Down
1 change: 1 addition & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20"))
_, err = tk.Exec("set global tidb_query_log_max_len = 20")
c.Assert(err, NotNil)
tk.MustExec("set tidb_query_log_max_len = 1024")

tk.MustExec("set tidb_constraint_check_in_place = 1")
tk.MustQuery(`select @@session.tidb_constraint_check_in_place;`).Check(testkit.Rows("1"))
Expand Down
28 changes: 23 additions & 5 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,13 @@ func (m *Meta) parseTableID(key string) (int64, error) {
return n, errors.Trace(err)
}

// GenAutoTableIDIDKeyValue generate meta key by dbID, tableID and corresponding value by autoID.
func (m *Meta) GenAutoTableIDIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) {
// DDLJobHistoryKey is only used for testing.
func DDLJobHistoryKey(m *Meta, jobID int64) []byte {
return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID))
}

// GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID.
func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) {
dbKey := m.dbKey(dbID)
autoTableIDKey := m.autoTableIDKey(tableID)
return m.txn.EncodeHashAutoIDKeyValue(dbKey, autoTableIDKey, autoID)
Expand Down Expand Up @@ -666,10 +671,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
var jobs []*model.Job
for _, pair := range pairs {
return decodeAndSortJob(pairs)
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
return decodeAndSortJob(pairs)
}

func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) {
jobs := make([]*model.Job, 0, len(jobPairs))
for _, pair := range jobPairs {
job := &model.Job{}
err = job.Decode(pair.Value)
err := job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
18 changes: 18 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ func (s *testSuite) TestMeta(c *C) {

err = txn.Commit(context.Background())
c.Assert(err, IsNil)

// Test for DDLJobHistoryKey.
key := meta.DDLJobHistoryKey(t, 888)
c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7})
}

func (s *testSuite) TestSnapshot(c *C) {
Expand Down Expand Up @@ -322,6 +326,13 @@ func (s *testSuite) TestDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, job)

// Add multiple history jobs.
historyJob1 := &model.Job{ID: 1234}
err = t.AddHistoryDDLJob(historyJob1)
c.Assert(err, IsNil)
historyJob2 := &model.Job{ID: 123}
err = t.AddHistoryDDLJob(historyJob2)
c.Assert(err, IsNil)
all, err := t.GetAllHistoryDDLJobs()
c.Assert(err, IsNil)
var lastID int64
Expand All @@ -330,6 +341,13 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}

// Test for get last N history ddl jobs.
historyJobs, err := t.GetLastNHistoryDDLJobs(2)
c.Assert(err, IsNil)
c.Assert(len(historyJobs), Equals, 2)
c.Assert(historyJobs[0].ID == 123, IsTrue)
c.Assert(historyJobs[1].ID == 1234, IsTrue)

// Test GetAllDDLJobsInQueue.
err = t.EnQueueDDLJob(job)
job1 := &model.Job{ID: 2}
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -329,6 +330,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count
c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count)
}

// SplitKeys evenly splits the start, end key into "count" regions.
// Only works for single store.
func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) {
c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count)
}

func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
c.Lock()
defer c.Unlock()
Expand Down
30 changes: 24 additions & 6 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,32 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
}

func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
endKey := MvccKey(h.endKey).Raw()
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) {
endKey = req.EndKey
var pairs []Pair
if !req.Reverse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) {
endKey = req.EndKey
}
pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
} else {
// TiKV use range [end_key, start_key) for reverse scan.
// Should use the req.EndKey to check in region.
if !h.checkKeyInRegion(req.GetEndKey()) {
panic("KvScan: startKey not in region")
}

// TiKV use range [end_key, start_key) for reverse scan.
// So the req.StartKey actually is the end_key.
if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.StartKey), h.endKey) < 0) {
endKey = req.StartKey
}

pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
}
pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)

return &kvrpcpb.ScanResponse{
Pairs: convertToPbPairs(pairs),
}
Expand Down
60 changes: 49 additions & 11 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ type Scanner struct {
nextStartKey []byte
endKey []byte
eof bool

// Use for reverse scan.
reverse bool
nextEndKey []byte
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand All @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz
valid: true,
nextStartKey: startKey,
endKey: endKey,
reverse: reverse,
nextEndKey: endKey,
}
err := scanner.Next()
if kv.IsErrNotFound(err) {
Expand Down Expand Up @@ -83,14 +89,15 @@ func (s *Scanner) Next() error {
if !s.valid {
return errors.New("scanner iterator is invalid")
}
var err error
for {
s.idx++
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return nil
}
err := s.getData(bo)
err = s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
Expand All @@ -101,7 +108,8 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) ||
(s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) {
s.eof = true
s.Close()
return nil
Expand Down Expand Up @@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
func (s *Scanner) getData(bo *Backoffer) error {
logutil.Logger(context.Background()).Debug("txn getData",
zap.Binary("nextStartKey", s.nextStartKey),
zap.Binary("nextEndKey", s.nextEndKey),
zap.Bool("reverse", s.reverse),
zap.Uint64("txnStartTS", s.startTS()))
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)

var reqEndKey, reqStartKey []byte
var loc *KeyLocation
var err error
for {
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
if !s.reverse {
loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
} else {
loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey)
}
if err != nil {
return errors.Trace(err)
}

reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
if !s.reverse {
reqEndKey = s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}
} else {
reqStartKey = s.nextStartKey
if len(reqStartKey) == 0 ||
(len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) {
reqStartKey = loc.StartKey
}
}

req := &tikvrpc.Request{
Expand All @@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
NotFillCache: s.snapshot.notFillCache,
},
}
if s.reverse {
req.Scan.StartKey = s.nextEndKey
req.Scan.EndKey = reqStartKey
req.Scan.Reverse = true
}
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error {
if len(kvPairs) < s.batchSize {
// No more data in current Region. Next getData() starts
// from current Region's endKey.
s.nextStartKey = loc.EndKey
if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) {
if !s.reverse {
s.nextStartKey = loc.EndKey
} else {
s.nextEndKey = reqStartKey
}
if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) ||
(s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) {
// Current Region is the last one.
s.eof = true
}
Expand All @@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
// may get an empty response if the Region in fact does not have
// more data.
lastKey := kvPairs[len(kvPairs)-1].GetKey()
s.nextStartKey = kv.Key(lastKey).Next()
if !s.reverse {
s.nextStartKey = kv.Key(lastKey).Next()
} else {
s.nextEndKey = kv.Key(lastKey)
}
return nil
}
}
37 changes: 35 additions & 2 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,52 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, []byte("a"), nil, 10)
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10)
scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('h'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}

func (s *testScanMockSuite) TestReverseScan(c *C) {
store := NewTestStore(c).(*tikvStore)
defer store.Close()

txn, err := store.Begin()
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true)
c.Assert(err, IsNil)
for ch := byte('y'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true)
c.Assert(err, IsNil)
for ch := byte('h'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}
Loading

0 comments on commit 98714ab

Please sign in to comment.