-
Notifications
You must be signed in to change notification settings - Fork 3
/
stats_collector.go
129 lines (119 loc) · 3.36 KB
/
stats_collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package daemon
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/execdriver"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/libcontainer/system"
)
// newStatsCollector returns a new statsCollector that collections
// network and cgroup stats for a registered container at the specified
// interval. The collector allows non-running containers to be added
// and will start processing stats when they are started.
func newStatsCollector(interval time.Duration) *statsCollector {
s := &statsCollector{
interval: interval,
publishers: make(map[*Container]*pubsub.Publisher),
clockTicks: uint64(system.GetClockTicks()),
}
go s.run()
return s
}
// statsCollector manages and provides container resource stats
type statsCollector struct {
m sync.Mutex
interval time.Duration
clockTicks uint64
publishers map[*Container]*pubsub.Publisher
}
// collect registers the container with the collector and adds it to
// the event loop for collection on the specified interval returning
// a channel for the subscriber to receive on.
func (s *statsCollector) collect(c *Container) chan interface{} {
s.m.Lock()
defer s.m.Unlock()
publisher, exists := s.publishers[c]
if !exists {
publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
s.publishers[c] = publisher
}
return publisher.Subscribe()
}
// stopCollection closes the channels for all subscribers and removes
// the container from metrics collection.
func (s *statsCollector) stopCollection(c *Container) {
s.m.Lock()
if publisher, exists := s.publishers[c]; exists {
publisher.Close()
delete(s.publishers, c)
}
s.m.Unlock()
}
// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
s.m.Lock()
publisher := s.publishers[c]
if publisher != nil {
publisher.Evict(ch)
if publisher.Len() == 0 {
delete(s.publishers, c)
}
}
s.m.Unlock()
}
func (s *statsCollector) run() {
for _ = range time.Tick(s.interval) {
for container, publisher := range s.publishers {
systemUsage, err := s.getSystemCpuUsage()
if err != nil {
log.Errorf("collecting system cpu usage for %s: %v", container.ID, err)
continue
}
stats, err := container.Stats()
if err != nil {
if err != execdriver.ErrNotRunning {
log.Errorf("collecting stats for %s: %v", container.ID, err)
}
continue
}
stats.SystemUsage = systemUsage
publisher.Publish(stats)
}
}
}
const nanoSeconds = 1e9
// getSystemCpuUSage returns the host system's cpu usage in nanoseconds
// for the system to match the cgroup readings are returned in the same format.
func (s *statsCollector) getSystemCpuUsage() (uint64, error) {
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
defer f.Close()
sc := bufio.NewScanner(f)
for sc.Scan() {
parts := strings.Fields(sc.Text())
switch parts[0] {
case "cpu":
if len(parts) < 8 {
return 0, fmt.Errorf("invalid number of cpu fields")
}
var sum uint64
for _, i := range parts[1:8] {
v, err := strconv.ParseUint(i, 10, 64)
if err != nil {
return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
}
sum += v
}
return (sum * nanoSeconds) / s.clockTicks, nil
}
}
return 0, fmt.Errorf("invalid stat format")
}