Skip to content

Commit

Permalink
metrics: track total pins, queued, pinning, pin error.
Browse files Browse the repository at this point in the history
This fixes #1470 and #1187.
  • Loading branch information
hsanjuan committed Apr 22, 2022
1 parent 4b351ca commit 3169fba
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 58 deletions.
4 changes: 3 additions & 1 deletion consensus/crdt/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (css *Consensus) setup() {
css.crdt = crdt

clusterState, err := dsstate.New(
css.ctx,
css.crdt,
// unsure if we should set something else but crdt is already
// namespaced and this would only namespace the keys, which only
Expand All @@ -290,6 +291,7 @@ func (css *Consensus) setup() {
css.state = clusterState

batchingState, err := dsstate.NewBatching(
css.ctx,
css.crdt,
"",
dsstate.DefaultHandle(),
Expand Down Expand Up @@ -663,5 +665,5 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error)
if err != nil {
return nil, err
}
return dsstate.NewBatching(crdt, "", dsstate.DefaultHandle())
return dsstate.NewBatching(context.Background(), crdt, "", dsstate.DefaultHandle())
}
8 changes: 5 additions & 3 deletions consensus/raft/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,30 @@ func NewConsensus(
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())

logger.Debug("starting Consensus and waiting for a leader...")
baseOp := &LogOp{tracing: cfg.Tracing}
state, err := dsstate.New(
ctx,
store,
cfg.DatastoreNamespace,
dsstate.DefaultHandle(),
)
if err != nil {
cancel()
return nil, err
}
consensus := libp2praft.NewOpLog(state, baseOp)
raft, err := newRaftWrapper(host, cfg, consensus.FSM(), staging)
if err != nil {
logger.Error("error creating raft: ", err)
cancel()
return nil, err
}
actor := libp2praft.NewActor(raft.raft)
consensus.SetActor(actor)

ctx, cancel := context.WithCancel(context.Background())

cc := &Consensus{
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -550,7 +552,7 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.State, error) {
return nil, err
}

st, err := dsstate.New(store, cfg.DatastoreNamespace, dsstate.DefaultHandle())
st, err := dsstate.New(context.Background(), store, cfg.DatastoreNamespace, dsstate.DefaultHandle())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/raft/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func TestRaftLatestSnapshot(t *testing.T) {
}

// Call raft.LastState and ensure we get the correct state
snapState, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
snapState, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/raft/log_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestApplyToPin(t *testing.T) {
defer cleanRaft(1)
defer cc.Shutdown(ctx)

st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestApplyToUnpin(t *testing.T) {
defer cleanRaft(1)
defer cc.Shutdown(ctx)

st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 0 additions & 9 deletions monitor/metrics/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ import (
"time"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/observations"

peer "github.com/libp2p/go-libp2p-core/peer"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
)

// AlertChannelCap specifies how much buffer the alerts channel has.
Expand Down Expand Up @@ -135,11 +131,6 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
}
select {
case mc.alertCh <- alrt:
stats.RecordWithTags(
mc.ctx,
[]tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())},
observations.Alerts.M(1),
)
default:
return ErrAlertChannelFull
}
Expand Down
60 changes: 31 additions & 29 deletions observations/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
var logger = logging.Logger("observations")

var (
// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
// latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
// bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
// latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
// bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
// messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
)

// attributes
Expand All @@ -31,47 +31,49 @@ var (

// metrics
var (
// Pins counts the number of pins ipfs-cluster is tracking.
Pins = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless)
// TrackerPins counts the number of pins the local peer is tracking.
TrackerPins = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless)
// Peers counts the number of ipfs-cluster peers that are currently in the cluster.
Peers = stats.Int64("cluster/peers", "Number of cluster peers", stats.UnitDimensionless)
// Alerts is the number of alerts that have been sent due to peers not sending "ping" heartbeats in time.
Alerts = stats.Int64("cluster/alerts", "Number of alerts triggered", stats.UnitDimensionless)
// Pins is the total number of pins.
// This metric is managed in state/dsstate.
Pins = stats.Int64("pins", "Total number of pins", stats.UnitDimensionless)

// These metrics are managed by the pintracker/optracker module.

// PinsQueued is the number of pins queued for pinning.
PinsQueued = stats.Int64("pins/pin_queued", "Number of pins queued for pinning", stats.UnitDimensionless)
// PinsPinning is the number of pins currently pinning.
PinsPinning = stats.Int64("pins/pinning", "Number of pins currently pinning", stats.UnitDimensionless)
// PinsPinError is the number of pins in pin_error state.
PinsPinError = stats.Int64("pins/pin_error", "Number of pins in pin_error state", stats.UnitDimensionless)
)

