Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: fix a bug that may set not empty store to tombstone. #760

Merged
merged 3 commits into from
Sep 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
storeHandler := newStoreHandler(svr, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/state", storeHandler.SetState).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")

labelsHandler := newLabelsHandler(svr, rd)
Expand Down
31 changes: 31 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,37 @@ func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetState(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

stateStr := r.URL.Query().Get("state")
state, ok := metapb.StoreState_value[stateStr]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "invalid state")
return
}

err = cluster.SetStoreState(storeID, metapb.StoreState(state))
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
Expand Down
31 changes: 31 additions & 0 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,37 @@ func (s *testStoreSuite) TestStoreDelete(c *C) {
}
}

func (s *testStoreSuite) TestStoreSetState(c *C) {
url := fmt.Sprintf("%s/store/1", s.urlPrefix)
var info storeInfo
err := readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Up)

client := newUnixSocketClient()

// Set to Offline.
err = postJSON(client, url+"/state?state=Offline", nil)
c.Assert(err, IsNil)
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Offline)

// Invalid state.
err = postJSON(client, url+"/state?state=Foo", nil)
c.Assert(err, NotNil)
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Offline)

// Set back to Up.
err = postJSON(client, url+"/state?state=Up", nil)
c.Assert(err, IsNil)
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.State, Equals, metapb.StoreState_Up)
}

func (s *testStoreSuite) TestUrlStoreFilter(c *C) {
table := []struct {
u string
Expand Down
38 changes: 37 additions & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,13 +503,30 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {
return cluster.putStore(store)
}

// SetStoreState sets up a store's state.
func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) error {
c.Lock()
defer c.Unlock()

cluster := c.cachedCluster

store := cluster.getStore(storeID)
if store == nil {
return errors.Trace(errStoreNotFound(storeID))
}

store.State = state
log.Warnf("[store %d] set state to %v", storeID, state.String())
return cluster.putStore(store)
}

func (c *RaftCluster) checkStores() {
cluster := c.cachedCluster
for _, store := range cluster.getMetaStores() {
if store.GetState() != metapb.StoreState_Offline {
continue
}
if cluster.getStoreRegionCount(store.GetId()) == 0 {
if c.storeIsEmpty(store.GetId()) {
err := c.BuryStore(store.GetId(), false)
if err != nil {
log.Errorf("bury store %v failed: %v", store, err)
Expand All @@ -520,6 +537,25 @@ func (c *RaftCluster) checkStores() {
}
}

func (c *RaftCluster) storeIsEmpty(storeID uint64) bool {
cluster := c.cachedCluster
if cluster.getStoreRegionCount(storeID) > 0 {
return false
}
// If pd-server is started recently, or becomes leader recently, the check may
// happen before any heartbeat from tikv. So we need to check region metas to
// verify no region's peer is on the store.
regions := cluster.getMetaRegions()
for _, region := range regions {
for _, p := range region.GetPeers() {
if p.GetStoreId() == storeID {
return false
}
}
}
return true
}

func (c *RaftCluster) collectMetrics() {
cluster := c.cachedCluster

Expand Down