satellite/repairer: handle excluded countries
For nodes in excluded areas, we don't necessarily want to remove them
from the pointer, but we do want to increase the number of pieces in the
segment in case those excluded area nodes go down. To do that, we
increase the number of pieces repaired by the number of pieces in
excluded areas.

Change-Id: I0424f1bcd7e93f33eb3eeeec79dbada3b3ea1f3a
Fadila82 authored and cam-a committed Mar 14, 2022
1 parent 35290d1 commit 29fd36a
Showing 10 changed files with 538 additions and 65 deletions.
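The change described in the commit message boils down to one adjustment in the repair path: the number of pieces uploaded during repair is raised by the number of healthy pieces currently held on nodes in excluded countries, capped at the total of the redundancy scheme. A minimal sketch of that arithmetic, with hypothetical names (repairTargetPieceCount and its parameters are illustrative, not identifiers from this codebase):

package main

import (
	"fmt"
	"math"
)

// repairTargetPieceCount mirrors the adjustment made in CreatePutRepairOrderLimits
// below: the usual target (optimal threshold times a multiplier) is increased by
// the number of healthy pieces held in excluded countries, then capped at the
// total piece count of the redundancy scheme.
func repairTargetPieceCount(optimalShares, totalShares, numPiecesInExcludedCountries int, multiplier float64) int {
	target := int(math.Ceil(float64(optimalShares)*multiplier)) + numPiecesInExcludedCountries
	if target > totalShares {
		target = totalShares
	}
	return target
}

func main() {
	fmt.Println(repairTargetPieceCount(8, 16, 3, 1.0)) // 11: three extra pieces requested
	fmt.Println(repairTargetPieceCount(8, 10, 5, 1.0)) // 10: capped at the RS total
}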
14 changes: 14 additions & 0 deletions private/testplanet/reconfigure.go
@@ -74,6 +74,20 @@ var ReconfigureRS = func(minThreshold, repairThreshold, successThreshold, totalT
}
}

// RepairExcludedCountryCodes returns function to change satellite repair excluded country codes.
var RepairExcludedCountryCodes = func(repairExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) {
return func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.RepairExcludedCountryCodes = repairExcludedCountryCodes
}
}

// UploadExcludedCountryCodes returns function to change satellite upload excluded country codes.
var UploadExcludedCountryCodes = func(uploadExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) {
return func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.UploadExcludedCountryCodes = uploadExcludedCountryCodes
}
}

// MaxSegmentSize returns function to change satellite max segment size value.
var MaxSegmentSize = func(maxSegmentSize memory.Size) func(log *zap.Logger, index int, config *satellite.Config) {
return func(log *zap.Logger, index int, config *satellite.Config) {
179 changes: 179 additions & 0 deletions satellite/audit/verifier_test.go
@@ -5,6 +5,8 @@ package audit_test

import (
"context"
"crypto/rand"
"fmt"
"testing"
"time"

@@ -21,11 +23,13 @@ import (
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testblobs"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
)

@@ -927,3 +931,178 @@ func TestVerifierUnknownError(t *testing.T) {
assert.Equal(t, report.Unknown[0], badNode.ID())
})
}

func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 20,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.Repairer.InMemoryRepair = true
},
testplanet.ReconfigureRS(3, 5, 8, 10),
testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()

satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()

var testData = testrand.Bytes(8 * memory.KiB)
bucket := "testbucket"
// first, upload some remote data
err := uplinkPeer.Upload(ctx, satellite, bucket, "test/path", testData)
require.NoError(t, err)

segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, bucket)

remotePieces := segment.Pieces

numExcluded := 5
var nodesInExcluded storj.NodeIDList
for i := 0; i < numExcluded; i++ {
err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
require.NoError(t, err)
nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
}

// make the extra pieces beyond the optimal threshold bad by stopping their nodes
for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
require.NoError(t, err)
}

// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()

count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
require.Equal(t, 1, count)

satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
satellite.Repair.Repairer.WaitForPendingRepairs()

// Verify that the segment was removed from the repair queue
count, err = satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
require.Zero(t, count)

// Verify the segment has been repaired
segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, bucket)
require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
require.Equal(t, 10, len(segmentAfterRepair.Pieces))

// check excluded area nodes still exist
for i, n := range nodesInExcluded {
var found bool
for _, p := range segmentAfterRepair.Pieces {
if p.StorageNode == n {
found = true
break
}
}
require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
}
nodesInPointer := make(map[storj.NodeID]bool)
for _, n := range segmentAfterRepair.Pieces {
// check for duplicates
_, ok := nodesInPointer[n.StorageNode]
require.False(t, ok)
nodesInPointer[n.StorageNode] = true
}

lastPieceIndex := segmentAfterRepair.Pieces.Len() - 1
lastPiece := segmentAfterRepair.Pieces[lastPieceIndex]
for _, n := range planet.StorageNodes {
if n.ID() == lastPiece.StorageNode {
pieceID := segmentAfterRepair.RootPieceID.Derive(n.ID(), int32(lastPiece.Number))
corruptPieceData(ctx, t, planet, n, pieceID)
}
}

// now audit
report, err := satellite.Audit.Verifier.Verify(ctx, audit.Segment{
StreamID: segmentAfterRepair.StreamID,
Position: segmentAfterRepair.Position,
ExpiresAt: segmentAfterRepair.ExpiresAt,
EncryptedSize: segmentAfterRepair.EncryptedSize,
}, nil)
require.NoError(t, err)
require.Len(t, report.Fails, 1)
require.Equal(t, report.Fails[0], lastPiece.StorageNode)
})
}

// getRemoteSegment returns a remote segment and its segment key from the satellite.
// nolint:golint
func getRemoteSegment(
ctx context.Context, t *testing.T, satellite *testplanet.Satellite, projectID uuid.UUID, bucketName string,
) (_ metabase.Segment, key metabase.SegmentKey) {
t.Helper()

objects, err := satellite.Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
require.Len(t, objects, 1)

segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.False(t, segments[0].Inline())

return segments[0], metabase.SegmentLocation{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: objects[0].ObjectKey,
Position: segments[0].Position,
}.Encode()
}

// corruptPieceData manipulates piece data on a storage node.
func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) {
t.Helper()

blobRef := storage.BlobRef{
Namespace: planet.Satellites[0].ID().Bytes(),
Key: corruptedPieceID.Bytes(),
}

// get currently stored piece data from storagenode
reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
require.NoError(t, err)
pieceSize, err := reader.Size()
require.NoError(t, err)
require.True(t, pieceSize > 0)
pieceData := make([]byte, pieceSize)

// delete piece data
err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
require.NoError(t, err)

// create new random data
_, err = rand.Read(pieceData)
require.NoError(t, err)

// corrupt piece data (not PieceHeader) and write back to storagenode
// this means repair downloading should fail during piece hash verification
pieceData[pieceSize-1]++ // flip the last byte so the piece no longer matches its hash; without this corruption the test would fail
writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
require.NoError(t, err)

n, err := writer.Write(pieceData)
require.NoError(t, err)
require.EqualValues(t, n, pieceSize)

err = writer.Commit(ctx)
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions satellite/nodeselection/uploadselection/criteria.go
@@ -34,6 +34,9 @@ func (c *Criteria) MatchInclude(node *Node) bool {
}

for _, code := range c.ExcludedCountryCodes {
if code.String() == "" {
continue
}
if node.CountryCode == code {
return false
}
5 changes: 3 additions & 2 deletions satellite/orders/service.go
@@ -434,7 +434,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
}

// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)

// Create the order limits for being used to upload the repaired pieces
@@ -445,7 +445,8 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)

totalPieces := redundancy.TotalCount()
totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold()) * optimalThresholdMultiplier))
totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold())*optimalThresholdMultiplier)) + numPiecesInExcludedCountries

if totalPiecesAfterRepair > totalPieces {
totalPiecesAfterRepair = totalPieces
}
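To make the new formula concrete with the values from TestAuditRepairedSegmentInExcludedCountries above (RS 3/5/8/10 and five pieces marked as located in "FR"), and assuming an optimalThresholdMultiplier of 1.0 purely for illustration: totalPiecesAfterRepair = ceil(8 × 1.0) + 5 = 13, which the cap then reduces to totalPieces = 10, consistent with the 10 pieces the test expects to find after repair.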
37 changes: 36 additions & 1 deletion satellite/overlay/service.go
@@ -60,6 +60,8 @@ type DB interface {
KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownUnreliableOrOffline filters a set of nodes to unhealthy or offline nodes, independent of new
KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
// Reliable returns all nodes that are reliable
@@ -489,6 +491,17 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
}

// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)

criteria := &NodeCriteria{
OnlineWindow: service.config.Node.OnlineWindow,
ExcludedCountries: service.config.RepairExcludedCountryCodes,
}
return service.db.KnownReliableInExcludedCountries(ctx, criteria, nodeIds)
}

// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
@@ -599,7 +612,7 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
return nil
}

// GetMissingPieces returns the list of offline nodes.
// GetMissingPieces returns the list of offline nodes and the corresponding pieces.
func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
defer mon.Task()(&ctx)(&err)
var nodeIDs storj.NodeIDList
@@ -621,6 +634,28 @@ func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pi
return missingPieces, nil
}

// GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries.
func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context, pieces metabase.Pieces) (piecesInExcluded []uint16, err error) {
defer mon.Task()(&ctx)(&err)
var nodeIDs storj.NodeIDList
for _, p := range pieces {
nodeIDs = append(nodeIDs, p.StorageNode)
}
inExcluded, err := service.KnownReliableInExcludedCountries(ctx, nodeIDs)
if err != nil {
return nil, Error.New("error getting nodes %s", err)
}

for _, p := range pieces {
for _, nodeID := range inExcluded {
if nodeID == p.StorageNode {
piecesInExcluded = append(piecesInExcluded, p.Number)
}
}
}
return piecesInExcluded, nil
}

// DisqualifyNode disqualifies a storage node.
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
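The segment repairer changes that consume these overlay helpers are part of this commit but not among the hunks shown here. A hedged sketch of how the new method plausibly feeds CreatePutRepairOrderLimits; the package name, function name, and flow below are assumptions, not code from the commit:

package repairsketch

import (
	"context"

	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/overlay"
)

// countPiecesInExcludedCountries is an illustrative helper, not part of this
// commit: it counts the segment's pieces that live on reliable nodes in
// repair-excluded countries. The resulting count would be passed as
// numPiecesInExcludedCountries to orders.Service.CreatePutRepairOrderLimits so
// that many extra pieces get uploaded, while the excluded-country pieces stay
// in the segment.
func countPiecesInExcludedCountries(ctx context.Context, cache *overlay.Service, pieces metabase.Pieces) (int, error) {
	pieceNums, err := cache.GetReliablePiecesInExcludedCountries(ctx, pieces)
	if err != nil {
		return 0, err
	}
	return len(pieceNums), nil
}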
22 changes: 22 additions & 0 deletions satellite/overlay/service_test.go
@@ -845,3 +845,25 @@ func TestReliable(t *testing.T) {
require.NotEqual(t, node.ID(), nodes[0])
})
}

func TestKnownReliableInExcludedCountries(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
service := planet.Satellites[0].Overlay.Service
node := planet.StorageNodes[0]

nodes, err := service.Reliable(ctx)
require.NoError(t, err)
require.Len(t, nodes, 2)

err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "FR")
require.NoError(t, err)

// the node marked with an excluded country code should be returned by KnownReliableInExcludedCountries
nodes, err = service.KnownReliableInExcludedCountries(ctx, nodes)
require.NoError(t, err)
require.Len(t, nodes, 1)
require.Equal(t, node.ID(), nodes[0])
})
}