// views, which is just the aggregation of the metrics
var (
PinsView = &view.View{
Measure: Pins,
TagKeys: []tag.Key{HostKey},
Measure: Pins,
// This would add a tag to the metric if a value for this key
// is present in the context when recording the observation.

//TagKeys: []tag.Key{HostKey},
Aggregation: view.LastValue(),
}

TrackerPinsView = &view.View{
Measure: TrackerPins,
TagKeys: []tag.Key{HostKey},
Aggregation: view.LastValue(),
PinsQueuedView = &view.View{
Measure: PinsQueued,
//TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(),
}

PeersView = &view.View{
Measure: Peers,
TagKeys: []tag.Key{HostKey},
Aggregation: view.LastValue(),
PinsPinningView = &view.View{
Measure: PinsPinning,
//TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(),
}

AlertsView = &view.View{
Measure: Alerts,
TagKeys: []tag.Key{HostKey, RemotePeerKey},
Aggregation: messageCountDistribution,
PinsPinErrorView = &view.View{
Measure: PinsPinError,
//TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(),
}

DefaultViews = []*view.View{
PinsView,
TrackerPinsView,
PeersView,
AlertsView,
PinsQueuedView,
PinsPinningView,
PinsPinErrorView,
}
)

Expand Down
4 changes: 4 additions & 0 deletions pintracker/optracker/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,14 @@ func (op *Operation) Phase() Phase {
// SetPhase changes the Phase and updates the timestamp.
//
// The pin metrics are adjusted around the change: recordMetric(op, -1) runs
// while the operation still reports its previous phase, and recordMetric(op, 1)
// runs once the new phase has been stored.
func (op *Operation) SetPhase(ph Phase) {
_, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
// Decrement the metric for the phase the operation is leaving.
recordMetric(op, -1)
op.mu.Lock()
{
op.phase = ph
op.ts = time.Now()
}
op.mu.Unlock()
// Increment the metric for the phase the operation has just entered.
recordMetric(op, 1)
// NOTE(review): both recordMetric calls run outside op.mu, so concurrent
// SetPhase/SetError calls could presumably interleave between the decrement
// and the increment — confirm whether callers serialize phase changes.
span.End()
}

Expand Down Expand Up @@ -194,13 +196,15 @@ func (op *Operation) Error() string {
// an error message. It updates the timestamp.
func (op *Operation) SetError(err error) {
_, span := trace.StartSpan(op.ctx, "optracker/SetError")
// Decrement the metric for the phase the operation is leaving.
recordMetric(op, -1)
op.mu.Lock()
{
// The phase, the stored error message and the timestamp are updated
// together while holding the lock.
op.phase = PhaseError
op.error = err.Error()
op.ts = time.Now()
}
op.mu.Unlock()
// The operation now reports PhaseError, so for pin operations this
// increments the pin_error metric.
recordMetric(op, 1)
// NOTE(review): both recordMetric calls run outside op.mu, so concurrent
// SetPhase/SetError calls could presumably interleave between the decrement
// and the increment — confirm whether callers serialize phase changes.
span.End()
}

Expand Down
29 changes: 29 additions & 0 deletions pintracker/optracker/operationtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"time"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/observations"

logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"

"go.opencensus.io/stats"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -52,6 +54,8 @@ func (opt *OperationTracker) String() string {

// NewOperationTracker creates a new OperationTracker.
func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *OperationTracker {
initializeMetrics(ctx)

return &OperationTracker{
ctx: ctx,
pid: pid,
Expand All @@ -78,6 +82,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone {
return nil // an ongoing operation of the same sign exists
}
recordMetric(op, -1)
op.Cancel() // cancel ongoing operation and replace it
}

Expand All @@ -89,6 +94,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
}
logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph)
opt.operations[pin.Cid] = op2
recordMetric(op2, 1)
return op2
}

Expand Down Expand Up @@ -121,6 +127,7 @@ func (opt *OperationTracker) Status(ctx context.Context, c api.Cid) (api.Tracker
// is PhaseDone. Any other phases are considered in-flight and not touched.
// For things already in error, the error message is updated.
// Remote pins are ignored too.
// Only used in tests right now.
func (opt *OperationTracker) SetError(ctx context.Context, c api.Cid, err error) {
opt.mu.Lock()
defer opt.mu.Unlock()
Expand Down Expand Up @@ -287,6 +294,7 @@ func (opt *OperationTracker) Filter(ctx context.Context, ipfs api.IPFSID, filter
// with the matching filter. Note, only supports
// filters of type OperationType or Phase, any other type
// will result in a nil slice being returned.
// Only used in tests right now.
func (opt *OperationTracker) filterOps(ctx context.Context, filters ...interface{}) []*Operation {
var fltops []*Operation
opt.mu.RLock()
Expand Down Expand Up @@ -328,3 +336,24 @@ func filter(ctx context.Context, in, out map[api.Cid]*Operation, filter interfac
}
}
}

// initializeMetrics records a zero-valued observation for each of the
// pin-phase measures (pin_error, queued, pinning) using ctx.
// NOTE(review): presumably this makes the metrics visible before any
// operation has been tracked — confirm against the exporter behaviour.
func initializeMetrics(ctx context.Context) {
	phaseMeasures := []*stats.Int64Measure{
		observations.PinsPinError,
		observations.PinsQueued,
		observations.PinsPinning,
	}
	for _, measure := range phaseMeasures {
		stats.Record(ctx, measure.M(0))
	}
}

// recordMetric adds val to the measure matching the current phase of op.
// Only operations of type OperationPin are counted. Phases without an
// associated measure (such as PhaseDone) record nothing.
func recordMetric(op *Operation, val int64) {
	if op.Type() != OperationPin {
		return
	}

	var measure *stats.Int64Measure
	switch op.Phase() {
	case PhaseError:
		measure = observations.PinsPinError
	case PhaseQueued:
		measure = observations.PinsQueued
	case PhaseInProgress:
		measure = observations.PinsPinning
	default:
		// No measure exists for this phase, so there is nothing to record.
		return
	}

	stats.Record(op.Context(), measure.M(val))
}
5 changes: 2 additions & 3 deletions pintracker/pintracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ var sortPinInfoByCid = func(p []api.PinInfo) {
// - Cid2 - weird / remote // replication factor set to 0, no allocations
// - Cid3 - remote - this pin is on ipfs
// - Cid4 - pin everywhere - this pin is not on ipfs
func prefilledState(context.Context) (state.ReadOnly, error) {
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
func prefilledState(ctx context.Context) (state.ReadOnly, error) {
st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
return nil, err
}
Expand All @@ -59,7 +59,6 @@ func prefilledState(context.Context) (state.ReadOnly, error) {
api.PinWithOpts(test.Cid4, pinOpts),
}

ctx := context.Background()
for _, pin := range pins {
err = st.Add(ctx, pin)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pintracker/stateless/stateless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func getStateFunc(t testing.TB, items ...api.Pin) func(context.Context) (state.R
t.Helper()
ctx := context.Background()

st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 3169fba

Please sign in to comment.