Commit 54c5909

satellite/repair: use placement level repair threshold
Change-Id: Iec3135203d2e918f7ce37416caa336c61d63447e
1 parent 84ec4ac commit 54c5909

5 files changed (+344, -66 lines)


satellite/repair/checker/observer.go

Lines changed: 13 additions & 39 deletions
@@ -251,7 +251,8 @@ func (observer *Observer) getObserverStats(redundancy redundancyStyle) *observer
 
 	observerStats, exists := observer.statsCollector[redundancy]
 	if !exists {
-		rsString := getRSString(loadRedundancy(redundancy.Scheme, observer.repairThresholdOverrides, observer.repairTargetOverrides))
+		adjustedRedundancy := AdjustRedundancy(redundancy.Scheme, observer.repairThresholdOverrides, observer.repairTargetOverrides, observer.placements[redundancy.Placement])
+		rsString := fmt.Sprintf("%d/%d/%d/%d", adjustedRedundancy.RequiredShares, adjustedRedundancy.RepairShares, adjustedRedundancy.OptimalShares, adjustedRedundancy.TotalShares)
 		observerStats = &observerRSStats{aggregateStats{}, newIterationRSStats(rsString), newSegmentRSStats(rsString, redundancy.Placement)}
 		mon.Chain(observerStats)
 		observer.statsCollector[redundancy] = observerStats
@@ -260,33 +261,6 @@ func (observer *Observer) getObserverStats(redundancy redundancyStyle) *observer
 	return observerStats
 }
 
-func loadRedundancy(redundancy storj.RedundancyScheme, repairThresholdOverrides RepairThresholdOverrides, repairTargetOverrides RepairTargetOverrides) (int, int, int, int) {
-	repair := int(redundancy.RepairShares)
-	optimal := int(redundancy.OptimalShares)
-	total := int(redundancy.TotalShares)
-
-	if overrideValue := repairThresholdOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
-		repair = int(overrideValue)
-	}
-
-	if overrideValue := repairTargetOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
-		optimal = int(overrideValue)
-	}
-
-	if optimal <= repair {
-		// if a segment has exactly repair segments, we consider it in need of
-		// repair. we don't want to upload a new object right into the state of
-		// needing repair, so we need at least one more, though arguably this is
-		// a misconfiguration.
-		optimal = repair + 1
-	}
-	if total < optimal {
-		total = optimal
-	}
-
-	return int(redundancy.RequiredShares), repair, optimal, total
-}
-
 // RefreshReliabilityCache forces refreshing node online status cache.
 func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error {
 	return observer.nodesCache.Refresh(ctx)
@@ -505,8 +479,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		stats.segmentStats.yearOldSegmentPiecesLostPerWeek.Observe(piecesLostPerWeek)
 	}
 
-	required, repairThreshold, successThreshold, _ := loadRedundancy(segment.Redundancy, fork.repairThresholdOverrides, fork.repairTargetOverrides)
-	segmentHealth := fork.health.Calculate(ctx, numHealthy, required, piecesCheck.ForcingRepair.Count())
+	adjustedRedundancy := AdjustRedundancy(segment.Redundancy, fork.repairThresholdOverrides, fork.repairTargetOverrides, fork.placements[segment.Placement])
+	segmentHealth := fork.health.Calculate(ctx, numHealthy, int(adjustedRedundancy.RequiredShares), piecesCheck.ForcingRepair.Count())
 	segmentHealthFloatVal.Observe(segmentHealth)
 	stats.segmentStats.segmentHealth.Observe(segmentHealth)
 
@@ -515,7 +489,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
 	// separate case is when we find pieces which are outside segment placement. in such case we are putting segment
 	// into queue right away.
-	repairDueToHealth := numHealthy <= repairThreshold && numHealthy < successThreshold
+	repairDueToHealth := numHealthy <= int(adjustedRedundancy.RepairShares) && numHealthy < int(adjustedRedundancy.OptimalShares)
 	repairDueToForcing := piecesCheck.ForcingRepair.Count() > 0
 	if repairDueToHealth || repairDueToForcing {
 
@@ -549,11 +523,11 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		return nil
 	}
 
-	log := log.With(zap.Int("Repair Threshold", repairThreshold), zap.Int("Success Threshold", successThreshold),
-		zap.Int("Total Pieces", len(pieces)), zap.Int("Min Required", required))
+	log := log.With(zap.Int16("Repair Threshold", adjustedRedundancy.RepairShares), zap.Int16("Success Threshold", adjustedRedundancy.OptimalShares),
+		zap.Int("Total Pieces", len(pieces)), zap.Int16("Min Required", adjustedRedundancy.RequiredShares))
 
 	switch {
-	case piecesCheck.Retrievable.Count() < required:
+	case piecesCheck.Retrievable.Count() < int(adjustedRedundancy.RequiredShares):
 		// monitor irreparable segments
 		if !slices.Contains(fork.totalStats[segment.Placement].objectsLost, segment.StreamID) {
 			fork.totalStats[segment.Placement].objectsLost = append(
@@ -583,7 +557,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		log.Warn("checker found irreparable segment",
 			zap.String("Unavailable Node IDs", strings.Join(missingNodes, ",")))
 
-	case piecesCheck.Clumped.Count() > 0 && piecesCheck.Healthy.Count()+piecesCheck.Clumped.Count() > repairThreshold &&
+	case piecesCheck.Clumped.Count() > 0 && piecesCheck.Healthy.Count()+piecesCheck.Clumped.Count() > int(adjustedRedundancy.RepairShares) &&
 		piecesCheck.ForcingRepair.Count() == 0:
 
 		// This segment is to be repaired because of clumping (it wouldn't need repair yet
@@ -603,24 +577,24 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		return nil
 	}
 
-	if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(
+	if numHealthy > int(adjustedRedundancy.RepairShares) && numHealthy <= (int(adjustedRedundancy.RepairShares)+len(
 		fork.totalStats[segment.Placement].remoteSegmentsOverThreshold,
 	)) {
 		// record metrics for segments right above repair threshold
 		// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
 		for i := range fork.totalStats[segment.Placement].remoteSegmentsOverThreshold {
-			if numHealthy == (repairThreshold + i + 1) {
+			if numHealthy == (int(adjustedRedundancy.RepairShares) + i + 1) {
 				fork.totalStats[segment.Placement].remoteSegmentsOverThreshold[i]++
 				break
 			}
 		}
 	}
 
-	if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(stats.iterationAggregates.remoteSegmentsOverThreshold)) {
+	if numHealthy > int(adjustedRedundancy.RepairShares) && numHealthy <= (int(adjustedRedundancy.RepairShares)+len(stats.iterationAggregates.remoteSegmentsOverThreshold)) {
 		// record metrics for segments right above repair threshold
 		// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
 		for i := range stats.iterationAggregates.remoteSegmentsOverThreshold {
-			if numHealthy == (repairThreshold + i + 1) {
+			if numHealthy == (int(adjustedRedundancy.RepairShares) + i + 1) {
 				stats.iterationAggregates.remoteSegmentsOverThreshold[i]++
 				break
 			}
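
The call sites above now consume a full storj.RedundancyScheme instead of the old (required, repair, optimal, total) tuple. For comparison, a minimal sketch of how the removed loadRedundancy result could be recovered from the new helper; the wrapper and package names are hypothetical, and the import paths are assumed from the file locations in this commit:

package checkercompat // hypothetical package, for illustration only

import (
	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/repair/checker"
)

// loadRedundancyTuple reproduces the removed loadRedundancy return values
// (required, repair, optimal, total) on top of checker.AdjustRedundancy.
func loadRedundancyTuple(rs storj.RedundancyScheme, thresholds checker.RepairThresholdOverrides, targets checker.RepairTargetOverrides, placement nodeselection.Placement) (required, repair, optimal, total int) {
	adjusted := checker.AdjustRedundancy(rs, thresholds, targets, placement)
	return int(adjusted.RequiredShares), int(adjusted.RepairShares), int(adjusted.OptimalShares), int(adjusted.TotalShares)
}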

satellite/repair/checker/observerstats.go

Lines changed: 0 additions & 5 deletions
@@ -4,7 +4,6 @@
 package checker
 
 import (
-	"fmt"
 	"strconv"
 
 	"github.com/spacemonkeygo/monkit/v3"
@@ -229,7 +228,3 @@ func (a *aggregateStats) combine(stats aggregateStats) {
 	a.remoteSegmentsOverThreshold[3] += stats.remoteSegmentsOverThreshold[3]
 	a.remoteSegmentsOverThreshold[4] += stats.remoteSegmentsOverThreshold[4]
 }
-
-func getRSString(min, repair, success, total int) string {
-	return fmt.Sprintf("%d/%d/%d/%d", min, repair, success, total)
-}
New file

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+// Copyright (C) 2025 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package checker
+
+import (
+	"storj.io/common/storj"
+	"storj.io/storj/satellite/nodeselection"
+)
+
+// AdjustRedundancy modifies the redundancy scheme based on repair threshold and target overrides.
+func AdjustRedundancy(redundancy storj.RedundancyScheme, repairThresholdOverrides RepairThresholdOverrides, repairTargetOverrides RepairTargetOverrides, placement nodeselection.Placement) storj.RedundancyScheme {
+	repair := int(redundancy.RepairShares)
+	optimal := int(redundancy.OptimalShares)
+	total := int(redundancy.TotalShares)
+
+	if overrideValue := repairThresholdOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
+		repair = int(overrideValue)
+	}
+
+	if overrideValue := repairTargetOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
+		optimal = int(overrideValue)
+	}
+
+	if placement.EC.Repair != nil {
+		if r := placement.EC.Repair(int(redundancy.RequiredShares)); r > 0 {
+			repair = r
+		}
+	}
+
+	if optimal <= repair {
+		// if a segment has exactly repair segments, we consider it in need of
+		// repair. we don't want to upload a new object right into the state of
+		// needing repair, so we need at least one more, though arguably this is
+		// a misconfiguration.
+		optimal = repair + 1
+	}
+	if total < optimal {
+		total = optimal
+	}
+
+	return storj.RedundancyScheme{
+		Algorithm:      redundancy.Algorithm,
+		ShareSize:      redundancy.ShareSize,
+		RequiredShares: redundancy.RequiredShares,
+		RepairShares:   int16(repair),
+		OptimalShares:  int16(optimal),
+		TotalShares:    int16(total),
+	}
+}
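
A usage sketch of the new helper, illustrating the precedence this commit adds: when a placement defines its own EC repair function, its value replaces both the scheme default and any config-level repair threshold override. The RS numbers, the zero-value overrides, and the way Placement.EC.Repair is populated below are illustrative assumptions, not values taken from the commit:

package checker_test

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/repair/checker"
)

func ExampleAdjustRedundancy() {
	scheme := storj.RedundancyScheme{
		RequiredShares: 29,
		RepairShares:   35,
		OptimalShares:  80,
		TotalShares:    110,
	}

	// Zero-value overrides: assumed here to mean "no config-level override".
	var thresholdOverrides checker.RepairThresholdOverrides
	var targetOverrides checker.RepairTargetOverrides

	// Placement carrying its own repair threshold; the func signature follows
	// how placement.EC.Repair is called inside AdjustRedundancy.
	var placement nodeselection.Placement
	placement.EC.Repair = func(required int) int { return required + 20 }

	adjusted := checker.AdjustRedundancy(scheme, thresholdOverrides, targetOverrides, placement)

	fmt.Println(adjusted.RepairShares, adjusted.OptimalShares, adjusted.TotalShares)
	// Output: 49 80 110
}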
