/
peer.go
132 lines (116 loc) · 3.02 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* Xenon
*
* Copyright 2018 The Xenon Authors.
* Code is licensed under the GPLv3.
*
*/
package raft
import (
"model"
"xbase/xrpc"
)
// Peer tuple.
type Peer struct {
raft *Raft
requestTimeout int // peer client request timneout
heartbeatTimeout int
connectionStr string // peer connection string
}
// NewPeer creates new Peer.
func NewPeer(raft *Raft, connectionStr string, requestTimeout int, heartbeatTimeout int) *Peer {
return &Peer{
raft: raft,
connectionStr: connectionStr,
requestTimeout: requestTimeout,
heartbeatTimeout: heartbeatTimeout,
}
}
// sendHeartbeat
// send heartbeat rpc request
func (p *Peer) sendHeartbeat(c chan *model.RaftRPCResponse) {
// response
rsp := model.NewRaftRPCResponse(model.OK)
// request body
req := model.NewRaftRPCRequest()
req.Raft.EpochID = p.raft.getEpochID()
req.Raft.ViewID = p.raft.getViewID()
req.Raft.From = p.raft.getID()
req.Raft.To = p.getID()
req.Raft.Leader = p.raft.getLeader()
req.Peers = p.raft.getPeers()
req.Repl = p.raft.mysql.GetRepl()
client, cleanup, err := p.NewClient()
if err != nil {
p.raft.ERROR("send.heartbeat.to.peer[%v].new.client.error[%v]", p.getID(), err)
rsp.RetCode = model.ErrorRPCCall
c <- rsp
return
}
defer cleanup()
method := model.RPCRaftHeartbeat
err = client.CallTimeout(p.requestTimeout, method, req, rsp)
if err != nil {
p.raft.ERROR("send.heartbeat.to[%v].client.call.error[%v]", p.getID(), err)
rsp.RetCode = model.ErrorRPCCall
c <- rsp
return
}
c <- rsp
}
// sendRequestVote
// send vote rpc request
func (p *Peer) sendRequestVote(c chan *model.RaftRPCResponse) {
var err error
// response
rsp := model.NewRaftRPCResponse(model.OK)
// request body
req := model.NewRaftRPCRequest()
req.Raft.EpochID = p.raft.meta.EpochID
req.Raft.ViewID = p.raft.meta.ViewID
req.Raft.From = p.raft.getID()
req.Raft.To = p.connectionStr
req.Raft.Leader = p.raft.getLeader()
req.GTID, err = p.raft.getGTID()
if err != nil {
p.raft.ERROR("send.requestvote.to.peer[%v].get.gtid.error[%v]", p.getID(), err)
rsp.RetCode = model.ErrorMySQLDown
c <- rsp
return
}
p.raft.WARNING("send.requestvote.to.peer[%v].request.gtid[%v]", p.getID(), req.GTID)
client, cleanup, err := p.NewClient()
if err != nil {
p.raft.ERROR("send.requestvote.to.peer[%v].new.client.error[%v]", p.getID(), err)
rsp.RetCode = model.ErrorRPCCall
c <- rsp
return
}
defer cleanup()
method := model.RPCRaftRequestVote
err = client.CallTimeout(p.requestTimeout, method, req, rsp)
if err != nil {
p.raft.ERROR("send.requestvote.to.peer[%v].client.call.error[%v]", p.getID(), err)
rsp.RetCode = model.ErrorRPCCall
c <- rsp
return
}
c <- rsp
}
// NewClient creates new client.
func (p *Peer) NewClient() (*xrpc.Client, func(), error) {
client, err := xrpc.NewClient(p.connectionStr, p.requestTimeout)
if err != nil {
return nil, nil, err
}
return client, func() {
client.Close()
}, nil
}
// attributes
func (p *Peer) freePeer() {
// nop
}
func (p *Peer) getID() string {
return p.connectionStr
}