satellite/bloomfilter: don't create BF for disqualified nodes
Currently we have a large set of nodes that are already disqualified, and
we are not sending bloom filters to them. The issue is that we still
generate filters for them during the garbage collection process, even if
only a single segment has a piece that was stored on such a node. This
consumes additional memory and processing power.

This change modifies the logic behind `AllPieceCounts` (renamed to
`ActiveNodesPieceCounts`) to return piece counts for all nodes except
disqualified ones, including nodes with a piece count of 0. With this
change we can modify the GC observer to skip nodes that were not
returned by `ActiveNodesPieceCounts`.

Change-Id: Ic75159135abe535084d8aeee560bb801a4a03e17
mniewrzal authored and Storj Robot committed Nov 22, 2023
1 parent 07cb8dc commit 573ce71
Showing 10 changed files with 108 additions and 61 deletions.
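
For orientation before the per-file diffs, here is a minimal sketch of the behavior described in the commit message. The function name, the string-keyed map, and the initialPieces parameter are invented for illustration; the real logic lives in observer.go and observer_sync.go below.

package gcsketch

// pieceCountForNode mirrors the new observer behavior: pieceCounts now comes
// from ActiveNodesPieceCounts, so it contains every non-disqualified node,
// even those with a count of zero. A missing key therefore means the node is
// disqualified, and no bloom filter should be generated for it.
func pieceCountForNode(pieceCounts map[string]int64, nodeID string, initialPieces int64) (numPieces int64, generate bool) {
	count, found := pieceCounts[nodeID]
	if !found {
		// Disqualified node: skip bloom filter creation entirely.
		return 0, false
	}
	if count > 0 {
		// Use the known piece count to size the bloom filter.
		return count, true
	}
	// Active node with no recorded pieces yet: fall back to the configured default.
	return initialPieces, true
}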
12 changes: 9 additions & 3 deletions satellite/gc/bloomfilter/observer.go
@@ -67,7 +67,7 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
obs.log.Debug("collecting bloom filters started")

// load last piece counts from overlay db
lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx)
lastPieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx)
if err != nil {
obs.log.Error("error getting last piece counts", zap.Error(err))
err = nil
@@ -197,8 +197,14 @@ func (fork *observerFork) add(nodeID storj.NodeID, pieceID storj.PieceID) {
if !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := fork.config.InitialPieces
if pieceCounts := fork.pieceCounts[nodeID]; pieceCounts > 0 {
numPieces = pieceCounts
if pieceCounts, found := fork.pieceCounts[nodeID]; found {
if pieceCounts > 0 {
numPieces = pieceCounts
}
} else {
// node was not in pieceCounts which means it was disqualified
// and we won't generate a bloom filter for it
return
}

hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, fork.config.FalsePositiveRate, 2*memory.MiB)
12 changes: 9 additions & 3 deletions satellite/gc/bloomfilter/observer_sync.go
@@ -63,7 +63,7 @@ func (obs *SyncObserver) Start(ctx context.Context, startTime time.Time) (err error)
obs.log.Debug("collecting bloom filters started")

// load last piece counts from overlay db
lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx)
lastPieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx)
if err != nil {
obs.log.Error("error getting last piece counts", zap.Error(err))
err = nil
@@ -147,8 +147,14 @@ func (obs *SyncObserver) add(nodeID storj.NodeID, pieceID storj.PieceID) {
if !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := obs.config.InitialPieces
if pieceCounts := obs.lastPieceCounts[nodeID]; pieceCounts > 0 {
numPieces = pieceCounts
if pieceCounts, found := obs.lastPieceCounts[nodeID]; found {
if pieceCounts > 0 {
numPieces = pieceCounts
}
} else {
// node was not in lastPieceCounts which means it was disqualified
// and we won't generate a bloom filter for it
return
}

hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, obs.config.FalsePositiveRate, 2*memory.MiB)
49 changes: 37 additions & 12 deletions satellite/gc/bloomfilter/observer_test.go
@@ -8,6 +8,7 @@ import (
"bytes"
"fmt"
"io"
"slices"
"sort"
"strconv"
"testing"
@@ -18,6 +19,7 @@ import (

"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
@@ -26,6 +28,7 @@ import (
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
"storj.io/storj/satellite/overlay"
"storj.io/uplink"
)

@@ -50,16 +53,21 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
defer ctx.Check(project.Close)

type testCase struct {
Bucket string
ZipBatchSize int
ExpectedPacks int
Bucket string
ZipBatchSize int
ExpectedPacks int
DisqualifiedNodes []storj.NodeID
}

testCases := []testCase{
{"bloomfilters-bucket-1", 1, 7},
{"bloomfilters-bucket-2", 2, 4},
{"bloomfilters-bucket-7", 7, 1},
{"bloomfilters-bucket-100", 100, 1},
{"bloomfilters-bucket-1", 1, 7, []storj.NodeID{}},
{"bloomfilters-bucket-2", 2, 4, []storj.NodeID{}},
{"bloomfilters-bucket-2-dq-nodes", 2, 3, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[3].ID(),
}},
{"bloomfilters-bucket-7", 7, 1, []storj.NodeID{}},
{"bloomfilters-bucket-100", 100, 1, []storj.NodeID{}},
}

for _, tc := range testCases {
@@ -78,6 +86,24 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
}

expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
_, err := planet.Satellites[0].DB.Testing().RawDB().ExecContext(ctx, "UPDATE nodes SET disqualified = null WHERE id = $1", node.ID())
require.NoError(t, err)

expectedNodeIds = append(expectedNodeIds, node.ID().String())
}

for _, nodeID := range tc.DisqualifiedNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.DisqualifyNode(ctx, nodeID, overlay.DisqualificationReasonAuditFailure))

if index := slices.Index(expectedNodeIds, nodeID.String()); index != -1 {
expectedNodeIds = slices.Delete(expectedNodeIds, index, index+1)
}
}

sort.Strings(expectedNodeIds)

for _, observer := range observers {
name := fmt.Sprintf("%s-%T", tc.Bucket, observer)
t.Run(name, func(t *testing.T) {
@@ -134,6 +160,10 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String())

nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String())

nodeID, err := storj.NodeIDFromBytes(pbRetainInfo.StorageNodeId.Bytes())
require.NoError(t, err)
require.NotContains(t, tc.DisqualifiedNodes, nodeID)
}

count++
@@ -149,11 +179,6 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
sort.Strings(packNames)
require.Equal(t, expectedPackNames, packNames)

expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
expectedNodeIds = append(expectedNodeIds, node.ID().String())
}
sort.Strings(expectedNodeIds)
sort.Strings(nodeIds)
require.Equal(t, expectedNodeIds, nodeIds)
})
8 changes: 4 additions & 4 deletions satellite/gc/piecetracker/observer_test.go
@@ -35,9 +35,9 @@ func TestObserverPieceTracker(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// ensure that the piece counts are empty
pieceCounts, err := planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
pieceCounts, err := planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(pieceCounts))
require.Equal(t, 4, len(pieceCounts))

// Setup: create 50KiB of data for the uplink to upload
testdata := testrand.Bytes(50 * memory.KiB)
@@ -51,7 +51,7 @@ func TestObserverPieceTracker(t *testing.T) {
require.NoError(t, err)

// Check that the piece counts are correct
pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
pieceCounts, err = planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.True(t, len(pieceCounts) > 0)

@@ -71,7 +71,7 @@ func TestObserverPieceTracker(t *testing.T) {
require.NoError(t, err)

// Check that the piece counts are correct
pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
pieceCounts, err = planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.True(t, len(pieceCounts) > 0)

58 changes: 33 additions & 25 deletions satellite/overlay/piececount_test.go
@@ -24,56 +24,64 @@ func TestDB_PieceCounts(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
overlaydb := db.OverlayCache()

type TestNode struct {
ID storj.NodeID
PieceCount int64
}
expectedNodePieces := make(map[storj.NodeID]int64, 100)

nodes := make([]TestNode, 100)
for i := range nodes {
nodes[i].ID = testrand.NodeID()
nodes[i].PieceCount = int64(math.Pow10(i + 1))
for i := 0; i < 100; i++ {
expectedNodePieces[testrand.NodeID()] = int64(math.Pow10(i + 1))
}

for i, node := range nodes {
var nodeToDisqualify storj.NodeID

i := 0
for nodeID := range expectedNodePieces {
addr := fmt.Sprintf("127.0.%d.0:8080", i)
lastNet := fmt.Sprintf("127.0.%d", i)
d := overlay.NodeCheckInInfo{
NodeID: node.ID,
NodeID: nodeID,
Address: &pb.NodeAddress{Address: addr},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
IsUp: true,
}
err := overlaydb.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
i++

nodeToDisqualify = nodeID
}

// check that they are initialized to zero
initialCounts, err := overlaydb.AllPieceCounts(ctx)
initialCounts, err := overlaydb.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.Empty(t, initialCounts)
// TODO: make AllPieceCounts return results for all nodes,
// since it will keep the logic slightly clearer.

// update counts
counts := make(map[storj.NodeID]int64)
for _, node := range nodes {
counts[node.ID] = node.PieceCount
require.Equal(t, len(expectedNodePieces), len(initialCounts))
for nodeID := range expectedNodePieces {
pieceCount, found := initialCounts[nodeID]
require.True(t, found)
require.Zero(t, pieceCount)
}
err = overlaydb.UpdatePieceCounts(ctx, counts)

err = overlaydb.UpdatePieceCounts(ctx, expectedNodePieces)
require.NoError(t, err)

// fetch new counts
updatedCounts, err := overlaydb.AllPieceCounts(ctx)
updatedCounts, err := overlaydb.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)

// verify values
for _, node := range nodes {
count, ok := updatedCounts[node.ID]
for nodeID, pieceCount := range expectedNodePieces {
count, ok := updatedCounts[nodeID]
require.True(t, ok)
require.Equal(t, count, node.PieceCount)
require.Equal(t, pieceCount, count)
}

// disqualify one node so it won't be returned by ActiveNodesPieceCounts
_, err = overlaydb.DisqualifyNode(ctx, nodeToDisqualify, time.Now(), overlay.DisqualificationReasonAuditFailure)
require.NoError(t, err)

pieceCounts, err := overlaydb.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.NotContains(t, pieceCounts, nodeToDisqualify)
})
}

@@ -121,7 +129,7 @@ func BenchmarkDB_PieceCounts(b *testing.B) {

b.Run("All", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := overlaydb.AllPieceCounts(ctx)
_, err := overlaydb.ActiveNodesPieceCounts(ctx)
if err != nil {
b.Fatal(err)
}
5 changes: 3 additions & 2 deletions satellite/overlay/service.go
@@ -80,8 +80,9 @@ type DB interface {
// SetAllContainedNodes updates the contained field for all nodes, as necessary.
SetAllContainedNodes(ctx context.Context, containedNodes []storj.NodeID) (err error)

// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error)
// ActiveNodesPieceCounts returns a map of node IDs to piece counts from the db.
// Returns only pieces for nodes that are not disqualified.
ActiveNodesPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error)
// UpdatePieceCounts sets the piece count field for the given node IDs.
UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int64) (err error)

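
A brief usage sketch for the interface method documented above. The local pieceCountsDB interface and the isDisqualified helper are assumptions made to keep the example self-contained; the point is that active nodes are always present in the returned map, even with a zero count, so absence alone now signals disqualification.

package overlaysketch

import (
	"context"

	"storj.io/common/storj"
)

// pieceCountsDB mirrors the single overlay.DB method used in this sketch.
type pieceCountsDB interface {
	ActiveNodesPieceCounts(ctx context.Context) (map[storj.NodeID]int64, error)
}

// isDisqualified reports whether a node is absent from the piece-count map,
// which, under the new semantics, means the node is disqualified.
func isDisqualified(ctx context.Context, db pieceCountsDB, nodeID storj.NodeID) (bool, error) {
	counts, err := db.ActiveNodesPieceCounts(ctx)
	if err != nil {
		return false, err
	}
	_, active := counts[nodeID]
	return !active, nil
}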
2 changes: 1 addition & 1 deletion satellite/overlay/uploadselection_test.go
@@ -814,7 +814,7 @@ func (m *mockdb) SetAllContainedNodes(ctx context.Context, containedNodes []stor
}

// AllPieceCounts satisfies nodeevents.DB interface.
func (m *mockdb) AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) {
func (m *mockdb) ActiveNodesPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) {
panic("implement me")
}

2 changes: 1 addition & 1 deletion satellite/satellitedb/dbx/node.dbx
@@ -167,7 +167,7 @@ read paged (

read all (
select node.id node.piece_count
where node.piece_count != 0
where node.disqualified = null
)

// node_api_version is a table for storing the supported API.
10 changes: 5 additions & 5 deletions satellite/satellitedb/dbx/satellitedb.dbx.go
@@ -15096,11 +15096,11 @@ func (obj *pgxImpl) Paged_Node(ctx context.Context,

}

func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)

var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0")
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.disqualified is NULL")

var __values []interface{}

@@ -23469,11 +23469,11 @@ func (obj *pgxcockroachImpl) Paged_Node(ctx context.Context,

}

func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)

var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0")
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.disqualified is NULL")

var __values []interface{}

@@ -28896,7 +28896,7 @@ type Methods interface {
All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error)

All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) (
rows []*Id_PieceCount_Row, err error)

All_Project(ctx context.Context) (
11 changes: 6 additions & 5 deletions satellite/satellitedb/overlaycache.go
@@ -757,14 +757,15 @@ func (cache *overlaycache) TestUnsuspendNodeUnknownAudit(ctx context.Context, no
return nil
}

// AllPieceCounts returns a map of node IDs to piece counts from the db.
// ActiveNodesPieceCounts returns a map of node IDs to piece counts from the db. Returns only pieces for
// nodes that are not disqualified.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int64, err error) {
func (cache *overlaycache) ActiveNodesPieceCounts(ctx context.Context) (_ map[storj.NodeID]int64, err error) {
defer mon.Task()(&ctx)(&err)

// NB: `All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number` selects node
// ID and piece count from the nodes table where piece count is not zero.
rows, err := cache.db.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx)
// NB: `All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null` selects node
// ID and piece count from the nodes which are not disqualified.
rows, err := cache.db.All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
