From 5f5e31211dc041f16283094ce68421aeede58de4 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 4 Aug 2022 17:59:00 +0800 Subject: [PATCH 01/11] add tiflash test --- ddl/db_partition_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 5a3bdb7588a8b..851fbcfbe4484 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2103,6 +2103,18 @@ func TestAlterTableExchangePartition(t *testing.T) { require.NoError(t, err) tk.MustExec("alter table e15 exchange partition p0 with table e16") + + tk.MustExec("create table e17 (a int)") + tk.MustExec("alter table e17 set tiflash replica 1") + tk.MustExec("insert into e17 values (1)") + + tk.MustExec("create table e18 (a int) partition by range (a) (partition p0 values less than (4), partition p1 values less than (10))") + tk.MustExec("alter table e18 set tiflash replica 1") + tk.MustExec("insert into pt values (2)") + + tk.MustExec("alter table e18 exchange partition p0 with table e17") + tk.MustQuery("select * /*+ read_from_storage(tiflash[pt]) */ from e18").Check(testkit.Rows("1")) + tk.MustQuery("select * /*+ read_from_storage(tiflash[pt]) */ from e17").Check(testkit.Rows("2")) } func TestExchangePartitionTableCompatiable(t *testing.T) { From 561a1f03145a7f838642f76e8417e7e8d2fd8c1e Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 5 Aug 2022 12:13:54 +0800 Subject: [PATCH 02/11] fix AutoID --- ddl/db_partition_test.go | 37 ++++++++++++++++++ ddl/partition.go | 8 ++++ domain/domain.go | 6 +-- infoschema/builder.go | 82 ++++++++++++++++++++++++++++++++++++---- 4 files changed, 123 insertions(+), 10 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 851fbcfbe4484..0a2f429e1c59f 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2389,6 +2389,43 @@ func TestExchangePartitionHook(t *testing.T) { tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1")) } +func TestExchangePartitionAutoID(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("set @@tidb_enable_exchange_partition=1") + defer tk.MustExec("set @@tidb_enable_exchange_partition=0") + + tk.MustExec("use test") + tk.MustExec(`create table pt (a int primary key auto_increment) partition by range(a) ( + partition p0 values less than (3), + partition p1 values less than (6), + PARTITION p2 values less than (9), + PARTITION p3 values less than (50000000) + );`) + tk.MustExec(`create table nt(a int primary key auto_increment);`) + + tk.MustExec(`insert into pt values (0), (4)`) + tk.MustExec("insert into nt values (1)") + + hook := &ddl.TestDDLCallback{Do: dom} + dom.DDL().SetHook(hook) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `return(true)`)) + defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) + + hookFunc := func(job *model.Job) { + if job.Type == model.ActionExchangeTablePartition && job.State == model.JobStateRunning { + tk.MustExec(`insert into pt values (40000000)`) + } + } + hook.OnJobUpdatedExported = hookFunc + + tk.MustExec("alter table pt exchange partition p0 with table nt") + tk.MustExec("insert into nt values (NULL)") + tk.MustQuery("select * from nt").Check(testkit.Rows("1", "42000001")) + tk.MustQuery("select * from pt").Sort().Check(testkit.Rows("1", "4", "40000000")) +} + func TestExchangePartitionExpressIndex(t *testing.T) { restore := config.RestoreFunc() defer restore() diff --git a/ddl/partition.go b/ddl/partition.go index 1fcf4ca4f3db0..f89bb426ca4ca 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1483,6 +1483,14 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + failpoint.Inject("exchangePartitionAutoID", func(val failpoint.Value) { + if val.(bool) { + d.mu.RLock() + d.mu.hook.OnJobUpdated(job) + d.mu.RUnlock() + } + }) + err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) if err != nil { job.State = model.JobStateCancelled diff --git a/domain/domain.go b/domain/domain.go index a8f35f8abdd05..11a8c11702283 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -148,7 +148,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // 3. There are less 100 diffs. startTime := time.Now() if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { - is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) + is, relatedChanges, err := do.tryLoadSchemaDiffs(do.store, m, currentSchemaVersion, neededSchemaVersion) if err == nil { do.infoCache.Insert(is, startTS) logutil.BgLogger().Info("diff load InfoSchema success", @@ -281,7 +281,7 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, // Return true if the schema is loaded successfully. // Return false if the schema can not be loaded by schema diff, then we need to do full load. // The second returned value is the delta updated table and partition IDs. -func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { +func (do *Domain) tryLoadSchemaDiffs(store kv.Storage, m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ @@ -301,7 +301,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { - IDs, err := builder.ApplyDiff(m, diff) + IDs, err := builder.ApplyDiff(store, m, diff) if err != nil { return nil, nil, err } diff --git a/infoschema/builder.go b/infoschema/builder.go index 4e7b767aecdb0..31e886e74d1a9 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -15,6 +15,7 @@ package infoschema import ( + "context" "fmt" "strings" @@ -32,6 +33,7 @@ import ( "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/domainutil" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -190,7 +192,7 @@ type Builder struct { // ApplyDiff applies SchemaDiff to the new InfoSchema. // Return the detail updated table IDs that are produced from SchemaDiff and an error. -func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) ApplyDiff(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { b.is.schemaMetaVersion = diff.Version switch diff.Type { case model.ActionCreateSchema: @@ -214,13 +216,15 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro case model.ActionRecoverTable: return b.applyRecoverTable(m, diff) case model.ActionCreateTables: - return b.applyCreateTables(m, diff) + return b.applyCreateTables(store, m, diff) + case model.ActionExchangeTablePartition: + return b.applyExchangeTablePartition(store, m, diff) default: - return b.applyDefaultAction(m, diff) + return b.applyDefaultAction(store, m, diff) } } -func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyCreateTables(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs := make([]int64, 0, len(diff.AffectedOpts)) if diff.AffectedOpts != nil { for _, opt := range diff.AffectedOpts { @@ -232,7 +236,7 @@ func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) + affectedIDs, err := b.ApplyDiff(store, m, affectedDiff) if err != nil { return nil, errors.Trace(err) } @@ -287,7 +291,71 @@ func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int return tblIDs, nil } -func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyExchangeTablePartition(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + tblIDs, err := b.applyTableUpdate(m, diff) + if err != nil { + return nil, errors.Trace(err) + } + + for _, opt := range diff.AffectedOpts { + affectedDiff := &model.SchemaDiff{ + Version: diff.Version, + Type: diff.Type, + SchemaID: opt.SchemaID, + TableID: opt.TableID, + OldSchemaID: opt.OldSchemaID, + OldTableID: opt.OldTableID, + } + affectedIDs, err := b.ApplyDiff(store, m, affectedDiff) + if err != nil { + return nil, errors.Trace(err) + } + tblIDs = append(tblIDs, affectedIDs...) + + // handle partition table and table AutoID + err = updateAutoIDForExchangePartition(store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) + if err != nil { + return nil, errors.Trace(err) + } + } + + return tblIDs, nil +} + +func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSchemaID, ntID int64) error { + // partition table auto IDs. + + err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error { + t := meta.NewMeta(txn) + ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() + if err != nil { + return err + } + + // non-partition table auto IDs. + ntAutoIDs, err := t.GetAutoIDAccessors(ntSchemaID, ntID).Get() + if err != nil { + return err + } + + // Set both tables to the maximum auto IDs between normal table and partitioned table. + newAutoIDs := meta.AutoIDGroup{ + RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), + IncrementID: mathutil.Max(ptAutoIDs.IncrementID, ntAutoIDs.IncrementID), + RandomID: mathutil.Max(ptAutoIDs.RandomID, ntAutoIDs.RandomID), + } + err = t.GetAutoIDAccessors(ptSchemaID, ptID).Put(newAutoIDs) + if err != nil { + return err + } + err = t.GetAutoIDAccessors(ntSchemaID, ntID).Put(newAutoIDs) + return nil + }) + + return err +} + +func (b *Builder) applyDefaultAction(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { return nil, errors.Trace(err) @@ -303,7 +371,7 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) + affectedIDs, err := b.ApplyDiff(store, m, affectedDiff) if err != nil { return nil, errors.Trace(err) } From 901ccfc79bf075b49e0a88b5be876df32a88ecc7 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 5 Aug 2022 14:41:52 +0800 Subject: [PATCH 03/11] fix --- ddl/db_partition_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 0a2f429e1c59f..d7d09f2c6622b 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2113,8 +2113,8 @@ func TestAlterTableExchangePartition(t *testing.T) { tk.MustExec("insert into pt values (2)") tk.MustExec("alter table e18 exchange partition p0 with table e17") - tk.MustQuery("select * /*+ read_from_storage(tiflash[pt]) */ from e18").Check(testkit.Rows("1")) - tk.MustQuery("select * /*+ read_from_storage(tiflash[pt]) */ from e17").Check(testkit.Rows("2")) + tk.MustQuery("select * /*+ read_from_storage(tiflash[e18]) */ from e18").Check(testkit.Rows("1")) + tk.MustQuery("select * /*+ read_from_storage(tiflash[e17]) */ from e17").Check(testkit.Rows("2")) } func TestExchangePartitionTableCompatiable(t *testing.T) { From 93687752fa5a3a9cfbb4517aab3058754d827b25 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 5 Aug 2022 15:04:22 +0800 Subject: [PATCH 04/11] fix test --- infoschema/infoschema_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index c7629f65f113e..1cd6e1ea5263b 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -115,8 +115,8 @@ func TestBasic(t *testing.T) { txn, err := store.Begin() require.NoError(t, err) - checkApplyCreateNonExistsSchemaDoesNotPanic(t, txn, builder) - checkApplyCreateNonExistsTableDoesNotPanic(t, txn, builder, dbID) + checkApplyCreateNonExistsSchemaDoesNotPanic(t, txn, builder, store) + checkApplyCreateNonExistsTableDoesNotPanic(t, txn, builder, dbID, store) err = txn.Rollback() require.NoError(t, err) @@ -202,7 +202,7 @@ func TestBasic(t *testing.T) { require.NoError(t, err) txn, err = store.Begin() require.NoError(t, err) - _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionRenameTable, SchemaID: dbID, TableID: tbID, OldSchemaID: dbID}) + _, err = builder.ApplyDiff(store, meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionRenameTable, SchemaID: dbID, TableID: tbID, OldSchemaID: dbID}) require.NoError(t, err) err = txn.Rollback() require.NoError(t, err) @@ -235,15 +235,15 @@ func TestMockInfoSchema(t *testing.T) { require.Equal(t, colInfo, tbl.Cols()[0].ColumnInfo) } -func checkApplyCreateNonExistsSchemaDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder) { +func checkApplyCreateNonExistsSchemaDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, store kv.Storage) { m := meta.NewMeta(txn) - _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) + _, err := builder.ApplyDiff(store, m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) require.True(t, infoschema.ErrDatabaseNotExists.Equal(err)) } -func checkApplyCreateNonExistsTableDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, dbID int64) { +func checkApplyCreateNonExistsTableDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, dbID int64, store kv.Storage) { m := meta.NewMeta(txn) - _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) + _, err := builder.ApplyDiff(store, m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) require.True(t, infoschema.ErrTableNotExists.Equal(err)) } From 68f7744a473693c7efe59bc39b2523e910c6634d Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 5 Aug 2022 15:26:43 +0800 Subject: [PATCH 05/11] fix --- infoschema/builder.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/infoschema/builder.go b/infoschema/builder.go index 31e886e74d1a9..5d55277ca0d89 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -349,6 +349,9 @@ func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSche return err } err = t.GetAutoIDAccessors(ntSchemaID, ntID).Put(newAutoIDs) + if err != nil { + return err + } return nil }) From fc7610b50a55b8943423421683f72c24956d1ff1 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 5 Aug 2022 15:51:26 +0800 Subject: [PATCH 06/11] fix test --- ddl/db_partition_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index d7d09f2c6622b..599934d5a100e 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2422,8 +2422,8 @@ func TestExchangePartitionAutoID(t *testing.T) { tk.MustExec("alter table pt exchange partition p0 with table nt") tk.MustExec("insert into nt values (NULL)") - tk.MustQuery("select * from nt").Check(testkit.Rows("1", "42000001")) - tk.MustQuery("select * from pt").Sort().Check(testkit.Rows("1", "4", "40000000")) + tk.MustQuery("select count(*) from nt where a >= 4000000").Check(testkit.Rows("1")) + tk.MustQuery("select count(*) from pt where a >= 4000000").Check(testkit.Rows("1")) } func TestExchangePartitionExpressIndex(t *testing.T) { From 9c5228517ccdbdd3c59b52ceb2a568ce4d6ac7b0 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 5 Aug 2022 16:15:59 +0800 Subject: [PATCH 07/11] fix --- ddl/db_partition_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 599934d5a100e..7ebe6c443d432 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2110,7 +2110,7 @@ func TestAlterTableExchangePartition(t *testing.T) { tk.MustExec("create table e18 (a int) partition by range (a) (partition p0 values less than (4), partition p1 values less than (10))") tk.MustExec("alter table e18 set tiflash replica 1") - tk.MustExec("insert into pt values (2)") + tk.MustExec("insert into e18 values (2)") tk.MustExec("alter table e18 exchange partition p0 with table e17") tk.MustQuery("select * /*+ read_from_storage(tiflash[e18]) */ from e18").Check(testkit.Rows("1")) From 157b7bd3b545c87437f4619bd921d73412a5083d Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Tue, 9 Aug 2022 22:45:30 +0800 Subject: [PATCH 08/11] address comment --- ddl/callback_test.go | 5 +++++ ddl/db_partition_test.go | 12 +++--------- ddl/partition.go | 10 +++------- domain/domain.go | 6 +++--- infoschema/builder.go | 24 +++++++++++------------- infoschema/infoschema_test.go | 6 +++--- 6 files changed, 28 insertions(+), 35 deletions(-) diff --git a/ddl/callback_test.go b/ddl/callback_test.go index 85e26209fff7c..49753bceeb274 100644 --- a/ddl/callback_test.go +++ b/ddl/callback_test.go @@ -52,6 +52,7 @@ type TestDDLCallback struct { onJobUpdated func(*model.Job) OnJobUpdatedExported func(*model.Job) onWatched func(ctx context.Context) + OnWatchedExported func(ctx context.Context) OnGetJobBeforeExported func(string) OnGetJobAfterExported func(string, *model.Job) } @@ -112,6 +113,10 @@ func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) { // OnWatched is used to run the user customized logic of `OnWatched` first. func (tc *TestDDLCallback) OnWatched(ctx context.Context) { + if tc.OnWatchedExported != nil { + tc.OnWatchedExported(ctx) + return + } if tc.onWatched != nil { tc.onWatched(ctx) return diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 7ebe6c443d432..4bef17d0c892a 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2404,21 +2404,15 @@ func TestExchangePartitionAutoID(t *testing.T) { PARTITION p3 values less than (50000000) );`) tk.MustExec(`create table nt(a int primary key auto_increment);`) - tk.MustExec(`insert into pt values (0), (4)`) tk.MustExec("insert into nt values (1)") hook := &ddl.TestDDLCallback{Do: dom} dom.DDL().SetHook(hook) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `return(true)`)) - defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) - - hookFunc := func(job *model.Job) { - if job.Type == model.ActionExchangeTablePartition && job.State == model.JobStateRunning { - tk.MustExec(`insert into pt values (40000000)`) - } + hookFunc := func(ctx context.Context) { + tk.MustExec(`insert into pt values (40000000)`) } - hook.OnJobUpdatedExported = hookFunc + hook.OnWatchedExported = hookFunc tk.MustExec("alter table pt exchange partition p0 with table nt") tk.MustExec("insert into nt values (NULL)") diff --git a/ddl/partition.go b/ddl/partition.go index f89bb426ca4ca..440f145eda4c9 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1483,13 +1483,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - failpoint.Inject("exchangePartitionAutoID", func(val failpoint.Value) { - if val.(bool) { - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() - } - }) + d.mu.RLock() + d.mu.hook.OnWatched(d.ctx) + d.mu.RUnlock() err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) if err != nil { diff --git a/domain/domain.go b/domain/domain.go index 11a8c11702283..a8f35f8abdd05 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -148,7 +148,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // 3. There are less 100 diffs. startTime := time.Now() if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { - is, relatedChanges, err := do.tryLoadSchemaDiffs(do.store, m, currentSchemaVersion, neededSchemaVersion) + is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) if err == nil { do.infoCache.Insert(is, startTS) logutil.BgLogger().Info("diff load InfoSchema success", @@ -281,7 +281,7 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, // Return true if the schema is loaded successfully. // Return false if the schema can not be loaded by schema diff, then we need to do full load. // The second returned value is the delta updated table and partition IDs. -func (do *Domain) tryLoadSchemaDiffs(store kv.Storage, m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { +func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ @@ -301,7 +301,7 @@ func (do *Domain) tryLoadSchemaDiffs(store kv.Storage, m *meta.Meta, usedVersion phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { - IDs, err := builder.ApplyDiff(store, m, diff) + IDs, err := builder.ApplyDiff(m, diff) if err != nil { return nil, nil, err } diff --git a/infoschema/builder.go b/infoschema/builder.go index 5d55277ca0d89..521139975e1f2 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -192,7 +192,7 @@ type Builder struct { // ApplyDiff applies SchemaDiff to the new InfoSchema. // Return the detail updated table IDs that are produced from SchemaDiff and an error. -func (b *Builder) ApplyDiff(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { b.is.schemaMetaVersion = diff.Version switch diff.Type { case model.ActionCreateSchema: @@ -216,15 +216,15 @@ func (b *Builder) ApplyDiff(store kv.Storage, m *meta.Meta, diff *model.SchemaDi case model.ActionRecoverTable: return b.applyRecoverTable(m, diff) case model.ActionCreateTables: - return b.applyCreateTables(store, m, diff) + return b.applyCreateTables(m, diff) case model.ActionExchangeTablePartition: - return b.applyExchangeTablePartition(store, m, diff) + return b.applyExchangeTablePartition(m, diff) default: - return b.applyDefaultAction(store, m, diff) + return b.applyDefaultAction(m, diff) } } -func (b *Builder) applyCreateTables(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs := make([]int64, 0, len(diff.AffectedOpts)) if diff.AffectedOpts != nil { for _, opt := range diff.AffectedOpts { @@ -236,7 +236,7 @@ func (b *Builder) applyCreateTables(store kv.Storage, m *meta.Meta, diff *model. OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(store, m, affectedDiff) + affectedIDs, err := b.ApplyDiff(m, affectedDiff) if err != nil { return nil, errors.Trace(err) } @@ -291,7 +291,7 @@ func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int return tblIDs, nil } -func (b *Builder) applyExchangeTablePartition(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { return nil, errors.Trace(err) @@ -306,14 +306,14 @@ func (b *Builder) applyExchangeTablePartition(store kv.Storage, m *meta.Meta, di OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(store, m, affectedDiff) + affectedIDs, err := b.ApplyDiff(m, affectedDiff) if err != nil { return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) // handle partition table and table AutoID - err = updateAutoIDForExchangePartition(store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) + err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) if err != nil { return nil, errors.Trace(err) } @@ -323,8 +323,6 @@ func (b *Builder) applyExchangeTablePartition(store kv.Storage, m *meta.Meta, di } func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSchemaID, ntID int64) error { - // partition table auto IDs. - err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() @@ -358,7 +356,7 @@ func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSche return err } -func (b *Builder) applyDefaultAction(store kv.Storage, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { return nil, errors.Trace(err) @@ -374,7 +372,7 @@ func (b *Builder) applyDefaultAction(store kv.Storage, m *meta.Meta, diff *model OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(store, m, affectedDiff) + affectedIDs, err := b.ApplyDiff(m, affectedDiff) if err != nil { return nil, errors.Trace(err) } diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 1cd6e1ea5263b..afc146d44145d 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -202,7 +202,7 @@ func TestBasic(t *testing.T) { require.NoError(t, err) txn, err = store.Begin() require.NoError(t, err) - _, err = builder.ApplyDiff(store, meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionRenameTable, SchemaID: dbID, TableID: tbID, OldSchemaID: dbID}) + _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionRenameTable, SchemaID: dbID, TableID: tbID, OldSchemaID: dbID}) require.NoError(t, err) err = txn.Rollback() require.NoError(t, err) @@ -237,13 +237,13 @@ func TestMockInfoSchema(t *testing.T) { func checkApplyCreateNonExistsSchemaDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, store kv.Storage) { m := meta.NewMeta(txn) - _, err := builder.ApplyDiff(store, m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) + _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) require.True(t, infoschema.ErrDatabaseNotExists.Equal(err)) } func checkApplyCreateNonExistsTableDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, dbID int64, store kv.Storage) { m := meta.NewMeta(txn) - _, err := builder.ApplyDiff(store, m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) + _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) require.True(t, infoschema.ErrTableNotExists.Equal(err)) } From 4115c4b88b2e899670863b707b9e8023e6b85415 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Tue, 9 Aug 2022 22:47:41 +0800 Subject: [PATCH 09/11] fix --- infoschema/infoschema_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index afc146d44145d..c7629f65f113e 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -115,8 +115,8 @@ func TestBasic(t *testing.T) { txn, err := store.Begin() require.NoError(t, err) - checkApplyCreateNonExistsSchemaDoesNotPanic(t, txn, builder, store) - checkApplyCreateNonExistsTableDoesNotPanic(t, txn, builder, dbID, store) + checkApplyCreateNonExistsSchemaDoesNotPanic(t, txn, builder) + checkApplyCreateNonExistsTableDoesNotPanic(t, txn, builder, dbID) err = txn.Rollback() require.NoError(t, err) @@ -235,13 +235,13 @@ func TestMockInfoSchema(t *testing.T) { require.Equal(t, colInfo, tbl.Cols()[0].ColumnInfo) } -func checkApplyCreateNonExistsSchemaDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, store kv.Storage) { +func checkApplyCreateNonExistsSchemaDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder) { m := meta.NewMeta(txn) _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) require.True(t, infoschema.ErrDatabaseNotExists.Equal(err)) } -func checkApplyCreateNonExistsTableDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, dbID int64, store kv.Storage) { +func checkApplyCreateNonExistsTableDoesNotPanic(t *testing.T, txn kv.Transaction, builder *infoschema.Builder, dbID int64) { m := meta.NewMeta(txn) _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) require.True(t, infoschema.ErrTableNotExists.Equal(err)) From 8628dada3e057aec7612547b9411d87da87cfdf7 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 11 Aug 2022 17:16:19 +0800 Subject: [PATCH 10/11] failpoint replace hook --- ddl/callback_test.go | 5 ----- ddl/db_partition_test.go | 12 +++++------- ddl/partition.go | 17 ++++++++++++++--- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/ddl/callback_test.go b/ddl/callback_test.go index 49753bceeb274..85e26209fff7c 100644 --- a/ddl/callback_test.go +++ b/ddl/callback_test.go @@ -52,7 +52,6 @@ type TestDDLCallback struct { onJobUpdated func(*model.Job) OnJobUpdatedExported func(*model.Job) onWatched func(ctx context.Context) - OnWatchedExported func(ctx context.Context) OnGetJobBeforeExported func(string) OnGetJobAfterExported func(string, *model.Job) } @@ -113,10 +112,6 @@ func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) { // OnWatched is used to run the user customized logic of `OnWatched` first. func (tc *TestDDLCallback) OnWatched(ctx context.Context) { - if tc.OnWatchedExported != nil { - tc.OnWatchedExported(ctx) - return - } if tc.onWatched != nil { tc.onWatched(ctx) return diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 76f1c44564f8c..67f5a53574e88 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2390,7 +2390,7 @@ func TestExchangePartitionHook(t *testing.T) { } func TestExchangePartitionAutoID(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) + store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("set @@tidb_enable_exchange_partition=1") @@ -2407,12 +2407,10 @@ func TestExchangePartitionAutoID(t *testing.T) { tk.MustExec(`insert into pt values (0), (4)`) tk.MustExec("insert into nt values (1)") - hook := &ddl.TestDDLCallback{Do: dom} - dom.DDL().SetHook(hook) - hookFunc := func(ctx context.Context) { - tk.MustExec(`insert into pt values (40000000)`) - } - hook.OnWatchedExported = hookFunc + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) + }() tk.MustExec("alter table pt exchange partition p0 with table nt") tk.MustExec("insert into nt values (NULL)") diff --git a/ddl/partition.go b/ddl/partition.go index 4bd24764d8f0a..229d710bf4e4e 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2039,9 +2039,20 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - d.mu.RLock() - d.mu.hook.OnWatched(d.ctx) - d.mu.RUnlock() + failpoint.Inject("exchangePartitionAutoID", func(val failpoint.Value) { + if val.(bool) { + se, err := w.sessPool.get() + defer w.sessPool.put(se) + if err != nil { + failpoint.Return(ver, err) + } + sess := newSession(se) + _, err = sess.execute(context.Background(), "insert into test.pt values (40000000)", "exchange_partition_test") + if err != nil { + failpoint.Return(ver, err) + } + } + }) err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) if err != nil { From 567111bffedf4fd7d97901587e49e72f5f7ff749 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 12 Aug 2022 12:50:40 +0800 Subject: [PATCH 11/11] adress comment --- infoschema/builder.go | 41 ++++++++--------------------------------- 1 file changed, 8 insertions(+), 33 deletions(-) diff --git a/infoschema/builder.go b/infoschema/builder.go index 521139975e1f2..9ac3a203151a4 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -217,8 +217,6 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyRecoverTable(m, diff) case model.ActionCreateTables: return b.applyCreateTables(m, diff) - case model.ActionExchangeTablePartition: - return b.applyExchangeTablePartition(m, diff) default: return b.applyDefaultAction(m, diff) } @@ -291,37 +289,6 @@ func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int return tblIDs, nil } -func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - tblIDs, err := b.applyTableUpdate(m, diff) - if err != nil { - return nil, errors.Trace(err) - } - - for _, opt := range diff.AffectedOpts { - affectedDiff := &model.SchemaDiff{ - Version: diff.Version, - Type: diff.Type, - SchemaID: opt.SchemaID, - TableID: opt.TableID, - OldSchemaID: opt.OldSchemaID, - OldTableID: opt.OldTableID, - } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) - if err != nil { - return nil, errors.Trace(err) - } - tblIDs = append(tblIDs, affectedIDs...) - - // handle partition table and table AutoID - err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) - if err != nil { - return nil, errors.Trace(err) - } - } - - return tblIDs, nil -} - func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSchemaID, ntID int64) error { err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) @@ -377,6 +344,14 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) + + if diff.Type == model.ActionExchangeTablePartition { + // handle partition table and table AutoID + err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) + if err != nil { + return nil, errors.Trace(err) + } + } } return tblIDs, nil