Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

placement: support batch deletions, with insertions #2699

Merged
merged 14 commits into from Jul 30, 2020
1 change: 1 addition & 0 deletions server/api/router.go
Expand Up @@ -92,6 +92,7 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.R
rulesHandler := newRulesHandler(svr, rd)
clusterRouter.HandleFunc("/config/rules", rulesHandler.GetAll).Methods("GET")
clusterRouter.HandleFunc("/config/rules", rulesHandler.SetAll).Methods("POST")
clusterRouter.HandleFunc("/config/rules/batch", rulesHandler.Batch).Methods("POST")
clusterRouter.HandleFunc("/config/rules/group/{group}", rulesHandler.GetAllByGroup).Methods("GET")
clusterRouter.HandleFunc("/config/rules/region/{region}", rulesHandler.GetAllByRegion).Methods("GET")
clusterRouter.HandleFunc("/config/rules/key/{key}", rulesHandler.GetAllByKey).Methods("GET")
Expand Down
37 changes: 36 additions & 1 deletion server/api/rule.go
Expand Up @@ -61,7 +61,7 @@ func (h *ruleHandler) GetAll(w http.ResponseWriter, r *http.Request) {
}

// @Tags rule
// @Summary Set all rules for the cluster.
// @Summary Set all rules for the cluster. If there is an error, modifications are promised to be rolled back in memory, but may fail to be rolled back on disk. You probably want to request again to make rules in memory/disk consistent.
// @Produce json
// @Param rules body []placement.Rule true "Parameters of rules"
// @Success 200 {string} string "Update rules successfully."
Expand Down Expand Up @@ -292,3 +292,38 @@ func (h *ruleHandler) Delete(w http.ResponseWriter, r *http.Request) {

h.rd.JSON(w, http.StatusOK, "Delete rule successfully.")
}

// @Tags rule
// @Summary Batch operations for the cluster. Operations should be independent (different ID). If there is an error, modifications are promised to be rolled back in memory, but may fail to be rolled back on disk. You probably want to request again to make rules in memory/disk consistent.
// @Produce json
// @Param operations body []placement.Batch true "Parameters of rule operations"
// @Success 200 {string} string "Batch operations successfully."
// @Failure 400 {string} string "The input is invalid."
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rules/batch [post]
func (h *ruleHandler) Batch(w http.ResponseWriter, r *http.Request) {
	cluster := getCluster(r.Context())
	if !cluster.IsPlacementRulesEnabled() {
		h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
		return
	}

	// ReadJSONRespondError has already written the error response when it
	// fails, so the handler only needs to stop here.
	var opts []placement.Batch
	if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &opts); err != nil {
		return
	}

	// Validate every operation before touching the rule manager so that an
	// invalid request is rejected as a whole with 400.
	for _, opt := range opts {
		switch opt.Action {
		case placement.BatchAdd, placement.BatchDel:
		default:
			// Other actions are not validated here, as before.
			continue
		}
		// placement.Batch embeds *Rule, so opt.Rule can be nil (e.g. an
		// operation that only sets "action"). Reject it here instead of
		// letting a later dereference panic.
		if opt.Rule == nil {
			h.rd.JSON(w, http.StatusBadRequest, "rule fields are missing in batch operation")
			return
		}
		if opt.Action == placement.BatchAdd {
			if err := h.checkRule(opt.Rule); err != nil {
				h.rd.JSON(w, http.StatusBadRequest, err.Error())
				return
			}
		}
	}

	if err := cluster.GetRuleManager().Batch(opts); err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
	h.rd.JSON(w, http.StatusOK, "Batch operations successfully.")
}
119 changes: 119 additions & 0 deletions server/api/rule_test.go
Expand Up @@ -496,3 +496,122 @@ func compareRule(c *C, r1 *placement.Rule, r2 *placement.Rule) {
c.Assert(r1.Role, Equals, r2.Role)
c.Assert(r1.Count, Equals, r2.Count)
}

