-
Notifications
You must be signed in to change notification settings - Fork 1
/
peer.go
134 lines (113 loc) · 3.18 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package cluster
import (
"net"
"strconv"
"time"
"github.com/SimonRichardson/coherence/pkg/cluster/members"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
const (
// defaultBroadcastTimeout is 10 seconds.
// NOTE(review): not referenced in the visible part of this file; presumably
// the upper bound for a single cluster broadcast — confirm against its users.
defaultBroadcastTimeout = time.Second * 10
// defaultMembersBroadcastInterval is 5 seconds.
// NOTE(review): presumably the period between membership broadcasts — confirm.
defaultMembersBroadcastInterval = time.Second * 5
// defaultLowMembersThreshold is 1.
// NOTE(review): presumably the member count at which the cluster is treated
// as running low on members — confirm against its users.
defaultLowMembersThreshold = 1
)
const (
// PeerTypeStore identifies peers that serve the store API. Its string form,
// "store", is the value accepted by ParsePeerType.
PeerTypeStore members.PeerType = "store"
)
// ParsePeerType converts t into a members.PeerType. Only "store" is
// recognised; any other input yields an empty peer type and an error that
// names the rejected value.
func ParsePeerType(t string) (members.PeerType, error) {
	if t == "store" {
		return members.PeerType(t), nil
	}
	return "", errors.Errorf("invalid peer type (%s)", t)
}
// peer represents a node within the cluster. Its methods delegate to the
// wrapped members implementation.
type peer struct {
// members is the cluster membership backend every peer method forwards to.
members members.Members
// logger is supplied at construction time; it is not read by any method
// visible in this file.
logger log.Logger
}
// NewPeer wraps the given members implementation and logger in a Peer.
// It only constructs the value; joining the cluster is a separate step
// performed through Join.
func NewPeer(members members.Members, logger log.Logger) Peer {
	p := new(peer)
	p.members, p.logger = members, logger
	return p
}
// Close is currently a no-op. It does not leave the cluster; use Leave for
// that.
func (p *peer) Close() {}
// Join joins the cluster through the underlying members implementation and
// returns the node count it reports. When joining fails the count is always
// 0 and the error is returned unchanged.
func (p *peer) Join() (int, error) {
	count, err := p.members.Join()
	if err != nil {
		// Never surface a partial count alongside an error.
		count = 0
	}
	return count, err
}
// Leave leaves the cluster by delegating to the underlying members
// implementation and returns its error unchanged.
func (p *peer) Leave() error {
// No timeout is enforced at this level for now; serf uses its own
// configured timeout.
return p.members.Leave()
}
// Name returns the unique ID of this peer in the cluster, taken from the
// name of the member list's local node.
func (p *peer) Name() string {
return p.members.MemberList().LocalNode().Name()
}
// Address returns the host:port of this peer in the cluster, as reported by
// the member list's local node.
func (p *peer) Address() string {
return p.members.MemberList().LocalNode().Address()
}
// ClusterSize returns the total size of the cluster from this node's
// perspective, i.e. the member count reported by the local member list.
func (p *peer) ClusterSize() int {
return p.members.MemberList().NumMembers()
}
// State builds a JSON-serializable snapshot of the cluster as seen by this
// peer, intended for debugging. The result has three keys: "self" (the local
// node's name), "members" (the names of all members) and "num_members" (the
// member count).
func (p *peer) State() map[string]interface{} {
	list := p.members.MemberList()

	state := make(map[string]interface{}, 3)
	state["self"] = list.LocalNode().Name()
	state["members"] = memberNames(list.Members())
	state["num_members"] = list.NumMembers()
	return state
}
// Current returns the API host:port addresses of the cluster members whose
// advertised type equals peerType.
//
// When includeLocal is false the local node (matched by name) is left out of
// the result; when it is true the local node is eligible like any other
// member. An error from the underlying walk is returned unchanged, together
// with whatever addresses had been collected up to that point.
func (p *peer) Current(peerType members.PeerType, includeLocal bool) (res []string, err error) {
	localName := p.Name()
	err = p.members.Walk(func(info members.PeerInfo) error {
		if !includeLocal && info.Name == localName {
			return nil
		}
		// Compare against the requested type rather than a hard-coded one so
		// the filter keeps honouring peerType if more peer types are added.
		if info.Type == peerType {
			res = append(res, net.JoinHostPort(info.APIAddr, strconv.Itoa(info.APIPort)))
		}
		return nil
	})
	return res, err
}
// RegisterEventHandler registers fn with the underlying members
// implementation and returns its error unchanged.
func (p *peer) RegisterEventHandler(fn members.EventHandler) error {
return p.members.RegisterEventHandler(fn)
}
// DeregisterEventHandler removes fn from the underlying members
// implementation and returns its error unchanged.
func (p *peer) DeregisterEventHandler(fn members.EventHandler) error {
return p.members.DeregisterEventHandler(fn)
}
// DispatchEvent hands e to the underlying members implementation for
// dispatch and returns its error unchanged.
func (p *peer) DispatchEvent(e members.Event) error {
return p.members.DispatchEvent(e)
}
// memberNames returns the name of every member in m, preserving order. The
// result is always a non-nil slice, even when m is empty.
func memberNames(m []members.Member) []string {
	names := make([]string, 0, len(m))
	for _, member := range m {
		names = append(names, member.Name())
	}
	return names
}