forked from etcd-io/etcd
/
raft_server.go
221 lines (173 loc) · 4.79 KB
/
raft_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package main
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
"github.com/coreos/go-raft"
)
// Package-level raft state. Both values are assigned by startRaft.
var (
	// raftTransporter is the transporter handed to raft.NewServer.
	raftTransporter transporter

	// raftServer is the raft server instance used by this process.
	raftServer *raft.Server
)
// startRaft creates and starts the package-level raft server and brings this
// node into a cluster.
//
// Steps, in order:
//  1. Build the transporter and the raft server (fatal on failure).
//  2. If snapshotting is enabled, try to load the latest snapshot; a load
//     failure is only logged.
//  3. Start the server. When its log is empty the node is new: with no
//     machines configured it commits a JoinCommand for itself, otherwise it
//     tries to join through the configured machines, retrying up to
//     retryTimes rounds. A non-empty log means the node is restarting and no
//     join is attempted.
//  4. Start the snapshot monitor (if enabled) and the raft HTTP transport in
//     background goroutines.
//
// tlsConfig supplies the URL scheme plus the client and server TLS settings.
func startRaft(tlsConfig TLSConfig) {
	if veryVerbose {
		raft.SetLogLevel(raft.Debug)
	}

	var err error

	raftName := info.Name

	// Create transporter for raft
	raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)

	// Create raft server
	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
	if err != nil {
		fatal(err)
	}

	// LoadSnapshot
	if snapshot {
		err = raftServer.LoadSnapshot()
		if err == nil {
			debugf("%s finished load snapshot", raftServer.Name())
		} else {
			debug(err)
		}
	}

	raftServer.SetElectionTimeout(ElectionTimeout)
	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)

	raftServer.Start()

	if raftServer.IsLogEmpty() {

		// start as a leader in a new cluster
		if len(cluster) == 0 {

			// NOTE(review): presumably gives the freshly started server a
			// moment to settle before the first command - confirm.
			time.Sleep(time.Millisecond * 20)

			// leader need to join self as a peer
			for {
				command := &JoinCommand{
					Name:    raftServer.Name(),
					RaftURL: argInfo.RaftURL,
					EtcdURL: argInfo.EtcdURL,
				}
				_, err := raftServer.Do(command)
				if err == nil {
					break
				}
			}
			debugf("%s start as a leader", raftServer.Name())

			// start as a follower in a existing cluster
		} else {

			time.Sleep(time.Millisecond * 20)

			// joined records whether any machine accepted the join. It is
			// tracked explicitly because err may still hold a stale nil when
			// no join attempt was made at all (for example when every
			// configured machine is an empty string).
			joined := false

			for i := 0; i < retryTimes; i++ {

				for _, machine := range cluster {
					if len(machine) == 0 {
						continue
					}

					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
					if err == nil {
						joined = true
						break
					}

					// errors[103] is the message joinCluster returns on an
					// HTTP 400 reply; retrying cannot help, so give up.
					if err.Error() == errors[103] {
						fatal(err)
					}
					debugf("cannot join to cluster via machine %s %s", machine, err)
				}

				if joined {
					break
				}

				warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
				time.Sleep(time.Second * RetryInterval)
			}

			if !joined {
				fatalf("Cannot join the cluster via given machines after %d retries", retryTimes)
			}
			debugf("%s success join to the cluster", raftServer.Name())
		}

	} else {
		// rejoin the previous cluster
		debugf("%s restart as a follower", raftServer.Name())
	}

	// open the snapshot
	if snapshot {
		go monitorSnapshot()
	}

	// start to response to raft requests
	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
}
// startRaftTransport serves the internal raft HTTP endpoints on the host
// taken from info.RaftURL. It blocks until the HTTP server stops, and the
// error returned by the server is passed to fatal.
//
// info supplies the node name, the raft URL, and the TLS cert/key file
// paths. scheme selects plain HTTP when it is "http"; any other value
// serves TLS using tlsConf and info.RaftTLS.
func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
	// A malformed raft URL would otherwise leave u nil and panic on u.Host
	// below, so report it explicitly instead of discarding the error.
	u, err := url.Parse(info.RaftURL)
	if err != nil {
		fatalf("invalid raft URL %q: %v", info.RaftURL, err)
		return
	}
	infof("raft server [%s:%s]", info.Name, u)

	raftMux := http.NewServeMux()

	server := &http.Server{
		Handler:   raftMux,
		TLSConfig: &tlsConf,
		Addr:      u.Host,
	}

	// internal commands
	raftMux.HandleFunc("/name", NameHttpHandler)
	raftMux.HandleFunc("/join", JoinHttpHandler)
	raftMux.HandleFunc("/vote", VoteHttpHandler)
	raftMux.HandleFunc("/log", GetLogHttpHandler)
	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
	raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
	raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
	raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)

	if scheme == "http" {
		fatal(server.ListenAndServe())
	} else {
		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
	}
}
// joinCluster posts a JoinCommand for this node to the "/join" endpoint of
// the machine at raftURL, using the given URL scheme, and follows temporary
// redirects to whatever address the "Location" header names.
//
// It returns nil on an HTTP 200 reply. An HTTP 400 reply yields an error
// whose text is exactly errors[103], which the caller compares against to
// stop retrying. Any other status, a transport error, or a missing response
// yields an "Unable to join" error. Redirects are followed without an upper
// bound.
//
// It panics if the server's transporter is not of type transporter.
func joinCluster(s *raft.Server, raftURL string, scheme string) error {
	command := &JoinCommand{
		Name:    s.Name(),
		RaftURL: info.RaftURL,
		EtcdURL: info.EtcdURL,
	}

	// t must be ok
	t, ok := s.Transporter().(transporter)
	if !ok {
		panic("wrong type")
	}

	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
	target := joinURL.String()

	debugf("Send Join Request to %s", raftURL)

	for {
		// Encode into a fresh buffer for every attempt so a redirected
		// request never carries leftovers from the previous one.
		var b bytes.Buffer
		if err := json.NewEncoder(&b).Encode(command); err != nil {
			return fmt.Errorf("Unable to join: %v", err)
		}

		resp, err := t.Post(target, &b)
		if err != nil {
			return fmt.Errorf("Unable to join: %v", err)
		}
		if resp == nil {
			// No response and no error: fail instead of spinning forever.
			return fmt.Errorf("Unable to join")
		}

		// Read what is needed and close the body right away; a defer here
		// would keep every redirect response open until the function returns.
		status := resp.StatusCode
		location := resp.Header.Get("Location")
		resp.Body.Close()

		switch status {
		case http.StatusOK:
			return nil

		case http.StatusTemporaryRedirect:
			debugf("Send Join Request to %s", location)
			target = location

		case http.StatusBadRequest:
			debug("Reach max number machines in the cluster")
			// Use an explicit verb so the message is never interpreted as a
			// format string and stays byte-identical to errors[103].
			return fmt.Errorf("%s", errors[103])

		default:
			return fmt.Errorf("Unable to join")
		}
	}
}
// registerCommands hands one zero-valued instance of each command type used
// by this package to raft.RegisterCommand, in a fixed order.
func registerCommands() {
	register := raft.RegisterCommand

	register(new(JoinCommand))
	register(new(SetCommand))
	register(new(GetCommand))
	register(new(DeleteCommand))
	register(new(WatchCommand))
	register(new(TestAndSetCommand))
}