Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: LOAD DATA use lightning CSV parser #40852

Merged
merged 13 commits into from
Feb 16, 2023
3 changes: 2 additions & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ type CSVConfig struct {
EscapedBy string `toml:"escaped-by" json:"escaped-by"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
StartingBy string `toml:"-" json:"-"`
AllowEmptyLine bool `toml:"-" json:"-"`
// For non-empty Delimiter (for example quotes), null elements inside quotes are not considered as null except for
// `\N` (when escape-by is `\`). That is to say, `\N` is special for null because it always means null.
QuotedNullIsText bool
Expand Down
22 changes: 13 additions & 9 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ type CSVParser struct {
escFlavor escapeFlavor
// if set to true, csv parser will treat the first non-empty line as header line
shouldParseHeader bool
quotedNullIsText bool
// in LOAD DATA, empty line should be treated as a valid record
allowEmptyLine bool
quotedNullIsText bool
}

type field struct {
Expand Down Expand Up @@ -169,6 +171,7 @@ func NewCSVParser(
unquoteByteSet: makeByteSet(unquoteStopSet),
newLineByteSet: makeByteSet(newLineStopSet),
shouldParseHeader: shouldParseHeader,
allowEmptyLine: cfg.AllowEmptyLine,
quotedNullIsText: cfg.QuotedNullIsText,
}, nil
}
Expand Down Expand Up @@ -446,7 +449,6 @@ outside:
}
foundStartingByThisLine = true
content = content[idx+len(parser.startingBy):]
content = append(content, parser.newLine...)
parser.buf = append(content, parser.buf...)
parser.pos = oldPos + int64(idx+len(parser.startingBy))
}
Expand Down Expand Up @@ -497,13 +499,15 @@ outside:
foundStartingByThisLine = false
// new line = end of record (ignore empty lines)
prevToken = firstToken
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
if !parser.allowEmptyLine {
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
}
}
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
Expand Down
141 changes: 38 additions & 103 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,6 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int
}
}

// runTestCasesCSVIgnoreNLines runs each test case through the CSV parser after
// first skipping ignoreNLines physical lines, mimicking the semantics of
// LOAD DATA ... IGNORE N LINES. If EOF is reached while still skipping, the
// case must expect zero rows.
//
// Note: the skip budget is copied per case (the original decremented the
// shared parameter, so only the first case ever skipped lines), and an EOF
// during skipping moves on to the next case instead of aborting all of them.
func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) {
cases:
	for _, tc := range cases {
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor)
		assert.NoError(t, err)

		// per-case copy: do not consume the skip budget across cases
		for remaining := ignoreNLines; remaining > 0; remaining-- {
			// IGNORE N LINES will directly find (line) terminator without checking it's inside quotes
			_, _, err = parser.ReadUntilTerminator()
			if errors.Cause(err) == io.EOF {
				// input exhausted while skipping: this case must expect no rows
				assert.Len(t, tc.expected, 0, "input = %q", tc.input)
				continue cases
			}
			assert.NoError(t, err)
		}

		for i, row := range tc.expected {
			comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1)
			e := parser.ReadRow()
			assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))
			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
			assert.Equal(t, row, parser.LastRow().Row, comment)
		}
		// all expected rows consumed; the parser must now report EOF
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
	}
}

func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
for _, tc := range cases {
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
Expand Down Expand Up @@ -450,6 +421,21 @@ func TestMySQL(t *testing.T) {
assertPosEqual(t, parser, 26, 2)

require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`"\0\b\n\r\t\Z\\\ \c\'\""`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.Nil(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum(string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})),
},
Length: 23,
}, parser.LastRow())
}

func TestCustomEscapeChar(t *testing.T) {
Expand Down Expand Up @@ -491,6 +477,29 @@ func TestCustomEscapeChar(t *testing.T) {
assertPosEqual(t, parser, 26, 2)

require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

cfg = config.CSVConfig{
Separator: ",",
Delimiter: `"`,
EscapedBy: ``,
NotNull: false,
Null: []string{`NULL`},
}

parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.Nil(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum(`{"itemRangeType":0,"itemContainType":0,"shopRangeType":1,"shopJson":"[{\"id\":\"A1234\",\"shopName\":\"AAAAAA\"}]"}`),
},
Length: 115,
}, parser.LastRow())
}

func TestSyntaxErrorCSV(t *testing.T) {
Expand Down Expand Up @@ -1287,80 +1296,6 @@ yyy",5,xx"xxxx,8
require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
}

// TestCallerCanIgnoreNLines verifies the IGNORE N LINES behaviour exposed to
// LOAD DATA callers: the first N physical lines are skipped by searching for
// the line terminator, regardless of line content.
func TestCallerCanIgnoreNLines(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			Separator:  ",",
			Delimiter:  `"`,
			Terminator: "\n",
		},
	}

	// skipping the first two well-formed lines keeps only the third record
	runTestCasesCSVIgnoreNLines(t, &cfg, 1, []testCase{{
		input: `1,1
2,2
3,3`,
		expected: [][]types.Datum{
			{types.NewStringDatum("3"), types.NewStringDatum("3")},
		},
	}}, 2)

	// malformed lines are skipped just the same, since skipping only looks
	// for the terminator
	runTestCasesCSVIgnoreNLines(t, &cfg, 1, []testCase{{
		input: `"bad syntax"1
"b",2
"c",3`,
		expected: [][]types.Datum{
			{types.NewStringDatum("c"), types.NewStringDatum("3")},
		},
	}}, 2)

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			Separator:  ",",
			Delimiter:  `"`,
			Terminator: "\n",
		},
	}

	// skipping more lines than the input contains yields no rows at all
	runTestCasesCSVIgnoreNLines(t, &cfg, 1, []testCase{{
		input: `1,1
2,2
3,3`,
		expected: [][]types.Datum{},
	}}, 100)

	// test IGNORE N LINES will directly find (line) terminator without checking it's inside quotes

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			Separator:  ",",
			Delimiter:  `"`,
			Terminator: "\n",
		},
	}

	runTestCasesCSVIgnoreNLines(t, &cfg, 1, []testCase{{
		input: `"a
",1
"b
",2
"c",3`,
		expected: [][]types.Datum{
			{types.NewStringDatum("b\n"), types.NewStringDatum("2")},
			{types.NewStringDatum("c"), types.NewStringDatum("3")},
		},
	}}, 2)
}

