sink(ticdc): remove buffer pool in storage sink worker (#8913) (#9108)
close #8894
ti-chi-bot committed May 31, 2023
1 parent da21b06 commit 566b9f1
Showing 2 changed files with 69 additions and 101 deletions.
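
In short, the commit drops the worker-level sync.Pool of bytes.Buffer and instead tracks the total encoded size per table, so each flush can allocate a buffer of the right capacity up front. The following is a minimal, self-contained sketch of that idea only; message, singleTableTask, add, and flush are simplified stand-ins, not the TiCDC types.

```go
package main

import (
	"bytes"
	"fmt"
)

// message stands in for an encoded sink message (e.g. a batch of CSV rows).
type message struct{ value []byte }

// singleTableTask accumulates encoded messages for one table and remembers
// their total size, mirroring the size bookkeeping introduced by the patch.
type singleTableTask struct {
	size uint64
	msgs []message
}

func (t *singleTableTask) add(msgs ...message) {
	for _, m := range msgs {
		t.size += uint64(len(m.value))
	}
	t.msgs = append(t.msgs, msgs...)
}

// flush concatenates the messages into one buffer pre-sized from t.size,
// so no pooled (and potentially oversized, retained) buffer is needed.
func (t *singleTableTask) flush() []byte {
	buf := bytes.NewBuffer(make([]byte, 0, t.size))
	for _, m := range t.msgs {
		buf.Write(m.value)
	}
	return buf.Bytes()
}

func main() {
	var task singleTableTask
	task.add(message{[]byte("1,alice\n")}, message{[]byte("2,bob\n")})
	fmt.Printf("flushed %d bytes in one write\n", len(task.flush()))
}
```

Compared with a shared pool, a per-task buffer sized from the tracked total avoids holding on to large buffers between flushes and removes contention on the pool.
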
2 changes: 2 additions & 0 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
@@ -228,6 +228,8 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single
TableInfoVersion: txn.Event.TableInfoVersion,
}
seq := atomic.AddUint64(&s.lastSeqNum, 1)

s.statistics.ObserveRows(txn.Event.Rows...)
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
s.alive.msgCh <- eventFragment{
seqNumber: seq,
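
The only functional change in this file is that per-row statistics are now observed in WriteEvents, at enqueue time, instead of later in the worker's writeDataFile. A small sketch of that enqueue-time accounting, using a hypothetical rowObserver in place of d.statistics:

```go
package main

import "fmt"

// rowObserver stands in for d.statistics.ObserveRows; here it only counts rows.
type rowObserver struct{ rows int }

func (o *rowObserver) ObserveRows(n int) { o.rows += n }

// fragment is a simplified event fragment whose rows were already counted.
type fragment struct{ payload []byte }

func main() {
	obs := &rowObserver{}
	ch := make(chan fragment, 4)

	// Producer side (WriteEvents in the patch): observe rows once, before enqueueing.
	for _, rows := range [][]string{{"a", "b"}, {"c"}} {
		obs.ObserveRows(len(rows))
		ch <- fragment{payload: []byte(fmt.Sprint(rows))}
	}
	close(ch)

	// Consumer side (the DML worker): only writes payloads, no row accounting.
	for f := range ch {
		_ = f.payload
	}
	fmt.Println("rows observed:", obs.rows)
}
```
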
168 changes: 67 additions & 101 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -16,14 +16,14 @@ import (
"bytes"
"context"
"path"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
@@ -42,39 +42,58 @@ type dmlWorker struct {
storage storage.ExternalStorage
config *cloudstorage.Config
// flushNotifyCh is used to notify that several tables can be flushed.
flushNotifyCh chan flushTask
inputCh *chann.DrainableChann[eventFragment]
// tableEvents maintains a mapping of <table, []eventFragment>.
tableEvents *tableEventsMap
// fileSize maintains a mapping of <table, file size>.
fileSize map[cloudstorage.VersionedTableName]uint64
flushNotifyCh chan dmlTask
inputCh *chann.DrainableChann[eventFragment]
isClosed uint64
statistics *metrics.Statistics
filePathGenerator *cloudstorage.FilePathGenerator
bufferPool sync.Pool
metricWriteBytes prometheus.Gauge
metricFileCount prometheus.Gauge
}

type tableEventsMap struct {
mu sync.Mutex
fragments map[cloudstorage.VersionedTableName][]eventFragment
// dmlTask defines a task containing the tables to be flushed.
type dmlTask struct {
tasks map[cloudstorage.VersionedTableName]*singleTableTask
}

func newTableEventsMap() *tableEventsMap {
return &tableEventsMap{
fragments: make(map[cloudstorage.VersionedTableName][]eventFragment),
type singleTableTask struct {
size uint64
tableInfo *model.TableInfo
msgs []*common.Message
}

func newDMLTask() dmlTask {
return dmlTask{
tasks: make(map[cloudstorage.VersionedTableName]*singleTableTask),
}
}

type wrappedTable struct {
cloudstorage.VersionedTableName
tableInfo *model.TableInfo
func (t *dmlTask) handleSingleTableEvent(event eventFragment) {
table := event.versionedTable
if _, ok := t.tasks[table]; !ok {
t.tasks[table] = &singleTableTask{
size: 0,
tableInfo: event.event.Event.TableInfo,
}
}

v := t.tasks[table]
for _, msg := range event.encodedMsgs {
v.size += uint64(len(msg.Value))
}
v.msgs = append(v.msgs, event.encodedMsgs...)
}

// flushTask defines a task containing the tables to be flushed.
type flushTask struct {
targetTables []wrappedTable
func (t *dmlTask) generateTaskByTable(table cloudstorage.VersionedTableName) dmlTask {
v := t.tasks[table]
if v == nil {
log.Panic("table not found in dml task", zap.Any("table", table), zap.Any("task", t))
}
delete(t.tasks, table)

return dmlTask{
tasks: map[cloudstorage.VersionedTableName]*singleTableTask{table: v},
}
}
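
Taken together, handleSingleTableEvent and generateTaskByTable let one aggregate dmlTask buffer fragments for every table while still allowing a single over-sized table to be carved out and flushed early. A standalone sketch of that aggregate-then-split flow, using simplified stand-in types (tableTask, plain string table names) rather than the real VersionedTableName/singleTableTask:

```go
package main

import "fmt"

// tableTask mirrors singleTableTask: accumulated size plus pending payloads.
type tableTask struct {
	size     uint64
	payloads [][]byte
}

// dmlTask mirrors the patch's per-table task map.
type dmlTask struct{ tasks map[string]*tableTask }

func newDMLTask() dmlTask { return dmlTask{tasks: map[string]*tableTask{}} }

// add mirrors handleSingleTableEvent: create the entry lazily, grow its size.
func (t *dmlTask) add(table string, payload []byte) {
	tt, ok := t.tasks[table]
	if !ok {
		tt = &tableTask{}
		t.tasks[table] = tt
	}
	tt.size += uint64(len(payload))
	tt.payloads = append(tt.payloads, payload)
}

// splitTable mirrors generateTaskByTable: carve one table out into its own
// task so it can be flushed immediately, leaving the rest to the interval flush.
func (t *dmlTask) splitTable(table string) dmlTask {
	tt := t.tasks[table]
	delete(t.tasks, table)
	return dmlTask{tasks: map[string]*tableTask{table: tt}}
}

func main() {
	agg := newDMLTask()
	agg.add("t1", []byte("1,a\n"))
	agg.add("t2", make([]byte, 1024)) // pretend t2 crossed the size threshold
	hot := agg.splitTable("t2")
	fmt.Println("flush now:", len(hot.tasks), "table; keep buffering:", len(agg.tasks), "table")
}
```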

func newDMLWorker(
@@ -93,18 +112,13 @@ func newDMLWorker(
storage: storage,
config: config,
inputCh: inputCh,
tableEvents: newTableEventsMap(),
flushNotifyCh: make(chan flushTask, 1),
fileSize: make(map[cloudstorage.VersionedTableName]uint64),
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
bufferPool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

return d
@@ -139,19 +153,13 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error {
if atomic.LoadUint64(&d.isClosed) == 1 {
return nil
}
for _, tbl := range task.targetTables {
table := tbl.VersionedTableName
d.tableEvents.mu.Lock()
events := make([]eventFragment, len(d.tableEvents.fragments[table]))
copy(events, d.tableEvents.fragments[table])
d.tableEvents.fragments[table] = nil
d.tableEvents.mu.Unlock()
if len(events) == 0 {
for table, task := range task.tasks {
if len(task.msgs) == 0 {
continue
}

// generate the schema.json file before generating the first data file if necessary
err := d.filePathGenerator.CheckOrWriteSchema(ctx, table, tbl.tableInfo)
err := d.filePathGenerator.CheckOrWriteSchema(ctx, table, task.tableInfo)
if err != nil {
log.Error("failed to write schema file to external storage",
zap.Int("workerID", d.id),
@@ -190,11 +198,7 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error {
}

// then write the data file to external storage.
// TODO: if system crashes when writing date file CDC000002.csv
// (file is not generated at all), then after TiCDC recovers from the crash,
// storage sink will generate a new file named CDC000003.csv,
// we will optimize this issue later.
err = d.writeDataFile(ctx, dataFilePath, events)
err = d.writeDataFile(ctx, dataFilePath, task)
if err != nil {
log.Error("failed to write data file to external storage",
zap.Int("workerID", d.id),
@@ -222,24 +226,17 @@ func (d *dmlWorker) writeIndexFile(ctx context.Context, path, content string) er
return err
}

func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eventFragment) error {
func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *singleTableTask) error {
var callbacks []func()

buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
buf := d.bufferPool.Get().(*bytes.Buffer)
defer d.bufferPool.Put(buf)
buf.Reset()

for _, frag := range events {
msgs := frag.encodedMsgs
d.statistics.ObserveRows(frag.event.Event.Rows...)
for _, msg := range msgs {
d.metricWriteBytes.Add(float64(len(msg.Value)))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
callbacks = append(callbacks, msg.Callback)
}
for _, msg := range task.msgs {
d.metricWriteBytes.Add(float64(len(msg.Value)))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
callbacks = append(callbacks, msg.Callback)
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
@@ -249,8 +246,8 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eve
}); err != nil {
return err
}
d.metricFileCount.Add(1)

d.metricFileCount.Add(1)
for _, cb := range callbacks {
if cb != nil {
cb()
@@ -266,7 +263,7 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eve
func (d *dmlWorker) dispatchFlushTasks(ctx context.Context,
ch *chann.DrainableChann[eventFragment],
) error {
tableSet := make(map[wrappedTable]struct{})
flushTask := newDMLTask()
ticker := time.NewTicker(d.config.FlushInterval)

for {
@@ -277,63 +274,32 @@ func (d *dmlWorker) dispatchFlushTasks(ctx context.Context,
if atomic.LoadUint64(&d.isClosed) == 1 {
return nil
}
var readyTables []wrappedTable
for tbl := range tableSet {
readyTables = append(readyTables, tbl)
}
if len(readyTables) == 0 {
continue
}
task := flushTask{
targetTables: readyTables,
}
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case d.flushNotifyCh <- task:
case d.flushNotifyCh <- flushTask:
log.Debug("flush task is emitted successfully when flush interval exceeds",
zap.Any("tables", task.targetTables))
for elem := range tableSet {
tbl := elem.VersionedTableName
d.fileSize[tbl] = 0
}
tableSet = make(map[wrappedTable]struct{})
zap.Int("tablesLength", len(flushTask.tasks)))
flushTask = newDMLTask()
default:
}
case frag, ok := <-ch.Out():
if !ok || atomic.LoadUint64(&d.isClosed) == 1 {
return nil
}
table := frag.versionedTable
d.tableEvents.mu.Lock()
d.tableEvents.fragments[table] = append(d.tableEvents.fragments[table], frag)
d.tableEvents.mu.Unlock()

key := wrappedTable{
VersionedTableName: table,
tableInfo: frag.event.Event.TableInfo,
}

tableSet[key] = struct{}{}
for _, msg := range frag.encodedMsgs {
if msg.Value != nil {
d.fileSize[table] += uint64(len(msg.Value))
}
}
flushTask.handleSingleTableEvent(frag)
// if the file size exceeds the upper limit, emit the flush task containing the table
// as soon as possible.
if d.fileSize[table] > uint64(d.config.FileSize) {
task := flushTask{
targetTables: []wrappedTable{key},
}
table := frag.versionedTable
if flushTask.tasks[table].size >= uint64(d.config.FileSize) {
task := flushTask.generateTaskByTable(table)
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case d.flushNotifyCh <- task:
log.Debug("flush task is emitted successfully when file size exceeds",
zap.Any("tables", table))
d.fileSize[table] = 0
default:
zap.Any("table", table),
zap.Int("eventsLenth", len(task.tasks[table].msgs)))
}
}
}
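
After this change, dispatchFlushTasks has two flush triggers: the ticker hands the whole aggregate to the (now buffered) flushNotifyCh, skipping the send via the select default when the worker is busy, and a table that exceeds config.FileSize is flushed immediately on its own. Below is a compact sketch of that control flow under assumed values (a fileSizeLimit constant, a single hard-coded table name, and plain byte counts instead of real tasks); it is an illustration, not the TiCDC code.

```go
package main

import (
	"fmt"
	"time"
)

const fileSizeLimit = 8 // hypothetical threshold, in bytes

func dispatch(fragments <-chan []byte, notify chan<- map[string]uint64) {
	pending := map[string]uint64{} // per-table accumulated size; one table ("t1") here
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if len(pending) == 0 {
				continue
			}
			select {
			case notify <- pending: // interval flush of everything buffered so far
				pending = map[string]uint64{}
			default: // the flush worker is busy; try again on the next tick
			}
		case frag, ok := <-fragments:
			if !ok {
				return
			}
			pending["t1"] += uint64(len(frag))
			if pending["t1"] >= fileSizeLimit {
				notify <- map[string]uint64{"t1": pending["t1"]} // size-triggered flush
				delete(pending, "t1")
			}
		}
	}
}

func main() {
	fragments := make(chan []byte)
	notify := make(chan map[string]uint64, 4) // buffered, like flushNotifyCh
	go dispatch(fragments, notify)
	fragments <- []byte("0123456789") // exceeds the limit, triggers an immediate flush
	close(fragments)
	fmt.Println("flushed bytes:", (<-notify)["t1"])
}
```

Buffering flushNotifyCh (64 in the patch, versus 1 before) presumably reduces how often the interval flush hits the default branch and has to be postponed to the next tick.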