From 908a56dd5433a004314f2b65baf5b5412de96916 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Mon, 15 May 2023 18:33:18 +0800 Subject: [PATCH] This is an automated cherry-pick of #8955 Signed-off-by: ti-chi-bot --- cdc/entry/schema/snapshot.go | 41 ++++++++++++------- .../cloudstorage/cloud_storage_ddl_sink.go | 41 +++++++++++++++++-- .../data/prepare.sql | 4 ++ .../data/prepare.sql | 4 ++ 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index eac2fc1e1a6..da8af449b48 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -59,6 +59,10 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) { case timodel.ActionRenameTables: // DDL on multiple tables, ignore pre table info return nil, nil + case timodel.ActionExchangeTablePartition: + // get the table will be exchanged + table, _, err := s.inner.getSourceTable(job.BinlogInfo.TableInfo) + return table, err default: binlogInfo := job.BinlogInfo if binlogInfo == nil { @@ -907,26 +911,21 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er return nil } -// exchangePartition find the partition's id in the old table info of targetTable, -// and find the sourceTable's id in the new table info of targetTable. -// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot. -// Finally, update both the targetTable's info and the sourceTable's info in snapshot. -func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error { - var sourceTable *model.TableInfo +func (s *snapshot) getSourceTable(targetTable *timodel.TableInfo) (*model.TableInfo, int64, error) { oldTable, ok := s.physicalTableByID(targetTable.ID) if !ok { - return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID) + return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID) } oldPartitions := oldTable.GetPartitionInfo() if oldPartitions == nil { - return cerror.ErrSnapshotTableNotFound. + return nil, 0, cerror.ErrSnapshotTableNotFound. GenWithStack("table %d is not a partitioned table", oldTable.ID) } newPartitions := targetTable.GetPartitionInfo() if newPartitions == nil { - return cerror.ErrSnapshotTableNotFound. + return nil, 0, cerror.ErrSnapshotTableNotFound. GenWithStack("table %d is not a partitioned table", targetTable.ID) } @@ -948,14 +947,13 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin } } if len(diff) != 1 { - return cerror.ErrExchangePartition. + return nil, 0, cerror.ErrExchangePartition. GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff)) } - sourceTable, ok = s.physicalTableByID(diff[0]) + sourceTable, ok := s.physicalTableByID(diff[0]) if !ok { - return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0]) + return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0]) } - // 3.find the exchanged partition info diff = diff[:0] for id := range oldIDs { @@ -964,13 +962,26 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin } } if len(diff) != 1 { - return cerror.ErrExchangePartition. + return nil, 0, cerror.ErrExchangePartition. GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff)) } exchangedPartitionID := diff[0] + return sourceTable, exchangedPartitionID, nil +} + +// exchangePartition find the partition's id in the old table info of targetTable, +// and find the sourceTable's id in the new table info of targetTable. +// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot. +// Finally, update both the targetTable's info and the sourceTable's info in snapshot. +func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error { + var sourceTable *model.TableInfo + sourceTable, exchangedPartitionID, err := s.getSourceTable(targetTable.TableInfo) + if err != nil { + return errors.Trace(err) + } // 4.update the targetTable - err := s.updatePartition(targetTable, currentTS) + err = s.updatePartition(targetTable, currentTS) if err != nil { return errors.Trace(err) } diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 57a070f82a1..20afd4a3cb1 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/storage" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink" @@ -58,6 +59,7 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er return d, nil } +<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go func generateSchemaPath(def cloudstorage.TableDefinition) string { return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) } @@ -80,12 +82,45 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error err1 := d.storage.WriteFile(ctx, path, encodedDef) if err1 != nil { return err1 +======= +// WriteDDLEvent writes the ddl event to the cloud storage. +func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + writeFile := func(def cloudstorage.TableDefinition) error { + encodedDef, err := def.MarshalWithQuery() + if err != nil { + return errors.Trace(err) +>>>>>>> 03285b8660 (schema, sink(ticdc): fix exchange partition (#8955)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go } - return nil - }) + path, err := def.GenerateSchemaFilePath() + if err != nil { + return errors.Trace(err) + } + log.Debug("write ddl event to external storage", + zap.String("path", path), zap.Any("ddl", ddl)) + return d.statistics.RecordDDLExecution(func() error { + err1 := d.storage.WriteFile(ctx, path, encodedDef) + if err1 != nil { + return err1 + } + + return nil + }) + } - return errors.Trace(err) + var def cloudstorage.TableDefinition + def.FromDDLEvent(ddl) + if err := writeFile(def); err != nil { + return errors.Trace(err) + } + + if ddl.Type == timodel.ActionExchangeTablePartition { + // For exchange partition, we need to write the schema of the source table. + var sourceTableDef cloudstorage.TableDefinition + sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version) + return writeFile(sourceTableDef) + } + return nil } func (d *ddlSink) WriteCheckpointTs(ctx context.Context, diff --git a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql index 54640dfbdc5..94e6fc6bf9a 100644 --- a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql +++ b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql @@ -21,6 +21,10 @@ alter table t1 drop partition p1; insert into t1 values (7),(8),(9); update t1 set a=a+10 where a=9; +<<<<<<< HEAD +======= +/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */ +>>>>>>> 03285b8660 (schema, sink(ticdc): fix exchange partition (#8955)) create table t2 (a int primary key); ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ diff --git a/tests/integration_tests/csv_storage_partition_table/data/prepare.sql b/tests/integration_tests/csv_storage_partition_table/data/prepare.sql index 1e0326f65f1..1217130adb9 100644 --- a/tests/integration_tests/csv_storage_partition_table/data/prepare.sql +++ b/tests/integration_tests/csv_storage_partition_table/data/prepare.sql @@ -21,6 +21,10 @@ alter table t1 drop partition p1; insert into t1 values (7),(8),(9); -- update t1 set a=a+10 where a=9; +<<<<<<< HEAD +======= +/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */ +>>>>>>> 03285b8660 (schema, sink(ticdc): fix exchange partition (#8955)) create table t2 (a int primary key); ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/