diff --git a/private/testplanet/uplink_test.go b/private/testplanet/uplink_test.go index 9156ccc68a25..90aebc87f630 100644 --- a/private/testplanet/uplink_test.go +++ b/private/testplanet/uplink_test.go @@ -27,6 +27,7 @@ import ( "storj.io/storj/private/revocation" "storj.io/storj/private/server" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite/nodeselection" "storj.io/uplink" "storj.io/uplink/private/metaclient" ) @@ -105,8 +106,14 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) { } // confirm that we marked the correct number of storage nodes as offline - online, _, err := satellite.Overlay.Service.Reliable(ctx) + allNodes, err := satellite.Overlay.Service.GetParticipatingNodes(ctx) require.NoError(t, err) + online := make([]nodeselection.SelectedNode, 0, len(allNodes)) + for _, node := range allNodes { + if node.Online { + online = append(online, node) + } + } require.Len(t, online, len(planet.StorageNodes)-toKill) // we should be able to download data without any of the original nodes diff --git a/satellite/orders/service.go b/satellite/orders/service.go index c42bc0236046..3f5541f5bc1b 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -528,7 +528,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) { defer mon.Task()(&ctx)(&err) - // should this use KnownReliable or similar? node, err := service.overlay.Get(ctx, nodeID) if err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) diff --git a/satellite/overlay/benchmark_test.go b/satellite/overlay/benchmark_test.go index ded9aba8e5fa..54797268b9bb 100644 --- a/satellite/overlay/benchmark_test.go +++ b/satellite/overlay/benchmark_test.go @@ -64,12 +64,19 @@ func BenchmarkOverlay(b *testing.B) { check = append(check, testrand.NodeID()) } - b.Run("KnownReliable", func(b *testing.B) { + b.Run("GetNodes", func(b *testing.B) { onlineWindow := 1000 * time.Hour for i := 0; i < b.N; i++ { - online, _, err := overlaydb.KnownReliable(ctx, check, onlineWindow, 0) + selectedNodes, err := overlaydb.GetNodes(ctx, check, onlineWindow, 0) require.NoError(b, err) - require.Len(b, online, OnlineCount) + require.Len(b, selectedNodes, len(check)) + foundOnline := 0 + for _, n := range selectedNodes { + if n.Online { + foundOnline++ + } + } + require.Equal(b, OnlineCount, foundOnline) } }) diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index a2cb371a0c8f..10c8ba32a5b8 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -20,7 +20,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" "go.uber.org/zap/zaptest" - "golang.org/x/exp/slices" "storj.io/common/identity/testidentity" "storj.io/common/memory" @@ -118,38 +117,39 @@ func TestOnlineOffline(t *testing.T) { satellite := planet.Satellites[0] service := satellite.Overlay.Service - online, offline, err := service.KnownReliable(ctx, []storj.NodeID{ + selectedNodes, err := service.GetNodes(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), }) require.NoError(t, err) - require.Empty(t, offline) - require.Len(t, online, 1) + require.Len(t, selectedNodes, 1) + require.True(t, selectedNodes[0].Online) - online, offline, err = service.KnownReliable(ctx, []storj.NodeID{ + selectedNodes, err = 
service.GetNodes(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), planet.StorageNodes[1].ID(), planet.StorageNodes[2].ID(), }) require.NoError(t, err) - require.Empty(t, offline) - require.Len(t, online, 3) + require.Len(t, selectedNodes, 3) + for i := 0; i < 3; i++ { + require.True(t, selectedNodes[i].Online, i) + require.Equal(t, planet.StorageNodes[i].ID(), selectedNodes[i].ID, i) + } unreliableNodeID := storj.NodeID{1, 2, 3, 4} - online, offline, err = service.KnownReliable(ctx, []storj.NodeID{ + selectedNodes, err = service.GetNodes(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), unreliableNodeID, planet.StorageNodes[2].ID(), }) require.NoError(t, err) - require.Empty(t, offline) - require.Len(t, online, 2) - - require.False(t, slices.ContainsFunc(online, func(node nodeselection.SelectedNode) bool { - return node.ID == unreliableNodeID - })) - require.False(t, slices.ContainsFunc(offline, func(node nodeselection.SelectedNode) bool { - return node.ID == unreliableNodeID - })) + require.Len(t, selectedNodes, 3) + require.True(t, selectedNodes[0].Online) + require.False(t, selectedNodes[1].Online) + require.True(t, selectedNodes[2].Online) + require.Equal(t, planet.StorageNodes[0].ID(), selectedNodes[0].ID) + require.Equal(t, storj.NodeID{}, selectedNodes[1].ID) + require.Equal(t, planet.StorageNodes[2].ID(), selectedNodes[2].ID) }) } diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index b6061224bbbd..d7da60932664 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -11,7 +11,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" - "golang.org/x/exp/maps" "storj.io/common/pb" "storj.io/common/storj" @@ -19,7 +18,6 @@ import ( "storj.io/common/sync2" "storj.io/private/version" "storj.io/storj/satellite/geoip" - "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/nodeevents" "storj.io/storj/satellite/nodeselection" ) @@ -62,10 +60,15 @@ type DB interface { // Get looks up the node by nodeID Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error) - // KnownReliable filters a set of nodes to reliable (online and qualified) nodes. - KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) - // Reliable returns all nodes that are reliable (separated by whether they are currently online or offline). - Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) + // GetNodes gets records for all specified nodes as of the given system interval. The + // onlineWindow is used to determine whether each node is marked as Online. The results are + // returned in a slice of the same length as the input nodeIDs, and each index of the returned + // list corresponds to the same index in nodeIDs. If a node is not known, or is disqualified + // or exited, the corresponding returned SelectedNode will have a zero value. + GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error) + // GetParticipatingNodes returns all known participating nodes (this includes all known nodes + // excluding nodes that have been disqualified or gracefully exited). 
+ GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error) // UpdateReputation updates the DB columns for all reputation fields in ReputationStatus. UpdateReputation(ctx context.Context, id storj.NodeID, request ReputationUpdate) error // UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version. @@ -486,20 +489,25 @@ func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown ti return count, err } -// KnownReliable filters a set of nodes to reliable (online and qualified) nodes. -func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []nodeselection.SelectedNode, offlineNodes []nodeselection.SelectedNode, err error) { +// GetNodes gets records for all specified nodes. The configured OnlineWindow is used to determine +// whether each node is marked as Online. The results are returned in a slice of the same length as +// the input nodeIDs, and each index of the returned list corresponds to the same index in nodeIDs. +// If a node is not known, or is disqualified or exited, the corresponding returned SelectedNode +// will have a zero value. +func (service *Service) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList) (records []nodeselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) // TODO add as of system time - return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0) + return service.db.GetNodes(ctx, nodeIDs, service.config.Node.OnlineWindow, 0) } -// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline). -func (service *Service) Reliable(ctx context.Context) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) { +// GetParticipatingNodes returns all known participating nodes (this includes all known nodes +// excluding nodes that have been disqualified or gracefully exited). +func (service *Service) GetParticipatingNodes(ctx context.Context) (records []nodeselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) - // TODO add as of system tim. - return service.db.Reliable(ctx, service.config.Node.OnlineWindow, 0) + // TODO add as of system time. + return service.db.GetParticipatingNodes(ctx, service.config.Node.OnlineWindow, 0) } // UpdateReputation updates the DB columns for any of the reputation fields. @@ -660,28 +668,6 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, return nil } -// GetMissingPieces returns the list of offline nodes and the corresponding pieces. -func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) { - defer mon.Task()(&ctx)(&err) - - // TODO this method will be removed completely in subsequent change - var nodeIDs storj.NodeIDList - missingPiecesMap := map[storj.NodeID]uint16{} - for _, p := range pieces { - nodeIDs = append(nodeIDs, p.StorageNode) - missingPiecesMap[p.StorageNode] = p.Number - } - onlineNodes, _, err := service.KnownReliable(ctx, nodeIDs) - if err != nil { - return nil, Error.New("error getting nodes %s", err) - } - - for _, node := range onlineNodes { - delete(missingPiecesMap, node.ID) - } - return maps.Values(missingPiecesMap), nil -} - // DQNodesLastSeenBefore disqualifies nodes who have not been contacted since the cutoff time. 
func (service *Service) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time, limit int) (count int, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 75a59bc2cc51..f8484200ce8a 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -6,7 +6,6 @@ package overlay_test import ( "context" "fmt" - "sort" "testing" "time" @@ -383,7 +382,7 @@ func TestNodeInfo(t *testing.T) { }) } -func TestKnownReliable(t *testing.T) { +func TestGetNodes(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ @@ -428,8 +427,8 @@ func TestKnownReliable(t *testing.T) { err = oc.TestSuspendNodeOffline(ctx, planet.StorageNodes[3].ID(), time.Now()) require.NoError(t, err) - // Check that only storage nodes #4 and #5 are reliable - online, _, err := service.KnownReliable(ctx, []storj.NodeID{ + // Check that the results of GetNodes match expectations. + selectedNodes, err := service.GetNodes(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), planet.StorageNodes[1].ID(), planet.StorageNodes[2].ID(), @@ -438,20 +437,26 @@ func TestKnownReliable(t *testing.T) { planet.StorageNodes[5].ID(), }) require.NoError(t, err) - require.Len(t, online, 2) - - // Sort the storage nodes for predictable checks - expectedReliable := []storj.NodeURL{ - planet.StorageNodes[4].NodeURL(), - planet.StorageNodes[5].NodeURL(), - } - sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) }) - sort.Slice(online, func(i, j int) bool { return online[i].ID.Less(online[j].ID) }) - - // Assert the reliable nodes are the expected ones - for i, node := range online { - assert.Equal(t, expectedReliable[i].ID, node.ID) - assert.Equal(t, expectedReliable[i].Address, node.Address.Address) + require.Len(t, selectedNodes, 6) + require.False(t, selectedNodes[0].Online) + require.Zero(t, selectedNodes[0]) // node was disqualified + require.False(t, selectedNodes[1].Online) + require.False(t, selectedNodes[1].Suspended) + require.True(t, selectedNodes[2].Online) + require.True(t, selectedNodes[2].Suspended) + require.True(t, selectedNodes[3].Online) + require.True(t, selectedNodes[3].Suspended) + require.True(t, selectedNodes[4].Online) + require.False(t, selectedNodes[4].Suspended) + require.True(t, selectedNodes[5].Online) + require.False(t, selectedNodes[5].Suspended) + + // Assert the returned nodes are the expected ones + for i, node := range selectedNodes { + if i == 0 { + continue + } + assert.Equal(t, planet.StorageNodes[i].ID(), node.ID) } }) } diff --git a/satellite/overlay/statdb_test.go b/satellite/overlay/statdb_test.go index 79d306a0a6b7..af79078c0b2f 100644 --- a/satellite/overlay/statdb_test.go +++ b/satellite/overlay/statdb_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" "storj.io/common/pb" "storj.io/common/storj" @@ -30,118 +29,151 @@ func TestStatDB(t *testing.T) { } func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { - { // Test KnownReliable and Reliable - for i, tt := range []struct { - nodeID storj.NodeID - unknownAuditSuspended bool - offlineSuspended bool - disqualified bool - offline bool - gracefullyexited bool - countryCode string - }{ - {storj.NodeID{1}, false, false, false, false, false, "DE"}, // good - {storj.NodeID{2}, false, false, true, false, false, 
"DE"}, // disqualified - {storj.NodeID{3}, true, false, false, false, false, "DE"}, // unknown audit suspended - {storj.NodeID{4}, false, false, false, true, false, "DE"}, // offline - {storj.NodeID{5}, false, false, false, false, true, "DE"}, // gracefully exited - {storj.NodeID{6}, false, true, false, false, false, "DE"}, // offline suspended - {storj.NodeID{7}, false, false, false, false, false, "FR"}, // excluded country - {storj.NodeID{8}, false, false, false, false, false, ""}, // good - } { - addr := fmt.Sprintf("127.0.%d.0:8080", i) - lastNet := fmt.Sprintf("127.0.%d", i) - d := overlay.NodeCheckInInfo{ - NodeID: tt.nodeID, - Address: &pb.NodeAddress{Address: addr}, - LastIPPort: addr, - LastNet: lastNet, - Version: &pb.NodeVersion{Version: "v1.0.0"}, - Capacity: &pb.NodeCapacity{}, - IsUp: true, - CountryCode: location.ToCountryCode(tt.countryCode), - } - err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{}) - require.NoError(t, err) - - if tt.unknownAuditSuspended { - err = cache.TestSuspendNodeUnknownAudit(ctx, tt.nodeID, time.Now()) - require.NoError(t, err) - } + for i, tt := range []struct { + nodeID storj.NodeID + unknownAuditSuspended bool + offlineSuspended bool + disqualified bool + offline bool + gracefullyExited bool + countryCode string + }{ + {storj.NodeID{1}, false, false, false, false, false, "DE"}, // good + {storj.NodeID{2}, false, false, true, false, false, "DE"}, // disqualified + {storj.NodeID{3}, true, false, false, false, false, "DE"}, // unknown audit suspended + {storj.NodeID{4}, false, false, false, true, false, "DE"}, // offline + {storj.NodeID{5}, false, false, false, false, true, "DE"}, // gracefully exited + {storj.NodeID{6}, false, true, false, false, false, "DE"}, // offline suspended + {storj.NodeID{7}, false, false, false, false, false, "FR"}, // excluded country + {storj.NodeID{8}, false, false, false, false, false, ""}, // good + } { + addr := fmt.Sprintf("127.0.%d.0:8080", i) + lastNet := fmt.Sprintf("127.0.%d", i) + d := overlay.NodeCheckInInfo{ + NodeID: tt.nodeID, + Address: &pb.NodeAddress{Address: addr}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + CountryCode: location.ToCountryCode(tt.countryCode), + } + err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) - if tt.offlineSuspended { - err = cache.TestSuspendNodeOffline(ctx, tt.nodeID, time.Now()) - require.NoError(t, err) - } + if tt.unknownAuditSuspended { + err = cache.TestSuspendNodeUnknownAudit(ctx, tt.nodeID, time.Now()) + require.NoError(t, err) + } - if tt.disqualified { - _, err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown) - require.NoError(t, err) - } - if tt.offline { - checkInInfo := getNodeInfo(tt.nodeID) - err = cache.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-2*time.Hour), overlay.NodeSelectionConfig{}) - require.NoError(t, err) - } - if tt.gracefullyexited { - req := &overlay.ExitStatusRequest{ - NodeID: tt.nodeID, - ExitInitiatedAt: time.Now(), - ExitLoopCompletedAt: time.Now(), - ExitFinishedAt: time.Now(), - } - _, err := cache.UpdateExitStatus(ctx, req) - require.NoError(t, err) - } + if tt.offlineSuspended { + err = cache.TestSuspendNodeOffline(ctx, tt.nodeID, time.Now()) + require.NoError(t, err) } - nodeIds := storj.NodeIDList{ - storj.NodeID{1}, storj.NodeID{2}, - storj.NodeID{3}, storj.NodeID{4}, - storj.NodeID{5}, storj.NodeID{6}, - 
storj.NodeID{7}, storj.NodeID{8}, - storj.NodeID{9}, + if tt.disqualified { + _, err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown) + require.NoError(t, err) + } + if tt.offline { + checkInInfo := getNodeInfo(tt.nodeID) + checkInInfo.CountryCode = location.ToCountryCode(tt.countryCode) + err = cache.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-2*time.Hour), overlay.NodeSelectionConfig{}) + require.NoError(t, err) } - contains := func(nodeID storj.NodeID) func(node nodeselection.SelectedNode) bool { - return func(node nodeselection.SelectedNode) bool { - return node.ID == nodeID + if tt.gracefullyExited { + req := &overlay.ExitStatusRequest{ + NodeID: tt.nodeID, + ExitInitiatedAt: time.Now(), + ExitLoopCompletedAt: time.Now(), + ExitFinishedAt: time.Now(), } + _, err := cache.UpdateExitStatus(ctx, req) + require.NoError(t, err) } + } - online, offline, err := cache.KnownReliable(ctx, nodeIds, time.Hour, 0) - require.NoError(t, err) + nodeIds := storj.NodeIDList{ + storj.NodeID{1}, storj.NodeID{2}, + storj.NodeID{3}, storj.NodeID{4}, + storj.NodeID{5}, storj.NodeID{6}, + storj.NodeID{7}, storj.NodeID{8}, + storj.NodeID{9}, + } - // unrealiable nodes shouldn't be in results - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{2}))) // disqualified - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{3}))) // unknown audit suspended - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{5}))) // gracefully exited - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{6}))) // offline suspended - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{9}))) // not in db + t.Run("GetNodes", func(t *testing.T) { + selectedNodes, err := cache.GetNodes(ctx, nodeIds, time.Hour, 0) + require.NoError(t, err) + require.Len(t, selectedNodes, len(nodeIds)) - require.True(t, slices.ContainsFunc(offline, contains(storj.NodeID{4}))) // offline - // KnownReliable is not excluding by country anymore - require.True(t, slices.ContainsFunc(online, contains(storj.NodeID{7}))) // excluded country + // disqualified/exited/unknown nodes should be returned as a zero-value SelectedNode in results + require.Zero(t, selectedNodes[1].ID) // #2 is disqualified + require.False(t, selectedNodes[1].Online) + require.Zero(t, selectedNodes[4].ID) // #5 gracefully exited + require.False(t, selectedNodes[4].Online) + require.Zero(t, selectedNodes[8].ID) // #9 is not in db + require.False(t, selectedNodes[8].Online) - require.Len(t, append(online, offline...), 4) + require.Equal(t, nodeIds[0], selectedNodes[0].ID) // #1 is online + require.True(t, selectedNodes[0].Online) + require.Equal(t, "DE", selectedNodes[0].CountryCode.String()) + require.Equal(t, nodeIds[2], selectedNodes[2].ID) // #3 is unknown-audit-suspended + require.True(t, selectedNodes[2].Online) + require.Equal(t, "DE", selectedNodes[2].CountryCode.String()) + require.Equal(t, nodeIds[3], selectedNodes[3].ID) // #4 is offline + require.False(t, selectedNodes[3].Online) + require.Equal(t, "DE", selectedNodes[3].CountryCode.String()) + require.Equal(t, nodeIds[5], selectedNodes[5].ID) // #6 is offline-suspended + require.True(t, selectedNodes[5].Online) + require.Equal(t, "DE", selectedNodes[5].CountryCode.String()) + require.Equal(t, nodeIds[6], selectedNodes[6].ID) // #7 is in an excluded country + require.True(t, selectedNodes[6].Online) + 
require.Equal(t, "FR", selectedNodes[6].CountryCode.String()) + require.Equal(t, nodeIds[7], selectedNodes[7].ID) // #8 is online but has no country code + require.True(t, selectedNodes[7].Online) + require.Equal(t, "", selectedNodes[7].CountryCode.String()) + }) - online, offline, err = cache.Reliable(ctx, time.Hour, 0) + t.Run("GetParticipatingNodes", func(t *testing.T) { + allNodes, err := cache.GetParticipatingNodes(ctx, time.Hour, 0) require.NoError(t, err) - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{2}))) // disqualified - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{3}))) // unknown audit suspended + expectOnline := func(t *testing.T, nodeList []nodeselection.SelectedNode, nodeID storj.NodeID, shouldBeOnline bool) { + for _, n := range nodeList { + if n.ID == nodeID { + if n.Online != shouldBeOnline { + require.Failf(t, "invalid Onlineness", "node %x was found in list, but Online=%v, whereas we expected Online=%v", n.ID[:], n.Online, shouldBeOnline) + } + return + } + } + require.Fail(t, "node not found in list", "node ID %x not found in list. list: %v", nodeID[:], nodeList) + } - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{5}))) // gracefully exited - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{6}))) // offline suspended - require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{9}))) // not in db + expectOnline(t, allNodes, storj.NodeID{1}, true) // normal and online + expectOnline(t, allNodes, storj.NodeID{3}, true) // unknown audit suspended + expectOnline(t, allNodes, storj.NodeID{4}, false) // offline + expectOnline(t, allNodes, storj.NodeID{6}, true) // offline suspended + expectOnline(t, allNodes, storj.NodeID{7}, true) // excluded country + expectOnline(t, allNodes, storj.NodeID{8}, true) // normal and online, no country code - require.True(t, slices.ContainsFunc(offline, contains(storj.NodeID{4}))) // offline - // Reliable is not excluding by country anymore - require.True(t, slices.ContainsFunc(online, contains(storj.NodeID{7}))) // excluded country + expectNotInList := func(t *testing.T, nodeList []nodeselection.SelectedNode, nodeID storj.NodeID) { + for index, n := range nodeList { + if n.ID == nodeID { + require.Failf(t, "not found in list", "node %x should not have been found in list, but it was found at index [%d].", nodeID[:], index) + } + } + } - require.Len(t, append(online, offline...), 4) - } + expectNotInList(t, allNodes, storj.NodeID{2}) // disqualified + expectNotInList(t, allNodes, storj.NodeID{5}) // gracefully exited + expectNotInList(t, allNodes, storj.NodeID{9}) // not in db + + require.Len(t, allNodes, 6) + }) - { // TestUpdateOperator + t.Run("TestUpdateOperator", func(t *testing.T) { nodeID := storj.NodeID{10} addr := "127.0.1.0:8080" lastNet := "127.0.1" @@ -214,9 +246,10 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { assert.Equal(t, "0x2222222222222222222222222222222222222222", updateWalletFeatures.Operator.Wallet) assert.Equal(t, "def456@mail.test", updateWalletFeatures.Operator.Email) assert.Equal(t, []string{"wallet_features_updated"}, updateWalletFeatures.Operator.WalletFeatures) - } + }) - { // test UpdateCheckIn updates the reputation correctly when the node is offline/online + // test UpdateCheckIn updates the reputation correctly when the node is offline/online + t.Run("UpdateCheckIn", func(t *testing.T) { nodeID 
:= storj.NodeID{1} // get the existing node info that is stored in nodes table @@ -248,5 +281,5 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { require.NoError(t, err) _, err = cache.Get(ctx, nodeID) require.NoError(t, err) - } + }) } diff --git a/satellite/overlay/uploadselection_test.go b/satellite/overlay/uploadselection_test.go index 6aa326584f23..ae580109d67f 100644 --- a/satellite/overlay/uploadselection_test.go +++ b/satellite/overlay/uploadselection_test.go @@ -205,7 +205,7 @@ func TestRefreshConcurrent(t *testing.T) { require.True(t, 1 <= mockDB.callCount && mockDB.callCount <= 2, "calls %d", mockDB.callCount) } -func TestGetNodes(t *testing.T) { +func TestSelectNodes(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { var nodeSelectionConfig = overlay.NodeSelectionConfig{ NewNodeFraction: 0.2, @@ -768,6 +768,16 @@ func (m *mockdb) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDos panic("implement me") } +// GetNodes satisfies nodeevents.DB interface. +func (m *mockdb) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error) { + panic("implement me") +} + +// GetParticipatingNodes satisfies nodeevents.DB interface. +func (m *mockdb) GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error) { + panic("implement me") +} + // KnownReliable satisfies nodeevents.DB interface. func (m *mockdb) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) { panic("implement me") diff --git a/satellite/repair/checker/observer_unit_test.go b/satellite/repair/checker/observer_unit_test.go index 744cc6aece57..6f14979282c6 100644 --- a/satellite/repair/checker/observer_unit_test.go +++ b/satellite/repair/checker/observer_unit_test.go @@ -57,12 +57,9 @@ func TestObserverForkProcess(t *testing.T) { } o.nodesCache.state.Store(&reliabilityState{ - reliableAll: mapNodes(nodes, func(node nodeselection.SelectedNode) bool { + nodeByID: mapNodes(nodes, func(node nodeselection.SelectedNode) bool { return true }), - reliableOnline: mapNodes(nodes, func(node nodeselection.SelectedNode) bool { - return node.Online == true - }), created: time.Now(), }) return o diff --git a/satellite/repair/checker/online.go b/satellite/repair/checker/online.go index 67f5f4a8853f..a5a66ffb82c6 100644 --- a/satellite/repair/checker/online.go +++ b/satellite/repair/checker/online.go @@ -16,7 +16,7 @@ import ( "storj.io/storj/satellite/overlay" ) -// ReliabilityCache caches the reliable nodes for the specified staleness duration +// ReliabilityCache caches known nodes for the specified staleness duration // and updates automatically from overlay. // // architecture: Service @@ -32,9 +32,8 @@ type ReliabilityCache struct { // reliabilityState. type reliabilityState struct { - reliableOnline map[storj.NodeID]nodeselection.SelectedNode - reliableAll map[storj.NodeID]nodeselection.SelectedNode - created time.Time + nodeByID map[storj.NodeID]nodeselection.SelectedNode + created time.Time } // NewReliabilityCache creates a new reliability checking cache. 
@@ -73,7 +72,7 @@ func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err return 0, err } - return len(state.reliableOnline), nil + return len(state.nodeByID), nil } // MissingPieces returns piece indices that are unreliable with the given staleness period. @@ -84,8 +83,8 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T } var unreliable metabase.Pieces for _, p := range pieces { - node, ok := state.reliableOnline[p.StorageNode] - if !ok { + node, ok := state.nodeByID[p.StorageNode] + if !ok || !node.Online || node.Suspended { unreliable = append(unreliable, p) } else if _, excluded := cache.excludedCountryCodes[node.CountryCode]; excluded { unreliable = append(unreliable, p) @@ -109,7 +108,7 @@ func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created var outOfPlacementPieces metabase.Pieces nodeFilters := cache.placementRules(placement) for _, p := range pieces { - if node, ok := state.reliableAll[p.StorageNode]; ok && !nodeFilters.Match(&node) { + if node, ok := state.nodeByID[p.StorageNode]; ok && !nodeFilters.Match(&node) { outOfPlacementPieces = append(outOfPlacementPieces, p) } } @@ -118,8 +117,8 @@ func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created } // PiecesNodesLastNetsInOrder returns the /24 subnet for each piece storage node, in order. If a -// requested node is not in the database or it's unreliable, an empty string will be returned corresponding -// to that node's last_net. +// requested node is not in the database, an empty string will be returned corresponding to that +// node's last_net. func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, created time.Time, pieces metabase.Pieces) (lastNets []string, err error) { defer mon.Task()(&ctx)(nil) @@ -134,7 +133,7 @@ func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, c lastNets = make([]string, len(pieces)) for i, piece := range pieces { - if node, ok := state.reliableAll[piece.StorageNode]; ok { + if node, ok := state.nodeByID[piece.StorageNode]; ok { lastNets[i] = node.LastNet } } @@ -180,22 +179,17 @@ func (cache *ReliabilityCache) Refresh(ctx context.Context) (err error) { func (cache *ReliabilityCache) refreshLocked(ctx context.Context) (_ *reliabilityState, err error) { defer mon.Task()(&ctx)(&err) - online, offline, err := cache.overlay.Reliable(ctx) + selectedNodes, err := cache.overlay.GetParticipatingNodes(ctx) if err != nil { return nil, Error.Wrap(err) } state := &reliabilityState{ - created: time.Now(), - reliableOnline: make(map[storj.NodeID]nodeselection.SelectedNode, len(online)), - reliableAll: make(map[storj.NodeID]nodeselection.SelectedNode, len(online)+len(offline)), + created: time.Now(), + nodeByID: make(map[storj.NodeID]nodeselection.SelectedNode, len(selectedNodes)), } - for _, node := range online { - state.reliableOnline[node.ID] = node - state.reliableAll[node.ID] = node - } - for _, node := range offline { - state.reliableAll[node.ID] = node + for _, node := range selectedNodes { + state.nodeByID[node.ID] = node } cache.state.Store(state) diff --git a/satellite/repair/checker/online_test.go b/satellite/repair/checker/online_test.go index f3bd784015f5..b91f416199ff 100644 --- a/satellite/repair/checker/online_test.go +++ b/satellite/repair/checker/online_test.go @@ -60,13 +60,13 @@ func TestReliabilityCache_Concurrent(t *testing.T) { type fakeOverlayDB struct{ overlay.DB } type fakeNodeEvents struct{ nodeevents.DB } -func (fakeOverlayDB) 
Reliable(context.Context, time.Duration, time.Duration) ([]nodeselection.SelectedNode, []nodeselection.SelectedNode, error) { +func (fakeOverlayDB) GetParticipatingNodes(context.Context, time.Duration, time.Duration) ([]nodeselection.SelectedNode, error) { return []nodeselection.SelectedNode{ - {ID: testrand.NodeID()}, - {ID: testrand.NodeID()}, - {ID: testrand.NodeID()}, - {ID: testrand.NodeID()}, - }, nil, nil + {ID: testrand.NodeID(), Online: true}, + {ID: testrand.NodeID(), Online: true}, + {ID: testrand.NodeID(), Online: true}, + {ID: testrand.NodeID(), Online: true}, + }, nil } func TestReliabilityCache_OutOfPlacementPieces(t *testing.T) { diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go index 1d2aed1a0a93..ddcd9955ca01 100644 --- a/satellite/repair/repairer/segments.go +++ b/satellite/repair/repairer/segments.go @@ -668,14 +668,14 @@ func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segm allNodeIDs[i] = piece.StorageNode } - online, offline, err := repairer.overlay.KnownReliable(ctx, allNodeIDs) + selectedNodes, err := repairer.overlay.GetNodes(ctx, allNodeIDs) if err != nil { return piecesCheckResult{}, overlayQueryError.New("error identifying missing pieces: %w", err) } - return repairer.classifySegmentPiecesWithNodes(ctx, segment, allNodeIDs, online, offline) + return repairer.classifySegmentPiecesWithNodes(ctx, segment, allNodeIDs, selectedNodes) } -func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Context, segment metabase.Segment, allNodeIDs []storj.NodeID, online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode) (result piecesCheckResult, err error) { +func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Context, segment metabase.Segment, allNodeIDs []storj.NodeID, selectedNodes []nodeselection.SelectedNode) (result piecesCheckResult, err error) { pieces := segment.Pieces nodeIDPieceMap := map[storj.NodeID]uint16{} @@ -688,22 +688,28 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont result.ExcludeNodeIDs = allNodeIDs + if len(selectedNodes) != len(pieces) { + repairer.log.Error("GetNodes returned an invalid result", zap.Any("pieces", pieces), zap.Any("selectedNodes", selectedNodes), zap.Error(err)) + return piecesCheckResult{}, overlayQueryError.New("GetNodes returned an invalid result") + } + nodeFilters := repairer.placementRules(segment.Placement) // remove online nodes from missing pieces - for _, onlineNode := range online { + for _, node := range selectedNodes { + if !node.Online || node.Suspended { + continue + } // count online nodes in excluded countries only if country is not excluded by segment // placement, those nodes will be counted with out of placement check - if _, excluded := repairer.excludedCountryCodes[onlineNode.CountryCode]; excluded && nodeFilters.Match(&onlineNode) { + if _, excluded := repairer.excludedCountryCodes[node.CountryCode]; excluded && nodeFilters.Match(&node) { result.NumHealthyInExcludedCountries++ } - pieceNum := nodeIDPieceMap[onlineNode.ID] + pieceNum := nodeIDPieceMap[node.ID] delete(result.MissingPiecesSet, pieceNum) } - nodeFilters = repairer.placementRules(segment.Placement) - if repairer.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilters) { // if multiple pieces are on the same last_net, keep only the first one. The rest are // to be considered retrievable but unhealthy. 
@@ -711,8 +717,11 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont reliablePieces := metabase.Pieces{} - collectLastNets := func(reliable []nodeselection.SelectedNode) { - for _, node := range reliable { + collectClumpedPieces := func(onlineness bool) { + for _, node := range selectedNodes { + if node.Online != onlineness { + continue + } pieceNum := nodeIDPieceMap[node.ID] reliablePieces = append(reliablePieces, metabase.Piece{ Number: pieceNum, @@ -721,8 +730,10 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont lastNets = append(lastNets, node.LastNet) } } - collectLastNets(online) - collectLastNets(offline) + // go over online nodes first, so that if we have to remove clumped pieces, we prefer + // to remove offline ones over online ones. + collectClumpedPieces(true) + collectClumpedPieces(false) clumpedPieces := repair.FindClumpedPieces(reliablePieces, lastNets) result.ClumpedPiecesSet = map[uint16]bool{} @@ -734,17 +745,13 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont result.OutOfPlacementPiecesSet = map[uint16]bool{} if repairer.doPlacementCheck { - checkPlacement := func(reliable []nodeselection.SelectedNode) { - for _, node := range reliable { - if nodeFilters.Match(&node) { - continue - } - - result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true + for _, node := range selectedNodes { + if nodeFilters.Match(&node) { + continue } + + result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true } - checkPlacement(online) - checkPlacement(offline) } // verify that some of clumped pieces and out of placement pieces are not the same @@ -753,8 +760,10 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont maps.Copy(unhealthyRetrievableSet, result.OutOfPlacementPiecesSet) // offline nodes are not retrievable - for _, node := range offline { - delete(unhealthyRetrievableSet, nodeIDPieceMap[node.ID]) + for _, node := range selectedNodes { + if !node.Online { + delete(unhealthyRetrievableSet, nodeIDPieceMap[node.ID]) + } } result.NumUnhealthyRetrievable = len(unhealthyRetrievableSet) diff --git a/satellite/repair/repairer/segments_unit_test.go b/satellite/repair/repairer/segments_unit_test.go index a6ed8af5a6f1..69f35046aef4 100644 --- a/satellite/repair/repairer/segments_unit_test.go +++ b/satellite/repair/repairer/segments_unit_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" "storj.io/common/identity/testidentity" "storj.io/common/storj" @@ -21,8 +22,21 @@ import ( func TestClassify(t *testing.T) { ctx := testcontext.New(t) + getNodes := func(nodes []nodeselection.SelectedNode, pieces metabase.Pieces) (res []nodeselection.SelectedNode) { + for _, piece := range pieces { + for _, node := range nodes { + if node.ID == piece.StorageNode { + res = append(res, node) + break + } + } + + } + return res + } + t.Run("all online", func(t *testing.T) { - var online, offline = generateNodes(5, func(ix int) bool { + var selectedNodes = generateNodes(5, func(ix int) bool { return true }, func(ix int, node *nodeselection.SelectedNode) { @@ -33,8 +47,8 @@ func TestClassify(t *testing.T) { s := SegmentRepairer{ placementRules: c.CreateFilters, } - pieces := createPieces(online, offline, 0, 1, 2, 3, 4) - result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), online, offline) + pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4) + result, err := 
s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), selectedNodes) require.NoError(t, err) require.Equal(t, 0, len(result.MissingPiecesSet)) @@ -44,10 +58,10 @@ func TestClassify(t *testing.T) { }) t.Run("out of placement", func(t *testing.T) { - var online, offline = generateNodes(10, func(ix int) bool { + var selectedNodes = generateNodes(10, func(ix int) bool { return true }, func(ix int, node *nodeselection.SelectedNode) { - if ix > 4 { + if ix < 4 { node.CountryCode = location.Germany } else { node.CountryCode = location.UnitedKingdom @@ -60,10 +74,11 @@ func TestClassify(t *testing.T) { s := SegmentRepairer{ placementRules: c.CreateFilters, doPlacementCheck: true, + log: zaptest.NewLogger(t), } - pieces := createPieces(online, offline, 1, 2, 3, 4, 7, 8) - result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), online, offline) + pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8) + result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) require.NoError(t, err) require.Equal(t, 0, len(result.MissingPiecesSet)) @@ -75,7 +90,7 @@ func TestClassify(t *testing.T) { t.Run("out of placement and offline", func(t *testing.T) { // all nodes are in wrong region and half of them are offline - var online, offline = generateNodes(10, func(ix int) bool { + var selectedNodes = generateNodes(10, func(ix int) bool { return ix < 5 }, func(ix int, node *nodeselection.SelectedNode) { node.CountryCode = location.Germany @@ -88,8 +103,8 @@ func TestClassify(t *testing.T) { doPlacementCheck: true, } - pieces := createPieces(online, offline, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) - result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), online, offline) + pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) require.NoError(t, err) // offline nodes @@ -103,7 +118,7 @@ func TestClassify(t *testing.T) { }) t.Run("normal declumping (subnet check)", func(t *testing.T) { - var online, offline = generateNodes(10, func(ix int) bool { + var selectedNodes = generateNodes(10, func(ix int) bool { return ix < 5 }, func(ix int, node *nodeselection.SelectedNode) { node.LastNet = fmt.Sprintf("127.0.%d.0", ix/2) @@ -113,16 +128,17 @@ func TestClassify(t *testing.T) { s := SegmentRepairer{ placementRules: c.CreateFilters, doDeclumping: true, + log: zaptest.NewLogger(t), } // first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6 - pieces := createPieces(online, offline, 0, 1, 2, 3, 4, 5, 6) - result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), online, offline) + pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6) + result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) require.NoError(t, err) // offline nodes require.Equal(t, 2, len(result.MissingPiecesSet)) - require.Equal(t, 4, len(result.ClumpedPiecesSet)) + require.Equal(t, 3, len(result.ClumpedPiecesSet)) require.Equal(t, 0, len(result.OutOfPlacementPiecesSet)) require.Equal(t, 2, result.NumUnhealthyRetrievable) numHealthy := 
len(pieces) - len(result.MissingPiecesSet) - result.NumUnhealthyRetrievable @@ -131,7 +147,7 @@ func TestClassify(t *testing.T) { }) t.Run("declumping but with no subnet filter", func(t *testing.T) { - var online, offline = generateNodes(10, func(ix int) bool { + var selectedNodes = generateNodes(10, func(ix int) bool { return ix < 5 }, func(ix int, node *nodeselection.SelectedNode) { node.LastNet = fmt.Sprintf("127.0.%d.0", ix/2) @@ -147,8 +163,8 @@ func TestClassify(t *testing.T) { } // first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6 - pieces := createPieces(online, offline, 0, 1, 2, 3, 4, 5, 6) - result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), online, offline) + pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6) + result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) require.NoError(t, err) // offline nodes @@ -163,31 +179,25 @@ func TestClassify(t *testing.T) { } -func generateNodes(num int, isOnline func(i int) bool, config func(ix int, node *nodeselection.SelectedNode)) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode) { +func generateNodes(num int, isOnline func(i int) bool, config func(ix int, node *nodeselection.SelectedNode)) (selectedNodes []nodeselection.SelectedNode) { for i := 0; i < num; i++ { node := nodeselection.SelectedNode{ - ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID, + ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID, + Online: isOnline(i), } config(i, &node) - if isOnline(i) { - online = append(online, node) - } else { - offline = append(offline, node) - } + selectedNodes = append(selectedNodes, node) } return } -func createPieces(online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, indexes ...int) (res metabase.Pieces) { +func createPieces(selectedNodes []nodeselection.SelectedNode, indexes ...int) (res metabase.Pieces) { for _, index := range indexes { piece := metabase.Piece{ Number: uint16(index), } - if len(online)-1 < index { - piece.StorageNode = offline[index-len(online)].ID - } else { - piece.StorageNode = online[index].ID - } + piece.StorageNode = selectedNodes[index].ID + res = append(res, piece) } diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index dedcb2f1feaa..30ef1b5dff6e 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -394,47 +394,31 @@ func (cache *overlaycache) UpdateLastOfflineEmail(ctx context.Context, nodeIDs s return err } -// KnownReliable filters a set of nodes to reliable nodes. List is split into online and offline nodes. 
-func (cache *overlaycache) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) ([]nodeselection.SelectedNode, []nodeselection.SelectedNode, error) { - var on, off []*nodeselection.SelectedNode - var err error - for { - on, off, err = cache.knownReliable(ctx, nodeIDs, onlineWindow, asOfSystemInterval) - if err != nil { - if cockroachutil.NeedsRetry(err) { - continue - } - return nil, nil, err - } - break - } - err = cache.addNodeTags(ctx, append(on, off...)) - deref := func(nodes []*nodeselection.SelectedNode) []nodeselection.SelectedNode { - var res []nodeselection.SelectedNode - for _, node := range nodes { - res = append(res, *node) - } - return res - } - return deref(on), deref(off), err -} - -func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []*nodeselection.SelectedNode, offline []*nodeselection.SelectedNode, err error) { +// GetNodes gets records for all specified nodes as of the given system interval. The +// onlineWindow is used to determine whether each node is marked as Online. The results are +// returned in a slice of the same length as the input nodeIDs, and each index of the returned +// list corresponds to the same index in nodeIDs. If a node is not known, or is disqualified +// or exited, the corresponding returned SelectedNode will have a zero value. +func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (records []nodeselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) + var nodes []*nodeselection.SelectedNode + if len(nodeIDs) == 0 { - return nil, nil, Error.New("no ids provided") + return nil, Error.New("no ids provided") } err = withRows(cache.db.Query(ctx, ` - SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $2 as online, exit_initiated_at IS NOT NULL as exiting - FROM nodes + SELECT n.id, n.address, n.last_net, n.last_ip_port, n.country_code, + n.last_contact_success > $2 AS online, + (n.offline_suspended IS NOT NULL OR n.unknown_audit_suspended IS NOT NULL) AS suspended, + n.disqualified IS NOT NULL AS disqualified, + n.exit_initiated_at IS NOT NULL AS exiting, + n.exit_finished_at IS NOT NULL AS exited + FROM unnest($1::bytea[]) WITH ORDINALITY AS input(node_id, ordinal) + LEFT OUTER JOIN nodes n ON input.node_id = n.id `+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+` - WHERE id = any($1::bytea[]) - AND disqualified IS NULL - AND unknown_audit_suspended IS NULL - AND offline_suspended IS NULL - AND exit_finished_at IS NULL + ORDER BY input.ordinal `, pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow), ))(func(rows tagsql.Rows) error { for rows.Next() { @@ -443,53 +427,43 @@ func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.Node return err } - if node.Online { - online = append(online, &node) - } else { - offline = append(offline, &node) - } + nodes = append(nodes, &node) } return nil }) + if err != nil { + return nil, Error.Wrap(err) + } - return online, offline, Error.Wrap(err) -} - -// Reliable returns all nodes that are reliable, online and offline. 
-func (cache *overlaycache) Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) ([]nodeselection.SelectedNode, []nodeselection.SelectedNode, error) { - var on, off []*nodeselection.SelectedNode - var err error - for { - on, off, err = cache.reliable(ctx, onlineWindow, asOfSystemInterval) - if err != nil { - if cockroachutil.NeedsRetry(err) { - continue - } - return nil, nil, err - } - break + err = cache.addNodeTags(ctx, nodes) + if err != nil { + return nil, Error.Wrap(err) } - err = cache.addNodeTags(ctx, append(on, off...)) - deref := func(nodes []*nodeselection.SelectedNode) []nodeselection.SelectedNode { - var res []nodeselection.SelectedNode - for _, node := range nodes { - res = append(res, *node) - } - return res + records = make([]nodeselection.SelectedNode, len(nodes)) + for i := 0; i < len(nodes); i++ { + records[i] = *nodes[i] } - return deref(on), deref(off), err + + return records, Error.Wrap(err) } -func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []*nodeselection.SelectedNode, offline []*nodeselection.SelectedNode, err error) { +// GetParticipatingNodes returns all known participating nodes (this includes all known nodes +// excluding nodes that have been disqualified or gracefully exited). +func (cache *overlaycache) GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (records []nodeselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) + var nodes []*nodeselection.SelectedNode + err = withRows(cache.db.Query(ctx, ` - SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $1 as online, exit_initiated_at IS NOT NULL as exiting + SELECT id, address, last_net, last_ip_port, country_code, + last_contact_success > $1 AS online, + (offline_suspended IS NOT NULL OR unknown_audit_suspended IS NOT NULL) AS suspended, + false AS disqualified, + exit_initiated_at IS NOT NULL AS exiting, + false AS exited FROM nodes `+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+` WHERE disqualified IS NULL - AND unknown_audit_suspended IS NULL - AND offline_suspended IS NULL AND exit_finished_at IS NULL `, time.Now().Add(-onlineWindow), ))(func(rows tagsql.Rows) error { @@ -498,35 +472,82 @@ func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSyste if err != nil { return err } - - if node.Online { - online = append(online, &node) - } else { - offline = append(offline, &node) - } + nodes = append(nodes, &node) } return nil }) + if err != nil { + return nil, Error.Wrap(err) + } - return online, offline, Error.Wrap(err) + err = cache.addNodeTags(ctx, nodes) + if err != nil { + return nil, Error.Wrap(err) + } + records = make([]nodeselection.SelectedNode, len(nodes)) + for i := 0; i < len(nodes); i++ { + records[i] = *nodes[i] + } + + return records, Error.Wrap(err) +} + +// nullNodeID represents a NodeID that may be null. +type nullNodeID struct { + NodeID storj.NodeID + Valid bool +} + +// Scan implements the sql.Scanner interface. 
+func (n *nullNodeID) Scan(value any) error { + if value == nil { + n.NodeID = storj.NodeID{} + n.Valid = false + return nil + } + err := n.NodeID.Scan(value) + if err != nil { + n.Valid = false + return err + } + n.Valid = true + return nil } func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) { var node nodeselection.SelectedNode node.Address = &pb.NodeAddress{} - var lastIPPort sql.NullString - err := rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &node.CountryCode, &node.Online, &node.Exiting) + var nodeID nullNodeID + var address, lastNet, lastIPPort, countryCode sql.NullString + var online, suspended, disqualified, exiting, exited sql.NullBool + err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode, + &online, &suspended, &disqualified, &exiting, &exited) if err != nil { return nodeselection.SelectedNode{}, err } + // If node ID was null, no record was found for the specified ID. For our purposes + // here, we will treat that as equivalent to a node being DQ'd or exited. + if !nodeID.Valid { + // return an empty record + return nodeselection.SelectedNode{}, nil + } + // nodeID was valid, so from here on we assume all the other non-null fields are valid, per database constraints + if disqualified.Bool || exited.Bool { + return nodeselection.SelectedNode{}, nil + } + node.ID = nodeID.NodeID + node.Address.Address = address.String + node.LastNet = lastNet.String if lastIPPort.Valid { node.LastIPPort = lastIPPort.String } - // node.Suspended is always false for now, but that will change in a coming - // commit; we need to include suspended nodes in return values from - // Reliable() and KnownReliable() (in case they are in excluded countries, - // are out of placement, are on clumped IP networks, etc). 
+ if countryCode.Valid { + node.CountryCode = location.ToCountryCode(countryCode.String) + } + node.Online = online.Bool + node.Suspended = suspended.Bool + node.Exiting = exiting.Bool return node, nil } diff --git a/satellite/satellitedb/overlaycache_test.go b/satellite/satellitedb/overlaycache_test.go index 4a3429c32ae9..de66e0c39478 100644 --- a/satellite/satellitedb/overlaycache_test.go +++ b/satellite/satellitedb/overlaycache_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "storj.io/common/identity/testidentity" @@ -448,200 +449,264 @@ func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) { } -func TestOverlayCache_KnownReliable(t *testing.T) { +type nodeDisposition struct { + id storj.NodeID + address string + lastIPPort string + offlineInterval time.Duration + countryCode location.CountryCode + disqualified bool + auditSuspended bool + offlineSuspended bool + exiting bool + exited bool +} + +func TestOverlayCache_GetNodes(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { cache := db.OverlayCache() - allNodes := []nodeselection.SelectedNode{ - addNode(ctx, t, cache, "online", "127.0.0.1", true, false, false, false, false), - addNode(ctx, t, cache, "offline", "127.0.0.2", false, false, false, false, false), - addNode(ctx, t, cache, "disqalified", "127.0.0.3", false, true, false, false, false), - addNode(ctx, t, cache, "audit-suspended", "127.0.0.4", false, false, true, false, false), - addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", false, false, false, true, false), - addNode(ctx, t, cache, "exited", "127.0.0.6", false, false, false, false, true), + allNodes := []nodeDisposition{ + addNode(ctx, t, cache, "online ", "127.0.0.1", time.Second, false, false, false, false, false), + addNode(ctx, t, cache, "offline ", "127.0.0.2", 2*time.Hour, false, false, false, false, false), + addNode(ctx, t, cache, "disqualified ", "127.0.0.3", 2*time.Hour, true, false, false, false, false), + addNode(ctx, t, cache, "audit-suspended ", "127.0.0.4", time.Second, false, true, false, false, false), + addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", 2*time.Hour, false, false, true, false, false), + addNode(ctx, t, cache, "exiting ", "127.0.0.5", 2*time.Hour, false, false, false, true, false), + addNode(ctx, t, cache, "exited ", "127.0.0.6", 2*time.Hour, false, false, false, false, true), } - ids := func(nodes ...nodeselection.SelectedNode) storj.NodeIDList { - nodeIds := storj.NodeIDList{} - for _, node := range nodes { - nodeIds = append(nodeIds, node.ID) + nodes := func(nodeNums ...int) []nodeDisposition { + nodeDisps := make([]nodeDisposition, len(nodeNums)) + for i, nodeNum := range nodeNums { + nodeDisps[i] = allNodes[nodeNum] } - return nodeIds + return nodeDisps } - nodes := func(nodes ...nodeselection.SelectedNode) []nodeselection.SelectedNode { - return append([]nodeselection.SelectedNode{}, nodes...) 
+ sNodes := func(nodes ...int) []nodeselection.SelectedNode { + selectedNodes := make([]nodeselection.SelectedNode, len(nodes)) + for i, nodeNum := range nodes { + selectedNodes[i] = nodeDispositionToSelectedNode(allNodes[nodeNum], time.Hour) + } + return selectedNodes } type testCase struct { - IDs storj.NodeIDList - Online []nodeselection.SelectedNode - Offline []nodeselection.SelectedNode + QueryNodes []nodeDisposition + Online []nodeselection.SelectedNode + Offline []nodeselection.SelectedNode } - shuffledNodeIDs := ids(allNodes...) - rand.Shuffle(len(shuffledNodeIDs), shuffledNodeIDs.Swap) - - for _, tc := range []testCase{ + for testNum, tc := range []testCase{ { - IDs: ids(allNodes[0], allNodes[1]), - Online: nodes(allNodes[0]), - Offline: nodes(allNodes[1]), + QueryNodes: nodes(0, 1), + Online: sNodes(0), + Offline: sNodes(1), }, { - IDs: ids(allNodes[0]), - Online: nodes(allNodes[0]), + QueryNodes: nodes(0), + Online: sNodes(0), }, { - IDs: ids(allNodes[1]), - Offline: nodes(allNodes[1]), + QueryNodes: nodes(1), + Offline: sNodes(1), }, { // only unreliable - IDs: ids(allNodes[2], allNodes[3], allNodes[4], allNodes[5]), + QueryNodes: nodes(2, 3, 4, 5), + Online: sNodes(3), + Offline: sNodes(4, 5), }, { // all nodes - IDs: ids(allNodes...), - Online: nodes(allNodes[0]), - Offline: nodes(allNodes[1]), - }, - // all nodes but in shuffled order - { - IDs: shuffledNodeIDs, - Online: nodes(allNodes[0]), - Offline: nodes(allNodes[1]), + QueryNodes: allNodes, + Online: sNodes(0, 3), + Offline: sNodes(1, 4, 5), }, // all nodes + one ID not from DB { - IDs: append(ids(allNodes...), testrand.NodeID()), - Online: nodes(allNodes[0]), - Offline: nodes(allNodes[1]), + QueryNodes: append(allNodes, nodeDisposition{ + id: testrand.NodeID(), + disqualified: true, // just so we expect a zero ID for this entry + }), + Online: sNodes(0, 3), + Offline: sNodes(1, 4, 5), }, } { - online, offline, err := cache.KnownReliable(ctx, tc.IDs, 1*time.Hour, 0) + ids := make([]storj.NodeID, len(tc.QueryNodes)) + for i := range tc.QueryNodes { + ids[i] = tc.QueryNodes[i].id + } + selectedNodes, err := cache.GetNodes(ctx, ids, 1*time.Hour, 0) require.NoError(t, err) - require.ElementsMatch(t, tc.Online, online) - require.ElementsMatch(t, tc.Offline, offline) + require.Equal(t, len(tc.QueryNodes), len(selectedNodes)) + var gotOnline []nodeselection.SelectedNode + var gotOffline []nodeselection.SelectedNode + for i, n := range selectedNodes { + if tc.QueryNodes[i].disqualified || tc.QueryNodes[i].exited { + assert.Zero(t, n, testNum, i) + } else { + assert.Equal(t, tc.QueryNodes[i].id, selectedNodes[i].ID, "%d:%d", testNum, i) + if n.Online { + gotOnline = append(gotOnline, n) + } else { + gotOffline = append(gotOffline, n) + } + } + } + assert.Equal(t, tc.Online, gotOnline) + assert.Equal(t, tc.Offline, gotOffline) } // test empty id list - _, _, err := cache.KnownReliable(ctx, storj.NodeIDList{}, 1*time.Hour, 0) + _, err := cache.GetNodes(ctx, storj.NodeIDList{}, 1*time.Hour, 0) require.Error(t, err) // test as of system time - _, _, err = cache.KnownReliable(ctx, ids(allNodes...), 1*time.Hour, -1*time.Microsecond) + allIDs := make([]storj.NodeID, len(allNodes)) + for i := range allNodes { + allIDs[i] = allNodes[i].id + } + _, err = cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond) require.NoError(t, err) }) } -func TestOverlayCache_Reliable(t *testing.T) { +func TestOverlayCache_GetParticipatingNodes(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { 
 		cache := db.OverlayCache()
 
-		allNodes := []nodeselection.SelectedNode{
-			addNode(ctx, t, cache, "online", "127.0.0.1", true, false, false, false, false),
-			addNode(ctx, t, cache, "offline", "127.0.0.2", false, false, false, false, false),
-			addNode(ctx, t, cache, "disqalified", "127.0.0.3", false, true, false, false, false),
-			addNode(ctx, t, cache, "audit-suspended", "127.0.0.4", false, false, true, false, false),
-			addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", false, false, false, true, false),
-			addNode(ctx, t, cache, "exited", "127.0.0.6", false, false, false, false, true),
+		allNodes := []nodeDisposition{
+			addNode(ctx, t, cache, "online           ", "127.0.0.1", time.Second, false, false, false, false, false),
+			addNode(ctx, t, cache, "offline          ", "127.0.0.2", 2*time.Hour, false, false, false, false, false),
+			addNode(ctx, t, cache, "disqualified     ", "127.0.0.3", 2*time.Hour, true, false, false, false, false),
+			addNode(ctx, t, cache, "audit-suspended  ", "127.0.0.4", time.Second, false, true, false, false, false),
+			addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", 2*time.Hour, false, false, true, false, false),
+			addNode(ctx, t, cache, "exiting          ", "127.0.0.6", 2*time.Hour, false, false, false, true, false),
+			addNode(ctx, t, cache, "exited           ", "127.0.0.7", 2*time.Hour, false, false, false, false, true),
 		}
 
 		type testCase struct {
 			OnlineWindow time.Duration
-			Online       []nodeselection.SelectedNode
-			Offline      []nodeselection.SelectedNode
+			Online       []int
+			Offline      []int
 		}
 
 		for i, tc := range []testCase{
 			{
 				OnlineWindow: 1 * time.Hour,
-				Online:       []nodeselection.SelectedNode{allNodes[0]},
-				Offline:      []nodeselection.SelectedNode{allNodes[1]},
+				Online:       []int{0, 3},
+				Offline:      []int{1, 4, 5},
 			},
 			{
 				OnlineWindow: 20 * time.Hour,
-				Online:       []nodeselection.SelectedNode{allNodes[0], allNodes[1]},
+				Online:       []int{0, 1, 3, 4, 5},
 			},
 			{
 				OnlineWindow: 1 * time.Microsecond,
-				Offline:      []nodeselection.SelectedNode{allNodes[0], allNodes[1]},
+				Offline:      []int{0, 1, 3, 4, 5},
 			},
 		} {
-			online, offline, err := cache.Reliable(ctx, tc.OnlineWindow, 0)
-			require.NoError(t, err)
-
-			// make the .Online attribute match expectations for this OnlineWindow
-			for n := range tc.Online {
-				tc.Online[n].Online = true
+			expectedNodes := make([]nodeselection.SelectedNode, 0, len(tc.Offline)+len(tc.Online))
+			for _, num := range tc.Online {
+				selectedNode := nodeDispositionToSelectedNode(allNodes[num], 0)
+				selectedNode.Online = true
+				expectedNodes = append(expectedNodes, selectedNode)
 			}
-			for n := range tc.Offline {
-				tc.Offline[n].Online = false
+			for _, num := range tc.Offline {
+				selectedNode := nodeDispositionToSelectedNode(allNodes[num], 0)
+				selectedNode.Online = false
+				expectedNodes = append(expectedNodes, selectedNode)
 			}
 
-			require.ElementsMatch(t, tc.Online, online, "#%d", i)
-			require.ElementsMatch(t, tc.Offline, offline, "#%d", i)
+			gotNodes, err := cache.GetParticipatingNodes(ctx, tc.OnlineWindow, 0)
+			require.NoError(t, err)
+			require.ElementsMatch(t, expectedNodes, gotNodes, "#%d", i)
 		}
 
 		// test as of system time
-		_, _, err := cache.Reliable(ctx, 1*time.Hour, -1*time.Microsecond)
+		_, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond)
 		require.NoError(t, err)
 	})
 }
 
-func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, online, disqualified, auditSuspended, offlineSuspended, exited bool) nodeselection.SelectedNode {
-	selectedNode := nodeselection.SelectedNode{
-		ID:          testrand.NodeID(),
-		Address:     &pb.NodeAddress{Address: address},
-		LastNet:     lastIPPort,
-		LastIPPort:  lastIPPort,
-		CountryCode: location.Poland,
-		Online:      online,
+func nodeDispositionToSelectedNode(disp nodeDisposition, onlineWindow time.Duration) nodeselection.SelectedNode {
+	if disp.exited || disp.disqualified {
+		return nodeselection.SelectedNode{}
+	}
+	return nodeselection.SelectedNode{
+		ID:          disp.id,
+		Address:     &pb.NodeAddress{Address: disp.address},
+		LastNet:     disp.lastIPPort,
+		LastIPPort:  disp.lastIPPort,
+		CountryCode: disp.countryCode,
+		Exiting:     disp.exiting,
+		Suspended:   disp.auditSuspended || disp.offlineSuspended,
+		Online:      disp.offlineInterval <= onlineWindow,
+	}
+}
+
+func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, offlineInterval time.Duration, disqualified, auditSuspended, offlineSuspended, exiting, exited bool) nodeDisposition {
+	disp := nodeDisposition{
+		id:               testrand.NodeID(),
+		address:          address,
+		lastIPPort:       lastIPPort,
+		offlineInterval:  offlineInterval,
+		countryCode:      location.Poland,
+		disqualified:     disqualified,
+		auditSuspended:   auditSuspended,
+		offlineSuspended: offlineSuspended,
+		exiting:          exiting,
+		exited:           exited,
 	}
 
 	checkInInfo := overlay.NodeCheckInInfo{
 		IsUp:        true,
-		NodeID:      selectedNode.ID,
-		Address:     &pb.NodeAddress{Address: selectedNode.Address.Address},
-		LastIPPort:  selectedNode.LastIPPort,
-		LastNet:     selectedNode.LastNet,
-		CountryCode: selectedNode.CountryCode,
+		NodeID:      disp.id,
+		Address:     &pb.NodeAddress{Address: disp.address},
+		LastIPPort:  disp.lastIPPort,
+		LastNet:     disp.lastIPPort,
+		CountryCode: disp.countryCode,
 		Version:     &pb.NodeVersion{Version: "v0.0.0"},
 	}
 
-	timestamp := time.Now().UTC()
-	if !online {
-		timestamp = time.Now().Add(-10 * time.Hour)
-	}
+	timestamp := time.Now().UTC().Add(-disp.offlineInterval)
 
 	err := cache.UpdateCheckIn(ctx, checkInInfo, timestamp, overlay.NodeSelectionConfig{})
 	require.NoError(t, err)
 
 	if disqualified {
-		_, err := cache.DisqualifyNode(ctx, selectedNode.ID, time.Now(), overlay.DisqualificationReasonAuditFailure)
+		_, err := cache.DisqualifyNode(ctx, disp.id, time.Now(), overlay.DisqualificationReasonAuditFailure)
 		require.NoError(t, err)
 	}
 
 	if auditSuspended {
-		require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, selectedNode.ID, time.Now()))
-		selectedNode.Suspended = true
+		require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, disp.id, time.Now()))
 	}
 
 	if offlineSuspended {
-		require.NoError(t, cache.TestSuspendNodeOffline(ctx, selectedNode.ID, time.Now()))
-		selectedNode.Suspended = true
+		require.NoError(t, cache.TestSuspendNodeOffline(ctx, disp.id, time.Now()))
+	}
+
+	if exiting {
+		now := time.Now()
+		_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
+			NodeID:          disp.id,
+			ExitInitiatedAt: now,
+		})
+		require.NoError(t, err)
 	}
 
 	if exited {
 		now := time.Now()
 		_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
-			NodeID:              selectedNode.ID,
+			NodeID:              disp.id,
 			ExitInitiatedAt:     now,
 			ExitLoopCompletedAt: now,
 			ExitFinishedAt:      now,
 			ExitSuccess:         true,
 		})
-		selectedNode.Exiting = true
 		require.NoError(t, err)
 	}
 
-	return selectedNode
+	return disp
 }
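
Reviewer note: the behavioral contract these tests pin down is that GetNodes returns exactly one entry per requested ID, index-aligned with the input, with a zero-valued SelectedNode standing in for disqualified, exited, or unknown nodes. Callers that previously consumed KnownReliable's two slices now split the result themselves. A minimal caller-side sketch of that pattern follows; the helper name `splitByLiveness` and its parameters are illustrative assumptions, not part of this patch.

```go
package example

import (
	"context"
	"time"

	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/overlay"
)

// splitByLiveness is a hypothetical helper showing how the index-aligned
// GetNodes result can be split into online and offline sets.
func splitByLiveness(ctx context.Context, cache overlay.DB, ids storj.NodeIDList) (online, offline []nodeselection.SelectedNode, err error) {
	// GetNodes returns one entry per requested ID, in request order.
	selectedNodes, err := cache.GetNodes(ctx, ids, time.Hour, 0)
	if err != nil {
		return nil, nil, err
	}
	for _, n := range selectedNodes {
		if n.ID.IsZero() {
			// Disqualified, exited, or unknown nodes come back as
			// zero-valued entries so the slice stays index-aligned.
			continue
		}
		if n.Online {
			online = append(online, n)
		} else {
			offline = append(offline, n)
		}
	}
	return online, offline, nil
}
```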