Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner/infoschema: add predicate pushdown for system tables #50779

Merged
merged 21 commits into from
Feb 1, 2024
111 changes: 88 additions & 23 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,18 @@ func (e *memtableRetriever) setDataFromSchemata(ctx sessionctx.Context, schemas

func (e *memtableRetriever) setDataForStatistics(ctx sessionctx.Context, schemas []*model.DBInfo) {
checker := privilege.GetPrivilegeManager(ctx)
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
Expand Down Expand Up @@ -459,8 +469,18 @@ func (e *memtableRetriever) setDataForStatisticsInTable(schema *model.DBInfo, ta
func (e *memtableRetriever) setDataFromReferConst(sctx sessionctx.Context, schemas []*model.DBInfo) error {
checker := privilege.GetPrivilegeManager(sctx)
var rows [][]types.Datum
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return nil
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
if !table.IsBaseTable() {
continue
}
Expand Down Expand Up @@ -496,12 +516,51 @@ func (e *memtableRetriever) setDataFromReferConst(sctx sessionctx.Context, schem
return nil
}

func fetchColumnsFromStatsCache(table *model.TableInfo) (rowCount uint64, avgRowLength uint64, dataLength uint64, indexLength uint64) {
cache := cache.TableRowStatsCache
if table.GetPartitionInfo() == nil {
rowCount = cache.GetTableRows(table.ID)
dataLength, indexLength = cache.GetDataAndIndexLength(table, table.ID, rowCount)
} else {
for _, pi := range table.GetPartitionInfo().Definitions {
piRowCnt := cache.GetTableRows(pi.ID)
rowCount += piRowCnt
parDataLen, parIndexLen := cache.GetDataAndIndexLength(table, pi.ID, piRowCnt)
dataLength += parDataLen
indexLength += parIndexLen
}
}
avgRowLength = uint64(0)
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}

if table.IsSequence() {
// sequence is always 1 row regardless of stats.
rowCount = 1
}
return
}

func (e *memtableRetriever) updateStatsCacheIfNeed(sctx sessionctx.Context) (bool, error) {
for _, col := range e.columns {
// only the following columns need stats cahce.
if col.Name.O == "AVG_ROW_LENGTH" || col.Name.O == "DATA_LENGTH" || col.Name.O == "INDEX_LENGTH" || col.Name.O == "TABLE_ROWS" {
err := cache.TableRowStatsCache.Update(sctx)
if err != nil {
return false, err
}
return true, err
}
}
return false, nil
}

func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas []*model.DBInfo) error {
err := cache.TableRowStatsCache.Update(sctx)
useStatsCache, err := e.updateStatsCacheIfNeed(sctx)
if err != nil {
return err
}

checker := privilege.GetPrivilegeManager(sctx)

var rows [][]types.Datum
Expand All @@ -510,8 +569,18 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
if loc == nil {
loc = time.Local
}
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return nil
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
collation := table.Collate
if collation == "" {
collation = mysql.DefaultCollationName
Expand All @@ -530,6 +599,7 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
} else if table.TableCacheStatusType == model.TableCacheStatusEnable {
createOptions = "cached=on"
}
var err error
var autoIncID any
hasAutoIncID, _ := infoschema.HasAutoIncrementColumn(table)
if hasAutoIncID {
Expand All @@ -538,33 +608,12 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
return err
}
}

cache := cache.TableRowStatsCache
var rowCount, dataLength, indexLength uint64
if table.GetPartitionInfo() == nil {
rowCount = cache.GetTableRows(table.ID)
dataLength, indexLength = cache.GetDataAndIndexLength(table, table.ID, rowCount)
} else {
for _, pi := range table.GetPartitionInfo().Definitions {
piRowCnt := cache.GetTableRows(pi.ID)
rowCount += piRowCnt
parDataLen, parIndexLen := cache.GetDataAndIndexLength(table, pi.ID, piRowCnt)
dataLength += parDataLen
indexLength += parIndexLen
}
}
avgRowLength := uint64(0)
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}
tableType := "BASE TABLE"
if util.IsSystemView(schema.Name.L) {
tableType = "SYSTEM VIEW"
}
if table.IsSequence() {
tableType = "SEQUENCE"
// sequence is always 1 row regardless of stats.
rowCount = 1
}
if table.HasClusteredIndex() {
pkType = "CLUSTERED"
Expand All @@ -574,6 +623,12 @@ func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas [
if table.PlacementPolicyRef != nil {
policyName = table.PlacementPolicyRef.Name.O
}

var rowCount, avgRowLength, dataLength, indexLength uint64
if useStatsCache {
rowCount, avgRowLength, dataLength, indexLength = fetchColumnsFromStatsCache(table)
}

record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
Expand Down Expand Up @@ -1467,8 +1522,18 @@ func (e *memtableRetriever) dataForTiDBClusterInfo(ctx sessionctx.Context) error
func (e *memtableRetriever) setDataFromKeyColumnUsage(ctx sessionctx.Context, schemas []*model.DBInfo) {
checker := privilege.GetPrivilegeManager(ctx)
rows := make([][]types.Datum, 0, len(schemas)) // The capacity is not accurate, but it is not a big problem.
extractor, ok := e.extractor.(*plannercore.InfoSchemaTablesExtractor)
if ok && extractor.SkipRequest {
return
}
for _, schema := range schemas {
if ok && extractor.Filter("table_schema", schema.Name.L) {
continue
}
for _, table := range schema.Tables {
if ok && extractor.Filter("table_name", table.Name.L) {
continue
}
if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5334,6 +5334,11 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.Extractor = &TikvRegionPeersExtractor{}
case infoschema.TableColumns:
p.Extractor = &ColumnsTableExtractor{}
case infoschema.TableTables,
infoschema.TableReferConst,
infoschema.TableKeyColumn,
infoschema.TableStatistics:
p.Extractor = &InfoSchemaTablesExtractor{}
case infoschema.TableTiKVRegionStatus:
p.Extractor = &TiKVRegionStatusExtractor{tablesID: make([]int64, 0)}
}
Expand Down
68 changes: 68 additions & 0 deletions pkg/planner/core/memtable_predicate_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"regexp"
"slices"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -1676,3 +1677,70 @@ func (e *TiKVRegionStatusExtractor) explainInfo(_ *PhysicalMemTable) string {
func (e *TiKVRegionStatusExtractor) GetTablesID() []int64 {
return e.tablesID
}

// InfoSchemaTablesExtractor is used to extract infoSchema tables related predicates.
type InfoSchemaTablesExtractor struct {
extractHelper
// SkipRequest means the where clause always false, we don't need to request any component
SkipRequest bool

colNames []string
ColPredicates map[string]set.StringSet
}

// Extract implements the MemTablePredicateExtractor Extract interface
func (e *InfoSchemaTablesExtractor) Extract(_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
var resultSet set.StringSet
e.colNames = []string{"table_schema", "table_name"}
e.ColPredicates = make(map[string]set.StringSet)
remained = predicates
for _, colName := range e.colNames {
remained, e.SkipRequest, resultSet = e.extractCol(schema, names, remained, colName, true)
e.ColPredicates[colName] = resultSet
if e.SkipRequest {
break
}
}
return remained
}

func (e *InfoSchemaTablesExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request:true"
}
r := new(bytes.Buffer)
colNames := make([]string, 0, len(e.ColPredicates))
for colName := range e.ColPredicates {
colNames = append(colNames, colName)
}
sort.Strings(colNames)
for _, colName := range colNames {
if len(e.ColPredicates[colName]) > 0 {
fmt.Fprintf(r, "%s:[%s], ", colName, extractStringFromStringSet(e.ColPredicates[colName]))
}
}

// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}

// Filter use the col predicates to filter records.
func (e *InfoSchemaTablesExtractor) Filter(colName string, val string) bool {
if e.SkipRequest {
return true
}
predVals, ok := e.ColPredicates[colName]
if ok && len(predVals) > 0 {
return !predVals.Exist(val)
}
// No need to filter records since no predicate for the column exists.
return false
}
92 changes: 92 additions & 0 deletions pkg/planner/core/memtable_predicate_extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,3 +1810,95 @@ func TestExtractorInPreparedStmt(t *testing.T) {
ca.checker(extractor)
}
}

func TestInformSchemaTableExtract(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

se, err := session.CreateSession4Test(store)
require.NoError(t, err)
var cases = []struct {
sql string
skipRequest bool
colPredicates map[string]set.StringSet
}{
{
sql: `select * from INFORMATION_SCHEMA.TABLES where table_name='T';`,
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet("t"),
"table_schema": set.NewStringSet(),
},
},
{
sql: `select * from INFORMATION_SCHEMA.TABLES where table_schema='TS';`,
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet(),
"table_schema": set.NewStringSet("ts"),
},
},
{
sql: "select * from information_schema.TABLES where table_name in ('TEST','t') and table_schema in ('A','b')",
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet("test", "t"),
"table_schema": set.NewStringSet("a", "b"),
},
},
{
sql: "select * from information_schema.TABLES where table_name ='t' or table_name ='A'",
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet("t", "a"),
"table_schema": set.NewStringSet(),
},
},
{
sql: "select * from information_schema.REFERENTIAL_CONSTRAINTS where table_name ='t' or table_name ='A'",
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet("t", "a"),
"table_schema": set.NewStringSet(),
},
},
{
sql: "select * from information_schema.KEY_COLUMN_USAGE where table_name ='t' or table_name ='A'",
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet("t", "a"),
"table_schema": set.NewStringSet(),
},
},
{
sql: "select * from information_schema.STATISTICS where table_name ='t' or table_name ='A'",
skipRequest: false,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet("t", "a"),
"table_schema": set.NewStringSet(),
},
},
{
sql: "select * from information_schema.STATISTICS where table_name ='t' and table_name ='A'",
skipRequest: true,
colPredicates: map[string]set.StringSet{
"table_name": set.NewStringSet(),
"table_schema": set.NewStringSet(),
},
},
{
sql: "select * from information_schema.STATISTICS where table_name ='t' and table_schema ='A' and table_schema = 'b'",
skipRequest: true,
colPredicates: map[string]set.StringSet{
"table_schema": set.NewStringSet(),
},
},
}
parser := parser.New()
for _, ca := range cases {
logicalMemTable := getLogicalMemTable(t, dom, se, parser, ca.sql)
require.NotNil(t, logicalMemTable.Extractor)
columnsTableExtractor := logicalMemTable.Extractor.(*plannercore.InfoSchemaTablesExtractor)
require.Equal(t, ca.skipRequest, columnsTableExtractor.SkipRequest, "SQL: %v", ca.sql)
require.Equal(t, ca.colPredicates, columnsTableExtractor.ColPredicates, "SQL: %v", ca.sql)
}
}
3 changes: 2 additions & 1 deletion pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ func (p *LogicalMemTable) PruneColumns(parentUsedCols []*expression.Column, opt
infoschema.ClusterTableTiDBTrx,
infoschema.TableDataLockWaits,
infoschema.TableDeadlocks,
infoschema.ClusterTableDeadlocks:
infoschema.ClusterTableDeadlocks,
infoschema.TableTables:
default:
return nil
}
Expand Down
Loading