satellite/repair/checker: optimize processing, part 3
ClassifySegmentPieces now uses a custom set implementation instead of a map.

Side note: for the custom set implementation I also tried an int8 bit set, but
it didn't give better performance, so I went with the simpler implementation.

Benchmark results (compared against the part 2 optimization change):
name                                       old time/op    new time/op    delta
RemoteSegment/healthy_segment-8    21.7µs ± 8%    15.4µs ±16%  -29.38%  (p=0.008 n=5+5)

name                                       old alloc/op   new alloc/op   delta
RemoteSegment/healthy_segment-8    7.41kB ± 0%    1.87kB ± 0%  -74.83%  (p=0.000 n=5+4)

name                                       old allocs/op  new allocs/op  delta
RemoteSegment/healthy_segment-8       150 ± 0%       130 ± 0%  -13.33%  (p=0.008 n=5+5)

Change-Id: I21feca9ec6ac0a2558ac5ce8894451c54f69e52d
mniewrzal authored and Storj Robot committed Oct 16, 2023
1 parent c3fbac2 commit 0eaf431
Showing 4 changed files with 188 additions and 118 deletions.
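
For context (not part of the commit): the deltas above come from replacing map[uint16]struct{} sets with the new IntSet. A rough standalone sketch of the kind of micro-benchmark that shows the difference follows; the benchmark names, the piece count, and the access pattern are assumptions, not the actual benchmark behind these numbers.

package repair_test

import (
	"testing"

	"storj.io/storj/satellite/repair"
)

// assumed piece count, in the ballpark of one RS-encoded segment
const numPieces = 110

// Membership tracking with the map-based approach used before this change.
func BenchmarkPieceSetMap(b *testing.B) {
	for i := 0; i < b.N; i++ {
		set := make(map[uint16]struct{})
		for n := uint16(0); n < numPieces; n++ {
			set[n] = struct{}{}
		}
		found := 0
		for n := uint16(0); n < numPieces; n++ {
			if _, ok := set[n]; ok {
				found++
			}
		}
		_ = found
	}
}

// The same pattern with the IntSet introduced in this commit.
func BenchmarkPieceSetIntSet(b *testing.B) {
	for i := 0; i < b.N; i++ {
		set := repair.NewIntSet(numPieces)
		for n := 0; n < numPieces; n++ {
			set.Include(n)
		}
		found := 0
		for n := 0; n < numPieces; n++ {
			if set.Contains(n) {
				found++
			}
		}
		_ = found
	}
}

A table like the one in the commit message is typically produced by running each version with go test -bench=PieceSet -count=5 ./satellite/repair/ and feeding the two outputs to benchstat.
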
22 changes: 11 additions & 11 deletions satellite/repair/checker/observer.go
@@ -390,18 +390,18 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck,
fork.doDeclumping, fork.placementRules(segment.Placement), fork.nodeIDs)

numHealthy := len(piecesCheck.Healthy)
numHealthy := piecesCheck.Healthy.Size()
segmentTotalCountIntVal.Observe(int64(len(pieces)))
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))

segmentHealthyCountIntVal.Observe(int64(numHealthy))
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
segmentClumpedCountIntVal.Observe(int64(len(piecesCheck.Clumped)))
stats.segmentStats.segmentClumpedCount.Observe(int64(len(piecesCheck.Clumped)))
segmentExitingCountIntVal.Observe(int64(len(piecesCheck.Exiting)))
stats.segmentStats.segmentExitingCount.Observe(int64(len(piecesCheck.Exiting)))
segmentOffPlacementCountIntVal.Observe(int64(len(piecesCheck.OutOfPlacement)))
stats.segmentStats.segmentOffPlacementCount.Observe(int64(len(piecesCheck.OutOfPlacement)))
segmentClumpedCountIntVal.Observe(int64(piecesCheck.Clumped.Size()))
stats.segmentStats.segmentClumpedCount.Observe(int64(piecesCheck.Clumped.Size()))
segmentExitingCountIntVal.Observe(int64(piecesCheck.Exiting.Size()))
stats.segmentStats.segmentExitingCount.Observe(int64(piecesCheck.Exiting.Size()))
segmentOffPlacementCountIntVal.Observe(int64(piecesCheck.OutOfPlacement.Size()))
stats.segmentStats.segmentOffPlacementCount.Observe(int64(piecesCheck.OutOfPlacement.Size()))

segmentAge := time.Since(segment.CreatedAt)
segmentAgeIntVal.Observe(int64(segmentAge.Seconds()))
@@ -417,7 +417,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
// separate case is when we find pieces which are outside segment placement. in such case we are putting segment
// into queue right away.
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || len(piecesCheck.ForcingRepair) > 0 {
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || piecesCheck.ForcingRepair.Size() > 0 {
injuredSegmentHealthFloatVal.Observe(segmentHealth)
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++
Expand All @@ -440,7 +440,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
}

// monitor irreparable segments
if len(piecesCheck.Retrievable) < required {
if piecesCheck.Retrievable.Size() < required {
if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
}
@@ -472,13 +472,13 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme

var missingNodes []string
for _, piece := range pieces {
if _, isMissing := piecesCheck.Missing[piece.Number]; isMissing {
if piecesCheck.Missing.Contains(int(piece.Number)) {
missingNodes = append(missingNodes, piece.StorageNode.String())
}
}
fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unavailable node IDs", strings.Join(missingNodes, ",")))
} else if len(piecesCheck.Clumped) > 0 && len(piecesCheck.Healthy)+len(piecesCheck.Clumped) > repairThreshold && len(piecesCheck.ForcingRepair) == 0 {
} else if piecesCheck.Clumped.Size() > 0 && piecesCheck.Healthy.Size()+piecesCheck.Clumped.Size() > repairThreshold && piecesCheck.ForcingRepair.Size() == 0 {
// This segment is to be repaired because of clumping (it wouldn't need repair yet
// otherwise). Produce a brief report of where the clumping occurred so that we have
// a better understanding of the cause.
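
Not part of the diff: the queueing rule described in the comments of the process hunk above, restated as a condensed sketch against the new Size-based API. needsRepair is a hypothetical helper used only for illustration; the real logic stays inline in observerFork.process and also records monkit stats and inserts into the repair queue, which this sketch omits.

package example

import "storj.io/storj/satellite/repair"

// needsRepair mirrors the condition in observerFork.process: queue the segment
// when its healthy piece count is at or below the repair threshold (and still
// below the success threshold), or when any piece forces a repair (currently
// only out-of-placement pieces do).
func needsRepair(check repair.PiecesCheckResult, repairThreshold, successThreshold int) bool {
	if check.ForcingRepair.Size() > 0 {
		return true
	}
	numHealthy := check.Healthy.Size()
	return numHealthy <= repairThreshold && numHealthy < successThreshold
}
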
157 changes: 109 additions & 48 deletions satellite/repair/classification.go
@@ -4,8 +4,6 @@
package repair

import (
"golang.org/x/exp/maps"

"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/storj/satellite/metabase"
@@ -22,39 +20,39 @@ type PiecesCheckResult struct {

// Missing is a set of Piece Numbers which are to be considered as lost and irretrievable.
// (They reside on offline/disqualified/unknown nodes.)
Missing map[uint16]struct{}
Missing IntSet
// Retrievable contains all Piece Numbers that are retrievable; that is, all piece numbers
// from the segment that are NOT in Missing.
Retrievable map[uint16]struct{}
Retrievable IntSet

// Suspended is a set of Piece Numbers which reside on nodes which are suspended.
Suspended map[uint16]struct{}
Suspended IntSet
// Clumped is a set of Piece Numbers which are to be considered unhealthy because of IP
// clumping. (If DoDeclumping is disabled, this set will be empty.)
Clumped map[uint16]struct{}
Clumped IntSet
// Exiting is a set of Piece Numbers which are considered unhealthy because the node on
// which they reside has initiated graceful exit.
Exiting map[uint16]struct{}
Exiting IntSet
// OutOfPlacement is a set of Piece Numbers which are unhealthy because of placement rules.
// (If DoPlacementCheck is disabled, this set will be empty.)
OutOfPlacement map[uint16]struct{}
OutOfPlacement IntSet
// InExcludedCountry is a set of Piece Numbers which are unhealthy because they are in
// Excluded countries.
InExcludedCountry map[uint16]struct{}
InExcludedCountry IntSet

// ForcingRepair is the set of pieces which force a repair operation for this segment (that
// includes, currently, only pieces in OutOfPlacement).
ForcingRepair map[uint16]struct{}
ForcingRepair IntSet
// Unhealthy contains all Piece Numbers which are in Missing OR Suspended OR Clumped OR
// Exiting OR OutOfPlacement OR InExcludedCountry.
Unhealthy map[uint16]struct{}
Unhealthy IntSet
// UnhealthyRetrievable is the set of pieces that are "unhealthy-but-retrievable". That is,
// pieces that are in Unhealthy AND Retrievable.
UnhealthyRetrievable map[uint16]struct{}
UnhealthyRetrievable IntSet
// Healthy contains all Piece Numbers from the segment which are not in Unhealthy.
// (Equivalently: all Piece Numbers from the segment which are NOT in Missing OR
// Suspended OR Clumped OR Exiting OR OutOfPlacement OR InExcludedCountry).
Healthy map[uint16]struct{}
Healthy IntSet
}

// ClassifySegmentPieces classifies the pieces of a segment into the categories
@@ -64,12 +62,20 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
result.ExcludeNodeIDs = excludeNodeIDs

maxPieceNum := 0
for _, piece := range pieces {
if int(piece.Number) > maxPieceNum {
maxPieceNum = int(piece.Number)
}
}
maxPieceNum++

// check excluded countries and remove online nodes from missing pieces
result.Missing = make(map[uint16]struct{})
result.Suspended = make(map[uint16]struct{})
result.Exiting = make(map[uint16]struct{})
result.Retrievable = make(map[uint16]struct{})
result.InExcludedCountry = make(map[uint16]struct{})
result.Missing = NewIntSet(maxPieceNum)
result.Suspended = NewIntSet(maxPieceNum)
result.Exiting = NewIntSet(maxPieceNum)
result.Retrievable = NewIntSet(maxPieceNum)
result.InExcludedCountry = NewIntSet(maxPieceNum)
for index, nodeRecord := range nodes {
pieceNum := pieces[index].Number

@@ -80,21 +86,21 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
if nodeRecord.ID.IsZero() || !nodeRecord.Online {
// node ID was not found, or the node is disqualified or exited,
// or it is offline
result.Missing[pieceNum] = struct{}{}
result.Missing.Include(int(pieceNum))
} else {
// node is expected to be online and receiving requests.
result.Retrievable[pieceNum] = struct{}{}
result.Retrievable.Include(int(pieceNum))
}

if nodeRecord.Suspended {
result.Suspended[pieceNum] = struct{}{}
result.Suspended.Include(int(pieceNum))
}
if nodeRecord.Exiting {
result.Exiting[pieceNum] = struct{}{}
result.Exiting.Include(int(pieceNum))
}

if _, excluded := excludedCountryCodes[nodeRecord.CountryCode]; excluded {
result.InExcludedCountry[pieceNum] = struct{}{}
result.InExcludedCountry.Include(int(pieceNum))
}
}

@@ -103,7 +109,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
// to be considered retrievable but unhealthy.

lastNets := make(map[string]struct{}, len(pieces))
result.Clumped = make(map[uint16]struct{})
result.Clumped = NewIntSet(maxPieceNum)

collectClumpedPieces := func(onlineness bool) {
for index, nodeRecord := range nodes {
@@ -117,7 +123,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
_, ok := lastNets[nodeRecord.LastNet]
if ok {
// this LastNet was already seen
result.Clumped[pieceNum] = struct{}{}
result.Clumped.Include(int(pieceNum))
} else {
// add to the list of seen LastNets
lastNets[nodeRecord.LastNet] = struct{}{}
@@ -133,7 +139,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
if doPlacementCheck {
// mark all pieces that are out of placement.

result.OutOfPlacement = make(map[uint16]struct{})
result.OutOfPlacement = NewIntSet(maxPieceNum)
for index, nodeRecord := range nodes {
if nodeRecord.ID.IsZero() {
continue
@@ -142,37 +148,92 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
continue
}
pieceNum := pieces[index].Number
result.OutOfPlacement[pieceNum] = struct{}{}
result.OutOfPlacement.Include(int(pieceNum))
}
}

// ForcingRepair = OutOfPlacement only, for now
result.ForcingRepair = make(map[uint16]struct{})
maps.Copy(result.ForcingRepair, result.OutOfPlacement)

// Unhealthy = Missing OR Suspended OR Clumped OR OutOfPlacement OR InExcludedCountry
result.Unhealthy = make(map[uint16]struct{})
maps.Copy(result.Unhealthy, result.Missing)
maps.Copy(result.Unhealthy, result.Suspended)
maps.Copy(result.Unhealthy, result.Clumped)
maps.Copy(result.Unhealthy, result.Exiting)
maps.Copy(result.Unhealthy, result.OutOfPlacement)
maps.Copy(result.Unhealthy, result.InExcludedCountry)
result.ForcingRepair = copyIntSet(NewIntSet(maxPieceNum),
result.OutOfPlacement,
)

// Unhealthy = Missing OR Suspended OR Clumped OR Exiting OR OutOfPlacement OR InExcludedCountry
result.Unhealthy = copyIntSet(NewIntSet(maxPieceNum),
result.Missing,
result.Suspended,
result.Clumped,
result.Exiting,
result.OutOfPlacement,
result.InExcludedCountry,
)

// UnhealthyRetrievable = Unhealthy AND Retrievable
result.UnhealthyRetrievable = make(map[uint16]struct{})
for pieceNum := range result.Unhealthy {
if _, isRetrievable := result.Retrievable[pieceNum]; isRetrievable {
result.UnhealthyRetrievable[pieceNum] = struct{}{}
}
}

// Healthy = NOT Unhealthy
result.Healthy = make(map[uint16]struct{})
result.UnhealthyRetrievable = NewIntSet(maxPieceNum)
result.Healthy = NewIntSet(maxPieceNum)
for _, piece := range pieces {
if _, found := result.Unhealthy[piece.Number]; !found {
result.Healthy[piece.Number] = struct{}{}
if !result.Unhealthy.Contains(int(piece.Number)) {
result.Healthy.Include(int(piece.Number))
} else if result.Retrievable.Contains(int(piece.Number)) {
result.UnhealthyRetrievable.Include(int(piece.Number))
}
}

return result
}

func copyIntSet(destination IntSet, sources ...IntSet) IntSet {
for element := 0; element < destination.Cap(); element++ {
for _, source := range sources {
if source.Contains(element) {
destination.Include(element)
break
}
}
}
return destination
}

// IntSet is a fixed-capacity set of non-negative int values (piece numbers).
type IntSet struct {
bits []bool
size int
}

// NewIntSet creates a new IntSet that can hold values in the range [0, n).
func NewIntSet(n int) IntSet {
return IntSet{
bits: make([]bool, n),
}
}

// Contains returns true if set includes int value.
func (i IntSet) Contains(value int) bool {
if value >= cap(i.bits) {
return false
}
return i.bits[value]
}

// Include includes int value into set.
// Ignores values outside of the set capacity.
func (i *IntSet) Include(value int) {
if value < 0 || value >= len(i.bits) {
return
}
i.bits[value] = true
i.size++
}

// Remove removes int value from set.
func (i *IntSet) Remove(value int) {
i.bits[value] = false
i.size--
}

// Size returns size of set.
func (i IntSet) Size() int {
return i.size
}

// Cap returns set capacity.
func (i IntSet) Cap() int {
return cap(i.bits)
}
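
Not part of the diff: a minimal standalone sketch of the IntSet semantics introduced above, with a fixed capacity chosen up front, constant-time Contains/Include, and a Size that is tracked incrementally. The function name and values here are made up for illustration.

package example

import (
	"fmt"

	"storj.io/storj/satellite/repair"
)

func exampleIntSet() {
	// Capacity must cover the largest value that will be stored, which is why
	// ClassifySegmentPieces sizes every set from the maximum piece number + 1.
	healthy := repair.NewIntSet(10)
	healthy.Include(0)
	healthy.Include(3)
	healthy.Include(7)

	fmt.Println(healthy.Contains(3))  // true
	fmt.Println(healthy.Contains(5))  // false
	fmt.Println(healthy.Contains(42)) // false: out-of-range values are simply not contained
	fmt.Println(healthy.Size())       // 3
	fmt.Println(healthy.Cap())        // 10
}

Unions such as Unhealthy are built inside the package by copyIntSet, which checks every candidate value against each source set and fills a fresh destination, replacing the previous pattern of allocating maps and merging them with maps.Copy.
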
46 changes: 23 additions & 23 deletions satellite/repair/classification_test.go
@@ -45,10 +45,10 @@ func TestClassifySegmentPieces(t *testing.T) {
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, parsed.CreateFilters(0), piecesToNodeIDs(pieces))

require.Equal(t, 0, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 0, len(result.OutOfPlacement))
require.Equal(t, 0, len(result.UnhealthyRetrievable))
require.Equal(t, 0, result.Missing.Size())
require.Equal(t, 0, result.Clumped.Size())
require.Equal(t, 0, result.OutOfPlacement.Size())
require.Equal(t, 0, result.UnhealthyRetrievable.Size())
})

t.Run("out of placement", func(t *testing.T) {
@@ -71,11 +71,11 @@ func TestClassifySegmentPieces(t *testing.T) {
pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))

require.Equal(t, 0, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 0, result.Missing.Size())
require.Equal(t, 0, result.Clumped.Size())
// 1,2,3 are in Germany instead of GB
require.Equal(t, 3, len(result.OutOfPlacement))
require.Equal(t, 3, len(result.UnhealthyRetrievable))
require.Equal(t, 3, result.OutOfPlacement.Size())
require.Equal(t, 3, result.UnhealthyRetrievable.Size())
})

t.Run("out of placement and offline", func(t *testing.T) {
@@ -95,11 +95,11 @@ func TestClassifySegmentPieces(t *testing.T) {
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))

// offline nodes
require.Equal(t, 5, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 10, len(result.OutOfPlacement))
require.Equal(t, 5, len(result.UnhealthyRetrievable))
numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
require.Equal(t, 5, result.Missing.Size())
require.Equal(t, 0, result.Clumped.Size())
require.Equal(t, 10, result.OutOfPlacement.Size())
require.Equal(t, 5, result.UnhealthyRetrievable.Size())
numHealthy := len(pieces) - result.Missing.Size() - result.UnhealthyRetrievable.Size()
require.Equal(t, 0, numHealthy)

})
@@ -118,11 +118,11 @@ func TestClassifySegmentPieces(t *testing.T) {
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(0), piecesToNodeIDs(pieces))

// offline nodes
require.Equal(t, 2, len(result.Missing))
require.Equal(t, 3, len(result.Clumped))
require.Equal(t, 0, len(result.OutOfPlacement))
require.Equal(t, 2, len(result.UnhealthyRetrievable))
numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
require.Equal(t, 2, result.Missing.Size())
require.Equal(t, 3, result.Clumped.Size())
require.Equal(t, 0, result.OutOfPlacement.Size())
require.Equal(t, 2, result.UnhealthyRetrievable.Size())
numHealthy := len(pieces) - result.Missing.Size() - result.UnhealthyRetrievable.Size()
require.Equal(t, 3, numHealthy)

})
@@ -145,11 +145,11 @@ func TestClassifySegmentPieces(t *testing.T) {
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(10), piecesToNodeIDs(pieces))

// offline nodes
require.Equal(t, 2, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 0, len(result.OutOfPlacement))
require.Equal(t, 0, len(result.UnhealthyRetrievable))
numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
require.Equal(t, 2, result.Missing.Size())
require.Equal(t, 0, result.Clumped.Size())
require.Equal(t, 0, result.OutOfPlacement.Size())
require.Equal(t, 0, result.UnhealthyRetrievable.Size())
numHealthy := len(pieces) - result.Missing.Size() - result.UnhealthyRetrievable.Size()
require.Equal(t, 5, numHealthy)

})
