forked from containerd/containerd
/
oom.go
80 lines (73 loc) · 1.59 KB
/
oom.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package containerd
import (
"reflect"
"sync"
)
func newNotifier(s *Supervisor) *notifier {
n := ¬ifier{
s: s,
channels: make(map[<-chan struct{}]string),
controller: make(chan struct{}),
}
go n.start()
return n
}
// notifier multiplexes a dynamic set of receive-only channels onto a single
// select loop and forwards what it receives to a Supervisor as events.
type notifier struct {
	// s is the supervisor that events are sent to.
	s *Supervisor

	// controller is signalled whenever the channel set changes so that the
	// select loop rebuilds its cases.
	controller chan struct{}

	// m is held while channels is modified or iterated.
	m sync.Mutex
	// channels maps every watched channel to the id it was registered with.
	channels map[<-chan struct{}]string
}
// start is the notifier's select loop. It never returns: every iteration
// rebuilds the select cases from the current channel set and blocks until one
// of them is ready.
//
//   - Case 0 is the controller channel. Activity there only means the
//     channel set changed, so the loop goes round again to rebuild the cases.
//   - A successful receive on a watched channel sends an OOM event carrying
//     the id registered for that channel to the supervisor.
//   - A watched channel that has been closed is removed from the set.
func (n *notifier) start() {
	for {
		cases := n.createCase()
		i, _, ok := reflect.Select(cases)
		if i == 0 {
			continue
		}
		ch := cases[i].Chan.Interface().(<-chan struct{})
		if !ok {
			// the channel was closed and we should remove it
			n.removeChan(ch)
			continue
		}
		// Add writes to n.channels from other goroutines while holding n.m,
		// so the lookup has to take the same lock; an unguarded read here
		// would be a concurrent map read and write.
		n.m.Lock()
		id := n.channels[ch]
		n.m.Unlock()

		e := NewEvent(OOMEventType)
		e.ID = id
		n.s.SendEvent(e)
	}
}
// Add registers ch under id and then signals the controller channel so the
// select loop rebuilds its cases with the new channel included. The signal is
// sent after the lock has been released.
func (n *notifier) Add(ch <-chan struct{}, id string) {
	// Scope the critical section so the lock is not held across the send.
	func() {
		n.m.Lock()
		defer n.m.Unlock()
		n.channels[ch] = id
	}()

	// signal the main loop to break and add the new channels
	var wake struct{}
	n.controller <- wake
}
// createCase builds the receive cases for the select loop. The controller
// channel is always at index 0 so the loop can recognise a request to rebuild
// the cases; it is followed by one receive case per watched channel, in map
// iteration order.
func (n *notifier) createCase() []reflect.SelectCase {
	n.m.Lock()
	defer n.m.Unlock()

	cases := make([]reflect.SelectCase, 1, len(n.channels)+1)
	cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(n.controller)}
	for watched := range n.channels {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(watched)})
	}
	return cases
}
// removeChan drops ch from the set of watched channels while holding the
// lock. Removing a channel that is not in the set is a no-op.
func (n *notifier) removeChan(ch <-chan struct{}) {
	n.m.Lock()
	defer n.m.Unlock()
	delete(n.channels, ch)
}