diff --git a/DEPS.bzl b/DEPS.bzl index d2fedf9090f41..f73ceec92ffa1 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3054,8 +3054,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=", - version = "v0.0.0-20221130022225-6c56ac56fe5f", + sum = "h1:SnvWHM4JSkn9TFLIjrSRanpliqnmgk+y0MuoXC77y6I=", + version = "v0.0.0-20230524051921-3dc79e773139", ) go_repository( name = "com_github_pingcap_log", diff --git a/go.mod b/go.mod index 939aab9388cd0..3a5eed5ee3b65 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f + github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139 github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index b67117b8ba4ce..d2e44b68f3ecc 100644 --- a/go.sum +++ b/go.sum @@ -809,8 +809,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ= -github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139 h1:SnvWHM4JSkn9TFLIjrSRanpliqnmgk+y0MuoXC77y6I= +github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/store/mockstore/unistore/tikv/kverrors/errors.go b/store/mockstore/unistore/tikv/kverrors/errors.go index 22054aab567f7..613e45b274ab5 100644 --- a/store/mockstore/unistore/tikv/kverrors/errors.go +++ b/store/mockstore/unistore/tikv/kverrors/errors.go @@ -43,8 +43,8 @@ func BuildLockErr(key []byte, lock *mvcc.Lock) *ErrLocked { func (e *ErrLocked) Error() string { lock := e.Lock return fmt.Sprintf( - "key is locked, key: %q, Type: %v, primary: %q, startTS: %v, forUpdateTS: %v, useAsyncCommit: %v", - e.Key, lock.Op, lock.Primary, lock.StartTS, lock.ForUpdateTS, lock.UseAsyncCommit, + "key is locked, key: %v, lock: %v", + hex.EncodeToString(e.Key), lock.String(), ) } @@ -148,3 +148,13 @@ func (e *ErrAssertionFailed) Error() string { return fmt.Sprintf("AssertionFailed { StartTS: %v, Key: %v, Assertion: %v, ExistingStartTS: %v, ExistingCommitTS: %v }", e.StartTS, hex.EncodeToString(e.Key), e.Assertion.String(), e.ExistingStartTS, e.ExistingCommitTS) } + +// ErrPrimaryMismatch is returned if CheckTxnStatus request is sent to a secondary lock. +type ErrPrimaryMismatch struct { + Key []byte + Lock *mvcc.Lock +} + +func (e *ErrPrimaryMismatch) Error() string { + return fmt.Sprintf("primary mismatch, key: %v, lock: %v", hex.EncodeToString(e.Key), e.Lock.String()) +} diff --git a/store/mockstore/unistore/tikv/mvcc.go b/store/mockstore/unistore/tikv/mvcc.go index 753c2e49c709c..8efd8f26ee75c 100644 --- a/store/mockstore/unistore/tikv/mvcc.go +++ b/store/mockstore/unistore/tikv/mvcc.go @@ -419,6 +419,13 @@ func (store *MVCCStore) CheckTxnStatus(reqCtx *requestCtx, lock := store.getLock(reqCtx, req.PrimaryKey) batch := store.dbWriter.NewWriteBatch(req.LockTs, 0, reqCtx.rpcCtx) if lock != nil && lock.StartTS == req.LockTs { + if !bytes.Equal(req.PrimaryKey, lock.Primary) { + return TxnStatus{}, &kverrors.ErrPrimaryMismatch{ + Key: req.PrimaryKey, + Lock: lock, + } + } + // For an async-commit lock, never roll it back or push forward it MinCommitTS. if lock.UseAsyncCommit && !req.ForceSyncCommit { log.S().Debugf("async commit startTS=%v secondaries=%v minCommitTS=%v", lock.StartTS, lock.Secondaries, lock.MinCommitTS) @@ -1025,10 +1032,6 @@ func (store *MVCCStore) Commit(req *requestCtx, keys [][]byte, startTS, commitTS Key: key, } } - if lock.Op == uint8(kvrpcpb.Op_PessimisticLock) { - log.Warn("commit a pessimistic lock with Lock type", zap.Binary("key", key), zap.Uint64("start ts", startTS), zap.Uint64("commit ts", commitTS)) - lock.Op = uint8(kvrpcpb.Op_Lock) - } isPessimisticTxn = lock.ForUpdateTS > 0 tmpDiff += len(key) + len(lock.Value) batch.Commit(key, &lock) @@ -1304,12 +1307,7 @@ func (store *MVCCStore) Cleanup(reqCtx *requestCtx, key []byte, startTS, current func (store *MVCCStore) appendScannedLock(locks []*kvrpcpb.LockInfo, it *lockstore.Iterator, maxTS uint64) []*kvrpcpb.LockInfo { lock := mvcc.DecodeLock(it.Value()) if lock.StartTS < maxTS { - locks = append(locks, &kvrpcpb.LockInfo{ - PrimaryLock: lock.Primary, - LockVersion: lock.StartTS, - Key: safeCopy(it.Key()), - LockTtl: uint64(lock.TTL), - }) + locks = append(locks, lock.ToLockInfo(append([]byte{}, it.Key()...))) } return locks } diff --git a/store/mockstore/unistore/tikv/mvcc/mvcc.go b/store/mockstore/unistore/tikv/mvcc/mvcc.go index b7185766bfeeb..e8184007b8e53 100644 --- a/store/mockstore/unistore/tikv/mvcc/mvcc.go +++ b/store/mockstore/unistore/tikv/mvcc/mvcc.go @@ -16,6 +16,8 @@ package mvcc import ( "encoding/binary" + "encoding/hex" + "fmt" "unsafe" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -113,6 +115,18 @@ func (l *Lock) ToLockInfo(key []byte) *kvrpcpb.LockInfo { } } +// String implements fmt.Stringer for Lock. +func (l *Lock) String() string { + return fmt.Sprintf( + "Lock { Type: %v, StartTS: %v, ForUpdateTS: %v, Primary: %v, UseAsyncCommit: %v }", + kvrpcpb.Op(l.Op).String(), + l.StartTS, + l.ForUpdateTS, + hex.EncodeToString(l.Primary), + l.UseAsyncCommit, + ) +} + // UserMeta value for lock. const ( LockUserMetaNoneByte = 0 diff --git a/store/mockstore/unistore/tikv/mvcc_test.go b/store/mockstore/unistore/tikv/mvcc_test.go index 8f70a847ef2fb..9cb97467bed34 100644 --- a/store/mockstore/unistore/tikv/mvcc_test.go +++ b/store/mockstore/unistore/tikv/mvcc_test.go @@ -715,6 +715,15 @@ func TestCheckTxnStatus(t *testing.T) { require.Equal(t, uint64(41), resCommitTs) require.NoError(t, err) require.Equal(t, kvrpcpb.Action_NoAction, action) + + // check on mismatching primary + startTs = 43 + callerStartTs = 44 + currentTs = 44 + MustAcquirePessimisticLock([]byte("another_key"), pk, startTs, startTs, store) + _, _, _, err = CheckTxnStatus(pk, startTs, callerStartTs, currentTs, true, store) + require.IsType(t, &kverrors.ErrPrimaryMismatch{}, errors.Cause(err)) + MustPessimisticRollback(pk, startTs, startTs, store) } func TestCheckSecondaryLocksStatus(t *testing.T) { diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index f7779b51a8d69..a0273980124ea 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -1074,6 +1074,12 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { ExistingCommitTs: x.ExistingCommitTS, }, } + case *kverrors.ErrPrimaryMismatch: + return &kvrpcpb.KeyError{ + PrimaryMismatch: &kvrpcpb.PrimaryMismatch{ + LockInfo: x.Lock.ToLockInfo(x.Key), + }, + } default: return &kvrpcpb.KeyError{ Abort: err.Error(), diff --git a/store/mockstore/unistore/tikv/write.go b/store/mockstore/unistore/tikv/write.go index a9182d78b6f81..5947fc2b7c786 100644 --- a/store/mockstore/unistore/tikv/write.go +++ b/store/mockstore/unistore/tikv/write.go @@ -23,10 +23,12 @@ import ( "github.com/pingcap/badger" "github.com/pingcap/badger/y" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/log" "github.com/pingcap/tidb/store/mockstore/unistore/lockstore" "github.com/pingcap/tidb/store/mockstore/unistore/tikv/dbreader" "github.com/pingcap/tidb/store/mockstore/unistore/tikv/mvcc" "github.com/pingcap/tidb/util/mathutil" + "go.uber.org/zap" ) const ( @@ -249,7 +251,10 @@ func (wb *writeBatch) Prewrite(key []byte, lock *mvcc.Lock) { func (wb *writeBatch) Commit(key []byte, lock *mvcc.Lock) { userMeta := mvcc.NewDBUserMeta(wb.startTS, wb.commitTS) k := y.KeyWithTs(key, wb.commitTS) - if lock.Op != uint8(kvrpcpb.Op_Lock) { + if lock.Op == uint8(kvrpcpb.Op_PessimisticLock) { + log.Info("removing a pessimistic lock when committing", zap.Binary("key", key), zap.Uint64("startTS", wb.startTS), zap.Uint64("commitTS", wb.commitTS)) + // Write nothing as if PessimisticRollback is called. + } else if lock.Op != uint8(kvrpcpb.Op_Lock) { wb.dbBatch.set(k, lock.Value, userMeta) } else if bytes.Equal(key, lock.Primary) { opLockKey := y.KeyWithTs(mvcc.EncodeExtraTxnStatusKey(key, wb.startTS), wb.startTS)