satellite/repair/repairer: fix NumHealthyInExcludedCountries calculation
Currently, while counting unhealthy pieces, we count a piece twice when
it is both in an excluded country and outside the segment placement.
This can cause unnecessary repair.

This change also takes another step toward moving RepairExcludedCountryCodes
from the overlay config into the repair package.

Change-Id: I3692f6e0ddb9982af925db42be23d644aec1963f
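
For illustration, a minimal standalone Go sketch of the counting rule this
commit moves into classifySegmentPieces: a healthy node in a repair-excluded
country is counted toward NumHealthyInExcludedCountries only when its country
is still allowed by the segment placement, so the same piece is not counted a
second time by the out-of-placement check. The node/countryCode types, the
allowedByPlacement helper, and the EU-only placement rule below are simplified
assumptions for the sketch, not the satellite's real API.

package main

import "fmt"

type countryCode string

type node struct {
	id          int
	countryCode countryCode
}

// allowedByPlacement stands in for the placement.AllowedCountry check;
// for this sketch, assume an EU-only placement that rejects "US".
func allowedByPlacement(cc countryCode) bool {
	return cc != "US"
}

func main() {
	// RepairExcludedCountryCodes, prepared as a lookup set.
	excluded := map[countryCode]struct{}{"US": {}}

	online := []node{{1, "PL"}, {2, "PL"}, {3, "PL"}, {4, "US"}}

	numHealthyInExcludedCountries := 0
	for _, n := range online {
		// Count a node here only when its country is excluded for repair AND
		// still allowed by the placement. A node that is both in an excluded
		// country and out of placement is already handled by the
		// out-of-placement check, so counting it here as well would mark the
		// same piece unhealthy twice and trigger unnecessary repair.
		if _, ok := excluded[n.countryCode]; ok && allowedByPlacement(n.countryCode) {
			numHealthyInExcludedCountries++
		}
	}

	// Prints 0: the US node is out of placement, so it is not double-counted.
	fmt.Println(numHealthyInExcludedCountries)
}

In the actual change, this count is carried in piecesCheckResult as
NumHealthyInExcludedCountries, replacing the removed
overlay.GetReliablePiecesInExcludedCountries lookup.
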
mniewrzal committed Jul 10, 2023
1 parent 05f3074 commit 1d62dc6
Showing 12 changed files with 101 additions and 160 deletions.
3 changes: 1 addition & 2 deletions satellite/metabase/rangedloop/service_test.go
@@ -426,9 +426,8 @@ func TestAllInOne(t *testing.T) {
log.Named("repair:checker"),
satellite.DB.RepairQueue(),
satellite.Overlay.Service,
satellite.Config.Checker,
overlay.NewPlacementRules().CreateFilters,
[]string{},
satellite.Config.Checker,
),
})

36 changes: 0 additions & 36 deletions satellite/overlay/service.go
@@ -62,8 +62,6 @@ type DB interface {

// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error)
// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
@@ -199,7 +197,6 @@ type NodeCriteria struct {
MinimumVersion string // semver or empty
OnlineWindow time.Duration
AsOfSystemInterval time.Duration // only used for CRDB queries
ExcludedCountries []string
}

// ReputationStatus indicates current reputation status for a node.
@@ -540,17 +537,6 @@ func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown ti
return count, err
}

// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)

criteria := &NodeCriteria{
OnlineWindow: service.config.Node.OnlineWindow,
ExcludedCountries: service.config.RepairExcludedCountryCodes,
}
return service.db.KnownReliableInExcludedCountries(ctx, criteria, nodeIds)
}

// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []nodeselection.SelectedNode, offlineNodes []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
@@ -747,28 +733,6 @@ func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pi
return maps.Values(missingPiecesMap), nil
}

// GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries.
func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context, pieces metabase.Pieces) (piecesInExcluded []uint16, err error) {
defer mon.Task()(&ctx)(&err)
var nodeIDs storj.NodeIDList
for _, p := range pieces {
nodeIDs = append(nodeIDs, p.StorageNode)
}
inExcluded, err := service.KnownReliableInExcludedCountries(ctx, nodeIDs)
if err != nil {
return nil, Error.New("error getting nodes %s", err)
}

for _, p := range pieces {
for _, nodeID := range inExcluded {
if nodeID == p.StorageNode {
piecesInExcluded = append(piecesInExcluded, p.Number)
}
}
}
return piecesInExcluded, nil
}

// DQNodesLastSeenBefore disqualifies nodes who have not been contacted since the cutoff time.
func (service *Service) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time, limit int) (count int, err error) {
defer mon.Task()(&ctx)(&err)
22 changes: 0 additions & 22 deletions satellite/overlay/service_test.go
@@ -816,28 +816,6 @@ func TestVetAndUnvetNode(t *testing.T) {
})
}

func TestKnownReliableInExcludedCountries(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
service := planet.Satellites[0].Overlay.Service
node := planet.StorageNodes[0]

onlineNodes, _, err := service.Reliable(ctx)
require.NoError(t, err)
require.Len(t, onlineNodes, 2)

err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "FR")
require.NoError(t, err)

// first node should be excluded from Reliable result because of country code
nodes, err := service.KnownReliableInExcludedCountries(ctx, storj.NodeIDList{onlineNodes[0].ID, onlineNodes[1].ID})
require.NoError(t, err)
require.Len(t, nodes, 1)
require.Equal(t, node.ID(), nodes[0])
})
}

func TestUpdateReputationNodeEvents(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
7 changes: 5 additions & 2 deletions satellite/rangedloop.go
@@ -151,13 +151,16 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
}

{ // setup repair
if len(config.Checker.RepairExcludedCountryCodes) == 0 {
config.Checker.RepairExcludedCountryCodes = config.Overlay.RepairExcludedCountryCodes
}

peer.Repair.Observer = checker.NewObserver(
peer.Log.Named("repair:checker"),
peer.DB.RepairQueue(),
peer.Overlay.Service,
config.Checker,
config.Placement.CreateFilters,
config.Overlay.RepairExcludedCountryCodes,
config.Checker,
)
}

9 changes: 5 additions & 4 deletions satellite/repair/checker/config.go
@@ -21,10 +21,11 @@ type Config struct {
RepairOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)" releaseDefault:"29/80/110-52,29/80/95-52,29/80/130-52" devDefault:""`
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
RepairQueueInsertBatchSize int `help:"Number of damaged segments to buffer in-memory before flushing to the repair queue" default:"100" `
DoDeclumping bool `help:"Treat pieces on the same network as in need of repair" default:"false"`
DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"`
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
RepairQueueInsertBatchSize int `help:"Number of damaged segments to buffer in-memory before flushing to the repair queue" default:"100" `
RepairExcludedCountryCodes []string `help:"list of country codes to treat node from this country as offline " default:"" hidden:"true"`
DoDeclumping bool `help:"Treat pieces on the same network as in need of repair" default:"false"`
DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"`
}

// RepairOverride is a configuration struct that contains an override repair
5 changes: 2 additions & 3 deletions satellite/repair/checker/observer.go
@@ -51,13 +51,12 @@ type Observer struct {
}

// NewObserver creates new checker observer instance.
// TODO move excludedCountries into config but share it somehow with segment repairer.
func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, config Config, placementRules overlay.PlacementRules, excludedCountries []string) *Observer {
func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules overlay.PlacementRules, config Config) *Observer {
return &Observer{
logger: logger,

repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, placementRules, excludedCountries),
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, placementRules, config.RepairExcludedCountryCodes),
overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
2 changes: 1 addition & 1 deletion satellite/repair/checker/observer_test.go
@@ -555,7 +555,7 @@ func BenchmarkRemoteSegment(b *testing.B) {
}

observer := checker.NewObserver(zap.NewNop(), planet.Satellites[0].DB.RepairQueue(),
planet.Satellites[0].Auditor.Overlay, planet.Satellites[0].Config.Checker, overlay.NewPlacementRules().CreateFilters, []string{})
planet.Satellites[0].Auditor.Overlay, overlay.NewPlacementRules().CreateFilters, planet.Satellites[0].Config.Checker)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)

1 change: 1 addition & 0 deletions satellite/repair/repairer/repairer.go
@@ -36,6 +36,7 @@ type Config struct {
InMemoryRepair bool `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"`
ReputationUpdateEnabled bool `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"`
UseRangedLoop bool `help:"whether to enable repair checker observer with ranged loop" default:"true"`
RepairExcludedCountryCodes []string `help:"list of country codes to treat node from this country as offline" default:"" hidden:"true"`
DoDeclumping bool `help:"repair pieces on the same network to other nodes" default:"false"`
DoPlacementCheck bool `help:"repair pieces out of segment placement" default:"true"`
}
40 changes: 26 additions & 14 deletions satellite/repair/repairer/segments.go
@@ -18,6 +18,7 @@ import (

"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/sync2"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
@@ -99,6 +100,8 @@ type SegmentRepairer struct {
// repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
repairOverrides checker.RepairOverridesMap

excludedCountryCodes map[location.CountryCode]struct{}

nowFn func() time.Time
OnTestingCheckSegmentAlteredHook func()
OnTestingPiecesReportHook func(pieces FetchResultReport)
@@ -127,6 +130,13 @@ func NewSegmentRepairer(
excessOptimalThreshold = 0
}

excludedCountryCodes := make(map[location.CountryCode]struct{})
for _, countryCode := range config.RepairExcludedCountryCodes {
if cc := location.ToCountryCode(countryCode); cc != location.None {
excludedCountryCodes[cc] = struct{}{}
}
}

return &SegmentRepairer{
log: log,
statsCollector: newStatsCollector(),
@@ -137,6 +147,7 @@
timeout: config.Timeout,
multiplierOptimalThreshold: 1 + excessOptimalThreshold,
repairOverrides: repairOverrides.GetMap(),
excludedCountryCodes: excludedCountryCodes,
reporter: reporter,
reputationUpdateEnabled: config.ReputationUpdateEnabled,
doDeclumping: config.DoDeclumping,
@@ -223,13 +234,6 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return false, nil
}

piecesInExcludedCountries, err := repairer.overlay.GetReliablePiecesInExcludedCountries(ctx, pieces)
if err != nil {
return false, overlayQueryError.New("error identifying pieces in excluded countries: %w", err)
}

numHealthyInExcludedCountries := len(piecesInExcludedCountries)

// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
stats.repairerSegmentsBelowMinReq.Inc(0)
@@ -248,7 +252,7 @@
}

// repair not needed
if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
if numHealthy-piecesCheck.NumHealthyInExcludedCountries > int(repairThreshold) {
// remove pieces out of placement without repairing as we are above repair threshold
if len(piecesCheck.OutOfPlacementPiecesSet) > 0 {

@@ -348,12 +352,12 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
var minSuccessfulNeeded int
{
totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold)
requestCount = int(totalNeeded) + numHealthyInExcludedCountries
requestCount = int(totalNeeded) + piecesCheck.NumHealthyInExcludedCountries
if requestCount > redundancy.TotalCount() {
requestCount = redundancy.TotalCount()
}
requestCount -= numHealthy
minSuccessfulNeeded = redundancy.OptimalThreshold() - numHealthy + numHealthyInExcludedCountries
minSuccessfulNeeded = redundancy.OptimalThreshold() - numHealthy + piecesCheck.NumHealthyInExcludedCountries
}

// Request Overlay for n-h new storage nodes
@@ -368,7 +372,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}

// Create the order limits for the PUT_REPAIR action
putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, healthySet, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries)
putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, healthySet, newNodes, repairer.multiplierOptimalThreshold, piecesCheck.NumHealthyInExcludedCountries)
if err != nil {
return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
}
@@ -633,7 +637,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
zap.Uint64("Position", segment.Position.Encode()),
zap.Int("clumped pieces", len(piecesCheck.ClumpedPiecesSet)),
zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacementPiecesSet)),
zap.Int("in excluded countries", numHealthyInExcludedCountries),
zap.Int("in excluded countries", piecesCheck.NumHealthyInExcludedCountries),
zap.Int("removed pieces", len(toRemove)),
zap.Int("repaired pieces", len(repairedPieces)),
zap.Int("healthy before repair", numHealthy),
@@ -648,13 +652,15 @@ type piecesCheckResult struct {
ClumpedPiecesSet map[uint16]bool
OutOfPlacementPiecesSet map[uint16]bool

NumUnhealthyRetrievable int
NumUnhealthyRetrievable int
NumHealthyInExcludedCountries int
}

func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segment metabase.Segment) (result piecesCheckResult, err error) {
defer mon.Task()(&ctx)(&err)

pieces := segment.Pieces
placement := segment.Placement

allNodeIDs := make([]storj.NodeID, len(pieces))
nodeIDPieceMap := map[storj.NodeID]uint16{}
@@ -674,6 +680,12 @@

// remove online nodes from missing pieces
for _, onlineNode := range online {
// count online nodes in excluded countries only if the country is not excluded by the segment
// placement; nodes in countries excluded by the placement are counted by the out-of-placement check instead
if _, excluded := repairer.excludedCountryCodes[onlineNode.CountryCode]; excluded && placement.AllowedCountry(onlineNode.CountryCode) {
result.NumHealthyInExcludedCountries++
}

pieceNum := nodeIDPieceMap[onlineNode.ID]
delete(result.MissingPiecesSet, pieceNum)
}
@@ -705,7 +717,7 @@ func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segm
}
}

if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
if repairer.doPlacementCheck && placement != storj.EveryCountry {
result.OutOfPlacementPiecesSet = map[uint16]bool{}

nodeFilters := repairer.placementRules(segment.Placement)
58 changes: 56 additions & 2 deletions satellite/repair/repairer/segments_test.go
@@ -216,7 +216,7 @@ func TestSegmentRepairPlacementAndClumped(t *testing.T) {

func TestSegmentRepairPlacementNotEnoughNodes(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
@@ -255,11 +255,65 @@ func TestSegmentRepairPlacementNotEnoughNodes(t *testing.T) {
StreamID: segments[0].StreamID,
Position: segments[0].Position,
})
require.Error(t, err)
require.True(t, overlay.ErrNotEnoughNodes.Has(err))
require.False(t, shouldDelete)
})
}

func TestSegmentRepairPlacementAndExcludedCountries(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(1, 2, 4, 4),
func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.RepairExcludedCountryCodes = []string{"US"}
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))

_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
ProjectID: planet.Uplinks[0].Projects[0].ID,
Name: "testbucket",
Placement: storj.EU,
})
require.NoError(t, err)

for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}

err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)

segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 4)

// change a single node's country to one that is outside the bucket placement and is part of RepairExcludedCountryCodes
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, segments[0].Pieces[0].StorageNode, "US"))

require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))

shouldDelete, err := planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
StreamID: segments[0].StreamID,
Position: segments[0].Position,
})
require.NoError(t, err)
require.True(t, shouldDelete)

// We are checking that the repairer counted the moved piece only once, as out of placement, and didn't
// also count it as being in an excluded country. Counting it twice would make the repairer treat that
// single piece as unhealthy two times and trigger a full repair, which would restore the number of
// pieces to 4; instead we expect only the single out-of-placement piece to be removed.
segmentsAfter, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.ElementsMatch(t, segments[0].Pieces[1:], segmentsAfter[0].Pieces)
})
}

func allPiecesInPlacement(ctx context.Context, overaly *overlay.Service, pieces metabase.Pieces, placement storj.PlacementConstraint) (bool, error) {
for _, piece := range pieces {
nodeDossier, err := overaly.Get(ctx, piece.StorageNode)
4 changes: 4 additions & 0 deletions satellite/repairer.go
@@ -211,6 +211,10 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
config.Repairer.DownloadTimeout,
config.Repairer.InMemoryRepair)

if len(config.Repairer.RepairExcludedCountryCodes) == 0 {
config.Repairer.RepairExcludedCountryCodes = config.Overlay.RepairExcludedCountryCodes
}

peer.SegmentRepairer = repairer.NewSegmentRepairer(
log.Named("segment-repair"),
metabaseDB,
