Skip to content

Commit

Permalink
enhance: Add trace for bf cost in l0 compactor (#33860)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Jun 20, 2024
1 parent f3d902c commit 31ef0a1
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions internal/datanode/compaction/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func getMaxBatchSize(totalSize int64) int {
}

func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact serializeUpload")
defer span.End()
allBlobs := make(map[string][]byte)
results := make([]*datapb.CompactionSegment, 0)
for segID, writer := range segmentWriters {
Expand Down Expand Up @@ -221,7 +223,7 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr
return nil, nil
}

if err := t.Upload(ctx, allBlobs); err != nil {
if err := t.Upload(traceCtx, allBlobs); err != nil {
log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err))
return nil, err
}
Expand All @@ -234,7 +236,7 @@ func (t *LevelZeroCompactionTask) splitDelta(
allDelta []*storage.DeleteData,
segmentBfs map[int64]*metacache.BloomFilterSet,
) map[int64]*SegmentDeltaWriter {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()

allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
Expand All @@ -243,7 +245,7 @@ func (t *LevelZeroCompactionTask) splitDelta(

// spilt all delete data to segments

retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segmentBfs)
retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segmentBfs)

targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
retMap.Range(func(key int, value *BatchApplyRet) bool {
Expand Down Expand Up @@ -278,7 +280,9 @@ type BatchApplyRet = struct {
Segment2Hits map[int64][]bool
}

func (t *LevelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
defer span.End()
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()

batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
Expand Down

0 comments on commit 31ef0a1

Please sign in to comment.