Skip to content

Commit

Permalink
Revert "satellite/repair/checker: support overriding optimal shares"
Browse files Browse the repository at this point in the history
This reverts commit 5dc2bc4.
  • Loading branch information
jtolio committed Jan 25, 2024
1 parent f07c0a6 commit 4342f2e
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 180 deletions.
3 changes: 1 addition & 2 deletions cmd/satellite/repair_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
nil, // TODO add noop version
ecRepairer,
placement,
config.Checker.RepairThresholdOverrides,
config.Checker.RepairTargetOverrides,
config.Checker.RepairOverrides,
config.Repairer,
)

Expand Down
143 changes: 109 additions & 34 deletions satellite/repair/checker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ type Config struct {
Interval time.Duration `help:"how frequently checker should check for bad segments" releaseDefault:"30s" devDefault:"0h0m10s" testDefault:"$TESTINTERVAL"`

ReliabilityCacheStaleness time.Duration `help:"how stale reliable node cache can be" releaseDefault:"5m" devDefault:"5m" testDefault:"1m"`
RepairOverrides RepairOverrides `help:"[DEPRECATED] comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"" devDefault:""`
RepairThresholdOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"29-52" devDefault:""`
RepairTargetOverrides RepairOverrides `help:"comma-separated override values for repair success target in the format k-target" releaseDefault:"29-65" devDefault:""`
RepairOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)" releaseDefault:"29/80/110-52,29/80/95-52,29/80/130-52" devDefault:""`
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
Expand All @@ -30,12 +28,82 @@ type Config struct {
DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"`
}

// RepairOverride is a configuration struct that contains an override repair
// value for a given RS k/o/n (min/success/total).
//
// Can be used as a flag.
type RepairOverride struct {
	Min      int
	Success  int
	Total    int
	Override int32
}

// Type implements pflag.Value.
func (RepairOverride) Type() string { return "checker.RepairOverride" }

// String is required for pflag.Value. It renders the override in the same
// "k/o/n-override" form that Set accepts.
func (ro *RepairOverride) String() string {
	return fmt.Sprintf("%d/%d/%d-%d", ro.Min, ro.Success, ro.Total, ro.Override)
}

// Set sets the value from a string in the format k/o/n-override (min/optimal/total-repairOverride).
// It implements pflag.Value. The three RS numbers must be positive and
// non-decreasing, and the override must satisfy min <= override < success.
func (ro *RepairOverride) Set(s string) error {
	// A single dash separates the RS scheme ("k/o/n") from the override value.
	parts := strings.Split(s, "-")
	if len(parts) != 2 {
		return Error.New("Invalid default repair override config (expect format k/o/n-override, got %s)", s)
	}
	schemePart, overridePart := parts[0], parts[1]

	// The scheme is three slash-separated integers: min/success/total.
	fields := strings.Split(schemePart, "/")
	if len(fields) != 3 {
		return Error.New("Invalid default RS numbers (wrong size, expect 3): %s", schemePart)
	}

	// Parse each RS number, requiring it to be at least 1 and no smaller
	// than the preceding one.
	parsed := make([]int, 0, 3)
	previous := 1
	for _, field := range fields {
		n, err := strconv.Atoi(field)
		if err != nil {
			return Error.New("Invalid default RS numbers (should all be valid integers): %s, %w", schemePart, err)
		}
		if n < previous {
			return Error.New("Invalid default RS numbers (should be non-decreasing): %s", schemePart)
		}
		parsed = append(parsed, n)
		previous = n
	}
	ro.Min, ro.Success, ro.Total = parsed[0], parsed[1], parsed[2]

	// Attempt to parse "-override" part of config.
	override, err := strconv.Atoi(overridePart)
	if err != nil {
		return Error.New("Invalid override value (should be valid integer): %s, %w", overridePart, err)
	}
	if override < ro.Min || override >= ro.Success {
		return Error.New("Invalid override value (should meet criteria min <= override < success). Min: %d, Override: %d, Success: %d.", ro.Min, override, ro.Success)
	}
	ro.Override = int32(override)

	return nil
}

// RepairOverrides is a configuration struct that contains a list of override repair
// values for various given RS combinations of k/o/n (min/success/total).
//
// Can be used as a flag.
type RepairOverrides struct {
	// List holds one entry per configured "k/o/n-override" value, in the
	// order the values appeared in the flag string.
	List []RepairOverride
}

// Type implements pflag.Value.
Expand All @@ -44,58 +112,65 @@ func (RepairOverrides) Type() string { return "checker.RepairOverrides" }
// String is required for pflag.Value. It is a comma separated list of RepairOverride configs.
func (ros *RepairOverrides) String() string {
var s strings.Builder
i := 0
for k, v := range ros.Values {
for i, ro := range ros.List {
if i > 0 {
s.WriteString(",")
}
fmt.Fprintf(&s, "%d-%d", k, v)
i++
s.WriteString(ro.String())
}
return s.String()
}

// Set sets the value from a string in the format "k-override,k-override,...".
// Set sets the value from a string in the format "k/o/n-override,k/o/n-override,...".
func (ros *RepairOverrides) Set(s string) error {
ros.Values = map[int]int{}
ros.List = nil
roStrings := strings.Split(s, ",")
for _, roString := range roStrings {
roString = strings.TrimSpace(roString)
if roString == "" {
continue
}
parts := strings.Split(roString, "-")
if len(parts) != 2 {
return fmt.Errorf("invalid repair override value %q", s)
}
key, err := strconv.Atoi(strings.Split(parts[0], "/")[0]) // backwards compat
newRo := RepairOverride{}
err := newRo.Set(roString)
if err != nil {
return fmt.Errorf("invalid repair override value %q: %w", s, err)
}
if key <= 0 {
return fmt.Errorf("invalid k, must be at least 1: %d", key)
}
val, err := strconv.Atoi(parts[1])
if err != nil {
return fmt.Errorf("invalid repair override value %q: %w", s, err)
}
if existingVal, exists := ros.Values[key]; exists && existingVal != val {
return fmt.Errorf("key %d defined twice with different values: %q", key, s)
return err
}
if val < key {
return fmt.Errorf("key %d defined with value type lower than min: %q", key, s)
}
ros.Values[key] = val
ros.List = append(ros.List, newRo)
}
return nil
}

// GetMap creates a RepairOverridesMap from the config, keying each
// configured override by its "min/success/total" scheme string for
// constant-time lookup.
func (ros *RepairOverrides) GetMap() RepairOverridesMap {
	overrides := make(map[string]int32, len(ros.List))
	for _, override := range ros.List {
		key := getRepairOverrideKey(override.Min, override.Success, override.Total)
		overrides[key] = override.Override
	}
	return RepairOverridesMap{overrideMap: overrides}
}

// RepairOverridesMap is derived from the RepairOverrides config (via
// RepairOverrides.GetMap), and is used for quickly retrieving
// repair override values.
type RepairOverridesMap struct {
	// map of "k/o/n" -> override value; keys are built by getRepairOverrideKey.
	overrideMap map[string]int32
}

// GetOverrideValuePB returns the override value for a pb RS scheme if it exists, or 0 otherwise.
func (ros *RepairOverrides) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
return int32(ros.Values[int(rs.MinReq)])
func (rom *RepairOverridesMap) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
key := getRepairOverrideKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))
return rom.overrideMap[key]
}

