Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schematracker: treat TableInfo as immutable #39103

Merged
merged 3 commits into from Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 81 additions & 25 deletions ddl/schematracker/dm_tracker.go
Expand Up @@ -49,6 +49,9 @@ var _ ddl.DDL = SchemaTracker{}

// SchemaTracker is used to track schema changes by DM. It implements DDL interface and by applying DDL, it updates the
// table structure to keep tracked with upstream changes.
// It embeds an InfoStore which stores DBInfo and TableInfo. The DBInfo and TableInfo can be treated as immutable, so
// after reading them by SchemaByName or TableByName, later modifications made by SchemaTracker will not change them.
// SchemaTracker is not thread-safe.
type SchemaTracker struct {
*InfoStore
}
Expand Down Expand Up @@ -108,16 +111,22 @@ func (d SchemaTracker) CreateSchemaWithInfo(ctx sessionctx.Context, dbInfo *mode
}

// AlterSchema implements the DDL interface.
func (d SchemaTracker) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error {
func (d SchemaTracker) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
dbInfo := d.SchemaByName(stmt.Name)
if dbInfo == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(stmt.Name.O)
}

dbInfo = dbInfo.Clone()
defer func() {
if err == nil {
d.PutSchema(dbInfo)
}
}()

// Resolve target charset and collation from options.
var (
toCharset, toCollate string
err error
)

for _, val := range stmt.Options {
Expand Down Expand Up @@ -173,9 +182,15 @@ func (d SchemaTracker) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStm
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
// suppress ErrTooLongKey
strictSQLModeBackup := ctx.GetSessionVars().StrictSQLMode
ctx.GetSessionVars().StrictSQLMode = false
// support drop PK
enableClusteredIndexBackup := ctx.GetSessionVars().EnableClusteredIndex
ctx.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOff
defer func() {
ctx.GetSessionVars().StrictSQLMode = strictSQLModeBackup
ctx.GetSessionVars().EnableClusteredIndex = enableClusteredIndexBackup
}()

var (
referTbl *model.TableInfo
Expand Down Expand Up @@ -354,6 +369,13 @@ func (d SchemaTracker) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndex
stmt.IndexPartSpecifications, stmt.IndexOption, stmt.IfNotExists)
}

func (d SchemaTracker) putTableIfNoError(err error, dbName model.CIStr, tbInfo *model.TableInfo) {
if err != nil {
return
}
_ = d.PutTable(dbName, tbInfo)
}

