diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go
index 56ee13311202..cd1ea6487510 100644
--- a/satellite/repair/checker/observer.go
+++ b/satellite/repair/checker/observer.go
@@ -28,8 +28,10 @@ import (
 	"storj.io/storj/satellite/repair/queue"
 )
 
-var _ rangedloop.Observer = (*Observer)(nil)
-var _ rangedloop.Partial = (*observerFork)(nil)
+var (
+	_ rangedloop.Observer = (*Observer)(nil)
+	_ rangedloop.Partial  = (*observerFork)(nil)
+)
 
 // Observer implements the ranged loop Observer interface.
 //
@@ -49,7 +51,7 @@ type Observer struct {
 
 	// the following are reset on each iteration
 	startTime  time.Time
-	TotalStats aggregateStats
+	TotalStats aggregateStatsPlacements
 
 	mu             sync.Mutex
 	statsCollector map[storj.RedundancyScheme]*observerRSStats
@@ -141,7 +143,8 @@ func (observer *Observer) Start(ctx context.Context, startTime time.Time) (err e
 	defer mon.Task()(&ctx)(&err)
 
 	observer.startTime = startTime
-	observer.TotalStats = aggregateStats{}
+	// Reuse the allocated slice.
+	observer.TotalStats = observer.TotalStats[:0]
 
 	return nil
 }
@@ -188,24 +191,30 @@ func (observer *Observer) Finish(ctx context.Context) (err error) {
 
 	observer.collectAggregates()
 
-	mon.IntVal("remote_files_checked").Observe(observer.TotalStats.objectsChecked) //mon:locked
-	mon.IntVal("remote_segments_checked").Observe(observer.TotalStats.remoteSegmentsChecked) //mon:locked
-	mon.IntVal("remote_segments_failed_to_check").Observe(observer.TotalStats.remoteSegmentsFailedToCheck) //mon:locked
-	mon.IntVal("remote_segments_needing_repair").Observe(observer.TotalStats.remoteSegmentsNeedingRepair) //mon:locked
-	mon.IntVal("new_remote_segments_needing_repair").Observe(observer.TotalStats.newRemoteSegmentsNeedingRepair) //mon:locked
-	mon.IntVal("remote_segments_lost").Observe(observer.TotalStats.remoteSegmentsLost) //mon:locked
-	mon.IntVal("remote_files_lost").Observe(int64(len(observer.TotalStats.objectsLost))) //mon:locked
-	mon.IntVal("remote_segments_over_threshold_1").Observe(observer.TotalStats.remoteSegmentsOverThreshold[0]) //mon:locked
-	mon.IntVal("remote_segments_over_threshold_2").Observe(observer.TotalStats.remoteSegmentsOverThreshold[1]) //mon:locked
-	mon.IntVal("remote_segments_over_threshold_3").Observe(observer.TotalStats.remoteSegmentsOverThreshold[2]) //mon:locked
-	mon.IntVal("remote_segments_over_threshold_4").Observe(observer.TotalStats.remoteSegmentsOverThreshold[3]) //mon:locked
-	mon.IntVal("remote_segments_over_threshold_5").Observe(observer.TotalStats.remoteSegmentsOverThreshold[4]) //mon:locked
-	mon.IntVal("healthy_segments_removed_from_queue").Observe(healthyDeleted) //mon:locked
-	allUnhealthy := observer.TotalStats.remoteSegmentsNeedingRepair + observer.TotalStats.remoteSegmentsFailedToCheck
-	allChecked := observer.TotalStats.remoteSegmentsChecked
+	var allUnhealthy, allChecked int64
+	for p, s := range observer.TotalStats {
+		t := monkit.NewSeriesTag("placement", strconv.FormatUint(uint64(p), 10))
+
+		mon.IntVal("remote_files_checked", t).Observe(s.objectsChecked) //mon:locked
+		mon.IntVal("remote_segments_checked", t).Observe(s.remoteSegmentsChecked) //mon:locked
+		mon.IntVal("remote_segments_failed_to_check", t).Observe(s.remoteSegmentsFailedToCheck) //mon:locked
+		mon.IntVal("remote_segments_needing_repair", t).Observe(s.remoteSegmentsNeedingRepair) //mon:locked
+		mon.IntVal("new_remote_segments_needing_repair", t).Observe(s.newRemoteSegmentsNeedingRepair) //mon:locked
+		mon.IntVal("remote_segments_lost", t).Observe(s.remoteSegmentsLost) //mon:locked
+		mon.IntVal("remote_files_lost", t).Observe(int64(len(s.objectsLost))) //mon:locked
+		mon.IntVal("remote_segments_over_threshold_1", t).Observe(s.remoteSegmentsOverThreshold[0]) //mon:locked
+		mon.IntVal("remote_segments_over_threshold_2", t).Observe(s.remoteSegmentsOverThreshold[1]) //mon:locked
+		mon.IntVal("remote_segments_over_threshold_3", t).Observe(s.remoteSegmentsOverThreshold[2]) //mon:locked
+		mon.IntVal("remote_segments_over_threshold_4", t).Observe(s.remoteSegmentsOverThreshold[3]) //mon:locked
+		mon.IntVal("remote_segments_over_threshold_5", t).Observe(s.remoteSegmentsOverThreshold[4]) //mon:locked
+
+		allUnhealthy += s.remoteSegmentsNeedingRepair + s.remoteSegmentsFailedToCheck
+		allChecked += s.remoteSegmentsChecked
+	}
+
+	mon.IntVal("healthy_segments_removed_from_queue").Observe(healthyDeleted) //mon:locked
 	allHealthy := allChecked - allUnhealthy
 	mon.FloatVal("remote_segments_healthy_percentage").Observe(100 * float64(allHealthy) / float64(allChecked)) //mon:locked
-
 	return nil
 }
 
@@ -257,7 +266,7 @@ type observerFork struct {
 	getNodesEstimate func(ctx context.Context) (int, error)
 	log              *zap.Logger
 	lastStreamID     uuid.UUID
-	totalStats       aggregateStats
+	totalStats       aggregateStatsPlacements
 
 	// reuse those slices to optimize memory usage
 	nodeIDs []storj.NodeID
@@ -322,7 +331,6 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg
 var (
 	// initialize monkit metrics once for better performance.
 	segmentTotalCountIntVal = mon.IntVal("checker_segment_total_count") //mon:locked
-	segmentHealthyCountIntVal = mon.IntVal("checker_segment_healthy_count") //mon:locked
 	segmentClumpedCountIntVal = mon.IntVal("checker_segment_clumped_count") //mon:locked
 	segmentExitingCountIntVal = mon.IntVal("checker_segment_exiting_count")
 	segmentAgeIntVal = mon.IntVal("checker_segment_age") //mon:locked
@@ -333,10 +341,15 @@ var (
 )
 
 func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segment) (err error) {
+	// Grow the fork.totalStats if this placement doesn't fit.
+	if l := int(segment.Placement+1) - len(fork.totalStats); l > 0 {
+		fork.totalStats = append(fork.totalStats, make([]aggregateStats, l)...)
+	}
+
 	if segment.Inline() {
 		if fork.lastStreamID.Compare(segment.StreamID) != 0 {
 			fork.lastStreamID = segment.StreamID
-			fork.totalStats.objectsChecked++
+			fork.totalStats[segment.Placement].objectsChecked++
 		}
 
 		return nil
@@ -351,10 +364,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	if fork.lastStreamID.Compare(segment.StreamID) != 0 {
 		fork.lastStreamID = segment.StreamID
 		stats.iterationAggregates.objectsChecked++
-		fork.totalStats.objectsChecked++
+		fork.totalStats[segment.Placement].objectsChecked++
 	}
 
-	fork.totalStats.remoteSegmentsChecked++
+	fork.totalStats[segment.Placement].remoteSegmentsChecked++
 	stats.iterationAggregates.remoteSegmentsChecked++
 
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
@@ -384,7 +397,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	}
 	selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, fork.nodeIDs, fork.nodes)
 	if err != nil {
-		fork.totalStats.remoteSegmentsFailedToCheck++
+		fork.totalStats[segment.Placement].remoteSegmentsFailedToCheck++
 		stats.iterationAggregates.remoteSegmentsFailedToCheck++
 		return Error.New("error getting node information for pieces: %w", err)
 	}
@@ -395,7 +408,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	segmentTotalCountIntVal.Observe(int64(len(pieces)))
 	stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))
 
-	segmentHealthyCountIntVal.Observe(int64(numHealthy))
+	mon.IntVal("checker_segment_healthy_count", monkit.NewSeriesTag(
+		"placement", strconv.FormatUint(uint64(segment.Placement), 10),
+	)).Observe(int64(numHealthy)) //mon:locked
+
 	stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
 
 	segmentClumpedCountIntVal.Observe(int64(piecesCheck.Clumped.Count()))
@@ -423,7 +439,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	if (numHealthy <= repairThreshold && numHealthy < successThreshold) || piecesCheck.ForcingRepair.Count() > 0 {
 		injuredSegmentHealthFloatVal.Observe(segmentHealth)
 		stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
-		fork.totalStats.remoteSegmentsNeedingRepair++
+		fork.totalStats[segment.Placement].remoteSegmentsNeedingRepair++
 		stats.iterationAggregates.remoteSegmentsNeedingRepair++
 		err := fork.repairQueue.Insert(ctx, &queue.InjuredSegment{
 			StreamID: segment.StreamID,
@@ -434,7 +450,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		}, func() {
 			// Counters are increased after the queue has determined
 			// that the segment wasn't already queued for repair.
-			fork.totalStats.newRemoteSegmentsNeedingRepair++
+			fork.totalStats[segment.Placement].newRemoteSegmentsNeedingRepair++
 			stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
 		})
 		if err != nil {
@@ -444,8 +460,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 
 		// monitor irreparable segments
 		if piecesCheck.Retrievable.Count() < required {
-			if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
-				fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
+			if !slices.Contains(fork.totalStats[segment.Placement].objectsLost, segment.StreamID) {
+				fork.totalStats[segment.Placement].objectsLost = append(
+					fork.totalStats[segment.Placement].objectsLost, segment.StreamID,
+				)
 			}
 
 			if !slices.Contains(stats.iterationAggregates.objectsLost, segment.StreamID) {
@@ -467,7 +485,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 			segmentTimeUntilIrreparableIntVal.Observe(int64(segmentAge.Seconds()))
 			stats.segmentStats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))
 
-			fork.totalStats.remoteSegmentsLost++
+			fork.totalStats[segment.Placement].remoteSegmentsLost++
 			stats.iterationAggregates.remoteSegmentsLost++
 
 			segmentsBelowMinReqCounter.Inc(1)
@@ -493,12 +511,14 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 			fork.log.Info("segment needs repair only because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
 		}
 	} else {
-		if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {
+		if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(
+			fork.totalStats[segment.Placement].remoteSegmentsOverThreshold,
+		)) {
 			// record metrics for segments right above repair threshold
 			// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
-			for i := range fork.totalStats.remoteSegmentsOverThreshold {
+			for i := range fork.totalStats[segment.Placement].remoteSegmentsOverThreshold {
 				if numHealthy == (repairThreshold + i + 1) {
-					fork.totalStats.remoteSegmentsOverThreshold[i]++
+					fork.totalStats[segment.Placement].remoteSegmentsOverThreshold[i]++
 					break
 				}
 			}
diff --git a/satellite/repair/checker/observerstats.go b/satellite/repair/checker/observerstats.go
index cb078499cef5..ff10c1afa4bb 100644
--- a/satellite/repair/checker/observerstats.go
+++ b/satellite/repair/checker/observerstats.go
@@ -150,6 +150,23 @@ func (stats *observerRSStats) collectAggregates() {
 	stats.iterationAggregates = aggregateStats{}
 }
 
+// aggregateStatsPlacements aggregate stats per placement.
+// storj.PlacementConstraint values are the indexes of the slice.
+type aggregateStatsPlacements []aggregateStats
+
+func (ap *aggregateStatsPlacements) combine(stats aggregateStatsPlacements) {
+	lenStats := len(stats)
+	if lenStats > len(*ap) {
+		// We don't append directly the stats which ap doesn't have because aggregateStats.objectsLost
+		// is a slice and ap must not refer to the same slice to avoid unnoticed changes.
+		*ap = append(*ap, make([]aggregateStats, lenStats-len(*ap))...)
+	}
+
+	for i := 0; i < lenStats; i++ {
+		(*ap)[i].combine(stats[i])
+	}
+}
+
 // aggregateStats tallies data over the full checker iteration.
 type aggregateStats struct {
 	objectsChecked int64
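Review note: the sketch below is a minimal, self-contained illustration of the placement-indexed aggregation pattern this change introduces: the slice is grown on demand so that a placement constraint can be used directly as an index, and aggregates are combined element by element so the receiver never shares an objectsLost backing array with its argument. The types stats and statsByPlacement are simplified stand-ins invented for this example, not the satellite's actual aggregateStats/aggregateStatsPlacements.

package main

import "fmt"

// stats is a simplified stand-in for the checker's aggregateStats.
type stats struct {
	remoteSegmentsChecked int64
	objectsLost           []string // slice field: must not be shared between aggregates
}

// combine adds other into s; append copies the elements, so s never aliases other's backing array.
func (s *stats) combine(other stats) {
	s.remoteSegmentsChecked += other.remoteSegmentsChecked
	s.objectsLost = append(s.objectsLost, other.objectsLost...)
}

// statsByPlacement indexes stats by placement constraint.
type statsByPlacement []stats

// grow makes sure index p is addressable, appending zero values as needed.
func (sp *statsByPlacement) grow(p int) {
	if l := p + 1 - len(*sp); l > 0 {
		*sp = append(*sp, make([]stats, l)...)
	}
}

// combine merges another per-placement aggregate element by element.
func (sp *statsByPlacement) combine(other statsByPlacement) {
	if len(other) > len(*sp) {
		// Append zero values rather than other's elements so *sp never
		// shares objectsLost backing arrays with other.
		*sp = append(*sp, make([]stats, len(other)-len(*sp))...)
	}
	for i := range other {
		(*sp)[i].combine(other[i])
	}
}

func main() {
	var fork1, fork2, total statsByPlacement

	// Fork 1 saw segments in placements 0 and 2.
	fork1.grow(2)
	fork1[0].remoteSegmentsChecked = 3
	fork1[2].remoteSegmentsChecked = 1
	fork1[2].objectsLost = []string{"stream-a"}

	// Fork 2 only saw placement 0.
	fork2.grow(0)
	fork2[0].remoteSegmentsChecked = 5

	total.combine(fork1)
	total.combine(fork2)

	for p, s := range total {
		fmt.Printf("placement=%d checked=%d lost=%v\n", p, s.remoteSegmentsChecked, s.objectsLost)
	}
	// Prints:
	// placement=0 checked=8 lost=[]
	// placement=1 checked=0 lost=[]
	// placement=2 checked=1 lost=[stream-a]
}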