Skip to content

Commit

Permalink
redo(ticdc): use uuid in s3 log file to avoid name conflict (#5595) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed May 27, 2022
1 parent 400a715 commit 679061c
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 109 deletions.
37 changes: 25 additions & 12 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
Expand Down Expand Up @@ -57,6 +66,13 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
return s3storage, nil
}

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
Expand All @@ -67,7 +83,7 @@ func ParseLogFileName(name string) (uint64, string, error) {
// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), LogEXT)+SortLogEXT
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
Expand All @@ -76,31 +92,28 @@ func ParseLogFileName(name string) (uint64, string, error) {
return 0, "", nil
}

var commitTs, d1 uint64
var s1, namespace, s2, fileType string
var commitTs uint64
var s1, namespace, s2, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
vars []any
formatStr string
)
if len(strings.Split(name, "_")) == 6 {
formatStr = "%s %s %s %d %s %d" + LogEXT
vars = []any{&s1, &namespace, &s2, &d1, &fileType, &commitTs}
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&s1, &namespace, &s2, &fileType, &commitTs, &uid}
} else {
formatStr = "%s %s %d %s %d" + LogEXT
vars = []any{&s1, &s2, &d1, &fileType, &commitTs}
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&s1, &s2, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
if ext == TmpEXT {
formatStr += TmpEXT
}
_, err := fmt.Sscanf(name, formatStr, vars...)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
Expand Down
49 changes: 24 additions & 25 deletions cdc/redo/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package common
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestParseLogFileName(t *testing.T) {
type arg struct {
name string
}
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
tests := []struct {
name string
args arg
Expand All @@ -36,99 +35,99 @@ func TestParseLogFileName(t *testing.T) {
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"default", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
Expand All @@ -150,9 +149,9 @@ func TestParseLogFileName(t *testing.T) {
{
name: "err wrong format ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s%d%s", "cp",
"default", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf("%s_%s_%s_%s_%d%s%s", /* a wrong format */
"cp", "default", "test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantErr: ".*bad log name*.",
},
Expand Down
34 changes: 19 additions & 15 deletions cdc/redo/reader/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/leakutil"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -62,7 +63,10 @@ func TestReaderRead(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := writer.NewWriter(ctx, cfg)
uuidGen := uuid.NewConstGenerator("const-uuid")
w, err := writer.NewWriter(ctx, cfg,
writer.WithUUIDGenerator(func() uuid.Generator { return uuidGen }),
)
require.Nil(t, err)
log := &model.RedoLog{
RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 1123}},
Expand All @@ -75,9 +79,9 @@ func TestReaderRead(t *testing.T) {
err = w.Close()
require.Nil(t, err)
require.True(t, !w.IsRunning())
fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", cfg.CaptureID,
fileName := fmt.Sprintf(common.RedoLogFileFormatV1, cfg.CaptureID,
cfg.ChangeFeedID.ID,
cfg.CreateTime.Unix(), cfg.FileType, 11, common.LogEXT)
cfg.FileType, 11, uuidGen.NewString(), common.LogEXT)
path := filepath.Join(cfg.Dir, fileName)
info, err := os.Stat(path)
require.Nil(t, err)
Expand Down Expand Up @@ -110,9 +114,10 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
MaxLogSize: 100000,
Dir: dir,
}
fileName := fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf",
time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+common.TmpEXT)
uuidGen := uuid.NewGenerator()
fileName := fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf", common.DefaultDDLLogFileType, 11,
uuidGen.NewString(), common.LogEXT+common.TmpEXT)
w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand All @@ -138,27 +143,26 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
require.Nil(t, err)

// no data, wil not open
fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf11",
time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT)
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf11", common.DefaultDDLLogFileType, 10,
uuidGen.NewString(), common.LogEXT)
path = filepath.Join(dir, fileName)
_, err = os.Create(path)
require.Nil(t, err)

// SortLogEXT, wil open
fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf111",
time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT) + common.SortLogEXT
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", "default",
"test-cf111", common.DefaultDDLLogFileType, 10, uuidGen.NewString(),
common.LogEXT) + common.SortLogEXT
path = filepath.Join(dir, fileName)
f1, err := os.Create(path)
require.Nil(t, err)

dir1, err := ioutil.TempDir("", "redo-openSelectedFiles1")
require.Nil(t, err)
defer os.RemoveAll(dir1) //nolint:errcheck
fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf",
time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+"test")
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", "default", "test-cf",
common.DefaultDDLLogFileType, 11, uuidGen.NewString(), common.LogEXT+"test")
path = filepath.Join(dir1, fileName)
_, err = os.Create(path)
require.Nil(t, err)
Expand Down
9 changes: 5 additions & 4 deletions cdc/redo/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/errors"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -84,9 +85,9 @@ func TestLogReaderResetReader(t *testing.T) {
MaxLogSize: 100000,
Dir: dir,
}
fileName := fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
fileName := fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf100",
time.Now().Unix(), common.DefaultDDLLogFileType, 100, common.LogEXT)
common.DefaultDDLLogFileType, 100, uuid.New().String(), common.LogEXT)
w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand All @@ -105,9 +106,9 @@ func TestLogReaderResetReader(t *testing.T) {
f, err := os.Open(path)
require.Nil(t, err)

fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf10",
time.Now().Unix(), common.DefaultRowLogFileType, 10, common.LogEXT)
common.DefaultRowLogFileType, 10, uuid.New().String(), common.LogEXT)
w, err = writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand Down
Loading

0 comments on commit 679061c

Please sign in to comment.