Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: escape parameters for finish job sql #40118

Merged
merged 2 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions ttl/cache/ttlstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package cache

import (
"context"
"fmt"
"time"

"github.com/pingcap/tidb/sessionctx"
Expand All @@ -43,8 +42,8 @@ const (
const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"

// SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id
func SelectFromTTLTableStatusWithID(tableID int64) string {
return selectFromTTLTableStatus + fmt.Sprintf(" WHERE table_id = %d", tableID)
func SelectFromTTLTableStatusWithID(tableID int64) (string, []interface{}) {
return selectFromTTLTableStatus + " WHERE table_id = %?", []interface{}{tableID}
}

// TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status`
Expand Down
1 change: 1 addition & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
flaky = True,
deps = [
"//infoschema",
"//kv",
"//parser/ast",
"//parser/model",
"//parser/mysql",
Expand Down
48 changes: 32 additions & 16 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package ttlworker
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

Expand All @@ -29,20 +28,34 @@ import (
"go.uber.org/zap"
)

const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = '%s' WHERE table_id = %d AND current_job_status = '%s' AND current_job_id = '%s'"
const finishJobTemplate = "UPDATE mysql.tidb_ttl_table_status SET last_job_id = current_job_id, last_job_start_time = current_job_start_time, last_job_finish_time = '%s', last_job_ttl_expire = current_job_ttl_expire, last_job_summary = '%s', current_job_id = NULL, current_job_owner_id = NULL, current_job_owner_hb_time = NULL, current_job_start_time = NULL, current_job_ttl_expire = NULL, current_job_state = NULL, current_job_status = NULL, current_job_status_update_time = NULL WHERE table_id = %d AND current_job_id = '%s'"
const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = '%s' WHERE table_id = %d AND current_job_id = '%s' AND current_job_owner_id = '%s'"
const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = %? WHERE table_id = %? AND current_job_status = %? AND current_job_id = %?"
const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
SET last_job_id = current_job_id,
last_job_start_time = current_job_start_time,
last_job_finish_time = %?,
last_job_ttl_expire = current_job_ttl_expire,
last_job_summary = %?,
current_job_id = NULL,
current_job_owner_id = NULL,
current_job_owner_hb_time = NULL,
current_job_start_time = NULL,
current_job_ttl_expire = NULL,
current_job_state = NULL,
current_job_status = NULL,
current_job_status_update_time = NULL
WHERE table_id = %? AND current_job_id = %?`
const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?"

func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) string {
return fmt.Sprintf(updateJobCurrentStatusTemplate, newStatus, tableID, oldStatus, jobID)
func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) {
return updateJobCurrentStatusTemplate, []interface{}{newStatus, tableID, oldStatus, jobID}
}

func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) string {
return fmt.Sprintf(finishJobTemplate, finishTime.Format(timeFormat), summary, tableID, jobID)
func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) (string, []interface{}) {
return finishJobTemplate, []interface{}{finishTime.Format(timeFormat), summary, tableID, jobID}
}

func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) string {
return fmt.Sprintf(updateJobStateTemplate, currentJobState, tableID, currentJobID, currentJobOwnerID)
func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) (string, []interface{}) {
return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID}
}

type ttlJob struct {
Expand Down Expand Up @@ -76,9 +89,10 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status
job.status = status
job.statusMutex.Unlock()

_, err := se.ExecuteSQL(ctx, updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id))
sql, args := updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
Expand All @@ -89,9 +103,10 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, summary, job.ownerID))
sql, args := updateJobState(job.tbl.ID, job.id, summary, job.ownerID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
Expand All @@ -115,9 +130,10 @@ func (job *ttlJob) finish(se session.Session, now time.Time) {
}
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
_, err = se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id))
sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id)
_, err = se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id), zap.String("sql", sql), zap.Any("arguments", args))
}
}

Expand Down
53 changes: 32 additions & 21 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ttlworker

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
Expand All @@ -31,22 +30,30 @@ import (
"go.uber.org/zap"
)

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"
const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = UUID(),
current_job_owner_id = %?,
current_job_start_time = %?,
current_job_status = 'waiting',
current_job_status_update_time = %?,
current_job_ttl_expire = %?,
current_job_owner_hb_time = %?
WHERE table_id = %?`
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = %? WHERE table_id = %? AND current_job_owner_id = %?"

const timeFormat = "2006-01-02 15:04:05"

func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) {
return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
}

func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
return fmt.Sprintf(updateHeartBeatTemplate, now.Format(timeFormat), tableID, id)
func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) {
return updateHeartBeatTemplate, []interface{}{now.Format(timeFormat), tableID, id}
}

// JobManager schedules and manages the ttl jobs on this instance
Expand Down Expand Up @@ -503,19 +510,22 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
var expireTime time.Time

err := se.RunInTxn(ctx, func() error {
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return err
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
// cannot find the row, insert the status row
_, err = se.ExecuteSQL(ctx, insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID))
sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return err
return errors.Wrapf(err, "execute sql: %s", sql)
}
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
sql, args = cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return err
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.New("table status row still doesn't exist after insertion")
Expand All @@ -534,9 +544,9 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))

return err
sql, args = setTableStatusOwnerSQL(table.ID, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
return errors.Wrapf(err, "execute sql: %s", sql)
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -599,9 +609,10 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) error {
now := se.Now()
for _, job := range m.localJobs() {
_, err := se.ExecuteSQL(ctx, updateHeartBeatSQL(job.tbl.ID, now, m.id))
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
return errors.Wrapf(err, "execute sql: %s", sql)
}
// also updates some internal state for this job
err = job.updateState(ctx, se)
Expand Down
35 changes: 31 additions & 4 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
Expand All @@ -35,10 +37,8 @@ import (
"go.uber.org/zap"
)

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := func() session.Session {
func sessionFactory(t *testing.T, store kv.Storage) func() session.Session {
return func() session.Session {
dbSession, err := dbsession.CreateSession4Test(store)
require.NoError(t, err)
se := session.NewSession(dbSession, dbSession, nil)
Expand All @@ -50,6 +50,12 @@ func TestParallelLockNewJob(t *testing.T) {

return se
}
}

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := sessionFactory(t, store)

storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
variable.TTLJobRunInterval.Store(0)
Expand Down Expand Up @@ -96,3 +102,24 @@ func TestParallelLockNewJob(t *testing.T) {
successJob.Finish(se, time.Now())
}
}

func TestFinishJob(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}

tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)")

// finish with error
m := ttlworker.NewJobManager("test-id", nil, store)
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
require.NoError(t, err)
job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`))
job.Finish(se, time.Now())

tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}"))
}