/
lxd_events.go
127 lines (104 loc) · 2.65 KB
/
lxd_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package lxd
import (
"encoding/json"
"time"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
)
// Event handling functions
// GetEvents connects to the LXD monitoring interface
func (r *ProtocolLXD) GetEvents() (*EventListener, error) {
// Prevent anything else from interacting with the listeners
r.eventListenersLock.Lock()
defer r.eventListenersLock.Unlock()
// Setup a new listener
listener := EventListener{
r: r,
chActive: make(chan bool),
}
if r.eventListeners != nil {
// There is an existing Go routine setup, so just add another target
r.eventListeners = append(r.eventListeners, &listener)
return &listener, nil
}
// Initialize the list if needed
r.eventListeners = []*EventListener{}
// Setup a new connection with LXD
url, err := r.setQueryAttributes("/events")
if err != nil {
return nil, err
}
conn, err := r.websocket(url)
if err != nil {
return nil, err
}
// Add the listener
r.eventListeners = append(r.eventListeners, &listener)
// Spawn a watcher that will close the websocket connection after all
// listeners are gone.
stopCh := make(chan struct{}, 0)
go func() {
for {
select {
case <-time.After(time.Minute):
case <-stopCh:
break
}
r.eventListenersLock.Lock()
if len(r.eventListeners) == 0 {
// We don't need the connection anymore, disconnect
conn.Close()
r.eventListeners = nil
r.eventListenersLock.Unlock()
break
}
r.eventListenersLock.Unlock()
}
}()
// Spawn the listener
go func() {
for {
_, data, err := conn.ReadMessage()
if err != nil {
// Prevent anything else from interacting with the listeners
r.eventListenersLock.Lock()
defer r.eventListenersLock.Unlock()
// Tell all the current listeners about the failure
for _, listener := range r.eventListeners {
listener.err = err
listener.disconnected = true
close(listener.chActive)
}
// And remove them all from the list
r.eventListeners = nil
conn.Close()
close(stopCh)
return
}
// Attempt to unpack the message
event := api.Event{}
err = json.Unmarshal(data, &event)
if err != nil {
continue
}
// Extract the message type
if event.Type == "" {
continue
}
// Send the message to all handlers
r.eventListenersLock.Lock()
for _, listener := range r.eventListeners {
listener.targetsLock.Lock()
for _, target := range listener.targets {
if target.types != nil && !shared.StringInSlice(event.Type, target.types) {
continue
}
go target.function(event)
}
listener.targetsLock.Unlock()
}
r.eventListenersLock.Unlock()
}
}()
return &listener, nil
}