Skip to content

Commit

Permalink
fail pipelined dml when max ttl exceeded (#1329)
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <you1474600@gmail.com>
Co-authored-by: ekexium <eke@fastmail.com>
  • Loading branch information
you06 and ekexium authored Apr 30, 2024
1 parent 52c232b commit 6cb0704
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
40 changes: 40 additions & 0 deletions integration_tests/pipelined_memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -466,3 +467,42 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
s.NotNil(err)
s.ErrorContains(err, "ttl manager is closed")
}

func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() {
originManagedTTLVal := atomic.LoadUint64(&transaction.ManagedLockTTL)
originMaxPipelinedTxnTTL := atomic.LoadUint64(&transaction.MaxPipelinedTxnTTL)
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, 200) // set to 200ms
updateGlobalConfig(func(conf *config.Config) {
conf.MaxTxnTTL = 200 // set to 200ms
})
defer func() {
atomic.StoreUint64(&transaction.ManagedLockTTL, originManagedTTLVal)
atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, originMaxPipelinedTxnTTL)
restoreGlobalConfFunc()
}()

txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
txnProbe := transaction.TxnProbe{KVTxn: txn}
flushed, err := txnProbe.GetMemBuffer().Flush(true)
s.Nil(err)
s.True(flushed)
s.Nil(txn.GetMemBuffer().FlushWait())
s.Equal(txnProbe.GetCommitter().GetPrimaryKey(), []byte("key1"))

s.True(txnProbe.GetCommitter().IsTTLRunning())

s.Eventuallyf(func() bool {
return !txnProbe.GetCommitter().IsTTLRunning()
}, 5*time.Second, 100*time.Millisecond, "ttl manager should stop after max ttl")

txn.Set([]byte("key2"), []byte("value2"))
flushed, err = txn.GetMemBuffer().Flush(true)
s.Nil(err)
s.True(flushed)
err = txn.GetMemBuffer().FlushWait()
s.NotNil(err)
s.ErrorContains(err, "ttl manager is closed")
}
5 changes: 5 additions & 0 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,11 @@ func keepAlive(
if c.isPessimistic && lockCtx != nil && lockCtx.LockExpired != nil {
atomic.StoreUint32(lockCtx.LockExpired, 1)
}
if isPipelinedTxn {
// the pipelined txn can last a long time after max ttl exceeded.
// if we don't stop it, it may fail when committing the primary key with high probability.
tm.close()
}
return
}

Expand Down

0 comments on commit 6cb0704

Please sign in to comment.