Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: submit ttl scan task to the system table #40422

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 44
result := 45
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
2 changes: 2 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,8 @@ func Args2Expressions4Test(args ...interface{}) []Expression {
ft = types.NewFieldType(mysql.TypeVarString)
case types.KindMysqlTime:
ft = types.NewFieldType(mysql.TypeTimestamp)
case types.KindBytes:
ft = types.NewFieldType(mysql.TypeBlob)
default:
exprs[i] = nil
continue
Expand Down
34 changes: 32 additions & 2 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ const (
PRIMARY KEY (Host,User,Password_timestamp )
) COMMENT='Password history for user accounts' `

// CreateTTLTableStatus is a table about TTL task schedule
// CreateTTLTableStatus is a table about TTL job schedule
CreateTTLTableStatus = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_table_status (
table_id bigint(64) PRIMARY KEY,
parent_table_id bigint(64),
Expand All @@ -498,6 +498,24 @@ const (
current_job_state text DEFAULT NULL,
current_job_status varchar(64) DEFAULT NULL,
current_job_status_update_time timestamp NULL DEFAULT NULL);`

// CreateTTLTask is a table about parallel ttl tasks
CreateTTLTask = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_task (
job_id varchar(64) NOT NULL,
table_id bigint(64) NOT NULL,
scan_id int NOT NULL,
scan_range_start BLOB,
scan_range_end BLOB,
expire_time timestamp NOT NULL,
owner_id varchar(64) DEFAULT NULL,
owner_addr varchar(64) DEFAULT NULL,
owner_hb_time timestamp DEFAULT NULL,
status varchar(64) DEFAULT 'waiting',
status_update_time timestamp NULL DEFAULT NULL,
state text,
created_time timestamp NOT NULL,
primary key(job_id, scan_id),
key(created_time));`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -739,11 +757,13 @@ const (
version109 = 109
// version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0.
version110 = 110
// version111 adds the table tidb_ttl_task
version111 = 111
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version109
var currentBootstrapVersion int64 = version111

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -861,6 +881,7 @@ var (
upgradeToVer108,
upgradeToVer109,
upgradeToVer110,
upgradeToVer111,
}
)

Expand Down Expand Up @@ -2213,6 +2234,13 @@ func upgradeToVer110(s Session, ver int64) {
mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableGCAwareMemoryTrack, 0)
}

func upgradeToVer111(s Session, ver int64) {
if ver >= version111 {
return
}
doReentrantDDL(s, CreateTTLTask)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -2319,6 +2347,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateStatsTableLocked)
// Create tidb_ttl_table_status table
mustExecute(s, CreateTTLTableStatus)
// Create tidb_ttl_task table
mustExecute(s, CreateTTLTask)
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
Expand Down
8 changes: 4 additions & 4 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,16 +1018,16 @@ func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutatio
lock.Op = uint8(kvrpcpb.Op_Put)
}
if rowcodec.IsRowKey(m.Key) && lock.Op == uint8(kvrpcpb.Op_Put) {
if rowcodec.IsNewFormat(m.Value) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When rowcodec.IsNewFormat(m.Value) , the previous code are deleted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it sets reqCtx.buf = m.Value and later lock.Value = reqCtx.buf. The main purpose is to set lock.Value = m.Value. In https://github.com/pingcap/tidb/pull/40422/files/f2fe4bde5508cad89d582145709fa286a24bec18#diff-c8058e505bb9be555aca4f1a51fbf20e712ea4d6d89812a09f3ebf02ff99e2f2R896, the lock.Value has been initialized with m.Value, so there is no need to assign again.

reqCtx.buf = m.Value
} else {
if !rowcodec.IsNewFormat(m.Value) {
reqCtx.buf, err = encodeFromOldRow(m.Value, reqCtx.buf)
if err != nil {
log.Error("encode data failed", zap.Binary("value", m.Value), zap.Binary("key", m.Key), zap.Stringer("op", m.Op), zap.Error(err))
return nil, err
}

lock.Value = make([]byte, len(reqCtx.buf))
copy(lock.Value, reqCtx.buf)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I finally find the bug! Reuse the reqCtx.buf could make the lock.Value be overwritten by another row.

It seems that this bug requires 1PC, and set multiple rows in one transaction (if these rows are in different tables, it will panic), so this problem only occurs in the TTL test, in which we set the session variables to enable 1PC explicitly.

It takes me a lot of time to reach here 😢 .

}
lock.Value = reqCtx.buf
}

lock.ForUpdateTS = req.ForUpdateTs
Expand Down
3 changes: 3 additions & 0 deletions ttl/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"base.go",
"infoschema.go",
"table.go",
"task.go",
"ttlstatus.go",
],
importpath = "github.com/pingcap/tidb/ttl/cache",
Expand Down Expand Up @@ -40,6 +41,7 @@ go_test(
"main_test.go",
"split_test.go",
"table_test.go",
"task_test.go",
"ttlstatus_test.go",
],
embed = [":cache"],
Expand All @@ -49,6 +51,7 @@ go_test(
"//kv",
"//parser/model",
"//server",
"//session",
"//store/helper",
"//tablecodec",
"//testkit",
Expand Down
183 changes: 183 additions & 0 deletions ttl/cache/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"encoding/json"
"time"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
)

const selectFromTTLTask = `SELECT LOW_PRIORITY
job_id,
table_id,
scan_id,
scan_range_start,
scan_range_end,
expire_time,
owner_id,
owner_addr,
owner_hb_time,
status,
status_update_time,
state,
created_time FROM mysql.tidb_ttl_task`
const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET
job_id = %?,
table_id = %?,
scan_id = %?,
scan_range_start = %?,
scan_range_end = %?,
expire_time = %?,
created_time = %?`

// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
func SelectFromTTLTaskWithID(jobID string) (string, []interface{}) {
return selectFromTTLTask + " WHERE job_id = %?", []interface{}{jobID}
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
func InsertIntoTTLTask(sctx sessionctx.Context, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum, scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []interface{}, error) {
rangeStart, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeStart...)
if err != nil {
return "", nil, err
}
rangeEnd, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeEnd...)
if err != nil {
return "", nil, err
}
return insertIntoTTLTask, []interface{}{jobID, tableID, int64(scanID), rangeStart, rangeEnd, expireTime, createdTime}, nil
}

