Skip to content

Commit

Permalink
server/api: add scheduler api (#407)
Browse files Browse the repository at this point in the history
* server/api: add scheduler api

We can now use API to list all schedulers, add or remove a scheduler.
  • Loading branch information
huachaohuang committed Dec 8, 2016
1 parent 3cb0604 commit 6f8b47b
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 0 deletions.
7 changes: 7 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
})

router := mux.NewRouter().PathPrefix(prefix).Subrouter()

handler := svr.GetHandler()
schedulerHandler := newSchedulerHandler(handler, rd)
router.HandleFunc("/api/v1/schedulers", schedulerHandler.List).Methods("GET")
router.HandleFunc("/api/v1/schedulers", schedulerHandler.Post).Methods("POST")
router.HandleFunc("/api/v1/schedulers/{name}", schedulerHandler.Delete).Methods("DELETE")

router.Handle("/api/v1/cluster", newClusterHandler(svr, rd)).Methods("GET")

confHandler := newConfHandler(svr, rd)
Expand Down
88 changes: 88 additions & 0 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/gorilla/mux"
"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)

type schedulerHandler struct {
*server.Handler
r *render.Render
}

func newSchedulerHandler(handler *server.Handler, r *render.Render) *schedulerHandler {
return &schedulerHandler{
Handler: handler,
r: r,
}
}

func (h *schedulerHandler) List(w http.ResponseWriter, r *http.Request) {
schedulers, err := h.GetSchedulers()
if err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.r.JSON(w, http.StatusOK, schedulers)
}

func (h *schedulerHandler) Post(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := readJSON(r.Body, &input); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}

name, ok := input["name"].(string)
if !ok {
h.r.JSON(w, http.StatusBadRequest, "missing scheduler name")
return
}

switch name {
case "leader-balancer":
if err := h.AddLeaderBalancer(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "grant-leader-scheduler":
storeID, ok := input["store_id"].(float64)
if !ok {
h.r.JSON(w, http.StatusBadRequest, "missing store id")
return
}
if err := h.AddGrantLeaderScheduler(uint64(storeID)); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}

h.r.JSON(w, http.StatusOK, nil)
}

func (h *schedulerHandler) Delete(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]

if err := h.RemoveScheduler(name); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.r.JSON(w, http.StatusOK, nil)
}
80 changes: 80 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import "github.com/juju/errors"

var (
errNotBootstrapped = errors.New("TiKV cluster not bootstrapped")
)

// Handler is a helper to export methods to handle API/RPC requests.
type Handler struct {
s *Server
}

func newHandler(s *Server) *Handler {
return &Handler{s: s}
}

func (h *Handler) getCoordinator() (*coordinator, error) {
cluster := h.s.GetRaftCluster()
if cluster == nil {
return nil, errors.Trace(errNotBootstrapped)
}
return cluster.coordinator, nil
}

// GetSchedulers returns all names of schedulers.
func (h *Handler) GetSchedulers() ([]string, error) {
c, err := h.getCoordinator()
if err != nil {
return nil, errors.Trace(err)
}
return c.getSchedulers(), nil
}

// RemoveScheduler removes a scheduler by name.
func (h *Handler) RemoveScheduler(name string) error {
c, err := h.getCoordinator()
if err != nil {
return errors.Trace(err)
}
if !c.removeScheduler(name) {
return errors.Errorf("scheduler %q not found", name)
}
return nil
}

// AddLeaderScheduler adds a leader scheduler.
func (h *Handler) AddLeaderScheduler(s Scheduler) error {
c, err := h.getCoordinator()
if err != nil {
return errors.Trace(err)
}
if !c.addScheduler(newLeaderScheduleController(c, s)) {
return errors.Errorf("scheduler %q exists", s.GetName())
}
return nil
}

// AddLeaderBalancer adds a leader-balancer.
func (h *Handler) AddLeaderBalancer() error {
return h.AddLeaderScheduler(newLeaderBalancer(h.s.scheduleOpt))
}

// AddGrantLeaderScheduler adds a grant-leader-scheduler.
func (h *Handler) AddGrantLeaderScheduler(storeID uint64) error {
return h.AddLeaderScheduler(newGrantLeaderScheduler(storeID))
}
9 changes: 9 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type Server struct {
// for kv operation.
kv *kv

// for API operation.
handler *Handler

// for raft cluster
clusterLock sync.RWMutex
cluster *RaftCluster
Expand Down Expand Up @@ -104,6 +107,7 @@ func CreateServer(cfg *Config) (*Server, error) {
closed: 1,
}

s.handler = newHandler(s)
return s, nil
}

Expand Down Expand Up @@ -277,6 +281,11 @@ func (s *Server) GetAddr() string {
return s.cfg.AdvertiseClientUrls
}

// GetHandler returns the handler for API.
func (s *Server) GetHandler() *Handler {
return s.handler
}

// GetEndpoints returns the etcd endpoints for outer use.
func (s *Server) GetEndpoints() []string {
return s.client.Endpoints()
Expand Down

0 comments on commit 6f8b47b

Please sign in to comment.