Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema, sink(ticdc): fix exchange partition #8955

Merged
merged 2 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 26 additions & 15 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) {
case timodel.ActionRenameTables:
// DDL on multiple tables, ignore pre table info
return nil, nil
case timodel.ActionExchangeTablePartition:
// get the table will be exchanged
table, _, err := s.inner.getSourceTable(job.BinlogInfo.TableInfo)
return table, err
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -885,26 +889,21 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er
return nil
}

// exchangePartition find the partition's id in the old table info of targetTable,
// and find the sourceTable's id in the new table info of targetTable.
// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot.
// Finally, update both the targetTable's info and the sourceTable's info in snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
func (s *snapshot) getSourceTable(targetTable *timodel.TableInfo) (*model.TableInfo, int64, error) {
oldTable, ok := s.physicalTableByID(targetTable.ID)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
}

oldPartitions := oldTable.GetPartitionInfo()
if oldPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
return nil, 0, cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", oldTable.ID)
}

newPartitions := targetTable.GetPartitionInfo()
if newPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
return nil, 0, cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", targetTable.ID)
}

Expand All @@ -926,14 +925,13 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
return nil, 0, cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}
sourceTable, ok = s.physicalTableByID(diff[0])
sourceTable, ok := s.physicalTableByID(diff[0])
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
}

// 3.find the exchanged partition info
diff = diff[:0]
for id := range oldIDs {
Expand All @@ -942,13 +940,26 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
return nil, 0, cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}

exchangedPartitionID := diff[0]
return sourceTable, exchangedPartitionID, nil
}

// exchangePartition find the partition's id in the old table info of targetTable,
// and find the sourceTable's id in the new table info of targetTable.
// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot.
// Finally, update both the targetTable's info and the sourceTable's info in snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
sourceTable, exchangedPartitionID, err := s.getSourceTable(targetTable.TableInfo)
if err != nil {
return errors.Trace(err)
}
// 4.update the targetTable
err := s.updatePartition(targetTable, currentTS)
err = s.updatePartition(targetTable, currentTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
47 changes: 30 additions & 17 deletions cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand Down Expand Up @@ -62,29 +63,41 @@ func NewDDLSink(ctx context.Context, sinkURI *url.URL) (*DDLSink, error) {

// WriteDDLEvent writes the ddl event to the cloud storage.
func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
writeFile := func(def cloudstorage.TableDefinition) error {
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", ddl))
return d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
}

return nil
})
}

var def cloudstorage.TableDefinition
def.FromDDLEvent(ddl)
encodedDef, err := def.MarshalWithQuery()
if err != nil {
if err := writeFile(def); err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
if ddl.Type == timodel.ActionExchangeTablePartition {
// For exchange partition, we need to write the schema of the source table.
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version)
return writeFile(sourceTableDef)
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", ddl))
err = d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
}

return nil
})

return errors.Trace(err)
return nil
}

// WriteCheckpointTs writes the checkpoint ts to the cloud storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */
-- create table t2 (a int primary key);
-- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
-- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
-- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/
/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/

ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (-1),(6),(13);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
-- update t1 set a=a+10 where a=9;

/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */
-- create table t2 (a int primary key);
-- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
-- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
-- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/
/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/

ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (-1),(6),(13);
Expand Down