Skip to content

Commit

Permalink
schematracker: treat TableInfo as immutable (#39103)
Browse files Browse the repository at this point in the history
ref #35933
  • Loading branch information
lance6716 committed Nov 14, 2022
1 parent d734cc8 commit bdcb850
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 51 deletions.
106 changes: 81 additions & 25 deletions ddl/schematracker/dm_tracker.go
Expand Up @@ -49,6 +49,9 @@ var _ ddl.DDL = SchemaTracker{}

// SchemaTracker is used to track schema changes by DM. It implements DDL interface and by applying DDL, it updates the
// table structure to keep tracked with upstream changes.
// It embeds an InfoStore which stores DBInfo and TableInfo. The DBInfo and TableInfo can be treated as immutable, so
// after reading them by SchemaByName or TableByName, later modifications made by SchemaTracker will not change them.
// SchemaTracker is not thread-safe.
type SchemaTracker struct {
*InfoStore
}
Expand Down Expand Up @@ -108,16 +111,22 @@ func (d SchemaTracker) CreateSchemaWithInfo(ctx sessionctx.Context, dbInfo *mode
}

// AlterSchema implements the DDL interface.
func (d SchemaTracker) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error {
func (d SchemaTracker) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
dbInfo := d.SchemaByName(stmt.Name)
if dbInfo == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(stmt.Name.O)
}

dbInfo = dbInfo.Clone()
defer func() {
if err == nil {
d.PutSchema(dbInfo)
}
}()

// Resolve target charset and collation from options.
var (
toCharset, toCollate string
err error
)

for _, val := range stmt.Options {
Expand Down Expand Up @@ -173,9 +182,15 @@ func (d SchemaTracker) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStm
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
// suppress ErrTooLongKey
strictSQLModeBackup := ctx.GetSessionVars().StrictSQLMode
ctx.GetSessionVars().StrictSQLMode = false
// support drop PK
enableClusteredIndexBackup := ctx.GetSessionVars().EnableClusteredIndex
ctx.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOff
defer func() {
ctx.GetSessionVars().StrictSQLMode = strictSQLModeBackup
ctx.GetSessionVars().EnableClusteredIndex = enableClusteredIndexBackup
}()

var (
referTbl *model.TableInfo
Expand Down Expand Up @@ -354,6 +369,13 @@ func (d SchemaTracker) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndex
stmt.IndexPartSpecifications, stmt.IndexOption, stmt.IfNotExists)
}

func (d SchemaTracker) putTableIfNoError(err error, dbName model.CIStr, tbInfo *model.TableInfo) {
if err != nil {
return
}
_ = d.PutTable(dbName, tbInfo)
}

// createIndex is shared by CreateIndex and AlterTable.
func (d SchemaTracker) createIndex(
ctx sessionctx.Context,
Expand All @@ -363,12 +385,15 @@ func (d SchemaTracker) createIndex(
indexPartSpecifications []*ast.IndexPartSpecification,
indexOption *ast.IndexOption,
ifNotExists bool,
) error {
) (err error) {
unique := keyType == ast.IndexKeyTypeUnique
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

t := tables.MockTableFromMeta(tblInfo)

// Deal with anonymous index.
Expand Down Expand Up @@ -432,12 +457,14 @@ func (d SchemaTracker) DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt
}

// dropIndex is shared by DropIndex and AlterTable.
func (d SchemaTracker) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error {
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
func (d SchemaTracker) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

indexInfo := tblInfo.FindIndexByName(indexName.L)
if indexInfo == nil {
if ifExists {
Expand All @@ -464,16 +491,19 @@ func (d SchemaTracker) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName
}

// addColumn is used by AlterTable.
func (d SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) error {
func (d SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) (err error) {
specNewColumn := spec.NewColumns[0]
schema := d.SchemaByName(ti.Schema)
if schema == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

t := tables.MockTableFromMeta(tblInfo)
colName := specNewColumn.Name.Name.O

Expand Down Expand Up @@ -504,12 +534,14 @@ func (d SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast
}

// dropColumn is used by AlterTable.
func (d *SchemaTracker) dropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
func (d *SchemaTracker) dropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

colName := spec.OldColumnName.Name
colInfo := tblInfo.FindPublicColumnByName(colName.L)
if colInfo == nil {
Expand Down Expand Up @@ -546,14 +578,17 @@ func (d *SchemaTracker) dropColumn(ctx sessionctx.Context, ti ast.Ident, spec *a
}

// renameColumn is used by AlterTable.
func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
oldColName := spec.OldColumnName.Name
newColName := spec.NewColumnName.Name

tblInfo, err := d.TableByName(ident.Schema, ident.Name)
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

tbl := tables.MockTableFromMeta(tblInfo)

oldCol := table.FindCol(tbl.VisibleCols(), oldColName.L)
Expand Down Expand Up @@ -595,12 +630,15 @@ func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spe
}

// alterColumn is used by AlterTable.
func (d SchemaTracker) alterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
func (d SchemaTracker) alterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
specNewColumn := spec.NewColumns[0]
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

t := tables.MockTableFromMeta(tblInfo)

colName := specNewColumn.Name.Name
Expand Down Expand Up @@ -664,11 +702,14 @@ func (d SchemaTracker) handleModifyColumn(
ident ast.Ident,
originalColName model.CIStr,
spec *ast.AlterTableSpec,
) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

schema := d.SchemaByName(ident.Schema)
t := tables.MockTableFromMeta(tblInfo)
job, err := ddl.GetModifiableColumnJob(ctx, sctx, nil, ident, originalColName, schema, t, spec)
Expand Down Expand Up @@ -714,11 +755,14 @@ func (d SchemaTracker) handleModifyColumn(
}

// renameIndex is used by AlterTable.
func (d SchemaTracker) renameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
func (d SchemaTracker) renameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

duplicate, err := ddl.ValidateRenameIndex(spec.FromKey, spec.ToKey, tblInfo)
if duplicate {
return nil
Expand All @@ -732,12 +776,14 @@ func (d SchemaTracker) renameIndex(ctx sessionctx.Context, ident ast.Ident, spec
}

// addTablePartitions is used by AlterTable.
func (d SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
func (d SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

pi := tblInfo.GetPartitionInfo()
if pi == nil {
return errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
Expand All @@ -752,12 +798,14 @@ func (d SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Iden
}

// dropTablePartitions is used by AlterTable.
func (d SchemaTracker) dropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
func (d SchemaTracker) dropTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

pi := tblInfo.GetPartitionInfo()
if pi == nil {
return errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
Expand Down Expand Up @@ -799,12 +847,14 @@ func (d SchemaTracker) createPrimaryKey(
indexName model.CIStr,
indexPartSpecifications []*ast.IndexPartSpecification,
indexOption *ast.IndexOption,
) error {
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(err)
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

indexName = model.NewCIStr(mysql.PrimaryKeyName)
if indexInfo := tblInfo.FindIndexByName(indexName.L); indexInfo != nil ||
// If the table's PKIsHandle is true, it also means that this table has a primary key.
Expand Down Expand Up @@ -888,7 +938,7 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
case ast.AlterTableRenameIndex:
err = d.renameIndex(sctx, ident, spec)
case ast.AlterTableDropPartition:
err = d.dropTablePartition(sctx, ident, spec)
err = d.dropTablePartitions(sctx, ident, spec)
case ast.AlterTableAddConstraint:
constr := spec.Constraint
switch spec.Constraint.Tp {
Expand Down Expand Up @@ -925,7 +975,9 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
case ast.TableOptionAutoIdCache:
case ast.TableOptionAutoRandomBase:
case ast.TableOptionComment:
tblInfo = tblInfo.Clone()
tblInfo.Comment = opt.StrValue
_ = d.PutTable(ident.Schema, tblInfo)
case ast.TableOptionCharset, ast.TableOptionCollate:
// getCharsetAndCollateInTableOption will get the last charset and collate in the options,
// so it should be handled only once.
Expand All @@ -939,6 +991,7 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
}
needsOverwriteCols := ddl.NeedToOverwriteColCharset(spec.Options)

tblInfo = tblInfo.Clone()
if toCharset != "" {
tblInfo.Charset = toCharset
}
Expand All @@ -957,6 +1010,7 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
}
}
}
_ = d.PutTable(ident.Schema, tblInfo)

handledCharsetOrCollate = true
case ast.TableOptionPlacementPolicy:
Expand All @@ -970,11 +1024,13 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
}
}
case ast.AlterTableIndexInvisible:
tblInfo = tblInfo.Clone()
idx := tblInfo.FindIndexByName(spec.IndexName.L)
if idx == nil {
return errors.Trace(infoschema.ErrKeyNotExists.GenWithStackByArgs(spec.IndexName.O, ident.Name))
}
idx.Invisible = spec.Visibility == ast.IndexVisibilityInvisible
_ = d.PutTable(ident.Schema, tblInfo)
case ast.AlterTablePartitionOptions,
ast.AlterTableDropForeignKey,
ast.AlterTableCoalescePartitions,
Expand Down

0 comments on commit bdcb850

Please sign in to comment.