Skip to content

Commit

Permalink
sink(ticdc): use subDir in storage.Walk (#10027) (#10051)
Browse files Browse the repository at this point in the history
close #10041, close #10044
  • Loading branch information
ti-chi-bot committed Nov 10, 2023
1 parent cab3b59 commit 96adee7
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 24 deletions.
10 changes: 5 additions & 5 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
Expand Up @@ -80,7 +80,7 @@ type DMLSink struct {
alive struct {
sync.RWMutex
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
msgCh *chann.DrainableChann[eventFragment]
isDead bool
}

Expand Down Expand Up @@ -142,15 +142,15 @@ func NewDMLSink(ctx context.Context,
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = make(chan eventFragment, defaultChannelSize)
s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]()

encodedCh := make(chan eventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)

// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoder := encoderBuilder.Build()
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh, encodedCh)
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh.Out(), encodedCh)
}
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
Expand All @@ -170,7 +170,7 @@ func NewDMLSink(ctx context.Context,

s.alive.Lock()
s.alive.isDead = true
close(s.alive.msgCh)
s.alive.msgCh.CloseAndDrain()
s.alive.Unlock()
close(s.dead)

Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa

s.statistics.ObserveRows(txn.Event.Rows...)
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
s.alive.msgCh <- eventFragment{
s.alive.msgCh.In() <- eventFragment{
seqNumber: seq,
versionedTable: tbl,
event: txn,
Expand Down
39 changes: 20 additions & 19 deletions pkg/sink/cloudstorage/path.go
Expand Up @@ -192,27 +192,28 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
_, checksum := mustParseSchemaName(tblSchemaFile)
schemaFileCnt := 0
lastVersion := uint64(0)
prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table)
subDir := fmt.Sprintf(tableSchemaPrefix, def.Schema, def.Table)
checksumSuffix := fmt.Sprintf("%010d.json", checksum)
err = f.storage.WalkDir(ctx, &storage.WalkOption{ObjPrefix: prefix},
func(path string, _ int64) error {
schemaFileCnt++
if !strings.HasSuffix(path, checksumSuffix) {
return nil
}
version, parsedChecksum := mustParseSchemaName(path)
if parsedChecksum != checksum {
// TODO: parsedChecksum should be ignored, remove this panic
// after the new path protocol is verified.
log.Panic("invalid schema file name",
zap.String("path", path), zap.Any("checksum", checksum))
}
if version > lastVersion {
lastVersion = version
}
err = f.storage.WalkDir(ctx, &storage.WalkOption{
SubDir: subDir, /* use subDir to prevent walk the whole storage */
ObjPrefix: subDir + "schema_",
}, func(path string, _ int64) error {
schemaFileCnt++
if !strings.HasSuffix(path, checksumSuffix) {
return nil
},
)
}
version, parsedChecksum := mustParseSchemaName(path)
if parsedChecksum != checksum {
// TODO: parsedChecksum should be ignored, remove this panic
// after the new path protocol is verified.
log.Panic("invalid schema file name",
zap.String("path", path), zap.Any("checksum", checksum))
}
if version > lastVersion {
lastVersion = version
}
return nil
})
if err != nil {
return err
}
Expand Down
53 changes: 53 additions & 0 deletions pkg/sink/cloudstorage/path_test.go
Expand Up @@ -17,9 +17,14 @@ import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"testing"
"time"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -275,3 +280,51 @@ func TestIsSchemaFile(t *testing.T) {
"testCase: %s, path: %v", tt.name, tt.path)
}
}

// TestCheckOrWriteSchema calls CheckOrWriteSchema twice for the same table
// definition: once with the original table info version and once with only
// the versioned table name's TableInfoVersion bumped. It asserts that the
// generator's versionMap holds the table info version after each call and
// that the table's meta directory ends up with exactly one entry.
func TestCheckOrWriteSchema(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	root := t.TempDir()
	generator := testFilePathGenerator(ctx, t, root)

	// A single NOT NULL primary-key column of type long is the whole schema.
	idType := types.NewFieldType(mysql.TypeLong)
	idType.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag)
	idColumn := &timodel.ColumnInfo{
		Name:         timodel.NewCIStr("Id"),
		FieldType:    *idType,
		DefaultValue: 10,
	}

	tableInfo := &model.TableInfo{
		TableInfo: &timodel.TableInfo{
			Columns: []*timodel.ColumnInfo{idColumn},
		},
		Version: 100,
		TableName: model.TableName{
			Schema:  "test",
			Table:   "table1",
			TableID: 20,
		},
	}
	table := VersionedTableName{
		TableNameWithPhysicTableID: tableInfo.TableName,
		TableInfoVersion:           tableInfo.Version,
	}

	// First call: the version map should record the table info version.
	require.NoError(t, generator.CheckOrWriteSchema(ctx, table, tableInfo))
	require.Equal(t, tableInfo.Version, generator.versionMap[table])

	// Only the table version changes here, so the existing schema file is
	// expected to be reused instead of a second one being written.
	table.TableInfoVersion = 101
	require.NoError(t, generator.CheckOrWriteSchema(ctx, table, tableInfo))
	require.Equal(t, tableInfo.Version, generator.versionMap[table])

	// The meta directory of test.table1 must contain a single entry.
	metaDir := filepath.Join(root, "test/table1/meta")
	entries, err := os.ReadDir(metaDir)
	require.NoError(t, err)
	require.Equal(t, 1, len(entries))
}

0 comments on commit 96adee7

Please sign in to comment.