Skip to content
Permalink
Browse files

Replace underlying slice with ring.Ring in metrics window

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
  • Loading branch information...
hsanjuan authored and lanzafame committed Mar 8, 2019
1 parent 5a1dfc2 commit 7711ab8cfdc67d8d2e0e8e08899097683f6416ff
Showing with 355 additions and 62 deletions.
  1. +3 −0 api/types.go
  2. +59 −26 monitor/metrics/window.go
  3. +293 −36 monitor/metrics/window_test.go
@@ -785,12 +785,15 @@ func (n *NodeWithMeta) Size() uint64 {
// Metric transports information about a peer.ID. It is used to decide
// pin allocations by a PinAllocator. IPFS cluster is agnostic to
// the Value, which should be interpreted by the PinAllocator.
// The TS value is a timestamp representing when a peer has received
// the metric value.
type Metric struct {
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
Value string `json:"value" codec:"v,omitempty"`
Expire int64 `json:"expire" codec:"e,omitempty"`
Valid bool `json:"valid" codec:"d,omitempty"`
TS int64 `json:"ts" codec:"t,omitempty"` // TS contains a UnixNano timestamp
}

// SetTTL sets Metric to expire after the given time.Duration
@@ -4,11 +4,17 @@
package metrics

import (
"container/ring"
"errors"
"sync"
"time"

logging "github.com/ipfs/go-log"
"github.com/ipfs/ipfs-cluster/api"
)

var logger = logging.Logger("metricwin")

// DefaultWindowCap sets the amount of metrics to store per peer.
var DefaultWindowCap = 25

@@ -17,8 +23,11 @@ var ErrNoMetrics = errors.New("no metrics have been added to this window")

// Window implements a circular queue to store metrics.
type Window struct {
last int
window []*api.Metric
lMu sync.RWMutex
last *api.Metric

wMu sync.RWMutex
window *ring.Ring
}

// NewWindow creates an instance with the given
@@ -28,52 +37,76 @@ func NewWindow(windowCap int) *Window {
panic("invalid windowCap")
}

w := make([]*api.Metric, 0, windowCap)
w := ring.New(windowCap)
return &Window{
last: 0,
last: nil,
window: w,
}
}

// Add adds a new metric to the window. If the window capacity
// has been reached, the oldest metric (by the time it was added),
// will be discarded.
// will be discarded. Add leaves the cursor on the next spot,
// which is either empty or the oldest record.
func (mw *Window) Add(m *api.Metric) {
if len(mw.window) < cap(mw.window) {
mw.window = append(mw.window, m)
mw.last = len(mw.window) - 1
return
}
m.TS = time.Now().UnixNano()

mw.wMu.Lock()
mw.window.Value = m
mw.window = mw.window.Next()
mw.wMu.Unlock()

mw.lMu.Lock()
mw.last = m
mw.lMu.Unlock()

// len == cap
mw.last = (mw.last + 1) % cap(mw.window)
mw.window[mw.last] = m
return
}

// Latest returns the last metric added. It returns an error
// if no metrics were added.
func (mw *Window) Latest() (*api.Metric, error) {
if len(mw.window) == 0 {
mw.lMu.RLock()
if mw.last == nil {
return nil, ErrNoMetrics
}
return mw.window[mw.last], nil
last := mw.last
mw.lMu.RUnlock()
return last, nil
}

// All returns all the metrics in the window, in the inverse order
// they were Added. That is, result[0] will be the last added
// metric.
func (mw *Window) All() []*api.Metric {
wlen := len(mw.window)
res := make([]*api.Metric, 0, wlen)
if wlen == 0 {
return res
}
for i := mw.last; i >= 0; i-- {
res = append(res, mw.window[i])
}
for i := wlen - 1; i > mw.last; i-- {
res = append(res, mw.window[i])
mw.wMu.RLock()
values := make([]*api.Metric, 0, mw.window.Len())
mw.window.Do(func(v interface{}) {
if i, ok := v.(*api.Metric); ok {
// append younger values to older value
values = append([]*api.Metric{i}, values...)
}
})
mw.wMu.RUnlock()
return values
}

// Distribution returns the deltas between all the current
// values contained in the current window. This will
// only return values if the api.Metric.Type() is "ping",
// which are used for accural failure detection.
func (mw *Window) Distribution() []int64 {
ms := mw.All()
dist := make([]int64, 0, len(ms)-1)
for i, v := range ms {
// the last value can't be used to calculate a delta
if i == len(ms)-1 {
break
}
// All() provides an order slice, where ms[i] is younger than ms[i+1]
delta := v.TS - ms[i+1].TS
dist = append(dist, delta)
}
return res

return dist
}

0 comments on commit 7711ab8

Please sign in to comment.
You can’t perform that action at this time.