diff --git a/private/testplanet/reconfigure.go b/private/testplanet/reconfigure.go
index 2fbd1399bf74..186d28a20dca 100644
--- a/private/testplanet/reconfigure.go
+++ b/private/testplanet/reconfigure.go
@@ -74,6 +74,20 @@ var ReconfigureRS = func(minThreshold, repairThreshold, successThreshold, totalT
 	}
 }
 
+// RepairExcludedCountryCodes returns function to change satellite repair excluded country codes.
+var RepairExcludedCountryCodes = func(repairExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) {
+	return func(log *zap.Logger, index int, config *satellite.Config) {
+		config.Overlay.RepairExcludedCountryCodes = repairExcludedCountryCodes
+	}
+}
+
+// UploadExcludedCountryCodes returns function to change satellite upload excluded country codes.
+var UploadExcludedCountryCodes = func(uploadExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) {
+	return func(log *zap.Logger, index int, config *satellite.Config) {
+		config.Overlay.Node.UploadExcludedCountryCodes = uploadExcludedCountryCodes
+	}
+}
+
 // MaxSegmentSize returns function to change satellite max segment size value.
 var MaxSegmentSize = func(maxSegmentSize memory.Size) func(log *zap.Logger, index int, config *satellite.Config) {
 	return func(log *zap.Logger, index int, config *satellite.Config) {
diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go
index bc02f5bbd9a9..6a1659c71ba5 100644
--- a/satellite/audit/verifier_test.go
+++ b/satellite/audit/verifier_test.go
@@ -5,6 +5,8 @@ package audit_test
 
 import (
 	"context"
+	"crypto/rand"
+	"fmt"
 	"testing"
 	"time"
 
@@ -21,11 +23,13 @@ import (
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
+	"storj.io/common/uuid"
 	"storj.io/storj/private/testblobs"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/audit"
 	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/storage"
 	"storj.io/storj/storagenode"
 )
 
@@ -927,3 +931,178 @@ func TestVerifierUnknownError(t *testing.T) {
 		assert.Equal(t, report.Unknown[0], badNode.ID())
 	})
 }
+
+func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 20,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Repairer.InMemoryRepair = true
+				},
+				testplanet.ReconfigureRS(3, 5, 8, 10),
+				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions, i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		bucket := "testbucket"
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, bucket, "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, bucket)
+
+		remotePieces := segment.Pieces
+
+		numExcluded := 5
+		var nodesInExcluded storj.NodeIDList
+		for i := 0; i < numExcluded; i++ {
+			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
+			require.NoError(t, err)
+			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
+		}
+
+		// make the pieces beyond the optimal threshold unhealthy
+		for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
+			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
+			require.NoError(t, err)
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// Verify that the segment was removed from the repair queue
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Zero(t, count)
+
+		// Verify the segment has been repaired
+		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, bucket)
+		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
+		require.Equal(t, 10, len(segmentAfterRepair.Pieces))
+
+		// check excluded area nodes still exist
+		for _, n := range nodesInExcluded {
+			var found bool
+			for _, p := range segmentAfterRepair.Pieces {
+				if p.StorageNode == n {
+					found = true
+					break
+				}
+			}
+			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", n.String()))
+		}
+		nodesInPointer := make(map[storj.NodeID]bool)
+		for _, n := range segmentAfterRepair.Pieces {
+			// check for duplicates
+			_, ok := nodesInPointer[n.StorageNode]
+			require.False(t, ok)
+			nodesInPointer[n.StorageNode] = true
+		}
+
+		lastPieceIndex := segmentAfterRepair.Pieces.Len() - 1
+		lastPiece := segmentAfterRepair.Pieces[lastPieceIndex]
+		for _, n := range planet.StorageNodes {
+			if n.ID() == lastPiece.StorageNode {
+				pieceID := segmentAfterRepair.RootPieceID.Derive(n.ID(), int32(lastPiece.Number))
+				corruptPieceData(ctx, t, planet, n, pieceID)
+			}
+		}
+
+		// now audit
+		report, err := satellite.Audit.Verifier.Verify(ctx, audit.Segment{
+			StreamID:      segmentAfterRepair.StreamID,
+			Position:      segmentAfterRepair.Position,
+			ExpiresAt:     segmentAfterRepair.ExpiresAt,
+			EncryptedSize: segmentAfterRepair.EncryptedSize,
+		}, nil)
+		require.NoError(t, err)
+		require.Len(t, report.Fails, 1)
+		require.Equal(t, report.Fails[0], lastPiece.StorageNode)
+	})
+}
+
+// getRemoteSegment returns a remote segment and its segment key from the satellite.
+// nolint:golint
+func getRemoteSegment(
+	ctx context.Context, t *testing.T, satellite *testplanet.Satellite, projectID uuid.UUID, bucketName string,
+) (_ metabase.Segment, key metabase.SegmentKey) {
+	t.Helper()
+
+	objects, err := satellite.Metabase.DB.TestingAllObjects(ctx)
+	require.NoError(t, err)
+	require.Len(t, objects, 1)
+
+	segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
+	require.NoError(t, err)
+	require.Len(t, segments, 1)
+	require.False(t, segments[0].Inline())
+
+	return segments[0], metabase.SegmentLocation{
+		ProjectID:  projectID,
+		BucketName: bucketName,
+		ObjectKey:  objects[0].ObjectKey,
+		Position:   segments[0].Position,
+	}.Encode()
+}
+
+// corruptPieceData manipulates piece data on a storage node.
+func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) {
+	t.Helper()
+
+	blobRef := storage.BlobRef{
+		Namespace: planet.Satellites[0].ID().Bytes(),
+		Key:       corruptedPieceID.Bytes(),
+	}
+
+	// get currently stored piece data from storagenode
+	reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
+	require.NoError(t, err)
+	pieceSize, err := reader.Size()
+	require.NoError(t, err)
+	require.True(t, pieceSize > 0)
+	pieceData := make([]byte, pieceSize)
+
+	// delete piece data
+	err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
+	require.NoError(t, err)
+
+	// create new random data
+	_, err = rand.Read(pieceData)
+	require.NoError(t, err)
+
+	// corrupt piece data (not PieceHeader) and write back to storagenode
+	// this means repair downloading should fail during piece hash verification
+	pieceData[pieceSize-1]++ // change at least one byte, otherwise the audit would pass and this test would fail
+	writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
+	require.NoError(t, err)
+
+	n, err := writer.Write(pieceData)
+	require.NoError(t, err)
+	require.EqualValues(t, n, pieceSize)
+
+	err = writer.Commit(ctx)
+	require.NoError(t, err)
+}
diff --git a/satellite/nodeselection/uploadselection/criteria.go b/satellite/nodeselection/uploadselection/criteria.go
index bd94f893c50d..d1f477959cfa 100644
--- a/satellite/nodeselection/uploadselection/criteria.go
+++ b/satellite/nodeselection/uploadselection/criteria.go
@@ -34,6 +34,9 @@ func (c *Criteria) MatchInclude(node *Node) bool {
 	}
 
 	for _, code := range c.ExcludedCountryCodes {
+		if code.String() == "" {
+			continue
+		}
 		if node.CountryCode == code {
 			return false
 		}
diff --git a/satellite/orders/service.go b/satellite/orders/service.go
index 2060ee7fd62b..1a4265ce6d26 100644
--- a/satellite/orders/service.go
+++ b/satellite/orders/service.go
@@ -434,7 +434,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 }
 
 // CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
