/
join.go
102 lines (88 loc) · 2.74 KB
/
join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package cluster
import (
"context"
"errors"
"log"
"os"
"time"
"github.com/rqlite/rqlite/v8/cluster/proto"
command "github.com/rqlite/rqlite/v8/command/proto"
)
// Sentinel errors reported by the join and notify operations.
var (
	// ErrNodeIDRequired is returned when a join request doesn't supply a
	// node ID.
	ErrNodeIDRequired = errors.New("node required")

	// ErrJoinFailed is returned when a node fails to join a cluster.
	ErrJoinFailed = errors.New("failed to join cluster")

	// ErrJoinCanceled is returned when a join operation is canceled before
	// it completes.
	ErrJoinCanceled = errors.New("join operation canceled")

	// ErrNotifyFailed is returned when a node fails to notify another node.
	ErrNotifyFailed = errors.New("failed to notify node")
)
// Joiner executes a node-join operation. It repeatedly offers a join request
// to a list of target addresses until one of them accepts it or the
// configured number of attempts is exhausted.
type Joiner struct {
// numAttempts is the number of passes Do makes over the target addresses.
numAttempts int
// attemptInterval is how long Do waits between two consecutive passes.
attemptInterval time.Duration
// client is used to send the join request to a target node.
client *Client
// creds is passed to the client with every join request. NewJoiner leaves
// it nil; it is set via SetCredentials.
creds *proto.Credentials
// logger receives per-target failure and retry messages. NewJoiner
// configures it to write to stderr.
logger *log.Logger
}
// NewJoiner builds a Joiner that sends join requests through client, making
// up to numAttempts passes over the target addresses and pausing for
// attemptInterval between passes. The returned Joiner logs to stderr and has
// no credentials set.
func NewJoiner(client *Client, numAttempts int, attemptInterval time.Duration) *Joiner {
	joiner := new(Joiner)
	joiner.logger = log.New(os.Stderr, "[cluster-join] ", log.LstdFlags)
	joiner.attemptInterval = attemptInterval
	joiner.numAttempts = numAttempts
	joiner.client = client
	return joiner
}
// SetCredentials sets the credentials that the Joiner passes to its client
// with each join request. The field is assigned without any locking.
func (j *Joiner) SetCredentials(creds *proto.Credentials) {
j.creds = creds
}
// Do makes the actual join request on behalf of the node identified by id and
// reachable at addr, with the suffrage given by suf.
//
// Each address in targetAddrs is tried in order. If none of them accepts the
// request, Do waits for the configured attempt interval and then tries the
// whole list again, up to the configured number of attempts. If the join is
// successful with any address, that address is returned.
//
// Do returns ErrNodeIDRequired if id is empty, ErrJoinCanceled if ctx is found
// to be done before a join request is sent or while waiting between attempts,
// and ErrJoinFailed once all attempts are exhausted. Cancellation is only
// observed between join requests; a request already in flight is not
// interrupted by ctx.
func (j *Joiner) Do(ctx context.Context, targetAddrs []string, id, addr string, suf Suffrage) (string, error) {
	if id == "" {
		return "", ErrNodeIDRequired
	}

	for i := 0; i < j.numAttempts; i++ {
		for _, ta := range targetAddrs {
			// Non-blocking cancellation check before each join request.
			select {
			case <-ctx.Done():
				return "", ErrJoinCanceled
			default:
			}

			joinee, err := j.join(ta, id, addr, suf)
			if err == nil {
				return joinee, nil
			}
			j.logger.Printf("failed to join via node at %s: %s", ta, err)
		}

		if i+1 >= j.numAttempts {
			// Last attempt: there is nothing to sleep for.
			break
		}

		// This log message only makes sense if performing more than 1 join-attempt.
		j.logger.Printf("failed to join cluster at %s, sleeping %s before retry", targetAddrs, j.attemptInterval)

		// Use an explicit timer rather than time.After so that it can be
		// released immediately when the context is canceled, instead of
		// lingering until attemptInterval elapses.
		timer := time.NewTimer(j.attemptInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return "", ErrJoinCanceled
		case <-timer.C:
			// Proceed with the next attempt.
		}
	}

	j.logger.Printf("failed to join cluster at %s, after %d attempt(s)", targetAddrs, j.numAttempts)
	return "", ErrJoinFailed
}
// join sends a single join request for the node (id, addr) to the node at
// targetAddr, using the Joiner's credentials and a one-second timeout. The
// request marks the node as a voter according to suf. On success it returns
// targetAddr; otherwise it returns the client's error unchanged.
func (j *Joiner) join(targetAddr, id, addr string, suf Suffrage) (string, error) {
	err := j.client.Join(&command.JoinRequest{
		Id:      id,
		Address: addr,
		Voter:   suf.IsVoter(),
	}, targetAddr, j.creds, time.Second)
	if err != nil {
		return "", err
	}
	return targetAddr, nil
}