
lightning: support extracting source/schema/table name to specific column #37790

Merged (53 commits, Oct 26, 2022)

Changes from 42 commits. Commits:
7c2407d
support extend routes for lightning
lichunzhu Sep 13, 2022
48d5230
merge master
lichunzhu Sep 13, 2022
9407ccf
fix ci
lichunzhu Sep 14, 2022
c4d00bb
Merge branch 'master' into supportExtendColumn
lichunzhu Sep 14, 2022
e441446
fix ci again
lichunzhu Sep 14, 2022
9fc6f36
Merge branch 'master' of https://github.com/pingcap/tidb into support…
lichunzhu Sep 14, 2022
e047d9b
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Sep 14, 2022
ce289be
fix
lichunzhu Sep 14, 2022
437f9db
support sample, fix ut
lichunzhu Sep 14, 2022
c8a98ab
Merge branch 'master' of https://github.com/pingcap/tidb into support…
lichunzhu Sep 14, 2022
d50ecd1
Apply suggestions from code review
lichunzhu Sep 14, 2022
43cb4f6
fix
lichunzhu Sep 14, 2022
972acd7
simplify
lichunzhu Sep 14, 2022
01717b8
block test
lichunzhu Sep 15, 2022
79d3d42
Merge branch 'master' of https://github.com/pingcap/tidb into support…
lichunzhu Sep 15, 2022
0b9baa5
fix
lichunzhu Sep 15, 2022
65322cc
fix lightning various types integration test
lichunzhu Sep 15, 2022
689f231
extract function
lichunzhu Sep 15, 2022
d5d2185
Merge branch 'master' of https://github.com/pingcap/tidb into support…
lichunzhu Sep 15, 2022
befefba
Update br/pkg/lightning/mydump/loader.go
lichunzhu Sep 16, 2022
f0e2406
address some comments, add unit test for filterColumns
lichunzhu Sep 16, 2022
72f33fb
Merge branch 'master' of https://github.com/pingcap/tidb into support…
lichunzhu Sep 16, 2022
9aa4393
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Sep 16, 2022
d52d9e2
address comments, add ignore-column test case
lichunzhu Sep 16, 2022
64e56bd
update precheck and unit tests
lichunzhu Sep 20, 2022
d1d054b
Merge branch 'master' into supportExtendColumn
lichunzhu Sep 20, 2022
5f0eb0e
fix it
lichunzhu Sep 20, 2022
91e4172
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Sep 20, 2022
90d8aae
fix ut
lichunzhu Sep 20, 2022
750dc95
Merge branch 'master' into supportExtendColumn
lichunzhu Sep 20, 2022
81648e0
tmp
lichunzhu Sep 21, 2022
f8fca3c
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 18, 2022
3479f1e
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 19, 2022
c73e1e2
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Oct 19, 2022
6847ebc
address comments
lichunzhu Oct 19, 2022
fd448f9
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 19, 2022
73647ed
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 20, 2022
e9a1763
address comments
lichunzhu Oct 20, 2022
bfe361c
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Oct 20, 2022
66516b2
address comments
lichunzhu Oct 20, 2022
0c4d822
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 20, 2022
bdce0bb
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 21, 2022
8826f72
address comment
lichunzhu Oct 25, 2022
b9df587
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Oct 25, 2022
9c1cc44
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 25, 2022
1b82d71
ignore .bak files
lichunzhu Oct 25, 2022
e88b373
Merge branch 'supportExtendColumn' of https://github.com/lichunzhu/ti…
lichunzhu Oct 25, 2022
18dc3ad
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 25, 2022
4978bd8
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 26, 2022
87ef359
Merge branch 'master' into supportExtendColumn
dsdashun Oct 26, 2022
525b3e6
Merge branch 'master' into supportExtendColumn
lichunzhu Oct 26, 2022
112901a
fix it
lichunzhu Oct 26, 2022
1d8424b
Merge branch 'master' into supportExtendColumn
ti-chi-bot Oct 26, 2022
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
@@ -454,6 +454,7 @@ type MydumperRuntime struct {
ReadBlockSize ByteSize `toml:"read-block-size" json:"read-block-size"`
BatchSize ByteSize `toml:"batch-size" json:"batch-size"`
BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"`
SourceID string `toml:"source-id" json:"source-id"`
Contributor review comment on `SourceID`: not sure whether we should expose this to users via toml, since this seems to be a DM-only feature request.

SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
CharacterSet string `toml:"character-set" json:"character-set"`
CSV CSVConfig `toml:"csv" json:"csv"`
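For context, the new `source-id` knob works together with DM-style routing extractor rules in the Lightning TOML. The sketch below shows roughly how they would be wired up; the key names follow DM's table-routing extractors (`extract-table`/`extract-schema`/`extract-source`), and the exact spelling and values here are illustrative rather than authoritative.

```toml
[mydumper]
# Hypothetical value: identifies the upstream MySQL instance.
# Normally populated by DM rather than written by hand.
source-id = "mysql-01"

[[routes]]
schema-pattern = "test_*"
table-pattern = "t*"
target-schema = "test"
target-table = "t"

# Capture parts of the table/schema/source names into extra columns.
[routes.extract-table]
target-column = "c_table"
table-regexp = "t(.*)"

[routes.extract-schema]
target-column = "c_schema"
schema-regexp = "test_(.*)"

[routes.extract-source]
target-column = "c_source"
source-regexp = "mysql-(.*)"
```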
16 changes: 16 additions & 0 deletions br/pkg/lightning/mydump/loader.go
@@ -82,6 +82,7 @@ type SourceFileMeta struct {
Compression Compression
SortKey string
FileSize int64
ExtendData ExtendColumnData
}

// NewMDTableMeta creates an Mydumper table meta with specified character set.
@@ -167,6 +168,7 @@ type MDLoader struct {
}

type mdLoaderSetup struct {
sourceID string
loader *MDLoader
dbSchemas []FileInfo
tableSchemas []FileInfo
@@ -245,6 +247,7 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto
}

setup := mdLoaderSetup{
sourceID: cfg.Mydumper.SourceID,
loader: mdl,
dbIndexMap: make(map[string]int),
tableIndexMap: make(map[filter.Table]int),
@@ -289,6 +292,12 @@ type FileInfo struct {
FileMeta SourceFileMeta
}

// ExtendColumnData contains the extended column names and values information for a table.
type ExtendColumnData struct {
Columns []string
Values []string
}

// setup the `s.loader.dbs` slice by scanning all *.sql files inside `dir`.
//
// The database and tables are inserted in a consistent order, so creating an
@@ -488,6 +497,13 @@ func (s *mdLoaderSetup) route() error {
knownDBNames[targetDB] = newInfo
}
arr[i].TableName = filter.Table{Schema: targetDB, Name: targetTable}
extendCols, extendVals := r.FetchExtendColumn(rawDB, rawTable, s.sourceID)
if len(extendCols) > 0 {
arr[i].FileMeta.ExtendData = ExtendColumnData{
Columns: extendCols,
Values: extendVals,
}
}
}
return nil
}
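The `FetchExtendColumn` call above pairs each extractor's target column with the value captured from the raw schema/table/source name. A minimal standalone sketch of that capture step, using a hypothetical `extract` helper rather than the real router code:

```go
package main

import (
	"fmt"
	"regexp"
)

// extract mimics, in spirit, what a TableExtractor/SchemaExtractor/
// SourceExtractor does: capture one regexp group from a name and
// expose it as the value of a target column.
func extract(re, input string) (string, bool) {
	m := regexp.MustCompile(re).FindStringSubmatch(input)
	if len(m) < 2 {
		return "", false
	}
	return m[1], true
}

func main() {
	var cols, vals []string
	// Hypothetical inputs matching the test data below: file
	// test_3.t1.sql loaded from source mysql-01.
	for _, rule := range []struct{ col, re, input string }{
		{"c_table", "t(.*)", "t1"},
		{"c_schema", "test_(.*)", "test_3"},
		{"c_source", "mysql-(.*)", "mysql-01"},
	} {
		if v, ok := extract(rule.re, rule.input); ok {
			cols = append(cols, rule.col)
			vals = append(vals, v)
		}
	}
	fmt.Println(cols, vals) // [c_table c_schema c_source] [1 3 01]
}
```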
63 changes: 63 additions & 0 deletions br/pkg/lightning/mydump/loader_test.go
@@ -990,3 +990,66 @@ func TestMaxScanFilesOption(t *testing.T) {
tbl = dbMeta.Tables[0]
require.Equal(t, maxScanFilesCount-2, len(tbl.DataFiles))
}

func TestExternalDataRoutes(t *testing.T) {
s := newTestMydumpLoaderSuite(t)

s.touch(t, "test_1-schema-create.sql")
s.touch(t, "test_1.t1-schema.sql")
s.touch(t, "test_1.t1.sql")
s.touch(t, "test_2-schema-create.sql")
s.touch(t, "test_2.t2-schema.sql")
s.touch(t, "test_2.t2.sql")
s.touch(t, "test_3-schema-create.sql")
s.touch(t, "test_3.t1-schema.sql")
s.touch(t, "test_3.t1.sql")
s.touch(t, "test_3.t3-schema.sql")
s.touch(t, "test_3.t3.sql")

s.cfg.Mydumper.SourceID = "mysql-01"
s.cfg.Routes = []*router.TableRule{
{
TableExtractor: &router.TableExtractor{
TargetColumn: "c_table",
TableRegexp: "t(.*)",
},
SchemaExtractor: &router.SchemaExtractor{
TargetColumn: "c_schema",
SchemaRegexp: "test_(.*)",
},
SourceExtractor: &router.SourceExtractor{
TargetColumn: "c_source",
SourceRegexp: "mysql-(.*)",
},
SchemaPattern: "test_*",
TablePattern: "t*",
TargetSchema: "test",
TargetTable: "t",
},
}

mdl, err := md.NewMyDumpLoader(context.Background(), s.cfg)

require.NoError(t, err)
var database *md.MDDatabaseMeta
for _, db := range mdl.GetDatabases() {
if db.Name == "test" {
require.Nil(t, database)
database = db
}
}
require.NotNil(t, database)
require.Len(t, database.Tables, 1)
require.Len(t, database.Tables[0].DataFiles, 4)
expectExtendCols := []string{"c_table", "c_schema", "c_source"}
expectedExtendVals := [][]string{
{"1", "1", "01"},
{"2", "2", "01"},
{"1", "3", "01"},
{"3", "3", "01"},
}
for i, fileInfo := range database.Tables[0].DataFiles {
require.Equal(t, expectExtendCols, fileInfo.FileMeta.ExtendData.Columns)
require.Equal(t, expectedExtendVals[i], fileInfo.FileMeta.ExtendData.Values)
}
}
7 changes: 4 additions & 3 deletions br/pkg/lightning/mydump/region.go
@@ -40,9 +40,10 @@ const (
type TableRegion struct {
EngineID int32

DB string
Table string
FileMeta SourceFileMeta
DB string
Table string
FileMeta SourceFileMeta
ExtendData ExtendColumnData

Chunk Chunk
}
5 changes: 5 additions & 0 deletions br/pkg/lightning/mydump/router.go
@@ -184,6 +184,11 @@ func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter,
return chainRouters(res), nil
}

// NewDefaultFileRouter creates a new file router with the default file route rules.
func NewDefaultFileRouter(logger log.Logger) (FileRouter, error) {
return NewFileRouter(defaultFileRouteRules, logger)
}

// RegexRouter is a `FileRouter` implement that apply specific regex pattern to filepath.
// if regex pattern match, then each extractors with capture the matched regexp pattern and
// set value to target field in `RouteResult`
3 changes: 3 additions & 0 deletions br/pkg/lightning/restore/BUILD.bazel
@@ -62,6 +62,8 @@ go_library(
"//util/engine",
"//util/mathutil",
"//util/mock",
"//util/regexpr-router",
"//util/set",
"@com_github_coreos_go_semver//semver",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
@@ -140,6 +142,7 @@ go_test(
"//util/mock",
"//util/promutil",
"//util/table-filter",
"//util/table-router",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
64 changes: 64 additions & 0 deletions br/pkg/lightning/restore/chunk_restore_test.go
@@ -36,7 +36,12 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/types"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -273,6 +278,65 @@ func (s *chunkRestoreSuite) TestEncodeLoop() {
require.Equal(s.T(), s.cr.chunk.Chunk.EndOffset, kvs[0].offset)
}

func (s *chunkRestoreSuite) TestEncodeLoopWithExtendData() {
ctx := context.Background()
kvsCh := make(chan []deliveredKVs, 2)
deliverCompleteCh := make(chan deliverResult)

p := parser.New()
se := tmock.NewContext()

lastTi := s.tr.tableInfo
defer func() {
s.tr.tableInfo = lastTi
}()

node, err := p.ParseOneStmt("CREATE TABLE `t1` (`c1` varchar(5) NOT NULL, `c_table` varchar(5), `c_schema` varchar(5), `c_source` varchar(5))", "utf8mb4", "utf8mb4_bin")
require.NoError(s.T(), err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), int64(1))
require.NoError(s.T(), err)
tableInfo.State = model.StatePublic

schema := "test_1"
tb := "t1"
ti := &checkpoints.TidbTableInfo{
ID: tableInfo.ID,
DB: schema,
Name: tb,
Core: tableInfo,
}
s.tr.tableInfo = ti
s.cr.chunk.FileMeta.ExtendData = mydump.ExtendColumnData{
Columns: []string{"c_table", "c_schema", "c_source"},
Values: []string{"1", "1", "01"},
}
defer func() {
s.cr.chunk.FileMeta.ExtendData = mydump.ExtendColumnData{}
}()

kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567895,
}, nil, log.L())
require.NoError(s.T(), err)
cfg := config.NewConfig()
rc := &Controller{pauser: DeliverPauser, cfg: cfg}
_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
require.NoError(s.T(), err)
require.Len(s.T(), kvsCh, 2)

kvs := <-kvsCh
require.Len(s.T(), kvs, 1)
require.Equal(s.T(), int64(19), kvs[0].rowID)
require.Equal(s.T(), int64(36), kvs[0].offset)
require.Equal(s.T(), []string{"c1", "c_table", "c_schema", "c_source"}, kvs[0].columns)

kvs = <-kvsCh
require.Equal(s.T(), 1, len(kvs))
require.Nil(s.T(), kvs[0].kvs)
require.Equal(s.T(), s.cr.chunk.Chunk.EndOffset, kvs[0].offset)
}

func (s *chunkRestoreSuite) TestEncodeLoopCanceled() {
ctx, cancel := context.WithCancel(context.Background())
kvsCh := make(chan []deliveredKVs)
27 changes: 23 additions & 4 deletions br/pkg/lightning/restore/get_pre_info.go
@@ -648,9 +648,12 @@ func (p *PreRestoreInfoGetterImpl) sampleDataFromTable(
}

initializedColumns := false
var columnPermutation []int
var kvSize uint64 = 0
var rowSize uint64 = 0
var (
columnPermutation []int
kvSize uint64 = 0
rowSize uint64 = 0
extendVals []types.Datum
)
rowCount := 0
dataKVs := p.encBuilder.MakeEmptyRows()
indexKVs := p.encBuilder.MakeEmptyRows()
@@ -665,17 +668,32 @@ outloop:
switch errors.Cause(err) {
case nil:
if !initializedColumns {
ignoreColsMap := igCols.ColumnsMap()
if len(columnPermutation) == 0 {
columnPermutation, err = createColumnPermutation(
columnNames,
igCols.ColumnsMap(),
ignoreColsMap,
tableInfo,
log.FromContext(ctx))
if err != nil {
return 0.0, false, errors.Trace(err)
}
}
if len(sampleFile.ExtendData.Columns) > 0 {
_, extendVals = filterColumns(columnNames, sampleFile.ExtendData, ignoreColsMap, tableInfo)
}
initializedColumns = true
lastRow := parser.LastRow()
lastRowLen := len(lastRow.Row)
extendColsMap := make(map[string]int)
for i, c := range sampleFile.ExtendData.Columns {
extendColsMap[c] = lastRowLen + i
}
for i, col := range tableInfo.Columns {
if p, ok := extendColsMap[col.Name.O]; ok {
columnPermutation[i] = p
}
}
}
case io.EOF:
break outloop
Expand All @@ -685,6 +703,7 @@ outloop:
}
lastRow := parser.LastRow()
rowCount++
lastRow.Row = append(lastRow.Row, extendVals...)

var dataChecksum, indexChecksum verification.KVChecksum
kvs, encodeErr := kvEncoder.Encode(logTask.Logger, lastRow.Row, lastRow.RowID, columnPermutation, sampleFile.Path, offset)
61 changes: 60 additions & 1 deletion br/pkg/lightning/restore/precheck_impl.go
@@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/set"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
@@ -749,15 +750,73 @@ func (ci *schemaCheckItem) SchemaIsValid(ctx context.Context, tableInfo *mydump.
}
igCols := igCol.ColumnsMap()

fullExtendColsSet := make(set.StringSet)
for _, fileInfo := range tableInfo.DataFiles {
for _, col := range fileInfo.FileMeta.ExtendData.Columns {
if _, ok = igCols[col]; ok {
msgs = append(msgs, fmt.Sprintf("extend column %s is also assigned in ignore-column for table `%s`.`%s`, "+
"please keep only either one of them", col, tableInfo.DB, tableInfo.Name))
}
fullExtendColsSet.Insert(col)
}
}
if len(msgs) > 0 {
return msgs, nil
}

colCountFromTiDB := len(info.Core.Columns)
if len(fullExtendColsSet) > 0 {
log.FromContext(ctx).Info("check extend column count through data files", zap.String("db", tableInfo.DB),
zap.String("table", tableInfo.Name))
igColCnt := 0
for _, col := range info.Core.Columns {
if _, ok = igCols[col.Name.L]; ok {
igColCnt++
}
}
for _, f := range tableInfo.DataFiles {
cols, previewRows, err := ci.preInfoGetter.ReadFirstNRowsByFileMeta(ctx, f.FileMeta, 1)
if err != nil {
return nil, errors.Trace(err)
}
if len(cols) > 0 {
colsSet := set.NewStringSet(cols...)
for _, extendCol := range f.FileMeta.ExtendData.Columns {
if colsSet.Exist(strings.ToLower(extendCol)) {
msgs = append(msgs, fmt.Sprintf("extend column %s is contained in table `%s`.`%s`'s header, "+
"please remove this column in data or remove this extend rule", extendCol, tableInfo.DB, tableInfo.Name))
}
}
} else if len(previewRows) > 0 && len(previewRows[0])+len(f.FileMeta.ExtendData.Columns) > colCountFromTiDB+igColCnt {
msgs = append(msgs, fmt.Sprintf("row count %d adding with extend column length %d is larger than columnCount %d plus ignore column count %d for table `%s`.`%s`, "+
"please make sure your source data don't have extend columns and target schema has all of them", len(previewRows[0]), len(f.FileMeta.ExtendData.Columns), colCountFromTiDB, igColCnt, tableInfo.DB, tableInfo.Name))
}
}
}
if len(msgs) > 0 {
return msgs, nil
}

core := info.Core
defaultCols := make(map[string]struct{})
for _, col := range core.Columns {
if hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.GetFlag())) {
// treat extend columns the same as columns with default values
if _, isExtendCol := fullExtendColsSet[col.Name.O]; isExtendCol || hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.GetFlag())) {
// this column has default value or it's auto random id, so we can ignore it
defaultCols[col.Name.L] = struct{}{}
}
delete(fullExtendColsSet, col.Name.O)
}
if len(fullExtendColsSet) > 0 {
extendCols := make([]string, 0, len(fullExtendColsSet))
for col := range fullExtendColsSet {
extendCols = append(extendCols, col)
}
msgs = append(msgs, fmt.Sprintf("extend column [%s] don't exist in target table `%s`.`%s` schema, "+
"please add these extend columns manually in downstream database/schema file", strings.Join(extendCols, ","), tableInfo.DB, tableInfo.Name))
return msgs, nil
}

// tidb_rowid have a default value.
defaultCols[model.ExtraHandleName.String()] = struct{}{}

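Pulling the three extend-column prechecks out of the diff above, the validation boils down to: an extend column must not also appear in ignore-columns, must not already exist in the data file's header, and must exist in the target table schema. A self-contained sketch of that rule set (`checkExtendCols` is a hypothetical helper with simplified messages, not the actual `SchemaIsValid` implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// checkExtendCols validates extend columns against the ignore-column
// list, the data file header, and the target schema, returning one
// message per violation. Header/ignore comparisons are lowercased,
// mirroring the case handling sketched in the precheck.
func checkExtendCols(extend, ignore, header, schemaCols []string) []string {
	lowerSet := func(cols []string) map[string]bool {
		s := make(map[string]bool, len(cols))
		for _, c := range cols {
			s[strings.ToLower(c)] = true
		}
		return s
	}
	ignoreSet, headerSet := lowerSet(ignore), lowerSet(header)
	schemaSet := make(map[string]bool, len(schemaCols))
	for _, c := range schemaCols {
		schemaSet[c] = true
	}

	var msgs []string
	for _, c := range extend {
		if ignoreSet[strings.ToLower(c)] {
			msgs = append(msgs, fmt.Sprintf("extend column %s is also in ignore-columns", c))
		}
		if headerSet[strings.ToLower(c)] {
			msgs = append(msgs, fmt.Sprintf("extend column %s already exists in the file header", c))
		}
		if !schemaSet[c] {
			msgs = append(msgs, fmt.Sprintf("extend column %s does not exist in the target schema", c))
		}
	}
	return msgs
}

func main() {
	msgs := checkExtendCols(
		[]string{"c_source"},          // extend columns from the route rule
		[]string{"c_source"},          // also ignored -> conflict
		nil,                           // no header columns in the data file
		[]string{"c1", "c_source"},    // target schema
	)
	fmt.Println(msgs) // [extend column c_source is also in ignore-columns]
}
```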