From 98714ab5fce4a23d4f9121a8ffed69ed9b1c28b0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 21 Aug 2019 16:03:39 +0800 Subject: [PATCH] *: use desc scan to get latest N ddl history jobs. (#10152) (#11789) --- executor/executor_test.go | 23 +++++++++-- executor/memory_test.go | 2 + executor/set_test.go | 1 + meta/meta.go | 28 +++++++++++--- meta/meta_test.go | 18 +++++++++ store/mockstore/mocktikv/cluster.go | 7 ++++ store/mockstore/mocktikv/rpc.go | 30 ++++++++++++--- store/tikv/scan.go | 60 +++++++++++++++++++++++------ store/tikv/scan_mock_test.go | 37 +++++++++++++++++- store/tikv/snapshot.go | 5 ++- store/tikv/split_region.go | 2 +- structure/hash.go | 48 +++++++++++++++++++++++ structure/structure_test.go | 11 ++++++ structure/type.go | 5 +++ util/admin/admin.go | 2 +- util/kvencoder/kv_encoder.go | 2 +- 16 files changed, 249 insertions(+), 32 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 5f5add10cf87b..f555c6f4ea57f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" @@ -219,9 +220,9 @@ func (s *testSuite) TestAdmin(c *C) { result.Check(testkit.Rows()) result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`) result.Check(testkit.Rows()) - historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs) - result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID)) - result.Check(testkit.Rows(historyJob[0].Query)) + historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs) + result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID)) + result.Check(testkit.Rows(historyJobs[0].Query)) c.Assert(err, IsNil) // check table test @@ -307,6 +308,22 @@ func (s *testSuite) TestAdmin(c *C) { tk.MustExec("alter table t1 add index idx_i(i);") tk.MustExec("alter table t1 add index idx_m(a,c,d,e,f,g,h,i,j);") tk.MustExec("admin check table t1;") + + // Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + historyJobs, err = admin.GetHistoryDDLJobs(txn, 20) + c.Assert(err, IsNil) + + // Split region for history ddl job queues. + m := meta.NewMeta(txn) + startKey := meta.DDLJobHistoryKey(m, 0) + endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID) + s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/5)) + + historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20) + c.Assert(err, IsNil) + c.Assert(historyJobs, DeepEquals, historyJobs2) } func (s *testSuite) fillData(tk *testkit.TestKit, table string) { diff --git a/executor/memory_test.go b/executor/memory_test.go index 2ab5d8a5f540e..56dd62fb3ead2 100644 --- a/executor/memory_test.go +++ b/executor/memory_test.go @@ -41,6 +41,8 @@ func (s *testMemoryLeak) SetUpSuite(c *C) { } func (s *testMemoryLeak) TestPBMemoryLeak(c *C) { + c.Skip("too slow") + se, err := session.CreateSession4Test(s.store) c.Assert(err, IsNil) _, err = se.Execute(context.Background(), "create database test_mem") diff --git a/executor/set_test.go b/executor/set_test.go index 39776f5fac6db..6ea6f3982aa24 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -254,6 +254,7 @@ func (s *testSuite) TestSetVar(c *C) { tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20")) _, err = tk.Exec("set global tidb_query_log_max_len = 20") c.Assert(err, NotNil) + tk.MustExec("set tidb_query_log_max_len = 1024") tk.MustExec("set tidb_constraint_check_in_place = 1") tk.MustQuery(`select @@session.tidb_constraint_check_in_place;`).Check(testkit.Rows("1")) diff --git a/meta/meta.go b/meta/meta.go index fd6ec3c9ee90a..ce2a4738c4e33 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -156,8 +156,13 @@ func (m *Meta) parseTableID(key string) (int64, error) { return n, errors.Trace(err) } -// GenAutoTableIDIDKeyValue generate meta key by dbID, tableID and corresponding value by autoID. -func (m *Meta) GenAutoTableIDIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) { +// DDLJobHistoryKey is only used for testing. +func DDLJobHistoryKey(m *Meta, jobID int64) []byte { + return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID)) +} + +// GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID. +func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) { dbKey := m.dbKey(dbID) autoTableIDKey := m.autoTableIDKey(tableID) return m.txn.EncodeHashAutoIDKeyValue(dbKey, autoTableIDKey, autoID) @@ -666,10 +671,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) { if err != nil { return nil, errors.Trace(err) } - var jobs []*model.Job - for _, pair := range pairs { + return decodeAndSortJob(pairs) +} + +// GetLastNHistoryDDLJobs gets latest N history ddl jobs. +func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) { + pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num) + if err != nil { + return nil, errors.Trace(err) + } + return decodeAndSortJob(pairs) +} + +func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) { + jobs := make([]*model.Job, 0, len(jobPairs)) + for _, pair := range jobPairs { job := &model.Job{} - err = job.Decode(pair.Value) + err := job.Decode(pair.Value) if err != nil { return nil, errors.Trace(err) } diff --git a/meta/meta_test.go b/meta/meta_test.go index 81a987a3fb4ea..b16ec357334c4 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -222,6 +222,10 @@ func (s *testSuite) TestMeta(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) + + // Test for DDLJobHistoryKey. + key := meta.DDLJobHistoryKey(t, 888) + c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7}) } func (s *testSuite) TestSnapshot(c *C) { @@ -322,6 +326,13 @@ func (s *testSuite) TestDDL(c *C) { c.Assert(err, IsNil) c.Assert(v, DeepEquals, job) + // Add multiple history jobs. + historyJob1 := &model.Job{ID: 1234} + err = t.AddHistoryDDLJob(historyJob1) + c.Assert(err, IsNil) + historyJob2 := &model.Job{ID: 123} + err = t.AddHistoryDDLJob(historyJob2) + c.Assert(err, IsNil) all, err := t.GetAllHistoryDDLJobs() c.Assert(err, IsNil) var lastID int64 @@ -330,6 +341,13 @@ func (s *testSuite) TestDDL(c *C) { lastID = job.ID } + // Test for get last N history ddl jobs. + historyJobs, err := t.GetLastNHistoryDDLJobs(2) + c.Assert(err, IsNil) + c.Assert(len(historyJobs), Equals, 2) + c.Assert(historyJobs[0].ID == 123, IsTrue) + c.Assert(historyJobs[1].ID == 1234, IsTrue) + // Test GetAllDDLJobsInQueue. err = t.EnQueueDDLJob(job) job1 := &model.Job{ID: 2} diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go index d6ba91bea8634..9e8fba93ec79f 100644 --- a/store/mockstore/mocktikv/cluster.go +++ b/store/mockstore/mocktikv/cluster.go @@ -21,6 +21,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" "golang.org/x/net/context" ) @@ -329,6 +330,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count) } +// SplitKeys evenly splits the start, end key into "count" regions. +// Only works for single store. +func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) { + c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count) +} + func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) { c.Lock() defer c.Unlock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index c9ec0fe013d43..2d94f73226507 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -227,14 +227,32 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse { } func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse { - if !h.checkKeyInRegion(req.GetStartKey()) { - panic("KvScan: startKey not in region") - } endKey := MvccKey(h.endKey).Raw() - if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) { - endKey = req.EndKey + var pairs []Pair + if !req.Reverse { + if !h.checkKeyInRegion(req.GetStartKey()) { + panic("KvScan: startKey not in region") + } + if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) { + endKey = req.EndKey + } + pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) + } else { + // TiKV use range [end_key, start_key) for reverse scan. + // Should use the req.EndKey to check in region. + if !h.checkKeyInRegion(req.GetEndKey()) { + panic("KvScan: startKey not in region") + } + + // TiKV use range [end_key, start_key) for reverse scan. + // So the req.StartKey actually is the end_key. + if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.StartKey), h.endKey) < 0) { + endKey = req.StartKey + } + + pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) } - pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) + return &kvrpcpb.ScanResponse{ Pairs: convertToPbPairs(pairs), } diff --git a/store/tikv/scan.go b/store/tikv/scan.go index d40037fe52bb6..224f697f7b4f3 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -35,9 +35,13 @@ type Scanner struct { nextStartKey []byte endKey []byte eof bool + + // Use for reverse scan. + reverse bool + nextEndKey []byte } -func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) { +func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) { // It must be > 1. Otherwise scanner won't skipFirst. if batchSize <= 1 { batchSize = scanBatchSize @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz valid: true, nextStartKey: startKey, endKey: endKey, + reverse: reverse, + nextEndKey: endKey, } err := scanner.Next() if kv.IsErrNotFound(err) { @@ -83,6 +89,7 @@ func (s *Scanner) Next() error { if !s.valid { return errors.New("scanner iterator is invalid") } + var err error for { s.idx++ if s.idx >= len(s.cache) { @@ -90,7 +97,7 @@ func (s *Scanner) Next() error { s.Close() return nil } - err := s.getData(bo) + err = s.getData(bo) if err != nil { s.Close() return errors.Trace(err) @@ -101,7 +108,8 @@ func (s *Scanner) Next() error { } current := s.cache[s.idx] - if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 { + if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) || + (s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) { s.eof = true s.Close() return nil @@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error { func (s *Scanner) getData(bo *Backoffer) error { logutil.Logger(context.Background()).Debug("txn getData", zap.Binary("nextStartKey", s.nextStartKey), + zap.Binary("nextEndKey", s.nextEndKey), + zap.Bool("reverse", s.reverse), zap.Uint64("txnStartTS", s.startTS())) sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client) - + var reqEndKey, reqStartKey []byte + var loc *KeyLocation + var err error for { - loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey) + if !s.reverse { + loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey) + } else { + loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey) + } if err != nil { return errors.Trace(err) } - reqEndKey := s.endKey - if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 { - reqEndKey = loc.EndKey + if !s.reverse { + reqEndKey = s.endKey + if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 { + reqEndKey = loc.EndKey + } + } else { + reqStartKey = s.nextStartKey + if len(reqStartKey) == 0 || + (len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) { + reqStartKey = loc.StartKey + } } req := &tikvrpc.Request{ @@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error { NotFillCache: s.snapshot.notFillCache, }, } + if s.reverse { + req.Scan.StartKey = s.nextEndKey + req.Scan.EndKey = reqStartKey + req.Scan.Reverse = true + } resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium) if err != nil { return errors.Trace(err) @@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error { if len(kvPairs) < s.batchSize { // No more data in current Region. Next getData() starts // from current Region's endKey. - s.nextStartKey = loc.EndKey - if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) { + if !s.reverse { + s.nextStartKey = loc.EndKey + } else { + s.nextEndKey = reqStartKey + } + if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) || + (s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) { // Current Region is the last one. s.eof = true } @@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error { // may get an empty response if the Region in fact does not have // more data. lastKey := kvPairs[len(kvPairs)-1].GetKey() - s.nextStartKey = kv.Key(lastKey).Next() + if !s.reverse { + s.nextStartKey = kv.Key(lastKey).Next() + } else { + s.nextEndKey = kv.Key(lastKey) + } return nil } } diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index 94b720314f2bd..0d429e768a178 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -41,7 +41,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) - scanner, err := newScanner(snapshot, []byte("a"), nil, 10) + scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('z'); ch++ { c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key())) @@ -49,7 +49,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { } c.Assert(scanner.Valid(), IsFalse) - scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10) + scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('h'); ch++ { c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key())) @@ -57,3 +57,36 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { } c.Assert(scanner.Valid(), IsFalse) } + +func (s *testScanMockSuite) TestReverseScan(c *C) { + store := NewTestStore(c).(*tikvStore) + defer store.Close() + + txn, err := store.Begin() + c.Assert(err, IsNil) + for ch := byte('a'); ch <= byte('z'); ch++ { + err = txn.Set([]byte{ch}, []byte{ch}) + c.Assert(err, IsNil) + } + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + + txn, err = store.Begin() + c.Assert(err, IsNil) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true) + c.Assert(err, IsNil) + for ch := byte('y'); ch >= byte('a'); ch-- { + c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key()))) + c.Assert(scanner.Next(), IsNil) + } + c.Assert(scanner.Valid(), IsFalse) + + scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true) + c.Assert(err, IsNil) + for ch := byte('h'); ch >= byte('a'); ch-- { + c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key()))) + c.Assert(scanner.Next(), IsNil) + } + c.Assert(scanner.Valid(), IsFalse) +} diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index a22cc6db687a7..a3ba50e66a2dd 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -285,13 +285,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { // Iter return a list of key-value pair after `k`. func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { - scanner, err := newScanner(s, k, upperBound, scanBatchSize) + scanner, err := newScanner(s, k, upperBound, scanBatchSize, false) return scanner, errors.Trace(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { - return nil, kv.ErrNotImplemented + scanner, err := newScanner(s, nil, k, scanBatchSize, true) + return scanner, errors.Trace(err) } func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 9dc325d65fe9d..a495e246ec526 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -125,7 +125,7 @@ func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error if logFreq%10 == 0 { logutil.Logger(context.Background()).Info("wait scatter region", zap.Uint64("regionID", regionID), - zap.String("desc", string(resp.Desc)), + zap.String("reverse", string(resp.Desc)), zap.String("status", pdpb.OperatorStatus_name[int32(resp.Status)])) } logFreq++ diff --git a/structure/hash.go b/structure/hash.go index 609c4df398b68..78eecc888e342 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -216,6 +216,23 @@ func (t *TxStructure) HGetAll(key []byte) ([]HashPair, error) { return res, errors.Trace(err) } +// HGetLastN gets latest N fields and values in hash. +func (t *TxStructure) HGetLastN(key []byte, num int) ([]HashPair, error) { + res := make([]HashPair, 0, num) + err := t.iterReverseHash(key, func(field []byte, value []byte) (bool, error) { + pair := HashPair{ + Field: append([]byte{}, field...), + Value: append([]byte{}, value...), + } + res = append(res, pair) + if len(res) >= num { + return false, nil + } + return true, nil + }) + return res, errors.Trace(err) +} + // HClear removes the hash value of the key. func (t *TxStructure) HClear(key []byte) error { metaKey := t.encodeHashMetaKey(key) @@ -268,6 +285,37 @@ func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) return nil } +func (t *TxStructure) iterReverseHash(key []byte, fn func(k []byte, v []byte) (bool, error)) error { + dataPrefix := t.hashDataKeyPrefix(key) + it, err := t.reader.IterReverse(dataPrefix.PrefixNext()) + if err != nil { + return errors.Trace(err) + } + + var field []byte + for it.Valid() { + if !it.Key().HasPrefix(dataPrefix) { + break + } + + _, field, err = t.decodeHashDataKey(it.Key()) + if err != nil { + return errors.Trace(err) + } + + more, err := fn(field, it.Value()) + if !more || err != nil { + return errors.Trace(err) + } + + err = it.Next() + if err != nil { + return errors.Trace(err) + } + } + return nil +} + func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) { v, err := t.reader.Get(metaKey) if kv.ErrNotExist.Equal(err) { diff --git a/structure/structure_test.go b/structure/structure_test.go index 33e3f15c74330..354304efb3ebe 100644 --- a/structure/structure_test.go +++ b/structure/structure_test.go @@ -216,6 +216,17 @@ func (s *testTxStructureSuite) TestHash(c *C) { {Field: []byte("1"), Value: []byte("1")}, {Field: []byte("2"), Value: []byte("2")}}) + res, err = tx.HGetLastN(key, 1) + c.Assert(err, IsNil) + c.Assert(res, DeepEquals, []structure.HashPair{ + {Field: []byte("2"), Value: []byte("2")}}) + + res, err = tx.HGetLastN(key, 2) + c.Assert(err, IsNil) + c.Assert(res, DeepEquals, []structure.HashPair{ + {Field: []byte("2"), Value: []byte("2")}, + {Field: []byte("1"), Value: []byte("1")}}) + err = tx.HDel(key, []byte("1")) c.Assert(err, IsNil) diff --git a/structure/type.go b/structure/type.go index 89759269871c9..7096d70e86984 100644 --- a/structure/type.go +++ b/structure/type.go @@ -63,6 +63,11 @@ func (t *TxStructure) encodeHashDataKey(key []byte, field []byte) kv.Key { return codec.EncodeBytes(ek, field) } +// EncodeHashDataKey exports for tests. +func (t *TxStructure) EncodeHashDataKey(key []byte, field []byte) kv.Key { + return t.encodeHashDataKey(key, field) +} + func (t *TxStructure) decodeHashDataKey(ek kv.Key) ([]byte, []byte, error) { var ( key []byte diff --git a/util/admin/admin.go b/util/admin/admin.go index 8657c16554b45..96fe43452c5ff 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -233,7 +233,7 @@ const DefNumHistoryJobs = 10 // The maximum count of history jobs is num. func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) { t := meta.NewMeta(txn) - jobs, err := t.GetAllHistoryDDLJobs() + jobs, err := t.GetLastNHistoryDDLJobs(maxNumJobs) if err != nil { return nil, errors.Trace(err) } diff --git a/util/kvencoder/kv_encoder.go b/util/kvencoder/kv_encoder.go index c6b3c93ff8234..2e68a2319ca3f 100644 --- a/util/kvencoder/kv_encoder.go +++ b/util/kvencoder/kv_encoder.go @@ -188,7 +188,7 @@ func (e *kvEncoder) EncodePrepareStmt(tableID int64, stmtID uint32, param ...int func (e *kvEncoder) EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error) { mockTxn := kv.NewMockTxn() m := meta.NewMeta(mockTxn) - k, v := m.GenAutoTableIDIDKeyValue(dbID, tableID, autoID) + k, v := m.GenAutoTableIDKeyValue(dbID, tableID, autoID) return KvPair{Key: k, Val: v}, nil }