// TestBatch posts batches of add/del operations to the /rules/batch endpoint
// and checks that valid batches succeed and that malformed JSON or invalid
// rules are reported with the expected error text.
func (s *testRuleSuite) TestBatch(c *C) {
// opt1-opt4 add well-formed voter rules; opt1 and opt2 share ID "13" but
// belong to different groups ("a" and "b").
opt1 := placement.Batch{
Action: placement.BatchAdd,
Rule: &placement.Rule{GroupID: "a", ID: "13", StartKeyHex: "1111", EndKeyHex: "3333", Role: "voter", Count: 1},
}
opt2 := placement.Batch{
Action: placement.BatchAdd,
Rule: &placement.Rule{GroupID: "b", ID: "13", StartKeyHex: "1111", EndKeyHex: "3333", Role: "voter", Count: 1},
}
opt3 := placement.Batch{
Action: placement.BatchAdd,
Rule: &placement.Rule{GroupID: "a", ID: "14", StartKeyHex: "1111", EndKeyHex: "3333", Role: "voter", Count: 1},
}
opt4 := placement.Batch{
Action: placement.BatchAdd,
Rule: &placement.Rule{GroupID: "a", ID: "15", StartKeyHex: "1111", EndKeyHex: "3333", Role: "voter", Count: 1},
}
// opt5 deletes exactly the rule (a, 14) added by opt3.
opt5 := placement.Batch{
Action: placement.BatchDel,
Rule: &placement.Rule{GroupID: "a", ID: "14"},
}
// opt6 deletes by ID prefix: every rule in group "b" whose ID starts with "1".
opt6 := placement.Batch{
Action: placement.BatchDel,
Rule: &placement.Rule{GroupID: "b", ID: "1"},
DeleteByIDPrefix: true,
}
// opt7 deletes by exact ID (a, 1), which no batch in this test adds.
opt7 := placement.Batch{
Action: placement.BatchDel,
Rule: &placement.Rule{GroupID: "a", ID: "1"},
}
// opt8 carries a start key that is not valid hex ("XXXX").
opt8 := placement.Batch{
Action: placement.BatchAdd,
Rule: &placement.Rule{GroupID: "a", ID: "16", StartKeyHex: "XXXX", EndKeyHex: "3333", Role: "voter", Count: 1},
}
// opt9 carries a negative replica count.
opt9 := placement.Batch{
Action: placement.BatchAdd,
Rule: &placement.Rule{GroupID: "a", ID: "17", StartKeyHex: "1111", EndKeyHex: "3333", Role: "voter", Count: -1},
}

// Request bodies: adds only, deletes only, and a mix of add and delete.
successData1, err := json.Marshal([]placement.Batch{opt1, opt2, opt3})
c.Assert(err, IsNil)

successData2, err := json.Marshal([]placement.Batch{opt5, opt7})
c.Assert(err, IsNil)

successData3, err := json.Marshal([]placement.Batch{opt4, opt6})
c.Assert(err, IsNil)

// Request bodies that are expected to be rejected.
checkErrData, err := json.Marshal([]placement.Batch{opt8})
c.Assert(err, IsNil)

setErrData, err := json.Marshal([]placement.Batch{opt9})
c.Assert(err, IsNil)

// response is only compared for failing cases; it must match the error
// text returned by postJSON exactly, including the trailing newline.
testcases := []struct {
name string
rawData []byte
success bool
response string
}{
{
name: "Batch adds successfully",
rawData: successData1,
success: true,
response: "",
},
{
name: "Batch removes successfully",
rawData: successData2,
success: true,
response: "",
},
{
name: "Batch add and remove successfully",
rawData: successData3,
success: true,
response: "",
},
{
name: "Parse Json failed",
rawData: []byte("foo"),
success: false,
response: `{
"code": "input",
"msg": "invalid character 'o' in literal false (expecting 'a')",
"data": {
"Offset": 2
}
}
`,
},
{
name: "Check rule failed",
rawData: checkErrData,
success: false,
response: `"start key is not in hex format: encoding/hex: invalid byte: U+0058 'X'"
`,
},
{
name: "Set Rule Failed",
rawData: setErrData,
success: false,
response: `"invalid count -1"
`,
},
}

// Cases are posted sequentially, in the order listed above.
for _, testcase := range testcases {
c.Log(testcase.name)
err := postJSON(testDialClient, s.urlPrefix+"/rules/batch", testcase.rawData)
if testcase.success {
c.Assert(err, IsNil)
} else {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, testcase.response)
}
}
}
131 changes: 114 additions & 17 deletions server/schedule/placement/rule_manager.go
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/pingcap/log"
Expand Down Expand Up @@ -275,10 +276,51 @@ func (m *RuleManager) FitRegion(stores StoreSet, region *core.RegionInfo) *Regio
return FitRegion(stores, region, rules)
}

