Skip to content

Commit

Permalink
tools/simulator: add store api and replace simulator http with SDK (#…
Browse files Browse the repository at this point in the history
…8245)

ref #8135

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] committed Jun 5, 2024
1 parent 82d3a4a commit 301fabb
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 58 deletions.
9 changes: 9 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Client interface {
GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
GetStore(context.Context, uint64) (*StoreInfo, error)
DeleteStore(context.Context, uint64) error
SetStoreLabels(context.Context, int64, map[string]string) error
GetHealthStatus(context.Context) ([]Health, error)
/* Config-related interfaces */
Expand Down Expand Up @@ -440,6 +441,14 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, erro
return &store, nil
}

// DeleteStore deletes the store by ID.
func (c *client) DeleteStore(ctx context.Context, storeID uint64) error {
return c.request(ctx, newRequestInfo().
WithName(deleteStoreName).
WithURI(StoreByID(storeID)).
WithMethod(http.MethodDelete))
}

// GetClusterVersion gets the cluster version.
func (c *client) GetClusterVersion(ctx context.Context) (string, error) {
var version string
Expand Down
1 change: 1 addition & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
getRegionStatusByKeyRangeName = "GetRegionStatusByKeyRange"
getStoresName = "GetStores"
getStoreName = "GetStore"
deleteStoreName = "DeleteStore"
setStoreLabelsName = "SetStoreLabels"
getHealthStatusName = "GetHealthStatus"
getConfigName = "GetConfig"
Expand Down
34 changes: 25 additions & 9 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -80,6 +81,15 @@ func (suite *httpClientTestSuite) SetupSuite() {
leaderServer := cluster.GetLeaderServer()

err = leaderServer.BootstrapCluster()
// Add 2 more stores to the cluster.
for i := 2; i <= 4; i++ {
tests.MustPutStore(re, cluster, &metapb.Store{
Id: uint64(i),
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
})
}
re.NoError(err)
for _, region := range []*core.RegionInfo{
core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")),
Expand Down Expand Up @@ -165,29 +175,29 @@ func (suite *httpClientTestSuite) TestMeta() {
re.Empty(regionStats.StoreLeaderCount)
hotReadRegions, err := client.GetHotReadRegions(ctx)
re.NoError(err)
re.Len(hotReadRegions.AsPeer, 1)
re.Len(hotReadRegions.AsLeader, 1)
re.Len(hotReadRegions.AsPeer, 4)
re.Len(hotReadRegions.AsLeader, 4)
hotWriteRegions, err := client.GetHotWriteRegions(ctx)
re.NoError(err)
re.Len(hotWriteRegions.AsPeer, 1)
re.Len(hotWriteRegions.AsLeader, 1)
re.Len(hotWriteRegions.AsPeer, 4)
re.Len(hotWriteRegions.AsLeader, 4)
historyHorRegions, err := client.GetHistoryHotRegions(ctx, &pd.HistoryHotRegionsRequest{
StartTime: 0,
EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond),
})
re.NoError(err)
re.Empty(historyHorRegions.HistoryHotRegion)
store, err := client.GetStores(ctx)
stores, err := client.GetStores(ctx)
re.NoError(err)
re.Equal(1, store.Count)
re.Len(store.Stores, 1)
storeID := uint64(store.Stores[0].Store.ID) // TODO: why type is different?
re.Equal(4, stores.Count)
re.Len(stores.Stores, 4)
storeID := uint64(stores.Stores[0].Store.ID) // TODO: why type is different?
store2, err := client.GetStore(ctx, storeID)
re.NoError(err)
re.EqualValues(storeID, store2.Store.ID)
version, err := client.GetClusterVersion(ctx)
re.NoError(err)
re.Equal("0.0.0", version)
re.Equal("1.0.0", version)
rgs, _ := client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a"), []byte("a1")), 100)
re.Equal(int64(0), rgs.Count)
rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), 100)
Expand All @@ -196,6 +206,12 @@ func (suite *httpClientTestSuite) TestMeta() {
re.Equal(int64(1), rgs.Count)
rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte(""), []byte("")), 100)
re.Equal(int64(2), rgs.Count)
// store 2 origin status:offline
err = client.DeleteStore(ctx, 2)
re.NoError(err)
store2, err = client.GetStore(ctx, 2)
re.NoError(err)
re.Equal(int64(metapb.StoreState_Offline), store2.Store.State)
}

