Skip to content

Commit

Permalink
Merge 08dc98d into 4f8e7da
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Sep 18, 2017
2 parents 4f8e7da + 08dc98d commit df724fd
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
storeHandler := newStoreHandler(svr, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/state", storeHandler.SetState).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")

labelsHandler := newLabelsHandler(svr, rd)
Expand Down
31 changes: 31 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,37 @@ func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetState(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

stateStr := r.URL.Query().Get("state")
state, ok := metapb.StoreState_value[stateStr]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "invalid state")
return
}

err = cluster.SetStoreState(storeID, metapb.StoreState(state))
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
Expand Down
31 changes: 31 additions & 0 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,37 @@ func (s *testStoreSuite) TestStoreDelete(c *C) {
}
}

func (s *testStoreSuite) TestStoreSetState(c *C) {
url := fmt.Sprintf("%s/store/1", s.urlPrefix)
var info storeInfo
err := readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Up)

client := newUnixSocketClient()

// Set to Offline.
err = postJSON(client, url+"/state?state=Offline", nil)
c.Assert(err, IsNil)
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Offline)

// Invalid state.
err = postJSON(client, url+"/state?state=Foo", nil)
c.Assert(err, NotNil)
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Offline)

// Set back to Up.
err = postJSON(client, url+"/state?state=Up", nil)
c.Assert(err, IsNil)
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Up)
}

func (s *testStoreSuite) TestUrlStoreFilter(c *C) {
table := []struct {
u string
Expand Down
38 changes: 37 additions & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,13 +503,30 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {
return cluster.putStore(store)
}

// SetStoreState sets up a store's state.
func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) error {
c.Lock()
defer c.Unlock()

cluster := c.cachedCluster

store := cluster.getStore(storeID)
if store == nil {
return errors.Trace(errStoreNotFound(storeID))
}

store.State = state
log.Warnf("[store %d] set state to %v", storeID, state.String())
return cluster.putStore(store)
}

func (c *RaftCluster) checkStores() {
cluster := c.cachedCluster
for _, store := range cluster.getMetaStores() {
if store.GetState() != metapb.StoreState_Offline {
continue
}
if cluster.getStoreRegionCount(store.GetId()) == 0 {
if c.storeIsEmpty(store.GetId()) {
err := c.BuryStore(store.GetId(), false)
if err != nil {
log.Errorf("bury store %v failed: %v", store, err)
Expand All @@ -520,6 +537,25 @@ func (c *RaftCluster) checkStores() {
}
}

func (c *RaftCluster) storeIsEmpty(storeID uint64) bool {
cluster := c.cachedCluster
if cluster.getStoreRegionCount(storeID) > 0 {
return false
}
// If pd-server is started recently, or becomes leader recently, the check may
// happen before any heartbeat from tikv. So we need to check region metas to
// verify no region's peer is on the store.
regions := cluster.getMetaRegions()
for _, region := range regions {
for _, p := range region.GetPeers() {
if p.GetStoreId() == storeID {
return false
}
}
}
return true
}

func (c *RaftCluster) collectMetrics() {
cluster := c.cachedCluster

Expand Down

0 comments on commit df724fd

Please sign in to comment.