From d07a22113e6bc76585ab3b2cfa7a9718ac4f5d0a Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Fri, 15 Mar 2024 11:35:43 +0100
Subject: [PATCH] satellite/repair/repairer: allow overriding optimal shares during repair

This change adds a new repair flag, "checker.repair-target-overrides".
The flag defines an override for the segment RS optimal shares value,
letting us change the optimal shares of existing segments while they are
being repaired. We want to do that for all segments created before the
new RS scheme for new uploads (optimal shares = 65) was introduced.

https://github.com/storj/storj/issues/6594

Change-Id: I9462ddbb5912838edc17c45c67fc6736f5ed77b9
---
 cmd/satellite/repair_segment.go       |   3 +-
 satellite/repair/checker/config.go    |   1 +
 satellite/repair/repair_test.go       |  71 ++++++++++++
 satellite/repair/repairer/segments.go | 156 +++++++++++++-------------
 satellite/repair/repairer/stats.go    |   6 +-
 satellite/repairer.go                 |   3 +-
 satellite/satellite-config.yaml.lock  |   3 +
 7 files changed, 159 insertions(+), 84 deletions(-)

diff --git a/cmd/satellite/repair_segment.go b/cmd/satellite/repair_segment.go
index 6b5ba326f25f..a72c38f63caa 100644
--- a/cmd/satellite/repair_segment.go
+++ b/cmd/satellite/repair_segment.go
@@ -131,7 +131,8 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
         nil, // TODO add noop version
         ecRepairer,
         placement,
-        config.Checker.RepairOverrides,
+        config.Checker.RepairThresholdOverrides,
+        config.Checker.RepairTargetOverrides,
         config.Repairer,
     )
 
diff --git a/satellite/repair/checker/config.go b/satellite/repair/checker/config.go
index 4ee8d006a9b0..6dde76100b0b 100644
--- a/satellite/repair/checker/config.go
+++ b/satellite/repair/checker/config.go
@@ -20,6 +20,7 @@ type Config struct {
     ReliabilityCacheStaleness time.Duration   `help:"how stale reliable node cache can be" releaseDefault:"5m" devDefault:"5m" testDefault:"1m"`
     RepairOverrides           RepairOverrides `help:"[DEPRECATED] comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"" devDefault:"" deprecated:"true"`
     RepairThresholdOverrides  RepairOverrides `help:"comma-separated override values for repair threshold in the format k-threshold" releaseDefault:"29-52" devDefault:""`
+    RepairTargetOverrides     RepairOverrides `help:"comma-separated override values for repair success target in the format k-target" releaseDefault:"29-65" devDefault:""`
     // Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
     // This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
     NodeFailureRate           float64         `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 2edaa138e8d6..a84d7fe659e9 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -1822,6 +1822,77 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
     })
 }
 
+func TestRepairTargetOverrides(t *testing.T) {
+    testplanet.Run(t, testplanet.Config{
+        SatelliteCount:   1,
+        StorageNodeCount: 10,
+        UplinkCount:      1,
+        Reconfigure: testplanet.Reconfigure{
+            Satellite: testplanet.Combine(
+                testplanet.ReconfigureRS(2, 3, 7, 7),
+                func(log *zap.Logger, index int, config *satellite.Config) {
+                    config.Checker.RepairThresholdOverrides.Values[2] = 4
+                    config.Checker.RepairTargetOverrides.Values[2] = 5
+                },
+            ),
+        },
+    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+        // first, upload some remote data
+        uplinkPeer := planet.Uplinks[0]
+        satellite := planet.Satellites[0]
+        // stop audit to prevent possible interactions, e.g. repair timeout problems
+        satellite.Audit.Worker.Loop.Stop()
+
+        satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
+        satellite.Repair.Repairer.Loop.Pause()
+
+        err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testrand.Bytes(8*memory.KiB))
+        require.NoError(t, err)
+
+        segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+
+        toMarkOffline := 3
+        for _, piece := range segment.Pieces[:toMarkOffline] {
+            node := planet.FindNode(piece.StorageNode)
+
+            err := planet.StopNodeAndUpdate(ctx, node)
+            require.NoError(t, err)
+
+            err = updateNodeCheckIn(ctx, satellite.DB.OverlayCache(), node, false, time.Now().Add(-24*time.Hour))
+            require.NoError(t, err)
+        }
+
+        // trigger checker with ranged loop to add segment to repair queue
+        _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+        require.NoError(t, err)
+
+        // Verify that the segment is in the repair queue
+        count, err := satellite.DB.RepairQueue().Count(ctx)
+        require.NoError(t, err)
+        require.Equal(t, 1, count)
+
+        // Run the repairer
+        satellite.Repair.Repairer.Loop.Restart()
+        satellite.Repair.Repairer.Loop.TriggerWait()
+        satellite.Repair.Repairer.Loop.Pause()
+        satellite.Repair.Repairer.WaitForPendingRepairs()
+
+        // Verify that the repair queue is empty and the segment was repaired
+        count, err = satellite.DB.RepairQueue().Count(ctx)
+        require.NoError(t, err)
+        require.Zero(t, count)
+
+        segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+        require.NotNil(t, segment.RepairedAt)
+
+        // Repair does not upload exactly the optimal number of pieces; it uploads between
+        // optimal shares and optimal shares * (1 + MaxExcessRateOptimalThreshold) (e.g. 0.05).
+        // In production the number of uploaded pieces will usually equal optimal shares, but in
+        // the test environment, where things run fast, it may occasionally upload more pieces.
+        require.Contains(t, []int{5, 6}, len(segment.Pieces))
+    })
+}
+
 func updateNodeCheckIn(ctx context.Context, overlayDB overlay.DB, node *testplanet.StorageNode, isUp bool, timestamp time.Time) error {
     local := node.Contact.Service.Local()
     checkInInfo := overlay.NodeCheckInInfo{
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 8df487db8562..932b0030050c 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -99,6 +99,8 @@ type SegmentRepairer struct {
 
     // repairThresholdOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
     repairThresholdOverrides checker.RepairOverrides
+    // repairTargetOverrides is similar but determines the optimum number of pieces per segment.
+    repairTargetOverrides checker.RepairOverrides
 
     excludedCountryCodes map[location.CountryCode]struct{}
 
@@ -121,7 +123,7 @@ func NewSegmentRepairer(
     reporter audit.Reporter,
     ecRepairer *ECRepairer,
     placements nodeselection.PlacementDefinitions,
-    repairThresholdOverrides checker.RepairOverrides,
+    repairThresholdOverrides, repairTargetOverrides checker.RepairOverrides,
     config Config,
 ) *SegmentRepairer {
 
@@ -147,6 +149,7 @@ func NewSegmentRepairer(
         timeout:                    config.Timeout,
         multiplierOptimalThreshold: 1 + excessOptimalThreshold,
         repairThresholdOverrides:   repairThresholdOverrides,
+        repairTargetOverrides:      repairTargetOverrides,
         excludedCountryCodes:       excludedCountryCodes,
         reporter:                   reporter,
         reputationUpdateEnabled:    config.ReputationUpdateEnabled,
@@ -191,19 +194,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
         return true, nil
     }
 
-    redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
-    if err != nil {
-        return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
-    }
-
-    stats := repairer.getStatsByRS(&pb.RedundancyScheme{
-        Type:             pb.RedundancyScheme_SchemeType(segment.Redundancy.Algorithm),
-        ErasureShareSize: segment.Redundancy.ShareSize,
-        MinReq:           int32(segment.Redundancy.RequiredShares),
-        RepairThreshold:  int32(segment.Redundancy.RepairShares),
-        SuccessThreshold: int32(segment.Redundancy.OptimalShares),
-        Total:            int32(segment.Redundancy.TotalShares),
-    })
+    stats := repairer.getStatsByRS(segment.Redundancy)
 
     mon.Meter("repair_attempts").Mark(1) //mon:locked
     stats.repairAttempts.Mark(1)
@@ -226,8 +217,10 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     pieces := segment.Pieces
     piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placements[segment.Placement])
 
+    newRedundancy := repairer.newRedundancy(segment.Redundancy)
+
     // irreparable segment
-    if piecesCheck.Retrievable.Count() < int(segment.Redundancy.RequiredShares) {
+    if piecesCheck.Retrievable.Count() < int(newRedundancy.RequiredShares) {
         mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
         stats.repairerSegmentsBelowMinReq.Inc(1)
         mon.Meter("repair_nodes_unavailable").Mark(1) //mon:locked
@@ -235,7 +228,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 
         log.Warn("irreparable segment",
             zap.Int("piecesAvailable", piecesCheck.Retrievable.Count()),
-            zap.Int16("piecesRequired", segment.Redundancy.RequiredShares),
+            zap.Int16("piecesRequired", newRedundancy.RequiredShares),
         )
         return false, nil
     }
@@ -244,20 +237,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
     stats.repairerSegmentsBelowMinReq.Inc(0)
 
-    repairThreshold := int32(segment.Redundancy.RepairShares)
-
-    pbRedundancy := &pb.RedundancyScheme{
-        MinReq:           int32(segment.Redundancy.RequiredShares),
-        RepairThreshold:  int32(segment.Redundancy.RepairShares),
-        SuccessThreshold: int32(segment.Redundancy.OptimalShares),
-        Total:            int32(segment.Redundancy.TotalShares),
-    }
-    overrideValue := repairer.repairThresholdOverrides.GetOverrideValuePB(pbRedundancy)
-    if overrideValue != 0 {
-        repairThreshold = overrideValue
-    }
-
-    if piecesCheck.Healthy.Count() > int(repairThreshold) {
+    if piecesCheck.Healthy.Count() > int(newRedundancy.RepairShares) {
         // No repair is needed (note Healthy does not include pieces in ForcingRepair).
 
         var dropPieces metabase.Pieces
@@ -280,7 +260,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
                 Position:  segment.Position,
                 OldPieces: segment.Pieces,
 
-                NewRedundancy: segment.Redundancy,
+                NewRedundancy: newRedundancy,
                 NewPieces:     newPieces,
 
                 NewRepairedAt: time.Now(),
@@ -295,9 +275,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
         mon.Meter("repair_unnecessary").Mark(1) //mon:locked
         stats.repairUnnecessary.Mark(1)
 
-        log.Info("segment above repair threshold", zap.Int("numHealthy", piecesCheck.Healthy.Count()), zap.Int32("repairThreshold", repairThreshold),
-            zap.Int("numClumped", piecesCheck.Clumped.Count()), zap.Int("numExiting", piecesCheck.Exiting.Count()), zap.Int("numOffPieces", piecesCheck.OutOfPlacement.Count()),
-            zap.Int("numExcluded", piecesCheck.InExcludedCountry.Count()), zap.Int("droppedPieces", len(dropPieces)))
+        log.Info("segment above repair threshold",
+            zap.Int("numHealthy", piecesCheck.Healthy.Count()),
+            zap.Int16("repairThreshold", newRedundancy.RepairShares),
+            zap.Int16("repairTarget", newRedundancy.OptimalShares),
+            zap.Int("numClumped", piecesCheck.Clumped.Count()),
+            zap.Int("numExiting", piecesCheck.Exiting.Count()),
+            zap.Int("numOffPieces", piecesCheck.OutOfPlacement.Count()),
+            zap.Int("numExcluded", piecesCheck.InExcludedCountry.Count()),
+            zap.Int("droppedPieces", len(dropPieces)))
         return true, nil
     }
 
@@ -347,13 +333,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 
     var requestCount int
     {
-        totalNeeded := int(math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold))
-        if totalNeeded > redundancy.TotalCount() {
-            totalNeeded = redundancy.TotalCount()
+        totalNeeded := int(math.Ceil(float64(newRedundancy.OptimalShares) * repairer.multiplierOptimalThreshold))
+        if totalNeeded > int(newRedundancy.TotalShares) {
+            totalNeeded = int(newRedundancy.TotalShares)
         }
         requestCount = totalNeeded - piecesCheck.Healthy.Count()
     }
-    minSuccessfulNeeded := redundancy.OptimalThreshold() - piecesCheck.Healthy.Count()
+    minSuccessfulNeeded := int(newRedundancy.OptimalShares) - piecesCheck.Healthy.Count()
 
     var alreadySelected []*nodeselection.SelectedNode
     for i := range selectedNodes {
@@ -372,34 +358,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
         return false, overlayQueryError.Wrap(err)
     }
 
-    // Create the order limits for the PUT_REPAIR action. We want to keep pieces in Healthy
-    // as well as pieces in InExcludedCountry (our policy is to let those nodes keep the
-    // pieces they have, as long as they are kept intact and retrievable).
-    maxToKeep := int(segment.Redundancy.TotalShares) - len(newNodes)
-    toKeep := map[uint16]struct{}{}
-
-    // TODO how to avoid this two loops
-    for _, piece := range pieces {
-        if piecesCheck.Healthy.Contains(int(piece.Number)) {
-            toKeep[piece.Number] = struct{}{}
-        }
-    }
-    for _, piece := range pieces {
-        if piecesCheck.InExcludedCountry.Contains(int(piece.Number)) {
-            if len(toKeep) >= maxToKeep {
-                break
-            }
-            toKeep[piece.Number] = struct{}{}
-        }
-    }
-
-    putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, toKeep, newNodes)
+    oldRedundancyStrategy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
     if err != nil {
-        return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
+        return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
     }
 
     // Download the segment using just the retrievable pieces
-    segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+    segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, oldRedundancyStrategy, int64(segment.EncryptedSize))
 
     // ensure we get values, even if only zero values, so that redash can have an alert based on this
     mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
@@ -536,13 +501,44 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
         repairer.reporter.RecordAudits(ctx, report)
     }
 
+    // Create the order limits for the PUT_REPAIR action. We want to keep pieces in Healthy
+    // as well as pieces in InExcludedCountry (our policy is to let those nodes keep the
+    // pieces they have, as long as they are kept intact and retrievable).
+    maxToKeep := int(newRedundancy.TotalShares) - len(newNodes)
+    toKeep := map[uint16]struct{}{}
+
+    // TODO: how to avoid these two loops
+    for _, piece := range pieces {
+        if piecesCheck.Healthy.Contains(int(piece.Number)) {
+            toKeep[piece.Number] = struct{}{}
+        }
+    }
+    for _, piece := range pieces {
+        if piecesCheck.InExcludedCountry.Contains(int(piece.Number)) {
+            if len(toKeep) >= maxToKeep {
+                break
+            }
+            toKeep[piece.Number] = struct{}{}
+        }
+    }
+
+    putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, toKeep, newNodes)
+    if err != nil {
+        return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
+    }
+
+    newRedundancyStrategy, err := eestream.NewRedundancyStrategyFromStorj(newRedundancy)
+    if err != nil {
+        return true, invalidRepairError.New("invalid redundancy strategy: %w", err)
+    }
+
     // Upload the repaired pieces
-    successfulNodes, _, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, repairer.timeout, minSuccessfulNeeded)
+    successfulNodes, _, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, newRedundancyStrategy, segmentReader, repairer.timeout, minSuccessfulNeeded)
     if err != nil {
         return false, repairPutError.Wrap(err)
     }
 
-    pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
+    pieceSize := newRedundancy.PieceSize(int64(segment.EncryptedSize))
 
     var bytesRepaired int64
 
     // Add the successfully uploaded pieces to repairedPieces
@@ -565,10 +561,10 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     healthyAfterRepair := piecesCheck.Healthy.Count() + len(repairedPieces)
 
     switch {
-    case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
+    case healthyAfterRepair >= int(newRedundancy.OptimalShares):
         mon.Meter("repair_success").Mark(1) //mon:locked
         stats.repairSuccess.Mark(1)
-    case healthyAfterRepair <= int(segment.Redundancy.RepairShares):
+    case healthyAfterRepair <= int(newRedundancy.RepairShares):
         // Important: this indicates a failure to PUT enough pieces to the network to pass
         // the repair threshold, and _not_ a failure to reconstruct the segment. But we
         // put at least one piece, else ec.Repair() would have returned an error. So the
@@ -582,8 +578,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     }
 
     healthyRatioAfterRepair := 0.0
-    if segment.Redundancy.TotalShares != 0 {
-        healthyRatioAfterRepair = float64(healthyAfterRepair) / float64(segment.Redundancy.TotalShares)
+    if newRedundancy.TotalShares != 0 {
+        healthyRatioAfterRepair = float64(healthyAfterRepair) / float64(newRedundancy.TotalShares)
     }
 
     mon.FloatVal("healthy_ratio_after_repair").Observe(healthyRatioAfterRepair) //mon:locked
@@ -591,7 +587,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 
     toRemove := make(map[uint16]metabase.Piece, piecesCheck.Unhealthy.Count())
     switch {
-    case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
+    case healthyAfterRepair >= int(newRedundancy.OptimalShares):
         // Repair was fully successful; remove all unhealthy pieces except those in
         // (Retrievable AND InExcludedCountry). Those, we allow to remain on the nodes as
         // long as the nodes are keeping the pieces intact and available.
@@ -605,7 +601,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
                 toRemove[piece.Number] = piece
             }
         }
-    case healthyAfterRepair > int(segment.Redundancy.RepairShares):
+    case healthyAfterRepair > int(newRedundancy.RepairShares):
         // Repair was successful enough that we still want to drop all out-of-placement
         // pieces. We want to do that wherever possible, except where doing so puts data in
         // jeopardy.
@@ -642,7 +638,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
         Position:  segment.Position,
         OldPieces: segment.Pieces,
 
-        NewRedundancy: segment.Redundancy,
+        NewRedundancy: newRedundancy,
         NewPieces:     newPieces,
 
         NewRepairedAt: time.Now(),
@@ -707,18 +703,18 @@ func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldS
     return nil
 }
 
-func (repairer *SegmentRepairer) getStatsByRS(redundancy *pb.RedundancyScheme) *stats {
-    rsString := getRSString(repairer.loadRedundancy(redundancy))
-    return repairer.statsCollector.getStatsByRS(rsString)
+func (repairer *SegmentRepairer) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
+    return repairer.statsCollector.getStatsByRS(getRSString(redundancy))
 }
 
-func (repairer *SegmentRepairer) loadRedundancy(redundancy *pb.RedundancyScheme) (int, int, int, int) {
-    repair := int(redundancy.RepairThreshold)
-    overrideValue := repairer.repairThresholdOverrides.GetOverrideValuePB(redundancy)
-    if overrideValue != 0 {
-        repair = int(overrideValue)
+func (repairer *SegmentRepairer) newRedundancy(redundancy storj.RedundancyScheme) storj.RedundancyScheme {
+    if overrideValue := repairer.repairThresholdOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
+        redundancy.RepairShares = int16(overrideValue)
+    }
+    if overrideValue := repairer.repairTargetOverrides.GetOverrideValue(redundancy); overrideValue != 0 {
+        redundancy.OptimalShares = int16(overrideValue)
     }
-    return int(redundancy.MinReq), repair, int(redundancy.SuccessThreshold), int(redundancy.Total)
+    return redundancy
 }
 
 // SetNow allows tests to have the server act as if the current time is whatever they want.
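
For illustration, the sketch below shows how the override behavior implemented by newRedundancy above rewrites a legacy RS scheme before repair. It is a minimal, self-contained stand-in: RedundancyScheme and overrides here are hypothetical substitutes for storj.RedundancyScheme and checker.RepairOverrides (whose GetOverrideValue looks up a configured value by the scheme's RequiredShares), and the numbers in main are made up for the example.

package main

import "fmt"

// RedundancyScheme is a stand-in for storj.RedundancyScheme, keeping only the
// fields that the repairer's override logic touches.
type RedundancyScheme struct {
    RequiredShares int16
    RepairShares   int16
    OptimalShares  int16
    TotalShares    int16
}

// overrides is a stand-in for checker.RepairOverrides: a map from RequiredShares (k)
// to the configured override value, as parsed from "k-value" pairs.
type overrides map[int16]int16

// getOverrideValue returns the configured override for the scheme's k, or 0 if none is set.
func (o overrides) getOverrideValue(rs RedundancyScheme) int16 { return o[rs.RequiredShares] }

// newRedundancy mirrors SegmentRepairer.newRedundancy in this patch: the threshold
// override replaces RepairShares, the target override replaces OptimalShares, and
// RequiredShares and TotalShares stay untouched.
func newRedundancy(rs RedundancyScheme, threshold, target overrides) RedundancyScheme {
    if v := threshold.getOverrideValue(rs); v != 0 {
        rs.RepairShares = v
    }
    if v := target.getOverrideValue(rs); v != 0 {
        rs.OptimalShares = v
    }
    return rs
}

func main() {
    // Illustrative legacy scheme; the release defaults "29-52" and "29-65" apply to k=29.
    legacy := RedundancyScheme{RequiredShares: 29, RepairShares: 35, OptimalShares: 48, TotalShares: 110}
    fmt.Printf("%+v\n", newRedundancy(legacy, overrides{29: 52}, overrides{29: 65}))
    // Output: {RequiredShares:29 RepairShares:52 OptimalShares:65 TotalShares:110}
}

Because the metabase update above stores NewRedundancy: newRedundancy, a repaired segment keeps the overridden values, so later checker passes evaluate it against the new repair target.
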
diff --git a/satellite/repair/repairer/stats.go b/satellite/repair/repairer/stats.go
index 4b0c64c4e2bb..c411ea184bb1 100644
--- a/satellite/repair/repairer/stats.go
+++ b/satellite/repair/repairer/stats.go
@@ -8,6 +8,8 @@ import (
     "sync"
 
     "github.com/spacemonkeygo/monkit/v3"
+
+    "storj.io/common/storj"
 )
 
 // statsCollector holds a *stats for each redundancy scheme
@@ -92,6 +94,6 @@ func (stats *stats) Stats(cb func(key monkit.SeriesKey, field string, val float6
     stats.segmentRepairCount.Stats(cb)
 }
 
-func getRSString(min, repair, success, total int) string {
-    return fmt.Sprintf("%d/%d/%d/%d", min, repair, success, total)
+func getRSString(rs storj.RedundancyScheme) string {
+    return fmt.Sprintf("%d/%d/%d/%d", rs.RequiredShares, rs.RepairShares, rs.OptimalShares, rs.TotalShares)
 }
diff --git a/satellite/repairer.go b/satellite/repairer.go
index 6a24ea87edec..9a2735779c8d 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -238,7 +238,8 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
         peer.Audit.Reporter,
         peer.EcRepairer,
         placement,
-        config.Checker.RepairOverrides,
+        config.Checker.RepairThresholdOverrides,
+        config.Checker.RepairTargetOverrides,
         config.Repairer,
     )
     peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer)
diff --git a/satellite/satellite-config.yaml.lock b/satellite/satellite-config.yaml.lock
index ea473f58e427..6813036eaa54 100644
--- a/satellite/satellite-config.yaml.lock
+++ b/satellite/satellite-config.yaml.lock
@@ -130,6 +130,9 @@
 # Number of damaged segments to buffer in-memory before flushing to the repair queue
 # checker.repair-queue-insert-batch-size: 100
 
+# comma-separated override values for repair success target in the format k-target
+# checker.repair-target-overrides: 29-65
+
 # comma-separated override values for repair threshold in the format k-threshold
 # checker.repair-threshold-overrides: 29-52
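
Both override flags share the same "k-value" list format: each comma-separated entry maps an RS RequiredShares value (k) to the override applied to segments with that k, and the parsed result feeds the Values map that the test sets directly (config.Checker.RepairTargetOverrides.Values[2] = 5). The sketch below illustrates that format only; it is not the actual checker.RepairOverrides parser, whose names and error handling may differ.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseOverrides parses a comma-separated "k-value" list, such as the
// checker.repair-target-overrides default "29-65", into a k -> value map.
func parseOverrides(s string) (map[int]int, error) {
    out := map[int]int{}
    if s == "" {
        return out, nil
    }
    for _, pair := range strings.Split(s, ",") {
        parts := strings.SplitN(strings.TrimSpace(pair), "-", 2)
        if len(parts) != 2 {
            return nil, fmt.Errorf("invalid override %q, expected k-value", pair)
        }
        k, err := strconv.Atoi(parts[0])
        if err != nil {
            return nil, err
        }
        v, err := strconv.Atoi(parts[1])
        if err != nil {
            return nil, err
        }
        out[k] = v
    }
    return out, nil
}

func main() {
    targets, err := parseOverrides("29-65,16-40")
    if err != nil {
        panic(err)
    }
    fmt.Println(targets) // map[16:40 29:65]
}

In the test, testplanet.ReconfigureRS(2, 3, 7, 7) gives segments k=2, so setting Values[2] = 4 and Values[2] = 5 on the two override structs plays the same role as configuring "2-4" for the repair threshold and "2-5" for the repair target.
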