From 4d5f1e14161088a7bc2a7e982c724527fa33303e Mon Sep 17 00:00:00 2001 From: yangxuan Date: Wed, 26 Jun 2024 17:34:40 +0800 Subject: [PATCH 1/2] fix: DataNode might OOM by estimating based on MemorySize See also: #34136 pr: #34201 Signed-off-by: yangxuan --- configs/milvus.yaml | 3 +- internal/datanode/compaction/l0_compactor.go | 61 +++++++++++-------- .../datanode/compaction/l0_compactor_test.go | 30 +++++++++ pkg/util/paramtable/component_param.go | 12 +++- 4 files changed, 77 insertions(+), 29 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3b42e8a47e29..176d96cf64fb 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -517,8 +517,6 @@ dataNode: # if this parameter <= 0, will set it as the maximum number of CPUs that can be executing # suggest to set it bigger on large collection numbers to avoid blocking workPoolSize: -1 - # specify the size of global work pool for channel checkpoint updating - # if this parameter <= 0, will set it as 10 updateChannelCheckpointMaxParallel: 10 updateChannelCheckpointInterval: 60 # the interval duration(in seconds) for datanode to update channel checkpoint of each channel updateChannelCheckpointRPCTimeout: 20 # timeout in seconds for UpdateChannelCheckpoint RPC call @@ -530,6 +528,7 @@ dataNode: readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import. compaction: levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode + levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1. gracefulStopTimeout: 1800 # seconds. 
force stop node without graceful stop ip: # if not specified, use the first unicastable address port: 21124 diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go index 12f4c41601fa..86c8f1b01cc0 100644 --- a/internal/datanode/compaction/l0_compactor.go +++ b/internal/datanode/compaction/l0_compactor.go @@ -132,24 +132,19 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error } var ( - totalSize int64 - totalDeltalogs = make(map[int64][]string) + memorySize int64 + totalDeltalogs = []string{} ) for _, s := range l0Segments { - paths := []string{} for _, d := range s.GetDeltalogs() { for _, l := range d.GetBinlogs() { - paths = append(paths, l.GetLogPath()) - totalSize += l.GetMemorySize() + totalDeltalogs = append(totalDeltalogs, l.GetLogPath()) + memorySize += l.GetMemorySize() } } - if len(paths) > 0 { - totalDeltalogs[s.GetSegmentID()] = paths - } } - batchSize := getMaxBatchSize(totalSize) - resultSegments, err := t.process(ctx, batchSize, targetSegments, lo.Values(totalDeltalogs)...) + resultSegments, err := t.process(ctx, memorySize, targetSegments, totalDeltalogs) if err != nil { return nil, err } @@ -169,15 +164,22 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error return result, nil } -// batch size means segment count -func getMaxBatchSize(totalSize int64) int { - max := 1 - memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() - if memLimit > float64(totalSize) { - max = int(memLimit / float64(totalSize)) +// BatchSize refers to the number of L1/L2 segments in one batch; batchSize controls the expansion ratio +// of deltadata in memory. 
+func getMaxBatchSize(baseMemSize, memLimit float64) int { + batchSize := 1 + if memLimit > baseMemSize { + batchSize = int(memLimit / baseMemSize) + } + + maxSizeLimit := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.GetAsInt() + // Set batch size to maxSizeLimit if it is larger than maxSizeLimit. + // When maxSizeLimit <= 0, it means no limit. + if maxSizeLimit > 0 && batchSize > maxSizeLimit { + return maxSizeLimit } - return max + return batchSize } func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) { @@ -315,18 +317,15 @@ func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaDa return retMap } -func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) { +func (t *LevelZeroCompactionTask) process(ctx context.Context, l0MemSize int64, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process") defer span.End() - results := make([]*datapb.CompactionSegment, 0) - batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize))) - log := log.Ctx(ctx).With( - zap.Int64("planID", t.plan.GetPlanID()), - zap.Int("max conc segment counts", batchSize), - zap.Int("total segment counts", len(targetSegments)), - zap.Int("total batch", batch), - ) + ratio := paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() + memLimit := float64(hardware.GetFreeMemoryCount()) * ratio + if float64(l0MemSize) > memLimit { + return nil, errors.Newf("L0 compaction failed, not enough memory, request memory size: %v, memory limit: %v", l0MemSize, memLimit) + } log.Info("L0 compaction process start") allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs)) @@ -335,6 +334,16 @@ func (t 
*LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta return nil, err } + batchSize := getMaxBatchSize(float64(allDelta.Size()), memLimit) + batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize))) + log := log.Ctx(ctx).With( + zap.Int64("planID", t.plan.GetPlanID()), + zap.Int("max conc segment counts", batchSize), + zap.Int("total segment counts", len(targetSegments)), + zap.Int("total batch", batch), + ) + + results := make([]*datapb.CompactionSegment, 0) for i := 0; i < batch; i++ { left, right := i*batchSize, (i+1)*batchSize if right > len(targetSegments) { diff --git a/internal/datanode/compaction/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go index e96fdd2fca5c..9cf69665c1ad 100644 --- a/internal/datanode/compaction/l0_compactor_test.go +++ b/internal/datanode/compaction/l0_compactor_test.go @@ -79,6 +79,36 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() { s.dBlob = blob.GetValue() } +func (s *LevelZeroCompactionTaskSuite) TestGetMaxBatchSize() { + tests := []struct { + baseMem float64 + memLimit float64 + batchSizeLimit string + + expected int + description string + }{ + {10, 100, "-1", 10, "no limitation on maxBatchSize"}, + {10, 100, "0", 10, "no limitation on maxBatchSize v2"}, + {10, 100, "11", 10, "maxBatchSize == 11"}, + {10, 100, "1", 1, "maxBatchSize == 1"}, + {10, 12, "-1", 1, "no limitation on maxBatchSize"}, + {10, 12, "100", 1, "maxBatchSize == 100"}, + } + + maxSizeK := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.Key + defer paramtable.Get().Reset(maxSizeK) + for _, test := range tests { + s.Run(test.description, func() { + paramtable.Get().Save(maxSizeK, test.batchSizeLimit) + defer paramtable.Get().Reset(maxSizeK) + + actual := getMaxBatchSize(test.baseMem, test.memLimit) + s.Equal(test.expected, actual) + }) + } +} + func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() { plan := &datapb.CompactionPlan{ PlanID: 19530, diff --git 
a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c207a160d389..d7423aff4209 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3478,7 +3478,8 @@ type dataNodeConfig struct { ReadBufferSizeInMB ParamItem `refreshable:"true"` // Compaction - L0BatchMemoryRatio ParamItem `refreshable:"true"` + L0BatchMemoryRatio ParamItem `refreshable:"true"` + L0CompactionMaxBatchSize ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` @@ -3782,6 +3783,15 @@ if this parameter <= 0, will set it as 10`, } p.L0BatchMemoryRatio.Init(base.mgr) + p.L0CompactionMaxBatchSize = ParamItem{ + Key: "dataNode.compaction.levelZeroMaxBatchSize", + Version: "2.4.5", + Doc: "Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.", + DefaultValue: "-1", + Export: true, + } + p.L0CompactionMaxBatchSize.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "dataNode.gracefulStopTimeout", Version: "2.3.7", From 06752392a4e43aac8d72a77a10f275cb2624e473 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Thu, 27 Jun 2024 16:01:05 +0800 Subject: [PATCH 2/2] fix merge conflict in ut Signed-off-by: yangxuan --- internal/datanode/compaction/l0_compactor_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/datanode/compaction/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go index 9cf69665c1ad..de6c05c4f1b9 100644 --- a/internal/datanode/compaction/l0_compactor_test.go +++ b/internal/datanode/compaction/l0_compactor_test.go @@ -286,8 +286,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { s.task.cm = cm s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(1) - 
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Twice() - + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) s.Require().Equal(plan.GetPlanID(), s.task.GetPlanID())