Skip to content

Commit

Permalink
raft: enhancing cluster election
Browse files Browse the repository at this point in the history
Add parameter 'AdmitDefeatHtCnt' 'AdmitDefeatPingCnt' used for configuration,
and degrade leader to follower immediately when mysqld dead.
  • Loading branch information
MengZhe authored and Toknowledge committed Sep 12, 2018
1 parent a8f0a74 commit ccc971b
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 23 deletions.
28 changes: 18 additions & 10 deletions src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type RaftConfig struct {
// leader heartbeat interval(ms)
HeartbeatTimeout int `json:"heartbeat-timeout"`

// admit defeat count for hearbeat
AdmitDefeatHtCnt int `json:"admit-defeat-hearbeat-count"`

// election timeout(ms)
ElectionTimeout int `json:"election-timeout"`

Expand Down Expand Up @@ -76,6 +79,7 @@ func DefaultRaftConfig() *RaftConfig {
return &RaftConfig{
MetaDatadir: ".",
HeartbeatTimeout: 1000,
AdmitDefeatHtCnt: 10,
ElectionTimeout: 3000,
PurgeBinlogInterval: 1000 * 60 * 5,
LeaderStartCommand: "nop",
Expand Down Expand Up @@ -117,6 +121,9 @@ type MysqlConfig struct {
// ping mysql interval(ms)
PingTimeout int `json:"ping-timeout"`

// admit defeat count for ping mysql
AdmitDefeatPingCnt int `json:"admit-defeat-ping-count"`

// master system variables configure(separated by ;)
MasterSysVars string `json:"master-sysvars"`

Expand All @@ -135,16 +142,17 @@ type MysqlConfig struct {

func DefaultMysqlConfig() *MysqlConfig {
return &MysqlConfig{
Admin: "root",
Passwd: "",
Host: "localhost",
Port: 3306,
PingTimeout: 1000,
Basedir: "/u01/mysql_20160606/",
DefaultsFile: "/etc/my3306.cnf",
ReplHost: "127.0.0.1",
ReplUser: "repl",
ReplPasswd: "repl",
Admin: "root",
Passwd: "",
Host: "localhost",
Port: 3306,
PingTimeout: 1000,
AdmitDefeatPingCnt: 2,
Basedir: "/u01/mysql_20160606/",
DefaultsFile: "/etc/my3306.cnf",
ReplHost: "127.0.0.1",
ReplUser: "repl",
ReplPasswd: "repl",
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ const (
MysqlReadwrite Option = "READWRITE"
)

var (
downsLimits = 2
)

// PingEntry tuple.
type PingEntry struct {
Relay_Master_Log_File string
Expand Down Expand Up @@ -95,6 +91,8 @@ func (m *Mysql) Ping() {
var pe *PingEntry
log := m.log

downsLimits := m.conf.AdmitDefeatPingCnt

if db, err = m.getDB(); err != nil {
log.Error("mysql[%v].ping.getdb.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits)
if m.downs > downsLimits {
Expand Down
21 changes: 13 additions & 8 deletions src/raft/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Leader struct {
processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse

// leader send heartbeat request to other followers
sendHeartbeatHandler func(*int, chan *model.RaftRPCResponse)
sendHeartbeatHandler func(*bool, chan *model.RaftRPCResponse)

// leader process send heartbeat response
processHeartbeatResponseHandler func(*int, *model.RaftRPCResponse)
Expand All @@ -65,18 +65,23 @@ func (r *Leader) Loop() {
r.stateInit()
defer r.stateExit()

mysqlDowns := 0
mysqlDown := false
ackGranted := 1

lessHtAcks := 0
maxLessHtAcks := 10
maxLessHtAcks := r.Raft.conf.AdmitDefeatHtCnt

// send heartbeat
respChan := make(chan *model.RaftRPCResponse, r.getMembers())
r.sendHeartbeatHandler(&mysqlDowns, respChan)
r.sendHeartbeatHandler(&mysqlDown, respChan)
r.resetHeartbeatTimeout()

for r.getState() == LEADER {
if mysqlDown {
r.WARNING("feel.mysql.down.degrade.to.follower")
r.degradeToFollower()
}

select {
case <-r.fired:
r.WARNING("state.machine.loop.got.fired")
Expand All @@ -97,7 +102,7 @@ func (r *Leader) Loop() {

ackGranted = 1
respChan = make(chan *model.RaftRPCResponse, r.getMembers())
r.sendHeartbeatHandler(&mysqlDowns, respChan)
r.sendHeartbeatHandler(&mysqlDown, respChan)
r.resetHeartbeatTimeout()
case rsp := <-respChan:
r.processHeartbeatResponseHandler(&ackGranted, rsp)
Expand Down Expand Up @@ -248,10 +253,10 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf

// leaderSendHeartbeatHandler
// broadcast hearbeat requests to other peers of the cluster
func (r *Leader) sendHeartbeat(mysqlDowns *int, c chan *model.RaftRPCResponse) {
func (r *Leader) sendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) {
// check MySQL down
if r.mysql.GetState() == mysql.MysqlDead {
r.WARNING("feel.mysql.down")
*mysqlDown = true
return
}

Expand Down Expand Up @@ -495,7 +500,7 @@ func (r *Leader) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCReques
r.processRequestVoteRequestHandler = f
}

func (r *Leader) setSendHeartbeatHandler(f func(*int, chan *model.RaftRPCResponse)) {
func (r *Leader) setSendHeartbeatHandler(f func(*bool, chan *model.RaftRPCResponse)) {
r.sendHeartbeatHandler = f
}

Expand Down
2 changes: 1 addition & 1 deletion src/raft/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (r *Raft) mockLeaderProcessRequestVoteRequest(req *model.RaftRPCRequest) *m

// mock leader send heartbeat request
// nop here, so other followers will start a new leader election
func (r *Raft) mockLeaderSendHeartbeat(mysqlDownLimits *int, c chan *model.RaftRPCResponse) {
func (r *Raft) mockLeaderSendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) {
r.DEBUG("mock.send.nop.heartbeat.request")
}

Expand Down

0 comments on commit ccc971b

Please sign in to comment.