Skip to content

Commit

Permalink
satellite/nodeselection: differentiate between excluded and alreadySe…
Browse files Browse the repository at this point in the history
…lected nodes

Before this patch, the NodeSelector interface was this:

```
type NodeSelector func(n int, alreadySelected []storj.NodeID) ([]*SelectedNode, error)
```

The problem is that we don't differentiate between alreadySelected and excluded nodes.

 1. from excluded nodes, we only need the ID. These should always be excluded (for example, because a retry)
 2. alreadySelected nodes are different, we need the full node information to make better decision

 A good example for the better decision is the updated `BalancedGroupBasedSelector`.

 Let's say we have nodes from three groups:
  * 1 nodes from group A
  * 10 nodes from group B
  * 10 nodes from group C

If we need 9 nodes, based on the current logic, we try to select 3 nodes, from each groups.
It's not possible with group A, therefore the selection will be sg like  to A=1, B=4, C=5.

But what happens if we already have 4 B nodes, and we need to select 5.

The current logic will try to select 5 equally from A and B and C. But that's not what we need:
our goal is having an equal balance after the selection (which means that we need A + C nodes instead)

Change-Id: If4c1deab08ea69f86bc8f3a7f2a4324ba99d6137
  • Loading branch information
elek authored and Storj Robot committed Mar 6, 2024
1 parent 054f7b7 commit 32b05eb
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 67 deletions.
4 changes: 2 additions & 2 deletions satellite/audit/disqualification_test.go
Expand Up @@ -174,8 +174,8 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) {
require.NoError(t, err)

request := overlay.FindStorageNodesRequest{
RequestedCount: 4,
ExcludedIDs: nil,
RequestedCount: 4,
AlreadySelected: nil,
}
nodes, err := satellitePeer.Overlay.Service.FindStorageNodesForUpload(ctx, request)
assert.True(t, overlay.ErrNotEnoughNodes.Has(err))
Expand Down
4 changes: 2 additions & 2 deletions satellite/nodeselection/config_test.go
Expand Up @@ -78,7 +78,7 @@ func TestParsedConfig(t *testing.T) {
{
Vetted: false,
},
}, nil)(1, nil)
}, nil)(1, nil, nil)

