From 7f97a5fa68f049f6c8ec95715eb7b68719243535 Mon Sep 17 00:00:00 2001 From: jaime Date: Thu, 13 Jun 2024 17:57:35 +0800 Subject: [PATCH] enhance: enable flush rate limiter of collection level Signed-off-by: jaime --- configs/milvus.yaml | 4 ++-- internal/proxy/simple_rate_limiter_test.go | 26 ++++++++++++++++------ pkg/util/paramtable/quota_param.go | 4 ++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index d6415e1a76208..1513328cd0014 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -654,10 +654,10 @@ quotaAndLimits: db: max: -1 # qps of db level, default no limit, rate for CreateIndex, DropIndex flushRate: - enabled: false + enabled: true max: -1 # qps, default no limit, rate for flush collection: - max: -1 # qps, default no limit, rate for flush at collection level. + max: 0.1 # qps, default no limit, rate for flush at collection level. db: max: -1 # qps of db level, default no limit, rate for flush compactionRate: diff --git a/internal/proxy/simple_rate_limiter_test.go b/internal/proxy/simple_rate_limiter_test.go index d9f555b4a87ce..cbbe248dc0465 100644 --- a/internal/proxy/simple_rate_limiter_test.go +++ b/internal/proxy/simple_rate_limiter_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "testing" + "time" "github.com/stretchr/testify/assert" @@ -116,6 +117,16 @@ func TestSimpleRateLimiter(t *testing.T) { } for _, rt := range internalpb.RateType_value { + if internalpb.RateType_DDLFlush == internalpb.RateType(rt) { + // the flush request has 0.1 rate limiter that means only allow to execute one request each 10 seconds. + time.Sleep(10 * time.Second) + err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1) + assert.NoError(t, err) + err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1) + assert.ErrorIs(t, err, merr.ErrServiceRateLimit) + continue + } + if IsDDLRequest(internalpb.RateType(rt)) { err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) assert.NoError(t, err) @@ -123,14 +134,15 @@ func TestSimpleRateLimiter(t *testing.T) { assert.NoError(t, err) err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5) assert.ErrorIs(t, err, merr.ErrServiceRateLimit) - } else { - err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) - assert.NoError(t, err) - err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) - assert.NoError(t, err) - err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) - assert.ErrorIs(t, err, merr.ErrServiceRateLimit) + continue } + + err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) + assert.NoError(t, err) + err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) + assert.NoError(t, err) + err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) + assert.ErrorIs(t, err, merr.ErrServiceRateLimit) } Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak) }) diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 3cc550e9e6eee..b68a236ecc94b 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -330,7 +330,7 @@ seconds, (0 ~ 65536)`, p.FlushLimitEnabled = ParamItem{ Key: "quotaAndLimits.flushRate.enabled", Version: "2.2.0", - DefaultValue: "false", + DefaultValue: "true", Export: true, } p.FlushLimitEnabled.Init(base.mgr) @@ -376,7 +376,7 @@ seconds, (0 ~ 65536)`, p.MaxFlushRatePerCollection = ParamItem{ Key: "quotaAndLimits.flushRate.collection.max", Version: "2.3.9", - DefaultValue: "-1", + DefaultValue: "0.1", Formatter: func(v string) string { if !p.FlushLimitEnabled.GetAsBool() { return max