Skip to content

Commit

Permalink
satellite/overlay: change Reliable and KnownReliable
Browse files Browse the repository at this point in the history
Reliable and KnownReliable are renamed to GetParticipatingNodes and GetNodes, respectively.

We now want these functions to include offline and suspended nodes as
well, so that we can force immediate repair when pieces are out of
placement or in excluded countries. With that change, the old names no
longer made sense.

Change-Id: Icbcbad43dbde0ca8cbc80a4d17a896bb89b078b7
  • Loading branch information
elek authored and Storj Robot committed Sep 2, 2023
1 parent 6896241 commit e2006d8
Show file tree
Hide file tree
Showing 15 changed files with 577 additions and 434 deletions.
9 changes: 8 additions & 1 deletion private/testplanet/uplink_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"storj.io/storj/private/revocation"
"storj.io/storj/private/server"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/nodeselection"
"storj.io/uplink"
"storj.io/uplink/private/metaclient"
)
Expand Down Expand Up @@ -105,8 +106,14 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
}

// confirm that we marked the correct number of storage nodes as offline
online, _, err := satellite.Overlay.Service.Reliable(ctx)
allNodes, err := satellite.Overlay.Service.GetParticipatingNodes(ctx)
require.NoError(t, err)
online := make([]nodeselection.SelectedNode, 0, len(allNodes))
for _, node := range allNodes {
if node.Online {
online = append(online, node)
}
}
require.Len(t, online, len(planet.StorageNodes)-toKill)

// we should be able to download data without any of the original nodes
Expand Down
1 change: 0 additions & 1 deletion satellite/orders/service.go
Expand Up @@ -528,7 +528,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)

// should this use KnownReliable or similar?
node, err := service.overlay.Get(ctx, nodeID)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
Expand Down
13 changes: 10 additions & 3 deletions satellite/overlay/benchmark_test.go
Expand Up @@ -64,12 +64,19 @@ func BenchmarkOverlay(b *testing.B) {
check = append(check, testrand.NodeID())
}

b.Run("KnownReliable", func(b *testing.B) {
b.Run("GetNodes", func(b *testing.B) {
onlineWindow := 1000 * time.Hour
for i := 0; i < b.N; i++ {
online, _, err := overlaydb.KnownReliable(ctx, check, onlineWindow, 0)
selectedNodes, err := overlaydb.GetNodes(ctx, check, onlineWindow, 0)
require.NoError(b, err)
require.Len(b, online, OnlineCount)
require.Len(b, selectedNodes, len(check))
foundOnline := 0
for _, n := range selectedNodes {
if n.Online {
foundOnline++
}
}
require.Equal(b, OnlineCount, foundOnline)
}
})

Expand Down
34 changes: 17 additions & 17 deletions satellite/overlay/selection_test.go
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/exp/slices"

"storj.io/common/identity/testidentity"
"storj.io/common/memory"
Expand Down Expand Up @@ -118,38 +117,39 @@ func TestOnlineOffline(t *testing.T) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service

online, offline, err := service.KnownReliable(ctx, []storj.NodeID{
selectedNodes, err := service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
})
require.NoError(t, err)
require.Empty(t, offline)
require.Len(t, online, 1)
require.Len(t, selectedNodes, 1)
require.True(t, selectedNodes[0].Online)

online, offline, err = service.KnownReliable(ctx, []storj.NodeID{
selectedNodes, err = service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
})
require.NoError(t, err)
require.Empty(t, offline)
require.Len(t, online, 3)
require.Len(t, selectedNodes, 3)
for i := 0; i < 3; i++ {
require.True(t, selectedNodes[i].Online, i)
require.Equal(t, planet.StorageNodes[i].ID(), selectedNodes[i].ID, i)
}

unreliableNodeID := storj.NodeID{1, 2, 3, 4}
online, offline, err = service.KnownReliable(ctx, []storj.NodeID{
selectedNodes, err = service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
unreliableNodeID,
planet.StorageNodes[2].ID(),
})
require.NoError(t, err)
require.Empty(t, offline)
require.Len(t, online, 2)

require.False(t, slices.ContainsFunc(online, func(node nodeselection.SelectedNode) bool {
return node.ID == unreliableNodeID
}))
require.False(t, slices.ContainsFunc(offline, func(node nodeselection.SelectedNode) bool {
return node.ID == unreliableNodeID
}))
require.Len(t, selectedNodes, 3)
require.True(t, selectedNodes[0].Online)
require.False(t, selectedNodes[1].Online)
require.True(t, selectedNodes[2].Online)
require.Equal(t, planet.StorageNodes[0].ID(), selectedNodes[0].ID)
require.Equal(t, storj.NodeID{}, selectedNodes[1].ID)
require.Equal(t, planet.StorageNodes[2].ID(), selectedNodes[2].ID)
})
}

Expand Down
56 changes: 21 additions & 35 deletions satellite/overlay/service.go
Expand Up @@ -11,15 +11,13 @@ import (

"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/maps"

"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/sync2"
"storj.io/private/version"
"storj.io/storj/satellite/geoip"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/nodeselection"
)
Expand Down Expand Up @@ -62,10 +60,15 @@ type DB interface {

// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error)
// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error)
// GetNodes gets records for all specified nodes as of the given system interval. The
// onlineWindow is used to determine whether each node is marked as Online. The results are
// returned in a slice of the same length as the input nodeIDs, and each index of the returned
// list corresponds to the same index in nodeIDs. If a node is not known, or is disqualified
// or exited, the corresponding returned SelectedNode will have a zero value.
GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error)
// GetParticipatingNodes returns all known participating nodes (this includes all known nodes
// excluding nodes that have been disqualified or gracefully exited).
GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error)
// UpdateReputation updates the DB columns for all reputation fields in ReputationStatus.
UpdateReputation(ctx context.Context, id storj.NodeID, request ReputationUpdate) error
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
Expand Down Expand Up @@ -486,20 +489,25 @@ func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown ti
return count, err
}

// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []nodeselection.SelectedNode, offlineNodes []nodeselection.SelectedNode, err error) {
// GetNodes gets records for all specified nodes. The configured OnlineWindow is used to determine
// whether each node is marked as Online. The results are returned in a slice of the same length as
// the input nodeIDs, and each index of the returned list corresponds to the same index in nodeIDs.
// If a node is not known, or is disqualified or exited, the corresponding returned SelectedNode
// will have a zero value.
func (service *Service) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList) (records []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)

// TODO add as of system time
return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0)
return service.db.GetNodes(ctx, nodeIDs, service.config.Node.OnlineWindow, 0)
}

// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
func (service *Service) Reliable(ctx context.Context) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) {
// GetParticipatingNodes returns all known participating nodes (this includes all known nodes
// excluding nodes that have been disqualified or gracefully exited).
func (service *Service) GetParticipatingNodes(ctx context.Context) (records []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)

// TODO add as of system tim.
return service.db.Reliable(ctx, service.config.Node.OnlineWindow, 0)
// TODO add as of system time.
return service.db.GetParticipatingNodes(ctx, service.config.Node.OnlineWindow, 0)
}

// UpdateReputation updates the DB columns for any of the reputation fields.
Expand Down Expand Up @@ -660,28 +668,6 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
return nil
}

// GetMissingPieces returns the list of offline nodes and the corresponding pieces.
func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO this method will be removed completely in subsequent change

	// Collect the node IDs to look up, and remember which piece number each node holds.
	nodeIDs := make(storj.NodeIDList, 0, len(pieces))
	pieceNumByNode := make(map[storj.NodeID]uint16, len(pieces))
	for _, piece := range pieces {
		nodeIDs = append(nodeIDs, piece.StorageNode)
		pieceNumByNode[piece.StorageNode] = piece.Number
	}

	onlineNodes, _, err := service.KnownReliable(ctx, nodeIDs)
	if err != nil {
		return nil, Error.New("error getting nodes %s", err)
	}

	// Any piece held by a node reported online is not missing; drop it from the map
	// so that only the pieces on offline/unreliable nodes remain.
	for _, onlineNode := range onlineNodes {
		delete(pieceNumByNode, onlineNode.ID)
	}

	return maps.Values(pieceNumByNode), nil
}

// DQNodesLastSeenBefore disqualifies nodes who have not been contacted since the cutoff time.
func (service *Service) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time, limit int) (count int, err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
41 changes: 23 additions & 18 deletions satellite/overlay/service_test.go
Expand Up @@ -6,7 +6,6 @@ package overlay_test
import (
"context"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -383,7 +382,7 @@ func TestNodeInfo(t *testing.T) {
})
}

func TestKnownReliable(t *testing.T) {
func TestGetNodes(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Expand Down Expand Up @@ -428,8 +427,8 @@ func TestKnownReliable(t *testing.T) {
err = oc.TestSuspendNodeOffline(ctx, planet.StorageNodes[3].ID(), time.Now())
require.NoError(t, err)

// Check that only storage nodes #4 and #5 are reliable
online, _, err := service.KnownReliable(ctx, []storj.NodeID{
// Check that the results of GetNodes match expectations.
selectedNodes, err := service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
Expand All @@ -438,20 +437,26 @@ func TestKnownReliable(t *testing.T) {
planet.StorageNodes[5].ID(),
})
require.NoError(t, err)
require.Len(t, online, 2)

// Sort the storage nodes for predictable checks
expectedReliable := []storj.NodeURL{
planet.StorageNodes[4].NodeURL(),
planet.StorageNodes[5].NodeURL(),
}
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) })
sort.Slice(online, func(i, j int) bool { return online[i].ID.Less(online[j].ID) })

// Assert the reliable nodes are the expected ones
for i, node := range online {
assert.Equal(t, expectedReliable[i].ID, node.ID)
assert.Equal(t, expectedReliable[i].Address, node.Address.Address)
require.Len(t, selectedNodes, 6)
require.False(t, selectedNodes[0].Online)
require.Zero(t, selectedNodes[0]) // node was disqualified
require.False(t, selectedNodes[1].Online)
require.False(t, selectedNodes[1].Suspended)
require.True(t, selectedNodes[2].Online)
require.True(t, selectedNodes[2].Suspended)
require.True(t, selectedNodes[3].Online)
require.True(t, selectedNodes[3].Suspended)
require.True(t, selectedNodes[4].Online)
require.False(t, selectedNodes[4].Suspended)
require.True(t, selectedNodes[5].Online)
require.False(t, selectedNodes[5].Suspended)

// Assert the returned nodes are the expected ones
for i, node := range selectedNodes {
if i == 0 {
continue
}
assert.Equal(t, planet.StorageNodes[i].ID(), node.ID)
}
})
}
Expand Down

0 comments on commit e2006d8

Please sign in to comment.