Skip to content

Commit

Permalink
ddl: add owner_id field to tidb_mdl_info to avoid unexpected writes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed May 15, 2024
1 parent 59c7da1 commit 4538a21
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,5 +425,5 @@ func TestGetExistedUserDBs(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(197), session.CurrentBootstrapVersion)
require.Equal(t, int64(198), session.CurrentBootstrapVersion)
}
7 changes: 4 additions & 3 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,18 +605,19 @@ func (w *worker) registerMDLInfo(job *model.Job, ver int64) error {
if len(rows) == 0 {
return errors.Errorf("can't find ddl job %d", job.ID)
}
ownerID := w.ownerManager.ID()
ids := rows[0].GetString(0)
sql := fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids) values (%d, %d, '%s')", job.ID, ver, ids)
sql := fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID)
_, err = w.sess.Execute(context.Background(), sql, "register-mdl-info")
return err
}

// cleanMDLInfo cleans metadata lock info.
func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client, cleanETCD bool) {
func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client, ownerID string, cleanETCD bool) {
if !variable.EnableMDL.Load() {
return
}
sql := fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d", jobID)
sql := fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", jobID, ownerID)
sctx, _ := pool.Get()
defer pool.Put(sctx)
se := sess.NewSession(sctx)
Expand Down
5 changes: 3 additions & 2 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
asyncNotify(d.ddlJobCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
ownerID := d.ownerManager.ID()
// check if this ddl job is synced to all servers.
if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
Expand All @@ -442,7 +443,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
return
}
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, job.State == model.JobStateSynced)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced)
// Don't have a worker now.
return
}
Expand Down Expand Up @@ -480,7 +481,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, job.State == model.JobStateSynced)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced)
d.synced(job)

if RunInGoTest {
Expand Down
15 changes: 14 additions & 1 deletion pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,11 +1093,15 @@ const (
// version 197
// replace `mysql.tidb_mdl_view` table
version197 = 197

// version 198
// add column `owner_id` for `mysql.tidb_mdl_info` table
version198 = 198
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version197
var currentBootstrapVersion int64 = version198

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1260,6 +1264,7 @@ var (
upgradeToVer195,
upgradeToVer196,
upgradeToVer197,
upgradeToVer198,
}
)

Expand Down Expand Up @@ -3103,6 +3108,14 @@ func upgradeToVer197(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, CreateMDLView)
}

func upgradeToVer198(s sessiontypes.Session, ver int64) {
if ver >= version198 {
return
}

doReentrantDDL(s, "ALTER TABLE mysql.tidb_mdl_info ADD COLUMN owner_id VARCHAR(64) NOT NULL DEFAULT '';", infoschema.ErrColumnExists)
}

func writeOOMAction(s sessiontypes.Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down
3 changes: 2 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3150,7 +3150,8 @@ var (
mdlTable = `create table mysql.tidb_mdl_info(
job_id BIGINT NOT NULL PRIMARY KEY,
version BIGINT NOT NULL,
table_ids text(65535)
table_ids text(65535),
owner_id varchar(64) NOT NULL DEFAULT ''
);`
)

Expand Down

0 comments on commit 4538a21

Please sign in to comment.