satellite/repair/repairer: allow overriding optimal shares during repair
This change adds a new repair flag, "checker.repair-target-overrides".
The flag defines an override for a segment's RS optimal shares value, so
it can be used to change the optimal shares of existing segments during
repair. We want to do that for all segments created before the new RS
schema for new uploads (optimal shares = 65) was introduced.

#6594

Change-Id: I9462ddbb5912838edc17c45c67fc6736f5ed77b9
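
As a usage sketch: only the flag name "checker.repair-target-overrides" and the k-target value format are given by this commit; the command-line spelling below is assumed from the usual satellite config conventions. Repairing old segments up to the new optimal shares value could then look like:

--checker.repair-threshold-overrides="29-52"
--checker.repair-target-overrides="29-65"

Here 29 is the RS required-shares value (k) and 65 is the new repair success target for segments using that scheme.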
mniewrzal authored and Storj Robot committed Mar 25, 2024
1 parent 313b7e3 commit d07a221
Showing 7 changed files with 159 additions and 84 deletions.
3 changes: 2 additions & 1 deletion cmd/satellite/repair_segment.go
@@ -131,7 +131,8 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
nil, // TODO add noop version
ecRepairer,
placement,
config.Checker.RepairOverrides,
config.Checker.RepairThresholdOverrides,
config.Checker.RepairTargetOverrides,
config.Repairer,
)

1 change: 1 addition & 0 deletions satellite/repair/checker/config.go
@@ -20,6 +20,7 @@ type Config struct {
ReliabilityCacheStaleness time.Duration `help:"how stale reliable node cache can be" releaseDefault:"5m" devDefault:"5m" testDefault:"1m"`
RepairOverrides RepairOverrides `help:"[DEPRECATED] comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"" devDefault:"" deprecated:"true"`
RepairThresholdOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"29-52" devDefault:""`
RepairTargetOverrides RepairOverrides `help:"comma-separated override values for repair success target in the format k-target" releaseDefault:"29-65" devDefault:""`
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
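For illustration, here is a stand-alone sketch of how the comma-separated k-target format documented above reads. This is not the checker package's actual RepairOverrides parser, just a minimal model of the documented format:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseOverrides reads a comma-separated "k-target" list, e.g. the release
// default "29-65", into a map keyed by the RS required-shares value (k).
func parseOverrides(s string) (map[int]int, error) {
	out := map[int]int{}
	if s == "" {
		return out, nil
	}
	for _, pair := range strings.Split(s, ",") {
		parts := strings.SplitN(pair, "-", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid override %q", pair)
		}
		k, err := strconv.Atoi(strings.TrimSpace(parts[0]))
		if err != nil {
			return nil, err
		}
		target, err := strconv.Atoi(strings.TrimSpace(parts[1]))
		if err != nil {
			return nil, err
		}
		out[k] = target
	}
	return out, nil
}

func main() {
	overrides, _ := parseOverrides("29-65")
	fmt.Println(overrides[29]) // 65: segments with k = 29 are repaired up to 65 pieces
}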
71 changes: 71 additions & 0 deletions satellite/repair/repair_test.go
@@ -1822,6 +1822,77 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
})
}

func TestRepairTargetOverrides(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 10,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, 7, 7),
func(log *zap.Logger, index int, config *satellite.Config) {
config.Checker.RepairThresholdOverrides.Values[2] = 4
config.Checker.RepairTargetOverrides.Values[2] = 5
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// first, upload some remote data
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Stop()

satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()

err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testrand.Bytes(8*memory.KiB))
require.NoError(t, err)

segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")

toMarkOffline := 3
for _, piece := range segment.Pieces[:toMarkOffline] {
node := planet.FindNode(piece.StorageNode)

err := planet.StopNodeAndUpdate(ctx, node)
require.NoError(t, err)

err = updateNodeCheckIn(ctx, satellite.DB.OverlayCache(), node, false, time.Now().Add(-24*time.Hour))
require.NoError(t, err)
}

// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)

// Verify that the segment is on the repair queue
count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
require.Equal(t, count, 1)

// Run the repairer
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
satellite.Repair.Repairer.WaitForPendingRepairs()

// Verify that repair queue is empty and segment was repaired
count, err = satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
require.Zero(t, count)

segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
require.NotNil(t, segment.RepairedAt)

// For repair we don't upload exactly the optimal number of pieces; we upload between
// optimal shares and optimal shares * (1 + MaxExcessRateOptimalThreshold) pieces
// (where MaxExcessRateOptimalThreshold is e.g. 0.05). In production the resulting number
// of pieces will usually equal optimal shares, but in the test environment, where things
// run fast, more pieces may be uploaded from time to time.
require.Contains(t, []int{5, 6}, len(segment.Pieces))
})
}

func updateNodeCheckIn(ctx context.Context, overlayDB overlay.DB, node *testplanet.StorageNode, isUp bool, timestamp time.Time) error {
local := node.Contact.Service.Local()
checkInInfo := overlay.NodeCheckInInfo{
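
A worked sketch (plain Go, not part of the commit) of the {5, 6} bound asserted at the end of TestRepairTargetOverrides above, assuming MaxExcessRateOptimalThreshold is 0.05 as in the test comment's example:

package main

import (
	"fmt"
	"math"
)

func main() {
	optimal := 5 // RepairTargetOverrides value the test sets for k = 2
	total := 7   // TotalShares from ReconfigureRS(2, 3, 7, 7)
	healthy := 4 // 7 uploaded pieces minus the 3 nodes taken offline

	// Mirrors the totalNeeded calculation in segments.go below.
	totalNeeded := int(math.Ceil(float64(optimal) * (1 + 0.05)))
	if totalNeeded > total {
		totalNeeded = total
	}

	requestCount := totalNeeded - healthy // 2 new pieces requested
	minSuccessful := optimal - healthy    // at least 1 upload must succeed
	fmt.Println(totalNeeded, requestCount, minSuccessful) // 6 2 1
	// Keeping the 4 healthy pieces, the repaired segment ends up with 5 or 6
	// pieces, which is what require.Contains(t, []int{5, 6}, ...) checks.
}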
156 changes: 76 additions & 80 deletions satellite/repair/repairer/segments.go
@@ -99,6 +99,8 @@ type SegmentRepairer struct {

// repairThresholdOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
repairThresholdOverrides checker.RepairOverrides
// repairTargetOverrides is similar but determines the optimum number of pieces per segment.
repairTargetOverrides checker.RepairOverrides

excludedCountryCodes map[location.CountryCode]struct{}

@@ -121,7 +123,7 @@ func NewSegmentRepairer(
reporter audit.Reporter,
ecRepairer *ECRepairer,
placements nodeselection.PlacementDefinitions,
repairThresholdOverrides checker.RepairOverrides,
repairThresholdOverrides, repairTargetOverrides checker.RepairOverrides,
config Config,
) *SegmentRepairer {

@@ -147,6 +149,7 @@ func NewSegmentRepairer(
timeout: config.Timeout,
multiplierOptimalThreshold: 1 + excessOptimalThreshold,
repairThresholdOverrides: repairThresholdOverrides,
repairTargetOverrides: repairTargetOverrides,
excludedCountryCodes: excludedCountryCodes,
reporter: reporter,
reputationUpdateEnabled: config.ReputationUpdateEnabled,
@@ -191,19 +194,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return true, nil
}

redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
if err != nil {
return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
}

stats := repairer.getStatsByRS(&pb.RedundancyScheme{
Type: pb.RedundancyScheme_SchemeType(segment.Redundancy.Algorithm),
ErasureShareSize: segment.Redundancy.ShareSize,
MinReq: int32(segment.Redundancy.RequiredShares),
RepairThreshold: int32(segment.Redundancy.RepairShares),
SuccessThreshold: int32(segment.Redundancy.OptimalShares),
Total: int32(segment.Redundancy.TotalShares),
})
stats := repairer.getStatsByRS(segment.Redundancy)

mon.Meter("repair_attempts").Mark(1) //mon:locked
stats.repairAttempts.Mark(1)
@@ -226,16 +217,18 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
pieces := segment.Pieces
piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placements[segment.Placement])

newRedundancy := repairer.newRedundancy(segment.Redundancy)

// irreparable segment
if piecesCheck.Retrievable.Count() < int(segment.Redundancy.RequiredShares) {
if piecesCheck.Retrievable.Count() < int(newRedundancy.RequiredShares) {
mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
stats.repairerSegmentsBelowMinReq.Inc(1)
mon.Meter("repair_nodes_unavailable").Mark(1) //mon:locked
stats.repairerNodesUnavailable.Mark(1)

log.Warn("irreparable segment",
zap.Int("piecesAvailable", piecesCheck.Retrievable.Count()),
zap.Int16("piecesRequired", segment.Redundancy.RequiredShares),
zap.Int16("piecesRequired", newRedundancy.RequiredShares),
)
return false, nil
}
@@ -244,20 +237,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
stats.repairerSegmentsBelowMinReq.Inc(0)

repairThreshold := int32(segment.Redundancy.RepairShares)

pbRedundancy := &pb.RedundancyScheme{
MinReq: int32(segment.Redundancy.RequiredShares),
RepairThreshold: int32(segment.Redundancy.RepairShares),
SuccessThreshold: int32(segment.Redundancy.OptimalShares),
Total: int32(segment.Redundancy.TotalShares),
}
overrideValue := repairer.repairThresholdOverrides.GetOverrideValuePB(pbRedundancy)
if overrideValue != 0 {
repairThreshold = overrideValue
}

if piecesCheck.Healthy.Count() > int(repairThreshold) {
if piecesCheck.Healthy.Count() > int(newRedundancy.RepairShares) {
// No repair is needed (note Healthy does not include pieces in ForcingRepair).

var dropPieces metabase.Pieces
@@ -280,7 +260,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
Position: segment.Position,

OldPieces: segment.Pieces,
NewRedundancy: segment.Redundancy,
NewRedundancy: newRedundancy,
NewPieces: newPieces,

NewRepairedAt: time.Now(),
@@ -295,9 +275,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue

mon.Meter("repair_unnecessary").Mark(1) //mon:locked
stats.repairUnnecessary.Mark(1)
log.Info("segment above repair threshold", zap.Int("numHealthy", piecesCheck.Healthy.Count()), zap.Int32("repairThreshold", repairThreshold),
zap.Int("numClumped", piecesCheck.Clumped.Count()), zap.Int("numExiting", piecesCheck.Exiting.Count()), zap.Int("numOffPieces", piecesCheck.OutOfPlacement.Count()),
zap.Int("numExcluded", piecesCheck.InExcludedCountry.Count()), zap.Int("droppedPieces", len(dropPieces)))
log.Info("segment above repair threshold",
zap.Int("numHealthy", piecesCheck.Healthy.Count()),
zap.Int16("repairThreshold", newRedundancy.RepairShares),
zap.Int16("repairTarget", newRedundancy.OptimalShares),
zap.Int("numClumped", piecesCheck.Clumped.Count()),
zap.Int("numExiting", piecesCheck.Exiting.Count()),
zap.Int("numOffPieces", piecesCheck.OutOfPlacement.Count()),
zap.Int("numExcluded", piecesCheck.InExcludedCountry.Count()),
zap.Int("droppedPieces", len(dropPieces)))
return true, nil
}

@@ -347,13 +333,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue

var requestCount int
{
totalNeeded := int(math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold))
if totalNeeded > redundancy.TotalCount() {
totalNeeded = redundancy.TotalCount()
totalNeeded := int(math.Ceil(float64(newRedundancy.OptimalShares) * repairer.multiplierOptimalThreshold))
if totalNeeded > int(newRedundancy.TotalShares) {
totalNeeded = int(newRedundancy.TotalShares)
}
requestCount = totalNeeded - piecesCheck.Healthy.Count()
}
minSuccessfulNeeded := redundancy.OptimalThreshold() - piecesCheck.Healthy.Count()
minSuccessfulNeeded := int(newRedundancy.OptimalShares) - piecesCheck.Healthy.Count()

var alreadySelected []*nodeselection.SelectedNode
for i := range selectedNodes {
@@ -372,34 +358,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return false, overlayQueryError.Wrap(err)
}

// Create the order limits for the PUT_REPAIR action. We want to keep pieces in Healthy
// as well as pieces in InExcludedCountry (our policy is to let those nodes keep the
// pieces they have, as long as they are kept intact and retrievable).
maxToKeep := int(segment.Redundancy.TotalShares) - len(newNodes)
toKeep := map[uint16]struct{}{}

// TODO: how to avoid these two loops
for _, piece := range pieces {
if piecesCheck.Healthy.Contains(int(piece.Number)) {
toKeep[piece.Number] = struct{}{}
}
}
for _, piece := range pieces {
if piecesCheck.InExcludedCountry.Contains(int(piece.Number)) {
if len(toKeep) >= maxToKeep {
break
}
toKeep[piece.Number] = struct{}{}
}
}

putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, toKeep, newNodes)
oldRedundancyStrategy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
if err != nil {
return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
}

// Download the segment using just the retrievable pieces
segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, oldRedundancyStrategy, int64(segment.EncryptedSize))

// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
@@ -536,13 +501,44 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
repairer.reporter.RecordAudits(ctx, report)
}

// Create the order limits for the PUT_REPAIR action. We want to keep pieces in Healthy
// as well as pieces in InExcludedCountry (our policy is to let those nodes keep the
// pieces they have, as long as they are kept intact and retrievable).
maxToKeep := int(newRedundancy.TotalShares) - len(newNodes)
toKeep := map[uint16]struct{}{}

// TODO: how to avoid these two loops
for _, piece := range pieces {
if piecesCheck.Healthy.Contains(int(piece.Number)) {
toKeep[piece.Number] = struct{}{}
}
}
for _, piece := range pieces {
if piecesCheck.InExcludedCountry.Contains(int(piece.Number)) {
if len(toKeep) >= maxToKeep {
break
}
toKeep[piece.Number] = struct{}{}
}
}

putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, toKeep, newNodes)
if err != nil {
return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
}

newRedundancyStrategy, err := eestream.NewRedundancyStrategyFromStorj(newRedundancy)
if err != nil {
return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
}

// Upload the repaired pieces
successfulNodes, _, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, repairer.timeout, minSuccessfulNeeded)
successfulNodes, _, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, newRedundancyStrategy, segmentReader, repairer.timeout, minSuccessfulNeeded)
if err != nil {
return false, repairPutError.Wrap(err)
}

pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
pieceSize := newRedundancy.PieceSize(int64(segment.EncryptedSize))
var bytesRepaired int64

// Add the successfully uploaded pieces to repairedPieces
@@ -565,10 +561,10 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue

healthyAfterRepair := piecesCheck.Healthy.Count() + len(repairedPieces)
switch {
case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
case healthyAfterRepair >= int(newRedundancy.OptimalShares):
mon.Meter("repair_success").Mark(1) //mon:locked
stats.repairSuccess.Mark(1)
case healthyAfterRepair <= int(segment.Redundancy.RepairShares):
case healthyAfterRepair <= int(newRedundancy.RepairShares):
// Important: this indicates a failure to PUT enough pieces to the network to pass
// the repair threshold, and _not_ a failure to reconstruct the segment. But we
// put at least one piece, else ec.Repair() would have returned an error. So the
@@ -582,16 +578,16 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}

healthyRatioAfterRepair := 0.0
if segment.Redundancy.TotalShares != 0 {
healthyRatioAfterRepair = float64(healthyAfterRepair) / float64(segment.Redundancy.TotalShares)
if newRedundancy.TotalShares != 0 {
healthyRatioAfterRepair = float64(healthyAfterRepair) / float64(newRedundancy.TotalShares)
}

mon.FloatVal("healthy_ratio_after_repair").Observe(healthyRatioAfterRepair) //mon:locked
stats.healthyRatioAfterRepair.Observe(healthyRatioAfterRepair)

toRemove := make(map[uint16]metabase.Piece, piecesCheck.Unhealthy.Count())
switch {
case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
case healthyAfterRepair >= int(newRedundancy.OptimalShares):
// Repair was fully successful; remove all unhealthy pieces except those in
// (Retrievable AND InExcludedCountry). Those, we allow to remain on the nodes as
// long as the nodes are keeping the pieces intact and available.
@@ -605,7 +601,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
toRemove[piece.Number] = piece
}
}
case healthyAfterRepair > int(segment.Redundancy.RepairShares):
case healthyAfterRepair > int(newRedundancy.RepairShares):
// Repair was successful enough that we still want to drop all out-of-placement
// pieces. We want to do that wherever possible, except where doing so puts data in
// jeopardy.
@@ -642,7 +638,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
Position: segment.Position,

OldPieces: segment.Pieces,
NewRedundancy: segment.Redundancy,
NewRedundancy: newRedundancy,
NewPieces: newPieces,

NewRepairedAt: time.Now(),
@@ -707,18 +703,18 @@ func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldS
return nil
}

func (repairer *SegmentRepairer) getStatsByRS(redundancy *pb.RedundancyScheme) *stats {
rsString := getRSString(repairer.loadRedundancy(redundancy))
return repairer.statsCollector.getStatsByRS(rsString)
func (repairer *SegmentRepairer) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
return repairer.statsCollector.getStatsByRS(getRSString(redundancy))
}

func (repairer *SegmentRepairer) loadRedundancy(redundancy *pb.RedundancyScheme) (int, int, int, int) {
repair := int(redundancy.RepairThreshold)
overrideValue := repairer.repairThresholdOverrides.GetOverrideValuePB(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
func (repairer *SegmentRepairer) newRedundancy(redundancy storj.RedundancyScheme) storj.RedundancyScheme {
if overrideValue := repairer.repairThresholdOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
redundancy.RepairShares = int16(overrideValue)
}
if overrideValue := repairer.repairTargetOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
redundancy.OptimalShares = int16(overrideValue)
}
return int(redundancy.MinReq), repair, int(redundancy.SuccessThreshold), int(redundancy.Total)
return redundancy
}

// SetNow allows tests to have the server act as if the current time is whatever they want.
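Finally, a self-contained sketch of the override semantics introduced by the new newRedundancy helper above. The plain maps stand in for checker.RepairOverrides, whose internals are not shown in this diff, and the 35/80/110 figures are illustrative; only the k = 29 defaults come from checker/config.go above:

package main

import "fmt"

// redundancy mirrors the storj.RedundancyScheme fields that newRedundancy touches.
type redundancy struct {
	RequiredShares, RepairShares, OptimalShares, TotalShares int16
}

// applyOverrides models newRedundancy: overrides are keyed by required shares (k)
// and only replace values when one is configured for the segment's scheme.
func applyOverrides(rs redundancy, thresholds, targets map[int16]int16) redundancy {
	if v, ok := thresholds[rs.RequiredShares]; ok && v != 0 {
		rs.RepairShares = v
	}
	if v, ok := targets[rs.RequiredShares]; ok && v != 0 {
		rs.OptimalShares = v
	}
	return rs
}

func main() {
	old := redundancy{RequiredShares: 29, RepairShares: 35, OptimalShares: 80, TotalShares: 110}
	// Release defaults from checker/config.go above: thresholds "29-52", targets "29-65".
	repaired := applyOverrides(old, map[int16]int16{29: 52}, map[int16]int16{29: 65})
	fmt.Printf("%+v\n", repaired) // {RequiredShares:29 RepairShares:52 OptimalShares:65 TotalShares:110}

	// A scheme with no configured override for its k is returned unchanged.
	other := redundancy{RequiredShares: 16, RepairShares: 20, OptimalShares: 30, TotalShares: 38}
	fmt.Printf("%+v\n", applyOverrides(other, map[int16]int16{29: 52}, map[int16]int16{29: 65}))
}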