-func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
+func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	// Create the order limits for being used to upload the repaired pieces
@@ -445,7 +445,8 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m
 	pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
 
 	totalPieces := redundancy.TotalCount()
-	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold()) * optimalThresholdMultiplier))
+	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold())*optimalThresholdMultiplier)) + numPiecesInExcludedCountries
+
 	if totalPiecesAfterRepair > totalPieces {
 		totalPiecesAfterRepair = totalPieces
 	}
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index b4e75e691b53..0e74fa488f6b 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -60,6 +60,8 @@ type DB interface {
 	KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
 	KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
+	// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
+	KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
 	KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
 	// Reliable returns all nodes that are reliable
@@ -489,6 +491,17 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
 	return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
 }
 
+// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
+func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	criteria := &NodeCriteria{
+		OnlineWindow:      service.config.Node.OnlineWindow,
+		ExcludedCountries: service.config.RepairExcludedCountryCodes,
+	}
+	return service.db.KnownReliableInExcludedCountries(ctx, criteria, nodeIds)
+}
+
 // KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
 func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -599,7 +612,7 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
 	return nil
 }
 
-// GetMissingPieces returns the list of offline nodes.
+// GetMissingPieces returns the piece numbers of pieces that are stored on offline nodes.
 func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
 	defer mon.Task()(&ctx)(&err)
 	var nodeIDs storj.NodeIDList
