diff --git a/cmd/satellite/repair_segment.go b/cmd/satellite/repair_segment.go
index fcb7f039365b..5f1f0fc9226f 100644
--- a/cmd/satellite/repair_segment.go
+++ b/cmd/satellite/repair_segment.go
@@ -130,7 +130,7 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
 		overlayService,
 		nil, // TODO add noop version
 		ecRepairer,
-		placement.CreateFilters,
+		placement,
 		config.Checker.RepairOverrides,
 		config.Repairer,
 	)
diff --git a/satellite/metabase/rangedloop/service_test.go b/satellite/metabase/rangedloop/service_test.go
index 49d1e5326f98..0cc6041f8019 100644
--- a/satellite/metabase/rangedloop/service_test.go
+++ b/satellite/metabase/rangedloop/service_test.go
@@ -18,7 +18,6 @@ import (
 	"go.uber.org/zap/zaptest"
 
 	"storj.io/common/memory"
-	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
 	"storj.io/common/uuid"
@@ -420,9 +419,7 @@ func TestAllInOne(t *testing.T) {
 			log.Named("repair:checker"),
 			satellite.DB.RepairQueue(),
 			satellite.Overlay.Service,
-			func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) {
-				return nodeselection.AnyFilter{}
-			},
+			nodeselection.TestPlacementDefinitions(),
 			satellite.Config.Checker,
 		),
 	})
diff --git a/satellite/nodeselection/invariant.go b/satellite/nodeselection/invariant.go
new file mode 100644
index 000000000000..caba4aca9368
--- /dev/null
+++ b/satellite/nodeselection/invariant.go
@@ -0,0 +1,55 @@
+// Copyright (C) 2023 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package nodeselection
+
+import (
+	"storj.io/storj/private/intset"
+	"storj.io/storj/satellite/metabase"
+)
+
+// Invariant checks the current placement and identifies the pieces which should be moved.
+// Used by repair jobs.
+type Invariant func(pieces metabase.Pieces, nodes []SelectedNode) intset.Set
+
+// AllGood is an invariant which accepts all piece sets as good.
+func AllGood() Invariant {
+	return func(pieces metabase.Pieces, nodes []SelectedNode) intset.Set {
+		return intset.NewSet(0)
+	}
+}
+
+// ClumpingByAttribute allows at most maxAllowed pieces per attribute group.
+func ClumpingByAttribute(attr NodeAttribute, maxAllowed int) Invariant {
+	return func(pieces metabase.Pieces, nodes []SelectedNode) intset.Set {
+		usedGroups := make(map[string]int, len(pieces))
+
+		maxPieceNum := 0
+		for _, piece := range pieces {
+			if int(piece.Number) > maxPieceNum {
+				maxPieceNum = int(piece.Number)
+			}
+		}
+		maxPieceNum++
+
+		res := intset.NewSet(maxPieceNum)
+
+		for index, nodeRecord := range nodes {
+			attribute := attr(nodeRecord)
+			if attribute == "" {
+				continue
+			}
+			pieceNum := pieces[index].Number
+			count := usedGroups[attribute]
+			if count >= maxAllowed {
+				// this group was already seen enough times
+				res.Include(int(pieceNum))
+			} else {
+				// add to the list of seen groups
+				usedGroups[attribute] = count + 1
+			}
+		}
+
+		return res
+	}
+}
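To make the new Invariant concrete: a minimal sketch (not part of the patch) that feeds ClumpingByAttribute two pieces whose nodes share a last_net. The returned set names the piece that violates the invariant and should be moved; only the fields the implementation above actually reads (Piece.Number, SelectedNode.LastNet) are filled in.

package main

import (
	"fmt"

	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/nodeselection"
)

func main() {
	// nodes[i] holds pieces[i], mirroring how ClassifySegmentPieces arranges its inputs.
	pieces := metabase.Pieces{{Number: 0}, {Number: 1}}
	nodes := []nodeselection.SelectedNode{
		{LastNet: "10.8.0.0"},
		{LastNet: "10.8.0.0"}, // same subnet as the first node
	}

	// At most one piece per last_net group, matching the default placement.
	invariant := nodeselection.ClumpingByAttribute(nodeselection.LastNetAttribute, 1)

	violating := invariant(pieces, nodes)
	fmt.Println(violating.Count()) // 1: piece #1 exceeds the group limit
}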
diff --git a/satellite/nodeselection/placement.go b/satellite/nodeselection/placement.go
index 795e18c2bddb..9027e9a5d3cd 100644
--- a/satellite/nodeselection/placement.go
+++ b/satellite/nodeselection/placement.go
@@ -25,6 +25,8 @@ type Placement struct {
 	NodeFilter NodeFilter
 	// selector is the method how the nodes are selected from the full node space (eg. pick a subnet first, and pick a node from the subnet)
 	Selector NodeSelectorInit
+	// Invariant is checked by the repair job against the full selection; pieces that violate it are replaced by new ones chosen by the Selector.
+	Invariant Invariant
 }
 
 // Match implements NodeFilter.
@@ -120,6 +122,7 @@ func TestPlacementDefinitions() PlacementDefinitions {
 			ID:         storj.DefaultPlacement,
 			NodeFilter: AnyFilter{},
 			Selector:   AttributeGroupSelector(LastNetAttribute),
+			Invariant:  ClumpingByAttribute(LastNetAttribute, 1),
 		},
 	}
 }
@@ -173,6 +176,7 @@ func (d PlacementDefinitions) AddPlacementRule(id storj.PlacementConstraint, fil
 	placement := Placement{
 		NodeFilter: filter,
 		Selector:   AttributeGroupSelector(LastNetAttribute),
+		Invariant:  ClumpingByAttribute(LastNetAttribute, 1),
 	}
 	if GetAnnotation(filter, AutoExcludeSubnet) == AutoExcludeSubnetOFF {
 		placement.Selector = RandomSelector()
@@ -292,6 +296,7 @@ func (d PlacementDefinitions) AddPlacementFromString(definitions string) error {
 
 		if GetAnnotation(placement.NodeFilter, AutoExcludeSubnet) != AutoExcludeSubnetOFF {
 			placement.Selector = AttributeGroupSelector(LastNetAttribute)
+			placement.Invariant = ClumpingByAttribute(LastNetAttribute, 1)
 		} else {
 			placement.Selector = RandomSelector()
 		}
diff --git a/satellite/overlay/config.go b/satellite/overlay/config.go
index f2a2812eb688..62b7ea3dfbb4 100644
--- a/satellite/overlay/config.go
+++ b/satellite/overlay/config.go
@@ -88,6 +88,7 @@ func (c NodeSelectionConfig) CreateDefaultPlacement() (nodeselection.Placement,
 	placement := nodeselection.Placement{
 		NodeFilter: nodeselection.AnyFilter{},
 		Selector:   nodeselection.UnvettedSelector(c.NewNodeFraction, nodeselection.AttributeGroupSelector(nodeselection.LastNetAttribute)),
+		Invariant:  nodeselection.ClumpingByAttribute(nodeselection.LastNetAttribute, 1),
 	}
 	if len(c.UploadExcludedCountryCodes) > 0 {
 		countryFilter, err := nodeselection.NewCountryFilterFromString(c.UploadExcludedCountryCodes)
diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go
index 2c9a841db61a..12d604498d8b 100644
--- a/satellite/rangedloop.go
+++ b/satellite/rangedloop.go
@@ -177,7 +177,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
 			peer.Log.Named("repair:checker"),
 			peer.DB.RepairQueue(),
 			peer.Overlay.Service,
-			placement.CreateFilters,
+			placement,
 			config.Checker,
 		)
 	}
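Taken together, the hunks above give every placement three cooperating parts: a NodeFilter deciding who may hold pieces, a Selector deciding how uploads pick nodes, and now an Invariant deciding what repair enforces afterwards. A hedged sketch of a hand-built placement, using only names from this patch:

package main

import "storj.io/storj/satellite/nodeselection"

func main() {
	placement := nodeselection.Placement{
		// who is acceptable at all
		NodeFilter: nodeselection.AnyFilter{},
		// how upload targets are chosen (subnet-grouped)
		Selector: nodeselection.AttributeGroupSelector(nodeselection.LastNetAttribute),
		// what the repair checker verifies after the fact
		Invariant: nodeselection.ClumpingByAttribute(nodeselection.LastNetAttribute, 1),
	}
	_ = placement
}

Keeping Selector and Invariant derived from the same attribute is exactly what AddPlacementRule and AddPlacementFromString now guarantee: a subnet-grouping selector is always paired with a subnet-clumping invariant, and opting out via the AutoExcludeSubnetOFF annotation drops both.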
diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go
index 95b23e785794..0f173a984af1 100644
--- a/satellite/repair/checker/observer.go
+++ b/satellite/repair/checker/observer.go
@@ -45,7 +45,7 @@ type Observer struct {
 	excludedCountryCodes map[location.CountryCode]struct{}
 	doDeclumping         bool
 	doPlacementCheck     bool
-	placementRules       nodeselection.PlacementRules
+	placements           nodeselection.PlacementDefinitions
 
 	// the following are reset on each iteration
 	startTime time.Time
@@ -56,7 +56,7 @@
 }
 
 // NewObserver creates new checker observer instance.
-func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules nodeselection.PlacementRules, config Config) *Observer {
+func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placements nodeselection.PlacementDefinitions, config Config) *Observer {
 	excludedCountryCodes := make(map[location.CountryCode]struct{})
 	for _, countryCode := range config.RepairExcludedCountryCodes {
 		if cc := location.ToCountryCode(countryCode); cc != location.None {
@@ -76,7 +76,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
 		excludedCountryCodes: excludedCountryCodes,
 		doDeclumping:         config.DoDeclumping,
 		doPlacementCheck:     config.DoPlacementCheck,
-		placementRules:       placementRules,
+		placements:           placements,
 		statsCollector:       make(map[storj.RedundancyScheme]*observerRSStats),
 	}
 }
@@ -267,7 +267,7 @@ type observerFork struct {
 	excludedCountryCodes map[location.CountryCode]struct{}
 	doDeclumping         bool
 	doPlacementCheck     bool
-	placementRules       nodeselection.PlacementRules
+	placements           nodeselection.PlacementDefinitions
 	getObserverStats     func(storj.RedundancyScheme) *observerRSStats
 }
 
@@ -287,7 +287,7 @@ func newObserverFork(observer *Observer) rangedloop.Partial {
 		excludedCountryCodes: observer.excludedCountryCodes,
 		doDeclumping:         observer.doDeclumping,
 		doPlacementCheck:     observer.doPlacementCheck,
-		placementRules:       observer.placementRules,
+		placements:           observer.placements,
 		getObserverStats:     observer.getObserverStats,
 	}
 }
@@ -389,7 +389,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		return Error.New("error getting node information for pieces: %w", err)
 	}
 	piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck,
-		fork.doDeclumping, fork.placementRules(segment.Placement), fork.nodeIDs)
+		fork.doDeclumping, fork.placements[segment.Placement], fork.nodeIDs)
 
 	numHealthy := piecesCheck.Healthy.Count()
 	segmentTotalCountIntVal.Observe(int64(len(pieces)))
diff --git a/satellite/repair/checker/observer_test.go b/satellite/repair/checker/observer_test.go
index 368b011ea0c6..25bb3936dc39 100644
--- a/satellite/repair/checker/observer_test.go
+++ b/satellite/repair/checker/observer_test.go
@@ -557,7 +557,7 @@ func BenchmarkRemoteSegment(b *testing.B) {
 	}
 
 	observer := checker.NewObserver(zap.NewNop(), planet.Satellites[0].DB.RepairQueue(),
-		planet.Satellites[0].Auditor.Overlay, nodeselection.TestPlacementDefinitionsWithFraction(0.05).CreateFilters, planet.Satellites[0].Config.Checker)
+		planet.Satellites[0].Auditor.Overlay, nodeselection.TestPlacementDefinitionsWithFraction(0.05), planet.Satellites[0].Config.Checker)
 
 	segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
 	require.NoError(b, err)
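Note the shape of the lookup that replaced the old placementRules closure: fork.placements[segment.Placement] indexes a map, so a segment carrying an unknown constraint yields a zero-value Placement. A small sketch of why that stays safe for declumping, relying on the nil guard added to classification.go further below:

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
)

func main() {
	placements := nodeselection.TestPlacementDefinitions()

	p := placements[storj.PlacementConstraint(42)] // not configured: zero value
	// ClassifySegmentPieces only calls the invariant when it is non-nil,
	// so declumping is silently skipped for unknown placements.
	fmt.Println(p.Invariant == nil) // true
}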
diff --git a/satellite/repair/checker/observer_unit_test.go b/satellite/repair/checker/observer_unit_test.go
index ac4be7bf1fdd..3ecedbbef45c 100644
--- a/satellite/repair/checker/observer_unit_test.go
+++ b/satellite/repair/checker/observer_unit_test.go
@@ -52,9 +52,7 @@ func TestObserverForkProcess(t *testing.T) {
 		nodesCache: &ReliabilityCache{
 			staleness: time.Hour,
 		},
-		placementRules: func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) {
-			return nodeselection.AnyFilter{}
-		},
+		placements: nodeselection.TestPlacementDefinitions(),
 	}
 
 	o.nodesCache.state.Store(&reliabilityState{
@@ -73,7 +71,7 @@
 			rsStats:          make(map[storj.RedundancyScheme]*partialRSStats),
 			doDeclumping:     o.doDeclumping,
 			doPlacementCheck: o.doPlacementCheck,
-			placementRules:   o.placementRules,
+			placements:       o.placements,
 			getNodesEstimate: o.getNodesEstimate,
 			nodesCache:       o.nodesCache,
 			repairQueue:      queue.NewInsertBuffer(q, 1000),
@@ -148,7 +146,7 @@ func TestObserverForkProcess(t *testing.T) {
 		require.NoError(t, placements.Set(fmt.Sprintf(`10:annotated(country("DE"),annotation("%s","%s"))`, nodeselection.AutoExcludeSubnet, nodeselection.AutoExcludeSubnetOFF)))
 		parsed, err := placements.Parse(nil)
 		require.NoError(t, err)
-		o.placementRules = parsed.CreateFilters
+		o.placements = parsed
 
 		q := queue.MockRepairQueue{}
 		fork := createFork(o, &q)
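The `10:annotated(...)` definition string in the test above is the operator-facing path into the same map. A hedged sketch of the resulting behavior, driving AddPlacementFromString directly; the empty-map literal assumes PlacementDefinitions is a plain map type, which the patch implies by indexing it but never shows:

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
)

func main() {
	// Assumption: a zero-value map literal is a usable starting point.
	placements := nodeselection.PlacementDefinitions{}
	err := placements.AddPlacementFromString(fmt.Sprintf(
		`10:annotated(country("DE"),annotation("%s","%s"))`,
		nodeselection.AutoExcludeSubnet, nodeselection.AutoExcludeSubnetOFF))
	if err != nil {
		panic(err)
	}

	// Per the AddPlacementFromString hunk above: the OFF annotation picks
	// RandomSelector and never assigns Invariant, so repair will not
	// declump placement 10 while the country filter still applies.
	p := placements[storj.PlacementConstraint(10)]
	fmt.Println(p.Invariant == nil) // true
}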
diff --git a/satellite/repair/classification.go b/satellite/repair/classification.go
index f9a562069012..9102d0971132 100644
--- a/satellite/repair/classification.go
+++ b/satellite/repair/classification.go
@@ -60,7 +60,7 @@ type PiecesCheckResult struct {
 // represented by a PiecesCheckResult. Pieces may be put into multiple
 // categories.
 func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.SelectedNode, excludedCountryCodes map[location.CountryCode]struct{},
-	doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
+	doPlacementCheck, doDeclumping bool, placement nodeselection.Placement, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
 	result.ExcludeNodeIDs = excludeNodeIDs
 
 	maxPieceNum := 0
@@ -105,36 +105,8 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 		}
 	}
 
-	if doDeclumping && nodeselection.GetAnnotation(filter, nodeselection.AutoExcludeSubnet) != nodeselection.AutoExcludeSubnetOFF {
-		// if multiple pieces are on the same last_net, keep only the first one. The rest are
-		// to be considered retrievable but unhealthy.
-
-		lastNets := make(map[string]struct{}, len(pieces))
-		result.Clumped = intset.NewSet(maxPieceNum)
-
-		collectClumpedPieces := func(onlineness bool) {
-			for index, nodeRecord := range nodes {
-				if nodeRecord.Online != onlineness {
-					continue
-				}
-				if nodeRecord.LastNet == "" {
-					continue
-				}
-				pieceNum := pieces[index].Number
-				_, ok := lastNets[nodeRecord.LastNet]
-				if ok {
-					// this LastNet was already seen
-					result.Clumped.Include(int(pieceNum))
-				} else {
-					// add to the list of seen LastNets
-					lastNets[nodeRecord.LastNet] = struct{}{}
-				}
-			}
-		}
-		// go over online nodes first, so that if we have to remove clumped pieces, we prefer
-		// to remove offline ones over online ones.
-		collectClumpedPieces(true)
-		collectClumpedPieces(false)
+	if doDeclumping && placement.Invariant != nil {
+		result.Clumped = placement.Invariant(pieces, nodes)
 	}
 
 	if doPlacementCheck {
@@ -145,7 +117,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 		if nodeRecord.ID.IsZero() {
 			continue
 		}
-		if filter.Match(&nodeRecord) {
+		if placement.NodeFilter.Match(&nodeRecord) {
 			continue
 		}
 		pieceNum := pieces[index].Number
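With the inlined last_net logic gone, whether and how declumping happens is entirely the placement's business. AllGood from invariant.go is the explicit opt-out; a sketch using names from this patch only:

package main

import "storj.io/storj/satellite/nodeselection"

func main() {
	// A placement that deliberately never reports clumped pieces.
	relaxed := nodeselection.Placement{
		NodeFilter: nodeselection.AnyFilter{},
		Selector:   nodeselection.RandomSelector(),
		// AllGood returns an empty set for any input, so result.Clumped
		// stays empty even with declumping enabled; unlike a nil
		// Invariant, it states the intent explicitly.
		Invariant: nodeselection.AllGood(),
	}
	_ = relaxed
}

One behavioral nuance of the swap: the removed code scanned online nodes before offline ones, so the piece kept per subnet was preferably online, while ClumpingByAttribute iterates in piece order; any such preference now has to live inside the invariant itself.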
diff --git a/satellite/repair/classification_test.go b/satellite/repair/classification_test.go
index de72a1e81ac3..38ccc5cc4e63 100644
--- a/satellite/repair/classification_test.go
+++ b/satellite/repair/classification_test.go
@@ -37,7 +37,7 @@ func TestClassifySegmentPieces(t *testing.T) {
 		})
 
 		pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
-		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, nodeselection.AnyFilter{}, piecesToNodeIDs(pieces))
+		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, nodeselection.TestPlacementDefinitions()[0], piecesToNodeIDs(pieces))
 
 		require.Equal(t, 0, result.Missing.Count())
 		require.Equal(t, 0, result.Clumped.Count())
@@ -63,7 +63,7 @@ func TestClassifySegmentPieces(t *testing.T) {
 		require.NoError(t, err)
 
 		pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
-		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))
+		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c[10], piecesToNodeIDs(pieces))
 
 		require.Equal(t, 0, result.Missing.Count())
 		require.Equal(t, 0, result.Clumped.Count())
@@ -86,7 +86,7 @@ func TestClassifySegmentPieces(t *testing.T) {
 		require.NoError(t, err)
 
 		pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
-		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))
+		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c[10], piecesToNodeIDs(pieces))
 
 		// offline nodes
 		require.Equal(t, 5, result.Missing.Count())
@@ -109,7 +109,7 @@ func TestClassifySegmentPieces(t *testing.T) {
 
 		// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
 		pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
-		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(0), piecesToNodeIDs(pieces))
+		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c[0], piecesToNodeIDs(pieces))
 
 		// offline nodes
 		require.Equal(t, 2, result.Missing.Count())
@@ -136,7 +136,7 @@ func TestClassifySegmentPieces(t *testing.T) {
 
 		// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
 		pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
-		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(10), piecesToNodeIDs(pieces))
+		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c[10], piecesToNodeIDs(pieces))
 
 		// offline nodes
 		require.Equal(t, 2, result.Missing.Count())
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 2f8272e12ce6..739b7b8e6d5b 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -105,7 +105,7 @@ type SegmentRepairer struct {
 	nowFn                            func() time.Time
 	OnTestingCheckSegmentAlteredHook func()
 	OnTestingPiecesReportHook        func(pieces FetchResultReport)
-	placementRules                   nodeselection.PlacementRules
+	placements                       nodeselection.PlacementDefinitions
 }
 
 // NewSegmentRepairer creates a new instance of SegmentRepairer.
@@ -120,7 +120,7 @@ func NewSegmentRepairer(
 	overlay *overlay.Service,
 	reporter audit.Reporter,
 	ecRepairer *ECRepairer,
-	placementRules nodeselection.PlacementRules,
+	placements nodeselection.PlacementDefinitions,
 	repairOverrides checker.RepairOverrides,
 	config Config,
 ) *SegmentRepairer {
@@ -152,7 +152,7 @@ func NewSegmentRepairer(
 		reputationUpdateEnabled: config.ReputationUpdateEnabled,
 		doDeclumping:            config.DoDeclumping,
 		doPlacementCheck:        config.DoPlacementCheck,
-		placementRules:          placementRules,
+		placements:              placements,
 		nowFn:                   time.Now,
 	}
 
@@ -224,7 +224,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		return false, overlayQueryError.New("GetNodes returned an invalid result")
 	}
 	pieces := segment.Pieces
-	piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placementRules(segment.Placement), allNodeIDs)
+	piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placements[segment.Placement], allNodeIDs)
 
 	// irreparable segment
 	if piecesCheck.Retrievable.Count() < int(segment.Redundancy.RequiredShares) {
diff --git a/satellite/repairer.go b/satellite/repairer.go
index 695c454f0b3d..941a5ad01569 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -235,7 +235,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 			peer.Overlay,
 			peer.Audit.Reporter,
 			peer.EcRepairer,
-			placement.CreateFilters,
+			placement,
 			config.Checker.RepairOverrides,
 			config.Repairer,
 		)
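Finally, a self-contained sketch of the path the checker and repairer now share, in the spirit of the tests above. The SelectedNode and Piece fields used are the ones this patch references, and the expected count follows from the default placement's ClumpingByAttribute(LastNetAttribute, 1):

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/common/storj/location"
	"storj.io/common/testrand"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/repair"
)

func main() {
	// Default placement: AnyFilter + subnet selector + clumping invariant.
	placement := nodeselection.TestPlacementDefinitions()[storj.DefaultPlacement]

	// Two online nodes in one subnet, holding pieces 0 and 1.
	nodes := []nodeselection.SelectedNode{
		{ID: testrand.NodeID(), LastNet: "10.9.8.0", Online: true},
		{ID: testrand.NodeID(), LastNet: "10.9.8.0", Online: true},
	}
	pieces := metabase.Pieces{
		{Number: 0, StorageNode: nodes[0].ID},
		{Number: 1, StorageNode: nodes[1].ID},
	}

	result := repair.ClassifySegmentPieces(pieces, nodes,
		map[location.CountryCode]struct{}{}, // no excluded countries
		true,                                // doPlacementCheck
		true,                                // doDeclumping
		placement,
		[]storj.NodeID{nodes[0].ID, nodes[1].ID})

	// The invariant flags the second piece on the shared subnet;
	// AnyFilter matches both nodes, so nothing is out of placement.
	fmt.Println(result.Clumped.Count()) // 1
}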