Skip to content

Commit

Permalink
infoschema_v2: support bundles for apply diff (#51715)
Browse files Browse the repository at this point in the history
ref #51680
  • Loading branch information
GMHDBJD committed Mar 13, 2024
1 parent 94eb9f5 commit 0d742d3
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/infoschema/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ go_test(
],
embed = [":infoschema"],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/ddl/placement",
"//pkg/domain",
Expand Down
9 changes: 5 additions & 4 deletions pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func applyTruncateTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaD
return tblIDs, nil
}

func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func applyDropTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := applyTableUpdate(b, m, diff)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -146,7 +146,7 @@ func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff
return tblIDs, nil
}

func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func applyReorganizePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := applyTableUpdate(b, m, diff)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -165,7 +165,7 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff)
return tblIDs, nil
}

func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func applyExchangeTablePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
// It is not in StatePublic.
if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID {
ntIDs, err := applyTableUpdate(b, m, diff)
Expand Down Expand Up @@ -793,12 +793,13 @@ func (b *Builder) deleteReferredForeignKeys(dbInfo *model.DBInfo, tableID int64)

// Build builds and returns the built infoschema.
func (b *Builder) Build() InfoSchema {
b.updateInfoSchemaBundles(b.infoSchema)
if b.enableV2 {
b.infoschemaV2.ts = math.MaxUint64 // TODO: should be the correct TS
b.infoschemaV2.schemaVersion = b.infoSchema.SchemaMetaVersion()
updateInfoSchemaBundles(b)
return &b.infoschemaV2
}
updateInfoSchemaBundles(b)
return b.infoSchema
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/infoschema/bundle_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) {
}
}

func (b *bundleInfoBuilder) updateTableBundles(is *infoSchema, tableID int64) {
tbl, ok := is.TableByID(tableID)
func (b *bundleInfoBuilder) updateTableBundles(infoSchemaInterface InfoSchema, tableID int64) {
is := infoSchemaInterface.base()
tbl, ok := infoSchemaInterface.TableByID(tableID)
if !ok {
b.deleteBundle(is, tableID)
return
Expand Down
4 changes: 4 additions & 0 deletions pkg/infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) Inf

var _ InfoSchema = (*infoSchema)(nil)

func (is *infoSchema) base() *infoSchema {
return is
}

func (is *infoSchema) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok bool) {
tableNames, ok := is.schemaMap[schema.L]
if !ok {
Expand Down
75 changes: 49 additions & 26 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ func search(bt *btree.BTreeG[tableItem], schemaVersion int64, end tableItem, mat
return target, ok
}

func (is *infoschemaV2) base() *infoSchema {
return is.infoSchema
}

func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) {
// Get from the cache.
key := tableCacheKey{id, is.schemaVersion}
Expand Down Expand Up @@ -609,10 +613,6 @@ func applyModifySchemaDefaultPlacement(b *Builder, m *meta.Meta, diff *model.Sch
return b.applyModifySchemaDefaultPlacement(m, diff)
}

func applyDropTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
return b.applyDropTableOrPartition(m, diff)
}

func applyRecoverTable(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
return b.applyRecoverTable(m, diff)
}
Expand All @@ -621,18 +621,12 @@ func applyCreateTables(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int6
return b.applyCreateTables(m, diff)
}

func applyReorganizePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func updateInfoSchemaBundles(b *Builder) {
if b.enableV2 {
return b.applyReorganizePartitionV2(m, diff)
b.updateInfoSchemaBundlesV2(&b.infoschemaV2)
} else {
b.updateInfoSchemaBundles(b.infoSchema)
}
return b.applyReorganizePartition(m, diff)
}

func applyExchangeTablePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
if b.enableV2 {
return b.applyExchangeTablePartitionV2(m, diff)
}
return b.applyExchangeTablePartition(m, diff)
}

// TODO: more UT to check the correctness.
Expand Down Expand Up @@ -724,14 +718,6 @@ func (b *Builder) applyModifySchemaDefaultPlacementV2(m *meta.Meta, diff *model.
return nil
}

func (b *Builder) applyTruncateTableOrPartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
panic("TODO")
}