@@ -621,6 +634,28 @@ func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pi
 	return missingPieces, nil
 }
 
+// GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries.
+func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context, pieces metabase.Pieces) (piecesInExcluded []uint16, err error) {
+	defer mon.Task()(&ctx)(&err)
+	var nodeIDs storj.NodeIDList
+	for _, p := range pieces {
+		nodeIDs = append(nodeIDs, p.StorageNode)
+	}
+	inExcluded, err := service.KnownReliableInExcludedCountries(ctx, nodeIDs)
+	if err != nil {
+		return nil, Error.New("error getting nodes %s", err)
+	}
+
+	for _, p := range pieces {
+		for _, nodeID := range inExcluded {
+			if nodeID == p.StorageNode {
+				piecesInExcluded = append(piecesInExcluded, p.Number)
+			}
+		}
+	}
+	return piecesInExcluded, nil
+}
+
 // DisqualifyNode disqualifies a storage node.
 func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go
index 644f086bb7f7..e9ba1ee33ff4 100644
--- a/satellite/overlay/service_test.go
+++ b/satellite/overlay/service_test.go
@@ -845,3 +845,25 @@ func TestReliable(t *testing.T) {
 		require.NotEqual(t, node.ID(), nodes[0])
 	})
 }
+
+func TestKnownReliableInExcludedCountries(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		service := planet.Satellites[0].Overlay.Service
+		node := planet.StorageNodes[0]
+
+		nodes, err := service.Reliable(ctx)
+		require.NoError(t, err)
+		require.Len(t, nodes, 2)
+
+		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "FR")
+		require.NoError(t, err)
+
+		// first node should be returned because its country code is in the excluded list
+		nodes, err = service.KnownReliableInExcludedCountries(ctx, nodes)
+		require.NoError(t, err)
+		require.Len(t, nodes, 1)
+		require.Equal(t, node.ID(), nodes[0])
+	})
+}
diff --git a/satellite/repair/checker/checker_test.go b/satellite/repair/checker/checker_test.go
index a30b653b2a39..8c428c89cae4 100644
--- a/satellite/repair/checker/checker_test.go
+++ b/satellite/repair/checker/checker_test.go
@@ -11,7 +11,6 @@ import (
 
 	"github.com/stretchr/testify/require"
 
-	"storj.io/common/memory"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
@@ -84,61 +83,6 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 	})
 }
 
-func TestInjuredsSegmentWhenPiecesAreInExcludedCountries(t *testing.T) {
-	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
-		Reconfigure: testplanet.Reconfigure{
-			Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
-		},
-	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		checker := planet.Satellites[0].Repair.Checker
-
-		checker.Loop.Pause()
-		planet.Satellites[0].Repair.Repairer.Loop.Pause()
-
-		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "key", testrand.Bytes(5*memory.KiB))
-		require.NoError(t, err)
-
-		objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
-		require.NoError(t, err)
-		require.Len(t, objects, 1)
-
-		segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
-		require.NoError(t, err)
-		require.Len(t, segments, 1)
-		require.False(t, segments[0].Inline())
-
-		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[0].ID(), "FR")
-		require.NoError(t, err)
-		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[1].ID(), "FR")
-		require.NoError(t, err)
-
-		checker.Loop.TriggerWait()
-
-		// check that the healthy segments was added to repair queue
-		// because of part of nodes have country code value on exclude
-		// list
-		count, err := planet.Satellites[0].DB.RepairQueue().Count(ctx)
-		require.NoError(t, err)
-		require.Equal(t, 1, count)
-
-		// trigger checker to add segment to repair queue
-		planet.Satellites[0].Repair.Repairer.Loop.Restart()
-		planet.Satellites[0].Repair.Repairer.Loop.TriggerWait()
-		planet.Satellites[0].Repair.Repairer.Loop.Pause()
-		planet.Satellites[0].Repair.Repairer.WaitForPendingRepairs()
-
-		count, err = planet.Satellites[0].DB.RepairQueue().Count(ctx)
-		require.NoError(t, err)
-		require.Equal(t, 0, count)
-
-		segmentsAfterRepair, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
-		require.NoError(t, err)
-
-		require.Equal(t, segments[0].Pieces, segmentsAfterRepair[0].Pieces)
-	})
-}
-
 func TestIdentifyIrreparableSegments(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 4148dc0e2a97..65ac45ad03c4 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -3076,3 +3076,185 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 		require.NotContains(t, mock.addressesDialed, realAddresses)
 	})
 }
