diff --git a/.golangci.yml b/.golangci.yml index f75a8499c..2929ea46f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,3 +1,5 @@ +run: + timeout: 5m linters: disable-all: true enable: diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index ee5c99a5d..1e3f3ed39 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -37,9 +37,12 @@ package tikv_test import ( "bytes" "context" + "encoding/json" stderrs "errors" "fmt" + "io" "math" + "net/http" "sync" "sync/atomic" "testing" @@ -48,8 +51,11 @@ import ( "github.com/pingcap/failpoint" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" @@ -1083,6 +1089,126 @@ func (s *testLockWithTiKVSuite) checkIsKeyLocked(key []byte, expectedLocked bool s.NoError(err) } +func (s *testLockWithTiKVSuite) trySetTiKVConfig(name string, value interface{}) func() { + stores, err := s.store.GetPDClient().GetAllStores(context.Background()) + s.NoError(err) + + type configItem struct { + url string + name string + value interface{} + } + + var recoverConfigs []configItem + + httpScheme := "http" + if c, err := config.GetGlobalConfig().Security.ToTLSConfig(); err == nil && c != nil { + httpScheme = "https" + } + + t := s.Suite.T() + + setCfg := func(url, name string, value interface{}) error { + postBody, err := json.Marshal(map[string]interface{}{name: value}) + if err != nil { + return err + } + resp, err := http.Post(url, "text/json", bytes.NewReader(postBody)) + if err != nil { + return err + } + s.NoError(resp.Body.Close()) + if resp.StatusCode != 200 { + return errors.Errorf("post config got unexpected status code: %v, request body: %s", resp.StatusCode, postBody) + } + t.Logf("set config for tikv at %s finished: %s", url, string(postBody)) + return nil + } + +storeIter: + for _, store := range stores { + if store.State != metapb.StoreState_Up { + continue + } + for _, label := range store.Labels { + if label.Key == "engine" && label.Value != "tikv" { + continue storeIter + } + } + + err := func() (err error) { + defer func() { + if r := recover(); r != nil { + err = errors.Errorf("set config for store at %v panicked: %v", store.StatusAddress, r) + } + }() + + url := fmt.Sprintf("%s://%s/config", httpScheme, store.StatusAddress) + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return errors.Errorf("unexpected response status: %v", resp.Status) + } + oldCfgRaw, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + oldCfg := make(map[string]interface{}) + err = json.Unmarshal(oldCfgRaw, &oldCfg) + if err != nil { + return err + } + + oldValue := oldCfg["pessimistic-txn"].(map[string]interface{})["in-memory"] + if assert.ObjectsAreEqual(oldValue, value) { + return nil + } + + err = setCfg(url, name, value) + if err != nil { + return err + } + + recoverConfigs = append(recoverConfigs, configItem{ + url: url, + name: name, + value: oldValue, + }) + + return nil + }() + + if err != nil { + t.Logf("failed to set config for store at %s: %v", store.StatusAddress, err) + } + } + + // Prevent goleak from complaining about its internal connections. + http.DefaultClient.CloseIdleConnections() + + if len(recoverConfigs) > 0 { + // Sleep for a while to ensure the new configs are applied. + time.Sleep(time.Second) + } + + return func() { + for _, item := range recoverConfigs { + err = setCfg(item.url, item.name, item.value) + if err != nil { + t.Logf("failed to recover config for store at %s: %v", item.url, err) + } + } + + // Prevent goleak from complaining about its internal connections. + http.DefaultClient.CloseIdleConnections() + } +} + func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { test := func(asyncCommit bool, onePC bool, causalConsistency bool) { k1 := []byte("k1") @@ -1337,3 +1463,72 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() { s.NoError(err) s.Equal([]byte("v1-1"), v) } + +func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { + if *withTiKV { + recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", false) + defer recoverFunc() + } else { + s.T().Skip("this test only works with tikv") + } + + s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`)) + s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`)) + s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`)) + defer func() { + s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback")) + s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries")) + s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit")) + }() + + k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3") + v2, v3 := []byte("v2"), []byte("v3") + + ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) + + txn, err := s.store.Begin() + s.NoError(err) + txn.SetPessimistic(true) + + { + // Produce write conflict on key k2 + txn2, err := s.store.Begin() + s.NoError(err) + s.NoError(txn2.Set(k2, []byte("v0"))) + s.NoError(txn2.Commit(ctx)) + } + + lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now()) + err = txn.LockKeys(ctx, lockCtx, k1, k2) + s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err)) + + // k1 has txn's stale pessimistic lock now. + + forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + s.NoError(err) + lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now()) + s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3)) + + s.NoError(txn.Set(k2, v2)) + s.NoError(txn.Set(k3, v3)) + s.NoError(txn.Commit(ctx)) + + // k3 has txn's stale prewrite lock now. + + // Perform ScanLock - BatchResolveLock. + currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + s.NoError(err) + s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1)) + + // Check data consistency + readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + snapshot := s.store.GetSnapshot(readTS) + _, err = snapshot.Get(ctx, k1) + s.Equal(tikverr.ErrNotExist, err) + v, err := snapshot.Get(ctx, k2) + s.NoError(err) + s.Equal(v2, v) + v, err = snapshot.Get(ctx, k3) + s.NoError(err) + s.Equal(v3, v) +} diff --git a/tikv/test_probe.go b/tikv/test_probe.go index 73d2ed621..d470d6919 100644 --- a/tikv/test_probe.go +++ b/tikv/test_probe.go @@ -109,6 +109,11 @@ func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) { s.setSafeTS(storeID, safeTS) } +// GCResolveLockPhase performs the resolve-locks phase of GC, which scans all locks and resolves them. +func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, concurrency int) error { + return s.resolveLocks(ctx, safepoint, concurrency) +} + // LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose. type LockResolverProbe struct { *txnlock.LockResolverProbe diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index d1337d736..74caae652 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -238,11 +238,27 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo txnInfos := make(map[uint64]uint64) startTime := time.Now() for _, l := range expiredLocks { + logutil.Logger(bo.GetCtx()).Debug("BatchResolveLocks handling lock", zap.Stringer("lock", l)) + if _, ok := txnInfos[l.TxnID]; ok { continue } metrics.LockResolverCountWithExpired.Inc() + if l.LockType == kvrpcpb.Op_PessimisticLock { + // BatchResolveLocks forces resolving the locks ignoring whether whey are expired. + // For pessimistic locks, committing them makes no sense, but it won't affect transaction + // correctness if we always roll back them. + // Pessimistic locks needs special handling logic because their primary may not point + // to the real primary of that transaction, and their state cannot be put in `txnInfos`. + // (see: https://github.com/pingcap/tidb/issues/42937). + err := lr.resolvePessimisticLock(bo, l) + if err != nil { + return false, err + } + continue + } + // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l) if err != nil {