-
Notifications
You must be signed in to change notification settings - Fork 397
/
selector.go
245 lines (213 loc) · 7.26 KB
/
selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package nodeselection
import (
mathrand "math/rand"
"storj.io/common/storj"
)
// UnvettedSelector selects new nodes first based on newNodeFraction, and selects old nodes for the remaining.
func UnvettedSelector(newNodeFraction float64, init NodeSelectorInit) NodeSelectorInit {
return func(nodes []*SelectedNode, filter NodeFilter) NodeSelector {
var newNodes []*SelectedNode
var oldNodes []*SelectedNode
for _, node := range nodes {
if node.Vetted {
oldNodes = append(oldNodes, node)
} else {
newNodes = append(newNodes, node)
}
}
newSelector := init(newNodes, filter)
oldSelector := init(oldNodes, filter)
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) ([]*SelectedNode, error) {
newNodeCount := int(float64(n) * newNodeFraction)
selectedNewNodes, err := newSelector(newNodeCount, excluded, alreadySelected)
if err != nil {
return selectedNewNodes, err
}
remaining := n - len(selectedNewNodes)
selectedOldNodes, err := oldSelector(remaining, excluded, alreadySelected)
if err != nil {
return selectedNewNodes, err
}
return append(selectedOldNodes, selectedNewNodes...), nil
}
}
}
// FilteredSelector uses another selector on the filtered list of nodes.
func FilteredSelector(preFilter NodeFilter, init NodeSelectorInit) NodeSelectorInit {
return func(nodes []*SelectedNode, filter NodeFilter) NodeSelector {
var filteredNodes []*SelectedNode
for _, node := range nodes {
if preFilter.Match(node) {
filteredNodes = append(filteredNodes, node)
}
}
return init(filteredNodes, filter)
}
}
// AttributeGroupSelector first selects a group with equal chance (like last_net) and choose node from the group randomly.
func AttributeGroupSelector(attribute NodeAttribute) NodeSelectorInit {
return func(nodes []*SelectedNode, filter NodeFilter) NodeSelector {
nodeByAttribute := make(map[string][]*SelectedNode)
for _, node := range nodes {
if filter != nil && !filter.Match(node) {
continue
}
a := attribute(*node)
if _, found := nodeByAttribute[a]; !found {
nodeByAttribute[a] = make([]*SelectedNode, 0)
}
nodeByAttribute[a] = append(nodeByAttribute[a], node)
}
var attributes []string
for k := range nodeByAttribute {
attributes = append(attributes, k)
}
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) (selected []*SelectedNode, err error) {
if n == 0 {
return selected, nil
}
r := NewRandomOrder(len(nodeByAttribute))
for r.Next() {
nodes := nodeByAttribute[attributes[r.At()]]
if includedInNodes(alreadySelected, nodes...) {
continue
}
rs := NewRandomOrder(len(nodes))
for rs.Next() {
candidate := nodes[rs.At()].Clone()
if !included(excluded, candidate) && !includedInNodes(selected, candidate) {
selected = append(selected, nodes[rs.At()].Clone())
}
break
}
if len(selected) >= n {
break
}
}
return selected, nil
}
}
}
func included(alreadySelected []storj.NodeID, nodes ...*SelectedNode) bool {
for _, node := range nodes {
for _, id := range alreadySelected {
if node.ID == id {
return true
}
}
}
return false
}
func includedInNodes(alreadySelected []*SelectedNode, nodes ...*SelectedNode) bool {
for _, node := range nodes {
for _, as := range alreadySelected {
if node.ID == as.ID {
return true
}
}
}
return false
}
// RandomSelector selects any nodes with equal chance.
func RandomSelector() NodeSelectorInit {
return func(nodes []*SelectedNode, filter NodeFilter) NodeSelector {
var filteredNodes []*SelectedNode
for _, node := range nodes {
if filter != nil && !filter.Match(node) {
continue
}
filteredNodes = append(filteredNodes, node)
}
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) (selected []*SelectedNode, err error) {
if n == 0 {
return selected, nil
}
r := NewRandomOrder(len(filteredNodes))
for r.Next() {
candidate := filteredNodes[r.At()]
if includedInNodes(alreadySelected, candidate) || included(excluded, candidate) || includedInNodes(selected, candidate) {
continue
}
selected = append(selected, candidate.Clone())
if len(selected) >= n {
break
}
}
return selected, nil
}
}
}
// FilterSelector is a specific selector, which can filter out nodes from the upload selection.
// Note: this is different from the generic filter attribute of the NodeSelectorInit, as that is applied to all node selection (upload/download/repair).
func FilterSelector(loadTimeFilter NodeFilter, init NodeSelectorInit) NodeSelectorInit {
return func(nodes []*SelectedNode, selectionFilter NodeFilter) NodeSelector {
var filtered []*SelectedNode
for _, n := range nodes {
if loadTimeFilter.Match(n) {
filtered = append(filtered, n)
}
}
return init(filtered, selectionFilter)
}
}
// BalancedGroupBasedSelector first selects a group with equal chance (like last_net) and choose one single node randomly. .
// One group can be tried multiple times, and if the node is already selected, it will be ignored.
func BalancedGroupBasedSelector(attribute NodeAttribute) NodeSelectorInit {
rng := mathrand.New(mathrand.NewSource(mathrand.Int63()))
return func(nodes []*SelectedNode, filter NodeFilter) NodeSelector {
mon.IntVal("selector_balanced_input_nodes").Observe(int64(len(nodes)))
nodeByAttribute := make(map[string][]*SelectedNode)
for _, node := range nodes {
if filter != nil && !filter.Match(node) {
continue
}
a := attribute(*node)
if _, found := nodeByAttribute[a]; !found {
nodeByAttribute[a] = make([]*SelectedNode, 0)
}
nodeByAttribute[a] = append(nodeByAttribute[a], node)
}
var groupedNodes [][]*SelectedNode
count := 0
for _, nodeList := range nodeByAttribute {
groupedNodes = append(groupedNodes, nodeList)
count += len(nodeList)
}
mon.IntVal("selector_balanced_filtered_nodes").Observe(int64(count))
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) (selected []*SelectedNode, err error) {
if n == 0 {
return selected, nil
}
// for each node attribute --> how many nodes are selected already
var alreadySelectedGroup map[string]int
if len(alreadySelected) > 0 {
alreadySelectedGroup = make(map[string]int)
for _, node := range alreadySelected {
alreadySelectedGroup[attribute(*node)]++
}
}
// upper limit: we should find at least one node in each full group loop.
// Ideally we find len(group) in each iteration, so we stop earlier
for i := 0; i < n; i++ {
r := NewRandomOrder(len(groupedNodes))
// check all the groups in random order
for r.Next() {
nodes := groupedNodes[r.At()]
// this group has one chance to give a candidate
randomOne := nodes[rng.Intn(len(nodes))].Clone()
if !includedInNodes(alreadySelected, randomOne) && !included(excluded, randomOne) && !includedInNodes(selected, randomOne) {
selected = append(selected, randomOne)
}
if len(selected) >= n {
mon.IntVal("selector_balanced_requested").Observe(int64(n))
mon.IntVal("selector_balanced_found").Observe(int64(len(selected)))
return selected, nil
}
}
}
return nil, nil
}
}
}