satellite/repair/checker: change override flag format
This is an initial change to support overriding optimal shares while
repairing. It simplifies the flag format for specifying an override from
"29/80/110-52" to "29-52". We will use the same format later to specify
a target repair value.

The change is backward compatible.

#6594

Change-Id: I7d057745e6f7292bf8d292dd7d0fc9c6eaf186a5
jtolio authored and mniewrzal committed Mar 21, 2024
1 parent 11124b4 commit fd0b23a
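
For illustration, the release defaults before and after this change (values taken from the config.go diff below):

old format, one entry per full RS scheme (k/o/n-override): 29/80/110-52,29/80/95-52,29/80/130-52
new format, keyed on k only (k-threshold): 29-52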
Showing 6 changed files with 148 additions and 200 deletions.
142 changes: 33 additions & 109 deletions satellite/repair/checker/config.go
@@ -18,7 +18,8 @@ type Config struct {
Interval time.Duration `help:"how frequently checker should check for bad segments" releaseDefault:"30s" devDefault:"0h0m10s" testDefault:"$TESTINTERVAL"`

ReliabilityCacheStaleness time.Duration `help:"how stale reliable node cache can be" releaseDefault:"5m" devDefault:"5m" testDefault:"1m"`
RepairOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)" releaseDefault:"29/80/110-52,29/80/95-52,29/80/130-52" devDefault:""`
RepairOverrides RepairOverrides `help:"[DEPRECATED] comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"" devDefault:"" deprecated:"true"`
RepairThresholdOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"29-52" devDefault:""`
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
Expand All @@ -28,82 +29,12 @@ type Config struct {
DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"`
}

// RepairOverride is a configuration struct that contains an override repair
// value for a given RS k/o/n (min/success/total).
//
// Can be used as a flag.
type RepairOverride struct {
Min int
Success int
Total int
Override int32
}

// Type implements pflag.Value.
func (RepairOverride) Type() string { return "checker.RepairOverride" }

// String is required for pflag.Value.
func (ro *RepairOverride) String() string {
return fmt.Sprintf("%d/%d/%d-%d",
ro.Min,
ro.Success,
ro.Total,
ro.Override)
}

// Set sets the value from a string in the format k/o/n-override (min/optimal/total-repairOverride).
func (ro *RepairOverride) Set(s string) error {
// Split on dash. Expect two items. First item is RS numbers. Second item is Override.
info := strings.Split(s, "-")
if len(info) != 2 {
return Error.New("Invalid default repair override config (expect format k/o/n-override, got %s)", s)
}
rsNumbersString := info[0]
overrideString := info[1]

// Split on forward slash. Expect exactly three positive non-decreasing integers.
rsNumbers := strings.Split(rsNumbersString, "/")
if len(rsNumbers) != 3 {
return Error.New("Invalid default RS numbers (wrong size, expect 3): %s", rsNumbersString)
}

minValue := 1
values := []int{}
for _, nextValueString := range rsNumbers {
nextValue, err := strconv.Atoi(nextValueString)
if err != nil {
return Error.New("Invalid default RS numbers (should all be valid integers): %s, %w", rsNumbersString, err)
}
if nextValue < minValue {
return Error.New("Invalid default RS numbers (should be non-decreasing): %s", rsNumbersString)
}
values = append(values, nextValue)
minValue = nextValue
}

ro.Min = values[0]
ro.Success = values[1]
ro.Total = values[2]

// Attempt to parse "-override" part of config.
override, err := strconv.Atoi(overrideString)
if err != nil {
return Error.New("Invalid override value (should be valid integer): %s, %w", overrideString, err)
}
if override < ro.Min || override >= ro.Success {
return Error.New("Invalid override value (should meet criteria min <= override < success). Min: %d, Override: %d, Success: %d.", ro.Min, override, ro.Success)
}
ro.Override = int32(override)

return nil
}

// RepairOverrides is a configuration struct that contains a list of override repair
// values for various given RS combinations of k/o/n (min/success/total).
//
// Can be used as a flag.
type RepairOverrides struct {
List []RepairOverride
Values map[int]int
}

// Type implements pflag.Value.
@@ -112,65 +43,58 @@ func (RepairOverrides) Type() string { return "checker.RepairOverrides" }
// String is required for pflag.Value. It is a comma separated list of RepairOverride configs.
func (ros *RepairOverrides) String() string {
var s strings.Builder
for i, ro := range ros.List {
i := 0
for k, v := range ros.Values {
if i > 0 {
s.WriteString(",")
}
s.WriteString(ro.String())
fmt.Fprintf(&s, "%d-%d", k, v)
i++
}
return s.String()
}

// Set sets the value from a string in the format "k/o/n-override,k/o/n-override,...".
// Set sets the value from a string in the format "k-override,k-override,...".
func (ros *RepairOverrides) Set(s string) error {
ros.List = nil
roStrings := strings.Split(s, ",")
ros.Values = make(map[int]int, len(roStrings))
for _, roString := range roStrings {
roString = strings.TrimSpace(roString)
if roString == "" {
continue
}
newRo := RepairOverride{}
err := newRo.Set(roString)
parts := strings.Split(roString, "-")
if len(parts) != 2 {
return fmt.Errorf("invalid repair override value %q", s)
}
key, err := strconv.Atoi(strings.Split(parts[0], "/")[0]) // backwards compat
if err != nil {
return err
return fmt.Errorf("invalid repair override value %q: %w", s, err)
}
if key <= 0 {
return fmt.Errorf("invalid k, must be at least 1: %d", key)
}
val, err := strconv.Atoi(parts[1])
if err != nil {
return fmt.Errorf("invalid repair override value %q: %w", s, err)
}
if existingVal, exists := ros.Values[key]; exists && existingVal != val {
return fmt.Errorf("key %d defined twice with different values: %q", key, s)
}
ros.List = append(ros.List, newRo)
if val < key {
return fmt.Errorf("key %d defined with value lower than min: %q", key, s)
}
ros.Values[key] = val
}
return nil
}

// GetMap creates a RepairOverridesMap from the config.
func (ros *RepairOverrides) GetMap() RepairOverridesMap {
newMap := RepairOverridesMap{
overrideMap: make(map[string]int32),
}
for _, ro := range ros.List {
key := getRepairOverrideKey(ro.Min, ro.Success, ro.Total)
newMap.overrideMap[key] = ro.Override
}
return newMap
}

// RepairOverridesMap is derived from the RepairOverrides config, and is used for quickly retrieving
// repair override values.
type RepairOverridesMap struct {
// map of "k/o/n" -> override value
overrideMap map[string]int32
}

// GetOverrideValuePB returns the override value for a pb RS scheme if it exists, or 0 otherwise.
func (rom *RepairOverridesMap) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
key := getRepairOverrideKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))
return rom.overrideMap[key]
func (ros *RepairOverrides) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
return int32(ros.Values[int(rs.MinReq)])
}

// GetOverrideValue returns the override value for an RS scheme if it exists, or 0 otherwise.
func (rom *RepairOverridesMap) GetOverrideValue(rs storj.RedundancyScheme) int32 {
key := getRepairOverrideKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))
return rom.overrideMap[key]
}

func getRepairOverrideKey(min, success, total int) string {
return fmt.Sprintf("%d/%d/%d", min, success, total)
func (ros *RepairOverrides) GetOverrideValue(rs storj.RedundancyScheme) int32 {
return int32(ros.Values[int(rs.RequiredShares)])
}
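
A minimal usage sketch of the new flag format and its backwards compatibility (not part of the commit; the import paths are assumptions based on the repository layout):

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/repair/checker"
)

func main() {
	var overrides checker.RepairOverrides

	// New "k-threshold" entries and deprecated "k/o/n-threshold" entries can be
	// mixed: only the leading k is used as the key, so both entries below
	// collapse to the single override 29 -> 52.
	if err := overrides.Set("29-52,29/80/110-52"); err != nil {
		panic(err)
	}

	// Any scheme with RequiredShares (k) = 29 picks up the override, regardless
	// of its optimal/total share counts.
	rs := storj.RedundancyScheme{RequiredShares: 29, OptimalShares: 80, TotalShares: 110}
	fmt.Println(overrides.GetOverrideValue(rs)) // prints 52
}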
104 changes: 54 additions & 50 deletions satellite/repair/checker/observer.go
@@ -37,17 +37,17 @@ var (
//
// architecture: Observer
type Observer struct {
logger *zap.Logger
repairQueue queue.RepairQueue
nodesCache *ReliabilityCache
overlayService *overlay.Service
repairOverrides RepairOverridesMap
nodeFailureRate float64
repairQueueBatchSize int
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placements nodeselection.PlacementDefinitions
logger *zap.Logger
repairQueue queue.RepairQueue
nodesCache *ReliabilityCache
overlayService *overlay.Service
repairThresholdOverrides RepairOverrides
nodeFailureRate float64
repairQueueBatchSize int
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placements nodeselection.PlacementDefinitions

// the following are reset on each iteration
startTime time.Time
@@ -66,20 +66,25 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
}
}

if config.RepairOverrides.String() != "" {
// backwards compatibility
config.RepairThresholdOverrides = config.RepairOverrides
}

return &Observer{
logger: logger,

repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placements: placements,
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
repairThresholdOverrides: config.RepairThresholdOverrides,
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placements: placements,
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
}
}

@@ -230,7 +235,7 @@ func (observer *Observer) getObserverStats(redundancy storj.RedundancyScheme) *o

observerStats, exists := observer.statsCollector[redundancy]
if !exists {
rsString := getRSString(loadRedundancy(redundancy, observer.repairOverrides))
rsString := getRSString(loadRedundancy(redundancy, observer.repairThresholdOverrides))
observerStats = &observerRSStats{aggregateStats{}, newIterationRSStats(rsString), newSegmentRSStats(rsString)}
mon.Chain(observerStats)
observer.statsCollector[redundancy] = observerStats
Expand All @@ -239,11 +244,10 @@ func (observer *Observer) getObserverStats(redundancy storj.RedundancyScheme) *o
return observerStats
}

func loadRedundancy(redundancy storj.RedundancyScheme, repairOverrides RepairOverridesMap) (int, int, int, int) {
func loadRedundancy(redundancy storj.RedundancyScheme, repairThresholdOverrides RepairOverrides) (int, int, int, int) {
repair := int(redundancy.RepairShares)

overrideValue := repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
if overrideValue := repairThresholdOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
repair = int(overrideValue)
}

Expand All @@ -257,16 +261,16 @@ func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error {

// observerFork implements the ranged loop Partial interface.
type observerFork struct {
repairQueue *queue.InsertBuffer
nodesCache *ReliabilityCache
overlayService *overlay.Service
rsStats map[storj.RedundancyScheme]*partialRSStats
repairOverrides RepairOverridesMap
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
lastStreamID uuid.UUID
totalStats aggregateStatsPlacements
repairQueue *queue.InsertBuffer
nodesCache *ReliabilityCache
overlayService *overlay.Service
rsStats map[storj.RedundancyScheme]*partialRSStats
repairThresholdOverrides RepairOverrides
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
lastStreamID uuid.UUID
totalStats aggregateStatsPlacements

// reuse those slices to optimize memory usage
nodeIDs []storj.NodeID
Expand All @@ -285,19 +289,19 @@ type observerFork struct {
func newObserverFork(observer *Observer) rangedloop.Partial {
// we can only share thread-safe objects.
return &observerFork{
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
excludedCountryCodes: observer.excludedCountryCodes,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
placements: observer.placements,
getObserverStats: observer.getObserverStats,
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
repairThresholdOverrides: observer.repairThresholdOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
excludedCountryCodes: observer.excludedCountryCodes,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
placements: observer.placements,
getObserverStats: observer.getObserverStats,
}
}

@@ -426,7 +430,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
segmentAgeIntVal.Observe(int64(segmentAge.Seconds()))
stats.segmentStats.segmentAge.Observe(int64(segmentAge.Seconds()))

required, repairThreshold, successThreshold, _ := loadRedundancy(segment.Redundancy, fork.repairOverrides)
required, repairThreshold, successThreshold, _ := loadRedundancy(segment.Redundancy, fork.repairThresholdOverrides)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, fork.nodeFailureRate, piecesCheck.ForcingRepair.Count())
segmentHealthFloatVal.Observe(segmentHealth)
stats.segmentStats.segmentHealth.Observe(segmentHealth)