Skip to content

Commit

Permalink
satellite/durability: add exemplar and report time to the reported re…
Browse files Browse the repository at this point in the history
…sults

Exemplars are representative elements for the stat. For example if a stat min is `30`, we can save one example with that value.

More details about the concept is here: https://grafana.com/docs/grafana/latest/fundamentals/exemplars/

In our context, which save the segment + position in case of min is updated, to make it easier to look after the segment in danger.

Change-Id: I19be482f1ddc7f1711e722c7b17480366d2c8312
  • Loading branch information
elek committed Nov 6, 2023
1 parent 86decb1 commit 015cb94
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 45 deletions.
34 changes: 26 additions & 8 deletions satellite/durability/observer.go
Expand Up @@ -5,6 +5,7 @@ package durability

import (
"context"
"fmt"
"time"

"github.com/jtolio/eventkit"
Expand All @@ -23,20 +24,25 @@ var ek = eventkit.Package()
type HealthStat struct {
// because 0 means uninitialized, we store the min +1
minPlusOne int
Exemplar string
}

// Update updates the stat with one measurement: number of pieces which are available even without the nodes of the selected class.
func (h *HealthStat) Update(num int) {
// Exemplar is one example identifier with such measurement. Useful to dig deeper, based on this one example.
func (h *HealthStat) Update(num int, exemplar string) {
if num < h.minPlusOne-1 || h.minPlusOne == 0 {
h.minPlusOne = num + 1
h.Exemplar = exemplar
}
}

// Merge can merge two stat to one, without losing information.
func (h *HealthStat) Merge(stat *HealthStat) {
if stat.minPlusOne < h.minPlusOne && stat.minPlusOne > 0 {
h.minPlusOne = stat.minPlusOne
h.Exemplar = stat.Exemplar
}

}

// Min returns the minimal number.
Expand Down Expand Up @@ -72,7 +78,7 @@ type Report struct {
nodes map[storj.NodeID]*nodeselection.SelectedNode
db overlay.DB
metabaseDB *metabase.DB
reporter func(name string, stat *HealthStat)
reporter func(n time.Time, name string, stat *HealthStat)
reportThreshold int
asOfSystemInterval time.Duration
}
Expand Down Expand Up @@ -136,6 +142,7 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro
if !found {
c.healthStat[name] = &HealthStat{
minPlusOne: stat.minPlusOne,
Exemplar: stat.Exemplar,
}
} else {
existing.Merge(&stat)
Expand All @@ -147,14 +154,15 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro

// Finish implements rangedloop.Observer.
func (c *Report) Finish(ctx context.Context) error {
reportTime := time.Now()
for name, stat := range c.healthStat {
c.reporter(name, stat)
c.reporter(reportTime, name, stat)
}
return nil
}

// TestChangeReporter modifies the reporter for unit tests.
func (c *Report) TestChangeReporter(r func(name string, stat *HealthStat)) {
func (c *Report) TestChangeReporter(r func(n time.Time, name string, stat *HealthStat)) {
c.reporter = r
}

Expand Down Expand Up @@ -213,6 +221,10 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
controlledByClass := c.controlledByClassCache
for i := range segments {
s := &segments[i]

if s.Inline() {
continue
}
healthyPieceCount := 0
for _, piece := range s.AliasPieces {
if len(c.classified) <= int(piece.Alias) {
Expand All @@ -233,6 +245,7 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
}
}

streamLocation := fmt.Sprintf("%s/%d", s.StreamID, s.Position.Encode())
for classID, count := range controlledByClass {
if count == 0 {
continue
Expand All @@ -243,18 +256,23 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen

diff := healthyPieceCount - int(count)

// if value is high, it's not a problem. faster to ignore it...
if c.reportThreshold > 0 && diff > c.reportThreshold {
continue
}
c.healthStat[classID].Update(diff)
c.healthStat[classID].Update(diff, streamLocation)

}
}
return nil
}

func reportToEventkit(name string, stat *HealthStat) {
ek.Event("durability", eventkit.String("name", name), eventkit.Int64("min", int64(stat.Min())))
func reportToEventkit(n time.Time, name string, stat *HealthStat) {
ek.Event("durability",
eventkit.String("name", name),
eventkit.String("exemplar", stat.Exemplar),
eventkit.Timestamp("report_time", n),
eventkit.Int64("min", int64(stat.Min())),
)
}

var _ rangedloop.Observer = &Report{}
Expand Down
3 changes: 2 additions & 1 deletion satellite/durability/observer_integration_test.go
Expand Up @@ -6,6 +6,7 @@ package durability_test
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestDurabilityIntegration(t *testing.T) {
}

result := make(map[string]*durability.HealthStat)
planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(name string, stat *durability.HealthStat) {
planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(n time.Time, name string, stat *durability.HealthStat) {
result[name] = stat
})

Expand Down
72 changes: 36 additions & 36 deletions satellite/durability/observer_test.go
Expand Up @@ -38,6 +38,31 @@ func TestDurability(t *testing.T) {
})
}

segment := func(nodes []*nodeselection.SelectedNode, ix ...int) (res rangedloop.Segment) {
var pieces metabase.AliasPieces
for n, i := range ix {
pieces = append(pieces, metabase.AliasPiece{
Number: uint16(n),
Alias: metabase.NodeAlias(i),
})
}

res.StreamID = testrand.UUID()
res.Position = metabase.SegmentPosition{
Part: 0,
Index: 0,
}

// it's not inline if non-default redundancy is set.
res.Redundancy = storj.RedundancyScheme{
ShareSize: 123,
}

res.AliasPieces = pieces

return res
}

ctx := testcontext.New(t)
c := NewDurability(nil, nil, []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
Expand All @@ -52,17 +77,11 @@ func TestDurability(t *testing.T) {
fork, err := c.Fork(ctx)
require.NoError(t, err)

segment1 := segment(storageNodes, 3, 6, 9, 1)
{
// first batch
err = fork.Process(ctx, []rangedloop.Segment{
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
},
AliasPieces: pieces(storageNodes, 3, 6, 9, 1),
},
segment1,
})
require.NoError(t, err)

Expand All @@ -73,30 +92,18 @@ func TestDurability(t *testing.T) {
{
// second batch
err = fork.Process(ctx, []rangedloop.Segment{
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
},
AliasPieces: pieces(storageNodes, 2, 3, 4, 7),
},
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
},
AliasPieces: pieces(storageNodes, 1, 2, 3, 4, 6, 7, 8),
},
segment(storageNodes, 2, 3, 4, 7),
segment(storageNodes, 1, 2, 3, 4, 6, 7, 8),
})
require.NoError(t, err)

err = c.Join(ctx, fork)
require.NoError(t, err)
}

require.NotNil(t, c.healthStat["net:127.0.0.0"])
require.Equal(t, 1, c.healthStat["net:127.0.0.0"].Min())
require.Equal(t, segment1.StreamID.String()+"/0", c.healthStat["net:127.0.0.0"].Exemplar)
require.Equal(t, 2, c.healthStat["net:127.0.1.0"].Min())
require.Equal(t, 3, c.healthStat["net:127.0.2.0"].Min())
}
Expand Down Expand Up @@ -134,8 +141,11 @@ func TestDurabilityUnknownNode(t *testing.T) {
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
Part: 0,
Index: 0,
},
Redundancy: storj.RedundancyScheme{
ShareSize: 123,
},
AliasPieces: metabase.AliasPieces{
metabase.AliasPiece{
Expand All @@ -157,16 +167,6 @@ func TestDurabilityUnknownNode(t *testing.T) {
require.Equal(t, 0, c.healthStat["net:127.0.0.1"].Min())
}

func pieces(nodes []*nodeselection.SelectedNode, ix ...int) (res metabase.AliasPieces) {
for n, i := range ix {
res = append(res, metabase.AliasPiece{
Number: uint16(n),
Alias: metabase.NodeAlias(i),
})
}
return res
}

func BenchmarkDurabilityProcess(b *testing.B) {
ctx := context.TODO()

Expand Down

0 comments on commit 015cb94

Please sign in to comment.