Skip to content

Commit

Permalink
dumpling: fix dumpling ignore file writer close error (#45374)
Browse files Browse the repository at this point in the history
close #45353
  • Loading branch information
lichunzhu committed Jul 18, 2023
1 parent bc65f23 commit aca4429
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 14 deletions.
1 change: 1 addition & 0 deletions dumpling/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func newMultiQueriesChunk(queries []string, colLength int) *multiQueriesChunk {
func (td *multiQueriesChunk) Start(tctx *tcontext.Context, conn *sql.Conn) error {
td.tctx = tctx
td.conn = conn
td.SQLRowIter = nil
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions dumpling/export/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ func (m *globalMetadata) writeGlobalMetaData() error {
if err != nil {
return err
}
defer tearDown(m.tctx)

return write(m.tctx, fileWriter, m.String())
err = write(m.tctx, fileWriter, m.String())
tearDownErr := tearDown(m.tctx)
if err == nil {
return tearDownErr
}
return err
}

func getValidStr(str []string, idx int) string {
Expand Down
14 changes: 10 additions & 4 deletions dumpling/export/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,13 @@ func (w *Writer) tryToWriteTableData(tctx *tcontext.Context, meta TableMeta, ir
for {
fileWriter, tearDown := buildInterceptFileWriter(tctx, w.extStorage, fileName, conf.CompressType)
n, err := format.WriteInsert(tctx, conf, meta, ir, fileWriter, w.metrics)
tearDown(tctx)
tearDownErr := tearDown(tctx)
if err != nil {
return err
}
if tearDownErr != nil {
return tearDownErr
}

if w, ok := fileWriter.(*InterceptFileWriter); ok && !w.SomethingIsWritten {
break
Expand Down Expand Up @@ -279,13 +282,16 @@ func (w *Writer) writeMetaToFile(tctx *tcontext.Context, target, metaSQL string,
if err != nil {
return errors.Trace(err)
}
defer tearDown(tctx)

return WriteMeta(tctx, &metaData{
err = WriteMeta(tctx, &metaData{
target: target,
metaSQL: metaSQL,
specCmts: getSpecialComments(w.conf.ServerInfo.ServerType),
}, fileWriter)
tearDownErr := tearDown(tctx)
if err == nil {
return tearDownErr
}
return err
}

type outputFileNamer struct {
Expand Down
14 changes: 14 additions & 0 deletions dumpling/export/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/version"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/util/promutil"
Expand Down Expand Up @@ -71,6 +72,12 @@ func TestWriteTableMeta(t *testing.T) {
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n/*!40101 SET NAMES binary*/;\nCREATE TABLE t (a INT);\n", string(bytes))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/dumpling/export/FailToCloseMetaFile", "return(true)"))
defer failpoint.Disable("github.com/pingcap/tidb/dumpling/export/FailToCloseMetaFile=return(true)")

err = writer.WriteTableMeta("test", "t", "CREATE TABLE t (a INT)")
require.ErrorContains(t, err, "injected error: fail to close meta file")
}

func TestWriteViewMeta(t *testing.T) {
Expand Down Expand Up @@ -137,6 +144,13 @@ func TestWriteTableData(t *testing.T) {
"(3,'male','john@mail.com','020-1256','healthy'),\n" +
"(4,'female','sarah@mail.com','020-1235','healthy');\n"
require.Equal(t, expected, string(bytes))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile", "return(true)"))
defer failpoint.Disable("github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile=return(true)")

tableIR = newMockTableIR("test", "employee", data, specCmts, colTypes)
err = writer.WriteTableData(tableIR, tableIR, 0)
require.ErrorContains(t, err, "injected error: fail to close data file")
}

func TestWriteTableDataWithFileSize(t *testing.T) {
Expand Down
20 changes: 14 additions & 6 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
return errors.Trace(err)
}

func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context), error) {
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
Expand All @@ -462,20 +462,24 @@ func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName
return nil, nil, errors.Trace(err)
}
tctx.L().Debug("opened file", zap.String("path", fullPath))
tearDownRoutine := func(ctx context.Context) {
tearDownRoutine := func(ctx context.Context) error {
err := writer.Close(ctx)
failpoint.Inject("FailToCloseMetaFile", func(_ failpoint.Value) {
err = errors.New("injected error: fail to close meta file")
})
if err == nil {
return
return nil
}
err = errors.Trace(err)
tctx.L().Warn("fail to close file",
zap.String("path", fullPath),
zap.Error(err))
return err
}
return writer, tearDownRoutine, nil
}

func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(context.Context)) {
func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(context.Context) error) {
fileName += compressFileSuffix(compressType)
var writer storage.ExternalFileWriter
fullPath := s.URI() + "/" + fileName
Expand All @@ -497,17 +501,21 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
}
fileWriter.initRoutine = initRoutine

tearDownRoutine := func(ctx context.Context) {
tearDownRoutine := func(ctx context.Context) error {
if writer == nil {
return
return nil
}
pCtx.L().Debug("tear down lazy file writer...", zap.String("path", fullPath))
err := writer.Close(ctx)
failpoint.Inject("FailToCloseDataFile", func(_ failpoint.Value) {
err = errors.New("injected error: fail to close data file")
})
if err != nil {
pCtx.L().Warn("fail to close file",
zap.String("path", fullPath),
zap.Error(err))
}
return err
}
return fileWriter, tearDownRoutine
}
Expand Down
35 changes: 34 additions & 1 deletion dumpling/tests/basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,43 @@ run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
cnt=$(grep "IOTotalBytes=" ${DUMPLING_OUTPUT_DIR}/dumpling.log | grep -v "IOTotalBytes=0" | wc -l)
[ "$cnt" -ge 1 ]

export GO_FAILPOINTS=""
echo "Test for failing to close meta/data file"
export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/FailToCloseMetaFile=1*return"
rm ${DUMPLING_OUTPUT_DIR}/dumpling.log
set +e
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e
cnt=$(grep -w "dump failed error stack info" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
[ "$cnt" -ge 1 ]

# dumpling retry will make it succeed
export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile=1*return"
export DUMPLING_TEST_PORT=4000
run_sql "drop database if exists test_db;"
run_sql "create database test_db;"
run_sql "create table test_db.test_table (a int primary key);"
run_sql "insert into test_db.test_table values (1),(2),(3),(4),(5),(6),(7),(8);"
rm ${DUMPLING_OUTPUT_DIR}/dumpling.log
set +e
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e
cnt=$(grep -w "dump data successfully" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
[ "$cnt" -ge 1 ]
cnt=$(grep -w "(.*)" ${DUMPLING_OUTPUT_DIR}/test_db.test_table.000000000.sql|wc -l)
echo "records count is ${cnt}"
[ "$cnt" -eq 8 ]

export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile=5*return"
rm ${DUMPLING_OUTPUT_DIR}/dumpling.log
set +e
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e
cnt=$(grep -w "dump failed error stack info" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
[ "$cnt" -ge 1 ]

echo "Test for empty query result, should success."
run_sql "drop database if exists test_db;"
run_sql "create database test_db;"
run_sql "create table test_db.test_table (a int primary key);"
export GO_FAILPOINTS=""
run_dumpling --sql "select * from test_db.test_table" --filetype csv > ${DUMPLING_OUTPUT_DIR}/dumpling.log

0 comments on commit aca4429

Please sign in to comment.