Skip to content

Commit

Permalink
partition: make ExchangePartition follow check constraints during wri…
Browse files Browse the repository at this point in the history
…teOnly state(Part2) (#46030)

ref #45922
  • Loading branch information
jiyfhust committed Sep 20, 2023
1 parent e324e08 commit 4edb78e
Show file tree
Hide file tree
Showing 12 changed files with 377 additions and 67 deletions.
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Expand Up @@ -1420,6 +1420,10 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
// Keep this as Schema ID of non-partitioned table
// to avoid trigger early rename in TiFlash
diff.AffectedOpts[0].SchemaID = job.SchemaID
// Need reload partition table, use diff.AffectedOpts[0].OldSchemaID to mark it.
if len(multiInfos) > 0 {
diff.AffectedOpts[0].OldSchemaID = ptSchemaID
}
} else {
// Swap
diff.TableID = ptDefID
Expand Down
18 changes: 15 additions & 3 deletions ddl/partition.go
Expand Up @@ -2458,16 +2458,27 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
}
var ptInfo []schemaIDAndTableInfo
if len(nt.Constraints) > 0 {
pt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionTableID: nt.ID,
ExchangePartitionDefID: defID,
}
ptInfo = append(ptInfo, schemaIDAndTableInfo{
schemaID: ptSchemaID,
tblInfo: pt,
})
}
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
ExchangePartitionTableID: ptID,
ExchangePartitionDefID: defID,
}
// We need an interim schema version,
// so there are no non-matching rows inserted
// into the table using the schema version
// before the exchange is made.
job.SchemaState = model.StateWriteOnly
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, ptInfo...)
}
// From now on, nt (the non-partitioned table) has
// ExchangePartitionInfo set, meaning it is restricted
Expand Down Expand Up @@ -2527,6 +2538,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
}

// exchange table meta id
pt.ExchangePartitionInfo = nil
partDef.ID, nt.ID = nt.ID, partDef.ID

err = t.UpdateTable(ptSchemaID, pt)
Expand Down
28 changes: 26 additions & 2 deletions ddl/rollingback.go
Expand Up @@ -264,11 +264,35 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool {

// rollbackExchangeTablePartition will clear the non-partitioned
// table's ExchangePartitionInfo state.
func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) {
func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (ver int64, err error) {
tblInfo.ExchangePartitionInfo = nil
job.State = model.JobStateRollbackDone
job.SchemaState = model.StatePublic
return updateVersionAndTableInfo(d, t, job, tblInfo, true)
if len(tblInfo.Constraints) == 0 {
return updateVersionAndTableInfo(d, t, job, tblInfo, true)
}
var (
defID int64
ptSchemaID int64
ptID int64
partName string
withValidation bool
)
if err = job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
return ver, errors.Trace(err)
}
pt, err := getTableInfo(t, ptID, ptSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
pt.ExchangePartitionInfo = nil
var ptInfo []schemaIDAndTableInfo
ptInfo = append(ptInfo, schemaIDAndTableInfo{
schemaID: ptSchemaID,
tblInfo: pt,
})
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true, ptInfo...)
return ver, errors.Trace(err)
}

func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
Expand Down
24 changes: 5 additions & 19 deletions executor/insert_common.go
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -690,28 +689,15 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
}
}
}
tbl := e.Table.Meta()

