diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index c7fd895e7642..713af63997ac 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -61,19 +61,21 @@ type clusteringCompactionTask struct { binlogIO io.BinlogIO allocator allocator.Allocator - ctx context.Context - cancel context.CancelFunc - done chan struct{} - tr *timerecord.TimeRecorder + ctx context.Context + cancel context.CancelFunc + done chan struct{} + tr *timerecord.TimeRecorder + mappingPool *conc.Pool[any] + flushPool *conc.Pool[any] plan *datapb.CompactionPlan - // schedule - spillChan chan SpillSignal - pool *conc.Pool[any] + // flush + flushMutex sync.Mutex + flushCount *atomic.Int64 + flushChan chan SpillSignal // metrics - spillCount *atomic.Int64 writtenRowNum *atomic.Int64 // inner field @@ -84,7 +86,6 @@ type clusteringCompactionTask struct { clusteringKeyField *schemapb.FieldSchema primaryKeyField *schemapb.FieldSchema - spillMutex sync.Mutex memoryBufferSize int64 clusterBuffers []*ClusterBuffer clusterBufferLocks *lock.KeyLock[int] @@ -101,7 +102,7 @@ type ClusterBuffer struct { writer *SegmentWriter bufferRowNum atomic.Int64 - flushedRowNum int64 + flushedRowNum atomic.Int64 flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog uploadedSegments []*datapb.CompactionSegment @@ -129,10 +130,10 @@ func NewClusteringCompactionTask( plan: plan, tr: timerecord.NewTimeRecorder("clustering_compaction"), done: make(chan struct{}, 1), - spillChan: make(chan SpillSignal, 100), + flushChan: make(chan SpillSignal, 100), clusterBuffers: make([]*ClusterBuffer, 0), clusterBufferLocks: lock.NewKeyLock[int](), - spillCount: atomic.NewInt64(0), + flushCount: atomic.NewInt64(0), writtenRowNum: atomic.NewInt64(0), } } @@ -179,7 +180,8 @@ func (t *clusteringCompactionTask) init() error { t.currentTs = tsoutil.GetCurrentTime() t.memoryBufferSize = t.getMemoryBufferSize() workerPoolSize := t.getWorkerPoolSize() - t.pool = conc.NewPool[any](workerPoolSize) + t.mappingPool = conc.NewPool[any](workerPoolSize) + t.flushPool = conc.NewPool[any](workerPoolSize) log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize)) return nil } @@ -253,7 +255,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro metrics.DataNodeCompactionLatency. WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()). Observe(float64(t.tr.ElapseSpan().Milliseconds())) - log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan())) + log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load())) return planResult, nil } @@ -345,8 +347,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, inputSegments := t.plan.GetSegmentBinlogs() mapStart := time.Now() - // start spill goroutine - go t.backgroundSpill(ctx) + // start flush goroutine + go t.backgroundFlush(ctx) futures := make([]*conc.Future[any], 0, len(inputSegments)) for _, segment := range inputSegments { @@ -355,7 +357,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, // only FieldBinlogs needed FieldBinlogs: segment.FieldBinlogs, } - future := t.pool.Submit(func() (any, error) { + future := t.mappingPool.Submit(func() (any, error) { err := t.mappingSegment(ctx, segmentClone, deltaPk2Ts) return struct{}{}, err }) @@ -365,8 +367,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return nil, nil, err } - // force spill all buffers - err := t.spillAll(ctx) + // force flush all buffers + err := t.flushAll(ctx) if err != nil { return nil, nil, err } @@ -405,7 +407,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return resultSegments, resultPartitionStats, nil } -func (t *clusteringCompactionTask) getWrittenMemoryBufferSize() int64 { +func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 { var totalBufferSize int64 = 0 for _, buffer := range t.clusterBuffers { totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) @@ -539,22 +541,21 @@ func (t *clusteringCompactionTask) mappingSegment( } remained++ - // currentSize := t.totalBufferSize.Load() - if (remained+1)%20 == 0 { - currentBufferSize := t.getWrittenMemoryBufferSize() - // trigger spill - if clusterBuffer.bufferRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { + if (remained+1)%100 == 0 { + currentBufferSize := t.getUsedMemoryBufferSize() + // trigger flushBinlog + if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { // reach segment/binlog max size - t.spillChan <- SpillSignal{ + t.flushChan <- SpillSignal{ buffer: clusterBuffer, } - } else if currentBufferSize >= t.getMemoryBufferMiddleWatermark() { - // reach spill trigger threshold - t.spillChan <- SpillSignal{} + } else if currentBufferSize >= t.getMemoryBufferHighWatermark() { + // reach flushBinlog trigger threshold + t.flushChan <- SpillSignal{} } - // if the total buffer size is too large, block here, wait for memory release by spill - if currentBufferSize > t.getMemoryBufferHighWatermark() { + // if the total buffer size is too large, block here, wait for memory release by flushBinlog + if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() { loop: for { select { @@ -565,8 +566,8 @@ func (t *clusteringCompactionTask) mappingSegment( log.Warn("stop waiting for memory buffer release as task chan done") return nil default: - currentSize := t.getWrittenMemoryBufferSize() - if currentSize < t.getMemoryBufferMiddleWatermark() { + currentSize := t.getUsedMemoryBufferSize() + if currentSize < t.getMemoryBufferLowWatermark() { break loop } time.Sleep(time.Millisecond * 200) @@ -614,15 +615,19 @@ func (t *clusteringCompactionTask) getMemoryBufferSize() int64 { return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat()) } -func (t *clusteringCompactionTask) getMemoryBufferMiddleWatermark() int64 { - return int64(float64(t.memoryBufferSize) * 0.5) +func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 { + return int64(float64(t.memoryBufferSize) * 0.3) } func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 { return int64(float64(t.memoryBufferSize) * 0.9) } -func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) { +func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 { + return t.memoryBufferSize +} + +func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) { for { select { case <-ctx.Done(): @@ -631,63 +636,68 @@ func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) { case <-t.done: log.Info("clustering compaction task done") return - case signal := <-t.spillChan: + case signal := <-t.flushChan: var err error if signal.buffer == nil { - err = t.spillLargestBuffers(ctx) + err = t.flushLargestBuffers(ctx) } else { err = func() error { t.clusterBufferLocks.Lock(signal.buffer.id) defer t.clusterBufferLocks.Unlock(signal.buffer.id) - return t.spill(ctx, signal.buffer) + return t.flushBinlog(ctx, signal.buffer) }() } if err != nil { - log.Warn("fail to spill data", zap.Error(err)) + log.Warn("fail to flushBinlog data", zap.Error(err)) // todo handle error } } } } -func (t *clusteringCompactionTask) spillLargestBuffers(ctx context.Context) error { - // only one spillLargestBuffers or spillAll should do at the same time - t.spillMutex.Lock() - defer t.spillMutex.Unlock() +func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error { + // only one flushLargestBuffers or flushAll should do at the same time + getLock := t.flushMutex.TryLock() + if !getLock { + return nil + } + defer t.flushMutex.Unlock() bufferIDs := make([]int, 0) for _, buffer := range t.clusterBuffers { bufferIDs = append(bufferIDs, buffer.id) } sort.Slice(bufferIDs, func(i, j int) bool { - return t.clusterBuffers[i].writer.GetRowNum() > t.clusterBuffers[j].writer.GetRowNum() + return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load() }) - for index, bufferId := range bufferIDs { + log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs)) + for _, bufferId := range bufferIDs { err := func() error { t.clusterBufferLocks.Lock(bufferId) defer t.clusterBufferLocks.Unlock(bufferId) - return t.spill(ctx, t.clusterBuffers[bufferId]) + return t.flushBinlog(ctx, t.clusterBuffers[bufferId]) }() if err != nil { return err } - if index >= len(bufferIDs) { + if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() { + log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize())) break } } return nil } -func (t *clusteringCompactionTask) spillAll(ctx context.Context) error { - // only one spillLargestBuffers or spillAll should do at the same time - t.spillMutex.Lock() - defer t.spillMutex.Unlock() +func (t *clusteringCompactionTask) flushAll(ctx context.Context) error { + // only one flushLargestBuffers or flushAll should do at the same time + t.flushMutex.Lock() + defer t.flushMutex.Unlock() for _, buffer := range t.clusterBuffers { err := func() error { t.clusterBufferLocks.Lock(buffer.id) defer t.clusterBufferLocks.Unlock(buffer.id) - err := t.spill(ctx, buffer) + err := t.flushBinlog(ctx, buffer) if err != nil { - log.Error("spill fail") + log.Error("flushBinlog fail") return err } err = t.packBufferToSegment(ctx, buffer) @@ -708,16 +718,16 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff for _, fieldBinlog := range buffer.flushedBinlogs { insertLogs = append(insertLogs, fieldBinlog) } - statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum) + statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load()) if err != nil { return err } - // pack current spill data into a segment + // pack current flushBinlog data into a segment seg := &datapb.CompactionSegment{ PlanID: t.plan.GetPlanID(), SegmentID: buffer.writer.GetSegmentID(), - NumOfRows: buffer.flushedRowNum, + NumOfRows: buffer.flushedRowNum.Load(), InsertLogs: insertLogs, Field2StatslogPaths: []*datapb.FieldBinlog{statPaths}, Channel: t.plan.GetChannel(), @@ -725,27 +735,39 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff buffer.uploadedSegments = append(buffer.uploadedSegments, seg) segmentStats := storage.SegmentStats{ FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, - NumRows: int(buffer.flushedRowNum), + NumRows: int(buffer.flushedRowNum.Load()), } buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats // refresh t.refreshBufferWriter(buffer) - buffer.flushedRowNum = 0 + buffer.flushedRowNum.Store(0) buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0) - log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats)) + for _, binlog := range seg.InsertLogs { + log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String())) + } + log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats)) return nil } -func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuffer) error { - log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load())) +func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error { + log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID())) if buffer.writer.IsEmpty() { return nil } - kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer) - if err != nil { - log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) + + future := t.flushPool.Submit(func() (any, error) { + kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer) + if err != nil { + log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) + return typeutil.NewPair(kvs, partialBinlogs), err + } + return typeutil.NewPair(kvs, partialBinlogs), nil + }) + if err := conc.AwaitAll(future); err != nil { return err } + kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A + partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) @@ -760,14 +782,14 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf } buffer.flushedBinlogs[fID] = tmpBinlog } - buffer.flushedRowNum = buffer.flushedRowNum + buffer.bufferRowNum.Load() + buffer.flushedRowNum.Add(buffer.bufferRowNum.Load()) // clean buffer buffer.bufferRowNum.Store(0) - t.spillCount.Inc() - log.Info("finish spill binlogs", zap.Int64("spillCount", t.spillCount.Load())) - if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() { + t.flushCount.Inc() + log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load())) + if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() { if err := t.packBufferToSegment(ctx, buffer); err != nil { return err } @@ -817,7 +839,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter CollectionID: segment.CollectionID, PartitionID: segment.PartitionID, } - future := t.pool.Submit(func() (any, error) { + future := t.mappingPool.Submit(func() (any, error) { analyzeResult, err := t.scalarAnalyzeSegment(ctx, segmentClone) mutex.Lock() defer mutex.Unlock() diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index b647584f82b3..f46ef87724da 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -193,7 +193,7 @@ func (t *mixCompactionTask) merge( unflushedRowCount++ remainingRowCount++ - if (unflushedRowCount+1)%100 == 0 && writer.IsFull() { + if (unflushedRowCount+1)%100 == 0 && writer.FlushAndIsFull() { serWriteStart := time.Now() kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer) if err != nil { @@ -214,7 +214,7 @@ func (t *mixCompactionTask) merge( } } - if !writer.IsEmpty() { + if !writer.FlushAndIsEmpty() { serWriteStart := time.Now() kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer) if err != nil { diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go index be4265440343..c4fb51c8c80d 100644 --- a/internal/datanode/compaction/segment_writer.go +++ b/internal/datanode/compaction/segment_writer.go @@ -150,11 +150,19 @@ func (w *SegmentWriter) Finish(actualRowCount int64) (*storage.Blob, error) { } func (w *SegmentWriter) IsFull() bool { + return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() +} + +func (w *SegmentWriter) FlushAndIsFull() bool { w.writer.Flush() return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() } func (w *SegmentWriter) IsEmpty() bool { + return w.writer.WrittenMemorySize() == 0 +} + +func (w *SegmentWriter) FlushAndIsEmpty() bool { w.writer.Flush() return w.writer.WrittenMemorySize() == 0 } diff --git a/internal/storage/serde.go b/internal/storage/serde.go index a329af470b08..2f243aa166ef 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "math" + "sync" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" @@ -678,6 +679,7 @@ type SerializeWriter[T any] struct { rw RecordWriter serializer Serializer[T] batchSize int + mu sync.Mutex buffer []T pos int @@ -685,6 +687,8 @@ type SerializeWriter[T any] struct { } func (sw *SerializeWriter[T]) Flush() error { + sw.mu.Lock() + defer sw.mu.Unlock() if sw.pos == 0 { return nil }