Skip to content

Commit

Permalink
satellite/metrics: Split metrics by placement
Browse files Browse the repository at this point in the history
We want to track the segment metrics by placement through monkit tags to
filter out Storj Select data from our public stats API.

This commit only tackles the metrics handled by the satellite/metrics
package. There will be other commits for adjusting the other metrics
that have to be classified by placement too.

Change-Id: I7b57f8ca7475a7b5be8a3859ca1e711b791758a2
  • Loading branch information
ifraixedes authored and Storj Robot committed Mar 15, 2024
1 parent 7e8e166 commit 048e16e
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 48 deletions.
66 changes: 51 additions & 15 deletions satellite/metrics/metrics.go
Expand Up @@ -3,7 +3,48 @@

package metrics

// Metrics represents the metrics that are tracked by this package.
import (
"storj.io/common/storj"
)

// PlacementsMetrics tracks metrics related to object and segments by placement.
//
// storj.PlacementConstraint values are the indexes of the slice.
type PlacementsMetrics []Metrics

// Reset discards the metrics tracked for every placement.
//
// The backing array is kept so the next aggregation cycle can refill it
// without reallocating.
func (metrics *PlacementsMetrics) Reset() {
	*metrics = (*metrics)[:0]
}

// Aggregate aggregates the given metrics into the receiver.
//
// Placements present in partial but not yet tracked by the receiver are
// added to it, so the receiver ends up at least as long as partial.
func (metrics *PlacementsMetrics) Aggregate(partial PlacementsMetrics) {
	// Aggregate the placements both slices already track.
	common := len(*metrics)
	if len(partial) < common {
		common = len(partial)
	}
	for i := 0; i < common; i++ {
		(*metrics)[i].Aggregate(partial[i])
	}

	// Placements that weren't accounted before are copied in verbatim,
	// which is equivalent to aggregating them into zero-valued entries.
	if len(partial) > len(*metrics) {
		*metrics = append(*metrics, partial[len(*metrics):]...)
	}
}

// Read reads the metrics for all the placements and calls cb for each
// placement and its metrics.
func (metrics *PlacementsMetrics) Read(cb func(_ storj.PlacementConstraint, _ Metrics)) {
	for placement := 0; placement < len(*metrics); placement++ {
		// The slice index is the placement constraint itself.
		cb(storj.PlacementConstraint(placement), (*metrics)[placement])
	}
}

