satellite/gc/sender: avoid sending BF to disqualified and exited nodes
We don't want to waste our time on disqualified and exited nodes.

Change-Id: I11709350ad291c24f3b46670dd6a418c0ddbb44f
mniewrzal authored and Storj Robot committed Nov 29, 2022
1 parent ef4b564 commit 75b77d5
Showing 2 changed files with 67 additions and 0 deletions.
5 changes: 5 additions & 0 deletions satellite/gc/sender/service.go
@@ -164,6 +164,11 @@ func (service *Service) sendRetainRequest(ctx context.Context, retainInfo *inter
		return Error.Wrap(err)
	}

	// avoid sending bloom filters to disqualified and exited nodes
	if dossier.Disqualified != nil || dossier.ExitStatus.ExitSuccess {
		return nil
	}

	if service.Config.RetainSendTimeout > 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, service.Config.RetainSendTimeout)
62 changes: 62 additions & 0 deletions satellite/gc/sender/service_test.go
@@ -17,6 +17,7 @@ import (
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/uplink"
)
@@ -93,6 +94,67 @@ func TestSendRetainFilters(t *testing.T) {
	})
}

func TestSendRetainFiltersDisqualifedNode(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 2,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(2, 2, 2, 2),
			StorageNode: func(index int, config *storagenode.Config) {
				// stop processing at storagenode side so it can be inspected
				config.Retain.Concurrency = 0
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// use the uplink's access grant for satellite 0 to store bloom filters
		access := planet.Uplinks[0].Access[planet.Satellites[0].NodeURL().ID]
		accessString, err := access.Serialize()
		require.NoError(t, err)

		// configure sender
		gcsender := planet.Satellites[0].GarbageCollection.Sender
		gcsender.Config.AccessGrant = accessString

		// upload 1 piece
		upl := planet.Uplinks[0]
		testData := testrand.Bytes(8 * memory.KiB)
		err = upl.Upload(ctx, planet.Satellites[0], "testbucket", "test/path/1", testData)
		require.NoError(t, err)

		// configure filter uploader
		config := planet.Satellites[0].Config.GarbageCollectionBF
		config.AccessGrant = accessString
		config.ZipBatchSize = 2

		bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
		err = bloomFilterService.RunOnce(ctx)
		require.NoError(t, err)

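		// disqualify the first node so the sender should skip it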
		storageNode0 := planet.StorageNodes[0]
		err = planet.Satellites[0].Overlay.Service.DisqualifyNode(ctx, storageNode0.ID(), overlay.DisqualificationReasonAuditFailure)
		require.NoError(t, err)

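		// mark the second node as having completed graceful exit so it is skipped as well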
		storageNode1 := planet.StorageNodes[1]
		_, err = planet.Satellites[0].DB.OverlayCache().UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
			NodeID:      storageNode1.ID(),
			ExitSuccess: true,
		})
		require.NoError(t, err)

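		// neither node has any retain requests queued before the sender runs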
		for _, node := range planet.StorageNodes {
			require.Zero(t, node.Peer.Storage2.RetainService.HowManyQueued())
		}

		// send to storagenodes
		require.NoError(t, gcsender.RunOnce(ctx))

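		// both nodes were skipped as disqualified or exited, so nothing was queued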
		for _, node := range planet.StorageNodes {
			require.Zero(t, node.Peer.Storage2.RetainService.HowManyQueued())
		}
	})
}

func TestSendInvalidZip(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 2,