// GetOverrideValue returns the override value for an RS scheme if it exists, or 0 otherwise.
func (ros *RepairOverrides) GetOverrideValue(rs storj.RedundancyScheme) int32 {
return int32(ros.Values[int(rs.RequiredShares)])
func (rom *RepairOverridesMap) GetOverrideValue(rs storj.RedundancyScheme) int32 {
key := getRepairOverrideKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))
return rom.overrideMap[key]
}

// getRepairOverrideKey builds the "min/success/total" map key used by
// RepairOverridesMap to look up the override for an RS scheme. It uses
// strconv concatenation rather than fmt.Sprintf because this runs for every
// segment the checker inspects; the parameters are also renamed so the first
// one no longer shadows the predeclared min builtin.
func getRepairOverrideKey(minShares, successShares, totalShares int) string {
	return strconv.Itoa(minShares) + "/" + strconv.Itoa(successShares) + "/" + strconv.Itoa(totalShares)
}
123 changes: 52 additions & 71 deletions satellite/repair/checker/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,17 @@ var _ rangedloop.Partial = (*observerFork)(nil)
//
// architecture: Observer
type Observer struct {
logger *zap.Logger
repairQueue queue.RepairQueue
nodesCache *ReliabilityCache
overlayService *overlay.Service
repairThresholdOverrides RepairOverrides
repairTargetOverrides RepairOverrides
nodeFailureRate float64
repairQueueBatchSize int
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placements nodeselection.PlacementDefinitions
logger *zap.Logger
repairQueue queue.RepairQueue
nodesCache *ReliabilityCache
overlayService *overlay.Service
repairOverrides RepairOverridesMap
nodeFailureRate float64
repairQueueBatchSize int
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placements nodeselection.PlacementDefinitions

// the following are reset on each iteration
startTime time.Time
Expand All @@ -65,29 +64,21 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
}
}

