Skip to content

Commit

Permalink
fix: Fix memory buffer error & some renaming (#33850)
Browse files Browse the repository at this point in the history
#30633

---------

Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Jun 21, 2024
1 parent 2f691f1 commit 380d3f4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 72 deletions.
162 changes: 92 additions & 70 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,21 @@ type clusteringCompactionTask struct {
binlogIO io.BinlogIO
allocator allocator.Allocator

ctx context.Context
cancel context.CancelFunc
done chan struct{}
tr *timerecord.TimeRecorder
ctx context.Context
cancel context.CancelFunc
done chan struct{}
tr *timerecord.TimeRecorder
mappingPool *conc.Pool[any]
flushPool *conc.Pool[any]

plan *datapb.CompactionPlan

// schedule
spillChan chan SpillSignal
pool *conc.Pool[any]
// flush
flushMutex sync.Mutex
flushCount *atomic.Int64
flushChan chan SpillSignal

// metrics
spillCount *atomic.Int64
writtenRowNum *atomic.Int64

// inner field
Expand All @@ -84,7 +86,6 @@ type clusteringCompactionTask struct {
clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema

spillMutex sync.Mutex
memoryBufferSize int64
clusterBuffers []*ClusterBuffer
clusterBufferLocks *lock.KeyLock[int]
Expand All @@ -101,7 +102,7 @@ type ClusterBuffer struct {
writer *SegmentWriter
bufferRowNum atomic.Int64

flushedRowNum int64
flushedRowNum atomic.Int64
flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog

uploadedSegments []*datapb.CompactionSegment
Expand Down Expand Up @@ -129,10 +130,10 @@ func NewClusteringCompactionTask(
plan: plan,
tr: timerecord.NewTimeRecorder("clustering_compaction"),
done: make(chan struct{}, 1),
spillChan: make(chan SpillSignal, 100),
flushChan: make(chan SpillSignal, 100),
clusterBuffers: make([]*ClusterBuffer, 0),
clusterBufferLocks: lock.NewKeyLock[int](),
spillCount: atomic.NewInt64(0),
flushCount: atomic.NewInt64(0),
writtenRowNum: atomic.NewInt64(0),
}
}
Expand Down Expand Up @@ -179,7 +180,8 @@ func (t *clusteringCompactionTask) init() error {
t.currentTs = tsoutil.GetCurrentTime()
t.memoryBufferSize = t.getMemoryBufferSize()
workerPoolSize := t.getWorkerPoolSize()
t.pool = conc.NewPool[any](workerPoolSize)
t.mappingPool = conc.NewPool[any](workerPoolSize)
t.flushPool = conc.NewPool[any](workerPoolSize)
log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize))
return nil
}
Expand Down Expand Up @@ -253,7 +255,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
metrics.DataNodeCompactionLatency.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).
Observe(float64(t.tr.ElapseSpan().Milliseconds()))
log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()))
log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load()))

return planResult, nil
}
Expand Down Expand Up @@ -345,8 +347,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
inputSegments := t.plan.GetSegmentBinlogs()
mapStart := time.Now()

// start spill goroutine
go t.backgroundSpill(ctx)
// start flush goroutine
go t.backgroundFlush(ctx)

futures := make([]*conc.Future[any], 0, len(inputSegments))
for _, segment := range inputSegments {
Expand All @@ -355,7 +357,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
// only FieldBinlogs needed
FieldBinlogs: segment.FieldBinlogs,
}
future := t.pool.Submit(func() (any, error) {
future := t.mappingPool.Submit(func() (any, error) {
err := t.mappingSegment(ctx, segmentClone, deltaPk2Ts)
return struct{}{}, err
})
Expand All @@ -365,8 +367,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
return nil, nil, err
}

// force spill all buffers
err := t.spillAll(ctx)
// force flush all buffers
err := t.flushAll(ctx)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -405,7 +407,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
return resultSegments, resultPartitionStats, nil
}