func (b *Builder) applyDropTableOrPartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
panic("TODO")
}

func (b *Builder) applyRecoverTableV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
panic("TODO")
}
Expand All @@ -740,10 +726,47 @@ func (b *Builder) applyCreateTablesV2(m *meta.Meta, diff *model.SchemaDiff) ([]i
panic("TODO")
}

func (b *Builder) applyReorganizePartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
panic("TODO")
func (b *bundleInfoBuilder) updateInfoSchemaBundlesV2(is *infoschemaV2) {
if b.deltaUpdate {
b.completeUpdateTablesV2(is)
for tblID := range b.updateTables {
b.updateTableBundles(is, tblID)
}
return
}

// do full update bundles
// TODO: This is quite inefficient! we need some better way or avoid this API.
is.ruleBundleMap = make(map[int64]*placement.Bundle)
for _, dbInfo := range is.AllSchemas() {
for _, tbl := range is.SchemaTables(dbInfo.Name) {
b.updateTableBundles(is, tbl.Meta().ID)
}
}
}

func (b *Builder) applyExchangeTablePartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
panic("TODO")
func (b *bundleInfoBuilder) completeUpdateTablesV2(is *infoschemaV2) {
if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 {
return
}

// TODO: This is quite inefficient! we need some better way or avoid this API.
for _, dbInfo := range is.AllSchemas() {
for _, tbl := range is.SchemaTables(dbInfo.Name) {
tblInfo := tbl.Meta()
if tblInfo.PlacementPolicyRef != nil {
if _, ok := b.updatePolicies[tblInfo.PlacementPolicyRef.ID]; ok {
b.markTableBundleShouldUpdate(tblInfo.ID)
}
}

if tblInfo.Partition != nil {
for _, par := range tblInfo.Partition.Definitions {
if _, ok := b.updatePartitions[par.ID]; ok {
b.markTableBundleShouldUpdate(tblInfo.ID)
}
}
}
}
}
}
83 changes: 82 additions & 1 deletion pkg/infoschema/infoschema_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestMisc(t *testing.T) {
internal.UpdatePolicy(t, r.Store(), policyInfo)
txn, err = r.Store().Begin()
require.NoError(t, err)
err = applyCreatePolicy(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: policyInfo.ID})
_, err = applyAlterPolicy(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: policyInfo.ID})
require.NoError(t, err)
is = builder.Build()
require.Len(t, is.AllPlacementPolicies(), 2)
Expand All @@ -190,3 +190,84 @@ func TestMisc(t *testing.T) {
require.Equal(t, policyInfo2, getPolicyInfo)
require.NoError(t, txn.Rollback())
}

func TestBundles(t *testing.T) {
r := internal.CreateAutoIDRequirement(t)
defer func() {
r.Store().Close()
}()

schemaName := model.NewCIStr("testDB")
tableName := model.NewCIStr("test")
builder, err := NewBuilder(r, nil, NewData()).InitWithDBInfos(nil, nil, nil, 1)
require.NoError(t, err)
is := builder.Build()
require.Equal(t, 2, len(is.AllSchemas()))

// create database
dbInfo := internal.MockDBInfo(t, r.Store(), schemaName.O)
internal.AddDB(t, r.Store(), dbInfo)
txn, err := r.Store().Begin()
require.NoError(t, err)
_, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionCreateSchema, Version: 1, SchemaID: dbInfo.ID})
require.NoError(t, err)
is = builder.Build()
require.Equal(t, 3, len(is.AllSchemas()))
require.NoError(t, txn.Rollback())

