-
Notifications
You must be signed in to change notification settings - Fork 4
/
cluster.go
216 lines (179 loc) · 4.73 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package cluster
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"sort"
"time"
"github.com/cespare/xxhash"
"github.com/hashicorp/memberlist"
"github.com/mattbostock/timbala/internal/hashring"
"github.com/sirupsen/logrus"
)
const (
	// primaryKeyDateFormat is the time layout (year, month and day with no
	// separators) that PartitionKey uses to render a timestamp before hashing it.
	primaryKeyDateFormat = "20060102"

	// DefaultReplFactor is the replication factor New applies when
	// Config.ReplicationFactor is left at zero.
	DefaultReplFactor = 3
)
// New builds a cluster from conf, creates the underlying memberlist instance
// and attempts to join the peers listed in conf.Peers.
//
// If conf.ReplicationFactor is zero it is overwritten with DefaultReplFactor;
// note that this modifies the caller's Config. The logger l is used both by the
// returned cluster and by the membership event delegate, so it must be non-nil.
//
// An error is returned only when the memberlist instance cannot be created.
// Failing to join the configured peers is not fatal: the failure is logged as a
// warning and the node starts on its own.
func New(conf *Config, l *logrus.Logger) (*cluster, error) {
	if conf.ReplicationFactor == 0 {
		conf.ReplicationFactor = DefaultReplFactor
	}

	c := &cluster{
		log:        l,
		replFactor: conf.ReplicationFactor,
		ring:       hashring.New(),
	}

	// FIXME(mbostock): Consider using a non-local config for memberlist
	memberConf := memberlist.DefaultLocalConfig()
	memberConf.AdvertiseAddr = conf.GossipAdvertiseAddr.IP.String()
	memberConf.AdvertisePort = conf.GossipAdvertiseAddr.Port
	memberConf.BindAddr = conf.GossipBindAddr.IP.String()
	memberConf.BindPort = conf.GossipBindAddr.Port
	memberConf.Delegate = &delegate{
		localHTTPAdvertiseAddr: conf.HTTPAdvertiseAddr.String(),
	}
	memberConf.Events = &eventDelegate{
		cluster: c,
		log:     l,
	}
	// memberlist's own log output is thrown away; membership events are
	// reported through the event delegate instead.
	memberConf.LogOutput = ioutil.Discard

	ml, err := memberlist.Create(memberConf)
	if err != nil {
		return nil, fmt.Errorf("failed to configure cluster settings: %s", err)
	}

	// Joining is best-effort so that a node can still start when its peers are
	// not reachable yet, but the failure must not go unnoticed.
	if _, err := ml.Join(conf.Peers); err != nil {
		l.Warnf("failed to join cluster peers %v: %s", conf.Peers, err)
	}

	c.ml = &membership{ml}
	return c, nil
}
// LocalNode returns the local node as reported by the cluster's membership
// layer.
func (c *cluster) LocalNode() *Node {
	local := c.ml.LocalNode()
	return local
}
// Nodes returns the nodes currently reported by the cluster's membership
// layer.
func (c *cluster) Nodes() Nodes {
	members := c.ml.Nodes()
	return members
}
// NodesByPartitionKey selects the nodes responsible for the partition key
// pKey.
//
// The current members are sorted by name and then, for each replica slot, the
// hash ring is asked for an index into that sorted list using pKey plus the
// slot number. If the node at that index was already selected for an earlier
// slot, the following nodes are tried in order, wrapping around to the start
// of the list. The result holds at most ReplicationFactor() nodes, and never
// more than the number of members.
//
// It panics if the hash ring returns an index outside the member list or if no
// unused node can be found for a slot.
func (c *cluster) NodesByPartitionKey(pKey uint64) Nodes {
	members := c.Nodes()
	// Sort nodes to ensure function is deterministic
	sort.Stable(members)

	total := len(members)
	replicas := c.ReplicationFactor()
	taken := make(map[*Node]bool, total)
	selected := make(Nodes, 0, total)

	for replica := 0; replica < replicas; replica++ {
		// Stop once every slot is filled or every member has been used.
		if len(taken) == replicas || len(taken) == total {
			break
		}

		start := int(c.HashRing().Get(uint64(replica)+pKey, total))
		picked := false
		if start >= 0 && start < total {
			// Walk the sorted members in a circle, beginning at the hashed
			// index, until one turns up that has not been selected yet.
			for offset := 0; offset < total; offset++ {
				candidate := members[(start+offset)%total]
				if taken[candidate] {
					continue
				}
				selected = append(selected, candidate)
				taken[candidate] = true
				picked = true
				break
			}
		}
		if !picked {
			panic("iterated through all nodes twice and still couldn't find a match")
		}
	}

	return selected
}
func PartitionKey(end time.Time, metricHash uint64) uint64 {
// FIXME filter quantile and le when hashing for data locality?
return xxhash.Sum64String(end.Format(primaryKeyDateFormat)) + metricHash
}
// ReplicationFactor returns the replication factor the cluster was created
// with.
func (c *cluster) ReplicationFactor() int {
	factor := c.replFactor
	return factor
}
// HashRing returns the hash ring held by the cluster.
func (c *cluster) HashRing() hashring.HashRing {
	ring := c.ring
	return ring
}
// Node wraps a memberlist node and exposes the details the rest of the
// package needs from it.
type Node struct {
	mln *memberlist.Node
}

// meta decodes the JSON metadata attached to the underlying memberlist node.
// The decoded value is returned alongside any unmarshalling error.
func (n *Node) meta() (nodeMeta, error) {
	var decoded nodeMeta
	err := json.Unmarshal(n.mln.Meta, &decoded)
	return decoded, err
}

