Skip to content

Commit

Permalink
Merge branch 'master' into memory-control-bug-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed Nov 2, 2022
2 parents fb5ffb3 + 1c5b837 commit 5a398be
Show file tree
Hide file tree
Showing 267 changed files with 13,635 additions and 10,739 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2825,8 +2825,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA=",
version = "v0.0.0-20221014081430-26e28e6a281a",
sum = "h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=",
version = "v0.0.0-20221026112947-f8d61344b172",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3429,15 +3429,15 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A=",
version = "v2.0.1-0.20221017092635-91be9c6ce6c0",
sum = "h1:NvQHWk0GeXSLEBbmGMPnDMc0to0a3ogzgIRbTKw8MHI=",
version = "v2.0.1-0.20221031063202-30e803b7082c",
)
go_repository(
    name = "com_github_tikv_pd_client",
    build_file_proto_mode = "disable_global",
    importpath = "github.com/tikv/pd/client",
    # Post-merge pin: keep exactly one sum/version pair — duplicate keyword
    # arguments (the stale pre-merge sum/version left by the diff) are a
    # syntax error in Starlark.
    sum = "h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=",
    version = "v0.0.0-20221031025758-80f0d8ca4d07",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,10 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare

# bazel_build builds all targets with nogo checks (minimal remote download),
# then builds the binaries that must exist locally and copies them into bin/.
bazel_build: bazel_ci_prepare
	mkdir -p bin
	bazel $(BAZEL_GLOBAL_CONFIG) build --remote_download_minimal $(BAZEL_CMD_CONFIG) \
		//... --//build:with_nogo_flag=true
	bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
		//cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
	cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
	cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
	cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server-check_/tidb-server-check ./bin
Expand Down
6 changes: 3 additions & 3 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,12 +741,12 @@ func TestStmtHints(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(1 GB) */ * from t use index(idx)")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(2 GB) */ * from t use index(idx)")
tk.MustQuery("select * from t")
require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery)
require.Equal(t, int64(2147483648), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
tk.MustQuery("select a, b from t")
require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery)
require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
}

Expand Down
115 changes: 77 additions & 38 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ type MDLoaderSetupConfig struct {
// ReturnPartialResultOnError specifies whether the currently scanned files are analyzed,
// and return the partial result.
ReturnPartialResultOnError bool
// FileIter controls the file iteration policy when constructing a MDLoader.
FileIter FileIterator
}

// DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.
func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig {
	cfg := MDLoaderSetupConfig{
		// 0 means the loader will scan all the files.
		MaxScanFiles: 0,
		// Fail fast instead of returning a partial scan result.
		ReturnPartialResultOnError: false,
		// nil lets the loader fall back to its built-in file iterator.
		FileIter: nil,
	}
	return &cfg
}

Expand All @@ -156,6 +159,13 @@ func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption {
}
}

// WithFileIterator generates an option that specifies the file iteration policy.
func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption {
	setIter := func(cfg *MDLoaderSetupConfig) {
		cfg.FileIter = fileIter
	}
	return setIter
}

// MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.
type MDLoader struct {
store storage.ExternalStorage
Expand Down Expand Up @@ -202,6 +212,12 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto
for _, o := range opts {
o(mdLoaderSetupCfg)
}
if mdLoaderSetupCfg.FileIter == nil {
mdLoaderSetupCfg.FileIter = &allFileIterator{
store: store,
maxScanFiles: mdLoaderSetupCfg.MaxScanFiles,
}
}

if len(cfg.Routes) > 0 && len(cfg.Mydumper.FileRouters) > 0 {
return nil, common.ErrInvalidConfig.GenWithStack("table route is deprecated, can't config both [routes] and [mydumper.files]")
Expand Down Expand Up @@ -254,7 +270,7 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto
setupCfg: mdLoaderSetupCfg,
}

if err := setup.setup(ctx, mdl.store); err != nil {
if err := setup.setup(ctx); err != nil {
if mdLoaderSetupCfg.ReturnPartialResultOnError {
return mdl, errors.Trace(err)
}
Expand Down Expand Up @@ -312,15 +328,19 @@ type ExtendColumnData struct {
// Will sort tables by table size, this means that the big table is imported
// at the latest, which to avoid large table take a long time to import and block
// small table to release index worker.
func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage) error {
func (s *mdLoaderSetup) setup(ctx context.Context) error {
/*
Mydumper file names format
db —— {db}-schema-create.sql
table —— {db}.{table}-schema.sql
sql —— {db}.{table}.{part}.sql / {db}.{table}.sql
*/
var gerr error
if err := s.listFiles(ctx, store); err != nil {
fileIter := s.setupCfg.FileIter
if fileIter == nil {
return errors.New("file iterator is not defined")
}
if err := fileIter.IterateFiles(ctx, s.constructFileInfo); err != nil {
if s.setupCfg.ReturnPartialResultOnError {
gerr = err
} else {
Expand Down Expand Up @@ -389,55 +409,74 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
return gerr
}

func (s *mdLoaderSetup) listFiles(ctx context.Context, store storage.ExternalStorage) error {
// FileHandler is the callback invoked for each file, given its path and size.
// It is mainly used as the parameter of `FileIterator.IterateFiles`.
type FileHandler func(ctx context.Context, path string, size int64) error

// FileIterator is the interface to iterate files in a data source.
// Use this interface to customize the file iteration policy.
type FileIterator interface {
	IterateFiles(ctx context.Context, hdl FileHandler) error
}

// allFileIterator is the default FileIterator: it walks every file in the
// external storage, optionally capped at maxScanFiles entries.
type allFileIterator struct {
	store        storage.ExternalStorage
	maxScanFiles int
}

func (iter *allFileIterator) IterateFiles(ctx context.Context, hdl FileHandler) error {
// `filepath.Walk` yields the paths in a deterministic (lexicographical) order,
// meaning the file and chunk orders will be the same everytime it is called
// (as long as the source is immutable).
totalScannedFileCount := 0
err := store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
logger := log.FromContext(ctx).With(zap.String("path", path))
err := iter.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
totalScannedFileCount++
if s.setupCfg.MaxScanFiles > 0 && totalScannedFileCount > s.setupCfg.MaxScanFiles {
if iter.maxScanFiles > 0 && totalScannedFileCount > iter.maxScanFiles {
return common.ErrTooManySourceFiles
}
res, err := s.loader.fileRouter.Route(filepath.ToSlash(path))
if err != nil {
return errors.Annotatef(err, "apply file routing on file '%s' failed", path)
}
if res == nil {
logger.Info("[loader] file is filtered by file router")
return nil
}

info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
}
return hdl(ctx, path, size)
})

if s.loader.shouldSkip(&info.TableName) {
logger.Debug("[filter] ignoring table file")
return errors.Trace(err)
}

return nil
}
func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size int64) error {
logger := log.FromContext(ctx).With(zap.String("path", path))
res, err := s.loader.fileRouter.Route(filepath.ToSlash(path))
if err != nil {
return errors.Annotatef(err, "apply file routing on file '%s' failed", path)
}
if res == nil {
logger.Info("[loader] file is filtered by file router")
return nil
}

switch res.Type {
case SourceTypeSchemaSchema:
s.dbSchemas = append(s.dbSchemas, info)
case SourceTypeTableSchema:
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
s.tableDatas = append(s.tableDatas, info)
}
info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
}

logger.Debug("file route result", zap.String("schema", res.Schema),
zap.String("table", res.Name), zap.Stringer("type", res.Type))
if s.loader.shouldSkip(&info.TableName) {
logger.Debug("[filter] ignoring table file")

return nil
})
}

return errors.Trace(err)
switch res.Type {
case SourceTypeSchemaSchema:
s.dbSchemas = append(s.dbSchemas, info)
case SourceTypeTableSchema:
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
s.tableDatas = append(s.tableDatas, info)
}

logger.Debug("file route result", zap.String("schema", res.Schema),
zap.String("table", res.Name), zap.Stringer("type", res.Type))

return nil
}

func (l *MDLoader) shouldSkip(table *filter.Table) bool {
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"db.go",
"import.go",
"import_retry.go",
"log_client.go",
"merge.go",
"pipeline_items.go",
"range.go",
Expand Down Expand Up @@ -41,6 +42,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//config",
"//ddl",
"//ddl/util",
Expand Down Expand Up @@ -131,6 +133,7 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//infoschema",
"//kv",
"//meta/autoid",
Expand Down

0 comments on commit 5a398be

Please sign in to comment.