// create table
tblInfo := internal.MockTableInfo(t, r.Store(), tableName.O)
tblInfo.Partition = &model.PartitionInfo{Definitions: []model.PartitionDefinition{{ID: 1}, {ID: 2}}}
internal.AddTable(t, r.Store(), dbInfo, tblInfo)
txn, err = r.Store().Begin()
require.NoError(t, err)
_, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionCreateTable, Version: 2, SchemaID: dbInfo.ID, TableID: tblInfo.ID})
require.NoError(t, err)
is = builder.Build()
require.Equal(t, 1, len(is.SchemaTables(dbInfo.Name)))
require.NoError(t, txn.Rollback())

// test create policy
policyInfo := internal.MockPolicyInfo(t, r.Store(), "test")
internal.CreatePolicy(t, r.Store(), policyInfo)
txn, err = r.Store().Begin()
require.NoError(t, err)
_, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionCreatePlacementPolicy, Version: 3, SchemaID: policyInfo.ID})
require.NoError(t, err)
is = builder.Build()
require.Len(t, is.AllPlacementPolicies(), 1)
getPolicyInfo, ok := is.PolicyByName(policyInfo.Name)
require.True(t, ok)
require.Equal(t, policyInfo, getPolicyInfo)
require.NoError(t, txn.Rollback())

// markTableBundleShouldUpdate
// test alter table placement
policyRefInfo := internal.MockPolicyRefInfo(t, r.Store(), "test")
tblInfo.PlacementPolicyRef = policyRefInfo
internal.UpdateTable(t, r.Store(), dbInfo, tblInfo)
txn, err = r.Store().Begin()
require.NoError(t, err)
_, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionAlterTablePlacement, Version: 4, SchemaID: dbInfo.ID, TableID: tblInfo.ID})
require.NoError(t, err)
is = builder.Build()
getTableInfo, err := is.TableByName(schemaName, tableName)
require.NoError(t, err)
require.Equal(t, policyRefInfo, getTableInfo.Meta().PlacementPolicyRef)
require.NoError(t, txn.Rollback())

// markBundlesReferPolicyShouldUpdate
// test alter policy
policyInfo.State = model.StatePublic
internal.UpdatePolicy(t, r.Store(), policyInfo)
txn, err = r.Store().Begin()
require.NoError(t, err)
_, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionAlterPlacementPolicy, Version: 5, SchemaID: policyInfo.ID})
require.NoError(t, err)
is = builder.Build()
getTableInfo, err = is.TableByName(schemaName, tableName)
require.NoError(t, err)
getPolicyInfo, ok = is.PolicyByName(getTableInfo.Meta().PlacementPolicyRef.Name)
require.True(t, ok)
require.Equal(t, policyInfo, getPolicyInfo)
}
1 change: 1 addition & 0 deletions pkg/infoschema/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type InfoSchema interface {
SchemaMetaVersion() int64
FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition)
Misc
base() *infoSchema
}

// Misc contains the methods that are not closely related to InfoSchema.
Expand Down
21 changes: 21 additions & 0 deletions pkg/infoschema/internal/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ func MockPolicyInfo(t *testing.T, store kv.Storage, policyName string) *model.Po
}
}

// MockPolicyRefInfo mock policy ref info for testing.
func MockPolicyRefInfo(t *testing.T, store kv.Storage, policyName string) *model.PolicyRefInfo {
id, err := GenGlobalID(store)
require.NoError(t, err)
return &model.PolicyRefInfo{
ID: id,
Name: model.NewCIStr(policyName),
}
}

// AddTable add mock table for testing.
func AddTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
Expand All @@ -211,6 +221,17 @@ func AddTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *mod
require.NoError(t, err)
}

// UpdateTable update mock table for testing.
func UpdateTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
err := meta.NewMeta(txn).UpdateTable(dbInfo.ID, tblInfo)
require.NoError(t, err)
return errors.Trace(err)
})
require.NoError(t, err)
}

// DropTable drop mock table for testing.
func DropTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblID int64, tblName string) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
Expand Down

0 comments on commit 0d742d3

Please sign in to comment.