satellite/repair: unify repair logic
The repair checker and repair worker both need to determine which pieces
are healthy, which are retrievable, and which should be replaced, but
they have been doing it in different ways in separate code paths, which
has been a source of bugs. The same term could carry very similar but
subtly different meanings in the two subsystems, causing confusion.

With this change, the piece- and node-classification logic is
consolidated into one place within the satellite/repair package, so that
both subsystems can use it. This ought to make decision-making code more
concise and more readable.

The consolidated classification logic has been expanded to create more
sets, so that the decision-making code does not need to do as much
precalculation. It should now be clearer in comments and code that a
piece can belong to multiple sets arbitrarily (except where the
definition of the sets makes this logically impossible), and what the
precise meaning of each set is. These sets include Missing, Suspended,
Clumped, OutOfPlacement, InExcludedCountry, ForcingRepair,
UnhealthyRetrievable, Unhealthy, Retrievable, and Healthy.
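
For illustration only, the classification result could be pictured
roughly as below. The field names follow the sets listed above, but the
concrete representation (sets keyed by piece number) and the
needsRepair helper are assumptions sketched for this description, not
necessarily the committed code:

package repair

// PiecesCheckResult sketches the kind of value the consolidated
// classification could produce; see the set definitions above.
type PiecesCheckResult struct {
	Missing              map[uint16]struct{} // piece cannot currently be fetched from its node
	Suspended            map[uint16]struct{} // piece's node is suspended
	Clumped              map[uint16]struct{} // another piece shares this node's last_net
	OutOfPlacement       map[uint16]struct{} // piece's node violates the segment's placement
	InExcludedCountry    map[uint16]struct{} // piece's node is in a repair-excluded country
	ForcingRepair        map[uint16]struct{} // any member forces the segment into the repair queue
	UnhealthyRetrievable map[uint16]struct{} // retrievable, but should be replaced
	Unhealthy            map[uint16]struct{} // everything that is not healthy
	Retrievable          map[uint16]struct{} // piece can still be downloaded for repair
	Healthy              map[uint16]struct{} // retrievable and not unhealthy
}

// needsRepair mirrors the checker decision visible in the observer.go
// hunk below: repair when the healthy count is at or below the repair
// threshold (and below the success threshold), or when any piece
// forces a repair.
func needsRepair(check PiecesCheckResult, repairThreshold, successThreshold int) bool {
	numHealthy := len(check.Healthy)
	return (numHealthy <= repairThreshold && numHealthy < successThreshold) ||
		len(check.ForcingRepair) > 0
}

A piece can appear in several of these sets at once (for example both
Clumped and OutOfPlacement), which is why the decision code works with
set sizes rather than a single health counter.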

Some other side effects of this change:

* CreatePutRepairOrderLimits no longer needs to special-case excluded
  countries; it can just create as many order limits as requested (by
  way of len(newNodes)).
* The repair checker will now queue a segment for repair when there are
  any pieces out of placement. The code calls this "forcing a repair".
* The checker.ReliabilityCache is now accessed by way of a GetNodes()
  function similar to the one on the overlay. The classification methods
  like MissingPieces(), OutOfPlacementPieces(), and
  PiecesNodesLastNetsInOrder() are removed in favor of the
  classification logic in satellite/repair/classification.go. This
  means the reliability cache no longer needs access to the placement
  rules or excluded countries list. (A sketch of the new flow follows
  this list.)
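
To illustrate that last point: the checker-side flow is now a single
node lookup followed by a single classification call. The snippet below
is a self-contained sketch using stand-in types; the real code uses
storj.NodeID, metabase.Piece, nodeselection.SelectedNode,
ReliabilityCache.GetNodes, and repair.ClassifySegmentPieces, as shown
in the observer.go hunk further down:

package sketch

import (
	"context"
	"time"
)

// Stand-in types for the sketch only.
type nodeID [32]byte

type piece struct {
	Number      uint16
	StorageNode nodeID
}

type selectedNode struct {
	ID      nodeID
	LastNet string
	Online  bool
}

type checkResult struct {
	Healthy, ForcingRepair map[uint16]struct{}
}

// nodeCache abstracts the new GetNodes() entry point, which replaces
// MissingPieces(), OutOfPlacementPieces(), and PiecesNodesLastNetsInOrder().
type nodeCache interface {
	GetNodes(ctx context.Context, asOf time.Time, nodeIDs []nodeID) ([]selectedNode, error)
}

// classify stands in for repair.ClassifySegmentPieces; the consolidated
// logic lives in satellite/repair/classification.go.
func classify(pieces []piece, nodes []selectedNode) checkResult {
	return checkResult{}
}

// classifySegment condenses the lookup-then-classify flow from
// observerFork.process in the diff below.
func classifySegment(ctx context.Context, cache nodeCache, createdAt time.Time, pieces []piece) (checkResult, error) {
	nodeIDs := make([]nodeID, len(pieces))
	for i, p := range pieces {
		nodeIDs[i] = p.StorageNode
	}
	nodes, err := cache.GetNodes(ctx, createdAt, nodeIDs)
	if err != nil {
		return checkResult{}, err
	}
	return classify(pieces, nodes), nil
}

Because all node information now arrives through one lookup, the
reliability cache no longer needs its own copy of the placement rules
or the excluded-countries list.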

Change-Id: I105109fb94ee126952f07d747c6e11131164fadb
thepaul committed Sep 25, 2023
1 parent c44e3d7 commit 1b8bd6c
Showing 14 changed files with 480 additions and 596 deletions.
4 changes: 1 addition & 3 deletions cmd/satellite/repair_segment.go
@@ -281,10 +281,8 @@ func reuploadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repai
return errs.New("not enough new nodes were found for repair: min %v got %v", redundancy.RepairThreshold(), len(newNodes))
}

optimalThresholdMultiplier := float64(1) // is this value fine?
numHealthyInExcludedCountries := 0
putLimits, putPrivateKey, err := peer.Orders.Service.CreatePutRepairOrderLimits(ctx, segment, make([]*pb.AddressedOrderLimit, len(newNodes)),
make(map[int32]struct{}), newNodes, optimalThresholdMultiplier, numHealthyInExcludedCountries)
make(map[uint16]struct{}), newNodes)
if err != nil {
return errs.New("could not create PUT_REPAIR order limits: %w", err)
}
1 change: 0 additions & 1 deletion monkit.lock
@@ -128,7 +128,6 @@ storj.io/storj/satellite/repair/repairer."repair_too_many_nodes_failed" Meter
storj.io/storj/satellite/repair/repairer."repair_unnecessary" Meter
storj.io/storj/satellite/repair/repairer."repairer_segments_below_min_req" Counter
storj.io/storj/satellite/repair/repairer."segment_deleted_before_repair" Meter
storj.io/storj/satellite/repair/repairer."segment_repair_count" IntVal
storj.io/storj/satellite/repair/repairer."segment_time_until_repair" IntVal
storj.io/storj/satellite/repair/repairer."time_for_repair" FloatVal
storj.io/storj/satellite/repair/repairer."time_since_checker_queue" FloatVal
28 changes: 21 additions & 7 deletions satellite/audit/verifier_test.go
@@ -1106,6 +1106,7 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.Repairer.InMemoryRepair = true
config.Repairer.MaxExcessRateOptimalThreshold = 0.0
},
testplanet.ReconfigureRS(3, 5, 8, 10),
testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
@@ -1138,7 +1139,8 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
}

// make extra pieces after optimal bad
// make extra pieces after optimal bad, so we know there are exactly OptimalShares
// retrievable shares. numExcluded of them are in an excluded country.
for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
require.NoError(t, err)
@@ -1167,17 +1169,29 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
require.Equal(t, 10, len(segmentAfterRepair.Pieces))

// check excluded area nodes still exist
for i, n := range nodesInExcluded {
var found bool
// the number of nodes that should still be online holding intact pieces, not in
// excluded countries
expectHealthyNodes := int(segment.Redundancy.OptimalShares) - numExcluded
// repair should make this many new pieces to get the segment up to OptimalShares
// shares, not counting excluded-country nodes
expectNewPieces := int(segment.Redundancy.OptimalShares) - expectHealthyNodes
// so there should be this many pieces after repair, not counting excluded-country
// nodes
expectPiecesAfterRepair := expectHealthyNodes + expectNewPieces
// so there should be this many excluded-country pieces left in the segment (we
// couldn't keep all of them, or we would have had more than TotalShares pieces).
expectRemainingExcluded := int(segment.Redundancy.TotalShares) - expectPiecesAfterRepair

found := 0
for _, nodeID := range nodesInExcluded {
for _, p := range segmentAfterRepair.Pieces {
if p.StorageNode == n {
found = true
if p.StorageNode == nodeID {
found++
break
}
}
require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", segmentAfterRepair.Pieces[i].StorageNode.String()))
}
require.Equal(t, expectRemainingExcluded, found, "found wrong number of excluded-country pieces after repair")
nodesInPointer := make(map[storj.NodeID]bool)
for _, n := range segmentAfterRepair.Pieces {
// check for duplicates
20 changes: 3 additions & 17 deletions satellite/orders/service.go
@@ -5,7 +5,6 @@ package orders

import (
"context"
"math"
mathrand "math/rand"
"sync"
"time"
@@ -459,18 +458,12 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, segment
}

// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, healthySet map[int32]struct{}, newNodes []*nodeselection.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, healthySet map[uint16]struct{}, newNodes []*nodeselection.SelectedNode) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)

// Create the order limits for being used to upload the repaired pieces
pieceSize := segment.PieceSize()

totalPieces := int(segment.Redundancy.TotalShares)
totalPiecesAfterRepair := int(math.Ceil(float64(segment.Redundancy.OptimalShares)*optimalThresholdMultiplier)) + numPiecesInExcludedCountries

if totalPiecesAfterRepair > totalPieces {
totalPiecesAfterRepair = totalPieces
}

var numRetrievablePieces int
for _, o := range getOrderLimits {
@@ -479,8 +472,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
}
}

totalPiecesToRepair := totalPiecesAfterRepair - len(healthySet)

limits := make([]*pb.AddressedOrderLimit, totalPieces)

expirationDate := time.Time{}
@@ -493,7 +484,7 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

var pieceNum int32
var pieceNum uint16
for _, node := range newNodes {
for int(pieceNum) < totalPieces {
_, isHealthy := healthySet[pieceNum]
@@ -507,18 +498,13 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
return nil, storj.PiecePrivateKey{}, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces)
}

limit, err := signer.Sign(ctx, resolveStorageNode_Selected(node, false), pieceNum)
limit, err := signer.Sign(ctx, resolveStorageNode_Selected(node, false), int32(pieceNum))
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

limits[pieceNum] = limit
pieceNum++
totalPiecesToRepair--

if totalPiecesToRepair == 0 {
break
}
}

return limits, signer.PrivateKey, nil
119 changes: 58 additions & 61 deletions satellite/repair/checker/observer.go
@@ -17,10 +17,9 @@ import (
"golang.org/x/exp/slices"

"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair"
"storj.io/storj/satellite/repair/queue"
@@ -40,8 +39,10 @@ type Observer struct {
repairOverrides RepairOverridesMap
nodeFailureRate float64
repairQueueBatchSize int
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placementRules overlay.PlacementRules

// the following are reset on each iteration
startTime time.Time
@@ -53,17 +54,26 @@ type Observer struct {

// NewObserver creates new checker observer instance.
func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules overlay.PlacementRules, config Config) *Observer {
excludedCountryCodes := make(map[location.CountryCode]struct{})
for _, countryCode := range config.RepairExcludedCountryCodes {
if cc := location.ToCountryCode(countryCode); cc != location.None {
excludedCountryCodes[cc] = struct{}{}
}
}

return &Observer{
logger: logger,

repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, placementRules, config.RepairExcludedCountryCodes),
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placementRules: placementRules,
statsCollector: make(map[string]*observerRSStats),
}
}
@@ -231,29 +241,35 @@ type observerFork struct {
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
doDeclumping bool
doPlacementCheck bool
lastStreamID uuid.UUID
totalStats aggregateStats

// define from which countries nodes should be marked as offline
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placementRules overlay.PlacementRules

getObserverStats func(string) *observerRSStats
}

// newObserverFork creates new observer partial instance.
func newObserverFork(observer *Observer) rangedloop.Partial {
// we can only share thread-safe objects.
return &observerFork{
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[string]*partialRSStats),
repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
getObserverStats: observer.getObserverStats,
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[string]*partialRSStats),
repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
excludedCountryCodes: observer.excludedCountryCodes,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
placementRules: observer.placementRules,
getObserverStats: observer.getObserverStats,
}
}

@@ -334,52 +350,28 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return Error.New("could not get estimate of total number of nodes: %w", err)
}

missingPieces, err := fork.nodesCache.MissingPieces(ctx, segment.CreatedAt, segment.Pieces)
nodeIDs := make([]storj.NodeID, len(pieces))
for i, piece := range pieces {
nodeIDs[i] = piece.StorageNode
}
selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, nodeIDs)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return Error.New("error getting missing pieces: %w", err)
}

var clumpedPieces metabase.Pieces
var lastNets []string

nodeFilter := fork.nodesCache.placementRules(segment.Placement)

if fork.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilter) {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
lastNets, err = fork.nodesCache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, pieces)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining node last_nets"), err)
}
clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets)
}

numOutOfPlacementPieces := 0
if fork.doPlacementCheck {
outOfPlacementPieces, err := fork.nodesCache.OutOfPlacementPieces(ctx, segment.CreatedAt, segment.Pieces, segment.Placement)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining nodes placement"), err)
}

numOutOfPlacementPieces = len(outOfPlacementPieces)
return Error.New("error getting node information for pieces: %w", err)
}
piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck, fork.doDeclumping, fork.placementRules(segment.Placement))

numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
numHealthy := len(piecesCheck.Healthy)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))

mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(clumpedPieces)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(numOutOfPlacementPieces)) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(numOutOfPlacementPieces))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(piecesCheck.Clumped))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(piecesCheck.Clumped)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(len(piecesCheck.OutOfPlacement))) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(len(piecesCheck.OutOfPlacement)))

segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
@@ -395,7 +387,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
// separate case is when we find pieces which are outside segment placement. in such case we are putting segment
// into queue right away.
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numOutOfPlacementPieces > 0 {
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || len(piecesCheck.ForcingRepair) > 0 {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++
Expand All @@ -418,8 +410,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
}

// monitor irreparable segments
numRetrievable := len(pieces) - len(missingPieces)
if numRetrievable < required {
if len(piecesCheck.Retrievable) < required {
if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
}
@@ -449,18 +440,24 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
stats.segmentStats.segmentsBelowMinReq.Inc(1)

var unhealthyNodes []string
for _, p := range missingPieces {
unhealthyNodes = append(unhealthyNodes, p.StorageNode.String())
var missingNodes []string
for _, piece := range pieces {
if _, isMissing := piecesCheck.Missing[piece.Number]; isMissing {
missingNodes = append(missingNodes, piece.StorageNode.String())
}
}
fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unhealthy node IDs", strings.Join(unhealthyNodes, ",")))
} else if numRetrievable > repairThreshold {
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unavailable node IDs", strings.Join(missingNodes, ",")))
} else if len(piecesCheck.Clumped) > 0 && len(piecesCheck.Healthy)+len(piecesCheck.Clumped) > repairThreshold && len(piecesCheck.ForcingRepair) == 0 {
// This segment is to be repaired because of clumping (it wouldn't need repair yet
// otherwise). Produce a brief report of where the clumping occurred so that we have
// a better understanding of the cause.
lastNets := make([]string, len(pieces))
for i, node := range selectedNodes {
lastNets[i] = node.LastNet
}
clumpedNets := clumpingReport{lastNets: lastNets}
fork.log.Info("segment needs repair because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
fork.log.Info("segment needs repair only because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
}
} else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {
10 changes: 7 additions & 3 deletions satellite/repair/checker/observer_unit_test.go
@@ -47,13 +47,16 @@ func TestObserverForkProcess(t *testing.T) {
}

ctx := testcontext.New(t)
placementRules := overlay.ConfigurablePlacementRule{}
parsed, err := placementRules.Parse()
require.NoError(t, err)
createDefaultObserver := func() *Observer {
o := &Observer{
statsCollector: make(map[string]*observerRSStats),
nodesCache: &ReliabilityCache{
staleness: time.Hour,
placementRules: overlay.NewPlacementDefinitions().CreateFilters,
staleness: time.Hour,
},
placementRules: parsed.CreateFilters,
}

o.nodesCache.state.Store(&reliabilityState{
@@ -72,6 +75,7 @@ func TestObserverForkProcess(t *testing.T) {
rsStats: make(map[string]*partialRSStats),
doDeclumping: o.doDeclumping,
doPlacementCheck: o.doPlacementCheck,
placementRules: o.placementRules,
getNodesEstimate: o.getNodesEstimate,
nodesCache: o.nodesCache,
repairQueue: queue.NewInsertBuffer(q, 1000),
@@ -146,7 +150,7 @@ func TestObserverForkProcess(t *testing.T) {
require.NoError(t, placements.Set(fmt.Sprintf(`10:annotated(country("DE"),annotation("%s","%s"))`, nodeselection.AutoExcludeSubnet, nodeselection.AutoExcludeSubnetOFF)))
parsed, err := placements.Parse()
require.NoError(t, err)
o.nodesCache.placementRules = parsed.CreateFilters
o.placementRules = parsed.CreateFilters

q := queue.MockRepairQueue{}
fork := createFork(o, &q)