Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table partition: enhance auto id for exchange partition with table #36663

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package restore_test
import (
"context"
"encoding/json"
"github.com/pingcap/tidb/infoschema"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can revert this file change?

"math"
"strconv"
"testing"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down
1 change: 1 addition & 0 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@
"only_files": {
"ddl/index.go": "ddl/index code",
"planner/core/rule_partition_eliminate.go": "planner/core/rule_partition_eliminate code",
"infoschema/builder.go": "infoschema/builder.go code",
"distsql/": "ignore distsql code",
"dumpling/export": "dumpling/export code",
"lock/": "lock file",
Expand Down
83 changes: 83 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2377,6 +2377,89 @@ func TestExchangePartitionHook(t *testing.T) {
tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1"))
}

func TestExchangePartitionHook3(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
// why use tkCancel, not tk.
//tkCancel := testkit.NewTestKit(t, store)
Comment on lines +2384 to +2385
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// why use tkCancel, not tk.
//tkCancel := testkit.NewTestKit(t, store)


tk.MustExec("set @@tidb_enable_exchange_partition=1")
defer tk.MustExec("set @@tidb_enable_exchange_partition=0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary. I think this is a session scoped variable and the session will end after the test anyway?


tk.MustExec("use test")
tk.MustExec(`create table pt (a int primary key auto_increment) partition by range(a) (
partition p0 values less than (3),
partition p1 values less than (6),
PARTITION p2 values less than (9),
PARTITION p3 values less than (MAXVALUE)
);`)
tk.MustExec(`create table nt(a int primary key auto_increment);`)

tk.MustExec(`insert into pt values (0), (4), (7)`)
tk.MustExec("insert into nt values (1)")

hook := &ddl.TestDDLCallback{Do: dom}
dom.DDL().SetHook(hook)

hookFunc := func(job *model.Job) {
tk.MustExec(`insert into pt values (40000000)`)
}
hook.OnJobUpdatedExported = hookFunc

//tk.MustExec(`create table nt1(a int);`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//tk.MustExec(`create table nt1(a int);`)

tk.MustExec("alter table pt exchange partition p0 with table nt")
time.Sleep(time.Duration(63) * time.Second)
tk.MustExec("insert into nt values (NULL)")
tk.MustQuery("select * from nt").Check(testkit.Rows(""))
tk.MustQuery("select * from pt").Check(testkit.Rows(""))

//tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1"))
//tk.MustExec("alter table pt exchange partition p0 with table nt")
//tkCancel.MustExec("insert into nt values (1)")
//tkCancel.MustExec("use test")
// tkCancel.MustExec("insert into pt values (2)")
// tkCancel.MustQuery("select * from pt partition(p0)").Check(testkit.Rows())
Comment on lines +2416 to +2422
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1"))
//tk.MustExec("alter table pt exchange partition p0 with table nt")
//tkCancel.MustExec("insert into nt values (1)")
//tkCancel.MustExec("use test")
// tkCancel.MustExec("insert into pt values (2)")
// tkCancel.MustQuery("select * from pt partition(p0)").Check(testkit.Rows())

}

func TestExchangePartitionHook2(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
// why use tkCancel, not tk.
//tkCancel := testkit.NewTestKit(t, store)

tk.MustExec("set @@tidb_enable_exchange_partition=1")
defer tk.MustExec("set @@tidb_enable_exchange_partition=0")

tk.MustExec("use test")
tk.MustExec(`CREATE TABLE pt (a int primary key auto_increment, b varchar(255))
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1000000),
partition p1M values less than (2000000));`)
tk.MustExec(`create table t (a int primary key auto_increment, b varchar(255))`)

tk.MustExec(`insert into pt values (1, "1")`)
tk.MustExec(`insert into t values (2, "2")`)
tk.MustExec(`insert into t values (4000000, "4M")`)
tk.MustExec(`delete from t where a = 4000000`)
tk.MustExec(`alter table pt exchange partition p0 with table t`)

//hook := &ddl.TestDDLCallback{Do: dom}
//dom.DDL().SetHook(hook)
//
//hookFunc := func(job *model.Job) {
// if job.Type == model.ActionExchangeTablePartition && job.SchemaState != model.StateNone {
// tkCancel.MustExec("use test")
// tkCancel.MustGetErrCode("insert into nt values (5)", tmysql.ErrRowDoesNotMatchGivenPartitionSet)
// }
//}
//hook.OnJobUpdatedExported = hookFunc
//
//tk.MustExec("alter table pt exchange partition p0 with table nt")
//tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1"))
}

func TestExchangePartitionExpressIndex(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
Expand Down
32 changes: 32 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1597,15 +1597,33 @@ func GetLastHistoryDDLJobsIterator(m *meta.Meta) (meta.LastJobIterator, error) {
return m.GetLastHistoryDDLJobsIterator()
}

// Session wraps sessionctx.Context for transaction usage.
type Session struct {
sessionctx.Context
}

// session wraps sessionctx.Context for transaction usage.
type session struct {
sessionctx.Context
}

func NewSession(s sessionctx.Context) *session {
return &session{s}
}

func newSession(s sessionctx.Context) *session {
return &session{s}
}

func (s *session) Begin() error {
err := sessiontxn.NewTxn(context.Background(), s)
if err != nil {
return err
}
s.GetSessionVars().SetInTxn(true)
return nil
}

func (s *session) begin() error {
err := sessiontxn.NewTxn(context.Background(), s)
if err != nil {
Expand All @@ -1615,15 +1633,29 @@ func (s *session) begin() error {
return nil
}

func (s *session) Commit() error {
s.StmtCommit()
return s.CommitTxn(context.Background())
}

func (s *session) commit() error {
s.StmtCommit()
return s.CommitTxn(context.Background())
}

func (s *session) STxn() (kv.Transaction, error) {
return s.Txn(true)
}

func (s *session) txn() (kv.Transaction, error) {
return s.Txn(true)
}

func (s *session) Rollback() {
s.StmtRollback()
s.RollbackTxn(context.Background())
}

func (s *session) rollback() {
s.StmtRollback()
s.RollbackTxn(context.Background())
Expand Down
12 changes: 6 additions & 6 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,9 +797,9 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) error {
d.mu.RUnlock()
}

d.mu.RLock()
d.mu.hook.OnJobUpdated(job)
d.mu.RUnlock()
//d.mu.RLock()
//d.mu.hook.OnJobUpdated(job)
//d.mu.RUnlock()

return nil
}
Expand Down Expand Up @@ -957,9 +957,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
d.mu.RUnlock()
}

d.mu.RLock()
d.mu.hook.OnJobUpdated(job)
d.mu.RUnlock()
//d.mu.RLock()
//d.mu.hook.OnJobUpdated(job)
//d.mu.RUnlock()

if job.IsSynced() || job.IsCancelled() || job.IsRollbackDone() {
asyncNotify(d.ddlJobDoneCh)
Expand Down
7 changes: 7 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,13 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

logutil.BgLogger().Info("!!!!onExchangeTablePartition", zap.Int64("RowID", newAutoIDs.RowID),
zap.Int64("IncrementID", newAutoIDs.IncrementID), zap.Int64("RandomID", newAutoIDs.RandomID))

d.mu.RLock()
d.mu.hook.OnJobUpdated(job)
d.mu.RUnlock()

err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef)
if err != nil {
job.State = model.JobStateCancelled
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package ddl_test

import (
"context"
"github.com/pingcap/tidb/infoschema"
"testing"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
Expand Down
12 changes: 9 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ func (do *Domain) EtcdClient() *clientv3.Client {
func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, int64, *transaction.RelatedSchemaChange, error) {
snapshot := do.store.GetSnapshot(kv.NewVersion(startTS))
m := meta.NewSnapshotMeta(snapshot)
sysSess, err := do.sysSessionPool.Get()
if err != nil {
return nil, false, 0, nil, err
}
sessCtx := sysSess.(sessionctx.Context)

neededSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return nil, false, 0, nil, err
Expand All @@ -148,7 +154,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 3. There are less 100 diffs.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
is, relatedChanges, err := do.tryLoadSchemaDiffs(sessCtx, m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
do.infoCache.Insert(is, startTS)
logutil.BgLogger().Info("diff load InfoSchema success",
Expand Down Expand Up @@ -281,7 +287,7 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta,
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) {
func (do *Domain) tryLoadSchemaDiffs(sess sessionctx.Context, m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
Expand All @@ -301,7 +307,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
IDs, err := builder.ApplyDiff(m, diff)
IDs, err := builder.ApplyDiff(sess, m, diff)
if err != nil {
return nil, nil, err
}
Expand Down
Loading