Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add cache for approximate table count #44979

Merged
merged 16 commits into from
Jul 3, 2023
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ go_library(
"//executor/internal/builder",
"//executor/internal/exec",
"//executor/internal/mpp",
"//executor/internal/pdhelper",
"//executor/internal/util",
"//executor/metrics",
"//executor/mppcoordmanager",
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/internal/builder"
"github.com/pingcap/tidb/executor/internal/exec"
internalutil "github.com/pingcap/tidb/executor/internal/util"
"github.com/pingcap/tidb/executor/internal/pdhelper"
executor_metrics "github.com/pingcap/tidb/executor/metrics"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand Down Expand Up @@ -2787,7 +2787,7 @@ func (b *executorBuilder) getAdjustedSampleRate(task plannercore.AnalyzeColumnsT
}

func (b *executorBuilder) getApproximateTableCountFromStorage(tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) {
return internalutil.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName)
return pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName)
}

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask {
Expand Down
11 changes: 11 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/executor/internal/pdhelper"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -171,6 +172,16 @@ func init() {
CheckTableFastBucketSize.Store(1024)
}

// Start the backend components
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest add some comments since we are introducing the "backend components" new concept to this package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

func Start() {
pdhelper.GlobalPDHelper.Start()
}

// Stop the backend components
func Stop() {
pdhelper.GlobalPDHelper.Stop()
}

// Action panics when storage usage exceeds storage quota.
func (a *globalPanicOnExceed) Action(t *memory.Tracker) {
a.mutex.Lock()
Expand Down
4 changes: 2 additions & 2 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/tidb/domain/resourcegroup"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/executor/internal/exec"
internalutil "github.com/pingcap/tidb/executor/internal/util"
"github.com/pingcap/tidb/executor/internal/pdhelper"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -2097,7 +2097,7 @@ func getRemainDurationForAnalyzeStatusHelper(
}
}
if tid > 0 && totalCnt == 0 {
totalCnt, _ = internalutil.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName)
totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName)
}
RemainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration)
}
Expand Down
30 changes: 30 additions & 0 deletions executor/internal/pdhelper/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "pdhelper",
srcs = ["pd.go"],
importpath = "github.com/pingcap/tidb/executor/internal/pdhelper",
visibility = ["//executor:__subpackages__"],
deps = [
"//kv",
"//sessionctx",
"//store/helper",
"//util",
"//util/sqlexec",
"@com_github_jellydator_ttlcache_v3//:ttlcache",
"@com_github_pingcap_failpoint//:failpoint",
],
)

go_test(
name = "pdhelper_test",
timeout = "short",
srcs = ["pd_test.go"],
embed = [":pdhelper"],
flaky = True,
deps = [
"//sessionctx",
"@com_github_jellydator_ttlcache_v3//:ttlcache",
"@com_github_stretchr_testify//require",
],
)
124 changes: 124 additions & 0 deletions executor/internal/pdhelper/pd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pdhelper

import (
"context"
"strconv"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/sqlexec"
)

// GlobalPDHelper is the global variable for PDHelper.
var GlobalPDHelper = defaultPDHelper()
var globalPDHelperOnce sync.Once

// PDHelper is used to get some information from PD.
type PDHelper struct {
wg util.WaitGroupWrapper
cacheForApproximateTableCountFromStorage *ttlcache.Cache[string, float64]

getApproximateTableCountFromStorageFunc func(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool)
}

func defaultPDHelper() *PDHelper {
cache := ttlcache.New[string, float64](
ttlcache.WithTTL[string, float64](30*time.Second),
ttlcache.WithCapacity[string, float64](1024*1024),
)
return &PDHelper{
cacheForApproximateTableCountFromStorage: cache,
getApproximateTableCountFromStorageFunc: getApproximateTableCountFromStorage,
}
}

// Start is used to start the background task of PDHelper. Currently, the background task is used to clean up TTL cache.
func (p *PDHelper) Start() {
globalPDHelperOnce.Do(func() {
p.wg.Run(p.cacheForApproximateTableCountFromStorage.Start)
chrysan marked this conversation as resolved.
Show resolved Hide resolved
})
}

// Stop stops the background task of PDHelper.
func (p *PDHelper) Stop() {
p.cacheForApproximateTableCountFromStorage.Stop()
p.wg.Wait()
}

func approximateTableCountKey(tid int64, dbName, tableName, partitionName string) string {
return strings.Join([]string{strconv.FormatInt(tid, 10), dbName, tableName, partitionName}, "_")
}

// GetApproximateTableCountFromStorage gets the approximate count of the table.
func (p *PDHelper) GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
key := approximateTableCountKey(tid, dbName, tableName, partitionName)
if item := p.cacheForApproximateTableCountFromStorage.Get(key); item != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetApproximateTableCountFromStorage is the same as the old. but we add cache here.

