forked from pingcap/br
/
utils.go
120 lines (110 loc) · 3.3 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package pdutil
import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/tablecodec"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/server/schedule/placement"
berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/httputil"
)
// UndoFunc is a 'undo' operation of some undoable command.
// (e.g. RemoveSchedulers).
type UndoFunc func(context.Context) error
// Nop is the 'zero value' of undo func.
var Nop UndoFunc = func(context.Context) error { return nil }
const (
resetTSURL = "/pd/api/v1/admin/reset-ts"
placementRuleURL = "/pd/api/v1/config/rules"
)
// ResetTS resets the timestamp of PD to a bigger value.
func ResetTS(ctx context.Context, pdAddr string, ts uint64, tlsConf *tls.Config) error {
payload, err := json.Marshal(struct {
TSO string `json:"tso,omitempty"`
}{TSO: fmt.Sprintf("%d", ts)})
if err != nil {
return errors.Trace(err)
}
cli := httputil.NewClient(tlsConf)
prefix := "http://"
if tlsConf != nil {
prefix = "https://"
}
reqURL := prefix + pdAddr + resetTSURL
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, strings.NewReader(string(payload)))
if err != nil {
return errors.Trace(err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := cli.Do(req)
if err != nil {
return errors.Trace(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusForbidden {
buf := new(bytes.Buffer)
_, _ = buf.ReadFrom(resp.Body)
return errors.Annotatef(berrors.ErrPDInvalidResponse, "pd resets TS failed: req=%v, resp=%v, err=%v", string(payload), buf.String(), err)
}
return nil
}
// GetPlacementRules return the current placement rules.
func GetPlacementRules(ctx context.Context, pdAddr string, tlsConf *tls.Config) ([]placement.Rule, error) {
cli := httputil.NewClient(tlsConf)
prefix := "http://"
if tlsConf != nil {
prefix = "https://"
}
reqURL := prefix + pdAddr + placementRuleURL
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, errors.Trace(err)
}
resp, err := cli.Do(req)
if err != nil {
return nil, errors.Trace(err)
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
return nil, errors.Trace(err)
}
if resp.StatusCode == http.StatusPreconditionFailed {
return []placement.Rule{}, nil
}
if resp.StatusCode != http.StatusOK {
return nil, errors.Annotatef(berrors.ErrPDInvalidResponse, "get placement rules failed: resp=%v, err=%v, code=%d", buf.String(), err, resp.StatusCode)
}
var rules []placement.Rule
err = json.Unmarshal(buf.Bytes(), &rules)
if err != nil {
return nil, errors.Trace(err)
}
return rules, nil
}
// SearchPlacementRule returns the placement rule matched to the table or nil.
func SearchPlacementRule(tableID int64, placementRules []placement.Rule, role placement.PeerRoleType) *placement.Rule {
for _, rule := range placementRules {
key, err := hex.DecodeString(rule.StartKeyHex)
if err != nil {
continue
}
_, decoded, err := codec.DecodeBytes(key)
if err != nil {
continue
}
if rule.Role == role && tableID == tablecodec.DecodeTableID(decoded) {
return &rule
}
}
return nil
}