+
+func TestSegmentInExcludedCountriesRepair(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 20,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Repairer.InMemoryRepair = true
+				},
+				testplanet.ReconfigureRS(3, 5, 8, 10),
+				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions, i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.Equal(t, 3, int(segment.Redundancy.RequiredShares))
+
+		remotePieces := segment.Pieces
+
+		numExcluded := 5
+		var nodesInExcluded storj.NodeIDList
+		for i := 0; i < numExcluded; i++ {
+			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
+			require.NoError(t, err)
+			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
+		}
+		// make the pieces beyond the optimal threshold unhealthy
+		for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
+			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
+			require.NoError(t, err)
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// Verify that the segment was removed from the repair queue
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Zero(t, count)
+
+		// Verify the segment has been repaired
+		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
+		require.Equal(t, 10, len(segmentAfterRepair.Pieces))
+
+		// check excluded area nodes still exist
+		for _, n := range nodesInExcluded {
+			var found bool
+			for _, p := range segmentAfterRepair.Pieces {
+				if p.StorageNode == n {
+					found = true
+					break
+				}
+			}
+			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", n.String()))
+		}
+		nodesInPointer := make(map[storj.NodeID]bool)
+		for _, n := range segmentAfterRepair.Pieces {
+			// check for duplicates
+			_, ok := nodesInPointer[n.StorageNode]
+			require.False(t, ok)
+			nodesInPointer[n.StorageNode] = true
+		}
+	})
+}
+
+func TestSegmentInExcludedCountriesRepairIrreparable(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 20,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Repairer.InMemoryRepair = true
+				},
+				testplanet.ReconfigureRS(3, 5, 8, 10),
+				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions, i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.Equal(t, 3, int(segment.Redundancy.RequiredShares))
+
+		remotePieces := segment.Pieces
+
+		numExcluded := 6
+		var nodesInExcluded storj.NodeIDList
+		for i := 0; i < numExcluded; i++ {
+			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
+			require.NoError(t, err)
+			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
+		}
+		// make the rest unhealthy
+		for i := numExcluded; i < len(remotePieces); i++ {
+			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
+			require.NoError(t, err)
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// Verify that the segment was removed from the repair queue
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Zero(t, count)
+
+		// Verify the segment has been repaired
+		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
+		require.Equal(t, 10, len(segmentAfterRepair.Pieces))
+
+		// check excluded area nodes still exist
+		for _, n := range nodesInExcluded {
+			var found bool
+			for _, p := range segmentAfterRepair.Pieces {
+				if p.StorageNode == n {
+					found = true
+					break
+				}
+			}
+			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", n.String()))
+		}
+		nodesInPointer := make(map[storj.NodeID]bool)
+		for _, n := range segmentAfterRepair.Pieces {
+			// check for duplicates
+			_, ok := nodesInPointer[n.StorageNode]
+			require.False(t, ok)
+			nodesInPointer[n.StorageNode] = true
+		}
+	})
+}
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index d0e619ec012a..5d5eeccffbfa 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -189,6 +189,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		return false, nil
 	}
 
+	piecesInExcludedCountries, err := repairer.overlay.GetReliablePiecesInExcludedCountries(ctx, pieces)
+	if err != nil {
+		return false, overlayQueryError.New("error identifying pieces in excluded countries: %w", err)
+	}
+
+	numHealthyInExcludedCountries := len(piecesInExcludedCountries)
+
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
 	stats.repairerSegmentsBelowMinReq.Inc(0)
@@ -207,7 +214,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// repair not needed
-	if numHealthy > int(repairThreshold) {
+	if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
 		mon.Meter("repair_unnecessary").Mark(1) //mon:locked
 		stats.repairUnnecessary.Mark(1)
 		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold))
