Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DataNode might OOM by estimating based on MemorySize #34203

Merged
merged 5 commits on Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,6 @@ dataNode:
# if this parameter <= 0, will set it as the maximum number of CPUs that can be executing
# suggest to set it bigger on large collection numbers to avoid blocking
workPoolSize: -1
# specify the size of global work pool for channel checkpoint updating
# if this parameter <= 0, will set it as 10
updateChannelCheckpointMaxParallel: 10
updateChannelCheckpointInterval: 60 # the interval duration(in seconds) for datanode to update channel checkpoint of each channel
updateChannelCheckpointRPCTimeout: 20 # timeout in seconds for UpdateChannelCheckpoint RPC call
Expand All @@ -531,6 +529,7 @@ dataNode:
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
compaction:
levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Defaults to -1; any value less than 1 means no limit. Valid range: >= 1.
gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
ip: # if not specified, use the first unicastable address
port: 21124
Expand Down
61 changes: 35 additions & 26 deletions internal/datanode/compaction/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,19 @@
}

var (
totalSize int64
totalDeltalogs = make(map[int64][]string)
memorySize int64
totalDeltalogs = []string{}
)
for _, s := range l0Segments {
paths := []string{}
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
paths = append(paths, l.GetLogPath())
totalSize += l.GetMemorySize()
totalDeltalogs = append(totalDeltalogs, l.GetLogPath())
memorySize += l.GetMemorySize()
}
}
if len(paths) > 0 {
totalDeltalogs[s.GetSegmentID()] = paths
}
}

batchSize := getMaxBatchSize(totalSize)
resultSegments, err := t.process(ctx, batchSize, targetSegments, lo.Values(totalDeltalogs)...)
resultSegments, err := t.process(ctx, memorySize, targetSegments, totalDeltalogs)
if err != nil {
return nil, err
}
Expand All @@ -169,15 +164,22 @@
return result, nil
}

// batch size means segment count
func getMaxBatchSize(totalSize int64) int {
max := 1
memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
if memLimit > float64(totalSize) {
max = int(memLimit / float64(totalSize))
// BatchSize refers to the number of L1/L2 segments in one batch; batchSize controls the expansion ratio
// of delta data in memory.
func getMaxBatchSize(baseMemSize, memLimit float64) int {
batchSize := 1
if memLimit > baseMemSize {
batchSize = int(memLimit / baseMemSize)
}

maxSizeLimit := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.GetAsInt()
// Cap batchSize at maxSizeLimit when the computed batch size exceeds it.
// When maxSizeLimit <= 0, there is no limit.
if maxSizeLimit > 0 && batchSize > maxSizeLimit {
return maxSizeLimit
}

return max
return batchSize
}

func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
Expand Down Expand Up @@ -315,18 +317,15 @@
return retMap
}

func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
func (t *LevelZeroCompactionTask) process(ctx context.Context, l0MemSize int64, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
defer span.End()

results := make([]*datapb.CompactionSegment, 0)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
zap.Int("total batch", batch),
)
ratio := paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
memLimit := float64(hardware.GetFreeMemoryCount()) * ratio
if float64(l0MemSize) > memLimit {
return nil, errors.Newf("L0 compaction failed, not enough memory, request memory size: %v, memory limit: %v", l0MemSize, memLimit)

Check warning on line 327 in internal/datanode/compaction/l0_compactor.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/compaction/l0_compactor.go#L327

Added line #L327 was not covered by tests
}

log.Info("L0 compaction process start")
allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
Expand All @@ -335,6 +334,16 @@
return nil, err
}

batchSize := getMaxBatchSize(float64(allDelta.Size()), memLimit)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
zap.Int("total batch", batch),
)

results := make([]*datapb.CompactionSegment, 0)
for i := 0; i < batch; i++ {
left, right := i*batchSize, (i+1)*batchSize
if right > len(targetSegments) {
Expand Down
33 changes: 31 additions & 2 deletions internal/datanode/compaction/l0_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,36 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
s.dBlob = blob.GetValue()
}

// TestGetMaxBatchSize exercises getMaxBatchSize with combinations of base
// memory size, memory limit, and the configurable batch-size cap
// (dataNode.compaction.levelZeroMaxBatchSize).
func (s *LevelZeroCompactionTaskSuite) TestGetMaxBatchSize() {
tests := []struct {
baseMem float64 // estimated memory size of the L0 delta data
memLimit float64 // free-memory budget available for the compaction
batchSizeLimit string // value saved into L0CompactionMaxBatchSize before the call

expected int // batch size getMaxBatchSize is expected to return
description string // sub-test name
}{
// memLimit/baseMem = 10; a cap that is disabled (<= 0) or above 10 leaves it at 10.
{10, 100, "-1", 10, "no limitation on maxBatchSize"},
{10, 100, "0", 10, "no limitation on maxBatchSize v2"},
{10, 100, "11", 10, "maxBatchSize == 11"},
// A cap of 1 clamps the computed batch size of 10 down to 1.
{10, 100, "1", 1, "maxBatchSize == 1"},
// memLimit/baseMem = 1.2, truncated to 1 by int(), regardless of the cap.
{10, 12, "-1", 1, "no limitation on maxBatchSize"},
{10, 12, "100", 1, "maxBatchSize == 100"},
}

// Restore the param after the whole test; each sub-test also resets it
// after saving its own value, so cases stay independent.
maxSizeK := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.Key
defer paramtable.Get().Reset(maxSizeK)
for _, test := range tests {
s.Run(test.description, func() {
paramtable.Get().Save(maxSizeK, test.batchSizeLimit)
defer paramtable.Get().Reset(maxSizeK)

actual := getMaxBatchSize(test.baseMem, test.memLimit)
s.Equal(test.expected, actual)
})
}
}

func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Expand Down Expand Up @@ -256,8 +286,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
s.task.cm = cm

s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(1)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Twice()

s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)

s.Require().Equal(plan.GetPlanID(), s.task.GetPlanID())
Expand Down
12 changes: 11 additions & 1 deletion pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,8 @@ type dataNodeConfig struct {
ReadBufferSizeInMB ParamItem `refreshable:"true"`

// Compaction
L0BatchMemoryRatio ParamItem `refreshable:"true"`
L0BatchMemoryRatio ParamItem `refreshable:"true"`
L0CompactionMaxBatchSize ParamItem `refreshable:"true"`

GracefulStopTimeout ParamItem `refreshable:"true"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3801,6 +3802,15 @@ if this parameter <= 0, will set it as 10`,
}
p.L0BatchMemoryRatio.Init(base.mgr)

p.L0CompactionMaxBatchSize = ParamItem{
Key: "dataNode.compaction.levelZeroMaxBatchSize",
Version: "2.4.5",
Doc: "Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Defaults to -1; any value less than 1 means no limit. Valid range: >= 1.",
DefaultValue: "-1",
Export: true,
}
p.L0CompactionMaxBatchSize.Init(base.mgr)

p.GracefulStopTimeout = ParamItem{
Key: "dataNode.gracefulStopTimeout",
Version: "2.3.7",
Expand Down
Loading