// createIndex is shared by CreateIndex and AlterTable.
func (d SchemaTracker) createIndex(
ctx sessionctx.Context,
Expand All @@ -363,12 +385,15 @@ func (d SchemaTracker) createIndex(
indexPartSpecifications []*ast.IndexPartSpecification,
indexOption *ast.IndexOption,
ifNotExists bool,
) error {
) (err error) {
unique := keyType == ast.IndexKeyTypeUnique
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

t := tables.MockTableFromMeta(tblInfo)

// Deal with anonymous index.
Expand Down Expand Up @@ -432,12 +457,14 @@ func (d SchemaTracker) DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt
}

// dropIndex is shared by DropIndex and AlterTable.
func (d SchemaTracker) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error {
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
func (d SchemaTracker) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

indexInfo := tblInfo.FindIndexByName(indexName.L)
if indexInfo == nil {
if ifExists {
Expand All @@ -464,16 +491,19 @@ func (d SchemaTracker) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName
}

// addColumn is used by AlterTable.
func (d SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) error {
func (d SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) (err error) {
specNewColumn := spec.NewColumns[0]
schema := d.SchemaByName(ti.Schema)
if schema == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

t := tables.MockTableFromMeta(tblInfo)
colName := specNewColumn.Name.Name.O

Expand Down Expand Up @@ -504,12 +534,14 @@ func (d SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast
}

// dropColumn is used by AlterTable.
func (d *SchemaTracker) dropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
func (d *SchemaTracker) dropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

colName := spec.OldColumnName.Name
colInfo := tblInfo.FindPublicColumnByName(colName.L)
if colInfo == nil {
Expand Down Expand Up @@ -546,14 +578,17 @@ func (d *SchemaTracker) dropColumn(ctx sessionctx.Context, ti ast.Ident, spec *a
}

// renameColumn is used by AlterTable.
func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
oldColName := spec.OldColumnName.Name
newColName := spec.NewColumnName.Name

tblInfo, err := d.TableByName(ident.Schema, ident.Name)
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

tbl := tables.MockTableFromMeta(tblInfo)

oldCol := table.FindCol(tbl.VisibleCols(), oldColName.L)
Expand Down Expand Up @@ -595,12 +630,15 @@ func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spe
}

// alterColumn is used by AlterTable.
func (d SchemaTracker) alterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
func (d SchemaTracker) alterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
specNewColumn := spec.NewColumns[0]
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

t := tables.MockTableFromMeta(tblInfo)

colName := specNewColumn.Name.Name
Expand Down Expand Up @@ -664,11 +702,14 @@ func (d SchemaTracker) handleModifyColumn(
ident ast.Ident,
originalColName model.CIStr,
spec *ast.AlterTableSpec,
) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

schema := d.SchemaByName(ident.Schema)
t := tables.MockTableFromMeta(tblInfo)
job, err := ddl.GetModifiableColumnJob(ctx, sctx, nil, ident, originalColName, schema, t, spec)
Expand Down Expand Up @@ -714,11 +755,14 @@ func (d SchemaTracker) handleModifyColumn(
}

// renameIndex is used by AlterTable.
func (d SchemaTracker) renameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
func (d SchemaTracker) renameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

duplicate, err := ddl.ValidateRenameIndex(spec.FromKey, spec.ToKey, tblInfo)
if duplicate {
return nil
Expand All @@ -732,12 +776,14 @@ func (d SchemaTracker) renameIndex(ctx sessionctx.Context, ident ast.Ident, spec
}

// addTablePartitions is used by AlterTable.
func (d SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
func (d SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

pi := tblInfo.GetPartitionInfo()
if pi == nil {
return errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
Expand All @@ -752,12 +798,14 @@ func (d SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Iden
}

// dropTablePartitions is used by AlterTable.
func (d SchemaTracker) dropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
tblInfo, err := d.TableByName(ident.Schema, ident.Name)
func (d SchemaTracker) dropTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}

defer d.putTableIfNoError(err, ident.Schema, tblInfo)

pi := tblInfo.GetPartitionInfo()
if pi == nil {
return errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
Expand Down Expand Up @@ -799,12 +847,14 @@ func (d SchemaTracker) createPrimaryKey(
indexName model.CIStr,
indexPartSpecifications []*ast.IndexPartSpecification,
indexOption *ast.IndexOption,
) error {
tblInfo, err := d.TableByName(ti.Schema, ti.Name)
) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(err)
}

defer d.putTableIfNoError(err, ti.Schema, tblInfo)

indexName = model.NewCIStr(mysql.PrimaryKeyName)
if indexInfo := tblInfo.FindIndexByName(indexName.L); indexInfo != nil ||
// If the table's PKIsHandle is true, it also means that this table has a primary key.
Expand Down Expand Up @@ -888,7 +938,7 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
case ast.AlterTableRenameIndex:
err = d.renameIndex(sctx, ident, spec)
case ast.AlterTableDropPartition:
err = d.dropTablePartition(sctx, ident, spec)
err = d.dropTablePartitions(sctx, ident, spec)
case ast.AlterTableAddConstraint:
constr := spec.Constraint
switch spec.Constraint.Tp {
Expand Down Expand Up @@ -925,7 +975,9 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
case ast.TableOptionAutoIdCache:
case ast.TableOptionAutoRandomBase:
case ast.TableOptionComment:
tblInfo = tblInfo.Clone()
tblInfo.Comment = opt.StrValue
_ = d.PutTable(ident.Schema, tblInfo)
case ast.TableOptionCharset, ast.TableOptionCollate:
// getCharsetAndCollateInTableOption will get the last charset and collate in the options,
// so it should be handled only once.
Expand All @@ -939,6 +991,7 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
}
needsOverwriteCols := ddl.NeedToOverwriteColCharset(spec.Options)

tblInfo = tblInfo.Clone()
if toCharset != "" {
tblInfo.Charset = toCharset
}
Expand All @@ -957,6 +1010,7 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
}
}
}
_ = d.PutTable(ident.Schema, tblInfo)

handledCharsetOrCollate = true
case ast.TableOptionPlacementPolicy:
Expand All @@ -970,11 +1024,13 @@ func (d SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context,
}
}
case ast.AlterTableIndexInvisible:
tblInfo = tblInfo.Clone()
idx := tblInfo.FindIndexByName(spec.IndexName.L)
if idx == nil {
return errors.Trace(infoschema.ErrKeyNotExists.GenWithStackByArgs(spec.IndexName.O, ident.Name))
}
idx.Invisible = spec.Visibility == ast.IndexVisibilityInvisible
_ = d.PutTable(ident.Schema, tblInfo)
case ast.AlterTablePartitionOptions,
ast.AlterTableDropForeignKey,
ast.AlterTableCoalescePartitions,
Expand Down