Skip to content

Commit

Permalink
enhance: enable flush rate limiter of collection level
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 committed Jun 14, 2024
1 parent c6a1c49 commit 7f97a5f
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
4 changes: 2 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,10 @@ quotaAndLimits:
db:
max: -1 # qps of db level, default no limit, rate for CreateIndex, DropIndex
flushRate:
enabled: false
enabled: true
max: -1 # qps, default no limit, rate for flush
collection:
max: -1 # qps, default no limit, rate for flush at collection level.
max: 0.1 # qps, default no limit, rate for flush at collection level.
db:
max: -1 # qps of db level, default no limit, rate for flush
compactionRate:
Expand Down
26 changes: 19 additions & 7 deletions internal/proxy/simple_rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -116,21 +117,32 @@ func TestSimpleRateLimiter(t *testing.T) {
}

for _, rt := range internalpb.RateType_value {
if internalpb.RateType_DDLFlush == internalpb.RateType(rt) {
// the flush request has 0.1 rate limiter that means only allow to execute one request each 10 seconds.
time.Sleep(10 * time.Second)
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
continue
}

if IsDDLRequest(internalpb.RateType(rt)) {
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
} else {
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
continue
}

err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
}
Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak)
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/paramtable/quota_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ seconds, (0 ~ 65536)`,
p.FlushLimitEnabled = ParamItem{
Key: "quotaAndLimits.flushRate.enabled",
Version: "2.2.0",
DefaultValue: "false",
DefaultValue: "true",
Export: true,
}
p.FlushLimitEnabled.Init(base.mgr)
Expand Down Expand Up @@ -376,7 +376,7 @@ seconds, (0 ~ 65536)`,
p.MaxFlushRatePerCollection = ParamItem{
Key: "quotaAndLimits.flushRate.collection.max",
Version: "2.3.9",
DefaultValue: "-1",
DefaultValue: "0.1",
Formatter: func(v string) string {
if !p.FlushLimitEnabled.GetAsBool() {
return max
Expand Down

0 comments on commit 7f97a5f

Please sign in to comment.