diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index bdcf8e4c8..30a1c87a1 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -276,6 +276,7 @@ func (css *Consensus) setup() { css.crdt = crdt clusterState, err := dsstate.New( + css.ctx, css.crdt, // unsure if we should set something else but crdt is already // namespaced and this would only namespace the keys, which only @@ -290,6 +291,7 @@ func (css *Consensus) setup() { css.state = clusterState batchingState, err := dsstate.NewBatching( + css.ctx, css.crdt, "", dsstate.DefaultHandle(), @@ -663,5 +665,5 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) if err != nil { return nil, err } - return dsstate.NewBatching(crdt, "", dsstate.DefaultHandle()) + return dsstate.NewBatching(context.Background(), crdt, "", dsstate.DefaultHandle()) } diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go index a183914ef..0de1e717a 100644 --- a/consensus/raft/consensus.go +++ b/consensus/raft/consensus.go @@ -71,28 +71,30 @@ func NewConsensus( if err != nil { return nil, err } + ctx, cancel := context.WithCancel(context.Background()) logger.Debug("starting Consensus and waiting for a leader...") baseOp := &LogOp{tracing: cfg.Tracing} state, err := dsstate.New( + ctx, store, cfg.DatastoreNamespace, dsstate.DefaultHandle(), ) if err != nil { + cancel() return nil, err } consensus := libp2praft.NewOpLog(state, baseOp) raft, err := newRaftWrapper(host, cfg, consensus.FSM(), staging) if err != nil { logger.Error("error creating raft: ", err) + cancel() return nil, err } actor := libp2praft.NewActor(raft.raft) consensus.SetActor(actor) - ctx, cancel := context.WithCancel(context.Background()) - cc := &Consensus{ ctx: ctx, cancel: cancel, @@ -550,7 +552,7 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.State, error) { return nil, err } - st, err := dsstate.New(store, cfg.DatastoreNamespace, dsstate.DefaultHandle()) + st, err := dsstate.New(context.Background(), store, cfg.DatastoreNamespace, dsstate.DefaultHandle()) if err != nil { return nil, err } diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index bf49d0a25..e33eb5c0b 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -316,7 +316,7 @@ func TestRaftLatestSnapshot(t *testing.T) { } // Call raft.LastState and ensure we get the correct state - snapState, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + snapState, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle()) if err != nil { t.Fatal(err) } diff --git a/consensus/raft/log_op_test.go b/consensus/raft/log_op_test.go index 94deceed4..9aa893502 100644 --- a/consensus/raft/log_op_test.go +++ b/consensus/raft/log_op_test.go @@ -21,7 +21,7 @@ func TestApplyToPin(t *testing.T) { defer cleanRaft(1) defer cc.Shutdown(ctx) - st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle()) if err != nil { t.Fatal(err) } @@ -54,7 +54,7 @@ func TestApplyToUnpin(t *testing.T) { defer cleanRaft(1) defer cc.Shutdown(ctx) - st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle()) if err != nil { t.Fatal(err) } diff --git a/monitor/metrics/checker.go b/monitor/metrics/checker.go index 2dc996285..a035c2531 100644 --- a/monitor/metrics/checker.go +++ b/monitor/metrics/checker.go @@ -7,12 +7,8 @@ import ( "time" "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/observations" peer "github.com/libp2p/go-libp2p-core/peer" - - "go.opencensus.io/stats" - "go.opencensus.io/tag" ) // AlertChannelCap specifies how much buffer the alerts channel has. @@ -135,11 +131,6 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error { } select { case mc.alertCh <- alrt: - stats.RecordWithTags( - mc.ctx, - []tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())}, - observations.Alerts.M(1), - ) default: return ErrAlertChannelFull } diff --git a/observations/metrics.go b/observations/metrics.go index 9a9dc5e3f..b2adbffd5 100644 --- a/observations/metrics.go +++ b/observations/metrics.go @@ -12,10 +12,10 @@ import ( var logger = logging.Logger("observations") var ( - // taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go) - // latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) - // bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) - messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536) +// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go) +// latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) +// bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) +// messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536) ) // attributes @@ -31,47 +31,49 @@ var ( // metrics var ( - // Pins counts the number of pins ipfs-cluster is tracking. - Pins = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless) - // TrackerPins counts the number of pins the local peer is tracking. - TrackerPins = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless) - // Peers counts the number of ipfs-cluster peers are currently in the cluster. - Peers = stats.Int64("cluster/peers", "Number of cluster peers", stats.UnitDimensionless) - // Alerts is the number of alerts that have been sent due to peers not sending "ping" heartbeats in time. - Alerts = stats.Int64("cluster/alerts", "Number of alerts triggered", stats.UnitDimensionless) + // This metric is managed in state/dsstate. + Pins = stats.Int64("pins", "Total number of pins", stats.UnitDimensionless) + + // These metrics are managed by the pintracker/optracker module. + PinsQueued = stats.Int64("pins/pin_queued", "Number of pins queued for pinning", stats.UnitDimensionless) + PinsPinning = stats.Int64("pins/pinning", "Number of pins currently pinning", stats.UnitDimensionless) + PinsPinError = stats.Int64("pins/pin_error", "Number of pins in pin_error state", stats.UnitDimensionless) ) // views, which is just the aggregation of the metrics var ( PinsView = &view.View{ - Measure: Pins, - TagKeys: []tag.Key{HostKey}, + Measure: Pins, + // This would add a tag to the metric if a value for this key + // is present in the context when recording the observation. + + //TagKeys: []tag.Key{HostKey}, Aggregation: view.LastValue(), } - TrackerPinsView = &view.View{ - Measure: TrackerPins, - TagKeys: []tag.Key{HostKey}, - Aggregation: view.LastValue(), + PinsQueuedView = &view.View{ + Measure: PinsQueued, + //TagKeys: []tag.Key{HostKey}, + Aggregation: view.Sum(), } - PeersView = &view.View{ - Measure: Peers, - TagKeys: []tag.Key{HostKey}, - Aggregation: view.LastValue(), + PinsPinningView = &view.View{ + Measure: PinsPinning, + //TagKeys: []tag.Key{HostKey}, + Aggregation: view.Sum(), } - AlertsView = &view.View{ - Measure: Alerts, - TagKeys: []tag.Key{HostKey, RemotePeerKey}, - Aggregation: messageCountDistribution, + PinsPinErrorView = &view.View{ + Measure: PinsPinError, + //TagKeys: []tag.Key{HostKey}, + Aggregation: view.Sum(), } DefaultViews = []*view.View{ PinsView, - TrackerPinsView, - PeersView, - AlertsView, + PinsQueuedView, + PinsPinningView, + PinsPinErrorView, } ) diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index c9f9e08ea..a976460a9 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -137,12 +137,14 @@ func (op *Operation) Phase() Phase { // SetPhase changes the Phase and updates the timestamp. func (op *Operation) SetPhase(ph Phase) { _, span := trace.StartSpan(op.ctx, "optracker/SetPhase") + recordMetric(op, -1) op.mu.Lock() { op.phase = ph op.ts = time.Now() } op.mu.Unlock() + recordMetric(op, 1) span.End() } @@ -194,6 +196,7 @@ func (op *Operation) Error() string { // an error message. It updates the timestamp. func (op *Operation) SetError(err error) { _, span := trace.StartSpan(op.ctx, "optracker/SetError") + recordMetric(op, -1) op.mu.Lock() { op.phase = PhaseError @@ -201,6 +204,7 @@ func (op *Operation) SetError(err error) { op.ts = time.Now() } op.mu.Unlock() + recordMetric(op, 1) span.End() } diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index 767540b77..0859f4b5c 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -13,10 +13,12 @@ import ( "time" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/observations" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" + "go.opencensus.io/stats" "go.opencensus.io/trace" ) @@ -52,6 +54,8 @@ func (opt *OperationTracker) String() string { // NewOperationTracker creates a new OperationTracker. func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *OperationTracker { + initializeMetrics(ctx) + return &OperationTracker{ ctx: ctx, pid: pid, @@ -78,6 +82,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone { return nil // an ongoing operation of the same sign exists } + recordMetric(op, -1) op.Cancel() // cancel ongoing operation and replace it } @@ -89,6 +94,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, } logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph) opt.operations[pin.Cid] = op2 + recordMetric(op2, 1) return op2 } @@ -121,6 +127,7 @@ func (opt *OperationTracker) Status(ctx context.Context, c api.Cid) (api.Tracker // is PhaseDone. Any other phases are considered in-flight and not touched. // For things already in error, the error message is updated. // Remote pins are ignored too. +// Only used in tests right now. func (opt *OperationTracker) SetError(ctx context.Context, c api.Cid, err error) { opt.mu.Lock() defer opt.mu.Unlock() @@ -287,6 +294,7 @@ func (opt *OperationTracker) Filter(ctx context.Context, ipfs api.IPFSID, filter // with the matching filter. Note, only supports // filters of type OperationType or Phase, any other type // will result in a nil slice being returned. +// Only used in tests right now. func (opt *OperationTracker) filterOps(ctx context.Context, filters ...interface{}) []*Operation { var fltops []*Operation opt.mu.RLock() @@ -328,3 +336,24 @@ func filter(ctx context.Context, in, out map[api.Cid]*Operation, filter interfac } } } + +func initializeMetrics(ctx context.Context) { + stats.Record(ctx, observations.PinsPinError.M(0)) + stats.Record(ctx, observations.PinsQueued.M(0)) + stats.Record(ctx, observations.PinsPinning.M(0)) +} + +func recordMetric(op *Operation, val int64) { + if op.Type() == OperationPin { + switch op.Phase() { + case PhaseError: + stats.Record(op.Context(), observations.PinsPinError.M(val)) + case PhaseQueued: + stats.Record(op.Context(), observations.PinsQueued.M(val)) + case PhaseInProgress: + stats.Record(op.Context(), observations.PinsPinning.M(val)) + case PhaseDone: + // we have no metric to log anything + } + } +} diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index 78ef57501..113290246 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -40,8 +40,8 @@ var sortPinInfoByCid = func(p []api.PinInfo) { // - Cid2 - weird / remote // replication factor set to 0, no allocations // - Cid3 - remote - this pin is on ipfs // - Cid4 - pin everywhere - this pin is not on ipfs -func prefilledState(context.Context) (state.ReadOnly, error) { - st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) +func prefilledState(ctx context.Context) (state.ReadOnly, error) { + st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle()) if err != nil { return nil, err } @@ -59,7 +59,6 @@ func prefilledState(context.Context) (state.ReadOnly, error) { api.PinWithOpts(test.Cid4, pinOpts), } - ctx := context.Background() for _, pin := range pins { err = st.Add(ctx, pin) if err != nil { diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index 7dffc07e2..f1dfb8dd0 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -121,7 +121,7 @@ func getStateFunc(t testing.TB, items ...api.Pin) func(context.Context) (state.R t.Helper() ctx := context.Background() - st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle()) if err != nil { t.Fatal(err) } diff --git a/state/dsstate/datastore.go b/state/dsstate/datastore.go index 8cda7e1a8..7cc502931 100644 --- a/state/dsstate/datastore.go +++ b/state/dsstate/datastore.go @@ -6,8 +6,10 @@ import ( "context" "fmt" "io" + "sync/atomic" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/observations" "github.com/ipfs/ipfs-cluster/state" ds "github.com/ipfs/go-datastore" @@ -16,6 +18,7 @@ import ( logging "github.com/ipfs/go-log/v2" codec "github.com/ugorji/go/codec" + "go.opencensus.io/stats" trace "go.opencensus.io/trace" ) @@ -34,6 +37,8 @@ type State struct { codecHandle codec.Handle namespace ds.Key // version int + + totalPins int64 } // DefaultHandle returns the codec handler of choice (Msgpack). @@ -49,7 +54,7 @@ func DefaultHandle() codec.Handle { // // The Handle controls options for the serialization of the full state // (marshaling/unmarshaling). -func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) { +func New(ctx context.Context, dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) { if handle == nil { handle = DefaultHandle() } @@ -59,21 +64,34 @@ func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, er dsWrite: dstore, codecHandle: handle, namespace: ds.NewKey(namespace), + totalPins: 0, } + stats.Record(ctx, observations.Pins.M(0)) + return st, nil } // Add adds a new Pin or replaces an existing one. -func (st *State) Add(ctx context.Context, c api.Pin) error { +func (st *State) Add(ctx context.Context, c api.Pin) (err error) { _, span := trace.StartSpan(ctx, "state/dsstate/Add") defer span.End() ps, err := st.serializePin(c) if err != nil { - return err + return } - return st.dsWrite.Put(ctx, st.key(c.Cid), ps) + + has, _ := st.Has(ctx, c.Cid) + defer func() { + if !has && err == nil { + total := atomic.AddInt64(&st.totalPins, 1) + stats.Record(ctx, observations.Pins.M(total)) + } + }() + + err = st.dsWrite.Put(ctx, st.key(c.Cid), ps) + return } // Rm removes an existing Pin. It is a no-op when the @@ -86,6 +104,11 @@ func (st *State) Rm(ctx context.Context, c api.Cid) error { if err == ds.ErrNotFound { return nil } + if err == nil { + total := atomic.AddInt64(&st.totalPins, -1) + stats.Record(ctx, observations.Pins.M(total)) + } + return err } @@ -140,7 +163,7 @@ func (st *State) List(ctx context.Context, out chan<- api.Pin) error { } defer results.Close() - total := 0 + var total int64 for r := range results.Next() { // Abort if we shutdown. select { @@ -177,7 +200,8 @@ func (st *State) List(ctx context.Context, out chan<- api.Pin) error { if total >= 500000 { logger.Infof("Full pinset listing finished: %d pins", total) } - + atomic.StoreInt64(&st.totalPins, total) + stats.Record(ctx, observations.Pins.M(total)) return nil } @@ -308,7 +332,7 @@ type BatchingState struct { // // The Handle controls options for the serialization of the full state // (marshaling/unmarshaling). -func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) { +func NewBatching(ctx context.Context, dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) { if handle == nil { handle = DefaultHandle() } @@ -328,6 +352,8 @@ func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*Ba bst := &BatchingState{} bst.State = st bst.batch = batch + + stats.Record(ctx, observations.Pins.M(0)) return bst, nil } diff --git a/state/dsstate/datastore_test.go b/state/dsstate/datastore_test.go index 324e36993..2dfbe5493 100644 --- a/state/dsstate/datastore_test.go +++ b/state/dsstate/datastore_test.go @@ -29,7 +29,7 @@ var c = api.Pin{ func newState(t *testing.T) *State { store := inmem.New() - ds, err := New(store, "", DefaultHandle()) + ds, err := New(context.Background(), store, "", DefaultHandle()) if err != nil { t.Fatal(err) } diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index d02ba3c82..775d98319 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -110,7 +110,7 @@ type mockRepoGCResp struct { // NewIpfsMock returns a new mock. func NewIpfsMock(t *testing.T) *IpfsMock { store := inmem.New() - st, err := dsstate.New(store, "", dsstate.DefaultHandle()) + st, err := dsstate.New(context.Background(), store, "", dsstate.DefaultHandle()) if err != nil { t.Fatal(err) }