time-and-fate marked this conversation as resolved.
Show resolved Hide resolved
return item.Value(), true
}
result, hasPD := p.getApproximateTableCountFromStorageFunc(sctx, tid, dbName, tableName, partitionName)
p.cacheForApproximateTableCountFromStorage.Set(key, result, ttlcache.DefaultTTL)
return result, hasPD
}

func getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return 0, false
}
regionStats := &helper.PDRegionStats{}
pdHelper := helper.NewHelper(tikvStore)
err := pdHelper.GetPDRegionStats(tid, regionStats, true)
failpoint.Inject("calcSampleRateByStorageCount", func() {
// Force the TiDB thinking that there's PD and the count of region is small.
err = nil
regionStats.Count = 1
// Set a very large approximate count.
regionStats.StorageKeys = 1000000
})
if err != nil {
return 0, false
}
// If this table is not small, we directly use the count from PD,
// since for a small table, it's possible that it's data is in the same region with part of another large table.
// Thus, we use the number of the regions of the table's table KV to decide whether the table is small.
if regionStats.Count > 2 {
return float64(regionStats.StorageKeys), true
}
// Otherwise, we use count(*) to calc it's size, since it's very small, the table data can be filled in no more than 2 regions.
sql := new(strings.Builder)
sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", dbName, tableName)
if partitionName != "" {
sqlexec.MustFormatSQL(sql, " partition(%n)", partitionName)
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, sql.String())
if err != nil {
return 0, false
}
// If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row.
if len(rows) == 0 || rows[0].Len() == 0 {
return 0, false
}
return float64(rows[0].GetInt64(0)), true
}
67 changes: 67 additions & 0 deletions executor/internal/pdhelper/pd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pdhelper

import (
"testing"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pingcap/tidb/sessionctx"
"github.com/stretchr/testify/require"
)

var globalMockClient mockClient

type mockClient struct {
missCnt int
}

func (m *mockClient) getMissCnt() int {
return m.missCnt
}

func (m *mockClient) getFakeApproximateTableCountFromStorage(_ sessionctx.Context, _ int64, _, _, _ string) (float64, bool) {
m.missCnt++
return 1.0, true
}

func TestTTLCache(t *testing.T) {
cache := ttlcache.New[string, float64](
ttlcache.WithTTL[string, float64](100*time.Millisecond),
ttlcache.WithCapacity[string, float64](2),
)
helper := &PDHelper{
cacheForApproximateTableCountFromStorage: cache,
getApproximateTableCountFromStorageFunc: globalMockClient.getFakeApproximateTableCountFromStorage,
}
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss
require.Equal(t, 1, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Hit
require.Equal(t, 1, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition") // Miss
require.Equal(t, 2, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Miss
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss
require.Equal(t, 4, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Hit
require.Equal(t, 4, globalMockClient.getMissCnt())
time.Sleep(200 * time.Millisecond)
// All is miss.
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition")
helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition")
helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition")
require.Equal(t, 7, globalMockClient.getMissCnt())
}
5 changes: 0 additions & 5 deletions executor/internal/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/executor/internal/util",
visibility = ["//executor:__subpackages__"],
deps = [
"//kv",
"//sessionctx",
"//store/helper",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-tipb",
],
)
54 changes: 0 additions & 54 deletions executor/internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,3 @@
// limitations under the License.

package util

import (
"context"
"strings"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/util/sqlexec"
)

// GetApproximateTableCountFromStorage gets the approximate count of the table.
func GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return 0, false
}
regionStats := &helper.PDRegionStats{}
pdHelper := helper.NewHelper(tikvStore)
err := pdHelper.GetPDRegionStats(tid, regionStats, true)
failpoint.Inject("calcSampleRateByStorageCount", func() {
// Force the TiDB thinking that there's PD and the count of region is small.
err = nil
regionStats.Count = 1
// Set a very large approximate count.
regionStats.StorageKeys = 1000000
})
if err != nil {
return 0, false
}
// If this table is not small, we directly use the count from PD,
// since for a small table, it's possible that it's data is in the same region with part of another large table.
// Thus, we use the number of the regions of the table's table KV to decide whether the table is small.
if regionStats.Count > 2 {
return float64(regionStats.StorageKeys), true
}
// Otherwise, we use count(*) to calc it's size, since it's very small, the table data can be filled in no more than 2 regions.
sql := new(strings.Builder)
sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", dbName, tableName)
if partitionName != "" {
sqlexec.MustFormatSQL(sql, " partition(%n)", partitionName)
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, sql.String())
if err != nil {
return 0, false
}
// If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row.
if len(rows) == 0 || rows[0].Len() == 0 {
return 0, false
}
return float64(rows[0].GetInt64(0)), true
}
3 changes: 2 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func main() {
setupMetrics()

keyspaceName := keyspace.GetKeyspaceNameBySettings()

executor.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
svr := createServer(storage, dom)
Expand All @@ -251,6 +251,7 @@ func main() {
cleanup(svr, storage, dom)
cpuprofile.StopCPUProfiler()
resourcemanager.InstanceResourceManager.Stop()
executor.Stop()
close(exited)
})
topsql.SetupTopSQL()
Expand Down