sink(ticdc): use subDir in storage.Walk #10027

Merged · 2 commits · Nov 9, 2023
10 changes: 5 additions & 5 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -80,7 +80,7 @@ type DMLSink struct {
 	alive struct {
 		sync.RWMutex
 		// msgCh is a channel to hold eventFragment.
-		msgCh chan eventFragment
+		msgCh *chann.DrainableChann[eventFragment]
 		isDead bool
 	}

@@ -142,15 +142,15 @@ func NewDMLSink(ctx context.Context,
 		cancel: wgCancel,
 		dead:   make(chan struct{}),
 	}
-	s.alive.msgCh = make(chan eventFragment, defaultChannelSize)
+	s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]()

 	encodedCh := make(chan eventFragment, defaultChannelSize)
 	workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)

 	// create a group of encoding workers.
 	for i := 0; i < defaultEncodingConcurrency; i++ {
 		encoder := encoderBuilder.Build()
-		s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh, encodedCh)
+		s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh.Out(), encodedCh)
 	}
 	// create defragmenter.
 	s.defragmenter = newDefragmenter(encodedCh, workerChannels)

@@ -170,7 +170,7 @@
 	s.alive.Lock()
 	s.alive.isDead = true
-	close(s.alive.msgCh)
+	s.alive.msgCh.CloseAndDrain()
 	s.alive.Unlock()
 	close(s.dead)

@@ -236,7 +236,7 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn])
 	s.statistics.ObserveRows(txn.Event.Rows...)
 	// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
-	s.alive.msgCh <- eventFragment{
+	s.alive.msgCh.In() <- eventFragment{
 		seqNumber:      seq,
 		versionedTable: tbl,
 		event:          txn,
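A note on the channel swap above: closing a plain Go channel while a producer is still blocked on a send panics with "send on closed channel", and a fixed-size buffer can also leave WriteEvents blocked under backpressure during shutdown. pkg/chann's DrainableChann sidesteps both by decoupling In() from Out() through an internal buffer, with CloseAndDrain() flushing whatever is left. Below is a minimal sketch of that pattern, as an illustration only — the real implementation lives in github.com/pingcap/tiflow/pkg/chann:

package main

import "fmt"

// drainable is a minimal sketch of the idea behind chann.DrainableChann:
// senders never block (an internal buffer grows as needed), and shutdown
// drains rather than panics.
type drainable[T any] struct {
	in  chan T
	out chan T
}

func newDrainable[T any]() *drainable[T] {
	d := &drainable[T]{in: make(chan T), out: make(chan T)}
	go func() {
		var buf []T // unbounded buffer decouples senders from receivers
		in := d.in
		for in != nil || len(buf) > 0 {
			var outCh chan T // nil (never selected) unless we have data
			var next T
			if len(buf) > 0 {
				outCh, next = d.out, buf[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // input closed: stop reading, flush buf
					continue
				}
				buf = append(buf, v)
			case outCh <- next:
				buf = buf[1:]
			}
		}
		close(d.out)
	}()
	return d
}

func (d *drainable[T]) In() chan<- T  { return d.in }
func (d *drainable[T]) Out() <-chan T { return d.out }

// CloseAndDrain closes the input side and discards anything still buffered,
// so the forwarding goroutine always terminates.
func (d *drainable[T]) CloseAndDrain() {
	close(d.in)
	for range d.out {
	}
}

func main() {
	d := newDrainable[int]()
	d.In() <- 1 // never blocks, even before any receiver exists
	d.In() <- 2
	fmt.Println(<-d.Out()) // 1
	d.CloseAndDrain()      // discards 2, shuts down cleanly
}

The nil-channel trick in the select keeps the send case disabled until the buffer holds data, the standard way to multiplex an optional send and a receive in a single select.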
39 changes: 20 additions & 19 deletions pkg/sink/cloudstorage/path.go
@@ -192,27 +192,28 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
 	_, checksum := mustParseSchemaName(tblSchemaFile)
 	schemaFileCnt := 0
 	lastVersion := uint64(0)
-	prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table)
+	subDir := fmt.Sprintf(tableSchemaPrefix, def.Schema, def.Table)
 	checksumSuffix := fmt.Sprintf("%010d.json", checksum)
-	err = f.storage.WalkDir(ctx, &storage.WalkOption{ObjPrefix: prefix},
-		func(path string, _ int64) error {
-			schemaFileCnt++
-			if !strings.HasSuffix(path, checksumSuffix) {
-				return nil
-			}
-			version, parsedChecksum := mustParseSchemaName(path)
-			if parsedChecksum != checksum {
-				// TODO: parsedChecksum should be ignored, remove this panic
-				// after the new path protocol is verified.
-				log.Panic("invalid schema file name",
-					zap.String("path", path), zap.Any("checksum", checksum))
-			}
-			if version > lastVersion {
-				lastVersion = version
-			}
-			return nil
-		},
-	)
+	err = f.storage.WalkDir(ctx, &storage.WalkOption{
+		SubDir:    subDir, /* use subDir to prevent walk the whole storage */
+		ObjPrefix: subDir + "schema_",
+	}, func(path string, _ int64) error {
+		schemaFileCnt++
+		if !strings.HasSuffix(path, checksumSuffix) {
+			return nil
+		}
+		version, parsedChecksum := mustParseSchemaName(path)
+		if parsedChecksum != checksum {
+			// TODO: parsedChecksum should be ignored, remove this panic
+			// after the new path protocol is verified.
+			log.Panic("invalid schema file name",
+				zap.String("path", path), zap.Any("checksum", checksum))
+		}
+		if version > lastVersion {
+			lastVersion = version
+		}
+		return nil
+	})
 	if err != nil {
 		return err
 	}
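The substance of the fix is visible in the WalkOption: before, the walk passed only ObjPrefix, so the storage layer enumerated objects starting from the root of the sink URI and filtered them; with SubDir set, the listing itself is confined to the table's meta directory. A sketch of the narrowed call against the br external-storage API, assuming tableSchemaPrefix expands to "<schema>/<table>/meta/" (inferred from the diff, not checked against the repo):

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// listSchemaFiles sketches the narrowed walk. The "<schema>/<table>/meta/"
// layout is an assumption inferred from the diff above.
func listSchemaFiles(ctx context.Context, s storage.ExternalStorage, schema, table string) error {
	subDir := fmt.Sprintf("%s/%s/meta/", schema, table) // e.g. "test/table1/meta/"
	return s.WalkDir(ctx, &storage.WalkOption{
		SubDir:    subDir,             // scope the listing to this directory
		ObjPrefix: subDir + "schema_", // then keep only schema_* objects
	}, func(path string, size int64) error {
		fmt.Println("schema file:", path, size)
		return nil
	})
}

func main() {
	// Wire up any concrete ExternalStorage backend to try it, e.g.:
	//   s, _ := storage.NewLocalStorage("/tmp/sink")
	//   _ = listSchemaFiles(context.Background(), s, "test", "table1")
}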
53 changes: 53 additions & 0 deletions pkg/sink/cloudstorage/path_test.go
@@ -17,9 +17,14 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"os"
+	"path/filepath"
 	"testing"
 	"time"

+	timodel "github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/parser/types"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/engine/pkg/clock"
 	"github.com/pingcap/tiflow/pkg/config"

@@ -275,3 +280,51 @@ func TestIsSchemaFile(t *testing.T) {
 			"testCase: %s, path: %v", tt.name, tt.path)
 	}
 }
+
+func TestCheckOrWriteSchema(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	dir := t.TempDir()
+	f := testFilePathGenerator(ctx, t, dir)
+
+	var columns []*timodel.ColumnInfo
+	ft := types.NewFieldType(mysql.TypeLong)
+	ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag)
+	col := &timodel.ColumnInfo{
+		Name:         timodel.NewCIStr("Id"),
+		FieldType:    *ft,
+		DefaultValue: 10,
+	}
+	columns = append(columns, col)
+	tableInfo := &model.TableInfo{
+		TableInfo: &timodel.TableInfo{Columns: columns},
+		Version:   100,
+		TableName: model.TableName{
+			Schema:  "test",
+			Table:   "table1",
+			TableID: 20,
+		},
+	}
+
+	table := VersionedTableName{
+		TableNameWithPhysicTableID: tableInfo.TableName,
+		TableInfoVersion:           tableInfo.Version,
+	}
+
+	err := f.CheckOrWriteSchema(ctx, table, tableInfo)
+	require.NoError(t, err)
+	require.Equal(t, tableInfo.Version, f.versionMap[table])
+
+	// test only table version changed, schema file should be reused
+	table.TableInfoVersion = 101
+	err = f.CheckOrWriteSchema(ctx, table, tableInfo)
+	require.NoError(t, err)
+	require.Equal(t, tableInfo.Version, f.versionMap[table])
+
+	dir = filepath.Join(dir, "test/table1/meta")
+	cnt, err := os.ReadDir(dir)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(cnt))
+}
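The closing assertions encode the reuse guarantee: bumping TableInfoVersion from 100 to 101 with an unchanged schema must leave exactly one file under test/table1/meta, because CheckOrWriteSchema recognizes the existing file by its checksum suffix. A standalone sketch of the assumed file-name contract — schema_<tableVersion>_<10-digit checksum>.json, inferred from checksumSuffix in path.go above (the authoritative parser is mustParseSchemaName):

package main

import (
	"fmt"
	"path/filepath"
)

// parseSchemaName mirrors the naming contract the test relies on:
// "<schema>/<table>/meta/schema_<tableVersion>_<10-digit checksum>.json".
// The format is an assumption inferred from checksumSuffix ("%010d.json");
// the real parser is mustParseSchemaName in path.go.
func parseSchemaName(path string) (version uint64, checksum uint32, err error) {
	base := filepath.Base(path) // e.g. "schema_100_0000012345.json"
	_, err = fmt.Sscanf(base, "schema_%d_%d.json", &version, &checksum)
	return
}

func main() {
	v, c, err := parseSchemaName("test/table1/meta/schema_100_0000012345.json")
	fmt.Println(v, c, err) // 100 12345 <nil>
}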