lightning: fix risk of OOM (#40443) (#44333) (#45072)
close #40400
ti-chi-bot committed Jun 30, 2023
1 parent 54a8c45 commit 5e0977a
Showing 8 changed files with 60 additions and 2 deletions.
21 changes: 20 additions & 1 deletion br/pkg/lightning/mydump/csv_parser.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mathutil"
)
@@ -30,8 +31,14 @@ var (
errUnterminatedQuotedField = errors.NewNoStackError("syntax error: unterminated quoted field")
errDanglingBackslash = errors.NewNoStackError("syntax error: no character after backslash")
errUnexpectedQuoteField = errors.NewNoStackError("syntax error: cannot have consecutive fields without separator")
// LargestEntryLimit is the maximum size allowed when reading a file into the buffer.
LargestEntryLimit int
)

func init() {
LargestEntryLimit = tidbconfig.MaxTxnEntrySizeLimit
}

// CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
type CSVParser struct {
blockParser
@@ -331,6 +338,9 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
var buf []byte
for {
buf = append(buf, parser.buf...)
if len(buf) > LargestEntryLimit {
return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
}
parser.buf = nil
if err := parser.readBlock(); err != nil || len(parser.buf) == 0 {
if err == nil {
@@ -442,9 +452,18 @@ outside:

func (parser *CSVParser) readQuotedField() error {
for {
prevPos := parser.pos
content, terminator, err := parser.readUntil(&parser.quoteByteSet)
- err = parser.replaceEOF(err, errUnterminatedQuotedField)
if err != nil {
if errors.Cause(err) == io.EOF {
// return the position of the quote to the caller.
// because we return an error here, the parser won't
// use the `pos` again, so it's safe to modify it here.
parser.pos = prevPos - 1
// store the content read so far in parser.buf so that the error log can print it
parser.buf = content
err = parser.replaceEOF(err, errUnterminatedQuotedField)
}
return err
}
parser.recordBuffer = append(parser.recordBuffer, content...)
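For context on the hunks above: readUntil keeps appending whole read blocks to buf, so a row whose quoted field never terminates could pull the rest of the file into memory. The new check stops that once the accumulated row exceeds LargestEntryLimit, which init() ties to TiDB's MaxTxnEntrySizeLimit. Below is a minimal, self-contained sketch of that guard pattern; readUntilLimited and largestEntryLimit are illustrative names, not Lightning's actual API.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// largestEntryLimit stands in for mydump.LargestEntryLimit; in the real code
// it is initialized from tidbconfig.MaxTxnEntrySizeLimit (120 MiB).
const largestEntryLimit = 16 // tiny value so this example trips the limit

// readUntilLimited mimics the guarded accumulation loop: read block by block
// and bail out as soon as the buffered row exceeds the limit, instead of
// growing without bound.
func readUntilLimited(r io.Reader) ([]byte, error) {
	var buf []byte
	block := make([]byte, 8)
	for {
		n, err := r.Read(block)
		buf = append(buf, block[:n]...)
		if len(buf) > largestEntryLimit {
			return buf, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
		}
		if err != nil {
			// io.EOF (or any other read error) ends the loop.
			return buf, err
		}
	}
}

func main() {
	// 64 bytes of input against a 16-byte limit: the guard fires long before
	// the whole "file" is buffered.
	_, err := readUntilLimited(bytes.NewReader(bytes.Repeat([]byte("x"), 64)))
	fmt.Println(err)
}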
24 changes: 24 additions & 0 deletions br/pkg/lightning/mydump/csv_parser_test.go
@@ -1,6 +1,7 @@
package mydump_test

import (
"bytes"
"context"
"encoding/csv"
"fmt"
@@ -680,6 +681,29 @@ func TestConsecutiveFields(t *testing.T) {
})
}

func TestTooLargeRow(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
},
}
var testCase bytes.Buffer
testCase.WriteString("a,b,c,d")
// WARN: this will take up 10 KiB of memory.
mydump.LargestEntryLimit = 10 * 1024
for i := 0; i < mydump.LargestEntryLimit; i++ {
testCase.WriteByte('d')
}
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
require.NoError(t, err)
parser, err := mydump.NewCSVParser(&cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor)
require.NoError(t, err)
e := parser.ReadRow()
require.Error(t, e)
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
}

func TestSpecialChars(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{Separator: ",", Delimiter: `"`},
1 change: 1 addition & 0 deletions br/tests/lightning_csv/errData/db-schema-create.sql
@@ -0,0 +1 @@
create database if not exists db;
1 change: 1 addition & 0 deletions br/tests/lightning_csv/errData/db.test-schema.sql
@@ -0,0 +1 @@
create table test(a int primary key, b int, c int, d int);
3 changes: 3 additions & 0 deletions br/tests/lightning_csv/errData/db.test.1.csv
@@ -0,0 +1,3 @@
1,2,3,4
2,10,4,5
1111,",7,8
8 changes: 8 additions & 0 deletions br/tests/lightning_csv/run.sh
@@ -41,3 +41,11 @@ for BACKEND in tidb local; do
check_not_contains 'id:'

done

set +e
run_lightning --backend local -d "tests/$TEST_NAME/errData" --log-file "$TEST_DIR/lightning-err.log" 2>/dev/null
set -e
# the offending row content should appear in the error log
grep ",7,8" "$TEST_DIR/lightning-err.log"
# pos should point at the opening quote of the unterminated field (byte offset 22), not at the end of the file
grep "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log"
2 changes: 2 additions & 0 deletions config/config.go
@@ -44,6 +44,8 @@ import (
// Config number limitations
const (
MaxLogFileSize = 4096 // MB
// MaxTxnEntrySizeLimit is the max value of TxnEntrySizeLimit.
MaxTxnEntrySizeLimit = 120 * 1024 * 1024 // 120MB
// DefTxnEntrySizeLimit is the default value of TxnEntrySizeLimit.
DefTxnEntrySizeLimit = 6 * 1024 * 1024
// DefTxnTotalSizeLimit is the default value of TxnTotalSizeLimit.
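The two constants above give txn-entry-size-limit both a default and a named 120MB ceiling. A rough sketch of how such a ceiling can be checked at startup follows; validateTxnEntrySizeLimit is a hypothetical helper, and the real check is the one-line comparison in tidb-server/main.go in the next file.

package main

import "fmt"

// Values copied from config/config.go above for illustration.
const (
	DefTxnEntrySizeLimit = 6 * 1024 * 1024   // 6 MiB default
	MaxTxnEntrySizeLimit = 120 * 1024 * 1024 // 120 MiB hard ceiling
)

// validateTxnEntrySizeLimit is a hypothetical helper mirroring the startup
// check: any configured value above the ceiling is rejected.
func validateTxnEntrySizeLimit(limit uint64) error {
	if limit > MaxTxnEntrySizeLimit {
		return fmt.Errorf("txn-entry-size-limit %d exceeds the 120MB ceiling %d", limit, MaxTxnEntrySizeLimit)
	}
	return nil
}

func main() {
	fmt.Println(validateTxnEntrySizeLimit(DefTxnEntrySizeLimit))      // <nil>
	fmt.Println(validateTxnEntrySizeLimit(uint64(200 * 1024 * 1024))) // error
}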
2 changes: 1 addition & 1 deletion tidb-server/main.go
@@ -613,7 +613,7 @@ func setGlobalVars() {
plannercore.AllowCartesianProduct.Store(cfg.Performance.CrossJoin)
privileges.SkipWithGrant = cfg.Security.SkipGrantTable
kv.TxnTotalSizeLimit = cfg.Performance.TxnTotalSizeLimit
- if cfg.Performance.TxnEntrySizeLimit > 120*1024*1024 {
+ if cfg.Performance.TxnEntrySizeLimit > config.MaxTxnEntrySizeLimit {
log.Fatal("cannot set txn entry size limit larger than 120M")
}
kv.TxnEntrySizeLimit = cfg.Performance.TxnEntrySizeLimit
