satellite/repair/checker: Split segment metrics by placement
We want to track segment metrics by placement through monkit tags so that
we can filter out Storj Select data from our public stats API.

This commit only tackles the metrics handled by the
satellite/repair/checker package; these are the last metrics that we
need to classify by placement.

Change-Id: I43505bcc29389070caaa2f8fb30dd3e1c9eaa954
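
For context, here is a minimal, self-contained sketch of the tagging pattern this change applies. The helper name and values are illustrative, and it assumes monkit v3 and a numeric placement constraint; it is not the satellite's actual code.

package main

import (
	"strconv"

	"github.com/spacemonkeygo/monkit/v3"
)

var mon = monkit.Package()

// observePerPlacement is a hypothetical helper: the placement value becomes a
// monkit series tag, so downstream consumers can filter or aggregate metrics
// per placement (for example, to exclude Storj Select placements).
func observePerPlacement(name string, placement uint16, value int64) {
	tag := monkit.NewSeriesTag("placement", strconv.FormatUint(uint64(placement), 10))
	mon.IntVal(name, tag).Observe(value)
}

func main() {
	// Report 42 checked segments for placement 0 (the default placement).
	observePerPlacement("remote_segments_checked", 0, 42)
}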
ifraixedes authored and Storj Robot committed Mar 19, 2024
1 parent 833bce5 commit 6ec8420
Showing 2 changed files with 72 additions and 35 deletions.
90 changes: 55 additions & 35 deletions satellite/repair/checker/observer.go
@@ -28,8 +28,10 @@ import (
"storj.io/storj/satellite/repair/queue"
)

var _ rangedloop.Observer = (*Observer)(nil)
var _ rangedloop.Partial = (*observerFork)(nil)
var (
_ rangedloop.Observer = (*Observer)(nil)
_ rangedloop.Partial = (*observerFork)(nil)
)

// Observer implements the ranged loop Observer interface.
//
@@ -49,7 +51,7 @@ type Observer struct {

// the following are reset on each iteration
startTime time.Time
TotalStats aggregateStats
TotalStats aggregateStatsPlacements

mu sync.Mutex
statsCollector map[storj.RedundancyScheme]*observerRSStats
@@ -141,7 +143,8 @@ func (observer *Observer) Start(ctx context.Context, startTime time.Time) (err e
defer mon.Task()(&ctx)(&err)

observer.startTime = startTime
observer.TotalStats = aggregateStats{}
// Reuse the allocated slice.
observer.TotalStats = observer.TotalStats[:0]

return nil
}
@@ -188,24 +191,30 @@ func (observer *Observer) Finish(ctx context.Context) (err error) {

observer.collectAggregates()

mon.IntVal("remote_files_checked").Observe(observer.TotalStats.objectsChecked) //mon:locked
mon.IntVal("remote_segments_checked").Observe(observer.TotalStats.remoteSegmentsChecked) //mon:locked
mon.IntVal("remote_segments_failed_to_check").Observe(observer.TotalStats.remoteSegmentsFailedToCheck) //mon:locked
mon.IntVal("remote_segments_needing_repair").Observe(observer.TotalStats.remoteSegmentsNeedingRepair) //mon:locked
mon.IntVal("new_remote_segments_needing_repair").Observe(observer.TotalStats.newRemoteSegmentsNeedingRepair) //mon:locked
mon.IntVal("remote_segments_lost").Observe(observer.TotalStats.remoteSegmentsLost) //mon:locked
mon.IntVal("remote_files_lost").Observe(int64(len(observer.TotalStats.objectsLost))) //mon:locked
mon.IntVal("remote_segments_over_threshold_1").Observe(observer.TotalStats.remoteSegmentsOverThreshold[0]) //mon:locked
mon.IntVal("remote_segments_over_threshold_2").Observe(observer.TotalStats.remoteSegmentsOverThreshold[1]) //mon:locked
mon.IntVal("remote_segments_over_threshold_3").Observe(observer.TotalStats.remoteSegmentsOverThreshold[2]) //mon:locked
mon.IntVal("remote_segments_over_threshold_4").Observe(observer.TotalStats.remoteSegmentsOverThreshold[3]) //mon:locked
mon.IntVal("remote_segments_over_threshold_5").Observe(observer.TotalStats.remoteSegmentsOverThreshold[4]) //mon:locked
mon.IntVal("healthy_segments_removed_from_queue").Observe(healthyDeleted) //mon:locked
allUnhealthy := observer.TotalStats.remoteSegmentsNeedingRepair + observer.TotalStats.remoteSegmentsFailedToCheck
allChecked := observer.TotalStats.remoteSegmentsChecked
var allUnhealthy, allChecked int64
for p, s := range observer.TotalStats {
t := monkit.NewSeriesTag("placement", strconv.FormatUint(uint64(p), 10))

mon.IntVal("remote_files_checked", t).Observe(s.objectsChecked) //mon:locked
mon.IntVal("remote_segments_checked", t).Observe(s.remoteSegmentsChecked) //mon:locked
mon.IntVal("remote_segments_failed_to_check", t).Observe(s.remoteSegmentsFailedToCheck) //mon:locked
mon.IntVal("remote_segments_needing_repair", t).Observe(s.remoteSegmentsNeedingRepair) //mon:locked
mon.IntVal("new_remote_segments_needing_repair", t).Observe(s.newRemoteSegmentsNeedingRepair) //mon:locked
mon.IntVal("remote_segments_lost", t).Observe(s.remoteSegmentsLost) //mon:locked
mon.IntVal("remote_files_lost", t).Observe(int64(len(s.objectsLost))) //mon:locked
mon.IntVal("remote_segments_over_threshold_1", t).Observe(s.remoteSegmentsOverThreshold[0]) //mon:locked
mon.IntVal("remote_segments_over_threshold_2", t).Observe(s.remoteSegmentsOverThreshold[1]) //mon:locked
mon.IntVal("remote_segments_over_threshold_3", t).Observe(s.remoteSegmentsOverThreshold[2]) //mon:locked
mon.IntVal("remote_segments_over_threshold_4", t).Observe(s.remoteSegmentsOverThreshold[3]) //mon:locked
mon.IntVal("remote_segments_over_threshold_5", t).Observe(s.remoteSegmentsOverThreshold[4]) //mon:locked

allUnhealthy += s.remoteSegmentsNeedingRepair + s.remoteSegmentsFailedToCheck
allChecked += s.remoteSegmentsChecked
}

mon.IntVal("healthy_segments_removed_from_queue").Observe(healthyDeleted) //mon:locked
allHealthy := allChecked - allUnhealthy
mon.FloatVal("remote_segments_healthy_percentage").Observe(100 * float64(allHealthy) / float64(allChecked)) //mon:locked

return nil
}

@@ -257,7 +266,7 @@ type observerFork struct {
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
lastStreamID uuid.UUID
totalStats aggregateStats
totalStats aggregateStatsPlacements

// reuse those slices to optimize memory usage
nodeIDs []storj.NodeID
@@ -322,7 +331,6 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg
var (
// initialize monkit metrics once for better performance.
segmentTotalCountIntVal = mon.IntVal("checker_segment_total_count") //mon:locked
segmentHealthyCountIntVal = mon.IntVal("checker_segment_healthy_count") //mon:locked
segmentClumpedCountIntVal = mon.IntVal("checker_segment_clumped_count") //mon:locked
segmentExitingCountIntVal = mon.IntVal("checker_segment_exiting_count")
segmentAgeIntVal = mon.IntVal("checker_segment_age") //mon:locked
@@ -333,10 +341,15 @@ var (
)

func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segment) (err error) {
// Grow the fork.totalStats if this placement doesn't fit.
if l := int(segment.Placement+1) - len(fork.totalStats); l > 0 {
fork.totalStats = append(fork.totalStats, make([]aggregateStats, l)...)
}

if segment.Inline() {
if fork.lastStreamID.Compare(segment.StreamID) != 0 {
fork.lastStreamID = segment.StreamID
fork.totalStats.objectsChecked++
fork.totalStats[segment.Placement].objectsChecked++
}

return nil
@@ -351,10 +364,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
if fork.lastStreamID.Compare(segment.StreamID) != 0 {
fork.lastStreamID = segment.StreamID
stats.iterationAggregates.objectsChecked++
fork.totalStats.objectsChecked++
fork.totalStats[segment.Placement].objectsChecked++
}

fork.totalStats.remoteSegmentsChecked++
fork.totalStats[segment.Placement].remoteSegmentsChecked++
stats.iterationAggregates.remoteSegmentsChecked++

// ensure we get values, even if only zero values, so that redash can have an alert based on this
@@ -384,7 +397,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
}
selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, fork.nodeIDs, fork.nodes)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
fork.totalStats[segment.Placement].remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return Error.New("error getting node information for pieces: %w", err)
}
@@ -395,7 +408,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
segmentTotalCountIntVal.Observe(int64(len(pieces)))
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))

segmentHealthyCountIntVal.Observe(int64(numHealthy))
mon.IntVal("checker_segment_healthy_count", monkit.NewSeriesTag(
"placement", strconv.FormatUint(uint64(segment.Placement), 10),
)).Observe(int64(numHealthy)) //mon:locked

stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))

segmentClumpedCountIntVal.Observe(int64(piecesCheck.Clumped.Count()))
@@ -423,7 +439,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || piecesCheck.ForcingRepair.Count() > 0 {
injuredSegmentHealthFloatVal.Observe(segmentHealth)
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++
fork.totalStats[segment.Placement].remoteSegmentsNeedingRepair++
stats.iterationAggregates.remoteSegmentsNeedingRepair++
err := fork.repairQueue.Insert(ctx, &queue.InjuredSegment{
StreamID: segment.StreamID,
Expand All @@ -434,7 +450,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
}, func() {
// Counters are increased after the queue has determined
// that the segment wasn't already queued for repair.
fork.totalStats.newRemoteSegmentsNeedingRepair++
fork.totalStats[segment.Placement].newRemoteSegmentsNeedingRepair++
stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
})
if err != nil {
Expand All @@ -444,8 +460,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme

// monitor irreparable segments
if piecesCheck.Retrievable.Count() < required {
if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
if !slices.Contains(fork.totalStats[segment.Placement].objectsLost, segment.StreamID) {
fork.totalStats[segment.Placement].objectsLost = append(
fork.totalStats[segment.Placement].objectsLost, segment.StreamID,
)
}

if !slices.Contains(stats.iterationAggregates.objectsLost, segment.StreamID) {
Expand All @@ -467,7 +485,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
segmentTimeUntilIrreparableIntVal.Observe(int64(segmentAge.Seconds()))
stats.segmentStats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))

fork.totalStats.remoteSegmentsLost++
fork.totalStats[segment.Placement].remoteSegmentsLost++
stats.iterationAggregates.remoteSegmentsLost++

segmentsBelowMinReqCounter.Inc(1)
Expand All @@ -493,12 +511,14 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
fork.log.Info("segment needs repair only because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
}
} else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(
fork.totalStats[segment.Placement].remoteSegmentsOverThreshold,
)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range fork.totalStats.remoteSegmentsOverThreshold {
for i := range fork.totalStats[segment.Placement].remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
fork.totalStats.remoteSegmentsOverThreshold[i]++
fork.totalStats[segment.Placement].remoteSegmentsOverThreshold[i]++
break
}
}
17 changes: 17 additions & 0 deletions satellite/repair/checker/observerstats.go
@@ -150,6 +150,23 @@ func (stats *observerRSStats) collectAggregates() {
stats.iterationAggregates = aggregateStats{}
}

// aggregateStatsPlacements aggregates stats per placement.
// storj.PlacementConstraint values are the indexes of the slice.
type aggregateStatsPlacements []aggregateStats

func (ap *aggregateStatsPlacements) combine(stats aggregateStatsPlacements) {
lenStats := len(stats)
if lenStats > len(*ap) {
// We don't directly append the extra entries from stats because aggregateStats.objectsLost
// is a slice, and ap must not refer to the same underlying slice, to avoid unnoticed changes.
*ap = append(*ap, make([]aggregateStats, lenStats-len(*ap))...)
}

for i := 0; i < lenStats; i++ {
(*ap)[i].combine(stats[i])
}
}

// aggregateStats tallies data over the full checker iteration.
type aggregateStats struct {
objectsChecked int64
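
To illustrate how the placement-indexed aggregation is intended to be used, here is a small, self-contained sketch with simplified stand-in types (only one counter field is kept; the real aggregateStats has more fields), not the satellite's actual implementation.

package main

import "fmt"

// aggregateStats is a simplified stand-in for the checker's per-iteration tallies.
type aggregateStats struct {
	remoteSegmentsChecked int64
}

func (a *aggregateStats) combine(o aggregateStats) {
	a.remoteSegmentsChecked += o.remoteSegmentsChecked
}

// aggregateStatsPlacements indexes aggregateStats by placement constraint.
type aggregateStatsPlacements []aggregateStats

func (ap *aggregateStatsPlacements) combine(stats aggregateStatsPlacements) {
	// Grow the receiver so every placement reported by stats has a slot.
	if grow := len(stats) - len(*ap); grow > 0 {
		*ap = append(*ap, make([]aggregateStats, grow)...)
	}
	for i := range stats {
		(*ap)[i].combine(stats[i])
	}
}

func main() {
	var total aggregateStatsPlacements

	// Two forks report their tallies; the placement value is the slice index,
	// so a fork that only saw placement 2 uses a slice of length 3.
	fork1 := aggregateStatsPlacements{{remoteSegmentsChecked: 5}}
	fork2 := aggregateStatsPlacements{{}, {}, {remoteSegmentsChecked: 7}}

	total.combine(fork1)
	total.combine(fork2)

	fmt.Println(len(total), total[0].remoteSegmentsChecked, total[2].remoteSegmentsChecked) // 3 5 7
}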