func (t *clusteringCompactionTask) getWrittenMemoryBufferSize() int64 {
func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 {
var totalBufferSize int64 = 0
for _, buffer := range t.clusterBuffers {
totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize())
Expand Down Expand Up @@ -539,22 +541,21 @@ func (t *clusteringCompactionTask) mappingSegment(
}
remained++

// currentSize := t.totalBufferSize.Load()
if (remained+1)%20 == 0 {
currentBufferSize := t.getWrittenMemoryBufferSize()
// trigger spill
if clusterBuffer.bufferRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
if (remained+1)%100 == 0 {
currentBufferSize := t.getUsedMemoryBufferSize()
// trigger flushBinlog
if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
// reach segment/binlog max size
t.spillChan <- SpillSignal{
t.flushChan <- SpillSignal{
buffer: clusterBuffer,
}
} else if currentBufferSize >= t.getMemoryBufferMiddleWatermark() {
// reach spill trigger threshold
t.spillChan <- SpillSignal{}
} else if currentBufferSize >= t.getMemoryBufferHighWatermark() {
// reach flushBinlog trigger threshold
t.flushChan <- SpillSignal{}
}

// if the total buffer size is too large, block here, wait for memory release by spill
if currentBufferSize > t.getMemoryBufferHighWatermark() {
// if the total buffer size is too large, block here, wait for memory release by flushBinlog
if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() {
loop:
for {
select {
Expand All @@ -565,8 +566,8 @@ func (t *clusteringCompactionTask) mappingSegment(
log.Warn("stop waiting for memory buffer release as task chan done")
return nil
default:
currentSize := t.getWrittenMemoryBufferSize()
if currentSize < t.getMemoryBufferMiddleWatermark() {
currentSize := t.getUsedMemoryBufferSize()
if currentSize < t.getMemoryBufferLowWatermark() {
break loop
}
time.Sleep(time.Millisecond * 200)
Expand Down Expand Up @@ -614,15 +615,19 @@ func (t *clusteringCompactionTask) getMemoryBufferSize() int64 {
return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
}

func (t *clusteringCompactionTask) getMemoryBufferMiddleWatermark() int64 {
return int64(float64(t.memoryBufferSize) * 0.5)
func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 {
return int64(float64(t.memoryBufferSize) * 0.3)
}

// getMemoryBufferHighWatermark returns 90% of t.memoryBufferSize, converted
// back to int64 (the fractional part is truncated).
func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
	const highWatermarkRatio = 0.9
	scaled := float64(t.memoryBufferSize) * highWatermarkRatio
	return int64(scaled)
}

func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) {
// getMemoryBufferBlockSpillThreshold returns the full configured buffer size
// (t.memoryBufferSize) unchanged. mappingSegment compares the currently used
// buffer size against this value to decide whether to block and wait for
// memory to be released.
func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 {
	blockThreshold := t.memoryBufferSize
	return blockThreshold
}

func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) {
for {
select {
case <-ctx.Done():
Expand All @@ -631,63 +636,68 @@ func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) {
case <-t.done:
log.Info("clustering compaction task done")
return
case signal := <-t.spillChan:
case signal := <-t.flushChan:
var err error
if signal.buffer == nil {
err = t.spillLargestBuffers(ctx)
err = t.flushLargestBuffers(ctx)
} else {
err = func() error {
t.clusterBufferLocks.Lock(signal.buffer.id)
defer t.clusterBufferLocks.Unlock(signal.buffer.id)
return t.spill(ctx, signal.buffer)
return t.flushBinlog(ctx, signal.buffer)
}()
}
if err != nil {
log.Warn("fail to spill data", zap.Error(err))
log.Warn("fail to flushBinlog data", zap.Error(err))
// todo handle error
}
}
}
}

// NOTE(review): this span is a rendered diff hunk with no +/- markers; lines that
// appear to be the removed version (spill*) sit directly above their apparent
// replacements (flush*). Confirm against the repository before relying on the pairing.
//
// flushLargestBuffers collects the id of every cluster buffer, orders the ids by
// buffered row count (largest first), and flushes the buffers one at a time under
// their per-buffer key lock. It returns the first flush error, or nil.
func (t *clusteringCompactionTask) spillLargestBuffers(ctx context.Context) error {
// only one spillLargestBuffers or spillAll should do at the same time
t.spillMutex.Lock()
defer t.spillMutex.Unlock()
func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error {
// only one flushLargestBuffers or flushAll should do at the same time
// TryLock does not wait: if another flush already holds flushMutex, this call
// returns nil without flushing anything.
getLock := t.flushMutex.TryLock()
if !getLock {
return nil
}
defer t.flushMutex.Unlock()
bufferIDs := make([]int, 0)
for _, buffer := range t.clusterBuffers {
bufferIDs = append(bufferIDs, buffer.id)
}
// NOTE(review): sort.Slice calls the comparator with positions i and j inside
// bufferIDs, but the comparator reads t.clusterBuffers[i] and t.clusterBuffers[j]
// rather than t.clusterBuffers[bufferIDs[i]] and t.clusterBuffers[bufferIDs[j]].
// Once the sort has swapped any elements, position i no longer holds id i, so the
// comparison may not describe the elements actually being ordered and the result
// is not guaranteed to be largest-first. Likely intended to index by bufferIDs[i]
// and bufferIDs[j] — confirm and fix.
sort.Slice(bufferIDs, func(i, j int) bool {
return t.clusterBuffers[i].writer.GetRowNum() > t.clusterBuffers[j].writer.GetRowNum()
return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load()
})
for index, bufferId := range bufferIDs {
log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs))
for _, bufferId := range bufferIDs {
// The closure scopes the deferred Unlock to a single buffer, so each key lock is
// released before the next buffer is processed. The buffer id is used directly as
// an index into t.clusterBuffers.
err := func() error {
t.clusterBufferLocks.Lock(bufferId)
defer t.clusterBufferLocks.Unlock(bufferId)
return t.spill(ctx, t.clusterBuffers[bufferId])
return t.flushBinlog(ctx, t.clusterBuffers[bufferId])
}()
if err != nil {
return err
}
// The index-based condition can never be true inside a range over bufferIDs
// (index is always < len(bufferIDs)); the memory-based condition below it stops
// flushing as soon as used memory is at or below the low watermark.
if index >= len(bufferIDs) {
if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() {
log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize()))
break
}
}
return nil
}

func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
// only one spillLargestBuffers or spillAll should do at the same time
t.spillMutex.Lock()
defer t.spillMutex.Unlock()
func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
// only one flushLargestBuffers or flushAll should do at the same time
t.flushMutex.Lock()
defer t.flushMutex.Unlock()
for _, buffer := range t.clusterBuffers {
err := func() error {
t.clusterBufferLocks.Lock(buffer.id)
defer t.clusterBufferLocks.Unlock(buffer.id)
err := t.spill(ctx, buffer)
err := t.flushBinlog(ctx, buffer)
if err != nil {
log.Error("spill fail")
log.Error("flushBinlog fail")
return err
}
err = t.packBufferToSegment(ctx, buffer)
Expand All @@ -708,44 +718,56 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
for _, fieldBinlog := range buffer.flushedBinlogs {
insertLogs = append(insertLogs, fieldBinlog)
}
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum)
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load())
if err != nil {
return err
}

// pack current spill data into a segment
// pack current flushBinlog data into a segment
seg := &datapb.CompactionSegment{
PlanID: t.plan.GetPlanID(),
SegmentID: buffer.writer.GetSegmentID(),
NumOfRows: buffer.flushedRowNum,
NumOfRows: buffer.flushedRowNum.Load(),
InsertLogs: insertLogs,
Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
Channel: t.plan.GetChannel(),
}
buffer.uploadedSegments = append(buffer.uploadedSegments, seg)
segmentStats := storage.SegmentStats{
FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
NumRows: int(buffer.flushedRowNum),
NumRows: int(buffer.flushedRowNum.Load()),
}
buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats
// refresh
t.refreshBufferWriter(buffer)
buffer.flushedRowNum = 0
buffer.flushedRowNum.Store(0)
buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0)
log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
for _, binlog := range seg.InsertLogs {
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String()))
}
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats))
return nil
}

func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuffer) error {
log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()))
func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error {
log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID()))
if buffer.writer.IsEmpty() {
return nil
}
kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer)
if err != nil {
log.Warn("compact wrong, failed to serialize writer", zap.Error(err))

future := t.flushPool.Submit(func() (any, error) {
kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer)
if err != nil {
log.Warn("compact wrong, failed to serialize writer", zap.Error(err))
return typeutil.NewPair(kvs, partialBinlogs), err
}
return typeutil.NewPair(kvs, partialBinlogs), nil
})
if err := conc.AwaitAll(future); err != nil {
return err
}
kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A
partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B