@@ -268,8 +275,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	var minSuccessfulNeeded int
 	{
 		totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold)
-		requestCount = int(totalNeeded) - len(healthyPieces)
-		minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces)
+		requestCount = int(totalNeeded) - len(healthyPieces) + numHealthyInExcludedCountries
+		minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces) + numHealthyInExcludedCountries
 	}
 
 	// Request Overlay for n-h new storage nodes
@@ -283,7 +290,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// Create the order limits for the PUT_REPAIR action
-	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold)
+	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries)
 	if err != nil {
 		return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
 	}
diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go
index 49086a507235..6ccbda065966 100644
--- a/satellite/satellitedb/overlaycache.go
+++ b/satellite/satellitedb/overlaycache.go
@@ -405,6 +405,80 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
 	return badNodes, err
 }
 
+// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
+func (cache *overlaycache) KnownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
+	for {
+		reliableInExcluded, err = cache.knownReliableInExcludedCountries(ctx, criteria, nodeIDs)
+		if err != nil {
+			if cockroachutil.NeedsRetry(err) {
+				continue
+			}
+			return reliableInExcluded, err
+		}
+		break
+	}
+
+	return reliableInExcluded, err
+}
+
+func (cache *overlaycache) knownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if len(nodeIDs) == 0 {
+		return nil, Error.New("no ids provided")
+	}
+
+	args := []interface{}{
+		pgutil.NodeIDArray(nodeIDs),
+		time.Now().Add(-criteria.OnlineWindow),
+	}
+
+	// When this config is not set, it's a string slice with one empty string. This is a sanity check just
+	// in case for some reason it's nil or has no elements.
+	if criteria.ExcludedCountries == nil || len(criteria.ExcludedCountries) == 0 {
+		return reliableInExcluded, nil
+	}
+
+	var excludedCountriesCondition string
+	if criteria.ExcludedCountries[0] == "" {
+		return reliableInExcluded, nil
+	}
+
+	excludedCountriesCondition = "AND country_code IN (SELECT UNNEST($3::TEXT[]))"
+	args = append(args, pgutil.TextArray(criteria.ExcludedCountries))
+
+	// get reliable and online nodes
+	var rows tagsql.Rows
+	rows, err = cache.db.Query(ctx, cache.db.Rebind(`
+		SELECT id
+		FROM nodes
+		`+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
+		WHERE id = any($1::bytea[])
+			AND disqualified IS NULL
+			AND unknown_audit_suspended IS NULL
+			AND offline_suspended IS NULL
+			AND exit_finished_at IS NULL
+			AND last_contact_success > $2
+		`+excludedCountriesCondition+`
+	`), args...,
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer func() { err = errs.Combine(err, rows.Close()) }()
+
+	for rows.Next() {
+		var id storj.NodeID
+		err = rows.Scan(&id)
+		if err != nil {
+			return nil, err
+		}
+		reliableInExcluded = append(reliableInExcluded, id)
+	}
+
+	return reliableInExcluded, Error.Wrap(rows.Err())
+}
+
 func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
@@ -520,6 +594,18 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
 }
 
 func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
+	args := []interface{}{
+		time.Now().Add(-criteria.OnlineWindow),
+	}
+
+	// When this config is not set, it's a string slice with one empty string. Sanity-check it here so we
+	// don't index an element that doesn't exist.
+	var excludedCountriesCondition string
+	if criteria.ExcludedCountries != nil && len(criteria.ExcludedCountries) != 0 && criteria.ExcludedCountries[0] != "" {
+		excludedCountriesCondition = "AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))"
+		args = append(args, pgutil.TextArray(criteria.ExcludedCountries))
+	}
+
 	// get reliable and online nodes
 	rows, err := cache.db.Query(ctx, cache.db.Rebind(`
 		SELECT id
@@ -530,8 +616,8 @@ func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeC
 		AND offline_suspended IS NULL
 		AND exit_finished_at IS NULL
 		AND last_contact_success > $1
-		AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))
-	`), time.Now().Add(-criteria.OnlineWindow), pgutil.TextArray(criteria.ExcludedCountries))
+		`+excludedCountriesCondition+`
+	`), args...)
 	if err != nil {
 		return nil, err
 	}