New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
api: add Rate-limit config update API #4843
Changes from 78 commits
e8102b4
b9bcfbf
1a93604
cefebc9
826ef7f
8f24203
176d73b
fdbf344
0d73c53
3318100
4e2a840
6d1c289
001cb93
713c8a8
1744bd6
456d504
049c4d6
1b183a0
900df00
133ab15
899e1f9
71bbf8c
79eb437
f9bd7ec
3a6be7c
124658c
cadaebb
6cd4621
464bc85
f57c923
afdbb90
c637fb0
273a970
c87f49b
b29445d
f18956e
6e0819c
1be372a
9988a7e
052e49c
1ee0dbc
c12a3a2
e682407
c3b65a9
24fa55b
c35fab6
3d87b64
4f93cac
9022724
db80b3c
df78088
76f347c
f640762
4a250d0
cc697ea
099e49a
40832d8
5799d37
f5af578
2653577
5c55e8b
0e4a60d
392ce5a
de4f4af
c332aae
0dc5dc1
39c0868
21664fc
cc6bd2b
2c7c24a
2d0f39f
c2f6f23
79ada21
7161d5c
c741126
1fedfbc
e9ee0e2
dd70f46
b15ac5a
2d9f124
97d1b9e
fa28dd4
aca84d7
d558575
3774d3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
// Copyright 2022 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package jsonutil | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
|
||
"github.com/tikv/pd/pkg/reflectutil" | ||
) | ||
|
||
// AddKeyValue is used to add a key value pair into `old` | ||
func AddKeyValue(old interface{}, key string, value interface{}) (updated bool, found bool, err error) { | ||
data, err := json.Marshal(map[string]interface{}{key: value}) | ||
if err != nil { | ||
return false, false, err | ||
} | ||
return MergeJSONObject(old, data) | ||
} | ||
|
||
// MergeJSONObject is used to merge a marshaled json object into v | ||
func MergeJSONObject(v interface{}, data []byte) (updated bool, found bool, err error) { | ||
old, _ := json.Marshal(v) | ||
if err := json.Unmarshal(data, v); err != nil { | ||
return false, false, err | ||
} | ||
new, _ := json.Marshal(v) | ||
if !bytes.Equal(old, new) { | ||
return true, true, nil | ||
} | ||
m := make(map[string]interface{}) | ||
if err := json.Unmarshal(data, &m); err != nil { | ||
return false, false, err | ||
} | ||
found = reflectutil.FindSameFieldByJSON(v, m) | ||
return false, found, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Copyright 2022 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package jsonutil | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type testJSONStructLevel1 struct { | ||
Name string `json:"name"` | ||
Sub1 testJSONStructLevel2 `json:"sub1"` | ||
Sub2 testJSONStructLevel2 `json:"sub2"` | ||
} | ||
|
||
type testJSONStructLevel2 struct { | ||
SubName string `json:"sub-name"` | ||
} | ||
|
||
func TestJSONUtil(t *testing.T) { | ||
t.Parallel() | ||
re := require.New(t) | ||
father := &testJSONStructLevel1{ | ||
Name: "father", | ||
} | ||
son1 := &testJSONStructLevel2{ | ||
SubName: "son1", | ||
} | ||
update, found, err := AddKeyValue(&father, "sub1", &son1) | ||
re.NoError(err) | ||
re.True(update) | ||
re.True(found) | ||
|
||
son2 := &testJSONStructLevel2{ | ||
SubName: "son2", | ||
} | ||
|
||
update, found, err = AddKeyValue(father, "sub2", &son2) | ||
re.NoError(err) | ||
re.True(update) | ||
re.True(found) | ||
|
||
update, found, err = AddKeyValue(father, "sub3", &son2) | ||
re.NoError(err) | ||
re.False(update) | ||
re.False(found) | ||
|
||
update, found, err = AddKeyValue(father, "sub2", &son2) | ||
re.NoError(err) | ||
re.False(update) | ||
re.True(found) | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -23,6 +23,9 @@ import ( | |||||||||||
"strings" | ||||||||||||
|
||||||||||||
"github.com/pingcap/errors" | ||||||||||||
"github.com/tikv/pd/pkg/apiutil" | ||||||||||||
"github.com/tikv/pd/pkg/jsonutil" | ||||||||||||
"github.com/tikv/pd/pkg/ratelimit" | ||||||||||||
"github.com/tikv/pd/pkg/reflectutil" | ||||||||||||
"github.com/tikv/pd/server" | ||||||||||||
"github.com/tikv/pd/server/config" | ||||||||||||
|
@@ -107,18 +110,13 @@ func (h *serviceMiddlewareHandler) updateServiceMiddlewareConfig(cfg *config.Ser | |||||||||||
case "audit": | ||||||||||||
return h.updateAudit(cfg, kp[len(kp)-1], value) | ||||||||||||
case "rate-limit": | ||||||||||||
return h.updateRateLimit(cfg, kp[len(kp)-1], value) | ||||||||||||
return h.svr.UpdateRateLimit(&cfg.RateLimitConfig, kp[len(kp)-1], value) | ||||||||||||
} | ||||||||||||
return errors.Errorf("config prefix %s not found", kp[0]) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareConfig, key string, value interface{}) error { | ||||||||||||
data, err := json.Marshal(map[string]interface{}{key: value}) | ||||||||||||
if err != nil { | ||||||||||||
return err | ||||||||||||
} | ||||||||||||
|
||||||||||||
updated, found, err := mergeConfig(&config.AuditConfig, data) | ||||||||||||
updated, found, err := jsonutil.AddKeyValue(&config.AuditConfig, key, value) | ||||||||||||
if err != nil { | ||||||||||||
return err | ||||||||||||
} | ||||||||||||
|
@@ -133,23 +131,106 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC | |||||||||||
return err | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (h *serviceMiddlewareHandler) updateRateLimit(config *config.ServiceMiddlewareConfig, key string, value interface{}) error { | ||||||||||||
data, err := json.Marshal(map[string]interface{}{key: value}) | ||||||||||||
if err != nil { | ||||||||||||
return err | ||||||||||||
// @Tags service_middleware | ||||||||||||
// @Summary update ratelimit config | ||||||||||||
// @Param body body object string "json params" | ||||||||||||
// @Produce json | ||||||||||||
// @Success 200 {string} string "" | ||||||||||||
// @Failure 400 {string} string "" | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 500 is missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
// @Failure 500 {string} string "config item not found" | ||||||||||||
// @Router /service-middleware/rate-limit/config [POST] | ||||||||||||
func (h *serviceMiddlewareHandler) SetRatelimitConfig(w http.ResponseWriter, r *http.Request) { | ||||||||||||
var input map[string]interface{} | ||||||||||||
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { | ||||||||||||
return | ||||||||||||
} | ||||||||||||
|
||||||||||||
updated, found, err := mergeConfig(&config.RateLimitConfig, data) | ||||||||||||
if err != nil { | ||||||||||||
return err | ||||||||||||
typeStr, ok := input["type"].(string) | ||||||||||||
if !ok { | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "The type is empty.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
|
||||||||||||
if !found { | ||||||||||||
return errors.Errorf("config item %s not found", key) | ||||||||||||
var serviceLabel string | ||||||||||||
switch typeStr { | ||||||||||||
case "label": | ||||||||||||
serviceLabel, ok = input["label"].(string) | ||||||||||||
if !ok || len(serviceLabel) == 0 { | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "The label is empty.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
if len(h.svr.GetServiceLabels(serviceLabel)) == 0 { | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
case "path": | ||||||||||||
method, _ := input["method"].(string) | ||||||||||||
path, ok := input["path"].(string) | ||||||||||||
if !ok || len(path) == 0 { | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "The path is empty.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
serviceLabel = h.svr.GetAPIAccessServiceLabel(apiutil.NewAccessPath(path, method)) | ||||||||||||
if len(serviceLabel) == 0 { | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
default: | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "The type is invalid.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
|
||||||||||||
if updated { | ||||||||||||
err = h.svr.SetRateLimitConfig(config.RateLimitConfig) | ||||||||||||
if h.svr.IsInRateLimitAllowList(serviceLabel) { | ||||||||||||
h.rd.JSON(w, http.StatusBadRequest, "This service is in allow list whose config can not be changed.") | ||||||||||||
return | ||||||||||||
} | ||||||||||||
return err | ||||||||||||
cfg := h.svr.GetRateLimitConfig().LimiterConfig[serviceLabel] | ||||||||||||
// update concurrency limiter | ||||||||||||
concurrencyUpdatedFlag := "Concurrency limiter is not changed." | ||||||||||||
concurrencyFloat, okc := input["concurrency"].(float64) | ||||||||||||
if okc { | ||||||||||||
concurrency := uint64(concurrencyFloat) | ||||||||||||
cfg.ConcurrencyLimit = concurrency | ||||||||||||
} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
// update qps rate limiter | ||||||||||||
qpsRateUpdatedFlag := "QPS rate limiter is not changed." | ||||||||||||
qps, okq := input["qps"].(float64) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if QPS is less than 0, does it belong to an error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It means delete QPS limit |
||||||||||||
if okq { | ||||||||||||
brust := 0 | ||||||||||||
|
||||||||||||
if int(qps) > 1 { | ||||||||||||
brust = int(qps) | ||||||||||||
} else if qps > 0 { | ||||||||||||
brust = 1 | ||||||||||||
} | ||||||||||||
cfg.QPS = qps | ||||||||||||
cfg.QPSBurst = brust | ||||||||||||
} | ||||||||||||
if !okc && !okq { | ||||||||||||
h.rd.JSON(w, http.StatusOK, "No changed.") | ||||||||||||
} else { | ||||||||||||
status := h.svr.UpdateServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&cfg)) | ||||||||||||
switch { | ||||||||||||
case status&ratelimit.QPSChanged != 0: | ||||||||||||
qpsRateUpdatedFlag = "QPS rate limiter is changed." | ||||||||||||
case status&ratelimit.QPSDeleted != 0: | ||||||||||||
qpsRateUpdatedFlag = "QPS rate limiter is deleted." | ||||||||||||
} | ||||||||||||
switch { | ||||||||||||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
case status&ratelimit.ConcurrencyChanged != 0: | ||||||||||||
concurrencyUpdatedFlag = "Concurrency limiter is changed." | ||||||||||||
case status&ratelimit.ConcurrencyDeleted != 0: | ||||||||||||
concurrencyUpdatedFlag = "Concurrency limiter is deleted." | ||||||||||||
} | ||||||||||||
err := h.svr.UpdateRateLimitConfig("limiter-config", serviceLabel, cfg) | ||||||||||||
if err != nil { | ||||||||||||
h.rd.JSON(w, http.StatusInternalServerError, err.Error()) | ||||||||||||
} else { | ||||||||||||
result := rateLimitResult{concurrencyUpdatedFlag, qpsRateUpdatedFlag, h.svr.GetServiceMiddlewareConfig().RateLimitConfig.LimiterConfig} | ||||||||||||
h.rd.JSON(w, http.StatusOK, result) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
type rateLimitResult struct { | ||||||||||||
ConcurrencyUpdatedFlag string `json:"Concurrency"` | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use small letter? |
||||||||||||
QpsRateUpdatedFlag string `json:"qps"` | ||||||||||||
LimiterConfig map[string]ratelimit.DimensionConfig `json:"new-config"` | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/service-middleware/rate-limit/config
is used to update Incrementally. If users want to overwrite config, we can use/service-middleware/config