if err := t.binlogIO.Upload(ctx, kvs); err != nil {
log.Warn("compact wrong, failed to upload kvs", zap.Error(err))
Expand All @@ -760,14 +782,14 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
}
buffer.flushedBinlogs[fID] = tmpBinlog
}
buffer.flushedRowNum = buffer.flushedRowNum + buffer.bufferRowNum.Load()
buffer.flushedRowNum.Add(buffer.bufferRowNum.Load())

// clean buffer
buffer.bufferRowNum.Store(0)

t.spillCount.Inc()
log.Info("finish spill binlogs", zap.Int64("spillCount", t.spillCount.Load()))
if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
t.flushCount.Inc()
log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()))
if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() {
if err := t.packBufferToSegment(ctx, buffer); err != nil {
return err
}
Expand Down Expand Up @@ -817,7 +839,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
}
future := t.pool.Submit(func() (any, error) {
future := t.mappingPool.Submit(func() (any, error) {
analyzeResult, err := t.scalarAnalyzeSegment(ctx, segmentClone)
mutex.Lock()
defer mutex.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (t *mixCompactionTask) merge(
unflushedRowCount++
remainingRowCount++

if (unflushedRowCount+1)%100 == 0 && writer.IsFull() {
if (unflushedRowCount+1)%100 == 0 && writer.FlushAndIsFull() {
serWriteStart := time.Now()
kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer)
if err != nil {
Expand All @@ -214,7 +214,7 @@ func (t *mixCompactionTask) merge(
}
}

if !writer.IsEmpty() {
if !writer.FlushAndIsEmpty() {
serWriteStart := time.Now()
kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer)
if err != nil {
Expand Down
Loading

0 comments on commit 380d3f4

Please sign in to comment.