rv := &Observer{
return &Observer{
logger: logger,

repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
repairThresholdOverrides: config.RepairThresholdOverrides,
repairTargetOverrides: config.RepairTargetOverrides,
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placements: placements,
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
}

if config.RepairOverrides.String() != "" {
// backwards compat
rv.repairThresholdOverrides = config.RepairOverrides
repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placements: placements,
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
}

return rv
}

// getNodesEstimate updates the estimate of the total number of nodes. It is guaranteed
Expand Down Expand Up @@ -230,7 +221,7 @@ func (observer *Observer) getObserverStats(redundancy storj.RedundancyScheme) *o

observerStats, exists := observer.statsCollector[redundancy]
if !exists {
rsString := getRSString(loadRedundancy(redundancy, observer.repairThresholdOverrides, observer.repairTargetOverrides))
rsString := getRSString(loadRedundancy(redundancy, observer.repairOverrides))
observerStats = &observerRSStats{aggregateStats{}, newIterationRSStats(rsString), newSegmentRSStats(rsString)}
mon.Chain(observerStats)
observer.statsCollector[redundancy] = observerStats
Expand All @@ -239,23 +230,15 @@ func (observer *Observer) getObserverStats(redundancy storj.RedundancyScheme) *o
return observerStats
}

func loadRedundancy(redundancy storj.RedundancyScheme, repairThresholdOverrides, repairTargetOverrides RepairOverrides) (int, int, int, int) {
func loadRedundancy(redundancy storj.RedundancyScheme, repairOverrides RepairOverridesMap) (int, int, int, int) {
repair := int(redundancy.RepairShares)
optimal := int(redundancy.OptimalShares)
total := int(redundancy.TotalShares)

if overrideValue := repairThresholdOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
overrideValue := repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
}
if overrideValue := repairTargetOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
optimal = int(overrideValue)
}

if optimal > total {
total = optimal
}

return int(redundancy.RequiredShares), repair, optimal, total
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
}

// RefreshReliabilityCache forces refreshing node online status cache.
Expand All @@ -265,17 +248,16 @@ func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error {

// observerFork implements the ranged loop Partial interface.
type observerFork struct {
repairQueue *queue.InsertBuffer
nodesCache *ReliabilityCache
overlayService *overlay.Service
rsStats map[storj.RedundancyScheme]*partialRSStats
repairThresholdOverrides RepairOverrides
repairTargetOverrides RepairOverrides
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
lastStreamID uuid.UUID
totalStats aggregateStats
repairQueue *queue.InsertBuffer
nodesCache *ReliabilityCache
overlayService *overlay.Service
rsStats map[storj.RedundancyScheme]*partialRSStats
repairOverrides RepairOverridesMap
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
lastStreamID uuid.UUID
totalStats aggregateStats

// reuse those slices to optimize memory usage
nodeIDs []storj.NodeID
Expand All @@ -294,20 +276,19 @@ type observerFork struct {
func newObserverFork(observer *Observer) rangedloop.Partial {
// we can only share thread-safe objects.
return &observerFork{
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
repairThresholdOverrides: observer.repairThresholdOverrides,
repairTargetOverrides: observer.repairTargetOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
excludedCountryCodes: observer.excludedCountryCodes,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
placements: observer.placements,
getObserverStats: observer.getObserverStats,
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
excludedCountryCodes: observer.excludedCountryCodes,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
placements: observer.placements,
getObserverStats: observer.getObserverStats,
}
}

Expand Down Expand Up @@ -429,7 +410,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
segmentAgeIntVal.Observe(int64(segmentAge.Seconds()))
stats.segmentStats.segmentAge.Observe(int64(segmentAge.Seconds()))

required, repairThreshold, successThreshold, _ := loadRedundancy(segment.Redundancy, fork.repairThresholdOverrides, fork.repairTargetOverrides)
required, repairThreshold, successThreshold, _ := loadRedundancy(segment.Redundancy, fork.repairOverrides)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, fork.nodeFailureRate)
segmentHealthFloatVal.Observe(segmentHealth)
stats.segmentStats.segmentHealth.Observe(segmentHealth)
Expand Down
Loading

0 comments on commit 4342f2e

Please sign in to comment.