Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: implement non-block reading for Get and BatchGet under the large transaction protocol #13599

Merged
merged 8 commits into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (e *tableScanExec) Next(ctx context.Context) (value [][]byte, err error) {
}

func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) {
val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel)
val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel, e.resolvedLocks)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -361,7 +361,7 @@ func (e *indexScanExec) Next(ctx context.Context) (value [][]byte, err error) {

// getRowFromPoint is only used for unique key.
func (e *indexScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) {
val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel)
val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel, e.resolvedLocks)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 6 additions & 0 deletions store/mockstore/mocktikv/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) {
// After that, the query should read the previous version data.
tk.MustQuery("select * from t").Check(testkit.Rows("1 1"))

// Cover BatchGet.
tk.MustQuery("select * from t where id in (1)").Check(testkit.Rows("1 1"))

// Cover PointGet.
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 1"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this query use MaxTS?
Then we need also to cover PointGet with real TS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// And check the large txn is still alive.
pairs = s.mvccStore.Scan([]byte("primary"), nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(pairs, HasLen, 1)
Expand Down
10 changes: 5 additions & 5 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,25 @@ func lock(key, primary string, ts uint64) *kvrpcpb.LockInfo {
}

func (s *testMockTiKVSuite) mustGetNone(c *C, key string, ts uint64) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(err, IsNil)
c.Assert(val, IsNil)
}

func (s *testMockTiKVSuite) mustGetErr(c *C, key string, ts uint64) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(err, NotNil)
c.Assert(val, IsNil)
}

func (s *testMockTiKVSuite) mustGetOK(c *C, key string, ts uint64, expect string) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}

func (s *testMockTiKVSuite) mustGetRC(c *C, key string, ts uint64, expect string) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC)
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC, nil)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}
Expand Down Expand Up @@ -411,7 +411,7 @@ func (s *testMockTiKVSuite) TestBatchGet(c *C) {
s.mustPutOK(c, "k2", "v2", 3, 4)
s.mustPutOK(c, "k3", "v3", 1, 2)
batchKeys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")}
pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI)
pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI, nil)
for _, pair := range pairs {
c.Assert(pair.Err, IsNil)
}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ func (e *rawEntry) Less(than btree.Item) bool {

// MVCCStore is a mvcc key-value storage.
type MVCCStore interface {
Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error)
Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error)
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS,
forUpdateTS uint64, ttl uint64, lockWaitTime int64) []error
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
Expand Down
9 changes: 4 additions & 5 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,11 @@ func (dec *skipDecoder) Decode(iter *Iterator) (bool, error) {

// Get implements the MVCCStore interface.
// key cannot be nil or []byte{}
func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) {
func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) {
mvcc.mu.RLock()
defer mvcc.mu.RUnlock()

// TODO: Update the nil here to support point-get for non-block reading on the large transaction.
return mvcc.getValue(key, startTS, isoLevel, nil)
return mvcc.getValue(key, startTS, isoLevel, resolvedLocks)
}

func (mvcc *MVCCLevelDB) getValue(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) {
Expand Down Expand Up @@ -317,13 +316,13 @@ func getValue(iter *Iterator, key []byte, startTS uint64, isoLevel kvrpcpb.Isola
}

// BatchGet implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair {
func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair {
mvcc.mu.RLock()
defer mvcc.mu.RUnlock()

pairs := make([]Pair, 0, len(ks))
for _, k := range ks {
v, err := mvcc.getValue(k, startTS, isoLevel, nil)
v, err := mvcc.getValue(k, startTS, isoLevel, resolvedLocks)
if v == nil && err == nil {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
panic("KvGet: key not in region")
}

val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel)
val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel, req.Context.GetResolvedLocks())
if err != nil {
return &kvrpcpb.GetResponse{
Error: convertToKeyError(err),
Expand Down Expand Up @@ -417,7 +417,7 @@ func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.Bat
panic("KvBatchGet: key not in region")
}
}
pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel)
pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel, req.Context.GetResolvedLocks())
return &kvrpcpb.BatchGetResponse{
Pairs: convertToPbPairs(pairs),
}
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
ScanDetail: true,
})
startTime := time.Now()
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, task.storeType)
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -774,10 +774,10 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
}

// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) {
func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) {
sender := NewRegionRequestSender(ch.RegionCache, ch.Client)
req.Context.ResolvedLocks = ch.minCommitTSPushed.Get()
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, ReadTimeoutMedium, sType)
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
return resp, ctx, sender.storeAddr, err
}