func TestCharsetConversion(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Expand Down
18 changes: 12 additions & 6 deletions br/pkg/lightning/mydump/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,19 @@ func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReade

// Read implements io.Reader. When an I/O worker pool is configured, a worker
// token is held for the duration of the read to bound I/O concurrency; a nil
// pool (presumably the LOAD DATA path — confirm against callers) means the
// read is unthrottled.
func (pr PooledReader) Read(p []byte) (n int, err error) {
	if pr.ioWorkers != nil {
		w := pr.ioWorkers.Apply()
		defer pr.ioWorkers.Recycle(w)
	}
	return pr.reader.Read(p)
}

// Seek implements io.Seeker. As with Read, a worker token is acquired only
// when an I/O worker pool is configured; a nil pool skips throttling.
func (pr PooledReader) Seek(offset int64, whence int) (int64, error) {
	if pr.ioWorkers != nil {
		w := pr.ioWorkers.Apply()
		defer pr.ioWorkers.Recycle(w)
	}
	return pr.reader.Seek(offset, whence)
}

Expand All @@ -182,7 +186,9 @@ func (pr PooledReader) Close() error {

// ReadFull is same as `io.ReadFull(pr)` with less worker recycling: one
// worker token covers the whole fill of buf instead of one per Read call.
// A nil worker pool skips throttling entirely.
func (pr PooledReader) ReadFull(buf []byte) (n int, err error) {
	if pr.ioWorkers != nil {
		w := pr.ioWorkers.Apply()
		defer pr.ioWorkers.Recycle(w)
	}
	return io.ReadFull(pr.reader, buf)
}
3 changes: 3 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ go_library(
deps = [
"//bindinfo",
"//br/pkg/glue",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//br/pkg/task",
"//config",
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
loadDataInfo: loadDataInfo,
}
var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.initQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
86 changes: 0 additions & 86 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
plannerutil "github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -89,64 +88,6 @@ func generateDatumSlice(vals ...int64) []types.Datum {
return datums
}

// TestGetFieldsFromLine exercises LoadDataInfo.getFieldsFromLine with quoted
// fields, embedded separators and quotes, escape sequences, and mixed
// quoted/unquoted input.
func TestGetFieldsFromLine(t *testing.T) {
	ldInfo := LoadDataInfo{
		FieldsInfo: &ast.FieldsClause{
			Enclosed:   '"',
			Terminated: ",",
			Escaped:    '\\',
		},
	}

	cases := []struct {
		input    string
		expected []string
	}{
		{
			`"1","a string","100.20"`,
			[]string{"1", "a string", "100.20"},
		},
		{
			`"2","a string containing a , comma","102.20"`,
			[]string{"2", "a string containing a , comma", "102.20"},
		},
		{
			`"3","a string containing a \" quote","102.20"`,
			[]string{"3", "a string containing a \" quote", "102.20"},
		},
		{
			`"4","a string containing a \", quote and comma","102.20"`,
			[]string{"4", "a string containing a \", quote and comma", "102.20"},
		},
		// Test some escape char.
		{
			`"\0\b\n\r\t\Z\\\ \c\'\""`,
			[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
		},
		// Test mixed.
		{
			`"123",456,"\t7890",abcd`,
			[]string{"123", "456", "\t7890", "abcd"},
		},
	}

	for _, tt := range cases {
		got, err := ldInfo.getFieldsFromLine([]byte(tt.input))
		require.NoErrorf(t, err, "failed: %s", tt.input)
		assertEqualStrings(t, got, tt.expected)
	}

	// a completely unquoted line must also parse without error
	_, err := ldInfo.getFieldsFromLine([]byte(`1,a string,100.20`))
	require.NoError(t, err)
}

// assertEqualStrings checks, element by element, that the parsed fields in
// got carry exactly the raw strings in expect, after verifying both slices
// have the same length.
func assertEqualStrings(t *testing.T, got []field, expect []string) {
	require.Equal(t, len(expect), len(got))
	for i, want := range expect {
		require.Equal(t, want, string(got[i].str))
	}
}

func TestSlowQueryRuntimeStats(t *testing.T) {
stats := &slowQueryRuntimeStats{
totalFileNum: 2,
Expand Down Expand Up @@ -270,33 +211,6 @@ func TestFilterTemporaryTableKeys(t *testing.T) {
require.Len(t, res, 1)
}

// TestLoadDataWithDifferentEscapeChar verifies field splitting when the
// escape character differs from the default backslash (here: no escape
// character at all, so doubled quotes are the only quote-escaping form).
func TestLoadDataWithDifferentEscapeChar(t *testing.T) {
	cases := []struct {
		input      string
		escapeChar byte
		expected   []string
	}{
		{
			`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`,
			byte(0), // escaped by ''
			[]string{`{"itemRangeType":0,"itemContainType":0,"shopRangeType":1,"shopJson":"[{\"id\":\"A1234\",\"shopName\":\"AAAAAA\"}]"}`},
		},
	}

	for _, tt := range cases {
		ldInfo := LoadDataInfo{
			FieldsInfo: &ast.FieldsClause{
				Enclosed:   '"',
				Terminated: ",",
				Escaped:    tt.escapeChar,
			},
		}
		fields, err := ldInfo.getFieldsFromLine([]byte(tt.input))
		require.NoErrorf(t, err, "failed: %s", tt.input)
		assertEqualStrings(t, fields, tt.expected)
	}
}

func TestSortSpillDisk(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
defer func() {
Expand Down