// TaskStatus represents the current status of a task
type TaskStatus string

const (
// TaskStatusWaiting means the task hasn't started
TaskStatusWaiting TaskStatus = "waiting"
// TaskStatusRunning means this task is running
TaskStatusRunning = "running"
// TaskStatusFinished means this task has finished
TaskStatusFinished = "finished"
)

// TTLTask is a row recorded in mysql.tidb_ttl_task
type TTLTask struct {
JobID string
TableID int64
ScanID int64
ScanRangeStart []types.Datum
ScanRangeEnd []types.Datum
ExpireTime time.Time
OwnerID string
OwnerAddr string
OwnerHBTime time.Time
Status TaskStatus
StatusUpdateTime time.Time
State *TTLTaskState
CreatedTime time.Time
}

// TTLTaskState records the internal states of the ttl task
type TTLTaskState struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`

ScanTaskErr string `json:"scan_task_err"`
}

// RowToTTLTask converts a row into TTL task
func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
var err error
timeZone := sctx.GetSessionVars().Location()

task := &TTLTask{
JobID: row.GetString(0),
TableID: row.GetInt64(1),
ScanID: row.GetInt64(2),
}
if !row.IsNull(3) {
scanRangeStartBuf := row.GetBytes(3)
// it's still posibble to be nil even this column is not NULL
if scanRangeStartBuf != nil {
task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf))
if err != nil {
return nil, err
}
}
}
if !row.IsNull(4) {
scanRangeEndBuf := row.GetBytes(4)
// it's still posibble to be nil even this column is not NULL
if scanRangeEndBuf != nil {
task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf))
if err != nil {
return nil, err
}
}
}

task.ExpireTime, err = row.GetTime(5).GoTime(timeZone)
if err != nil {
return nil, err
}

if !row.IsNull(6) {
task.OwnerID = row.GetString(6)
}
if !row.IsNull(7) {
task.OwnerAddr = row.GetString(7)
}
if !row.IsNull(8) {
task.OwnerHBTime, err = row.GetTime(8).GoTime(timeZone)
if err != nil {
return nil, err
}
}
if !row.IsNull(9) {
status := row.GetString(9)
if len(status) == 0 {
status = "waiting"
}
task.Status = TaskStatus(status)
}
if !row.IsNull(10) {
task.StatusUpdateTime, err = row.GetTime(10).GoTime(timeZone)
if err != nil {
return nil, err
}
}
if !row.IsNull(11) {
stateStr := row.GetString(11)
state := &TTLTaskState{}
err = json.Unmarshal([]byte(stateStr), state)
if err != nil {
return nil, err
}
task.State = state
}

task.CreatedTime, err = row.GetTime(12).GoTime(timeZone)
if err != nil {
return nil, err
}

return task, nil
}