func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() {
Expand Down
6 changes: 6 additions & 0 deletions tools/pd-simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/log"
flag "github.com/spf13/pflag"
pdHttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -92,6 +93,7 @@ func main() {

func run(simCase string, simConfig *sc.SimConfig) {
if *pdAddr != "" {
simulator.PDHTTPClient = pdHttp.NewClient("pd-simulator", []string{*pdAddr})
simStart(*pdAddr, *statusAddress, simCase, simConfig)
} else {
local, clean := NewSingleServer(context.Background(), simConfig)
Expand All @@ -105,6 +107,7 @@ func run(simCase string, simConfig *sc.SimConfig) {
}
time.Sleep(100 * time.Millisecond)
}
simulator.PDHTTPClient = pdHttp.NewClient("pd-simulator", []string{local.GetAddr()})
simStart(local.GetAddr(), "", simCase, simConfig, clean)
}
}
Expand Down Expand Up @@ -183,6 +186,9 @@ EXIT:
analysis.GetTransferCounter().PrintResult()
}

if simulator.PDHTTPClient != nil {
simulator.PDHTTPClient.Close()
}
if simResult != "OK" {
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tools/pd-simulator/simulator/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package cases

import (
"github.com/pingcap/kvproto/pkg/metapb"
pdHttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/tools/pd-simulator/simulator/config"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
Expand Down Expand Up @@ -57,7 +57,7 @@ type Case struct {
TableNumber int

Checker CheckerFunc // To check the schedule is finished.
Rules []*placement.Rule
Rules []*pdHttp.Rule
Labels typeutil.StringSlice
}

Expand Down
23 changes: 12 additions & 11 deletions tools/pd-simulator/simulator/cases/diagnose_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
pdHttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/placement"
sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
Expand All @@ -30,30 +31,30 @@ import (
func newRule1(_ *sc.SimConfig) *Case {
var simCase Case

simCase.Rules = make([]*placement.Rule, 0)
simCase.Rules = append(simCase.Rules, &placement.Rule{
simCase.Rules = make([]*pdHttp.Rule, 0)
simCase.Rules = append(simCase.Rules, &pdHttp.Rule{
GroupID: "test1",
ID: "test1",
StartKeyHex: "",
EndKeyHex: "",
Role: placement.Learner,
Role: pdHttp.Learner,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
LabelConstraints: []pdHttp.LabelConstraint{
{
Key: "region",
Op: "in",
Values: []string{"region1"},
},
},
LocationLabels: []string{"host"},
}, &placement.Rule{
}, &pdHttp.Rule{
GroupID: placement.DefaultGroupID,
ID: placement.DefaultRuleID,
StartKeyHex: "",
EndKeyHex: "",
Role: placement.Voter,
Role: pdHttp.Voter,
Count: 5,
LabelConstraints: []placement.LabelConstraint{
LabelConstraints: []pdHttp.LabelConstraint{
{
Key: "region",
Op: "in",
Expand Down Expand Up @@ -130,16 +131,16 @@ func newRule1(_ *sc.SimConfig) *Case {
func newRule2(_ *sc.SimConfig) *Case {
var simCase Case

simCase.Rules = make([]*placement.Rule, 0)
simCase.Rules = make([]*pdHttp.Rule, 0)
simCase.Rules = append(simCase.Rules,
&placement.Rule{
&pdHttp.Rule{
GroupID: "test1",
ID: "test1",
StartKeyHex: "",
EndKeyHex: "",
Role: placement.Leader,
Role: pdHttp.Leader,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
LabelConstraints: []pdHttp.LabelConstraint{
{
Key: "region",
Op: "in",
Expand Down
43 changes: 9 additions & 34 deletions tools/pd-simulator/simulator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
package simulator

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
pdHttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/typeutil"
sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
Expand All @@ -54,20 +50,19 @@ type Client interface {
const (
pdTimeout = time.Second
maxInitClusterRetries = 100
httpPrefix = "pd/api/v1"
)

var (
// errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses.
errFailInitClusterID = errors.New("[pd] failed to get cluster id")
PDHTTPClient pdHttp.Client
)

type client struct {
url string
tag string
clusterID uint64
clientConn *grpc.ClientConn
httpClient *http.Client

reportRegionHeartbeatCh chan *core.RegionInfo
receiveRegionHeartbeatCh chan *pdpb.RegionHeartbeatResponse
Expand All @@ -88,7 +83,6 @@ func NewClient(pdAddr string, tag string) (Client, <-chan *pdpb.RegionHeartbeatR
ctx: ctx,
cancel: cancel,
tag: tag,
httpClient: &http.Client{},
}
cc, err := c.createConn()
if err != nil {
Expand Down Expand Up @@ -319,46 +313,27 @@ func (c *client) PutStore(ctx context.Context, store *metapb.Store) error {

func (c *client) PutPDConfig(config *sc.PDConfig) error {
if len(config.PlacementRules) > 0 {
path := fmt.Sprintf("%s/%s/config/rules/batch", c.url, httpPrefix)
ruleOps := make([]*placement.RuleOp, 0)
ruleOps := make([]*pdHttp.RuleOp, 0)
for _, rule := range config.PlacementRules {
ruleOps = append(ruleOps, &placement.RuleOp{
ruleOps = append(ruleOps, &pdHttp.RuleOp{
Rule: rule,
Action: placement.RuleOpAdd,
Action: pdHttp.RuleOpAdd,
})
}
content, _ := json.Marshal(ruleOps)
req, err := http.NewRequest(http.MethodPost, path, bytes.NewBuffer(content))
req.Header.Add("Content-Type", "application/json")
err := PDHTTPClient.SetPlacementRuleInBatch(c.ctx, ruleOps)
if err != nil {
return err
}
res, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
simutil.Logger.Info("add placement rule success", zap.String("rules", string(content)))
simutil.Logger.Info("add placement rule success", zap.Any("rules", config.PlacementRules))
}
if len(config.LocationLabels) > 0 {
path := fmt.Sprintf("%s/%s/config", c.url, httpPrefix)
data := make(map[string]any)
data["location-labels"] = config.LocationLabels
content, err := json.Marshal(data)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, path, bytes.NewBuffer(content))
req.Header.Add("Content-Type", "application/json")
if err != nil {
return err
}
res, err := c.httpClient.Do(req)
err := PDHTTPClient.SetConfig(c.ctx, data)
if err != nil {
return err
}
defer res.Body.Close()
simutil.Logger.Info("add location labels success", zap.String("labels", string(content)))
simutil.Logger.Info("add location labels success", zap.Any("labels", config.LocationLabels))
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions tools/pd-simulator/simulator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

"github.com/BurntSushi/toml"
"github.com/docker/go-units"
pdHttp "github.com/tikv/pd/client/http"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -133,6 +133,6 @@ func (sc *SimConfig) Speed() uint64 {

// PDConfig saves some config which may be changed in PD.
type PDConfig struct {
PlacementRules []*placement.Rule
PlacementRules []*pdHttp.Rule
LocationLabels typeutil.StringSlice
}
6 changes: 6 additions & 0 deletions tools/pd-simulator/simulator/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func (*DownNode) Run(raft *RaftEngine, _ int64) bool {
return false
}
delete(raft.conn.Nodes, node.Id)
// delete store
err := PDHTTPClient.DeleteStore(context.Background(), node.Id)
if err != nil {
simutil.Logger.Error("put store failed", zap.Uint64("node-id", node.Id), zap.Error(err))
return false
}
node.Stop()

regions := raft.GetRegions()
Expand Down
1 change: 1 addition & 0 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewNode(s *cases.Store, pdAddr string, config *sc.SimConfig) (*Node, error)
StoreId: s.ID,
Capacity: uint64(config.RaftStore.Capacity),
StartTime: uint32(time.Now().Unix()),
Available: uint64(config.RaftStore.Capacity),
},
}
tag := fmt.Sprintf("store %d", s.ID)
Expand Down

0 comments on commit 301fabb

Please sign in to comment.