Skip to content

Commit

Permalink
txn: Update client-go to fix the issue that GC BatchResolveLcok may m…
Browse files Browse the repository at this point in the history
…iss primary pessimistic locks (#45143) (#45151)

close #45134
  • Loading branch information
ti-chi-bot committed Jul 4, 2023
1 parent 887d342 commit 613ecc5
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 10 deletions.
8 changes: 8 additions & 0 deletions domain/domain.go
Expand Up @@ -972,6 +972,14 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
}

// GetPDClient returns the PD client.
func (do *Domain) GetPDClient() pd.Client {
if store, ok := do.store.(kv.StorageWithPD); ok {
return store.GetPDClient()
}
return nil
}

// LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it
// should be called only once in BootstrapSession.
func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -18,7 +18,7 @@ require (
github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5
github.com/coocood/freecache v1.2.1
github.com/coreos/go-semver v0.3.0
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8
github.com/danjacques/gofslock v0.0.0-20191023191349-0a45f885bc37
github.com/dgraph-io/ristretto v0.1.1-0.20220403145359-8e850b710d6d
Expand Down Expand Up @@ -63,7 +63,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20230630082913-bf42043d98b6
github.com/tikv/client-go/v2 v2.0.1-0.20230704072904-34f99fa358b6
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -755,8 +755,8 @@ github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df/go.mod h1:6Fq8o
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20230630082913-bf42043d98b6 h1:CWnu4ji9owlkK+E7B5X9rg6EiwTbyDJY96lspfDVYkg=
github.com/tikv/client-go/v2 v2.0.1-0.20230630082913-bf42043d98b6/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/client-go/v2 v2.0.1-0.20230704072904-34f99fa358b6 h1:+RE7e74gaujHFwxfpZ+tRQxF7H1ekd7sYm/tkACq5OA=
github.com/tikv/client-go/v2 v2.0.1-0.20230704072904-34f99fa358b6/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
Expand Down
7 changes: 1 addition & 6 deletions store/mockstore/unistore/tikv/mvcc.go
Expand Up @@ -1267,12 +1267,7 @@ func (store *MVCCStore) Cleanup(reqCtx *requestCtx, key []byte, startTS, current
func (store *MVCCStore) appendScannedLock(locks []*kvrpcpb.LockInfo, it *lockstore.Iterator, maxTS uint64) []*kvrpcpb.LockInfo {
lock := mvcc.DecodeLock(it.Value())
if lock.StartTS < maxTS {
locks = append(locks, &kvrpcpb.LockInfo{
PrimaryLock: lock.Primary,
LockVersion: lock.StartTS,
Key: safeCopy(it.Key()),
LockTtl: uint64(lock.TTL),
})
locks = append(locks, lock.ToLockInfo(append([]byte{}, it.Key()...)))
}
return locks
}
Expand Down
142 changes: 142 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/auth"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
Expand All @@ -47,6 +49,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

Expand Down Expand Up @@ -3284,3 +3287,142 @@ func TestRCUpdateWithPointGet(t *testing.T) {
require.Equal(t, uint64(1), tk1.Session().AffectedRows())
tk1.MustExec("commit")
}

func mustTimeout[T interface{}](t *testing.T, ch <-chan T, timeout time.Duration) {
select {
case res := <-ch:
require.FailNow(t, fmt.Sprintf("received signal when not expected: %v", res))
case <-time.After(timeout):
}
}
func mustRecv[T interface{}](t *testing.T, ch <-chan T) T {
select {
case <-time.After(time.Second):
case res := <-ch:
return res
}
require.FailNow(t, "signal not received after waiting for one second")
panic("unreachable")
}

func mustLocked(t *testing.T, store kv.Storage, stmt string) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("begin pessimistic")
tk.MustGetErrCode(stmt, errno.ErrLockAcquireFailAndNoWaitSet)
tk.MustExec("rollback")
}

func TestBatchResolveLocks(t *testing.T) {
if !*realtikvtest.WithRealTiKV {
t.Skip("this test doesn't work due to some problem in unistore's implementation")
}

store, domain, clean := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
defer clean()

if *realtikvtest.WithRealTiKV {
// Disable in-memory pessimistic lock since it cannot be scanned in current implementation.
// TODO: Remove this after supporting scan lock for in-memory pessimistic lock.
tkcfg := testkit.NewTestKit(t, store)
res := tkcfg.MustQuery("show config where name = 'pessimistic-txn.in-memory' and type = 'tikv'").Rows()
if len(res) > 0 && res[0][3].(string) == "true" {
tkcfg.MustExec("set config tikv `pessimistic-txn.in-memory`=\"false\"")
tkcfg.MustQuery("show warnings").Check(testkit.Rows())
defer func() {
tkcfg.MustExec("set config tikv `pessimistic-txn.in-memory`=\"true\"")
}()
time.Sleep(time.Second)
} else {
t.Log("skip disabling in-memory pessimistic lock, current config:", res)
}
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int primary key, v int)")
tk.MustExec("create table t2 (id int primary key, v int)")
tk.MustExec("create table t3 (id int primary key, v int)")
tk.MustExec("insert into t1 values (1, 1), (2, 2)")
tk.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("insert into t3 values (1, 1)")
tk.MustExec("set @@tidb_enable_async_commit=0")
tk.MustExec("set @@tidb_enable_1pc=0")

// Split region
{
tableID, err := strconv.ParseInt(tk.MustQuery(`select tidb_table_id from information_schema.tables where table_schema = "test" and table_name = "t2"`).Rows()[0][0].(string), 10, 64)
require.NoError(t, err)
key := tablecodec.EncodeTablePrefix(tableID)
_, err = domain.GetPDClient().SplitRegions(context.Background(), [][]byte{key})
require.NoError(t, err)
}

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")

require.NoError(t, failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
require.NoError(t, failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
require.NoError(t, failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
require.NoError(t, failpoint.Disable("tikvclient/beforeCommitSecondaries"))
require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
require.NoError(t, failpoint.Disable("tikvclient/onRollback"))
}()

// ----------------
// Simulate issue https://github.com/pingcap/tidb/issues/43243

tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
tk2.MustExec("update t2 set v = v + 1 where id = 2")
ch := make(chan struct{})
go func() {
// The `MERGE()` hint is not supported in release-6.1 branch. We use an equivalent `join` statement instead.
// tk.MustExec(`
// with
// c as (select /*+ MERGE() */ v from t1 where id in (1, 2))
// update c join t2 on c.v = t2.id set t2.v = t2.v + 10`)
tk.MustExec(`update t1 join t2 on t1.v = t2.id set t2.v = t2.v + 10 where t1.id in (1, 2)`)
ch <- struct{}{}
}()
// tk blocked on row 2
mustTimeout(t, ch, time.Millisecond*100)
// Change the rows that should be locked by tk.
tk3.MustExec("update t1 set v = v + 3")
// Release row 2 and resume tk.
tk2.MustExec("commit")
mustRecv(t, ch)

// tk should have updated row 4 and row 5, and 4 should be the primary.
// At the same time row 1 should be the old primary, row2 points to row 1.
// Add another secondary that's smaller than the current primary.
tk.MustExec("update t2 set v = v + 10 where id = 3")
tk.MustExec("commit")

// ----------------
// Simulate issue https://github.com/pingcap/tidb/issues/45134
tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t3 where id = 1 for update").Check(testkit.Rows("1 1"))
tk.MustExec("rollback")
// tk leaves a pessimistic lock on row 6. Try to ensure it.
mustLocked(t, store, "select * from t3 where id = 1 for update nowait")

// Simulate a later GC that should resolve all stale lock produced in above steps.
currentTS, err := store.CurrentVersion(kv.GlobalTxnScope)
require.NoError(t, err)
_, err = gcworker.RunResolveLocks(context.Background(), store.(tikv.Storage), domain.GetPDClient(), currentTS.Ver, "gc-worker-test-batch-resolve-locks", 1, false)
require.NoError(t, err)

// Check row 6 unlocked
tk3.MustExec("begin pessimistic")
tk3.MustQuery("select * from t3 where id = 1 for update nowait").Check(testkit.Rows("1 1"))
tk3.MustExec("rollback")

// Check data consistency
tk.MustQuery("select * from t2 order by id").Check(testkit.Rows("1 1", "2 3", "3 13", "4 14", "5 15"))
}

0 comments on commit 613ecc5

Please sign in to comment.