Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement IterReverse for tikvSnapshot and use desc scan to get latest N ddl history jobs. (#10152) #11789

Merged
merged 7 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -219,9 +220,9 @@ func (s *testSuite) TestAdmin(c *C) {
result.Check(testkit.Rows())
result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`)
result.Check(testkit.Rows())
historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID))
result.Check(testkit.Rows(historyJob[0].Query))
historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID))
result.Check(testkit.Rows(historyJobs[0].Query))
c.Assert(err, IsNil)

// check table test
Expand Down Expand Up @@ -307,6 +308,22 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("alter table t1 add index idx_i(i);")
tk.MustExec("alter table t1 add index idx_m(a,c,d,e,f,g,h,i,j);")
tk.MustExec("admin check table t1;")

// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
historyJobs, err = admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)

// Split region for history ddl job queues.
m := meta.NewMeta(txn)
startKey := meta.DDLJobHistoryKey(m, 0)
endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID)
s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/5))

historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)
c.Assert(historyJobs, DeepEquals, historyJobs2)
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
Expand Down
2 changes: 2 additions & 0 deletions executor/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (s *testMemoryLeak) SetUpSuite(c *C) {
}

func (s *testMemoryLeak) TestPBMemoryLeak(c *C) {
c.Skip("too slow")

se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "create database test_mem")
Expand Down
1 change: 1 addition & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20"))
_, err = tk.Exec("set global tidb_query_log_max_len = 20")
c.Assert(err, NotNil)
tk.MustExec("set tidb_query_log_max_len = 1024")

tk.MustExec("set tidb_constraint_check_in_place = 1")
tk.MustQuery(`select @@session.tidb_constraint_check_in_place;`).Check(testkit.Rows("1"))
Expand Down
28 changes: 23 additions & 5 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,13 @@ func (m *Meta) parseTableID(key string) (int64, error) {
return n, errors.Trace(err)
}

// GenAutoTableIDIDKeyValue generate meta key by dbID, tableID and corresponding value by autoID.
func (m *Meta) GenAutoTableIDIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) {
// DDLJobHistoryKey is only used for testing.
func DDLJobHistoryKey(m *Meta, jobID int64) []byte {
return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID))
}

// GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID.
func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) {
dbKey := m.dbKey(dbID)
autoTableIDKey := m.autoTableIDKey(tableID)
return m.txn.EncodeHashAutoIDKeyValue(dbKey, autoTableIDKey, autoID)
Expand Down Expand Up @@ -666,10 +671,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
var jobs []*model.Job
for _, pair := range pairs {
return decodeAndSortJob(pairs)
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
return decodeAndSortJob(pairs)
}

func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) {
jobs := make([]*model.Job, 0, len(jobPairs))
for _, pair := range jobPairs {
job := &model.Job{}
err = job.Decode(pair.Value)
err := job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
18 changes: 18 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ func (s *testSuite) TestMeta(c *C) {

err = txn.Commit(context.Background())
c.Assert(err, IsNil)

// Test for DDLJobHistoryKey.
key := meta.DDLJobHistoryKey(t, 888)
c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7})
}

func (s *testSuite) TestSnapshot(c *C) {
Expand Down Expand Up @@ -322,6 +326,13 @@ func (s *testSuite) TestDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, job)

// Add multiple history jobs.
historyJob1 := &model.Job{ID: 1234}
err = t.AddHistoryDDLJob(historyJob1)
c.Assert(err, IsNil)
historyJob2 := &model.Job{ID: 123}
err = t.AddHistoryDDLJob(historyJob2)
c.Assert(err, IsNil)
all, err := t.GetAllHistoryDDLJobs()
c.Assert(err, IsNil)
var lastID int64
Expand All @@ -330,6 +341,13 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}

// Test for get last N history ddl jobs.
historyJobs, err := t.GetLastNHistoryDDLJobs(2)
c.Assert(err, IsNil)
c.Assert(len(historyJobs), Equals, 2)
c.Assert(historyJobs[0].ID == 123, IsTrue)
c.Assert(historyJobs[1].ID == 1234, IsTrue)

// Test GetAllDDLJobsInQueue.
err = t.EnQueueDDLJob(job)
job1 := &model.Job{ID: 2}
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -329,6 +330,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count
c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count)
}

// SplitKeys evenly splits the start, end key into "count" regions.
// Only works for single store.
func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) {
c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count)
}

func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
c.Lock()
defer c.Unlock()
Expand Down
30 changes: 24 additions & 6 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,32 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
}

func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
endKey := MvccKey(h.endKey).Raw()
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) {
endKey = req.EndKey
var pairs []Pair
if !req.Reverse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) {
endKey = req.EndKey
}
pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
} else {
// TiKV use range [end_key, start_key) for reverse scan.
// Should use the req.EndKey to check in region.
if !h.checkKeyInRegion(req.GetEndKey()) {
panic("KvScan: startKey not in region")
}

// TiKV use range [end_key, start_key) for reverse scan.
// So the req.StartKey actually is the end_key.
if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.StartKey), h.endKey) < 0) {
endKey = req.StartKey
}

pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
}
pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)

return &kvrpcpb.ScanResponse{
Pairs: convertToPbPairs(pairs),
}
Expand Down
60 changes: 49 additions & 11 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ type Scanner struct {
nextStartKey []byte
endKey []byte
eof bool

// Use for reverse scan.
reverse bool
nextEndKey []byte
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand All @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz
valid: true,
nextStartKey: startKey,
endKey: endKey,
reverse: reverse,
nextEndKey: endKey,
}
err := scanner.Next()
if kv.IsErrNotFound(err) {
Expand Down Expand Up @@ -83,14 +89,15 @@ func (s *Scanner) Next() error {
if !s.valid {
return errors.New("scanner iterator is invalid")
}
var err error
for {
s.idx++
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return nil
}
err := s.getData(bo)
err = s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
Expand All @@ -101,7 +108,8 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) ||
(s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) {
s.eof = true
s.Close()
return nil
Expand Down Expand Up @@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
func (s *Scanner) getData(bo *Backoffer) error {
logutil.Logger(context.Background()).Debug("txn getData",
zap.Binary("nextStartKey", s.nextStartKey),
zap.Binary("nextEndKey", s.nextEndKey),
zap.Bool("reverse", s.reverse),
zap.Uint64("txnStartTS", s.startTS()))
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)

var reqEndKey, reqStartKey []byte
var loc *KeyLocation
var err error
for {
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
if !s.reverse {
loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
} else {
loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey)
}
if err != nil {
return errors.Trace(err)
}

reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
if !s.reverse {
reqEndKey = s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}
} else {
reqStartKey = s.nextStartKey
if len(reqStartKey) == 0 ||
(len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) {
reqStartKey = loc.StartKey
}
}

req := &tikvrpc.Request{
Expand All @@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
NotFillCache: s.snapshot.notFillCache,
},
}
if s.reverse {
req.Scan.StartKey = s.nextEndKey
req.Scan.EndKey = reqStartKey
req.Scan.Reverse = true
}
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error {
if len(kvPairs) < s.batchSize {
// No more data in current Region. Next getData() starts
// from current Region's endKey.
s.nextStartKey = loc.EndKey
if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) {
if !s.reverse {
s.nextStartKey = loc.EndKey
} else {
s.nextEndKey = reqStartKey
}
if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) ||
(s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) {
// Current Region is the last one.
s.eof = true
}
Expand All @@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
// may get an empty response if the Region in fact does not have
// more data.
lastKey := kvPairs[len(kvPairs)-1].GetKey()
s.nextStartKey = kv.Key(lastKey).Next()
if !s.reverse {
s.nextStartKey = kv.Key(lastKey).Next()
} else {
s.nextEndKey = kv.Key(lastKey)
}
return nil
}
}
37 changes: 35 additions & 2 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,52 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, []byte("a"), nil, 10)
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10)
scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('h'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}

func (s *testScanMockSuite) TestReverseScan(c *C) {
store := NewTestStore(c).(*tikvStore)
defer store.Close()

txn, err := store.Begin()
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true)
c.Assert(err, IsNil)
for ch := byte('y'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true)
c.Assert(err, IsNil)
for ch := byte('h'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}
Loading