Skip to content

Commit

Permalink
Merge branch 'master' into disksing/transfer-leader
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Mar 23, 2017
2 parents 270dae7 + 42f48a6 commit 38abfcb
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 62 deletions.
27 changes: 27 additions & 0 deletions pd-client/leader_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ func (s *testLeaderChangeSuite) prepareClusterN(c *C, n int) (svrs map[string]*s
return
}

func (s *testLeaderChangeSuite) TestLeaderConfigChange(c *C) {
svrs, endpoints, closeFunc := s.prepareClusterN(c, 3)
defer closeFunc()

leader, err := getLeader(endpoints)
c.Assert(err, IsNil)
mustConnectLeader(c, endpoints, leader.GetAddr())

r := server.ReplicationConfig{MaxReplicas: 5}
svrs[leader.GetAddr()].SetReplication(r)
svrs[leader.GetAddr()].Close()
// wait leader changes
changed := false
for i := 0; i < 20; i++ {
newLeader, _ := getLeader(endpoints)
if newLeader != nil && newLeader.GetAddr() != leader.GetAddr() {
mustConnectLeader(c, endpoints, newLeader.GetAddr())
changed = true
nr := svrs[newLeader.GetAddr()].GetConfig().Replication.MaxReplicas
c.Assert(nr, Equals, uint64(5))
break
}
time.Sleep(500 * time.Millisecond)
}
c.Assert(changed, IsTrue)
}

func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
svrs, endpoints, closeFunc := s.prepareClusterN(c, 3)
defer closeFunc()
Expand Down
18 changes: 17 additions & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, &h.svr.GetConfig().Schedule)
}

func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) {
config := &server.ScheduleConfig{}
err := readJSON(r.Body, config)
if err != nil {
Expand All @@ -51,3 +51,19 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
h.svr.SetScheduleConfig(*config)
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, &h.svr.GetConfig().Replication)
}

func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) {
config := &server.ReplicationConfig{}
err := readJSON(r.Body, config)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.svr.SetReplication(*config)
h.rd.JSON(w, http.StatusOK, nil)
}
41 changes: 33 additions & 8 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package api

import (
"bytes"
"encoding/json"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -68,24 +67,50 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) {
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
buf, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)

sc := &server.ScheduleConfig{}
err = json.Unmarshal(buf, sc)
c.Assert(err, IsNil)
readJSON(resp.Body, sc)

sc.MaxStoreDownTime.Duration = time.Second
postData, err := json.Marshal(sc)
postURL := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config"}
postAddr := mustUnixAddrToHTTPAddr(c, strings.Join(postURL, ""))
resp, err = s.hc.Post(postAddr, "application/json", bytes.NewBuffer(postData))
err = postJSON(s.hc, postAddr, postData)
c.Assert(err, IsNil)

resp, err = s.hc.Get(addr)
sc1 := &server.ScheduleConfig{}
json.NewDecoder(resp.Body).Decode(sc1)
readJSON(resp.Body, sc1)

c.Assert(*sc, Equals, *sc1)
}
}

func (s *testConfigSuite) TestConfigReplication(c *C) {
numbers := []int{1, 3}
for _, num := range numbers {
cfgs, _, clean := mustNewCluster(c, num)
defer clean()

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/replicate"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)

rc := &server.ReplicationConfig{}
err = readJSON(resp.Body, rc)
c.Assert(err, IsNil)

rc.MaxReplicas = 5
postData, err := json.Marshal(rc)
postURL := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/replicate"}
postAddr := mustUnixAddrToHTTPAddr(c, strings.Join(postURL, ""))
err = postJSON(s.hc, postAddr, postData)
c.Assert(err, IsNil)

resp, err = s.hc.Get(addr)
rc1 := &server.ReplicationConfig{}
err = readJSON(resp.Body, rc1)

c.Assert(*rc, DeepEquals, *rc1)
}
}
4 changes: 3 additions & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