// Name returns the name of the underlying memberlist node.
func (n *Node) Name() string {
	name := n.mln.Name
	return name
}

// Addr returns the address of the underlying memberlist node, as produced by
// its Address method.
func (n *Node) Addr() string {
	addr := n.mln.Address()
	return addr
}

// HTTPAddr returns the HTTP address stored in the node's metadata. It returns
// an empty string and the decoding error if the metadata cannot be
// unmarshalled.
func (n *Node) HTTPAddr() (string, error) {
	decoded, err := n.meta()
	if err == nil {
		return decoded.HTTPAddr, nil
	}
	return "", err
}

// String returns the node's name.
func (n *Node) String() string {
	return n.mln.Name
}
// Nodes is a list of nodes that can be sorted by node name through the
// Len, Less and Swap methods.
type Nodes []*Node

// Len returns the number of nodes in the list.
func (nodes Nodes) Len() int {
	return len(nodes)
}

// Less reports whether the name of the node at index i sorts before the name
// of the node at index j.
func (nodes Nodes) Less(i, j int) bool {
	left, right := nodes[i].Name(), nodes[j].Name()
	return left < right
}

// Swap exchanges the nodes at indexes i and j.
func (nodes Nodes) Swap(i, j int) {
	held := nodes[i]
	nodes[i] = nodes[j]
	nodes[j] = held
}
// delegate is installed by New as the memberlist delegate. Its only real job
// is to publish the local HTTP advertise address as node metadata; the
// remaining callbacks do nothing or return empty values.
type delegate struct {
	localHTTPAdvertiseAddr string
}

// NodeMeta returns the JSON-encoded nodeMeta carrying the local HTTP advertise
// address. The limit argument is currently ignored.
func (d *delegate) NodeMeta(limit int) []byte {
	// FIXME respect limit
	meta := nodeMeta{HTTPAddr: d.localHTTPAdvertiseAddr}
	// The marshalling error is deliberately dropped: the payload is a struct
	// with a single string field.
	encoded, _ := json.Marshal(&meta)
	return encoded
}

// NotifyMsg ignores incoming user messages.
func (d *delegate) NotifyMsg([]byte) {}

// GetBroadcasts always returns an empty, non-nil list of broadcasts.
func (d *delegate) GetBroadcasts(overhead int, limit int) [][]byte {
	return make([][]byte, 0)
}

// LocalState always returns an empty, non-nil byte slice.
func (d *delegate) LocalState(join bool) []byte {
	return make([]byte, 0)
}

// MergeRemoteState ignores the remote state it is handed.
func (d *delegate) MergeRemoteState(buf []byte, join bool) {}

// nodeMeta is the metadata payload exchanged between nodes, encoded as JSON.
type nodeMeta struct {
	HTTPAddr string `json:"http_addr"`
}
// eventDelegate is installed by New as the memberlist event handler. It logs
// every membership change at info level.
type eventDelegate struct {
	cluster *cluster
	log     *logrus.Logger
}

// NotifyJoin logs the name and address of a node that joined.
func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
	name, addr := n.Name, n.Address()
	e.log.Infof("Node joined: %s on %s", name, addr)
}

// NotifyLeave logs the name and address of a node that left.
func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
	name, addr := n.Name, n.Address()
	e.log.Infof("Node left cluster: %s on %s", name, addr)
	// FIXME remove node from ring
}

// NotifyUpdate logs the name and address of a node that was updated.
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
	name, addr := n.Name, n.Address()
	e.log.Infof("Node updated: %s on %s", name, addr)
}
// cluster is the concrete type returned by New. Its methods provide
// everything listed in the Cluster interface.
type cluster struct {
	// log is the logger passed to New.
	log *logrus.Logger
	// ml is the membership layer; New sets it to a wrapper around the
	// memberlist instance it creates.
	ml Membership
	// replFactor is the replication factor taken from the Config given to New.
	replFactor int
	// ring is the hash ring created in New and used by NodesByPartitionKey.
	ring hashring.HashRing
}
// Config holds the settings New reads when creating a cluster.
type Config struct {
	// HTTPAdvertiseAddr is published to other nodes as this node's HTTP
	// address through the node metadata.
	HTTPAdvertiseAddr net.TCPAddr
	// HTTPBindAddr is not read by New.
	// NOTE(review): presumably consumed by the HTTP server elsewhere; confirm.
	HTTPBindAddr net.TCPAddr
	// GossipAdvertiseAddr supplies the memberlist advertise address and port.
	GossipAdvertiseAddr net.TCPAddr
	// GossipBindAddr supplies the memberlist bind address and port.
	GossipBindAddr net.TCPAddr
	// Peers is the list of addresses handed to memberlist's Join.
	Peers []string
	// ReplicationFactor is the replication factor for the cluster. New
	// replaces a zero value with DefaultReplFactor.
	ReplicationFactor int
}
// Cluster is the set of operations the rest of the program needs from a
// cluster. The unexported cluster type in this file provides all of them.
type Cluster interface {
	// HashRing returns the hash ring used to map keys to nodes.
	HashRing() hashring.HashRing
	// LocalNode returns the local node.
	LocalNode() *Node
	// Nodes returns the current members.
	Nodes() Nodes
	// NodesByPartitionKey returns the nodes selected for the given partition
	// key.
	NodesByPartitionKey(uint64) Nodes
	// ReplicationFactor returns the configured replication factor.
	ReplicationFactor() int
}