// having: new, requires: 0% unvetted = 100% vetted
require.Len(t, selected, 0)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestSelectorFromString(t *testing.T) {
initialized := selector(nodes, nil)

for i := 0; i < 100; i++ {
selected, err := initialized(1, []storj.NodeID{})
selected, err := initialized(1, []storj.NodeID{}, nil)
require.NoError(t, err)
require.Len(t, selected, 1)
require.NotEqual(t, testidentity.MustPregeneratedIdentity(1, storj.LatestIDVersion()).ID, selected[0].ID)
Expand Down
4 changes: 3 additions & 1 deletion satellite/nodeselection/placement.go
Expand Up @@ -58,7 +58,9 @@ var _ NodeFilterWithAnnotation = Placement{}
type NodeSelectorInit func([]*SelectedNode, NodeFilter) NodeSelector

// NodeSelector pick random nodes based on a specific algorithm.
type NodeSelector func(n int, alreadySelected []storj.NodeID) ([]*SelectedNode, error)
// Nodes from excluded should never be used. Same is true for alreadySelected, but it may also trigger other restrictions
// (for example, when a last_net is already selected, all the nodes from the same net should be excluded as well.
type NodeSelector func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) ([]*SelectedNode, error)

// ErrPlacement is used for placement definition related parsing errors.
var ErrPlacement = errs.Class("placement")
Expand Down
32 changes: 22 additions & 10 deletions satellite/nodeselection/selector.go
Expand Up @@ -24,16 +24,16 @@ func UnvettedSelector(newNodeFraction float64, init NodeSelectorInit) NodeSelect

newSelector := init(newNodes, filter)
oldSelector := init(oldNodes, filter)
return func(n int, alreadySelected []storj.NodeID) ([]*SelectedNode, error) {
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) ([]*SelectedNode, error) {
newNodeCount := int(float64(n) * newNodeFraction)

selectedNewNodes, err := newSelector(newNodeCount, alreadySelected)
selectedNewNodes, err := newSelector(newNodeCount, excluded, alreadySelected)
if err != nil {
return selectedNewNodes, err
}

remaining := n - len(selectedNewNodes)
selectedOldNodes, err := oldSelector(remaining, alreadySelected)
selectedOldNodes, err := oldSelector(remaining, excluded, alreadySelected)
if err != nil {
return selectedNewNodes, err
}
Expand Down Expand Up @@ -75,21 +75,24 @@ func AttributeGroupSelector(attribute NodeAttribute) NodeSelectorInit {
attributes = append(attributes, k)
}

return func(n int, alreadySelected []storj.NodeID) (selected []*SelectedNode, err error) {
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) (selected []*SelectedNode, err error) {
if n == 0 {
return selected, nil
}
r := NewRandomOrder(len(nodeByAttribute))
for r.Next() {
nodes := nodeByAttribute[attributes[r.At()]]

if included(alreadySelected, nodes...) {
if includedInNodes(alreadySelected, nodes...) {
continue
}

rs := NewRandomOrder(len(nodes))
for rs.Next() {
selected = append(selected, nodes[rs.At()].Clone())
candidate := nodes[rs.At()].Clone()
if !included(excluded, candidate) && !includedInNodes(selected, candidate) {
selected = append(selected, nodes[rs.At()].Clone())
}
break

}
Expand Down Expand Up @@ -136,15 +139,15 @@ func RandomSelector() NodeSelectorInit {
filteredNodes = append(filteredNodes, node)
}

return func(n int, alreadySelected []storj.NodeID) (selected []*SelectedNode, err error) {
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) (selected []*SelectedNode, err error) {
if n == 0 {
return selected, nil
}
r := NewRandomOrder(len(filteredNodes))
for r.Next() {
candidate := filteredNodes[r.At()]

if included(alreadySelected, candidate) {
if includedInNodes(alreadySelected, candidate) || included(excluded, candidate) || includedInNodes(selected, candidate) {
continue
}

Expand Down Expand Up @@ -195,11 +198,20 @@ func BalancedGroupBasedSelector(attribute NodeAttribute) NodeSelectorInit {
groupedNodes = append(groupedNodes, nodeList)
}

return func(n int, alreadySelected []storj.NodeID) (selected []*SelectedNode, err error) {
return func(n int, excluded []storj.NodeID, alreadySelected []*SelectedNode) (selected []*SelectedNode, err error) {
if n == 0 {
return selected, nil
}

// for each node attribute --> how many nodes are selected already
var alreadySelectedGroup map[string]int
if len(alreadySelected) > 0 {
alreadySelectedGroup = make(map[string]int)
for _, node := range alreadySelected {
alreadySelectedGroup[attribute(*node)]++
}
}

// upper limit: we should find at least one node in each full group loop.
// Ideally we find len(group) in each iteration, so we stop earlier
for i := 0; i < n; i++ {
Expand All @@ -212,7 +224,7 @@ func BalancedGroupBasedSelector(attribute NodeAttribute) NodeSelectorInit {
// this group has one chance to give a candidate
randomOne := nodes[rng.Intn(len(nodes))].Clone()

if !included(alreadySelected, randomOne) && !includedInNodes(selected, randomOne) {
if !includedInNodes(alreadySelected, randomOne) && !included(excluded, randomOne) && !includedInNodes(selected, randomOne) {
selected = append(selected, randomOne)
}

Expand Down
78 changes: 69 additions & 9 deletions satellite/nodeselection/selector_test.go
Expand Up @@ -57,7 +57,7 @@ func TestSelectByID(t *testing.T) {

// perform many node selections that selects 2 nodes
for i := 0; i < executionCount; i++ {
selectedNodes, err := selector(reqCount, nil)
selectedNodes, err := selector(reqCount, nil, nil)
require.NoError(t, err)
require.Len(t, selectedNodes, reqCount)
for _, node := range selectedNodes {
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestSelectBySubnet(t *testing.T) {

// perform many node selections that selects 2 nodes
for i := 0; i < executionCount; i++ {
selectedNodes, err := selector(reqCount, nil)
selectedNodes, err := selector(reqCount, nil, nil)
require.NoError(t, err)
require.Len(t, selectedNodes, reqCount)
for _, node := range selectedNodes {
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestSelectBySubnetOneAtATime(t *testing.T) {

// perform many node selections that selects 1 node
for i := 0; i < executionCount; i++ {
selectedNodes, err := selector(reqCount, nil)
selectedNodes, err := selector(reqCount, nil, nil)
require.NoError(t, err)
require.Len(t, selectedNodes, reqCount)
for _, node := range selectedNodes {
Expand Down Expand Up @@ -254,15 +254,15 @@ func TestSelectFiltered(t *testing.T) {
nodes := []*nodeselection.SelectedNode{subnetA1, subnetA2, subnetB1}

selector := nodeselection.RandomSelector()(nodes, nil)
selected, err := selector(3, nil)
selected, err := selector(3, nil, nil)
require.NoError(t, err)
assert.Len(t, selected, 3)
selected, err = selector(3, nil)
selected, err = selector(3, nil, nil)
require.NoError(t, err)
assert.Len(t, selected, 3)

selector = nodeselection.RandomSelector()(nodes, nodeselection.NodeFilters{}.WithExcludedIDs([]storj.NodeID{firstID, secondID}))
selected, err = selector(3, nil)
selected, err = selector(3, nil, nil)
require.NoError(t, err)
assert.Len(t, selected, 1)
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestSelectFilteredMulti(t *testing.T) {
require.NoError(t, err)
selector := nodeselection.AttributeGroupSelector(attribute)(nodes, filter)
for i := 0; i < 100; i++ {
selected, err := selector(4, nil)
selected, err := selector(4, nil, nil)
require.NoError(t, err)
assert.Len(t, selected, 4)
}
Expand All @@ -316,7 +316,7 @@ func TestFilterSelector(t *testing.T) {

initialized := selector(nodes, nil)
for i := 0; i < 100; i++ {
selected, err := initialized(3, []storj.NodeID{})
selected, err := initialized(3, []storj.NodeID{}, nil)
require.NoError(t, err)
for _, s := range selected {
for _, w := range list {
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestBalancedSelector(t *testing.T) {

badSelection := 0
for i := 0; i < 1000; i++ {
selectedNodes, err := selector(10, nil)
selectedNodes, err := selector(10, nil, nil)
require.NoError(t, err)

require.Len(t, selectedNodes, 10)
Expand All @@ -374,3 +374,63 @@ func TestBalancedSelector(t *testing.T) {
// and all the other random selection will select the same node again
require.True(t, badSelection < 5)
}

func TestBalancedSelectorWithExisting(t *testing.T) {
attribute, err := nodeselection.CreateNodeAttribute("tag:owner")
require.NoError(t, err)

ownerCounts := map[string]int{"A": 3, "B": 10, "C": 30, "D": 5, "E": 1}
var nodes []*nodeselection.SelectedNode

var excluded []storj.NodeID
var alreadySelected []*nodeselection.SelectedNode

idIndex := 0
for owner, count := range ownerCounts {
for i := 0; i < count; i++ {
nodes = append(nodes, &nodeselection.SelectedNode{
ID: testidentity.MustPregeneratedIdentity(idIndex, storj.LatestIDVersion()).ID,
Tags: nodeselection.NodeTags{
{
Name: "owner",
Value: []byte(owner),
},
},
})
idIndex++
if owner == "A" {
excluded = append(excluded, nodes[len(nodes)-1].ID)
}
if owner == "B" && len(alreadySelected) < 9 {
alreadySelected = append(alreadySelected, nodes[len(nodes)-1])
}
}
}

selector := nodeselection.BalancedGroupBasedSelector(attribute)(nodes, nil)

histogram := map[string]int{}
for i := 0; i < 1000; i++ {
selectedNodes, err := selector(7, excluded, alreadySelected)
require.NoError(t, err)

require.Len(t, selectedNodes, 7)

for _, node := range selectedNodes {
histogram[attribute(*node)]++
}
}
// from the initial {"A": 3, "B": 10, "C": 30, "D": 5, "E": 1}

// A is fully excluded
require.Equal(t, 0, histogram["A"])

require.Greater(t, histogram["B"], 100)
require.Less(t, histogram["B"], 800)

require.Greater(t, histogram["C"], 1000)
require.Greater(t, histogram["D"], 1000)

require.Equal(t, 1000, histogram["E"])

}
4 changes: 2 additions & 2 deletions satellite/nodeselection/state.go
Expand Up @@ -29,12 +29,12 @@ func NewState(nodes []*SelectedNode, placements PlacementDefinitions) State {
}

// Select picks the required nodes given a specific placement.
func (s State) Select(p storj.PlacementConstraint, count int, alreadySelected []storj.NodeID) ([]*SelectedNode, error) {
func (s State) Select(p storj.PlacementConstraint, count int, excluded []storj.NodeID, alreadySelected []*SelectedNode) ([]*SelectedNode, error) {
selector, found := s[p]
if !found {
return nil, Error.New("Placement is not defined: %d", p)
}
nodes, err := selector(count, alreadySelected)
nodes, err := selector(count, excluded, alreadySelected)
if len(nodes) < count {
return nodes, ErrNotEnoughNodes.New("requested from cache %d, found %d", count, len(nodes))
}
Expand Down
16 changes: 8 additions & 8 deletions satellite/nodeselection/state_test.go
Expand Up @@ -41,7 +41,7 @@ func TestState_SelectNonDistinct(t *testing.T) {
},
})
const selectCount = 5
selected, err := state.Select(0, selectCount, nil)
selected, err := state.Select(0, selectCount, nil, nil)
require.NoError(t, err)
require.Len(t, selected, selectCount)
}
Expand All @@ -54,7 +54,7 @@ func TestState_SelectNonDistinct(t *testing.T) {
Selector: nodeselection.UnvettedSelector(0.5, nodeselection.AttributeGroupSelector(lastNet)),
},
})
selected, err := state.Select(0, selectCount, nil)
selected, err := state.Select(0, selectCount, nil, nil)
require.NoError(t, err)
require.Len(t, selected, selectCount)
require.Len(t, intersectLists(selected, reputableNodes), selectCount*(1-newFraction))
Expand All @@ -70,7 +70,7 @@ func TestState_SelectNonDistinct(t *testing.T) {
},
})

selected, err := state.Select(0, selectCount, nil)
selected, err := state.Select(0, selectCount, nil, nil)
require.NoError(t, err)

require.Len(t, selected, selectCount)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestState_SelectDistinct(t *testing.T) {
},
})

selected, err := state.Select(0, selectCount, nil)
selected, err := state.Select(0, selectCount, nil, nil)
require.NoError(t, err)

require.Len(t, selected, selectCount)
Expand All @@ -118,7 +118,7 @@ func TestState_SelectDistinct(t *testing.T) {
},
})

selected, err := state.Select(0, selectCount, nil)
selected, err := state.Select(0, selectCount, nil, nil)
require.Error(t, err)
require.Len(t, selected, 2)
}
Expand All @@ -132,7 +132,7 @@ func TestState_SelectDistinct(t *testing.T) {
},
})

selected, err := state.Select(0, selectCount, nil)
selected, err := state.Select(0, selectCount, nil, nil)
require.NoError(t, err)
require.Len(t, selected, selectCount, nil)
require.Len(t, intersectLists(selected, reputableNodes), selectCount*(1-newFraction))
Expand Down Expand Up @@ -164,14 +164,14 @@ func TestState_Select_Concurrent(t *testing.T) {
var group errgroup.Group
group.Go(func() error {
const selectCount = 5
nodes, err := state.Select(0, selectCount, nil)
nodes, err := state.Select(0, selectCount, nil, nil)
require.Len(t, nodes, selectCount)
return err
})

group.Go(func() error {
const selectCount = 4
nodes, err := state.Select(0, selectCount, nil)
nodes, err := state.Select(0, selectCount, nil, nil)
require.Len(t, nodes, selectCount)
return err
})
Expand Down
8 changes: 4 additions & 4 deletions satellite/overlay/benchmark_test.go
Expand Up @@ -378,8 +378,8 @@ func BenchmarkNodeSelection(b *testing.B) {
b.Run("FindStorageNodes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.FindStorageNodesForUpload(ctx, overlay.FindStorageNodesRequest{
RequestedCount: SelectCount,
ExcludedIDs: nil,
RequestedCount: SelectCount,
AlreadySelected: nil,
})
require.NoError(b, err)
require.NotEmpty(b, selected)
Expand All @@ -400,8 +400,8 @@ func BenchmarkNodeSelection(b *testing.B) {
b.Run("UploadSelectionCacheGetNodes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.UploadSelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
RequestedCount: SelectCount,
ExcludedIDs: nil,
RequestedCount: SelectCount,
AlreadySelected: nil,
})
require.NoError(b, err)
require.NotEmpty(b, selected)
Expand Down

0 comments on commit 32b05eb

Please sign in to comment.