Skip to content

Commit

Permalink
ttl: use optimistic transaction with 0 retry limit for ttl (#39924)
Browse files Browse the repository at this point in the history
close #39923
  • Loading branch information
YangKeao committed Dec 14, 2022
1 parent aeec193 commit de865c4
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 18 deletions.
2 changes: 1 addition & 1 deletion ttl/cache/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInfoSchemaCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
se := session.NewSession(sctx, sctx, func() {})
se := session.NewSession(sctx, sctx, func(_ session.Session) {})

isc := cache.NewInfoSchemaCache(time.Hour)

Expand Down
2 changes: 1 addition & 1 deletion ttl/cache/ttlstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestTTLStatusCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
ttlSession := session.NewSession(sctx, tk.Session(), func() {})
ttlSession := session.NewSession(sctx, tk.Session(), func(_ session.Session) {})

isc := cache.NewTableStatusCache(time.Hour)

Expand Down
8 changes: 4 additions & 4 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ type Session interface {
type session struct {
sessionctx.Context
sqlExec sqlexec.SQLExecutor
closeFn func()
closeFn func(Session)
}

// NewSession creates a new Session
func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func()) Session {
func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func(Session)) Session {
return &session{
Context: sctx,
sqlExec: sqlExec,
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
defer tracer.EnterPhase(tracer.Phase())

tracer.EnterPhase(metrics.PhaseBeginTxn)
if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
return err
}
tracer.EnterPhase(metrics.PhaseOther)
Expand Down Expand Up @@ -150,7 +150,7 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
s.closeFn()
s.closeFn(s)
s.Context = nil
s.sqlExec = nil
s.closeFn = nil
Expand Down
7 changes: 7 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
Expand All @@ -52,15 +53,21 @@ go_test(
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//session",
"//sessionctx",
"//sessionctx/variable",
"//testkit",
"//ttl/cache",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
13 changes: 6 additions & 7 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d"
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"

const timeFormat = "2006-01-02 15:04:05"
Expand All @@ -41,8 +41,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
}

func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string {
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID)
func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
Expand Down Expand Up @@ -499,11 +499,10 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
// localJob and return it.
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
var expireTime time.Time

err := se.RunInTxn(ctx, func() error {
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
if err != nil {
return err
}
Expand All @@ -513,7 +512,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return err
}
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
if err != nil {
return err
}
Expand All @@ -534,7 +533,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id))
_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))

return err
})
Expand Down
98 changes: 98 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
)

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := func() session.Session {
dbSession, err := dbsession.CreateSession4Test(store)
require.NoError(t, err)
se := session.NewSession(dbSession, dbSession, nil)

_, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
require.NoError(t, err)
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
require.NoError(t, err)

return se
}

storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
variable.TTLJobRunInterval.Store(0)
defer func() {
variable.TTLJobRunInterval.Store(storedTTLJobRunInterval)
}()

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
// simply lock a new job
m := ttlworker.NewJobManager("test-id", nil, store)
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
require.NoError(t, err)
job.Finish(se, time.Now())

// lock one table in parallel, only one of them should lock successfully
testTimes := 100
concurrency := 5
for i := 0; i < testTimes; i++ {
successCounter := atomic.NewUint64(0)
successJob := &ttlworker.TTLJob{}

wg := sync.WaitGroup{}
for j := 0; j < concurrency; j++ {
jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
wg.Add(1)
go func() {
m := ttlworker.NewJobManager(jobManagerID, nil, store)

se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
if err == nil {
successCounter.Add(1)
successJob = job
} else {
logutil.BgLogger().Error("lock new job with error", zap.Error(err))
}
wg.Done()
}()
}
wg.Wait()

require.Equal(t, uint64(1), successCounter.Load())
successJob.Finish(se, time.Now())
}
}
24 changes: 20 additions & 4 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -139,6 +140,22 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) {
m.scanWorkers = workers
}

// TTLJob exports the ttlJob for test
type TTLJob = ttlJob

// LockNewJob is an exported version of lockNewJob for test
func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) {
return m.lockNewJob(ctx, se, table, now)
}

func (j *ttlJob) Finish(se session.Session, now time.Time) {
j.finish(se, now)
}

func (j *ttlJob) ID() string {
return j.id
}

func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {
statistics := &ttlStatistics{}
return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}}
Expand Down Expand Up @@ -195,7 +212,6 @@ func TestReadyForNewJobTables(t *testing.T) {
func TestLockNewTable(t *testing.T) {
now, err := time.Parse(timeFormat, "2022-12-05 17:13:05")
assert.NoError(t, err)
maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
expireTime := now

testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}}
Expand All @@ -219,7 +235,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
nil, nil,
},
{
Expand All @@ -241,7 +257,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
nil, nil,
},
{
Expand All @@ -255,7 +271,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
nil, errors.New("test error message"),
},
}, false, true},
Expand Down
18 changes: 17 additions & 1 deletion ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ttlworker

import (
"context"
"fmt"
"time"

"github.com/ngaut/pools"
Expand All @@ -26,7 +27,9 @@ import (
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

type sessionPool interface {
Expand Down Expand Up @@ -57,10 +60,23 @@ func getSession(pool sessionPool) (session.Session, error) {
return nil, errors.Errorf("%T cannot be casted to sqlexec.SQLExecutor", sctx)
}

se := session.NewSession(sctx, exec, func() {
originalRetryLimit := sctx.GetSessionVars().RetryLimit
se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
}

pool.Put(resource)
})

// store and set the retry limit to 0
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
if err != nil {
se.Close()
return nil, err
}

// Force rollback the session to guarantee the session is not in any explicit transaction
if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
se.Close()
Expand Down

0 comments on commit de865c4

Please sign in to comment.