Skip to content

Commit

Permalink
sync to 1.1: restore the switched account id when mo_table_size and m…
Browse files Browse the repository at this point in the history
…o_table_rows. (#14741)
  • Loading branch information
gouhongshen committed Mar 4, 2024
1 parent 552618d commit 5b2c4b7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 142 deletions.
57 changes: 38 additions & 19 deletions pkg/sql/plan/function/func_mo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// Mo functions are better tested with bvt.

// MoTableRows returns an estimated row number of a table.
func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) error {
func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) (err error) {
rs := vector.MustFunctionResult[int64](result)
dbs := vector.GenerateFunctionStrParameter(ivecs[0])
tbls := vector.GenerateFunctionStrParameter(ivecs[1])
Expand All @@ -52,8 +52,15 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr
}
txn := proc.TxnOperator

var accountId uint32
var accSwitched bool
// XXX old code starts a new transaction. why?
for i := uint64(0); i < uint64(length); i++ {
if accSwitched {
accSwitched = false
proc.Ctx = defines.AttachAccountId(proc.Ctx, accountId)
}

db, dbnull := dbs.GetStrValue(i)
tbl, tblnull := tbls.GetStrValue(i)
if dbnull || tblnull {
Expand All @@ -67,21 +74,20 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr

if isClusterTable(dbStr, tblStr) {
//if it is the cluster table in the general account, switch into the sys account
accountId, err := defines.GetAccountId(proc.Ctx)
accountId, err = defines.GetAccountId(proc.Ctx)
if err != nil {
return err
}
if accountId != uint32(sysAccountID) {
accSwitched = true
proc.Ctx = defines.AttachAccountId(proc.Ctx, uint32(sysAccountID))
}
}
ctx := proc.Ctx
dbo, err := e.Database(ctx, dbStr, txn)
var dbo engine.Database
dbo, err = e.Database(ctx, dbStr, txn)
if err != nil {
if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
if DebugGetDatabaseExpectedEOB != nil {
DebugGetDatabaseExpectedEOB("MoTableRows", proc)
}
return moerr.NewInvalidArgNoCtx("db not found when mo_table_rows", dbStr)
}
return err
Expand All @@ -92,7 +98,8 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr
}

// get the table definition information and check whether the current table is a partition table
engineDefs, err := rel.TableDefs(ctx)
var engineDefs []engine.TableDef
engineDefs, err = rel.TableDefs(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,12 +156,8 @@ func MoTableRows(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr
return nil
}

// TODO(ghs)
// is debug for #13151, will remove later
var DebugGetDatabaseExpectedEOB func(caller string, proc *process.Process)

// MoTableSize returns an estimated size of a table.
func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) error {
func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int) (err error) {
rs := vector.MustFunctionResult[int64](result)
dbs := vector.GenerateFunctionStrParameter(ivecs[0])
tbls := vector.GenerateFunctionStrParameter(ivecs[1])
Expand All @@ -165,12 +168,27 @@ func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr
}
txn := proc.TxnOperator

var accountId uint32
var accSwitched bool
// XXX old code starts a new transaction. why?
for i := uint64(0); i < uint64(length); i++ {
if accSwitched {
// consider this situation:
// if a sql trys to gather all table's size in one query,
// this will traverse all tables, including the cluster table, belongs
// this account.
// but if these tables in `tbls` has orders: xxx, cluster table, xxx, xxx, xxx.
// the account id stored in proc.Ctx will be changed to system account id when process that cluster
// table, and causing the last three tables can not be found when call the `engine.Database()`.
// so should be first to switch bach the right account id here.
accSwitched = false
proc.Ctx = defines.AttachAccountId(proc.Ctx, accountId)
}

db, dbnull := dbs.GetStrValue(i)
tbl, tblnull := tbls.GetStrValue(i)
if dbnull || tblnull {
if err := rs.Append(0, true); err != nil {
if err = rs.Append(0, true); err != nil {
return err
}
} else {
Expand All @@ -180,21 +198,21 @@ func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr

if isClusterTable(dbStr, tblStr) {
//if it is the cluster table in the general account, switch into the sys account
accountId, err := defines.GetAccountId(proc.Ctx)
accountId, err = defines.GetAccountId(proc.Ctx)
if err != nil {
return err
}
if accountId != uint32(sysAccountID) {
accSwitched = true
proc.Ctx = defines.AttachAccountId(proc.Ctx, uint32(sysAccountID))
}
}
ctx := proc.Ctx
dbo, err := e.Database(ctx, dbStr, txn)

var dbo engine.Database
dbo, err = e.Database(ctx, dbStr, txn)
if err != nil {
if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
if DebugGetDatabaseExpectedEOB != nil {
DebugGetDatabaseExpectedEOB("MoTableSize", proc)
}
return moerr.NewInvalidArgNoCtx("db not found when mo_table_size", dbStr)
}
return err
Expand All @@ -205,7 +223,8 @@ func MoTableSize(ivecs []*vector.Vector, result vector.FunctionResultWrapper, pr
}

// get the table definition information and check whether the current table is a partition table
engineDefs, err := rel.TableDefs(ctx)
var engineDefs []engine.TableDef
engineDefs, err = rel.TableDefs(ctx)
if err != nil {
return err
}
Expand Down
64 changes: 0 additions & 64 deletions pkg/vm/engine/disttae/cache/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,67 +774,3 @@ func getTableDef(tblItem *TableItem, coldefs []engine.TableDef) *plan.TableDef {
Version: tblItem.Version,
}
}

// TODO(ghs)
// is debug for #13151, will remove later
func (cc *CatalogCache) TraverseDbAndTbl() (dbs []DebugDatabaseItem, tbls []DebugTableItem) {
dbs = cc.databases.TraverseDatabaseCache()
tbls = cc.tables.TraverseTableCache()
return
}

type DebugTableItem struct {
AccountId uint32
DatabaseId uint64
Name string
Ts timestamp.Timestamp
Id uint64
Deleted bool
}

func (dt *DebugTableItem) String() string {
return fmt.Sprintf("%d-%d-%d-%s-%v-%s",
dt.AccountId, dt.DatabaseId, dt.Id, dt.Name, dt.Deleted, types.TimestampToTS(dt.Ts).ToString())
}

type DebugDatabaseItem struct {
AccountId uint32
Name string
Ts timestamp.Timestamp
Id uint64
Deleted bool
}

func (dd *DebugDatabaseItem) String() string {
return fmt.Sprintf("%d-%d-%s-%v-%s",
dd.AccountId, dd.Id, dd.Name, dd.Deleted, types.TimestampToTS(dd.Ts).ToString())
}

func (c *tableCache) TraverseTableCache() (ret []DebugTableItem) {
c.data.Scan(func(tblItem *TableItem) bool {
ret = append(ret, DebugTableItem{
Ts: tblItem.Ts,
Id: tblItem.Id,
Name: tblItem.Name,
Deleted: tblItem.deleted,
AccountId: tblItem.AccountId,
DatabaseId: tblItem.DatabaseId,
})
return true
})
return
}

func (t *databaseCache) TraverseDatabaseCache() (ret []DebugDatabaseItem) {
t.data.Scan(func(dbItem *DatabaseItem) bool {
ret = append(ret, DebugDatabaseItem{
Ts: dbItem.Ts,
Id: dbItem.Id,
Name: dbItem.Name,
Deleted: dbItem.deleted,
AccountId: dbItem.AccountId,
})
return true
})
return
}
59 changes: 0 additions & 59 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
package disttae

import (
"bytes"
"context"
"fmt"
"runtime"
"sort"
"strings"
"sync"
"time"
Expand All @@ -42,7 +39,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
txn2 "github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util/errutil"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
Expand Down Expand Up @@ -105,64 +101,9 @@ func New(
panic(err)
}

// TODO(ghs)
// is debug for #13151, will remove later
e.enableDebug()

return e
}

func (e *Engine) enableDebug() {
function.DebugGetDatabaseExpectedEOB = func(caller string, proc *process.Process) {
txnState := fmt.Sprintf("account-%s, accId-%d, user-%s, role-%s, timezone-%s, txn-%s",
proc.SessionInfo.Account, proc.SessionInfo.AccountId,
proc.SessionInfo.User, proc.SessionInfo.Role,
proc.SessionInfo.TimeZone.String(),
proc.TxnOperator.Txn().DebugString(),
)
e.DebugGetDatabaseExpectedEOB(caller, txnState)
}
}

func (e *Engine) DebugGetDatabaseExpectedEOB(caller string, txnState string) {
dbs, tbls := e.catalog.TraverseDbAndTbl()
sort.Slice(dbs, func(i, j int) bool {
if dbs[i].AccountId != dbs[j].AccountId {
return dbs[i].AccountId < dbs[j].AccountId
}
return dbs[i].Id < dbs[j].Id
})

sort.Slice(tbls, func(i, j int) bool {
if tbls[i].AccountId != tbls[j].AccountId {
return tbls[i].AccountId < tbls[j].AccountId
}

if tbls[i].DatabaseId != tbls[j].DatabaseId {
return tbls[i].DatabaseId < tbls[j].DatabaseId
}

return tbls[i].Id < tbls[j].Id
})

var out bytes.Buffer
tIdx := 0
for idx := range dbs {
out.WriteString(fmt.Sprintf("%s\n", dbs[idx].String()))
for ; tIdx < len(tbls); tIdx++ {
if tbls[tIdx].AccountId != dbs[idx].AccountId ||
tbls[tIdx].DatabaseId != dbs[idx].Id {
break
}

out.WriteString(fmt.Sprintf("\t%s\n", tbls[tIdx].String()))
}
}

logutil.Infof("%s.Database got ExpectEOB, current txn state: \n%s\n%s",
caller, txnState, out.String())
}

func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error {
txn := e.getTransaction(op)
if txn == nil {
Expand Down

0 comments on commit 5b2c4b7

Please sign in to comment.