From 1d62dc63f5f8fbad3a0648c5106be5f64f17dd73 Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Fri, 30 Jun 2023 11:02:01 +0200
Subject: [PATCH] satellite/repair/repairer: fix NumHealthyInExcludedCountries
 calculation

Currently we have an issue where, while counting unhealthy pieces, a piece
that is in an excluded country and also outside the segment placement is
counted twice. This can cause unnecessary repair.

This change also takes another step toward moving RepairExcludedCountryCodes
from the overlay config into the repair package.

(An illustrative sketch of the corrected counting rule follows the diff.)

Change-Id: I3692f6e0ddb9982af925db42be23d644aec1963f
---
 satellite/metabase/rangedloop/service_test.go |  3 +-
 satellite/overlay/service.go                  | 36 ---------
 satellite/overlay/service_test.go             | 22 ------
 satellite/rangedloop.go                       |  7 +-
 satellite/repair/checker/config.go            |  9 ++-
 satellite/repair/checker/observer.go          |  5 +-
 satellite/repair/checker/observer_test.go     |  2 +-
 satellite/repair/repairer/repairer.go         |  1 +
 satellite/repair/repairer/segments.go         | 40 ++++++----
 satellite/repair/repairer/segments_test.go    | 58 ++++++++++++++-
 satellite/repairer.go                         |  4 +
 satellite/satellitedb/overlaycache.go         | 74 -------------------
 12 files changed, 101 insertions(+), 160 deletions(-)

diff --git a/satellite/metabase/rangedloop/service_test.go b/satellite/metabase/rangedloop/service_test.go
index f92497ef04a4..f64463b72d21 100644
--- a/satellite/metabase/rangedloop/service_test.go
+++ b/satellite/metabase/rangedloop/service_test.go
@@ -426,9 +426,8 @@ func TestAllInOne(t *testing.T) {
 				log.Named("repair:checker"),
 				satellite.DB.RepairQueue(),
 				satellite.Overlay.Service,
-				satellite.Config.Checker,
 				overlay.NewPlacementRules().CreateFilters,
-				[]string{},
+				satellite.Config.Checker,
 			),
 		})
 
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index 0a75b65e053e..7905120744e1 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -62,8 +62,6 @@ type DB interface {
 	// Get looks up the node by nodeID
 	Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
-	// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
-	KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
 	KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error)
 	// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
@@ -199,7 +197,6 @@ type NodeCriteria struct {
 	MinimumVersion     string // semver or empty
 	OnlineWindow       time.Duration
 	AsOfSystemInterval time.Duration // only used for CRDB queries
-	ExcludedCountries  []string
 }
 
 // ReputationStatus indicates current reputation status for a node.
@@ -540,17 +537,6 @@ func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown ti
 	return count, err
 }
 
-// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
-func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) { - defer mon.Task()(&ctx)(&err) - - criteria := &NodeCriteria{ - OnlineWindow: service.config.Node.OnlineWindow, - ExcludedCountries: service.config.RepairExcludedCountryCodes, - } - return service.db.KnownReliableInExcludedCountries(ctx, criteria, nodeIds) -} - // KnownReliable filters a set of nodes to reliable (online and qualified) nodes. func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []nodeselection.SelectedNode, offlineNodes []nodeselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) @@ -747,28 +733,6 @@ func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pi return maps.Values(missingPiecesMap), nil } -// GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries. -func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context, pieces metabase.Pieces) (piecesInExcluded []uint16, err error) { - defer mon.Task()(&ctx)(&err) - var nodeIDs storj.NodeIDList - for _, p := range pieces { - nodeIDs = append(nodeIDs, p.StorageNode) - } - inExcluded, err := service.KnownReliableInExcludedCountries(ctx, nodeIDs) - if err != nil { - return nil, Error.New("error getting nodes %s", err) - } - - for _, p := range pieces { - for _, nodeID := range inExcluded { - if nodeID == p.StorageNode { - piecesInExcluded = append(piecesInExcluded, p.Number) - } - } - } - return piecesInExcluded, nil -} - // DQNodesLastSeenBefore disqualifies nodes who have not been contacted since the cutoff time. func (service *Service) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time, limit int) (count int, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index cf6232961a60..99efb0d7d167 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -816,28 +816,6 @@ func TestVetAndUnvetNode(t *testing.T) { }) } -func TestKnownReliableInExcludedCountries(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - service := planet.Satellites[0].Overlay.Service - node := planet.StorageNodes[0] - - onlineNodes, _, err := service.Reliable(ctx) - require.NoError(t, err) - require.Len(t, onlineNodes, 2) - - err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "FR") - require.NoError(t, err) - - // first node should be excluded from Reliable result because of country code - nodes, err := service.KnownReliableInExcludedCountries(ctx, storj.NodeIDList{onlineNodes[0].ID, onlineNodes[1].ID}) - require.NoError(t, err) - require.Len(t, nodes, 1) - require.Equal(t, node.ID(), nodes[0]) - }) -} - func TestUpdateReputationNodeEvents(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0, diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go index 2150d5e4ba20..b86c4fef27f0 100644 --- a/satellite/rangedloop.go +++ b/satellite/rangedloop.go @@ -151,13 +151,16 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf } { // setup repair + if len(config.Checker.RepairExcludedCountryCodes) == 0 { + config.Checker.RepairExcludedCountryCodes = config.Overlay.RepairExcludedCountryCodes + 
} + peer.Repair.Observer = checker.NewObserver( peer.Log.Named("repair:checker"), peer.DB.RepairQueue(), peer.Overlay.Service, - config.Checker, config.Placement.CreateFilters, - config.Overlay.RepairExcludedCountryCodes, + config.Checker, ) } diff --git a/satellite/repair/checker/config.go b/satellite/repair/checker/config.go index 78b49471782c..c5b7f2c69703 100644 --- a/satellite/repair/checker/config.go +++ b/satellite/repair/checker/config.go @@ -21,10 +21,11 @@ type Config struct { RepairOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)" releaseDefault:"29/80/110-52,29/80/95-52,29/80/130-52" devDefault:""` // Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day. // This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration. - NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" ` - RepairQueueInsertBatchSize int `help:"Number of damaged segments to buffer in-memory before flushing to the repair queue" default:"100" ` - DoDeclumping bool `help:"Treat pieces on the same network as in need of repair" default:"false"` - DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"` + NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" ` + RepairQueueInsertBatchSize int `help:"Number of damaged segments to buffer in-memory before flushing to the repair queue" default:"100" ` + RepairExcludedCountryCodes []string `help:"list of country codes to treat node from this country as offline " default:"" hidden:"true"` + DoDeclumping bool `help:"Treat pieces on the same network as in need of repair" default:"false"` + DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"` } // RepairOverride is a configuration struct that contains an override repair diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go index 97bcd0a66d6e..e942c87141f2 100644 --- a/satellite/repair/checker/observer.go +++ b/satellite/repair/checker/observer.go @@ -51,13 +51,12 @@ type Observer struct { } // NewObserver creates new checker observer instance. -// TODO move excludedCountries into config but share it somehow with segment repairer. 
-func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, config Config, placementRules overlay.PlacementRules, excludedCountries []string) *Observer { +func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules overlay.PlacementRules, config Config) *Observer { return &Observer{ logger: logger, repairQueue: repairQueue, - nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, placementRules, excludedCountries), + nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, placementRules, config.RepairExcludedCountryCodes), overlayService: overlay, repairOverrides: config.RepairOverrides.GetMap(), nodeFailureRate: config.NodeFailureRate, diff --git a/satellite/repair/checker/observer_test.go b/satellite/repair/checker/observer_test.go index ed58b1f83448..9c9b6e20dc28 100644 --- a/satellite/repair/checker/observer_test.go +++ b/satellite/repair/checker/observer_test.go @@ -555,7 +555,7 @@ func BenchmarkRemoteSegment(b *testing.B) { } observer := checker.NewObserver(zap.NewNop(), planet.Satellites[0].DB.RepairQueue(), - planet.Satellites[0].Auditor.Overlay, planet.Satellites[0].Config.Checker, overlay.NewPlacementRules().CreateFilters, []string{}) + planet.Satellites[0].Auditor.Overlay, overlay.NewPlacementRules().CreateFilters, planet.Satellites[0].Config.Checker) segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(b, err) diff --git a/satellite/repair/repairer/repairer.go b/satellite/repair/repairer/repairer.go index 971ca39e79f4..21a3d9787ec2 100644 --- a/satellite/repair/repairer/repairer.go +++ b/satellite/repair/repairer/repairer.go @@ -36,6 +36,7 @@ type Config struct { InMemoryRepair bool `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"` ReputationUpdateEnabled bool `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"` UseRangedLoop bool `help:"whether to enable repair checker observer with ranged loop" default:"true"` + RepairExcludedCountryCodes []string `help:"list of country codes to treat node from this country as offline" default:"" hidden:"true"` DoDeclumping bool `help:"repair pieces on the same network to other nodes" default:"false"` DoPlacementCheck bool `help:"repair pieces out of segment placement" default:"true"` } diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go index cd534397a160..813172418a94 100644 --- a/satellite/repair/repairer/segments.go +++ b/satellite/repair/repairer/segments.go @@ -18,6 +18,7 @@ import ( "storj.io/common/pb" "storj.io/common/storj" + "storj.io/common/storj/location" "storj.io/common/sync2" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metabase" @@ -99,6 +100,8 @@ type SegmentRepairer struct { // repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes. 
repairOverrides checker.RepairOverridesMap + excludedCountryCodes map[location.CountryCode]struct{} + nowFn func() time.Time OnTestingCheckSegmentAlteredHook func() OnTestingPiecesReportHook func(pieces FetchResultReport) @@ -127,6 +130,13 @@ func NewSegmentRepairer( excessOptimalThreshold = 0 } + excludedCountryCodes := make(map[location.CountryCode]struct{}) + for _, countryCode := range config.RepairExcludedCountryCodes { + if cc := location.ToCountryCode(countryCode); cc != location.None { + excludedCountryCodes[cc] = struct{}{} + } + } + return &SegmentRepairer{ log: log, statsCollector: newStatsCollector(), @@ -137,6 +147,7 @@ func NewSegmentRepairer( timeout: config.Timeout, multiplierOptimalThreshold: 1 + excessOptimalThreshold, repairOverrides: repairOverrides.GetMap(), + excludedCountryCodes: excludedCountryCodes, reporter: reporter, reputationUpdateEnabled: config.ReputationUpdateEnabled, doDeclumping: config.DoDeclumping, @@ -223,13 +234,6 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue return false, nil } - piecesInExcludedCountries, err := repairer.overlay.GetReliablePiecesInExcludedCountries(ctx, pieces) - if err != nil { - return false, overlayQueryError.New("error identifying pieces in excluded countries: %w", err) - } - - numHealthyInExcludedCountries := len(piecesInExcludedCountries) - // ensure we get values, even if only zero values, so that redash can have an alert based on this mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked stats.repairerSegmentsBelowMinReq.Inc(0) @@ -248,7 +252,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue } // repair not needed - if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) { + if numHealthy-piecesCheck.NumHealthyInExcludedCountries > int(repairThreshold) { // remove pieces out of placement without repairing as we are above repair threshold if len(piecesCheck.OutOfPlacementPiecesSet) > 0 { @@ -348,12 +352,12 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue var minSuccessfulNeeded int { totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold) - requestCount = int(totalNeeded) + numHealthyInExcludedCountries + requestCount = int(totalNeeded) + piecesCheck.NumHealthyInExcludedCountries if requestCount > redundancy.TotalCount() { requestCount = redundancy.TotalCount() } requestCount -= numHealthy - minSuccessfulNeeded = redundancy.OptimalThreshold() - numHealthy + numHealthyInExcludedCountries + minSuccessfulNeeded = redundancy.OptimalThreshold() - numHealthy + piecesCheck.NumHealthyInExcludedCountries } // Request Overlay for n-h new storage nodes @@ -368,7 +372,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue } // Create the order limits for the PUT_REPAIR action - putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, healthySet, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries) + putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, healthySet, newNodes, repairer.multiplierOptimalThreshold, piecesCheck.NumHealthyInExcludedCountries) if err != nil { return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err) } @@ -633,7 +637,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue zap.Uint64("Position", 
segment.Position.Encode()), zap.Int("clumped pieces", len(piecesCheck.ClumpedPiecesSet)), zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacementPiecesSet)), - zap.Int("in excluded countries", numHealthyInExcludedCountries), + zap.Int("in excluded countries", piecesCheck.NumHealthyInExcludedCountries), zap.Int("removed pieces", len(toRemove)), zap.Int("repaired pieces", len(repairedPieces)), zap.Int("healthy before repair", numHealthy), @@ -648,13 +652,15 @@ type piecesCheckResult struct { ClumpedPiecesSet map[uint16]bool OutOfPlacementPiecesSet map[uint16]bool - NumUnhealthyRetrievable int + NumUnhealthyRetrievable int + NumHealthyInExcludedCountries int } func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segment metabase.Segment) (result piecesCheckResult, err error) { defer mon.Task()(&ctx)(&err) pieces := segment.Pieces + placement := segment.Placement allNodeIDs := make([]storj.NodeID, len(pieces)) nodeIDPieceMap := map[storj.NodeID]uint16{} @@ -674,6 +680,12 @@ func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segm // remove online nodes from missing pieces for _, onlineNode := range online { + // count online nodes in excluded countries only if country is not excluded by segment + // placement, those nodes will be counted with out of placement check + if _, excluded := repairer.excludedCountryCodes[onlineNode.CountryCode]; excluded && placement.AllowedCountry(onlineNode.CountryCode) { + result.NumHealthyInExcludedCountries++ + } + pieceNum := nodeIDPieceMap[onlineNode.ID] delete(result.MissingPiecesSet, pieceNum) } @@ -705,7 +717,7 @@ func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segm } } - if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry { + if repairer.doPlacementCheck && placement != storj.EveryCountry { result.OutOfPlacementPiecesSet = map[uint16]bool{} nodeFilters := repairer.placementRules(segment.Placement) diff --git a/satellite/repair/repairer/segments_test.go b/satellite/repair/repairer/segments_test.go index 50c676d67500..532d993e16f0 100644 --- a/satellite/repair/repairer/segments_test.go +++ b/satellite/repair/repairer/segments_test.go @@ -216,7 +216,7 @@ func TestSegmentRepairPlacementAndClumped(t *testing.T) { func TestSegmentRepairPlacementNotEnoughNodes(t *testing.T) { testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1, + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: testplanet.ReconfigureRS(1, 2, 4, 4), }, @@ -255,11 +255,65 @@ func TestSegmentRepairPlacementNotEnoughNodes(t *testing.T) { StreamID: segments[0].StreamID, Position: segments[0].Position, }) - require.Error(t, err) + require.True(t, overlay.ErrNotEnoughNodes.Has(err)) require.False(t, shouldDelete) }) } +func TestSegmentRepairPlacementAndExcludedCountries(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.Combine( + testplanet.ReconfigureRS(1, 2, 4, 4), + func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.RepairExcludedCountryCodes = []string{"US"} + }, + ), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")) + + _, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{ + ProjectID: 
planet.Uplinks[0].Projects[0].ID, + Name: "testbucket", + Placement: storj.EU, + }) + require.NoError(t, err) + + for _, node := range planet.StorageNodes { + require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL")) + } + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB)) + require.NoError(t, err) + + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.Len(t, segments[0].Pieces, 4) + + // change single node to location outside bucket placement and location which is part of RepairExcludedCountryCodes + require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, segments[0].Pieces[0].StorageNode, "US")) + + require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) + + shouldDelete, err := planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{ + StreamID: segments[0].StreamID, + Position: segments[0].Position, + }) + require.NoError(t, err) + require.True(t, shouldDelete) + + // we are checking that repairer counted only single piece as out of placement and didn't count this piece + // also as from excluded country. That would cause full repair because repairer would count single pieces + // as unhealthy two times. Full repair would restore number of pieces to 4 but we just removed single pieces. + segmentsAfter, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.ElementsMatch(t, segments[0].Pieces[1:], segmentsAfter[0].Pieces) + }) +} + func allPiecesInPlacement(ctx context.Context, overaly *overlay.Service, pieces metabase.Pieces, placement storj.PlacementConstraint) (bool, error) { for _, piece := range pieces { nodeDossier, err := overaly.Get(ctx, piece.StorageNode) diff --git a/satellite/repairer.go b/satellite/repairer.go index 6a97e9a3dbe4..2a6efd622c27 100644 --- a/satellite/repairer.go +++ b/satellite/repairer.go @@ -211,6 +211,10 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, config.Repairer.DownloadTimeout, config.Repairer.InMemoryRepair) + if len(config.Repairer.RepairExcludedCountryCodes) == 0 { + config.Repairer.RepairExcludedCountryCodes = config.Overlay.RepairExcludedCountryCodes + } + peer.SegmentRepairer = repairer.NewSegmentRepairer( log.Named("segment-repair"), metabaseDB, diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 50ba4f77ac24..c80b5153b4e1 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -386,80 +386,6 @@ func (cache *overlaycache) UpdateLastOfflineEmail(ctx context.Context, nodeIDs s return err } -// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries. 
-func (cache *overlaycache) KnownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) { - for { - reliableInExcluded, err = cache.knownReliableInExcludedCountries(ctx, criteria, nodeIDs) - if err != nil { - if cockroachutil.NeedsRetry(err) { - continue - } - return reliableInExcluded, err - } - break - } - - return reliableInExcluded, err -} - -func (cache *overlaycache) knownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) { - defer mon.Task()(&ctx)(&err) - - if len(nodeIDs) == 0 { - return nil, Error.New("no ids provided") - } - - args := []interface{}{ - pgutil.NodeIDArray(nodeIDs), - time.Now().Add(-criteria.OnlineWindow), - } - - // When this config is not set, it's a string slice with one empty string. This is a sanity check just - // in case for some reason it's nil or has no elements. - if criteria.ExcludedCountries == nil || len(criteria.ExcludedCountries) == 0 { - return reliableInExcluded, nil - } - - var excludedCountriesCondition string - if criteria.ExcludedCountries[0] == "" { - return reliableInExcluded, nil - } - - excludedCountriesCondition = "AND country_code IN (SELECT UNNEST($3::TEXT[]))" - args = append(args, pgutil.TextArray(criteria.ExcludedCountries)) - - // get reliable and online nodes - var rows tagsql.Rows - rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT id - FROM nodes - `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+` - WHERE id = any($1::bytea[]) - AND disqualified IS NULL - AND unknown_audit_suspended IS NULL - AND offline_suspended IS NULL - AND exit_finished_at IS NULL - AND last_contact_success > $2 - `+excludedCountriesCondition+` - `), args..., - ) - if err != nil { - return nil, err - } - defer func() { err = errs.Combine(err, rows.Close()) }() - - for rows.Next() { - var id storj.NodeID - err = rows.Scan(&id) - if err != nil { - return nil, err - } - reliableInExcluded = append(reliableInExcluded, id) - } - - return reliableInExcluded, Error.Wrap(rows.Err()) -} - // KnownReliable filters a set of nodes to reliable nodes. List is split into online and offline nodes. func (cache *overlaycache) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) { for {
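
The sketch referenced in the commit message: a minimal, self-contained Go program
illustrating the counting rule this patch enforces in classifySegmentPieces. The
types and helper below (countryCode, placement, piece, classify) are simplified
stand-ins invented for illustration; they are not the satellite's real
location.CountryCode, storj.PlacementConstraint, or piece types. The rule shown:
a healthy piece whose node sits in a repair-excluded country is counted toward
NumHealthyInExcludedCountries only when that country is still allowed by the
segment placement. A piece outside the placement is already treated as unhealthy
by the out-of-placement check, and counting it a second time under the
excluded-country check is the double count that caused unnecessary repairs,
since the repairer compares numHealthy - NumHealthyInExcludedCountries against
the repair threshold.

package main

import "fmt"

// countryCode and placement are illustrative stand-ins, not the satellite's types.
type countryCode string

type placement struct{ allowed map[countryCode]bool }

func (p placement) AllowedCountry(cc countryCode) bool { return p.allowed[cc] }

// piece pairs a piece number with the country of the node currently holding it.
type piece struct {
	num     uint16
	country countryCode
}

// classify mirrors the corrected rule: a piece outside the placement is counted
// once, by the out-of-placement check, and is not counted again as a healthy
// piece in an excluded country.
func classify(pieces []piece, pl placement, excluded map[countryCode]bool) (outOfPlacement, healthyInExcluded int) {
	for _, pc := range pieces {
		if !pl.AllowedCountry(pc.country) {
			outOfPlacement++
			continue // already unhealthy; do not double count under the excluded-country rule
		}
		if excluded[pc.country] {
			healthyInExcluded++
		}
	}
	return outOfPlacement, healthyInExcluded
}

func main() {
	euPlacement := placement{allowed: map[countryCode]bool{"PL": true, "DE": true, "FR": true}}
	repairExcluded := map[countryCode]bool{"US": true}

	// Loosely mirrors the new test in segments_test.go: four pieces on an EU-only
	// segment, with one node moved to the US, which is both outside the placement
	// and listed in RepairExcludedCountryCodes.
	pieces := []piece{{0, "US"}, {1, "PL"}, {2, "PL"}, {3, "PL"}}

	oop, excl := classify(pieces, euPlacement, repairExcluded)
	fmt.Printf("out of placement: %d, healthy in excluded countries: %d\n", oop, excl)
	// Prints "out of placement: 1, healthy in excluded countries: 0".
	// Before the fix the US piece was counted under both checks, making the
	// segment look less healthy than it is and triggering a full repair.
}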