From 5b2c4b71afd75de47f0c990280fc347bfc1cdc10 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Mon, 4 Mar 2024 13:21:37 +0800 Subject: [PATCH] sync to 1.1: restore the switched account id when mo_table_size and mo_table_rows. (#14741) --- pkg/sql/plan/function/func_mo.go | 57 +++++++++++++++-------- pkg/vm/engine/disttae/cache/catalog.go | 64 -------------------------- pkg/vm/engine/disttae/engine.go | 59 ------------------------ 3 files changed, 38 insertions(+), 142 deletions(-) diff --git a/pkg/sql/plan/function/func_mo.go b/pkg/sql/plan/function/func_mo.go index 37dad91eda2c..42e695fcddae 100644 --- a/pkg/sql/plan/function/func_mo.go +++ b/pkg/sql/plan/function/func_mo.go @@ -40,7 +40,7 @@ const ( // Mo functions are better tested with bvt. // MoTableRows returns an estimated row number of a table. -func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) error { +func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) (err error) { rs := vector.MustFunctionResult[int64](result) dbs := vector.GenerateFunctionStrParameter(ivecs[0]) tbls := vector.GenerateFunctionStrParameter(ivecs[1]) @@ -52,8 +52,15 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr } txn := proc.TxnOperator + var accountId uint32 + var accSwitched bool // XXX old code starts a new transaction. why? 
for i := uint64(0); i < uint64(length); i++ { + if accSwitched { + accSwitched = false + proc.Ctx = defines.AttachAccountId(proc.Ctx, accountId) + } + db, dbnull := dbs.GetStrValue(i) tbl, tblnull := tbls.GetStrValue(i) if dbnull || tblnull { @@ -67,21 +74,20 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr if isClusterTable(dbStr, tblStr) { //if it is the cluster table in the general account, switch into the sys account - accountId, err := defines.GetAccountId(proc.Ctx) + accountId, err = defines.GetAccountId(proc.Ctx) if err != nil { return err } if accountId != uint32(sysAccountID) { + accSwitched = true proc.Ctx = defines.AttachAccountId(proc.Ctx, uint32(sysAccountID)) } } ctx := proc.Ctx - dbo, err := e.Database(ctx, dbStr, txn) + var dbo engine.Database + dbo, err = e.Database(ctx, dbStr, txn) if err != nil { if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) { - if DebugGetDatabaseExpectedEOB != nil { - DebugGetDatabaseExpectedEOB("MoTableRows", proc) - } return moerr.NewInvalidArgNoCtx("db not found when mo_table_rows", dbStr) } return err @@ -92,7 +98,8 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr } // get the table definition information and check whether the current table is a partition table - engineDefs, err := rel.TableDefs(ctx) + var engineDefs []engine.TableDef + engineDefs, err = rel.TableDefs(ctx) if err != nil { return err } @@ -149,12 +156,8 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr return nil } -// TODO(ghs) -// is debug for #13151, will remove later -var DebugGetDatabaseExpectedEOB func(caller string, proc *process.Process) - // MoTableSize returns an estimated size of a table. 
-func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) error { +func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) (err error) { rs := vector.MustFunctionResult[int64](result) dbs := vector.GenerateFunctionStrParameter(ivecs[0]) tbls := vector.GenerateFunctionStrParameter(ivecs[1]) @@ -165,12 +168,27 @@ func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr } txn := proc.TxnOperator + var accountId uint32 + var accSwitched bool // XXX old code starts a new transaction. why? for i := uint64(0); i < uint64(length); i++ { + if accSwitched { + // consider this situation: + // if a sql tries to gather the sizes of all tables in one query, + // it will traverse all tables, including the cluster table, that belong to + // this account. + // but if the tables in `tbls` are ordered like: xxx, cluster table, xxx, xxx, xxx, + // the account id stored in proc.Ctx will be changed to the system account id when processing that cluster + // table, causing the last three tables to not be found when calling `engine.Database()`. + // so the right account id should be switched back here first. 
+ accSwitched = false + proc.Ctx = defines.AttachAccountId(proc.Ctx, accountId) + } + db, dbnull := dbs.GetStrValue(i) tbl, tblnull := tbls.GetStrValue(i) if dbnull || tblnull { - if err := rs.Append(0, true); err != nil { + if err = rs.Append(0, true); err != nil { return err } } else { @@ -180,21 +198,21 @@ func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr if isClusterTable(dbStr, tblStr) { //if it is the cluster table in the general account, switch into the sys account - accountId, err := defines.GetAccountId(proc.Ctx) + accountId, err = defines.GetAccountId(proc.Ctx) if err != nil { return err } if accountId != uint32(sysAccountID) { + accSwitched = true proc.Ctx = defines.AttachAccountId(proc.Ctx, uint32(sysAccountID)) } } ctx := proc.Ctx - dbo, err := e.Database(ctx, dbStr, txn) + + var dbo engine.Database + dbo, err = e.Database(ctx, dbStr, txn) if err != nil { if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) { - if DebugGetDatabaseExpectedEOB != nil { - DebugGetDatabaseExpectedEOB("MoTableSize", proc) - } return moerr.NewInvalidArgNoCtx("db not found when mo_table_size", dbStr) } return err @@ -205,7 +223,8 @@ func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr } // get the table definition information and check whether the current table is a partition table - engineDefs, err := rel.TableDefs(ctx) + var engineDefs []engine.TableDef + engineDefs, err = rel.TableDefs(ctx) if err != nil { return err } diff --git a/pkg/vm/engine/disttae/cache/catalog.go b/pkg/vm/engine/disttae/cache/catalog.go index b31fd71876dd..955b09f36139 100644 --- a/pkg/vm/engine/disttae/cache/catalog.go +++ b/pkg/vm/engine/disttae/cache/catalog.go @@ -774,67 +774,3 @@ func getTableDef(tblItem *TableItem, coldefs []engine.TableDef) *plan.TableDef { Version: tblItem.Version, } } - -// TODO(ghs) -// is debug for #13151, will remove later -func (cc *CatalogCache) TraverseDbAndTbl() (dbs []DebugDatabaseItem, tbls []DebugTableItem) { - 
dbs = cc.databases.TraverseDatabaseCache() - tbls = cc.tables.TraverseTableCache() - return -} - -type DebugTableItem struct { - AccountId uint32 - DatabaseId uint64 - Name string - Ts timestamp.Timestamp - Id uint64 - Deleted bool -} - -func (dt *DebugTableItem) String() string { - return fmt.Sprintf("%d-%d-%d-%s-%v-%s", - dt.AccountId, dt.DatabaseId, dt.Id, dt.Name, dt.Deleted, types.TimestampToTS(dt.Ts).ToString()) -} - -type DebugDatabaseItem struct { - AccountId uint32 - Name string - Ts timestamp.Timestamp - Id uint64 - Deleted bool -} - -func (dd *DebugDatabaseItem) String() string { - return fmt.Sprintf("%d-%d-%s-%v-%s", - dd.AccountId, dd.Id, dd.Name, dd.Deleted, types.TimestampToTS(dd.Ts).ToString()) -} - -func (c *tableCache) TraverseTableCache() (ret []DebugTableItem) { - c.data.Scan(func(tblItem *TableItem) bool { - ret = append(ret, DebugTableItem{ - Ts: tblItem.Ts, - Id: tblItem.Id, - Name: tblItem.Name, - Deleted: tblItem.deleted, - AccountId: tblItem.AccountId, - DatabaseId: tblItem.DatabaseId, - }) - return true - }) - return -} - -func (t *databaseCache) TraverseDatabaseCache() (ret []DebugDatabaseItem) { - t.data.Scan(func(dbItem *DatabaseItem) bool { - ret = append(ret, DebugDatabaseItem{ - Ts: dbItem.Ts, - Id: dbItem.Id, - Name: dbItem.Name, - Deleted: dbItem.deleted, - AccountId: dbItem.AccountId, - }) - return true - }) - return -} diff --git a/pkg/vm/engine/disttae/engine.go b/pkg/vm/engine/disttae/engine.go index 8f208c17201e..d81441a038fd 100644 --- a/pkg/vm/engine/disttae/engine.go +++ b/pkg/vm/engine/disttae/engine.go @@ -15,11 +15,8 @@ package disttae import ( - "bytes" "context" - "fmt" "runtime" - "sort" "strings" "sync" "time" @@ -42,7 +39,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/timestamp" txn2 "github.com/matrixorigin/matrixone/pkg/pb/txn" "github.com/matrixorigin/matrixone/pkg/sql/colexec" - "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/txn/client" 
"github.com/matrixorigin/matrixone/pkg/util/errutil" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" @@ -105,64 +101,9 @@ func New( panic(err) } - // TODO(ghs) - // is debug for #13151, will remove later - e.enableDebug() - return e } -func (e *Engine) enableDebug() { - function.DebugGetDatabaseExpectedEOB = func(caller string, proc *process.Process) { - txnState := fmt.Sprintf("account-%s, accId-%d, user-%s, role-%s, timezone-%s, txn-%s", - proc.SessionInfo.Account, proc.SessionInfo.AccountId, - proc.SessionInfo.User, proc.SessionInfo.Role, - proc.SessionInfo.TimeZone.String(), - proc.TxnOperator.Txn().DebugString(), - ) - e.DebugGetDatabaseExpectedEOB(caller, txnState) - } -} - -func (e *Engine) DebugGetDatabaseExpectedEOB(caller string, txnState string) { - dbs, tbls := e.catalog.TraverseDbAndTbl() - sort.Slice(dbs, func(i, j int) bool { - if dbs[i].AccountId != dbs[j].AccountId { - return dbs[i].AccountId < dbs[j].AccountId - } - return dbs[i].Id < dbs[j].Id - }) - - sort.Slice(tbls, func(i, j int) bool { - if tbls[i].AccountId != tbls[j].AccountId { - return tbls[i].AccountId < tbls[j].AccountId - } - - if tbls[i].DatabaseId != tbls[j].DatabaseId { - return tbls[i].DatabaseId < tbls[j].DatabaseId - } - - return tbls[i].Id < tbls[j].Id - }) - - var out bytes.Buffer - tIdx := 0 - for idx := range dbs { - out.WriteString(fmt.Sprintf("%s\n", dbs[idx].String())) - for ; tIdx < len(tbls); tIdx++ { - if tbls[tIdx].AccountId != dbs[idx].AccountId || - tbls[tIdx].DatabaseId != dbs[idx].Id { - break - } - - out.WriteString(fmt.Sprintf("\t%s\n", tbls[tIdx].String())) - } - } - - logutil.Infof("%s.Database got ExpectEOB, current txn state: \n%s\n%s", - caller, txnState, out.String()) -} - func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error { txn := e.getTransaction(op) if txn == nil {