diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 8801a6af34727..4b1165865d14c 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -5,6 +5,7 @@ package restore_test import ( "context" "encoding/json" + "github.com/pingcap/tidb/infoschema" "math" "strconv" "testing" @@ -18,7 +19,6 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" diff --git a/build/nogo_config.json b/build/nogo_config.json index b9f06516e625d..49e9aa9764798 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -264,6 +264,7 @@ "only_files": { "ddl/index.go": "ddl/index code", "planner/core/rule_partition_eliminate.go": "planner/core/rule_partition_eliminate code", + "infoschema/builder.go": "infoschema/builder.go code", "distsql/": "ignore distsql code", "dumpling/export": "dumpling/export code", "lock/": "lock file", diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 5a3bdb7588a8b..fc94514f8d57b 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2377,6 +2377,89 @@ func TestExchangePartitionHook(t *testing.T) { tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1")) } +func TestExchangePartitionHook3(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + // why use tkCancel, not tk. + //tkCancel := testkit.NewTestKit(t, store) + + tk.MustExec("set @@tidb_enable_exchange_partition=1") + defer tk.MustExec("set @@tidb_enable_exchange_partition=0") + + tk.MustExec("use test") + tk.MustExec(`create table pt (a int primary key auto_increment) partition by range(a) ( + partition p0 values less than (3), + partition p1 values less than (6), + PARTITION p2 values less than (9), + PARTITION p3 values less than (MAXVALUE) + );`) + tk.MustExec(`create table nt(a int primary key auto_increment);`) + + tk.MustExec(`insert into pt values (0), (4), (7)`) + tk.MustExec("insert into nt values (1)") + + hook := &ddl.TestDDLCallback{Do: dom} + dom.DDL().SetHook(hook) + + hookFunc := func(job *model.Job) { + tk.MustExec(`insert into pt values (40000000)`) + } + hook.OnJobUpdatedExported = hookFunc + + //tk.MustExec(`create table nt1(a int);`) + tk.MustExec("alter table pt exchange partition p0 with table nt") + time.Sleep(time.Duration(63) * time.Second) + tk.MustExec("insert into nt values (NULL)") + tk.MustQuery("select * from nt").Check(testkit.Rows("")) + tk.MustQuery("select * from pt").Check(testkit.Rows("")) + + //tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1")) + //tk.MustExec("alter table pt exchange partition p0 with table nt") + //tkCancel.MustExec("insert into nt values (1)") + //tkCancel.MustExec("use test") + // tkCancel.MustExec("insert into pt values (2)") + // tkCancel.MustQuery("select * from pt partition(p0)").Check(testkit.Rows()) +} + +func TestExchangePartitionHook2(t *testing.T) { + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + // why use tkCancel, not tk. + //tkCancel := testkit.NewTestKit(t, store) + + tk.MustExec("set @@tidb_enable_exchange_partition=1") + defer tk.MustExec("set @@tidb_enable_exchange_partition=0") + + tk.MustExec("use test") + tk.MustExec(`CREATE TABLE pt (a int primary key auto_increment, b varchar(255)) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (1000000), + partition p1M values less than (2000000));`) + tk.MustExec(`create table t (a int primary key auto_increment, b varchar(255))`) + + tk.MustExec(`insert into pt values (1, "1")`) + tk.MustExec(`insert into t values (2, "2")`) + tk.MustExec(`insert into t values (4000000, "4M")`) + tk.MustExec(`delete from t where a = 4000000`) + tk.MustExec(`alter table pt exchange partition p0 with table t`) + + //hook := &ddl.TestDDLCallback{Do: dom} + //dom.DDL().SetHook(hook) + // + //hookFunc := func(job *model.Job) { + // if job.Type == model.ActionExchangeTablePartition && job.SchemaState != model.StateNone { + // tkCancel.MustExec("use test") + // tkCancel.MustGetErrCode("insert into nt values (5)", tmysql.ErrRowDoesNotMatchGivenPartitionSet) + // } + //} + //hook.OnJobUpdatedExported = hookFunc + // + //tk.MustExec("alter table pt exchange partition p0 with table nt") + //tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1")) +} + func TestExchangePartitionExpressIndex(t *testing.T) { restore := config.RestoreFunc() defer restore() diff --git a/ddl/ddl.go b/ddl/ddl.go index ca272d00fbacf..02d92f36247d1 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1597,15 +1597,33 @@ func GetLastHistoryDDLJobsIterator(m *meta.Meta) (meta.LastJobIterator, error) { return m.GetLastHistoryDDLJobsIterator() } +// Session wraps sessionctx.Context for transaction usage. +type Session struct { + sessionctx.Context +} + // session wraps sessionctx.Context for transaction usage. type session struct { sessionctx.Context } +func NewSession(s sessionctx.Context) *session { + return &session{s} +} + func newSession(s sessionctx.Context) *session { return &session{s} } +func (s *session) Begin() error { + err := sessiontxn.NewTxn(context.Background(), s) + if err != nil { + return err + } + s.GetSessionVars().SetInTxn(true) + return nil +} + func (s *session) begin() error { err := sessiontxn.NewTxn(context.Background(), s) if err != nil { @@ -1615,15 +1633,29 @@ func (s *session) begin() error { return nil } +func (s *session) Commit() error { + s.StmtCommit() + return s.CommitTxn(context.Background()) +} + func (s *session) commit() error { s.StmtCommit() return s.CommitTxn(context.Background()) } +func (s *session) STxn() (kv.Transaction, error) { + return s.Txn(true) +} + func (s *session) txn() (kv.Transaction, error) { return s.Txn(true) } +func (s *session) Rollback() { + s.StmtRollback() + s.RollbackTxn(context.Background()) +} + func (s *session) rollback() { s.StmtRollback() s.RollbackTxn(context.Background()) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 0cb32442d2c68..d4c69d22f4910 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -797,9 +797,9 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) error { d.mu.RUnlock() } - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() + //d.mu.RLock() + //d.mu.hook.OnJobUpdated(job) + //d.mu.RUnlock() return nil } @@ -957,9 +957,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { d.mu.RUnlock() } - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() + //d.mu.RLock() + //d.mu.hook.OnJobUpdated(job) + //d.mu.RUnlock() if job.IsSynced() || job.IsCancelled() || job.IsRollbackDone() { asyncNotify(d.ddlJobDoneCh) diff --git a/ddl/partition.go b/ddl/partition.go index 1fcf4ca4f3db0..a38bdf184bc5a 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1483,6 +1483,13 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + logutil.BgLogger().Info("!!!!onExchangeTablePartition", zap.Int64("RowID", newAutoIDs.RowID), + zap.Int64("IncrementID", newAutoIDs.IncrementID), zap.Int64("RandomID", newAutoIDs.RandomID)) + + d.mu.RLock() + d.mu.hook.OnJobUpdated(job) + d.mu.RUnlock() + err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) if err != nil { job.State = model.JobStateCancelled diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index 026fee08e2bdc..7338e8bea0435 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -15,10 +15,10 @@ package ddl_test import ( "context" + "github.com/pingcap/tidb/infoschema" "testing" "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" diff --git a/domain/domain.go b/domain/domain.go index a8f35f8abdd05..e02197b70cbf5 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -126,6 +126,12 @@ func (do *Domain) EtcdClient() *clientv3.Client { func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, int64, *transaction.RelatedSchemaChange, error) { snapshot := do.store.GetSnapshot(kv.NewVersion(startTS)) m := meta.NewSnapshotMeta(snapshot) + sysSess, err := do.sysSessionPool.Get() + if err != nil { + return nil, false, 0, nil, err + } + sessCtx := sysSess.(sessionctx.Context) + neededSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff() if err != nil { return nil, false, 0, nil, err @@ -148,7 +154,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // 3. There are less 100 diffs. startTime := time.Now() if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { - is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) + is, relatedChanges, err := do.tryLoadSchemaDiffs(sessCtx, m, currentSchemaVersion, neededSchemaVersion) if err == nil { do.infoCache.Insert(is, startTS) logutil.BgLogger().Info("diff load InfoSchema success", @@ -281,7 +287,7 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, // Return true if the schema is loaded successfully. // Return false if the schema can not be loaded by schema diff, then we need to do full load. // The second returned value is the delta updated table and partition IDs. -func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { +func (do *Domain) tryLoadSchemaDiffs(sess sessionctx.Context, m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ @@ -301,7 +307,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { - IDs, err := builder.ApplyDiff(m, diff) + IDs, err := builder.ApplyDiff(sess, m, diff) if err != nil { return nil, nil, err } diff --git a/infoschema/builder.go b/infoschema/builder.go index 4e7b767aecdb0..324dd7d7b98fe 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -16,6 +16,8 @@ package infoschema import ( "fmt" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/tmpsession" "strings" "github.com/ngaut/pools" @@ -32,6 +34,7 @@ import ( "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/domainutil" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -74,7 +77,7 @@ func (b *bundleInfoBuilder) SetDeltaUpdateBundles() { b.deltaUpdate = true } -func (b *bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) { +func (*bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) { delete(is.ruleBundleMap, tblID) } @@ -190,7 +193,7 @@ type Builder struct { // ApplyDiff applies SchemaDiff to the new InfoSchema. // Return the detail updated table IDs that are produced from SchemaDiff and an error. -func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) ApplyDiff(sess sessionctx.Context, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { b.is.schemaMetaVersion = diff.Version switch diff.Type { case model.ActionCreateSchema: @@ -214,13 +217,15 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro case model.ActionRecoverTable: return b.applyRecoverTable(m, diff) case model.ActionCreateTables: - return b.applyCreateTables(m, diff) + return b.applyCreateTables(sess, m, diff) + case model.ActionExchangeTablePartition: + return b.applyExchangePartitionWithTable(sess, m, diff) default: - return b.applyDefaultAction(m, diff) + return b.applyDefaultAction(sess, m, diff) } } -func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyCreateTables(sess sessionctx.Context, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs := make([]int64, 0, len(diff.AffectedOpts)) if diff.AffectedOpts != nil { for _, opt := range diff.AffectedOpts { @@ -232,7 +237,7 @@ func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) + affectedIDs, err := b.ApplyDiff(sess, m, affectedDiff) if err != nil { return nil, errors.Trace(err) } @@ -242,6 +247,99 @@ func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int return tblIDs, nil } +func (b *Builder) applyExchangePartitionWithTable(sess sessionctx.Context, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + + logutil.BgLogger().Info("~~~~~~~~applyExchangePartitionWithTable start1") + tblIDs, err := b.applyTableUpdate(m, diff) + if err != nil { + return nil, errors.Trace(err) + } + + logutil.BgLogger().Info("~~~~~~~~applyExchangePartitionWithTable start2") + for _, opt := range diff.AffectedOpts { + affectedDiff := &model.SchemaDiff{ + Version: diff.Version, + Type: diff.Type, + SchemaID: opt.SchemaID, + TableID: opt.TableID, + OldSchemaID: opt.OldSchemaID, + OldTableID: opt.OldTableID, + } + affectedIDs, err := b.ApplyDiff(sess, m, affectedDiff) + if err != nil { + return nil, errors.Trace(err) + } + tblIDs = append(tblIDs, affectedIDs...) + + logutil.BgLogger().Info("++++++applyExchangePartitionWithTable start3") + + // handle partition table and table AutoID + //err = updateAutoIDForExchangePartition(m, affectedDiff.SchemaID, diff.OldTableID, diff.SchemaID, affectedDiff.TableID) + err = updateAutoIDForExchangePartition(sess, affectedDiff.SchemaID, diff.OldTableID, diff.SchemaID, diff.TableID) + // err = updateAutoIDForExchangePartition(m, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.OldTableID) + if err != nil { + return nil, errors.Trace(err) + } + } + + return tblIDs, nil +} + +func updateAutoIDForExchangePartition(sessCtx sessionctx.Context, ptSchemaID, ptID, ntSchemaID, ntID int64) error { + // partition table auto IDs. + sess := tmpsession.NewSession(sessCtx) + err := sess.Begin() + if err != nil { + return err + } + txn, err := sess.STxn() + if err != nil { + sess.Rollback() + return err + } + t := meta.NewMeta(txn) + if err != nil { + return err + } + + ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() + if err != nil { + return err + } + logutil.BgLogger().Info("~~~~~~~~updateAutoIDForExchangePartitio 1111") + + // non-partition table auto IDs. + ntAutoIDs, err := t.GetAutoIDAccessors(ntSchemaID, ntID).Get() + if err != nil { + return err + } + logutil.BgLogger().Info("~~~~~~~~updateAutoIDForExchangePartitio 222") + + // Set both tables to the maximum auto IDs between normal table and partitioned table. + newAutoIDs := meta.AutoIDGroup{ + RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), + IncrementID: mathutil.Max(ptAutoIDs.IncrementID, ntAutoIDs.IncrementID), + RandomID: mathutil.Max(ptAutoIDs.RandomID, ntAutoIDs.RandomID), + } + logutil.BgLogger().Info("~~~~~~~~updateAutoIDForExchangePartitio 3333") + err = t.GetAutoIDAccessors(ptSchemaID, ptID).Put(newAutoIDs) + if err != nil { + return err + } + logutil.BgLogger().Info("~~~~~~~~updateAutoIDForExchangePartitio 444") + + err = t.GetAutoIDAccessors(ntSchemaID, ntID).Put(newAutoIDs) + if err != nil { + return err + } + err = sess.Commit() + + logutil.BgLogger().Info("~~~~~~~~updateAutoIDForExchangePartition 555", zap.Int64("RowID", newAutoIDs.RowID), + zap.Int64("IncrementID", newAutoIDs.IncrementID), zap.Int64("RandomID", newAutoIDs.RandomID)) + + return nil +} + func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -287,7 +385,7 @@ func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int return tblIDs, nil } -func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func (b *Builder) applyDefaultAction(sess sessionctx.Context, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { return nil, errors.Trace(err) @@ -303,13 +401,15 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in OldSchemaID: opt.OldSchemaID, OldTableID: opt.OldTableID, } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) + affectedIDs, err := b.ApplyDiff(sess, m, affectedDiff) if err != nil { return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) } - + if diff.Type == model.ActionExchangeTablePartition { + logutil.BgLogger().Info("!!!!applyDefaultAction ActionExchangeTablePartition success") + } return tblIDs, nil } @@ -328,8 +428,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: - oldTableID = diff.OldTableID - newTableID = diff.TableID + oldTableID = diff.OldTableID // nt id (before change) + newTableID = diff.TableID // def id (before change) default: oldTableID = diff.TableID newTableID = diff.TableID @@ -529,8 +629,8 @@ func (b *Builder) applyModifySchemaDefaultPlacement(m *meta.Meta, diff *model.Sc return nil } -func (b *Builder) applyDropPolicy(PolicyID int64) []int64 { - po, ok := b.is.PolicyByID(PolicyID) +func (b *Builder) applyDropPolicy(policyID int64) []int64 { + po, ok := b.is.PolicyByID(policyID) if !ok { return nil } @@ -578,6 +678,8 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i if err != nil { return nil, errors.Trace(err) } + logutil.BgLogger().Info("~~~~~~~~applyCreateTable start1") + if tblInfo == nil { // When we apply an old schema diff, the table may has been dropped already, so we need to fall back to // full load. @@ -586,6 +688,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i fmt.Sprintf("(Table ID %d)", tableID), ) } + logutil.BgLogger().Info("~~~~~~~~applyCreateTable start2") switch tp { case model.ActionDropTablePartition: diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 04dc3c4395284..c20a9eae0ed33 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -16,6 +16,7 @@ package infoschema import ( "fmt" + "github.com/pingcap/tidb/infoschema/tmpbuild" "sort" "sync" @@ -114,7 +115,7 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema { for _, tb := range tbList { tbl := table.MockTableFromMeta(tb) tableNames.tables[tb.Name.L] = tbl - bucketIdx := tableBucketIdx(tb.ID) + bucketIdx := tmpbuild.tableBucketIdx(tb.ID) result.sortedTablesBuckets[bucketIdx] = append(result.sortedTablesBuckets[bucketIdx], tbl) } for i := range result.sortedTablesBuckets { @@ -141,7 +142,7 @@ func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) Inf for _, tb := range tbList { tbl := table.MockTableFromMeta(tb) tableNames.tables[tb.Name.L] = tbl - bucketIdx := tableBucketIdx(tb.ID) + bucketIdx := tmpbuild.tableBucketIdx(tb.ID) result.sortedTablesBuckets[bucketIdx] = append(result.sortedTablesBuckets[bucketIdx], tbl) } for i := range result.sortedTablesBuckets { @@ -242,7 +243,7 @@ func (is *infoSchema) SchemaByTable(tableInfo *model.TableInfo) (val *model.DBIn } func (is *infoSchema) TableByID(id int64) (val table.Table, ok bool) { - slice := is.sortedTablesBuckets[tableBucketIdx(id)] + slice := is.sortedTablesBuckets[tmpbuild.tableBucketIdx(id)] idx := slice.searchTable(id) if idx == -1 { return nil, false diff --git a/infoschema/perfschema/init.go b/infoschema/perfschema/init.go index 05d486dbcf1c0..1c9eedd8bd8f4 100644 --- a/infoschema/perfschema/init.go +++ b/infoschema/perfschema/init.go @@ -16,11 +16,11 @@ package perfschema import ( "fmt" + "github.com/pingcap/tidb/infoschema" "sync" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" diff --git a/sessiontxn/internal/txn.go b/sessiontxn/internal/txn.go index 00db4561b979b..7fec75a0a9cf2 100644 --- a/sessiontxn/internal/txn.go +++ b/sessiontxn/internal/txn.go @@ -16,7 +16,6 @@ package internal import ( "context" - "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" diff --git a/structure/hash.go b/structure/hash.go index c92617efbed32..d7de921df4972 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -17,6 +17,7 @@ package structure import ( "bytes" "context" + "github.com/pingcap/tidb/util/logutil" "strconv" "github.com/pingcap/errors" @@ -31,9 +32,12 @@ type HashPair struct { // HSet sets the string value of a hash field. func (t *TxStructure) HSet(key []byte, field []byte, value []byte) error { + logutil.BgLogger().Info("~~~~~~~~HSet 1111") if t.readWriter == nil { return ErrWriteOnSnapshot } + logutil.BgLogger().Info("~~~~~~~~HSet 2222") + return t.updateHash(key, field, func([]byte) ([]byte, error) { return value, nil }) diff --git a/tmpsession/session.go b/tmpsession/session.go new file mode 100644 index 0000000000000..111b008bbdceb --- /dev/null +++ b/tmpsession/session.go @@ -0,0 +1,103 @@ +package tmpsession + +import ( + "context" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" + "time" +) + +// session wraps sessionctx.Context for transaction usage. +type session struct { + sessionctx.Context +} + +func NewSession(s sessionctx.Context) *session { + return &session{s} +} + +func newSession(s sessionctx.Context) *session { + return &session{s} +} + +func (s *session) Begin() error { + err := sessiontxn.NewTxn(context.Background(), s) + if err != nil { + return err + } + s.GetSessionVars().SetInTxn(true) + return nil +} + +func (s *session) begin() error { + err := sessiontxn.NewTxn(context.Background(), s) + if err != nil { + return err + } + s.GetSessionVars().SetInTxn(true) + return nil +} + +func (s *session) Commit() error { + s.StmtCommit() + return s.CommitTxn(context.Background()) +} + +func (s *session) commit() error { + s.StmtCommit() + return s.CommitTxn(context.Background()) +} + +func (s *session) STxn() (kv.Transaction, error) { + return s.Txn(true) +} + +func (s *session) txn() (kv.Transaction, error) { + return s.Txn(true) +} + +func (s *session) Rollback() { + s.StmtRollback() + s.RollbackTxn(context.Background()) +} + +func (s *session) rollback() { + s.StmtRollback() + s.RollbackTxn(context.Background()) +} + +func (s *session) reset() { + s.StmtRollback() +} + +func (s *session) execute(ctx context.Context, query string, label string) ([]chunk.Row, error) { + startTime := time.Now() + var err error + defer func() { + metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) + }() + rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(kv.WithInternalSourceType(ctx, kv.InternalTxnDDL), query) + if err != nil { + return nil, errors.Trace(err) + } + + if rs == nil { + return nil, nil + } + var rows []chunk.Row + defer terror.Call(rs.Close) + if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil { + return nil, errors.Trace(err) + } + return rows, nil +} + +func (s *session) session() sessionctx.Context { + return s.Context +}