Skip to content

Commit

Permalink
stability: add fault-trigger client (pingcap#326)
Browse files Browse the repository at this point in the history
* stability: add fault-trigger client
  • Loading branch information
cwen0 authored and weekface committed Mar 21, 2019
1 parent 0908884 commit 3e9c07b
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 11 deletions.
8 changes: 4 additions & 4 deletions tests/actions.go
Expand Up @@ -419,7 +419,7 @@ func (oa *operatorActions) pdMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, err
replicas := tc.Spec.PD.Replicas + int32(failureCount)
if *pdSet.Spec.Replicas != replicas {
glog.Infof("statefulset: %s/%s .spec.Replicas(%d) != %d",
ns, pdSetName, *pdSet.Spec.Replicas, ns, tcName, replicas)
ns, pdSetName, *pdSet.Spec.Replicas, replicas)
return false, nil
}
if pdSet.Status.ReadyReplicas != tc.Spec.PD.Replicas {
Expand Down Expand Up @@ -589,7 +589,7 @@ func (oa *operatorActions) reclaimPolicySyncFn(tc *v1alpha1.TidbCluster) (bool,
for _, pvc := range pvcList.Items {
pvName := pvc.Spec.VolumeName
if pv, err := oa.kubeCli.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{}); err != nil {
glog.Errorf("failed to get pv: %s", pvName, err)
glog.Errorf("failed to get pv: %s, error: %v", pvName, err)
return false, nil
} else if pv.Spec.PersistentVolumeReclaimPolicy != tc.Spec.PVReclaimPolicy {
glog.Errorf("pv: %s's reclaimPolicy is not Retain", pvName)
Expand All @@ -608,7 +608,7 @@ func (oa *operatorActions) metaSyncFn(tc *v1alpha1.TidbCluster) (bool, error) {
var cluster *metapb.Cluster
var err error
if cluster, err = pdCli.GetCluster(); err != nil {
glog.Errorf("failed to get cluster from pdControl: %s/%s", ns, tcName, err)
glog.Errorf("failed to get cluster from pdControl: %s/%s, error: %v", ns, tcName, err)
return false, nil
}

Expand Down Expand Up @@ -808,7 +808,7 @@ func (oa *operatorActions) schedulerHAFn(tc *v1alpha1.TidbCluster) (bool, error)
}
nodeMap[nodeName] = append(nodeMap[nodeName], pod.GetName())
if len(nodeMap[nodeName]) > totalCount/2 {
return false, fmt.Errorf("node % have %d pods, greater than %d/2",
return false, fmt.Errorf("node %s have %d pods, greater than %d/2",
nodeName, len(nodeMap[nodeName]), totalCount)
}
}
Expand Down
27 changes: 26 additions & 1 deletion tests/pkg/fault-trigger/api/response.go
Expand Up @@ -13,7 +13,13 @@

package api

import "net/http"
import (
"encoding/json"
"net/http"

"github.com/golang/glog"
"github.com/juju/errors"
)

// Response defines a new response struct for http
type Response struct {
Expand Down Expand Up @@ -41,3 +47,22 @@ func (r *Response) payload(payload interface{}) *Response {
r.Payload = payload
return r
}

// ExtractResponse extract response from api
func ExtractResponse(data []byte) ([]byte, error) {
respData := &Response{}
if err := json.Unmarshal(data, respData); err != nil {
return nil, errors.Trace(err)
}

if respData.StatusCode != http.StatusOK {
d, err := json.Marshal(respData.Payload)
if err != nil {
glog.Errorf("marshal data failed %v", d)
}

return d, errors.New(respData.Message)
}

return json.Marshal(respData.Payload)
}
5 changes: 3 additions & 2 deletions tests/pkg/fault-trigger/api/router.go
Expand Up @@ -16,13 +16,14 @@ package api
import restful "github.com/emicklei/go-restful"

const (
apiPrefix = "/pingcap.com/api/v1"
// APIPrefix defines a prefix string for fault-trigger api
APIPrefix = "/pingcap.com/api/v1"
)

func (s *Server) newService() *restful.WebService {
ws := new(restful.WebService)
ws.
Path(apiPrefix).
Path(APIPrefix).
Consumes(restful.MIME_JSON).
Produces(restful.MIME_JSON)

Expand Down
194 changes: 194 additions & 0 deletions tests/pkg/fault-trigger/client/client.go
@@ -0,0 +1,194 @@
package client

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/api"
"github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/manager"
"github.com/pingcap/tidb-operator/tests/pkg/util"
)

// Client is a fault-trigger client
type Client interface {
// ListVMs lists all virtual machines
ListVMs() ([]*manager.VM, error)
// StartVM start a specified virtual machine
StartVM(vm *manager.VM) error
// StopVM stops a specified virtual machine
StopVM(vm *manager.VM) error
// StartETCD starts the etcd service
StartETCD() error
// StopETCD stops the etcd service
StopETCD() error
// StartKubelet starts the kubelet service
StartKubelet() error
// StopKubelet stops the kubelet service
StopKubelet() error
}

// client is used to communicate with the fault-trigger
type client struct {
cfg Config
httpCli *http.Client
}

// Config defines for fault-trigger client
type Config struct {
Addr string
}

// NewClient creates a new fault-trigger client from a given address
func NewClient(cfg Config) Client {
return &client{
cfg: cfg,
httpCli: http.DefaultClient,
}
}

type clientError struct {
code int
msg string
}

func (e *clientError) Error() string {
return fmt.Sprintf("%s (code: %d)", e.msg, e.code)
}

func (c client) do(req *http.Request) (*http.Response, []byte, error) {
resp, err := c.httpCli.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()

code := resp.StatusCode

if code != http.StatusOK {
return resp, nil, &clientError{
code: code,
msg: "fail to request to http service",
}
}

bodyByte, err := ioutil.ReadAll(resp.Body)
if err != nil {
return resp, nil, &clientError{
code: code,
msg: fmt.Sprintf("failed to read data from resp body, error: %v", err),
}
}

data, err := api.ExtractResponse(bodyByte)
if err != nil {
return resp, nil, &clientError{
code: code,
msg: err.Error(),
}
}

return resp, data, err
}

func (c client) get(url string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("error creating request: %v", err)
}

_, body, err := c.do(req)
if err != nil {
return nil, err
}

return body, nil
}

func (c *client) ListVMs() ([]*manager.VM, error) {
url := util.GenURL(fmt.Sprintf("%s%s/vms", c.cfg.Addr, api.APIPrefix))
data, err := c.get(url)
if err != nil {
return nil, err
}

var vms []*manager.VM
if err = json.Unmarshal(data, &vms); err != nil {
return nil, err
}

return vms, nil
}

func (c *client) StartVM(vm *manager.VM) error {
if err := vm.Verify(); err != nil {
return err
}

vmName := vm.Name
if len(vmName) == 0 {
vmName = vm.IP
}

url := util.GenURL(fmt.Sprintf("%s/%s/vm/%s/start", c.cfg.Addr, api.APIPrefix, vmName))
if _, err := c.get(url); err != nil {
return err
}

return nil
}

func (c *client) StopVM(vm *manager.VM) error {
if err := vm.Verify(); err != nil {
return err
}

vmName := vm.Name
if len(vmName) == 0 {
vmName = vm.IP
}

url := util.GenURL(fmt.Sprintf("%s/%s/vm/%s/stop", c.cfg.Addr, api.APIPrefix, vmName))
if _, err := c.get(url); err != nil {
return err
}

return nil
}

func (c *client) StartETCD() error {
url := util.GenURL(fmt.Sprintf("%s/%s/etcd/start", c.cfg.Addr, api.APIPrefix))
if _, err := c.get(url); err != nil {
return err
}

return nil
}

func (c *client) StopETCD() error {
url := util.GenURL(fmt.Sprintf("%s/%s/etcd/stop", c.cfg.Addr, api.APIPrefix))
if _, err := c.get(url); err != nil {
return err
}

return nil
}

func (c *client) StartKubelet() error {
url := util.GenURL(fmt.Sprintf("%s/%s/kubelet/start", c.cfg.Addr, api.APIPrefix))
if _, err := c.get(url); err != nil {
return err
}

return nil
}

func (c *client) StopKubelet() error {
url := util.GenURL(fmt.Sprintf("%s/%s/kubelet/stop", c.cfg.Addr, api.APIPrefix))
if _, err := c.get(url); err != nil {
return err
}

return nil
}

0 comments on commit 3e9c07b

Please sign in to comment.