/
nodes.go
247 lines (215 loc) · 5.73 KB
/
nodes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package http
import (
"bytes"
"encoding/json"
"io"
"sort"
"sync"
"time"
"github.com/rqlite/rqlite/v8/store"
)
// Node describes one member of the cluster together with the outcome of the
// most recent reachability test run against it. When communication with the
// node fails, the failure is reported through the Error field.
type Node struct {
	// ID is the node's identifier; collections of nodes are sorted by it.
	ID string `json:"id,omitempty"`
	// APIAddr is the node's API address, filled in by Test on success.
	APIAddr string `json:"api_addr,omitempty"`
	// Addr is the node's Raft address.
	Addr string `json:"addr,omitempty"`
	// Voter reports whether the node is a voting member.
	Voter bool `json:"voter"`
	// Reachable reports whether the last Test obtained the node's API address.
	Reachable bool `json:"reachable"`
	// Leader reports whether the node's Raft address matched the leader
	// address supplied to the last successful Test.
	Leader bool `json:"leader"`
	// Time is a duration recorded by Test, in seconds.
	Time float64 `json:"time,omitempty"`
	// TimeS is the same duration rendered as a time.Duration string.
	TimeS string `json:"time_s,omitempty"`
	// Error holds a description of the last failure, if any.
	Error string `json:"error,omitempty"`

	// mu is held by SetError and MarshalJSON.
	mu sync.Mutex
}
// NewNodeFromServer builds a Node from the given Server. The ID and Raft
// address are copied across, and the node is marked as a voter only when the
// server's suffrage is exactly "Voter". All other fields keep their zero value.
func NewNodeFromServer(s *store.Server) *Node {
	node := new(Node)
	node.ID = s.ID
	node.Addr = s.Addr
	node.Voter = s.Suffrage == "Voter"
	return node
}
// Test probes the node's reachability and leadership status and records the
// outcome on the node.
//
// The node's API address is requested from ga using the node's Raft address,
// retries and timeout. If the lookup succeeds before timeout elapses, APIAddr
// is stored, Reachable is set, and Leader is set when the node's Raft address
// equals leaderAddr. If the lookup fails, or timeout elapses first, the Error
// field is populated and the node is left unreachable. Time and TimeS record
// how long it took to reach that outcome.
//
// The lookup runs in its own goroutine so that Test can return at the deadline
// even if ga is still blocked. Every write to the node happens while holding
// n.mu, and only the first outcome (lookup result or timeout) is recorded, so
// a lookup that completes after the deadline cannot modify the node once Test
// has returned.
func (n *Node) Test(ga GetAddresser, leaderAddr string, retries int, timeout time.Duration) {
	start := time.Now()

	n.mu.Lock()
	n.Reachable = false
	n.Leader = false
	n.mu.Unlock()

	// settled is guarded by n.mu. Whichever of the lookup or the timeout
	// reaches record first wins; the loser is ignored.
	settled := false
	record := func(apply func()) {
		// Measure before taking the lock so lock contention is not counted.
		elapsed := time.Since(start)

		n.mu.Lock()
		defer n.mu.Unlock()
		if settled {
			return
		}
		settled = true
		n.Time = elapsed.Seconds()
		n.TimeS = elapsed.String()
		apply()
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	done := make(chan struct{})
	go func() {
		defer close(done)
		apiAddr, err := ga.GetNodeAPIAddr(n.Addr, retries, timeout)
		record(func() {
			if err != nil {
				n.Error = err.Error()
				return
			}
			n.APIAddr = apiAddr
			n.Reachable = true
			n.Leader = n.Addr == leaderAddr
		})
	}()

	select {
	case <-done:
	case <-timer.C:
		// If the lookup finished at the same instant, record is a no-op and
		// the lookup's result stands.
		record(func() {
			n.Error = "timeout waiting for node to respond"
		})
	}
}
// SetError stores err in the node's Error field while holding the node's
// mutex.
func (n *Node) SetError(err string) {
	n.mu.Lock()
	n.Error = err
	n.mu.Unlock()
}
// MarshalJSON implements the json.Marshaler interface. The node's mutex is
// held for the duration of the encoding.
func (n *Node) MarshalJSON() ([]byte, error) {
	// Alias has Node's fields but none of its methods, so encoding it does
	// not call back into MarshalJSON.
	type Alias Node

	n.mu.Lock()
	defer n.mu.Unlock()

	wrapped := struct {
		*Alias
	}{
		Alias: (*Alias)(n),
	}
	return json.Marshal(&wrapped)
}
// Nodes is a collection of Node pointers. It implements sort.Interface,
// ordering nodes by ascending ID.
type Nodes []*Node

// Len returns the number of nodes in the collection.
func (n Nodes) Len() int {
	return len(n)
}

// Swap exchanges the nodes at positions i and j.
func (n Nodes) Swap(i, j int) {
	n[j], n[i] = n[i], n[j]
}

// Less reports whether the ID of the node at position i sorts before the ID
// of the node at position j.
func (n Nodes) Less(i, j int) bool {
	return n[j].ID > n[i].ID
}
// NewNodesFromServers converts each Server into a Node and returns the
// resulting collection sorted by node ID.
func NewNodesFromServers(servers []*store.Server) Nodes {
	result := make(Nodes, 0, len(servers))
	for _, srv := range servers {
		result = append(result, NewNodeFromServer(srv))
	}
	sort.Sort(result)
	return result
}
// Voters returns the subset of nodes whose Voter flag is set, sorted by node
// ID. The result is always non-nil, even when there are no voters.
func (n Nodes) Voters() Nodes {
	voters := Nodes{}
	for i := range n {
		if !n[i].Voter {
			continue
		}
		voters = append(voters, n[i])
	}
	sort.Sort(voters)
	return voters
}
// HasAddr reports whether at least one node in the collection has the given
// Raft address.
func (n Nodes) HasAddr(addr string) bool {
	for i := range n {
		if n[i].Addr == addr {
			return true
		}
	}
	return false
}
// GetNode looks up a node by ID. It returns the first node whose ID equals
// id, or nil when the collection contains no such node.
func (n Nodes) GetNode(id string) *Node {
	for i := range n {
		if candidate := n[i]; candidate.ID == id {
			return candidate
		}
	}
	return nil
}
// Test runs Node.Test on every node in the collection, passing the same
// arguments to each. The tests run concurrently, one goroutine per node, and
// Test returns only after all of them have completed.
func (n Nodes) Test(ga GetAddresser, leaderAddr string, retries int, timeout time.Duration) {
	var pending sync.WaitGroup
	pending.Add(len(n))
	for i := range n {
		go func(node *Node) {
			defer pending.Done()
			node.Test(ga, leaderAddr, retries, timeout)
		}(n[i])
	}
	pending.Wait()
}
// NodesRespEncoder writes Nodes as JSON to an io.Writer, in either the
// standard or the legacy layout, with optional indentation.
type NodesRespEncoder struct {
	// writer receives the encoded JSON.
	writer io.Writer
	// legacy selects the legacy layout when true.
	legacy bool
	// prefix and indent are handed to json.Indent; indentation is applied
	// only when indent is non-empty.
	prefix, indent string
}
// NewNodesRespEncoder returns an encoder that writes to w. When legacy is
// true the encoder emits the legacy layout. Indentation is off until
// SetIndent is called.
func NewNodesRespEncoder(w io.Writer, legacy bool) *NodesRespEncoder {
	enc := new(NodesRespEncoder)
	enc.writer = w
	enc.legacy = legacy
	return enc
}
// SetIndent configures the prefix and indent strings used to format the JSON
// produced by subsequent calls to Encode.
func (e *NodesRespEncoder) SetIndent(prefix, indent string) {
	e.prefix, e.indent = prefix, indent
}
// Encode serializes nodes as JSON and writes the result to the encoder's
// writer. The legacy layout is used when the encoder was created with the
// legacy flag, otherwise the standard layout. If a non-empty indent string has
// been configured, the JSON is re-indented before it is written. Any
// marshaling, indentation, or write error is returned.
func (e *NodesRespEncoder) Encode(nodes Nodes) error {
	marshal := e.encode
	if e.legacy {
		marshal = e.encodeLegacy
	}

	payload, err := marshal(nodes)
	if err != nil {
		return err
	}

	if e.indent != "" {
		var pretty bytes.Buffer
		if err := json.Indent(&pretty, payload, e.prefix, e.indent); err != nil {
			return err
		}
		payload = pretty.Bytes()
	}

	_, err = e.writer.Write(payload)
	return err
}
// encode marshals nodes in the standard layout: a JSON object with a single
// "nodes" key holding the list of nodes.
func (e *NodesRespEncoder) encode(nodes Nodes) ([]byte, error) {
	type envelope struct {
		Nodes Nodes `json:"nodes"`
	}
	return json.Marshal(envelope{Nodes: nodes})
}
// encodeLegacy marshals nodes in the legacy layout: a JSON object keyed by
// node ID. If several nodes share an ID, the last one in the slice is kept.
func (e *NodesRespEncoder) encodeLegacy(nodes Nodes) ([]byte, error) {
	byID := make(map[string]*Node, len(nodes))
	for i := range nodes {
		byID[nodes[i].ID] = nodes[i]
	}
	return json.Marshal(byID)
}
// NodesRespDecoder reads JSON from an io.Reader and decodes it into Nodes.
type NodesRespDecoder struct {
	// reader supplies the JSON document to decode.
	reader io.Reader
}
// NewNodesRespDecoder returns a decoder that reads its JSON input from r.
func NewNodesRespDecoder(r io.Reader) *NodesRespDecoder {
	dec := new(NodesRespDecoder)
	dec.reader = r
	return dec
}
// Decode reads one JSON value from the decoder's reader and stores the list
// found under its "nodes" key in *nodes. On a decoding error the error is
// returned and *nodes is left untouched.
func (d *NodesRespDecoder) Decode(nodes *Nodes) error {
	// The standard layout wraps the node list in an object, so decode into a
	// matching envelope first.
	var envelope struct {
		Nodes Nodes `json:"nodes"`
	}

	dec := json.NewDecoder(d.reader)
	err := dec.Decode(&envelope)
	if err != nil {
		return err
	}

	*nodes = envelope.Nodes
	return nil
}