From 1a248f2668f86ae740a7f45b970d38f5bb0d43e5 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 12 Jul 2024 18:59:35 +0800 Subject: [PATCH] enhance: Add param item for segmentFlushInterval (#34629) See also #28817 Add paramitem for segment flush interval Signed-off-by: Congqi Xia --- configs/milvus.yaml | 1 + internal/datacoord/segment_allocation_policy.go | 5 ++--- internal/datacoord/segment_info.go | 3 ++- internal/datacoord/segment_manager_test.go | 2 +- pkg/util/paramtable/component_param.go | 10 ++++++++++ 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7196a666f643..8710e4f6e1ea 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -448,6 +448,7 @@ dataCoord: # MUST BE GREATER THAN OR EQUAL TO !!! # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%. expansionRate: 1.25 + segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version enableCompaction: true # Enable data segment compaction compaction: diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 8ea149ae3fd1..a17a6deff199 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -194,12 +195,10 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) { type flushPolicy func(segment *SegmentInfo, t Timestamp) bool -const flushInterval = 2 * time.Second - func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool { return segment.GetState() == commonpb.SegmentState_Sealed && segment.Level != datapb.SegmentLevel_L0 && - time.Since(segment.lastFlushTime) >= flushInterval && + time.Since(segment.lastFlushTime) >= paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second) && segment.GetLastExpireTime() <= t && segment.currRows != 0 && // Decoupling the importing segment from the flush process, diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 35ed52f0018b..8dcd183632b8 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation @@ -64,7 +65,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { SegmentInfo: info, currRows: info.GetNumOfRows(), allocations: make([]*Allocation, 0, 16), - lastFlushTime: time.Now().Add(-1 * flushInterval), + lastFlushTime: time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)), // A growing segment from recovery can be also considered idle. lastWrittenTime: getZeroTime(), } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index fefb7c62509d..418c52bd21bd 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -514,7 +514,7 @@ func TestGetFlushableSegments(t *testing.T) { assert.NoError(t, err) assert.Empty(t, ids) - meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-flushInterval)) + meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-1*paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second))) ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime) assert.NoError(t, err) assert.EqualValues(t, 1, len(ids)) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 44b8c18c4875..0c9d2582235f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2855,6 +2855,7 @@ type dataCoordConfig struct { SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"` SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"` AutoUpgradeSegmentIndex ParamItem `refreshable:"true"` + SegmentFlushInterval ParamItem `refreshable:"true"` // compaction EnableCompaction ParamItem `refreshable:"false"` @@ -3591,6 +3592,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.AutoUpgradeSegmentIndex.Init(base.mgr) + p.SegmentFlushInterval = ParamItem{ + Key: "dataCoord.segmentFlushInterval", + Version: "2.4.6", + DefaultValue: "2", + Doc: "the minimal interval duration(unit: Seconds) between flusing operation on same segment", + Export: true, + } + p.SegmentFlushInterval.Init(base.mgr) + p.FilesPerPreImportTask = ParamItem{ Key: "dataCoord.import.filesPerPreImportTask", Version: "2.4.0",