func (m *RuleManager) swapRule(rule *Rule) *Rule {
func (m *RuleManager) tryBuildSave(oldRules map[[2]string]*Rule) error {
ruleList, err := buildRuleList(m.rules)
if err == nil {
for key := range oldRules {
rule := m.rules[key]
if rule != nil {
err = m.store.SaveRule(rule.StoreKey(), rule)
} else {
r := Rule{
GroupID: key[0],
ID: key[1],
}
err = m.store.DeleteRule(r.StoreKey())
}
if err != nil {
// TODO: it is not completely safe
// 1. in case that half of rules applied, error.. we have to cancel persisted rules
// but that may fail too, causing memory/disk inconsistency
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is admirable that you noticed this issue! I think we can add a note about it in the API annotation.

// either rely on a transaction API, or on clients to request again until success
// 2. in case PD suddenly goes down in the middle of the loop, memory/disk are inconsistent again
// for now we can only rely on clients to request again
break
Comment on lines +293 to +300
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a transaction API is needed, please create an issue to track this (ignore this if one already exists).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unexpected. Anyway, I think it is enough to use my old issue to track it.

}
}
}

if err != nil {
for key, rule := range oldRules {
if rule == nil {
delete(m.rules, key)
} else {
m.rules[key] = rule
}
}
return err
}

m.ruleList = ruleList
return nil
}

func (m *RuleManager) addRule(rule *Rule, oldRules map[[2]string]*Rule) {
old := m.rules[rule.Key()]
m.rules[rule.Key()] = rule
return old
oldRules[rule.Key()] = old
}

// SetRules inserts or updates lots of Rules at once.
Expand All @@ -296,31 +338,86 @@ func (m *RuleManager) SetRules(rules []*Rule) error {
oldRules := make(map[[2]string]*Rule)

for _, rule := range rules {
oldRules[rule.Key()] = m.swapRule(rule)
m.addRule(rule, oldRules)
}

ruleList, err := buildRuleList(m.rules)
if err == nil {
for _, rule := range rules {
err = m.store.SaveRule(rule.StoreKey(), rule)
if err != nil {
break
if err := m.tryBuildSave(oldRules); err != nil {
return err
}

log.Info("placement rule updated", zap.String("rules", fmt.Sprint(rules)))
return nil
}

// delRuleByID removes the rule identified by (group, id) from the in-memory
// rule map and records the value it had before in oldRules, which the caller
// uses to persist or roll back the change. A nil entry in oldRules means the
// key had no rule.
//
// Only the first snapshot of a key is kept. If an earlier operation in the
// same batch already recorded this key, that entry is the pre-batch state;
// overwriting it with an intermediate value would make a rollback restore
// the wrong rule.
func (m *RuleManager) delRuleByID(group, id string, oldRules map[[2]string]*Rule) {
	key := [2]string{group, id}
	old, ok := m.rules[key]
	if ok {
		delete(m.rules, key)
	}
	if _, recorded := oldRules[key]; !recorded {
		oldRules[key] = old
	}
}

// delRule applies one delete operation to the in-memory rule map. Without
// DeleteByIDPrefix it removes the single rule (t.GroupID, t.ID); with it, it
// removes every rule of group t.GroupID whose ID starts with t.ID. Each
// removal is recorded in oldRules through delRuleByID.
func (m *RuleManager) delRule(t *Batch, oldRules map[[2]string]*Rule) {
	if !t.DeleteByIDPrefix {
		m.delRuleByID(t.GroupID, t.ID, oldRules)
		return
	}
	// Deleting entries from a map while ranging over it is permitted in Go.
	for k := range m.rules {
		group, id := k[0], k[1]
		if group != t.GroupID || !strings.HasPrefix(id, t.ID) {
			continue
		}
		m.delRuleByID(group, id, oldRules)
	}
}

if err != nil {
for key, rule := range oldRules {
if rule == nil {
delete(m.rules, key)
} else {
m.rules[key] = rule
// BatchAction indicates the operation type of a Batch entry.
type BatchAction string

const (
// BatchAdd adds (or replaces) a placement rule; only the embedded *Rule
// needs to be specified.
BatchAdd BatchAction = "add"
// BatchDel deletes placement rules; only the fields `GroupID`, `ID` and,
// optionally, `DeleteByIDPrefix` need to be specified.
BatchDel BatchAction = "del"
)

// Batch is for batching placement rule actions. The action type is
// distinguished by the field `Action`.
type Batch struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to say that Batch here is a bit misleading, as one Batch only contains one rule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any good idea? RuleOp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RuleOp is OK with me. One Batch containing multiple RuleOps seems more reasonable.

*Rule // information of the placement rule to add/delete
Action BatchAction `json:"action"` // the operation type
DeleteByIDPrefix bool `json:"delete_by_id_prefix"` // if action == delete, delete by the prefix of id
}

// Batch executes a series of actions at once.
func (m *RuleManager) Batch(todo []Batch) error {
for _, t := range todo {
switch t.Action {
case BatchAdd:
err := m.adjustRule(t.Rule)
if err != nil {
return err
}
}
}

m.Lock()
defer m.Unlock()

oldRules := make(map[[2]string]*Rule)

for _, t := range todo {
switch t.Action {
case BatchAdd:
m.addRule(t.Rule, oldRules)
case BatchDel:
m.delRule(&t, oldRules)
}
}

if err := m.tryBuildSave(oldRules); err != nil {
return err
}

m.ruleList = ruleList
log.Info("placement rule updated", zap.String("rules", fmt.Sprint(rules)))
log.Info("placement rules updated", zap.String("batch", fmt.Sprint(todo)))
return nil
}
15 changes: 13 additions & 2 deletions server/schedule/placement/rule_manager_test.go
Expand Up @@ -93,12 +93,23 @@ func (s *testManagerSuite) TestKeys(c *C) {
{GroupID: "1", ID: "1", Role: "voter", Count: 1, StartKeyHex: "", EndKeyHex: ""},
{GroupID: "2", ID: "2", Role: "voter", Count: 1, StartKeyHex: "11", EndKeyHex: "ff"},
{GroupID: "2", ID: "3", Role: "voter", Count: 1, StartKeyHex: "22", EndKeyHex: "dd"},
{GroupID: "3", ID: "4", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "ee"},
{GroupID: "3", ID: "5", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "dd"},
}

toDelete := []Batch{}
for _, r := range rules {
s.manager.SetRule(r)
toDelete = append(toDelete, Batch{
Rule: r,
Action: BatchDel,
DeleteByIDPrefix: false,
})
}
s.manager.Batch(toDelete)

rules = append(rules, &Rule{GroupID: "3", ID: "4", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "ee"},
&Rule{GroupID: "3", ID: "5", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "dd"})
s.manager.SetRules(rules)

s.manager.DeleteRule("pd", "default")

splitKeys := [][]string{
Expand Down