
Commit

satellite/{nodeselection,repair}: Placement level invariant for customizable declumping

This patch extends Placement with an 'Invariant' interface, which the repairer and the repair checker use to decide whether declumping is needed. This generalizes declumping and makes it possible to use any invariant,
not just last_net based rules.

Change-Id: Ic8537ecd19022a2cfa06445f47a00d1110a7afa6
elek authored and Storj Robot committed Jan 10, 2024
1 parent 5effb82 commit b41cf66
Showing 13 changed files with 88 additions and 60 deletions.
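As an illustration of the new interface, the following is a minimal sketch of a custom invariant beyond the built-in ClumpingByAttribute, and of how it could be attached to a placement. It assumes PlacementDefinitions is a map keyed by storj.PlacementConstraint and that SelectedNode exposes an Online field, as the diff below suggests; offlinePenalty is a hypothetical example, not part of this change.

package main

import (
	"storj.io/common/storj"
	"storj.io/storj/private/intset"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/nodeselection"
)

// offlinePenalty is a hypothetical custom invariant: it flags every piece
// held by a currently offline node, so the repairer treats those pieces as
// unhealthy and replaces them.
func offlinePenalty() nodeselection.Invariant {
	return func(pieces metabase.Pieces, nodes []nodeselection.SelectedNode) intset.Set {
		maxPieceNum := 0
		for _, piece := range pieces {
			if int(piece.Number) > maxPieceNum {
				maxPieceNum = int(piece.Number)
			}
		}
		res := intset.NewSet(maxPieceNum + 1)
		for index, node := range nodes {
			if !node.Online {
				res.Include(int(pieces[index].Number))
			}
		}
		return res
	}
}

func main() {
	// Attach the custom invariant to the default placement; the repair
	// checker and repairer would then use it instead of the last_net rule.
	placements := nodeselection.TestPlacementDefinitions()
	p := placements[storj.DefaultPlacement]
	p.Invariant = offlinePenalty()
	placements[storj.DefaultPlacement] = p
}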
2 changes: 1 addition & 1 deletion cmd/satellite/repair_segment.go
@@ -130,7 +130,7 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
overlayService,
nil, // TODO add noop version
ecRepairer,
- placement.CreateFilters,
+ placement,
config.Checker.RepairOverrides,
config.Repairer,
)
5 changes: 1 addition & 4 deletions satellite/metabase/rangedloop/service_test.go
@@ -18,7 +18,6 @@ import (
"go.uber.org/zap/zaptest"

"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
@@ -420,9 +419,7 @@ func TestAllInOne(t *testing.T) {
log.Named("repair:checker"),
satellite.DB.RepairQueue(),
satellite.Overlay.Service,
- func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) {
- return nodeselection.AnyFilter{}
- },
+ nodeselection.TestPlacementDefinitions(),
satellite.Config.Checker,
),
})
55 changes: 55 additions & 0 deletions satellite/nodeselection/invariant.go
@@ -0,0 +1,55 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package nodeselection

import (
"storj.io/storj/private/intset"
"storj.io/storj/satellite/metabase"
)

// Invariant checks the current placement, and identifies the pieces which should be moved.
// Used by repair jobs.
type Invariant func(pieces metabase.Pieces, nodes []SelectedNode) intset.Set

// AllGood is an invariant that accepts all piece sets as good.
func AllGood() Invariant {
return func(pieces metabase.Pieces, nodes []SelectedNode) intset.Set {
return intset.NewSet(0)
}
}

// ClumpingByAttribute allows at most maxAllowed selected pieces per attribute group.
func ClumpingByAttribute(attr NodeAttribute, maxAllowed int) Invariant {
return func(pieces metabase.Pieces, nodes []SelectedNode) intset.Set {
usedGroups := make(map[string]int, len(pieces))

maxPieceNum := 0
for _, piece := range pieces {
if int(piece.Number) > maxPieceNum {
maxPieceNum = int(piece.Number)
}
}
maxPieceNum++

res := intset.NewSet(maxPieceNum)

for index, nodeRecord := range nodes {
attribute := attr(nodeRecord)
if attribute == "" {
continue
}
pieceNum := pieces[index].Number
count := usedGroups[attribute]
if count >= maxAllowed {
// this group has already been seen enough times
res.Include(int(pieceNum))
} else {
// record another piece for this group
usedGroups[attribute] = count + 1
}
}

return res
}
}
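A rough usage sketch of ClumpingByAttribute from an external package, with made-up nodes and pieces; LastNetAttribute and the field names are taken from this diff, everything else is illustrative, and the expected output follows from the implementation above.

package main

import (
	"fmt"

	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/nodeselection"
)

func main() {
	// Two nodes share the 10.0.0.0 subnet, the third one is alone.
	nodes := []nodeselection.SelectedNode{
		{LastNet: "10.0.0.0"},
		{LastNet: "10.0.0.0"},
		{LastNet: "10.1.0.0"},
	}
	pieces := metabase.Pieces{{Number: 0}, {Number: 1}, {Number: 2}}

	// Allow at most one piece per last_net group.
	invariant := nodeselection.ClumpingByAttribute(nodeselection.LastNetAttribute, 1)
	clumped := invariant(pieces, nodes)

	// Piece 1 is flagged as clumped (second piece on 10.0.0.0);
	// pieces 0 and 2 remain healthy.
	fmt.Println(clumped.Count()) // 1
}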
5 changes: 5 additions & 0 deletions satellite/nodeselection/placement.go
@@ -25,6 +25,8 @@ type Placement struct {
NodeFilter NodeFilter
// selector is the method how the nodes are selected from the full node space (eg. pick a subnet first, and pick a node from the subnet)
Selector NodeSelectorInit
+ // Invariant is checked by the repair job and applied to the full selection. Out-of-placement pieces will be replaced by new ones, selected by the Selector.
+ Invariant Invariant
}

// Match implements NodeFilter.
@@ -120,6 +122,7 @@ func TestPlacementDefinitions() PlacementDefinitions {
ID: storj.DefaultPlacement,
NodeFilter: AnyFilter{},
Selector: AttributeGroupSelector(LastNetAttribute),
+ Invariant: ClumpingByAttribute(LastNetAttribute, 1),
},
}
}
@@ -173,6 +176,7 @@ func (d PlacementDefinitions) AddPlacementRule(id storj.PlacementConstraint, fil
placement := Placement{
NodeFilter: filter,
Selector: AttributeGroupSelector(LastNetAttribute),
+ Invariant: ClumpingByAttribute(LastNetAttribute, 1),
}
if GetAnnotation(filter, AutoExcludeSubnet) == AutoExcludeSubnetOFF {
placement.Selector = RandomSelector()
@@ -292,6 +296,7 @@ func (d PlacementDefinitions) AddPlacementFromString(definitions string) error {

if GetAnnotation(placement.NodeFilter, AutoExcludeSubnet) != AutoExcludeSubnetOFF {
placement.Selector = AttributeGroupSelector(LastNetAttribute)
+ placement.Invariant = ClumpingByAttribute(LastNetAttribute, 1)
} else {
placement.Selector = RandomSelector()
}
1 change: 1 addition & 0 deletions satellite/overlay/config.go
@@ -88,6 +88,7 @@ func (c NodeSelectionConfig) CreateDefaultPlacement() (nodeselection.Placement,
placement := nodeselection.Placement{
NodeFilter: nodeselection.AnyFilter{},
Selector: nodeselection.UnvettedSelector(c.NewNodeFraction, nodeselection.AttributeGroupSelector(nodeselection.LastNetAttribute)),
+ Invariant: nodeselection.ClumpingByAttribute(nodeselection.LastNetAttribute, 1),
}
if len(c.UploadExcludedCountryCodes) > 0 {
countryFilter, err := nodeselection.NewCountryFilterFromString(c.UploadExcludedCountryCodes)
2 changes: 1 addition & 1 deletion satellite/rangedloop.go
@@ -177,7 +177,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
peer.Log.Named("repair:checker"),
peer.DB.RepairQueue(),
peer.Overlay.Service,
- placement.CreateFilters,
+ placement,
config.Checker,
)
}
12 changes: 6 additions & 6 deletions satellite/repair/checker/observer.go
@@ -45,7 +45,7 @@ type Observer struct {
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
- placementRules nodeselection.PlacementRules
+ placements nodeselection.PlacementDefinitions

// the following are reset on each iteration
startTime time.Time
@@ -56,7 +56,7 @@
}

// NewObserver creates new checker observer instance.
- func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules nodeselection.PlacementRules, config Config) *Observer {
+ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placements nodeselection.PlacementDefinitions, config Config) *Observer {
excludedCountryCodes := make(map[location.CountryCode]struct{})
for _, countryCode := range config.RepairExcludedCountryCodes {
if cc := location.ToCountryCode(countryCode); cc != location.None {
@@ -76,7 +76,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placementRules: placementRules,
placements: placements,
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
}
}
@@ -267,7 +267,7 @@ type observerFork struct {
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
- placementRules nodeselection.PlacementRules
+ placements nodeselection.PlacementDefinitions

getObserverStats func(storj.RedundancyScheme) *observerRSStats
}
@@ -287,7 +287,7 @@ func newObserverFork(observer *Observer) rangedloop.Partial {
excludedCountryCodes: observer.excludedCountryCodes,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
- placementRules: observer.placementRules,
+ placements: observer.placements,
getObserverStats: observer.getObserverStats,
}
}
@@ -389,7 +389,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return Error.New("error getting node information for pieces: %w", err)
}
piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck,
- fork.doDeclumping, fork.placementRules(segment.Placement), fork.nodeIDs)
+ fork.doDeclumping, fork.placements[segment.Placement], fork.nodeIDs)

numHealthy := piecesCheck.Healthy.Count()
segmentTotalCountIntVal.Observe(int64(len(pieces)))
2 changes: 1 addition & 1 deletion satellite/repair/checker/observer_test.go
@@ -557,7 +557,7 @@ func BenchmarkRemoteSegment(b *testing.B) {
}

observer := checker.NewObserver(zap.NewNop(), planet.Satellites[0].DB.RepairQueue(),
- planet.Satellites[0].Auditor.Overlay, nodeselection.TestPlacementDefinitionsWithFraction(0.05).CreateFilters, planet.Satellites[0].Config.Checker)
+ planet.Satellites[0].Auditor.Overlay, nodeselection.TestPlacementDefinitionsWithFraction(0.05), planet.Satellites[0].Config.Checker)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)

8 changes: 3 additions & 5 deletions satellite/repair/checker/observer_unit_test.go
@@ -52,9 +52,7 @@ func TestObserverForkProcess(t *testing.T) {
nodesCache: &ReliabilityCache{
staleness: time.Hour,
},
- placementRules: func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) {
- return nodeselection.AnyFilter{}
- },
+ placements: nodeselection.TestPlacementDefinitions(),
}

o.nodesCache.state.Store(&reliabilityState{
@@ -73,7 +71,7 @@ func TestObserverForkProcess(t *testing.T) {
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
doDeclumping: o.doDeclumping,
doPlacementCheck: o.doPlacementCheck,
- placementRules: o.placementRules,
+ placements: o.placements,
getNodesEstimate: o.getNodesEstimate,
nodesCache: o.nodesCache,
repairQueue: queue.NewInsertBuffer(q, 1000),
@@ -148,7 +146,7 @@ func TestObserverForkProcess(t *testing.T) {
require.NoError(t, placements.Set(fmt.Sprintf(`10:annotated(country("DE"),annotation("%s","%s"))`, nodeselection.AutoExcludeSubnet, nodeselection.AutoExcludeSubnetOFF)))
parsed, err := placements.Parse(nil)
require.NoError(t, err)
- o.placementRules = parsed.CreateFilters
+ o.placements = parsed

q := queue.MockRepairQueue{}
fork := createFork(o, &q)
36 changes: 4 additions & 32 deletions satellite/repair/classification.go
@@ -60,7 +60,7 @@ type PiecesCheckResult struct {
// represented by a PiecesCheckResult. Pieces may be put into multiple
// categories.
func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.SelectedNode, excludedCountryCodes map[location.CountryCode]struct{},
- doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
+ doPlacementCheck, doDeclumping bool, placement nodeselection.Placement, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
result.ExcludeNodeIDs = excludeNodeIDs

maxPieceNum := 0
@@ -105,36 +105,8 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
}
}

- if doDeclumping && nodeselection.GetAnnotation(filter, nodeselection.AutoExcludeSubnet) != nodeselection.AutoExcludeSubnetOFF {
- // if multiple pieces are on the same last_net, keep only the first one. The rest are
- // to be considered retrievable but unhealthy.
-
- lastNets := make(map[string]struct{}, len(pieces))
- result.Clumped = intset.NewSet(maxPieceNum)
-
- collectClumpedPieces := func(onlineness bool) {
- for index, nodeRecord := range nodes {
- if nodeRecord.Online != onlineness {
- continue
- }
- if nodeRecord.LastNet == "" {
- continue
- }
- pieceNum := pieces[index].Number
- _, ok := lastNets[nodeRecord.LastNet]
- if ok {
- // this LastNet was already seen
- result.Clumped.Include(int(pieceNum))
- } else {
- // add to the list of seen LastNets
- lastNets[nodeRecord.LastNet] = struct{}{}
- }
- }
- }
- // go over online nodes first, so that if we have to remove clumped pieces, we prefer
- // to remove offline ones over online ones.
- collectClumpedPieces(true)
- collectClumpedPieces(false)
+ if doDeclumping && placement.Invariant != nil {
+ result.Clumped = placement.Invariant(pieces, nodes)
}

if doPlacementCheck {
@@ -145,7 +117,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
if nodeRecord.ID.IsZero() {
continue
}
- if filter.Match(&nodeRecord) {
+ if placement.NodeFilter.Match(&nodeRecord) {
continue
}
pieceNum := pieces[index].Number
10 changes: 5 additions & 5 deletions satellite/repair/classification_test.go
@@ -37,7 +37,7 @@ func TestClassifySegmentPieces(t *testing.T) {
})

pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
- result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, nodeselection.AnyFilter{}, piecesToNodeIDs(pieces))
+ result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, nodeselection.TestPlacementDefinitions()[0], piecesToNodeIDs(pieces))

require.Equal(t, 0, result.Missing.Count())
require.Equal(t, 0, result.Clumped.Count())
@@ -63,7 +63,7 @@ func TestClassifySegmentPieces(t *testing.T) {
require.NoError(t, err)

pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
- result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))
+ result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c[10], piecesToNodeIDs(pieces))

require.Equal(t, 0, result.Missing.Count())
require.Equal(t, 0, result.Clumped.Count())
@@ -86,7 +86,7 @@ func TestClassifySegmentPieces(t *testing.T) {
require.NoError(t, err)

pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
- result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))
+ result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c[10], piecesToNodeIDs(pieces))

// offline nodes
require.Equal(t, 5, result.Missing.Count())
@@ -109,7 +109,7 @@ func TestClassifySegmentPieces(t *testing.T) {

// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
- result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(0), piecesToNodeIDs(pieces))
+ result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c[0], piecesToNodeIDs(pieces))

// offline nodes
require.Equal(t, 2, result.Missing.Count())
@@ -136,7 +136,7 @@ func TestClassifySegmentPieces(t *testing.T) {

// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
- result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(10), piecesToNodeIDs(pieces))
+ result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c[10], piecesToNodeIDs(pieces))

// offline nodes
require.Equal(t, 2, result.Missing.Count())
8 changes: 4 additions & 4 deletions satellite/repair/repairer/segments.go
@@ -105,7 +105,7 @@ type SegmentRepairer struct {
nowFn func() time.Time
OnTestingCheckSegmentAlteredHook func()
OnTestingPiecesReportHook func(pieces FetchResultReport)
- placementRules nodeselection.PlacementRules
+ placements nodeselection.PlacementDefinitions
}

// NewSegmentRepairer creates a new instance of SegmentRepairer.
Expand All @@ -120,7 +120,7 @@ func NewSegmentRepairer(
overlay *overlay.Service,
reporter audit.Reporter,
ecRepairer *ECRepairer,
- placementRules nodeselection.PlacementRules,
+ placements nodeselection.PlacementDefinitions,
repairOverrides checker.RepairOverrides,
config Config,
) *SegmentRepairer {
@@ -152,7 +152,7 @@ func NewSegmentRepairer(
reputationUpdateEnabled: config.ReputationUpdateEnabled,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
- placementRules: placementRules,
+ placements: placements,

nowFn: time.Now,
}
@@ -224,7 +224,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return false, overlayQueryError.New("GetNodes returned an invalid result")
}
pieces := segment.Pieces
- piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placementRules(segment.Placement), allNodeIDs)
+ piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placements[segment.Placement], allNodeIDs)

// irreparable segment
if piecesCheck.Retrievable.Count() < int(segment.Redundancy.RequiredShares) {
2 changes: 1 addition & 1 deletion satellite/repairer.go
@@ -235,7 +235,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
peer.Overlay,
peer.Audit.Reporter,
peer.EcRepairer,
- placement.CreateFilters,
+ placement,
config.Checker.RepairOverrides,
config.Repairer,
)