confHandler := newConfHandler(svr, rd)
router.HandleFunc("/api/v1/config", confHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/config", confHandler.Post).Methods("POST")
router.HandleFunc("/api/v1/config", confHandler.SetSchedule).Methods("POST")
router.HandleFunc("/api/v1/config/schedule", confHandler.GetSchedule).Methods("GET")
router.HandleFunc("/api/v1/config/replicate", confHandler.SetReplication).Methods("POST")
router.HandleFunc("/api/v1/config/replicate", confHandler.GetReplication).Methods("GET")

storeHandler := newStoreHandler(svr, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
Expand Down
4 changes: 2 additions & 2 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type storeStatus struct {
StoreID uint64 `json:"store_id"`
Capacity typeutil.ByteSize `json:"capacity"`
Available typeutil.ByteSize `json:"available"`
LeaderCount uint32 `json:"leader_count"`
RegionCount uint32 `json:"region_count"`
LeaderCount int `json:"leader_count"`
RegionCount int `json:"region_count"`
SendingSnapCount uint32 `json:"sending_snap_count"`
ReceivingSnapCount uint32 `json:"receiving_snap_count"`
ApplyingSnapCount uint32 `json:"applying_snap_count"`
Expand Down
12 changes: 12 additions & 0 deletions server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package api

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net"
"net/http"

"github.com/juju/errors"
)
Expand All @@ -38,6 +40,16 @@ func readJSON(r io.ReadCloser, data interface{}) error {
return nil
}

func postJSON(cli *http.Client, url string, data []byte) error {
resp, err := cli.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return errors.Trace(err)
}
ioutil.ReadAll(resp.Body)
resp.Body.Close()
return nil
}

func unixDial(_, addr string) (net.Conn, error) {
return net.Dial("unix", addr)
}
8 changes: 4 additions & 4 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func (c *testClusterInfo) setStoreBusy(storeID uint64, busy bool) {
func (c *testClusterInfo) addLeaderStore(storeID uint64, leaderCount int) {
store := newStoreInfo(&metapb.Store{Id: storeID})
store.status.LastHeartbeatTS = time.Now()
store.status.LeaderCount = uint32(leaderCount)
store.status.LeaderCount = leaderCount
c.putStore(store)
}

func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) {
store := newStoreInfo(&metapb.Store{Id: storeID})
store.status.LastHeartbeatTS = time.Now()
store.status.RegionCount = uint32(regionCount)
store.status.RegionCount = regionCount
store.status.Capacity = uint64(1024)
store.status.Available = store.status.Capacity
c.putStore(store)
Expand Down Expand Up @@ -96,13 +96,13 @@ func (c *testClusterInfo) addLeaderRegion(regionID uint64, leaderID uint64, foll

func (c *testClusterInfo) updateLeaderCount(storeID uint64, leaderCount int) {
store := c.getStore(storeID)
store.status.LeaderCount = uint32(leaderCount)
store.status.LeaderCount = leaderCount
c.putStore(store)
}

func (c *testClusterInfo) updateRegionCount(storeID uint64, regionCount int) {
store := c.getStore(storeID)
store.status.RegionCount = uint32(regionCount)
store.status.RegionCount = regionCount
c.putStore(store)
}

Expand Down
83 changes: 61 additions & 22 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ func (s *storesInfo) getStoreCount() int {
return len(s.stores)
}

func (s *storesInfo) setLeaderCount(storeID uint64, leaderCount int) {
if store, ok := s.stores[storeID]; ok {
store.status.LeaderCount = leaderCount
}
}

func (s *storesInfo) setRegionCount(storeID uint64, regionCount int) {
if store, ok := s.stores[storeID]; ok {
store.status.RegionCount = regionCount
}
}

// regionMap wraps a map[uint64]*regionInfo and supports randomly pick a region.
type regionMap struct {
m map[uint64]*regionEntry
Expand Down Expand Up @@ -551,13 +563,17 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
}

store.status.StoreStats = proto.Clone(stats).(*pdpb.StoreStats)
store.status.LeaderCount = uint32(c.regions.getStoreLeaderCount(storeID))
store.status.LastHeartbeatTS = time.Now()

c.stores.setStore(store)
return nil
}

func (c *clusterInfo) updateStoreStatus(id uint64) {
c.stores.setLeaderCount(id, c.regions.getStoreLeaderCount(id))
c.stores.setRegionCount(id, c.regions.getStoreRegionCount(id))
}

// handleRegionHeartbeat updates the region information.
func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) error {
c.Lock()
Expand All @@ -566,35 +582,58 @@ func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) error {
region = region.clone()
origin := c.regions.getRegion(region.GetId())

// Region does not exist, add it.
// Save to KV if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
var saveKV, saveCache bool
if origin == nil {
log.Infof("[region %d] Insert new region {%v}", region.GetId(), region)
return c.putRegionLocked(region)
saveKV, saveCache = true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return errors.Trace(errRegionIsStale(region.Region, origin.Region))
}
if r.GetVersion() > o.GetVersion() {
log.Infof("[region %d] %s, Version changed from {%d} to {%d}", region.GetId(), diffRegionKeyInfo(origin, region), o.GetVersion(), r.GetVersion())
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
log.Infof("[region %d] %s, ConfVer changed from {%d} to {%d}", region.GetId(), diffRegionPeersInfo(origin, region), o.GetConfVer(), r.GetConfVer())
saveKV, saveCache = true, true
}
if region.Leader.GetId() != origin.Leader.GetId() {
log.Infof("[region %d] Leader changed from {%v} to {%v}", region.GetId(), origin.GetPeer(origin.Leader.GetId()), region.GetPeer(region.Leader.GetId()))
saveCache = true
}
if len(region.DownPeers) > 0 || len(region.PendingPeers) > 0 {
saveCache = true
}
if len(origin.DownPeers) > 0 || len(origin.DownPeers) > 0 {
saveCache = true
}
}

r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()

// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return errors.Trace(errRegionIsStale(region.Region, origin.Region))
if saveKV && c.kv != nil {
if err := c.kv.saveRegion(region.Region); err != nil {
return errors.Trace(err)
}
}

// Region meta is updated, update kv and cache.
if r.GetVersion() > o.GetVersion() {
log.Infof("[region %d] %s, Version changed from {%d} to {%d}", region.GetId(), diffRegionKeyInfo(origin, region), o.GetVersion(), r.GetVersion())
return c.putRegionLocked(region)
}
if r.GetConfVer() > o.GetConfVer() {
log.Infof("[region %d] %s, ConfVer changed from {%d} to {%d}", region.GetId(), diffRegionPeersInfo(origin, region), o.GetConfVer(), r.GetConfVer())
return c.putRegionLocked(region)
}
if saveCache {
c.regions.setRegion(region)

if region.Leader.GetId() != origin.Leader.GetId() {
log.Infof("[region %d] Leader changed from {%v} to {%v}", region.GetId(), origin.GetPeer(origin.Leader.GetId()), region.GetPeer(region.Leader.GetId()))
// Update related stores.
if origin != nil {
for _, p := range origin.Peers {
c.updateStoreStatus(p.GetStoreId())
}
}
for _, p := range region.Peers {
c.updateStoreStatus(p.GetStoreId())
}
}

// Region meta is the same, update cache only.
c.regions.setRegion(region)
return nil
}
Loading

0 comments on commit 38abfcb

Please sign in to comment.