// Handle exchange partition
if tbl.ExchangePartitionInfo != nil {
is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return nil, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
e.Ctx(),
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
tbl := e.Table.Meta()
if tbl.ExchangePartitionInfo != nil && tbl.GetPartitionInfo() == nil {
if err := checkRowForExchangePartition(e.Ctx(), row, tbl); err != nil {
return nil, err
}
}

sc := e.Ctx().GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for i, gCol := range gCols {
Expand Down
60 changes: 43 additions & 17 deletions executor/write.go
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -79,23 +81,8 @@ func updateRecord(

// Handle exchange partition
tbl := t.Meta()
if tbl.ExchangePartitionInfo != nil {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return false, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return false, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
newData,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
if tbl.ExchangePartitionInfo != nil && tbl.GetPartitionInfo() == nil {
if err := checkRowForExchangePartition(sctx, newData, tbl); err != nil {
return false, err
}
}
Expand Down Expand Up @@ -326,3 +313,42 @@ func resetErrDataTooLong(colName string, rowIdx int, _ error) error {
newErr := types.ErrDataTooLong.GenWithStack("Data too long for column '%v' at row %v", colName, rowIdx)
return newErr
}

// checkRowForExchangePartition is only used for ExchangePartition by non-partitionTable during write only state.
// It check if rowData inserted or updated violate partition definition or checkConstraints of partitionTable.
func checkRowForExchangePartition(sctx sessionctx.Context, row []types.Datum, tbl *model.TableInfo) error {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionTableID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
tbl.ID,
)
if err != nil {
return err
}
if variable.EnableCheckConstraint.Load() {
type CheckConstraintTable interface {
CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error
}
cc, ok := pt.(CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err := cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
}
return nil
}
26 changes: 24 additions & 2 deletions infoschema/builder.go
Expand Up @@ -315,9 +315,31 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff)
}

func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
// The partitioned table is not affected until the last stage
// It is not in StatePublic.
if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID {
return b.applyTableUpdate(m, diff)
ntIDs, err := b.applyTableUpdate(m, diff)
if err != nil {
return nil, errors.Trace(err)
}
if diff.AffectedOpts == nil || diff.AffectedOpts[0].OldSchemaID == 0 {
return ntIDs, err
}
// Reload parition tabe.
ptSchemaID := diff.AffectedOpts[0].OldSchemaID
ptID := diff.AffectedOpts[0].TableID
ptDiff := &model.SchemaDiff{
Type: diff.Type,
Version: diff.Version,
TableID: ptID,
SchemaID: ptSchemaID,
OldTableID: ptID,
OldSchemaID: ptSchemaID,
}
ptIDs, err := b.applyTableUpdate(m, ptDiff)
if err != nil {
return nil, errors.Trace(err)
}
return append(ptIDs, ntIDs...), nil
}
ntSchemaID := diff.OldSchemaID
ntID := diff.OldTableID
Expand Down
5 changes: 3 additions & 2 deletions parser/model/model.go
Expand Up @@ -1168,8 +1168,9 @@ func (p PartitionType) String() string {

// ExchangePartitionInfo provides exchange partition info.
type ExchangePartitionInfo struct {
ExchangePartitionID int64 `json:"exchange_partition_id"`
ExchangePartitionDefID int64 `json:"exchange_partition_def_id"`
// It is nt tableID when table which has the info is a partition table, else pt tableID.
ExchangePartitionTableID int64 `json:"exchange_partition_id"`
ExchangePartitionDefID int64 `json:"exchange_partition_def_id"`
// Deprecated, not used
XXXExchangePartitionFlag bool `json:"exchange_partition_flag"`
}
Expand Down
2 changes: 1 addition & 1 deletion table/table.go
Expand Up @@ -253,7 +253,7 @@ type PartitionedTable interface {
GetAllPartitionIDs() []int64
GetPartitionColumnIDs() []int64
GetPartitionColumnNames() []model.CIStr
CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error
CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error
}

// TableFromMeta builds a table.Table from *model.TableInfo.
Expand Down
54 changes: 52 additions & 2 deletions table/tables/partition.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1268,12 +1269,12 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key {
return tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(handle))
}

func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error {
func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error {
defID, err := t.locatePartition(ctx, r)
if err != nil {
return err
}
if defID != pid {
if defID != partID && defID != ntID {
return errors.WithStack(table.ErrRowDoesNotMatchGivenPartitionSet)
}
return nil
Expand Down Expand Up @@ -1551,6 +1552,39 @@ func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context,
return t.partitions[pid], nil
}

// checkConstraintForExchangePartition is only used for ExchangePartition by partitionTable during write only state.
// It check if rowData inserted or updated violate checkConstraints of non-partitionTable.
func checkConstraintForExchangePartition(sctx sessionctx.Context, row []types.Datum, partID, ntID int64) error {
type InfoSchema interface {
TableByID(id int64) (val table.Table, ok bool)
}
is, ok := sctx.GetDomainInfoSchema().(InfoSchema)
if !ok {
return errors.Errorf("exchange partition process assert inforSchema failed")
}
nt, tableFound := is.TableByID(ntID)
if !tableFound {
// Now partID is nt tableID.
nt, tableFound = is.TableByID(partID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
}
type CheckConstraintTable interface {
CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error
}
cc, ok := nt.(CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err := cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
return nil
}

// AddRecord implements the AddRecord method for the table.Table interface.
func (t *partitionedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
return partitionedTableAddRecord(ctx, t, r, nil, opts)
Expand All @@ -1570,6 +1604,14 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r []
if t.Meta().Partition.HasTruncatingPartitionID(pid) {
return nil, errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public"))
}
exchangePartitionInfo := t.Meta().ExchangePartitionInfo
if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == pid &&
variable.EnableCheckConstraint.Load() {
err = checkConstraintForExchangePartition(ctx, r, pid, exchangePartitionInfo.ExchangePartitionTableID)
if err != nil {
return nil, errors.WithStack(err)
}
}
tbl := t.GetPartition(pid)
recordID, err = tbl.AddRecord(ctx, r, opts...)
if err != nil {
Expand Down Expand Up @@ -1695,6 +1737,14 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context,
if t.Meta().Partition.HasTruncatingPartitionID(to) {
return errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public"))
}
exchangePartitionInfo := t.Meta().ExchangePartitionInfo
if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == to &&
variable.EnableCheckConstraint.Load() {
err = checkConstraintForExchangePartition(ctx, newData, to, exchangePartitionInfo.ExchangePartitionTableID)
if err != nil {
return errors.WithStack(err)
}
}

// The old and new data locate in different partitions.
// Remove record from old partition and add record to new partition.
Expand Down

0 comments on commit 4edb78e

Please sign in to comment.