// Metrics tracks metrics related to objects and segments.
type Metrics struct {
// RemoteObjects is the count of objects with at least one remote segment.
RemoteObjects int64
Expand All @@ -27,18 +68,13 @@ type Metrics struct {
TotalSegmentsWithExpiresAt int64
}

// Reset returns every individual counter back to its zero value.
func (metrics *Metrics) Reset() {
	var zeroed Metrics
	*metrics = zeroed
}

// Aggregate aggregates the given metrics into the receiver.
func (metrics *Metrics) Aggregate(partial Metrics) {
metrics.RemoteObjects += partial.RemoteObjects
metrics.InlineObjects += partial.InlineObjects
metrics.TotalInlineBytes += partial.TotalInlineBytes
metrics.TotalRemoteBytes += partial.TotalRemoteBytes
metrics.TotalInlineSegments += partial.TotalInlineSegments
metrics.TotalRemoteSegments += partial.TotalRemoteSegments
metrics.TotalSegmentsWithExpiresAt += partial.TotalSegmentsWithExpiresAt
// Aggregate aggregates the partial metrics into the receiver.
//
// The receiver is named metrics to stay consistent with the other methods of
// this type (see Reset).
func (metrics *Metrics) Aggregate(partial Metrics) {
	metrics.RemoteObjects += partial.RemoteObjects
	metrics.InlineObjects += partial.InlineObjects
	metrics.TotalInlineBytes += partial.TotalInlineBytes
	metrics.TotalRemoteBytes += partial.TotalRemoteBytes
	metrics.TotalInlineSegments += partial.TotalInlineSegments
	metrics.TotalRemoteSegments += partial.TotalRemoteSegments
	metrics.TotalSegmentsWithExpiresAt += partial.TotalSegmentsWithExpiresAt
}
51 changes: 32 additions & 19 deletions satellite/metrics/observer.go
Expand Up @@ -5,11 +5,13 @@ package metrics

import (
"context"
"strconv"
"time"

"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"

"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/rangedloop"
)
Expand All @@ -23,7 +25,7 @@ var (
// Observer implements the ranged segment loop observer interface for data
// science metrics collection.
type Observer struct {
	// metrics accumulates the per-placement totals aggregated from the loop's
	// forks; the placement constraint is the slice index.
	metrics PlacementsMetrics
}

// Compile-time check that Observer satisfies the rangedloop.Observer interface.
var _ rangedloop.Observer = (*Observer)(nil)
Expand Down Expand Up @@ -63,29 +65,35 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) error

// Finish emits the aggregated metrics.
//
// Each placement is reported under the same metric names, distinguished by a
// "placement" series tag whose value is the numeric placement constraint.
func (obs *Observer) Finish(ctx context.Context) error {
	obs.metrics.Read(func(p storj.PlacementConstraint, m Metrics) {
		tag := monkit.NewSeriesTag("placement", strconv.FormatUint(uint64(p), 10))

		mon.IntVal("remote_dependent_object_count", tag).Observe(m.RemoteObjects)
		mon.IntVal("inline_object_count", tag).Observe(m.InlineObjects)

		mon.IntVal("total_inline_bytes", tag).Observe(m.TotalInlineBytes) //mon:locked
		mon.IntVal("total_remote_bytes", tag).Observe(m.TotalRemoteBytes) //mon:locked

		mon.IntVal("total_inline_segments", tag).Observe(m.TotalInlineSegments) //mon:locked
		mon.IntVal("total_remote_segments", tag).Observe(m.TotalRemoteSegments) //mon:locked

		mon.IntVal("total_segments_with_expires_at", tag).Observe(m.TotalSegmentsWithExpiresAt) //mon:locked
	})

	return nil
}

// TestingMetrics returns the accumulated metrics. It is intended to be called
// from tests.
//
// NOTE(review): this returns the internal slice without cloning, so the caller
// shares the observer's backing array — confirm callers don't mutate it.
func (obs *Observer) TestingMetrics() PlacementsMetrics {
	return obs.metrics
}

// observerFork accumulates metrics for one fork of the ranged segment loop.
type observerFork struct {
	// totals holds the aggregated metrics per placement; the placement
	// constraint is the slice index.
	totals PlacementsMetrics
	// stream accumulates the metrics of the stream currently being processed.
	stream streamMetrics
	// streamID identifies the stream currently being processed.
	streamID uuid.UUID
	// streamPlacement is the placement of the current stream's segments.
	streamPlacement storj.PlacementConstraint
}

// Process aggregates metrics about a range of metrics provided by the
Expand All @@ -96,6 +104,7 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg
// Stream ID has changed. Flush what we have so far.
fork.Flush()
fork.streamID = segment.StreamID
fork.streamPlacement = segment.Placement
}
if segment.Inline() {
fork.stream.inlineSegments++
Expand All @@ -114,19 +123,23 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg
// Flush is called whenever a new stream is observed and when the fork is
// joined to aggregate the accumulated stream stats into the totals.
func (fork *observerFork) Flush() {
	// Grow totals on demand so the current stream's placement has a slot.
	if needed := int(fork.streamPlacement) + 1; needed > len(fork.totals) {
		fork.totals = append(fork.totals, make([]Metrics, needed-len(fork.totals))...)
	}

	totals := &fork.totals[fork.streamPlacement]
	totals.TotalInlineSegments += fork.stream.inlineSegments
	totals.TotalRemoteSegments += fork.stream.remoteSegments
	totals.TotalInlineBytes += fork.stream.inlineBytes
	totals.TotalRemoteBytes += fork.stream.remoteBytes
	totals.TotalSegmentsWithExpiresAt += fork.stream.segmentsWithExpiresAt

	switch {
	case fork.stream.remoteSegments > 0:
		// At least one remote segment was found for this stream so classify
		// it as a remote object.
		totals.RemoteObjects++
	case fork.stream.inlineSegments > 0:
		// Only count an inline object if there is at least one inline segment
		// and no remote segments.
		totals.InlineObjects++
	}

	fork.stream = streamMetrics{}
}
Expand Down
76 changes: 62 additions & 14 deletions satellite/metrics/observer_test.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
Expand All @@ -18,25 +19,36 @@ import (
)

var (
	// Segments in the EU placement. Segments sharing a StreamID belong to the
	// same object.

	// inline1 is a single-segment inline-only object.
	inline1 = []rangedloop.Segment{
		{StreamID: uuid.UUID{1}, EncryptedSize: 10, Placement: storj.EU},
	}
	// remote2 mixes one remote segment (non-empty Pieces) and one inline one,
	// so the object counts as remote.
	remote2 = []rangedloop.Segment{
		{StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU},
		{StreamID: uuid.UUID{2}, EncryptedSize: 10, Placement: storj.EU},
	}
	// remote3 has three remote segments and one inline segment with an
	// expiration time set.
	remote3 = []rangedloop.Segment{
		{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU},
		{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU},
		{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}, Placement: storj.EU},
		{StreamID: uuid.UUID{3}, EncryptedSize: 10, ExpiresAt: &time.Time{}, Placement: storj.EU},
	}

	// Segments in the US placement.

	// inline4 is a single-segment inline-only object.
	inline4 = []rangedloop.Segment{
		{StreamID: uuid.UUID{4}, EncryptedSize: 9, Placement: storj.US},
	}
	// remote5 mixes two remote segments and one inline segment.
	remote5 = []rangedloop.Segment{
		{StreamID: uuid.UUID{5}, EncryptedSize: 20, Pieces: metabase.Pieces{{}}, Placement: storj.US},
		{StreamID: uuid.UUID{5}, EncryptedSize: 40, Pieces: metabase.Pieces{{}}, Placement: storj.US},
		{StreamID: uuid.UUID{5}, EncryptedSize: 5, Placement: storj.US},
	}
)

func TestObserver(t *testing.T) {
ctx := testcontext.New(t)

loop := func(tb testing.TB, obs *Observer, streams ...[]rangedloop.Segment) Metrics {
loop := func(tb testing.TB, obs *Observer, streams ...[]rangedloop.Segment) PlacementsMetrics {
service := rangedloop.NewService(
zap.NewNop(),
rangedloop.Config{BatchSize: 2, Parallelism: 2},
Expand All @@ -50,17 +62,35 @@ func TestObserver(t *testing.T) {
t.Run("stats aggregation", func(t *testing.T) {
obs := NewObserver()

metrics := loop(t, obs, inline1, remote2, remote3)
metrics := loop(t, obs, inline1, remote2, remote3, inline4, remote5)

require.Equal(t, Metrics{
metricsLen := storj.EU + 1
if storj.EU < storj.US {
metricsLen = storj.US + 1
}

expectedMetrics := PlacementsMetrics(make([]Metrics, metricsLen))
expectedMetrics[storj.EU] = Metrics{
InlineObjects: 1,
RemoteObjects: 2,
TotalInlineSegments: 3,
TotalRemoteSegments: 4,
TotalInlineBytes: 30,
TotalRemoteBytes: 64,
TotalSegmentsWithExpiresAt: 1,
}, metrics)
}
expectedMetrics[storj.US] = Metrics{
InlineObjects: 1,
RemoteObjects: 1,
TotalInlineSegments: 2,
TotalRemoteSegments: 2,
TotalInlineBytes: 14,
TotalRemoteBytes: 60,
TotalSegmentsWithExpiresAt: 0,
}

require.Len(t, metrics, len(expectedMetrics))
require.Equal(t, expectedMetrics, metrics)
})

t.Run("stats reset by start", func(t *testing.T) {
Expand All @@ -69,17 +99,35 @@ func TestObserver(t *testing.T) {
_ = loop(t, obs, inline1)

// Any metrics gathered during the first loop should be dropped.
metrics := loop(t, obs, remote3)
metrics := loop(t, obs, remote3, remote5)

metricsLen := storj.EU + 1
if storj.EU < storj.US {
metricsLen = storj.US + 1
}

require.Equal(t, Metrics{
expectedMetrics := PlacementsMetrics(make([]Metrics, metricsLen))
expectedMetrics[storj.EU] = Metrics{
InlineObjects: 0,
RemoteObjects: 1,
TotalInlineSegments: 1,
TotalRemoteSegments: 3,
TotalInlineBytes: 10,
TotalRemoteBytes: 48,
TotalSegmentsWithExpiresAt: 1,
}, metrics)
}
expectedMetrics[storj.US] = Metrics{
InlineObjects: 0,
RemoteObjects: 1,
TotalInlineSegments: 1,
TotalRemoteSegments: 2,
TotalInlineBytes: 5,
TotalRemoteBytes: 60,
TotalSegmentsWithExpiresAt: 0,
}

require.Len(t, metrics, len(expectedMetrics))
require.Equal(t, expectedMetrics, metrics)
})

t.Run("join fails gracefully on bad partial type", func(t *testing.T) {
Expand Down

0 comments on commit 048e16e

Please sign in to comment.