Skip to content

Commit

Permalink
planner/infoschema: add predicate pushdown for system tables (#50779)
Browse files Browse the repository at this point in the history
ref #50305
  • Loading branch information
ywqzzy committed Feb 1, 2024
1 parent b69e5e3 commit efe8523
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 30 deletions.
111 changes: 88 additions & 23 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,18 @@ func (e *memtableRetriever) setDataFromSchemata(ctx sessionctx.Context, schemas

func (e *memtableRetriever) setDataForStatistics(ctx sessionctx.Context, schemas []*model.DBInfo) {
checker := privilege.GetPrivilegeManager(ctx)
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
Expand Down Expand Up @@ -459,8 +469,18 @@ func (e *memtableRetriever) setDataForStatisticsInTable(schema *model.DBInfo, ta
func (e *memtableRetriever) setDataFromReferConst(sctx sessionctx.Context, schemas []*model.DBInfo) error {
checker := privilege.GetPrivilegeManager(sctx)
var rows [][]types.Datum
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return nil
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
if !table.IsBaseTable() {
continue
}
Expand Down Expand Up @@ -496,12 +516,51 @@ func (e *memtableRetriever) setDataFromReferConst(sctx sessionctx.Context, schem
return nil
}

// fetchColumnsFromStatsCache reads the statistics columns of table from
// cache.TableRowStatsCache.
//
// For a partitioned table the row count, data length and index length are the
// sums over all partition definitions; otherwise they are looked up by the
// table ID. avgRowLength is dataLength divided by the cached row count, or 0
// when that count is 0. For a sequence the returned rowCount is forced to 1
// after avgRowLength has been derived, so avgRowLength still reflects the
// cached figures.
func fetchColumnsFromStatsCache(table *model.TableInfo) (rowCount uint64, avgRowLength uint64, dataLength uint64, indexLength uint64) {
	statsCache := cache.TableRowStatsCache
	if partInfo := table.GetPartitionInfo(); partInfo != nil {
		// Partitioned table: accumulate the figures of every partition.
		for _, def := range partInfo.Definitions {
			partRows := statsCache.GetTableRows(def.ID)
			partData, partIndex := statsCache.GetDataAndIndexLength(table, def.ID, partRows)
			rowCount += partRows
			dataLength += partData
			indexLength += partIndex
		}
	} else {
		rowCount = statsCache.GetTableRows(table.ID)
		dataLength, indexLength = statsCache.GetDataAndIndexLength(table, table.ID, rowCount)
	}

	// Guard the division; avgRowLength keeps its zero value for empty tables.
	if rowCount != 0 {
		avgRowLength = dataLength / rowCount
	}

	if table.IsSequence() {
		// A sequence is always reported as one row regardless of stats.
		rowCount = 1
	}
	return rowCount, avgRowLength, dataLength, indexLength
}

// updateStatsCacheIfNeed refreshes cache.TableRowStatsCache only when the
// retriever's column list contains at least one column that is served from
// that cache (AVG_ROW_LENGTH, DATA_LENGTH, INDEX_LENGTH or TABLE_ROWS).
//
// It returns true when the cache was refreshed successfully, false with the
// error when the refresh failed, and false with a nil error when none of the
// columns needs the stats cache.
func (e *memtableRetriever) updateStatsCacheIfNeed(sctx sessionctx.Context) (bool, error) {
	for _, col := range e.columns {
		switch col.Name.O {
		case "AVG_ROW_LENGTH", "DATA_LENGTH", "INDEX_LENGTH", "TABLE_ROWS":
			// The first stats-backed column decides the outcome: refresh once
			// and stop scanning the remaining columns.
			if err := cache.TableRowStatsCache.Update(sctx); err != nil {
				return false, err
			}
			return true, nil
		}
	}
	return false, nil
}

func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas []*model.DBInfo) error {
err := cache.TableRowStatsCache.Update(sctx)
useStatsCache, err := e.updateStatsCacheIfNeed(sctx)
if err != nil {
return err
}

checker := privilege.GetPrivilegeManager(sctx)

var rows [][]types.Datum
Expand All @@ -510,8 +569,18 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
if loc == nil {
loc = time.Local
}
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return nil
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
collation := table.Collate
if collation == "" {
collation = mysql.DefaultCollationName
Expand All @@ -530,6 +599,7 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
} else if table.TableCacheStatusType == model.TableCacheStatusEnable {
createOptions = "cached=on"
}
var err error
var autoIncID any
hasAutoIncID, _ := infoschema.HasAutoIncrementColumn(table)
if hasAutoIncID {
Expand All @@ -538,33 +608,12 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
return err
}
}

cache := cache.TableRowStatsCache
var rowCount, dataLength, indexLength uint64
if table.GetPartitionInfo() == nil {
rowCount = cache.GetTableRows(table.ID)
dataLength, indexLength = cache.GetDataAndIndexLength(table, table.ID, rowCount)
} else {
for _, pi := range table.GetPartitionInfo().Definitions {
piRowCnt := cache.GetTableRows(pi.ID)
rowCount += piRowCnt
parDataLen, parIndexLen := cache.GetDataAndIndexLength(table, pi.ID, piRowCnt)
dataLength += parDataLen
indexLength += parIndexLen
}
}
avgRowLength := uint64(0)
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}
tableType := "BASE TABLE"
if util.IsSystemView(schema.Name.L) {
tableType = "SYSTEM VIEW"
}
if table.IsSequence() {
tableType = "SEQUENCE"
// sequence is always 1 row regardless of stats.
rowCount = 1
}
if table.HasClusteredIndex() {
pkType = "CLUSTERED"
Expand All @@ -574,6 +623,12 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
if table.PlacementPolicyRef != nil {
policyName = table.PlacementPolicyRef.Name.O
}

var rowCount, avgRowLength, dataLength, indexLength uint64
if useStatsCache {
rowCount, avgRowLength, dataLength, indexLength = fetchColumnsFromStatsCache(table)
}

record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
Expand Down Expand Up @@ -1467,8 +1522,18 @@ func (e *memtableRetriever) dataForTiDBClusterInfo(ctx sessionctx.Context) error
func (e *memtableRetriever) setDataFromKeyColumnUsage(ctx sessionctx.Context, schemas []*model.DBInfo) {
checker := privilege.GetPrivilegeManager(ctx)
rows := make([][]types.Datum, 0, len(schemas)) // The capacity is not accurate, but it is not a big problem.
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5334,6 +5334,11 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.Extractor = &TikvRegionPeersExtractor{}
case infoschema.TableColumns:
p.Extractor = &ColumnsTableExtractor{}
case infoschema.TableTables,
infoschema.TableReferConst,
infoschema.TableKeyColumn,
infoschema.TableStatistics:
p.Extractor = &InfoSchemaTablesExtractor{}
case infoschema.TableTiKVRegionStatus:
p.Extractor = &TiKVRegionStatusExtractor{tablesID: make([]int64, 0)}
}
Expand Down
68 changes: 68 additions & 0 deletions pkg/planner/core/memtable_predicate_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"regexp"
"slices"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -1676,3 +1677,70 @@ func (e *TiKVRegionStatusExtractor) explainInfo(_ *PhysicalMemTable) string {
func (e *TiKVRegionStatusExtractor) GetTablesID() []int64 {
return e.tablesID
}

// InfoSchemaTablesExtractor is used to extract infoSchema tables related predicates.
// Extract fills it with the values requested for the table_schema and
// table_name columns; Filter and explainInfo read those values back.
type InfoSchemaTablesExtractor struct {
// extractHelper is embedded; Extract calls its extractCol method.
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component.
SkipRequest bool

// colNames lists the columns predicates are extracted for, in extraction order.
// It is (re)assigned on every call to Extract.
colNames []string
// ColPredicates maps a column name to the set of values extracted for that
// column. An empty set is treated by Filter as "no predicate on this column".
ColPredicates map[string]set.StringSet
}

// Extract implements the MemTablePredicateExtractor Extract interface.
//
// It resets colNames and ColPredicates, then runs extractCol for
// "table_schema" and "table_name" in that order, feeding the predicates left
// over by one column into the next. The value set returned for each column is
// stored in ColPredicates. As soon as extractCol reports that the request can
// be skipped, SkipRequest stays true and the remaining columns are not
// processed, so ColPredicates may then lack entries for the later columns.
// The predicates that were not consumed are returned.
func (e *InfoSchemaTablesExtractor) Extract(_ sessionctx.Context,
	schema *expression.Schema,
	names []*types.FieldName,
	predicates []expression.Expression,
) (remained []expression.Expression) {
	e.colNames = []string{"table_schema", "table_name"}
	e.ColPredicates = make(map[string]set.StringSet)

	remained = predicates
	for _, col := range e.colNames {
		var (
			skip bool
			vals set.StringSet
		)
		remained, skip, vals = e.extractCol(schema, names, remained, col, true)
		e.SkipRequest = skip
		e.ColPredicates[col] = vals
		if skip {
			// The where clause can never match; no point in looking further.
			break
		}
	}
	return remained
}

// explainInfo renders the extracted predicates for EXPLAIN output.
//
// It returns "skip_request:true" when SkipRequest is set. Otherwise it emits
// one "col:[values]" entry per column that has a non-empty value set, with the
// columns in sorted order and the entries joined by ", ". The result is the
// empty string when no column has any value. The PhysicalMemTable argument is
// not used.
func (e *InfoSchemaTablesExtractor) explainInfo(_ *PhysicalMemTable) string {
	if e.SkipRequest {
		return "skip_request:true"
	}

	// Map iteration order is random, so sort the keys for stable output.
	sortedCols := make([]string, 0, len(e.ColPredicates))
	for col := range e.ColPredicates {
		sortedCols = append(sortedCols, col)
	}
	sort.Strings(sortedCols)

	var buf bytes.Buffer
	for _, col := range sortedCols {
		vals := e.ColPredicates[col]
		if len(vals) == 0 {
			continue
		}
		// Write the separator in front of every entry but the first, so there
		// is no trailing ", " to strip afterwards.
		if buf.Len() > 0 {
			buf.WriteString(", ")
		}
		fmt.Fprintf(&buf, "%s:[%s]", col, extractStringFromStringSet(vals))
	}
	return buf.String()
}

// Filter uses the extracted column predicates to decide whether a record
// should be dropped. It returns true when the record must be filtered out:
// either SkipRequest is set, or colName has a non-empty value set that does
// not contain val. It returns false when colName has no extracted values,
// because then there is nothing to filter on.
func (e *InfoSchemaTablesExtractor) Filter(colName string, val string) bool {
	if e.SkipRequest {
		return true
	}
	// A missing key yields a nil set, which has length 0 like an empty one.
	allowed := e.ColPredicates[colName]
	if len(allowed) == 0 {
		return false
	}
	return !allowed.Exist(val)
}
92 changes: 92 additions & 0 deletions pkg/planner/core/memtable_predicate_extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,3 +1810,95 @@ func TestExtractorInPreparedStmt(t *testing.T) {
ca.checker(extractor)
}
}

// TestInformSchemaTableExtract verifies that queries against the
// INFORMATION_SCHEMA tables TABLES, REFERENTIAL_CONSTRAINTS, KEY_COLUMN_USAGE
// and STATISTICS are planned with an InfoSchemaTablesExtractor, and that the
// extractor's SkipRequest flag and ColPredicates match the where clause.
func TestInformSchemaTableExtract(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)

	se, err := session.CreateSession4Test(store)
	require.NoError(t, err)
	cases := []struct {
		sql           string
		skipRequest   bool
		colPredicates map[string]set.StringSet
	}{
		{
			sql:         `select * from INFORMATION_SCHEMA.TABLES where table_name='T';`,
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet("t"),
				"table_schema": set.NewStringSet(),
			},
		},
		{
			sql:         `select * from INFORMATION_SCHEMA.TABLES where table_schema='TS';`,
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet(),
				"table_schema": set.NewStringSet("ts"),
			},
		},
		{
			sql:         "select * from information_schema.TABLES where table_name in ('TEST','t') and table_schema in ('A','b')",
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet("test", "t"),
				"table_schema": set.NewStringSet("a", "b"),
			},
		},
		{
			sql:         "select * from information_schema.TABLES where table_name ='t' or table_name ='A'",
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet("t", "a"),
				"table_schema": set.NewStringSet(),
			},
		},
		{
			sql:         "select * from information_schema.REFERENTIAL_CONSTRAINTS where table_name ='t' or table_name ='A'",
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet("t", "a"),
				"table_schema": set.NewStringSet(),
			},
		},
		{
			sql:         "select * from information_schema.KEY_COLUMN_USAGE where table_name ='t' or table_name ='A'",
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet("t", "a"),
				"table_schema": set.NewStringSet(),
			},
		},
		{
			sql:         "select * from information_schema.STATISTICS where table_name ='t' or table_name ='A'",
			skipRequest: false,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet("t", "a"),
				"table_schema": set.NewStringSet(),
			},
		},
		{
			// Contradictory table_name predicates: nothing can match.
			sql:         "select * from information_schema.STATISTICS where table_name ='t' and table_name ='A'",
			skipRequest: true,
			colPredicates: map[string]set.StringSet{
				"table_name":   set.NewStringSet(),
				"table_schema": set.NewStringSet(),
			},
		},
		{
			// Contradictory table_schema predicates: extraction stops after
			// table_schema, so table_name never gets an entry.
			sql:         "select * from information_schema.STATISTICS where table_name ='t' and table_schema ='A' and table_schema = 'b'",
			skipRequest: true,
			colPredicates: map[string]set.StringSet{
				"table_schema": set.NewStringSet(),
			},
		},
	}
	// Named p so the local does not shadow the parser package.
	p := parser.New()
	for _, ca := range cases {
		logicalMemTable := getLogicalMemTable(t, dom, se, p, ca.sql)
		require.NotNil(t, logicalMemTable.Extractor)
		// Use the comma-ok form so that a wrong extractor type is reported as
		// a test failure naming the SQL instead of panicking the test binary.
		extractor, ok := logicalMemTable.Extractor.(*plannercore.InfoSchemaTablesExtractor)
		require.True(t, ok, "SQL: %v", ca.sql)
		require.Equal(t, ca.skipRequest, extractor.SkipRequest, "SQL: %v", ca.sql)
		require.Equal(t, ca.colPredicates, extractor.ColPredicates, "SQL: %v", ca.sql)
	}
}
3 changes: 2 additions & 1 deletion pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ func (p *LogicalMemTable) PruneColumns(parentUsedCols []*expression.Column, opt
infoschema.ClusterTableTiDBTrx,
infoschema.TableDataLockWaits,
infoschema.TableDeadlocks,
infoschema.ClusterTableDeadlocks:
infoschema.ClusterTableDeadlocks,
infoschema.TableTables:
default:
return nil
}
Expand Down

0 comments on commit efe8523

Please sign in to comment.