From dbad4ef2eeacbd3fc0579200e14c9625071fa2cc Mon Sep 17 00:00:00 2001 From: jaime Date: Thu, 13 Jun 2024 17:57:35 +0800 Subject: [PATCH] enhance: enable flush rate limiter of collection level Signed-off-by: jaime --- configs/milvus.yaml | 4 ++-- internal/proxy/simple_rate_limiter_test.go | 26 ++++++++++++++++------ pkg/util/paramtable/quota_param.go | 4 ++-- pkg/util/paramtable/quota_param_test.go | 4 ++-- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9bb5f2963e35..4beef0c6462d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -691,10 +691,10 @@ quotaAndLimits: db: max: -1 # qps of db level, default no limit, rate for CreateIndex, DropIndex flushRate: - enabled: false + enabled: true max: -1 # qps, default no limit, rate for flush collection: - max: -1 # qps, default no limit, rate for flush at collection level. + max: 0.1 # qps, default no limit, rate for flush at collection level. db: max: -1 # qps of db level, default no limit, rate for flush compactionRate: diff --git a/internal/proxy/simple_rate_limiter_test.go b/internal/proxy/simple_rate_limiter_test.go index d9f555b4a87c..cbbe248dc046 100644 --- a/internal/proxy/simple_rate_limiter_test.go +++ b/internal/proxy/simple_rate_limiter_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "testing" + "time" "github.com/stretchr/testify/assert" @@ -116,6 +117,16 @@ func TestSimpleRateLimiter(t *testing.T) { } for _, rt := range internalpb.RateType_value { + if internalpb.RateType_DDLFlush == internalpb.RateType(rt) { + // the flush request has 0.1 rate limiter that means only allow to execute one request each 10 seconds. + time.Sleep(10 * time.Second) + err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1) + assert.NoError(t, err) + err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1) + assert.ErrorIs(t, err, merr.ErrServiceRateLimit) + continue + } + if IsDDLRequest(internalpb.RateType(rt)) { err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) assert.NoError(t, err) @@ -123,14 +134,15 @@ func TestSimpleRateLimiter(t *testing.T) { assert.NoError(t, err) err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5) assert.ErrorIs(t, err, merr.ErrServiceRateLimit) - } else { - err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) - assert.NoError(t, err) - err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) - assert.NoError(t, err) - err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) - assert.ErrorIs(t, err, merr.ErrServiceRateLimit) + continue } + + err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) + assert.NoError(t, err) + err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) + assert.NoError(t, err) + err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1) + assert.ErrorIs(t, err, merr.ErrServiceRateLimit) } Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak) }) diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 3cc550e9e6ee..b68a236ecc94 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -330,7 +330,7 @@ seconds, (0 ~ 65536)`, p.FlushLimitEnabled = ParamItem{ Key: "quotaAndLimits.flushRate.enabled", Version: "2.2.0", - DefaultValue: "false", + DefaultValue: "true", Export: true, } p.FlushLimitEnabled.Init(base.mgr) @@ -376,7 +376,7 @@ seconds, (0 ~ 65536)`, p.MaxFlushRatePerCollection = ParamItem{ Key: "quotaAndLimits.flushRate.collection.max", Version: "2.3.9", - DefaultValue: "-1", + DefaultValue: "0.1", Formatter: func(v string) string { if !p.FlushLimitEnabled.GetAsBool() { return max diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index 14f589a3480a..8fbf3ad1a260 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -41,8 +41,8 @@ func TestQuotaParam(t *testing.T) { t.Run("test functional params", func(t *testing.T) { assert.Equal(t, false, qc.IndexLimitEnabled.GetAsBool()) assert.Equal(t, defaultMax, qc.MaxIndexRate.GetAsFloat()) - assert.False(t, qc.FlushLimitEnabled.GetAsBool()) - assert.Equal(t, defaultMax, qc.MaxFlushRatePerCollection.GetAsFloat()) + assert.True(t, qc.FlushLimitEnabled.GetAsBool()) + assert.Equal(t, 0.1, qc.MaxFlushRatePerCollection.GetAsFloat()) assert.Equal(t, defaultMax, qc.MaxFlushRate.GetAsFloat()) assert.Equal(t, false, qc.CompactionLimitEnabled.GetAsBool()) assert.Equal(t, defaultMax, qc.MaxCompactionRate.GetAsFloat())