diff --git a/satellite/metrics/metrics.go b/satellite/metrics/metrics.go index 5a270339602d..4ffd1525c3a4 100644 --- a/satellite/metrics/metrics.go +++ b/satellite/metrics/metrics.go @@ -3,7 +3,48 @@ package metrics -// Metrics represents the metrics that are tracked by this package. +import ( + "storj.io/common/storj" +) + +// PlacementsMetrics tracks metrics related to objects and segments by placement. +// +// storj.PlacementConstraints are the indexes of the slice. +type PlacementsMetrics []Metrics + +// Reset resets all the metrics to zero. +func (metrics *PlacementsMetrics) Reset() { + // Reuse the same allocated slice. + (*metrics) = (*metrics)[:0] +} + +// Aggregate aggregates the given metrics into the receiver. +func (metrics *PlacementsMetrics) Aggregate(partial PlacementsMetrics) { + mlen := len(*metrics) + // Resize the metrics slice if it has less registered placements than partial. + if len(partial) > mlen { + *metrics = append(*metrics, partial[mlen:]...) + } + + // Adjust the iterations to aggregate partials, except the placements that weren't accounted + // before. + if mlen > len(partial) { + mlen = len(partial) + } + + for i := 0; i < mlen; i++ { + (*metrics)[i].Aggregate(partial[i]) + } +} + +// Read reads the metrics for all the placements and calls cb for each placement and its metrics. +func (metrics *PlacementsMetrics) Read(cb func(_ storj.PlacementConstraint, _ Metrics)) { + for i, pm := range *metrics { + cb(storj.PlacementConstraint(i), pm) + } +} + +// Metrics tracks metrics related to objects and segments. type Metrics struct { // RemoteObjects is the count of objects with at least one remote segment. RemoteObjects int64 @@ -27,18 +68,13 @@ type Metrics struct { TotalSegmentsWithExpiresAt int64 } -// Reset resets the invidual metrics back to zero. -func (metrics *Metrics) Reset() { - *metrics = Metrics{} -} - -// Aggregate aggregates the given metrics into the receiver. 
-func (metrics *Metrics) Aggregate(partial Metrics) { - metrics.RemoteObjects += partial.RemoteObjects - metrics.InlineObjects += partial.InlineObjects - metrics.TotalInlineBytes += partial.TotalInlineBytes - metrics.TotalRemoteBytes += partial.TotalRemoteBytes - metrics.TotalInlineSegments += partial.TotalInlineSegments - metrics.TotalRemoteSegments += partial.TotalRemoteSegments - metrics.TotalSegmentsWithExpiresAt += partial.TotalSegmentsWithExpiresAt +// Aggregate aggregates the partial metrics into the receiver. +func (pm *Metrics) Aggregate(partial Metrics) { + pm.RemoteObjects += partial.RemoteObjects + pm.InlineObjects += partial.InlineObjects + pm.TotalInlineBytes += partial.TotalInlineBytes + pm.TotalRemoteBytes += partial.TotalRemoteBytes + pm.TotalInlineSegments += partial.TotalInlineSegments + pm.TotalRemoteSegments += partial.TotalRemoteSegments + pm.TotalSegmentsWithExpiresAt += partial.TotalSegmentsWithExpiresAt } diff --git a/satellite/metrics/observer.go b/satellite/metrics/observer.go index 3a0a2df97437..3e6a17abf98e 100644 --- a/satellite/metrics/observer.go +++ b/satellite/metrics/observer.go @@ -5,11 +5,13 @@ package metrics import ( "context" + "strconv" "time" "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" + "storj.io/common/storj" "storj.io/common/uuid" "storj.io/storj/satellite/metabase/rangedloop" ) @@ -23,7 +25,7 @@ var ( // Observer implements the ranged segment loop observer interface for data // science metrics collection. type Observer struct { - metrics Metrics + metrics PlacementsMetrics } var _ rangedloop.Observer = (*Observer)(nil) @@ -63,29 +65,35 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) error // Finish emits the aggregated metrics. 
func (obs *Observer) Finish(ctx context.Context) error { - mon.IntVal("remote_dependent_object_count").Observe(obs.metrics.RemoteObjects) - mon.IntVal("inline_object_count").Observe(obs.metrics.InlineObjects) + obs.metrics.Read(func(p storj.PlacementConstraint, m Metrics) { + tag := monkit.NewSeriesTag("placement", strconv.FormatUint(uint64(p), 10)) - mon.IntVal("total_inline_bytes").Observe(obs.metrics.TotalInlineBytes) //mon:locked - mon.IntVal("total_remote_bytes").Observe(obs.metrics.TotalRemoteBytes) //mon:locked + mon.IntVal("remote_dependent_object_count", tag).Observe(m.RemoteObjects) + mon.IntVal("inline_object_count", tag).Observe(m.InlineObjects) - mon.IntVal("total_inline_segments").Observe(obs.metrics.TotalInlineSegments) //mon:locked - mon.IntVal("total_remote_segments").Observe(obs.metrics.TotalRemoteSegments) //mon:locked + mon.IntVal("total_inline_bytes", tag).Observe(m.TotalInlineBytes) //mon:locked + mon.IntVal("total_remote_bytes", tag).Observe(m.TotalRemoteBytes) //mon:locked + + mon.IntVal("total_inline_segments", tag).Observe(m.TotalInlineSegments) //mon:locked + mon.IntVal("total_remote_segments", tag).Observe(m.TotalRemoteSegments) //mon:locked + + mon.IntVal("total_segments_with_expires_at", tag).Observe(m.TotalSegmentsWithExpiresAt) //mon:locked + }) - mon.IntVal("total_segments_with_expires_at").Observe(obs.metrics.TotalSegmentsWithExpiresAt) //mon:locked return nil } // TestingMetrics returns the accumulated metrics. It is intended to be called // from tests. 
-func (obs *Observer) TestingMetrics() Metrics { +func (obs *Observer) TestingMetrics() PlacementsMetrics { return obs.metrics } type observerFork struct { - totals Metrics - stream streamMetrics - streamID uuid.UUID + totals PlacementsMetrics + stream streamMetrics + streamID uuid.UUID + streamPlacement storj.PlacementConstraint } // Process aggregates metrics about a range of metrics provided by the @@ -96,6 +104,7 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg // Stream ID has changed. Flush what we have so far. fork.Flush() fork.streamID = segment.StreamID + fork.streamPlacement = segment.Placement } if segment.Inline() { fork.stream.inlineSegments++ @@ -114,19 +123,23 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg // Flush is called whenever a new stream is observed and when the fork is // joined to aggregate the accumulated stream stats into the totals. func (fork *observerFork) Flush() { - fork.totals.TotalInlineSegments += fork.stream.inlineSegments - fork.totals.TotalRemoteSegments += fork.stream.remoteSegments - fork.totals.TotalInlineBytes += fork.stream.inlineBytes - fork.totals.TotalRemoteBytes += fork.stream.remoteBytes - fork.totals.TotalSegmentsWithExpiresAt += fork.stream.segmentsWithExpiresAt + if ldiff := (int(fork.streamPlacement) + 1) - len(fork.totals); ldiff > 0 { + fork.totals = append(fork.totals, make([]Metrics, ldiff)...) 
+ } + + fork.totals[fork.streamPlacement].TotalInlineSegments += fork.stream.inlineSegments + fork.totals[fork.streamPlacement].TotalRemoteSegments += fork.stream.remoteSegments + fork.totals[fork.streamPlacement].TotalInlineBytes += fork.stream.inlineBytes + fork.totals[fork.streamPlacement].TotalRemoteBytes += fork.stream.remoteBytes + fork.totals[fork.streamPlacement].TotalSegmentsWithExpiresAt += fork.stream.segmentsWithExpiresAt if fork.stream.remoteSegments > 0 { // At least one remote segment was found for this stream so classify // as a remote object. - fork.totals.RemoteObjects++ + fork.totals[fork.streamPlacement].RemoteObjects++ } else if fork.stream.inlineSegments > 0 { // Only count an inline object if there is at least one inline segment // and no remote segments. - fork.totals.InlineObjects++ + fork.totals[fork.streamPlacement].InlineObjects++ } fork.stream = streamMetrics{} } diff --git a/satellite/metrics/observer_test.go b/satellite/metrics/observer_test.go index 4de758309c88..3676efd0c5ec 100644 --- a/satellite/metrics/observer_test.go +++ b/satellite/metrics/observer_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/uuid" "storj.io/storj/satellite/metabase" @@ -18,25 +19,36 @@ import ( ) var ( + // Segments in the EU placement. 
inline1 = []rangedloop.Segment{ - {StreamID: uuid.UUID{1}, EncryptedSize: 10}, + {StreamID: uuid.UUID{1}, EncryptedSize: 10, Placement: storj.EU}, } remote2 = []rangedloop.Segment{ - {StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, - {StreamID: uuid.UUID{2}, EncryptedSize: 10}, + {StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU}, + {StreamID: uuid.UUID{2}, EncryptedSize: 10, Placement: storj.EU}, } remote3 = []rangedloop.Segment{ - {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, - {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, - {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, - {StreamID: uuid.UUID{3}, EncryptedSize: 10, ExpiresAt: &time.Time{}}, + {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU}, + {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU}, + {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU}, + {StreamID: uuid.UUID{3}, EncryptedSize: 10, ExpiresAt: &time.Time{}, Placement: storj.EU}, + } + + // Segments in the US placement. 
+ inline4 = []rangedloop.Segment{ + {StreamID: uuid.UUID{4}, EncryptedSize: 9, Placement: storj.US}, + } + remote5 = []rangedloop.Segment{ + {StreamID: uuid.UUID{5}, EncryptedSize: 20, Pieces: metabase.Pieces{{}}, Placement: storj.US}, + {StreamID: uuid.UUID{5}, EncryptedSize: 40, Pieces: metabase.Pieces{{}}, Placement: storj.US}, + {StreamID: uuid.UUID{5}, EncryptedSize: 5, Placement: storj.US}, } ) func TestObserver(t *testing.T) { ctx := testcontext.New(t) - loop := func(tb testing.TB, obs *Observer, streams ...[]rangedloop.Segment) Metrics { + loop := func(tb testing.TB, obs *Observer, streams ...[]rangedloop.Segment) PlacementsMetrics { service := rangedloop.NewService( zap.NewNop(), rangedloop.Config{BatchSize: 2, Parallelism: 2}, @@ -50,9 +62,15 @@ func TestObserver(t *testing.T) { t.Run("stats aggregation", func(t *testing.T) { obs := NewObserver() - metrics := loop(t, obs, inline1, remote2, remote3) + metrics := loop(t, obs, inline1, remote2, remote3, inline4, remote5) - require.Equal(t, Metrics{ + metricsLen := storj.EU + 1 + if storj.EU < storj.US { + metricsLen = storj.US + 1 + } + + expectedMetrics := PlacementsMetrics(make([]Metrics, metricsLen)) + expectedMetrics[storj.EU] = Metrics{ InlineObjects: 1, RemoteObjects: 2, TotalInlineSegments: 3, @@ -60,7 +78,19 @@ func TestObserver(t *testing.T) { TotalInlineBytes: 30, TotalRemoteBytes: 64, TotalSegmentsWithExpiresAt: 1, - }, metrics) + } + expectedMetrics[storj.US] = Metrics{ + InlineObjects: 1, + RemoteObjects: 1, + TotalInlineSegments: 2, + TotalRemoteSegments: 2, + TotalInlineBytes: 14, + TotalRemoteBytes: 60, + TotalSegmentsWithExpiresAt: 0, + } + + require.Len(t, metrics, len(expectedMetrics)) + require.Equal(t, expectedMetrics, metrics) }) t.Run("stats reset by start", func(t *testing.T) { @@ -69,9 +99,15 @@ func TestObserver(t *testing.T) { _ = loop(t, obs, inline1) // Any metrics gathered during the first loop should be dropped. 
- metrics := loop(t, obs, remote3) + metrics := loop(t, obs, remote3, remote5) + + metricsLen := storj.EU + 1 + if storj.EU < storj.US { + metricsLen = storj.US + 1 + } - require.Equal(t, Metrics{ + expectedMetrics := PlacementsMetrics(make([]Metrics, metricsLen)) + expectedMetrics[storj.EU] = Metrics{ InlineObjects: 0, RemoteObjects: 1, TotalInlineSegments: 1, @@ -79,7 +115,19 @@ func TestObserver(t *testing.T) { TotalInlineBytes: 10, TotalRemoteBytes: 48, TotalSegmentsWithExpiresAt: 1, - }, metrics) + } + expectedMetrics[storj.US] = Metrics{ + InlineObjects: 0, + RemoteObjects: 1, + TotalInlineSegments: 1, + TotalRemoteSegments: 2, + TotalInlineBytes: 5, + TotalRemoteBytes: 60, + TotalSegmentsWithExpiresAt: 0, + } + + require.Len(t, metrics, len(expectedMetrics)) + require.Equal(t, expectedMetrics, metrics) }) t.Run("join fails gracefully on bad partial type", func(t *testing.T) {