Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
Browse files Browse the repository at this point in the history
…int-v2
  • Loading branch information
tangenta committed Apr 12, 2023
2 parents e535400 + db68a12 commit 9ef599f
Show file tree
Hide file tree
Showing 36 changed files with 691 additions and 178 deletions.
4 changes: 0 additions & 4 deletions executor/BUILD.bazel
Expand Up @@ -266,7 +266,6 @@ go_test(
timeout = "moderate",
srcs = [
"adapter_test.go",
"admin_test.go",
"aggregate_test.go",
"analyze_test.go",
"apply_cache_test.go",
Expand Down Expand Up @@ -311,7 +310,6 @@ go_test(
"join_test.go",
"joiner_test.go",
"main_test.go",
"memory_test.go",
"memtable_reader_test.go",
"merge_join_test.go",
"metrics_reader_test.go",
Expand Down Expand Up @@ -421,7 +419,6 @@ go_test(
"//util/gcutil",
"//util/hack",
"//util/logutil",
"//util/logutil/consistency",
"//util/mathutil",
"//util/memory",
"//util/mock",
Expand Down Expand Up @@ -464,7 +461,6 @@ go_test(
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
38 changes: 38 additions & 0 deletions executor/admintest/BUILD.bazel
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "admintest_test",
timeout = "short",
srcs = [
"admin_test.go",
"main_test.go",
],
flaky = True,
shard_count = 23,
deps = [
"//config",
"//domain",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
"//planner/core",
"//session",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//table",
"//table/tables",
"//testkit",
"//testkit/testsetup",
"//testkit/testutil",
"//types",
"//util/codec",
"//util/logutil/consistency",
"//util/mock",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test
package admintest

import (
"context"
Expand Down
48 changes: 48 additions & 0 deletions executor/admintest/main_test.go
@@ -0,0 +1,48 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package admintest

import (
"testing"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/testkit/testsetup"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
autoid.SetStep(5000)
config.UpdateGlobal(func(conf *config.Config) {
conf.Instance.SlowThreshold = 30000 // 30s
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
conf.Experimental.AllowsExpressionIndex = true
})
tikv.EnableFailpoints()

opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

goleak.VerifyTestMain(m, opts...)
}
37 changes: 30 additions & 7 deletions executor/mem_reader.go
Expand Up @@ -202,7 +202,19 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *Tabl
if len(pkColIDs) == 0 {
pkColIDs = []int64{-1}
}
rd := rowcodec.NewByteDecoder(colInfo, pkColIDs, nil, us.ctx.GetSessionVars().Location())

defVal := func(i int) ([]byte, error) {
sessVars := us.ctx.GetSessionVars()
originStrict := sessVars.StrictSQLMode
sessVars.StrictSQLMode = false
d, err := table.GetColOriginDefaultValue(us.ctx, us.columns[i])
sessVars.StrictSQLMode = originStrict
if err != nil {
return nil, err
}
return tablecodec.EncodeValue(us.ctx.GetSessionVars().StmtCtx, nil, d)
}
rd := rowcodec.NewByteDecoder(colInfo, pkColIDs, defVal, us.ctx.GetSessionVars().Location())
return &memTableReader{
ctx: us.ctx,
table: us.table.Meta(),
Expand Down Expand Up @@ -541,7 +553,7 @@ func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum,
return nil, nil
}

colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns)
colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.ctx, m.table, m.columns)
memTblReader := &memTableReader{
ctx: m.ctx,
table: m.table.Meta(),
Expand Down Expand Up @@ -588,7 +600,7 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
memReaders := make([]memReader, 0, indexCount)
for i := 0; i < indexCount; i++ {
if indexMergeReader.indexes[i] == nil {
colIDs, pkColIDs, rd := getColIDAndPkColIDs(indexMergeReader.table, indexMergeReader.columns)
colIDs, pkColIDs, rd := getColIDAndPkColIDs(indexMergeReader.ctx, indexMergeReader.table, indexMergeReader.columns)
memReaders = append(memReaders, &memTableReader{
ctx: us.ctx,
table: indexMergeReader.table.Meta(),
Expand Down Expand Up @@ -678,7 +690,7 @@ func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum,
if numHandles == 0 {
return nil, nil
}
colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns)
colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.ctx, m.table, m.columns)

memTblReader := &memTableReader{
ctx: m.ctx,
Expand Down Expand Up @@ -772,13 +784,13 @@ func (m *memIndexMergeReader) getMemRowsHandle() ([]kv.Handle, error) {
return nil, errors.New("getMemRowsHandle has not been implemented for memIndexMergeReader")
}

func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) {
func getColIDAndPkColIDs(ctx sessionctx.Context, tbl table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) {
colIDs := make(map[int64]int, len(columns))
for i, col := range columns {
colIDs[col.ID] = i
}

tblInfo := table.Meta()
tblInfo := tbl.Meta()
colInfos := make([]rowcodec.ColInfo, 0, len(columns))
for i := range columns {
col := columns[i]
Expand All @@ -792,6 +804,17 @@ func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[in
if len(pkColIDs) == 0 {
pkColIDs = []int64{-1}
}
rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil)
defVal := func(i int) ([]byte, error) {
sessVars := ctx.GetSessionVars()
originStrict := sessVars.StrictSQLMode
sessVars.StrictSQLMode = false
d, err := table.GetColOriginDefaultValue(ctx, columns[i])
sessVars.StrictSQLMode = originStrict
if err != nil {
return nil, err
}
return tablecodec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, d)
}
rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, defVal, ctx.GetSessionVars().Location())
return colIDs, pkColIDs, rd
}
1 change: 1 addition & 0 deletions executor/oomtest/BUILD.bazel
Expand Up @@ -10,6 +10,7 @@ go_test(
deps = [
"//testkit",
"//testkit/testsetup",
"//util/set",
"//util/syncutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
Expand Down
47 changes: 41 additions & 6 deletions executor/oomtest/oom_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testsetup"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/syncutil"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand Down Expand Up @@ -53,6 +54,10 @@ func TestMemTracker4UpdateExec(t *testing.T) {
log.SetLevel(zap.InfoLevel)

oom.SetTracker("")
oom.ClearMessageFilter()
oom.AddMessageFilter(
"expensive_query during bootstrap phase",
"schemaLeaseChecker is not set for this transaction")

tk.MustExec("insert into t_MemTracker4UpdateExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "schemaLeaseChecker is not set for this transaction", oom.GetTracker())
Expand All @@ -75,36 +80,48 @@ func TestMemTracker4InsertAndReplaceExec(t *testing.T) {
log.SetLevel(zap.InfoLevel)

oom.SetTracker("")
oom.AddMessageFilter(
"schemaLeaseChecker is not set for this transaction",
"expensive_query during bootstrap phase")

tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "schemaLeaseChecker is not set for this transaction", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
oom.ClearMessageFilter()
oom.AddMessageFilter("expensive_query during bootstrap phase")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.SetTracker("")
oom.ClearMessageFilter()
oom.AddMessageFilter("expensive_query during bootstrap phase")

tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "", oom.GetTracker())
oom.AddMessageFilter("expensive_query during bootstrap phase")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.SetTracker("")
oom.ClearMessageFilter()

tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "", oom.GetTracker())
oom.AddMessageFilter("expensive_query during bootstrap phase")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.SetTracker("")
oom.ClearMessageFilter()

tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "", oom.GetTracker())
oom.AddMessageFilter("expensive_query during bootstrap phase")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
Expand All @@ -114,18 +131,23 @@ func TestMemTracker4InsertAndReplaceExec(t *testing.T) {
tk.Session().GetSessionVars().BatchInsert = true

oom.SetTracker("")
oom.ClearMessageFilter()

tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "", oom.GetTracker())
oom.AddMessageFilter("expensive_query during bootstrap phase")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.SetTracker("")
oom.ClearMessageFilter()
oom.AddMessageFilter("expensive_query during bootstrap phase")

tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "", oom.GetTracker())
oom.AddMessageFilter("expensive_query during bootstrap phase")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
Expand All @@ -146,6 +168,8 @@ func TestMemTracker4DeleteExec(t *testing.T) {
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")

oom.SetTracker("")
oom.ClearMessageFilter()
oom.AddMessageFilter("expensive_query during bootstrap phase")

tk.MustExec("delete from MemTracker4DeleteExec1")
require.Equal(t, "", oom.GetTracker())
Expand All @@ -166,8 +190,9 @@ func TestMemTracker4DeleteExec(t *testing.T) {
require.Equal(t, "", oom.GetTracker())
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)")
tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)")

oom.ClearMessageFilter()
oom.SetTracker("")
oom.AddMessageFilter("memory exceeds quota, rateLimitAction delegate to fallback action")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/disableFixedRowCountHint", "return"))
defer func() {
Expand All @@ -184,15 +209,26 @@ var oom *oomCapture
func registerHook() {
conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}}
_, r, _ := log.InitLogger(conf)
oom = &oomCapture{r.Core, "", syncutil.Mutex{}}
oom = &oomCapture{r.Core, "", syncutil.Mutex{}, set.NewStringSet()}
lg := zap.New(oom)
log.ReplaceGlobals(lg, r)
}

type oomCapture struct {
zapcore.Core
tracker string
mu syncutil.Mutex
tracker string
mu syncutil.Mutex
messageFilter set.StringSet
}

func (h *oomCapture) AddMessageFilter(vals ...string) {
for _, val := range vals {
h.messageFilter.Insert(val)
}
}

func (h *oomCapture) ClearMessageFilter() {
h.messageFilter.Clear()
}

func (h *oomCapture) SetTracker(tracker string) {
Expand Down Expand Up @@ -223,8 +259,7 @@ func (h *oomCapture) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return nil
}
// They are just common background task and not related to the oom.
if entry.Message == "SetTiFlashGroupConfig" ||
entry.Message == "record table item load status failed due to not finding item" {
if !h.messageFilter.Empty() && !h.messageFilter.Exist(entry.Message) {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion executor/unstabletest/BUILD.bazel
Expand Up @@ -5,10 +5,11 @@ go_test(
timeout = "short",
srcs = [
"main_test.go",
"memory_test.go",
"unstable_test.go",
],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//config",
"//meta/autoid",
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test
package unstabletest

import (
"context"
Expand Down

0 comments on commit 9ef599f

Please sign in to comment.