/
store.go
208 lines (176 loc) · 4.33 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package metrics
import (
"sort"
"sync"
"github.com/mtdepin/rep-mgr/api"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// PeerMetrics maps a peer IDs to a metrics window.
type PeerMetrics map[peer.ID]*Window
// Store can be used to store and access metrics.
type Store struct {
mux sync.RWMutex
byName map[string]PeerMetrics
}
// NewStore can be used to create a Store.
func NewStore() *Store {
return &Store{
byName: make(map[string]PeerMetrics),
}
}
// Add inserts a new metric in Metrics.
func (mtrs *Store) Add(m api.Metric) {
mtrs.mux.Lock()
defer mtrs.mux.Unlock()
name := m.Name
peer := m.Peer
mbyp, ok := mtrs.byName[name]
if !ok {
mbyp = make(PeerMetrics)
mtrs.byName[name] = mbyp
}
window, ok := mbyp[peer]
if !ok {
// We always lock the outer map, so we can use unsafe
// Window.
window = NewWindow(DefaultWindowCap)
mbyp[peer] = window
}
window.Add(m)
}
// RemovePeer removes all metrics related to a peer from the Store.
func (mtrs *Store) RemovePeer(pid peer.ID) {
mtrs.mux.Lock()
for _, metrics := range mtrs.byName {
delete(metrics, pid)
}
mtrs.mux.Unlock()
}
// RemovePeerMetrics removes all metrics of a given name for a given peer ID.
func (mtrs *Store) RemovePeerMetrics(pid peer.ID, name string) {
mtrs.mux.Lock()
metrics := mtrs.byName[name]
delete(metrics, pid)
mtrs.mux.Unlock()
}
// LatestValid returns all the last known valid metrics of a given type. A metric
// is valid if it has not expired.
func (mtrs *Store) LatestValid(name string) []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
byPeer, ok := mtrs.byName[name]
if !ok {
return []api.Metric{}
}
metrics := make([]api.Metric, 0, len(byPeer))
for _, window := range byPeer {
m, err := window.Latest()
// TODO(ajl): for accrual, does it matter if a ping has expired?
if err != nil || m.Discard() {
continue
}
metrics = append(metrics, m)
}
sortedMetrics := api.MetricSlice(metrics)
sort.Stable(sortedMetrics)
return sortedMetrics
}
// AllMetrics returns the latest metrics for all peers and metrics types. It
// may return expired metrics.
func (mtrs *Store) AllMetrics() []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
result := make([]api.Metric, 0)
for _, byPeer := range mtrs.byName {
for _, window := range byPeer {
metric, err := window.Latest()
if err != nil || !metric.Valid {
continue
}
result = append(result, metric)
}
}
return result
}
func (mtrs *Store) DiscardedMetrics() []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
result := make([]api.Metric, 0)
for name, byPeer := range mtrs.byName {
if name == "discarded" {
for _, window := range byPeer {
metric, err := window.Latest()
if err != nil {
continue
}
result = append(result, metric)
}
}
}
return result
}
// PeerMetrics returns the latest metrics for a given peer ID for
// all known metrics types. It may return expired metrics.
func (mtrs *Store) PeerMetrics(pid peer.ID) []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
result := make([]api.Metric, 0)
for _, byPeer := range mtrs.byName {
window, ok := byPeer[pid]
if !ok {
continue
}
metric, err := window.Latest()
if err != nil || !metric.Valid {
continue
}
result = append(result, metric)
}
return result
}
// PeerMetricAll returns all of a particular metrics for a
// particular peer.
func (mtrs *Store) PeerMetricAll(name string, pid peer.ID) []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
byPeer, ok := mtrs.byName[name]
if !ok {
return nil
}
window, ok := byPeer[pid]
if !ok {
return nil
}
ms := window.All()
return ms
}
// PeerLatest returns the latest of a particular metric for a
// particular peer. It may return an expired metric.
func (mtrs *Store) PeerLatest(name string, pid peer.ID) api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
byPeer, ok := mtrs.byName[name]
if !ok {
return api.Metric{}
}
window, ok := byPeer[pid]
if !ok {
return api.Metric{}
}
m, err := window.Latest()
if err != nil {
// ignoring error, as nil metric is indicative enough
return api.Metric{}
}
return m
}
// MetricNames returns all the known metric names
func (mtrs *Store) MetricNames() []string {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
list := make([]string, 0, len(mtrs.byName))
for k := range mtrs.byName {
list = append(list, k)
}
return list
}