Merge remote-tracking branch 'pingcap/master' into fix-upgrade

chrysan committed Feb 16, 2023
2 parents 1365261 + d579dd1 commit c08ab0e
Showing 39 changed files with 1,926 additions and 814 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -342,8 +342,8 @@ def go_deps():
name = "com_github_blacktear23_go_proxyprotocol",
build_file_proto_mode = "disable_global",
importpath = "github.com/blacktear23/go-proxyprotocol",
sum = "h1:zR7PZeoU0wAkElcIXenFiy3R56WB6A+UEVi4c6RH8wo=",
version = "v1.0.2",
sum = "h1:moi4x1lJlrQj2uYUJdEyCxqj9UNmaSKZwaGZIXnbAis=",
version = "v1.0.5",
)
go_repository(
name = "com_github_blizzy78_varnamelen",
4 changes: 2 additions & 2 deletions Makefile
@@ -410,11 +410,11 @@ bazel_test: failpoint-enable bazel_ci_prepare


 bazel_coverage_test: check-bazel-prepare failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \
+	bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
 		--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
 		-- //... -//cmd/... -//tests/graceshutdown/... \
 		-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
-	bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \
+	bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
 		--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
 		-- //... -//cmd/... -//tests/graceshutdown/... \
 		-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -556,6 +556,11 @@ func (h *scanRegionEmptyHook) AfterScanRegions(res []*split.RegionInfo, err erro
 }
 
 func TestBatchSplitRegionByRangesScanFailed(t *testing.T) {
+	backup := split.ScanRegionAttemptTimes
+	split.ScanRegionAttemptTimes = 3
+	defer func() {
+		split.ScanRegionAttemptTimes = backup
+	}()
 	doTestBatchSplitRegionByRanges(context.Background(), t, &scanRegionEmptyHook{}, "scan region return empty result", defaultHook{})
 }

3 changes: 2 additions & 1 deletion br/pkg/lightning/config/config.go
@@ -567,7 +567,8 @@ type CSVConfig struct {
 	EscapedBy string `toml:"escaped-by" json:"escaped-by"`
 	// hide these options for lightning configuration file, they can only be used by LOAD DATA
 	// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
-	StartingBy string `toml:"-" json:"-"`
+	StartingBy     string `toml:"-" json:"-"`
+	AllowEmptyLine bool   `toml:"-" json:"-"`
 	// For non-empty Delimiter (for example quotes), null elements inside quotes are not considered as null except for
 	// `\N` (when escape-by is `\`). That is to say, `\N` is special for null because it always means null.
 	QuotedNullIsText bool
22 changes: 13 additions & 9 deletions br/pkg/lightning/mydump/csv_parser.go
@@ -86,7 +86,9 @@ type CSVParser struct {
 	escFlavor escapeFlavor
 	// if set to true, csv parser will treat the first non-empty line as header line
 	shouldParseHeader bool
-	quotedNullIsText  bool
+	// in LOAD DATA, empty line should be treated as a valid record
+	allowEmptyLine   bool
+	quotedNullIsText bool
 }
 
 type field struct {
@@ -169,6 +171,7 @@ func NewCSVParser(
 		unquoteByteSet:    makeByteSet(unquoteStopSet),
 		newLineByteSet:    makeByteSet(newLineStopSet),
 		shouldParseHeader: shouldParseHeader,
+		allowEmptyLine:    cfg.AllowEmptyLine,
 		quotedNullIsText:  cfg.QuotedNullIsText,
 	}, nil
 }
@@ -446,7 +449,6 @@
 			}
 			foundStartingByThisLine = true
 			content = content[idx+len(parser.startingBy):]
-			content = append(content, parser.newLine...)
 			parser.buf = append(content, parser.buf...)
 			parser.pos = oldPos + int64(idx+len(parser.startingBy))
 		}
@@ -497,13 +499,15 @@
 			foundStartingByThisLine = false
 			// new line = end of record (ignore empty lines)
 			prevToken = firstToken
-			if isEmptyLine {
-				continue
-			}
-			// skip lines only contain whitespaces
-			if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
-				parser.recordBuffer = parser.recordBuffer[:0]
-				continue
+			if !parser.allowEmptyLine {
+				if isEmptyLine {
+					continue
+				}
+				// skip lines only contain whitespaces
+				if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
+					parser.recordBuffer = parser.recordBuffer[:0]
+					continue
+				}
 			}
 			parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
 			parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
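The net effect of the new `allowEmptyLine` flag: by default the CSV parser still skips empty and whitespace-only lines, while the LOAD DATA path can treat them as valid records. A minimal standalone sketch of that guard (not part of this commit; `readRecords` is a hypothetical stand-in for the parser's record loop):

    package main

    import (
        "bufio"
        "bytes"
        "fmt"
        "strings"
    )

    // readRecords is a toy stand-in for CSVParser.ReadRow: when allowEmptyLine
    // is false, empty and whitespace-only lines are dropped; when true (the
    // LOAD DATA semantics), they are returned as records in their own right.
    func readRecords(input string, allowEmptyLine bool) []string {
        var records []string
        sc := bufio.NewScanner(strings.NewReader(input))
        for sc.Scan() {
            line := sc.Bytes()
            if !allowEmptyLine && len(bytes.TrimSpace(line)) == 0 {
                continue // default behavior: ignore the line entirely
            }
            records = append(records, string(line))
        }
        return records
    }

    func main() {
        data := "a,1\n\nb,2\n"
        fmt.Println(readRecords(data, false)) // [a,1 b,2]
        fmt.Println(readRecords(data, true))  // [a,1  b,2]
    }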
141 changes: 38 additions & 103 deletions br/pkg/lightning/mydump/csv_parser_test.go
@@ -55,35 +55,6 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int
 	}
 }
 
-func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) {
-	for _, tc := range cases {
-		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
-		assert.NoError(t, err)
-		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor)
-		assert.NoError(t, err)
-
-		for ignoreNLines > 0 {
-			// IGNORE N LINES will directly find (line) terminator without checking it's inside quotes
-			_, _, err = parser.ReadUntilTerminator()
-			if errors.Cause(err) == io.EOF {
-				assert.Len(t, tc.expected, 0, "input = %q", tc.input)
-				return
-			}
-			assert.NoError(t, err)
-			ignoreNLines--
-		}
-
-		for i, row := range tc.expected {
-			comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1)
-			e := parser.ReadRow()
-			assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))
-			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
-			assert.Equal(t, row, parser.LastRow().Row, comment)
-		}
-		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
-	}
-}
-
 func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
 	for _, tc := range cases {
 		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
@@ -450,6 +421,21 @@ func TestMySQL(t *testing.T) {
 	assertPosEqual(t, parser, 26, 2)
 
 	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
+
+	parser, err = mydump.NewCSVParser(
+		context.Background(), &cfg,
+		mydump.NewStringReader(`"\0\b\n\r\t\Z\\\  \c\'\""`),
+		int64(config.ReadBlockSize), ioWorkers, false, nil)
+	require.NoError(t, err)
+
+	require.Nil(t, parser.ReadRow())
+	require.Equal(t, mydump.Row{
+		RowID: 1,
+		Row: []types.Datum{
+			types.NewStringDatum(string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})),
+		},
+		Length: 23,
+	}, parser.LastRow())
+}
 
 func TestCustomEscapeChar(t *testing.T) {
@@ -491,6 +477,29 @@ func TestCustomEscapeChar(t *testing.T) {
 	assertPosEqual(t, parser, 26, 2)
 
 	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
+
+	cfg = config.CSVConfig{
+		Separator: ",",
+		Delimiter: `"`,
+		EscapedBy: ``,
+		NotNull:   false,
+		Null:      []string{`NULL`},
+	}
+
+	parser, err = mydump.NewCSVParser(
+		context.Background(), &cfg,
+		mydump.NewStringReader(`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`),
+		int64(config.ReadBlockSize), ioWorkers, false, nil)
+	require.NoError(t, err)
+
+	require.Nil(t, parser.ReadRow())
+	require.Equal(t, mydump.Row{
+		RowID: 1,
+		Row: []types.Datum{
+			types.NewStringDatum(`{"itemRangeType":0,"itemContainType":0,"shopRangeType":1,"shopJson":"[{\"id\":\"A1234\",\"shopName\":\"AAAAAA\"}]"}`),
+		},
+		Length: 115,
+	}, parser.LastRow())
+}
 
 func TestSyntaxErrorCSV(t *testing.T) {
@@ -1287,80 +1296,6 @@ yyy",5,xx"xxxx,8
 	require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
 }
 
-func TestCallerCanIgnoreNLines(t *testing.T) {
-	cfg := config.MydumperRuntime{
-		CSV: config.CSVConfig{
-			Separator:  ",",
-			Delimiter:  `"`,
-			Terminator: "\n",
-		},
-	}
-	testCases := []testCase{
-		{
-			input: `1,1
-2,2
-3,3`,
-			expected: [][]types.Datum{
-				{types.NewStringDatum("3"), types.NewStringDatum("3")},
-			},
-		},
-	}
-	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
-
-	testCases = []testCase{
-		{
-			input: `"bad syntax"1
-"b",2
-"c",3`,
-			expected: [][]types.Datum{
-				{types.NewStringDatum("c"), types.NewStringDatum("3")},
-			},
-		},
-	}
-	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
-
-	cfg = config.MydumperRuntime{
-		CSV: config.CSVConfig{
-			Separator:  ",",
-			Delimiter:  `"`,
-			Terminator: "\n",
-		},
-	}
-	testCases = []testCase{
-		{
-			input: `1,1
-2,2
-3,3`,
-			expected: [][]types.Datum{},
-		},
-	}
-	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 100)
-
-	// test IGNORE N LINES will directly find (line) terminator without checking it's inside quotes
-
-	cfg = config.MydumperRuntime{
-		CSV: config.CSVConfig{
-			Separator:  ",",
-			Delimiter:  `"`,
-			Terminator: "\n",
-		},
-	}
-	testCases = []testCase{
-		{
-			input: `"a
-",1
-"b
-",2
-"c",3`,
-			expected: [][]types.Datum{
-				{types.NewStringDatum("b\n"), types.NewStringDatum("2")},
-				{types.NewStringDatum("c"), types.NewStringDatum("3")},
-			},
-		},
-	}
-	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
-}
-
 func TestCharsetConversion(t *testing.T) {
 	cfg := config.MydumperRuntime{
 		CSV: config.CSVConfig{
18 changes: 12 additions & 6 deletions br/pkg/lightning/mydump/reader.go
@@ -163,15 +163,19 @@ func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReade

 // Read implements io.Reader
 func (pr PooledReader) Read(p []byte) (n int, err error) {
-	w := pr.ioWorkers.Apply()
-	defer pr.ioWorkers.Recycle(w)
+	if pr.ioWorkers != nil {
+		w := pr.ioWorkers.Apply()
+		defer pr.ioWorkers.Recycle(w)
+	}
 	return pr.reader.Read(p)
 }
 
 // Seek implements io.Seeker
 func (pr PooledReader) Seek(offset int64, whence int) (int64, error) {
-	w := pr.ioWorkers.Apply()
-	defer pr.ioWorkers.Recycle(w)
+	if pr.ioWorkers != nil {
+		w := pr.ioWorkers.Apply()
+		defer pr.ioWorkers.Recycle(w)
+	}
 	return pr.reader.Seek(offset, whence)
 }

@@ -182,7 +186,9 @@ func (pr PooledReader) Close() error {

 // ReadFull is same as `io.ReadFull(pr)` with less worker recycling
 func (pr PooledReader) ReadFull(buf []byte) (n int, err error) {
-	w := pr.ioWorkers.Apply()
-	defer pr.ioWorkers.Recycle(w)
+	if pr.ioWorkers != nil {
+		w := pr.ioWorkers.Apply()
+		defer pr.ioWorkers.Recycle(w)
+	}
 	return io.ReadFull(pr.reader, buf)
 }
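All three methods now tolerate a nil worker pool, so a PooledReader built without I/O throttling no longer panics on a nil `ioWorkers`. A self-contained sketch of the same guard pattern (the `pool` type below is a hypothetical stand-in, not the real worker.Pool API):

    package main

    import (
        "fmt"
        "io"
        "strings"
    )

    // pool is a hypothetical stand-in for worker.Pool; a nil *pool means
    // "no concurrency limit", mirroring the nil checks added above.
    type pool struct{ tokens chan struct{} }

    func newPool(n int) *pool {
        p := &pool{tokens: make(chan struct{}, n)}
        for i := 0; i < n; i++ {
            p.tokens <- struct{}{}
        }
        return p
    }

    type pooledReader struct {
        r  io.Reader
        wp *pool // may be nil
    }

    func (pr pooledReader) Read(b []byte) (int, error) {
        if pr.wp != nil { // take a worker token only when a pool is configured
            t := <-pr.wp.tokens
            defer func() { pr.wp.tokens <- t }()
        }
        return pr.r.Read(b)
    }

    func main() {
        for _, wp := range []*pool{nil, newPool(1)} {
            pr := pooledReader{r: strings.NewReader("hello"), wp: wp}
            buf := make([]byte, 8)
            n, _ := pr.Read(buf)
            fmt.Println(string(buf[:n])) // prints "hello" both times
        }
    }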
10 changes: 1 addition & 9 deletions br/pkg/restore/split/split.go
@@ -9,7 +9,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -185,15 +184,8 @@ type scanRegionBackoffer struct {
 }
 
 func newScanRegionBackoffer() utils.Backoffer {
-	attempt := ScanRegionAttemptTimes
-	// only use for test.
-	failpoint.Inject("scanRegionBackoffer", func(val failpoint.Value) {
-		if val.(bool) {
-			attempt = 3
-		}
-	})
 	return &scanRegionBackoffer{
-		attempt: attempt,
+		attempt: ScanRegionAttemptTimes,
 	}
 }

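The `scanRegionBackoffer` failpoint is gone; tests now shorten the retry count by assigning to the exported `ScanRegionAttemptTimes` variable and restoring it afterwards, as the two test diffs in this commit show. A generic helper in the same spirit (hypothetical, not part of the commit; requires Go 1.18+ generics):

    package splittest

    // overrideForTest sets *target to val and returns a function that puts
    // the old value back; defer the result to keep tests hermetic.
    func overrideForTest[T any](target *T, val T) (restore func()) {
        old := *target
        *target = val
        return func() { *target = old }
    }

    // Usage inside a test, matching the pattern used in this commit:
    //
    //     defer overrideForTest(&split.ScanRegionAttemptTimes, 3)()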
9 changes: 5 additions & 4 deletions br/pkg/restore/util_test.go
@@ -7,7 +7,6 @@ import (
"encoding/binary"
"testing"

"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -231,7 +230,11 @@ func TestPaginateScanRegion(t *testing.T) {
 	regionMap := make(map[uint64]*split.RegionInfo)
 	var regions []*split.RegionInfo
 	var batch []*split.RegionInfo
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/restore/split/scanRegionBackoffer", "return(true)"))
+	backup := split.ScanRegionAttemptTimes
+	split.ScanRegionAttemptTimes = 3
+	defer func() {
+		split.ScanRegionAttemptTimes = backup
+	}()
 	_, err := split.PaginateScanRegion(ctx, NewTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3)
 	require.Error(t, err)
 	require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
@@ -294,8 +297,6 @@
 	require.Error(t, err)
 	require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
 	require.Regexp(t, ".*region endKey not equal to next region startKey.*", err.Error())
-
-	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/restore/split/scanRegionBackoffer"))
 }
 
 func TestRewriteFileKeys(t *testing.T) {
4 changes: 4 additions & 0 deletions config/config.go
@@ -765,6 +765,9 @@ type ProxyProtocol struct {
 	Networks string `toml:"networks" json:"networks"`
 	// PROXY protocol header read timeout, Unit is second.
 	HeaderTimeout uint `toml:"header-timeout" json:"header-timeout"`
+	// Fallbackable controls whether PROXY protocol header processing may fall back.
+	// If true and the client sends no PROXY protocol header, the connection falls back to the TCP peer address as the client IP.
+	Fallbackable bool `toml:"fallbackable" json:"fallbackable"`
 }
 
 // Binlog is the config for binlog.
@@ -979,6 +982,7 @@ var defaultConf = Config{
 		ProxyProtocol: ProxyProtocol{
 			Networks:      "",
 			HeaderTimeout: 5,
+			Fallbackable:  false,
 		},
 		PreparedPlanCache: PreparedPlanCache{
 			Enabled: true,
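Judging by the struct's toml tags, the new knob should map to `fallbackable` under the config file's `[proxy-protocol]` section. A hedged decoding sketch (the section name and the use of BurntSushi/toml are assumptions, not taken from this diff):

    package main

    import (
        "fmt"

        "github.com/BurntSushi/toml"
    )

    // ProxyProtocol mirrors the struct in this diff, toml tags included.
    type ProxyProtocol struct {
        Networks      string `toml:"networks"`
        HeaderTimeout uint   `toml:"header-timeout"`
        Fallbackable  bool   `toml:"fallbackable"`
    }

    type Config struct {
        ProxyProtocol ProxyProtocol `toml:"proxy-protocol"` // assumed section name
    }

    func main() {
        // With fallbackable = true, a client that never sends a PROXY protocol
        // header is not rejected; the TCP peer address is used as the client IP.
        doc := `
    [proxy-protocol]
    networks = "10.0.0.0/8"
    header-timeout = 5
    fallbackable = true
    `
        var cfg Config
        if _, err := toml.Decode(doc, &cfg); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", cfg.ProxyProtocol)
    }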
