Skip to content

Commit

Permalink
enhance: Add param item for segmentFlushInterval (#34629)
Browse files Browse the repository at this point in the history
See also #28817

Add paramitem for segment flush interval

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jul 12, 2024
1 parent d8e68cb commit 1a248f2
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 5 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ dataCoord:
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
expansionRate: 1.25
segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
enableCompaction: true # Enable data segment compaction
compaction:
Expand Down
5 changes: 2 additions & 3 deletions internal/datacoord/segment_allocation_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -194,12 +195,10 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) {

type flushPolicy func(segment *SegmentInfo, t Timestamp) bool

const flushInterval = 2 * time.Second

func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool {
return segment.GetState() == commonpb.SegmentState_Sealed &&
segment.Level != datapb.SegmentLevel_L0 &&
time.Since(segment.lastFlushTime) >= flushInterval &&
time.Since(segment.lastFlushTime) >= paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second) &&
segment.GetLastExpireTime() <= t &&
segment.currRows != 0 &&
// Decoupling the importing segment from the flush process,
Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
Expand Down Expand Up @@ -64,7 +65,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
SegmentInfo: info,
currRows: info.GetNumOfRows(),
allocations: make([]*Allocation, 0, 16),
lastFlushTime: time.Now().Add(-1 * flushInterval),
lastFlushTime: time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)),
// A growing segment from recovery can be also considered idle.
lastWrittenTime: getZeroTime(),
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/segment_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, ids)

meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-flushInterval))
meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-1*paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)))
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(ids))
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,7 @@ type dataCoordConfig struct {
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
SegmentFlushInterval ParamItem `refreshable:"true"`

// compaction
EnableCompaction ParamItem `refreshable:"false"`
Expand Down Expand Up @@ -3591,6 +3592,15 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.AutoUpgradeSegmentIndex.Init(base.mgr)

p.SegmentFlushInterval = ParamItem{
Key: "dataCoord.segmentFlushInterval",
Version: "2.4.6",
DefaultValue: "2",
Doc: "the minimal interval duration(unit: Seconds) between flusing operation on same segment",
Export: true,
}
p.SegmentFlushInterval.Init(base.mgr)

p.FilesPerPreImportTask = ParamItem{
Key: "dataCoord.import.filesPerPreImportTask",
Version: "2.4.0",
Expand Down

0 comments on commit 1a248f2

Please sign in to comment.