Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: use optimistic transaction with 0 retry limit for ttl #39924

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ttl/cache/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInfoSchemaCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
se := session.NewSession(sctx, sctx, func() {})
se := session.NewSession(sctx, sctx, func(_ session.Session) {})

isc := cache.NewInfoSchemaCache(time.Hour)

Expand Down
2 changes: 1 addition & 1 deletion ttl/cache/ttlstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestTTLStatusCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
ttlSession := session.NewSession(sctx, tk.Session(), func() {})
ttlSession := session.NewSession(sctx, tk.Session(), func(_ session.Session) {})

isc := cache.NewTableStatusCache(time.Hour)

Expand Down
8 changes: 4 additions & 4 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ type Session interface {
type session struct {
sessionctx.Context
sqlExec sqlexec.SQLExecutor
closeFn func()
closeFn func(Session)
}

// NewSession creates a new Session
func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func()) Session {
func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func(Session)) Session {
return &session{
Context: sctx,
sqlExec: sqlExec,
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
defer tracer.EnterPhase(tracer.Phase())

tracer.EnterPhase(metrics.PhaseBeginTxn)
if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
return err
}
tracer.EnterPhase(metrics.PhaseOther)
Expand Down Expand Up @@ -150,7 +150,7 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
s.closeFn()
s.closeFn(s)
s.Context = nil
s.sqlExec = nil
s.closeFn = nil
Expand Down
7 changes: 7 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
Expand All @@ -52,15 +53,21 @@ go_test(
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//session",
"//sessionctx",
"//sessionctx/variable",
"//testkit",
"//ttl/cache",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
13 changes: 6 additions & 7 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d"
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"

const timeFormat = "2006-01-02 15:04:05"
Expand All @@ -41,8 +41,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
}

func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string {
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID)
func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
Expand Down Expand Up @@ -491,11 +491,10 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
// localJob and return it.
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
var expireTime time.Time

err := se.RunInTxn(ctx, func() error {
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
if err != nil {
return err
}
Expand All @@ -505,7 +504,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return err
}
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
if err != nil {
return err
}
Expand All @@ -526,7 +525,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id))
_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))

return err
})
Expand Down
98 changes: 98 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
)

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := func() session.Session {
dbSession, err := dbsession.CreateSession4Test(store)
require.NoError(t, err)
se := session.NewSession(dbSession, dbSession, nil)

_, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
require.NoError(t, err)
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
require.NoError(t, err)

return se
}

storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
variable.TTLJobRunInterval.Store(0)
defer func() {
variable.TTLJobRunInterval.Store(storedTTLJobRunInterval)
}()

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
// simply lock a new job
m := ttlworker.NewJobManager("test-id", nil, store)
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
require.NoError(t, err)
job.Finish(se, time.Now())

// lock one table in parallel, only one of them should lock successfully
testTimes := 100
concurrency := 5
for i := 0; i < testTimes; i++ {
successCounter := atomic.NewUint64(0)
successJob := &ttlworker.TTLJob{}

wg := sync.WaitGroup{}
for j := 0; j < concurrency; j++ {
jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
wg.Add(1)
go func() {
m := ttlworker.NewJobManager(jobManagerID, nil, store)

se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
if err == nil {
successCounter.Add(1)
successJob = job
} else {
logutil.BgLogger().Error("lock new job with error", zap.Error(err))
}
wg.Done()
}()
}
wg.Wait()

require.Equal(t, uint64(1), successCounter.Load())
successJob.Finish(se, time.Now())
}
}
24 changes: 20 additions & 4 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -139,6 +140,22 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) {
m.scanWorkers = workers
}

// TTLJob exports the ttlJob for test
type TTLJob = ttlJob

// LockNewJob is an exported version of lockNewJob for test
func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) {
return m.lockNewJob(ctx, se, table, now)
}

func (j *ttlJob) Finish(se session.Session, now time.Time) {
j.finish(se, now)
}

func (j *ttlJob) ID() string {
return j.id
}

func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {
statistics := &ttlStatistics{}
return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}}
Expand Down Expand Up @@ -195,7 +212,6 @@ func TestReadyForNewJobTables(t *testing.T) {
func TestLockNewTable(t *testing.T) {
now, err := time.Parse(timeFormat, "2022-12-05 17:13:05")
assert.NoError(t, err)
maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
expireTime := now

testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}}
Expand All @@ -219,7 +235,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
nil, nil,
},
{
Expand All @@ -241,7 +257,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
nil, nil,
},
{
Expand All @@ -255,7 +271,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
nil, errors.New("test error message"),
},
}, false, true},
Expand Down
18 changes: 17 additions & 1 deletion ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ttlworker

import (
"context"
"fmt"
"time"

"github.com/ngaut/pools"
Expand All @@ -26,7 +27,9 @@ import (
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

type sessionPool interface {
Expand Down Expand Up @@ -57,10 +60,23 @@ func getSession(pool sessionPool) (session.Session, error) {
return nil, errors.Errorf("%T cannot be casted to sqlexec.SQLExecutor", sctx)
}

se := session.NewSession(sctx, exec, func() {
originalRetryLimit := sctx.GetSessionVars().RetryLimit
se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
}

pool.Put(resource)
})

// store and set the retry limit to 0
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
if err != nil {
se.Close()
return nil, err
}

// Force rollback the session to guarantee the session is not in any explicit transaction
if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
se.Close()
Expand Down