/
manager.go
162 lines (136 loc) · 3.93 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package chring
import (
"log"
"sync"
)
/*
RingManager still a WIP
*/
type RingManager struct {
sync.Mutex
// nodeNames []string
nodeRing *Ring
dataRing *Ring
keyFetcher func(nodeRing, dataRing *Ring, id string) (nodes, error)
keyStorer func(key string) error
keyRemover func(key string) error
}
func NewRingManager() *RingManager {
dr := NewRing()
return &RingManager{
nodeRing: NewRing(),
dataRing: dr,
keyFetcher: defaultKeyFetcher,
keyStorer: dr.defaultKeyStorer,
keyRemover: dr.defaultKeyRemover,
}
}
func (rm *RingManager) GetNodes() []string {
names := make([]string, len(rm.nodeRing.Nodes))
for i, n := range rm.nodeRing.Nodes {
names[i] = n.ID
}
return names
}
func (rm *RingManager) AddNode(nodeID string) error {
rm.Lock()
defer rm.Unlock()
rm.nodeRing.Add(nodeID)
return rm.keyStorer(nodeID)
}
func (rm *RingManager) RemoveNode(nodeID string) error {
rm.Lock()
defer rm.Unlock()
rm.nodeRing.Remove(nodeID)
return rm.keyRemover(nodeID)
}
func (rm *RingManager) AddKey(key string) error {
return rm.keyStorer(key)
}
func (rm *RingManager) RemoveKey(key string) error {
return rm.keyRemover(key)
}
func (rm *RingManager) GetKeys(nodeID string) (nodes, error) {
return rm.keyFetcher(rm.nodeRing, rm.dataRing, nodeID)
}
// SetKeyFetcher allows a user to override the default in memory ring store
func (rm *RingManager) SetKeyFetcher(fn func(nodeRing, dataRing *Ring, id string) (nodes, error)) {
rm.keyFetcher = fn
}
// SetKeyStorer allows a user to override the default in memory key store
func (rm *RingManager) SetKeyStorer(fn func(key string) error) {
rm.keyStorer = fn
}
// Debug if true, prints verbose logging
var Debug = false
func debugf(format string, v ...interface{}) {
if Debug {
log.Printf(format, v...)
}
}
func defaultKeyFetcher(nodeRing, dataRing *Ring, id string) (nodes, error) {
// r.Lock()
// defer r.Unlock()
startIndex := nodeRing.findNode(id)
if nodeRing.Nodes[startIndex].ID != id {
return nil, ErrNotFound
}
endIndex := startIndex + 1
wraps := false
if endIndex >= len(nodeRing.Nodes) {
endIndex = 0
wraps = true
}
debugf("looking for %q in", id)
for i := 0; i < len(nodeRing.Nodes); i++ {
debugf(">> node ring %+v", nodeRing.Nodes[i])
}
debugf("\nstartIndex (node %q): %d\nendIndex (the next node): %d", id, startIndex, endIndex)
debugf("node ring length: %d", len(nodeRing.Nodes))
debugf("data ring length: %d", len(dataRing.Nodes))
debugf("end := dataRing.searchByHashID(nodeRing.Nodes[endIndex].HashID)")
debugf("end := dataRing.searchByHashID(nodeRing.Nodes[%d].HashID)", endIndex)
debugf("end := dataRing.searchByHashID(%d)", nodeRing.Nodes[endIndex].HashID)
debugf("end := %d", dataRing.searchByHashID(nodeRing.Nodes[endIndex].HashID))
start := dataRing.searchByHashID(nodeRing.Nodes[startIndex].HashID)
end := dataRing.searchByHashID(nodeRing.Nodes[endIndex].HashID)
debugf("parsing dataNodes. [%d] -> [%d]", start, end)
for i := 0; i < len(dataRing.Nodes); i++ {
debugf(">> data ring [%d] %+v", i, dataRing.Nodes[i])
}
// we subtract 2 because we don't count the start and end keys themselves as they are the node hashes, not key hashes
size := start - end - 2
if size < 0 {
size = end - start - 2
}
if size < 0 {
size = 0
}
dataNodes := make(nodes, size)
if !wraps {
debugf("does not wrap")
for i := start + 1; i < end; i++ {
debugf("appending [%d] %+v", i, dataRing.Nodes[i])
dataNodes = append(dataNodes, dataRing.Nodes[i])
}
} else {
debugf("wraps")
for i := start + 1; i < len(dataRing.Nodes); i++ {
debugf("appending [%d] %+v", i, dataRing.Nodes[i])
dataNodes = append(dataNodes, dataRing.Nodes[i])
}
for i := 0; i < end; i++ {
debugf("appending [%d] %+v", i, dataRing.Nodes[i])
dataNodes = append(dataNodes, dataRing.Nodes[i])
}
}
return dataNodes, nil
}
func (r *Ring) defaultKeyStorer(key string) error {
r.Add(key)
return nil
}
func (r *Ring) defaultKeyRemover(key string) error {
r.Remove(key)
return nil
}