Expand Down
34 changes: 27 additions & 7 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type tikvSnapshot struct {
vars *kv.Variables
replicaRead kv.ReplicaReadType
replicaReadSeed uint32
minCommitTSPushed

// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
Expand All @@ -80,13 +81,18 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) *
priority: pb.CommandPri_Normal,
vars: kv.DefaultVars,
replicaReadSeed: replicaReadSeed,
minCommitTSPushed: minCommitTSPushed{
data: make(map[uint64]struct{}, 5),
},
}
}

func (s *tikvSnapshot) setSnapshotTS(ts uint64) {
// Invalidate cache if the snapshotTS change!
s.version.Ver = ts
s.cached = nil
// And also the minCommitTS pushed information.
s.minCommitTSPushed.data = make(map[uint64]struct{}, 5)
}

// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
Expand Down Expand Up @@ -191,7 +197,12 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
}

func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
sender := NewRegionRequestSender(s.store.regionCache, s.store.client)
cli := clientHelper{
LockResolver: s.store.lockResolver,
RegionCache: s.store.regionCache,
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}

pending := batch.keys
for {
Expand All @@ -202,7 +213,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
Priority: s.priority,
NotFillCache: s.notFillCache,
})
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutMedium)

resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV)

if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -240,7 +253,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
locks = append(locks, lock)
}
if len(lockedKeys) > 0 {
msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks)
msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -285,10 +298,17 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}

failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
panic("cache miss")
if bo.ctx.Value("TestSnapshotCache") != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe using exp like 1*return(true)->return(false) like this

c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/keepHistory", `1*return(true)->return(false)`), IsNil)
and check failpoint.Value == true at here is more gofail idiom~

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the concurrent test environment, that mock panic may affect other tests.
bo.ctx.Value("TestSnapshotCache") is used to address that problem. @lysu

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can pass a TS and panic on equal TS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In failpoint.Inject it doesn't know which TS @coocood

panic("cache miss")
}
})

sender := NewRegionRequestSender(s.store.regionCache, s.store.client)
cli := clientHelper{
LockResolver: s.store.lockResolver,
RegionCache: s.store.regionCache,
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
&pb.GetRequest{
Expand All @@ -303,7 +323,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort)
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -328,7 +348,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
30 changes: 30 additions & 0 deletions store/tikv/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -121,6 +122,7 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) {
func (s *testSnapshotSuite) TestSnapshotCache(c *C) {
txn := s.beginTxn(c)
c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil)
c.Assert(txn.Delete(kv.Key("y")), IsNil) // store data is affected by other tests.
c.Assert(txn.Commit(context.Background()), IsNil)

txn = s.beginTxn(c)
Expand Down Expand Up @@ -205,3 +207,31 @@ func (s *testSnapshotSuite) TestLockNotFoundPrint(c *C) {
key := prettyLockNotFoundKey(msg)
c.Assert(key, Equals, "{tableID=12937, indexID=1, indexValues={C19092900000048625523, }}")
}

func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
txn := s.beginTxn(c)
c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil)
c.Assert(txn.Set(kv.Key("y"), []byte("y")), IsNil)
ctx := context.Background()
bo := NewBackoffer(ctx, PrewriteMaxBackoff)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
committer.lockTTL = txnLockTTL(txn.startTime, 10<<20)
c.Assert(committer.prewriteKeys(bo, committer.keys), IsNil)

txn1 := s.beginTxn(c)
// txn1 is not blocked by txn in the large txn protocol.
_, err = txn1.Get(ctx, kv.Key("x"))
c.Assert(kv.IsErrNotFound(errors.Trace(err)), IsTrue)

res, err := txn1.BatchGet(ctx, []kv.Key{kv.Key("x"), kv.Key("y"), kv.Key("z")})
c.Assert(err, IsNil)
c.Assert(res, HasLen, 0)

// Commit txn, check the final commit ts is pushed.
c.Assert(committer.commitKeys(bo, committer.keys), IsNil)
status, err := s.store.lockResolver.GetTxnStatus(txn.StartTS(), 0, []byte("x"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Greater, txn1.StartTS())
}
4 changes: 4 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.TxnHeartBeatResponse{
RegionError: e,
}
case CmdCheckTxnStatus:
p = &kvrpcpb.CheckTxnStatusResponse{
RegionError: e,
}
default:
return nil, fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down