/
watcher.go
163 lines (141 loc) · 3.9 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package watcher
import (
"log"
"strconv"
"sync"
"github.com/sapk/GoWatch/modules/db"
"github.com/sapk/GoWatch/modules/rrd"
"github.com/sapk/GoWatch/modules/tools"
"golang.org/x/net/icmp"
// "fmt"
// "time"
)
//CPingMap with mutex for concurrency
type CPingMap struct {
sync.RWMutex
m map[string]Ping
}
//Watcher contain watcher informations
type Watcher struct {
DB *db.Db
PingListener *icmp.PacketConn
PingToListen CPingMap
PingSeq uint
PingChannels chan PingResponse
}
var w Watcher
//TODO better handler concurrecny on map
//Init init the Watcher
func Init(d *db.Db) *Watcher {
//TODO get ip to mintor form db at start up
c, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
//c, err := icmp.ListenPacket("udp4", "0.0.0.0")
if err != nil {
log.Fatalf("listen err, %s", err)
}
w = Watcher{PingListener: c, DB: d, PingToListen: CPingMap{m: make(map[string]Ping)}}
StartPingWatcher()
UpdatePingChannels()
//TODO stop and restart
StartLoopPing()
return &w
}
//UpdatePingChannels clean a set the watching of channels
func UpdatePingChannels() {
count, equipements := w.DB.GetEquipements()
log.Println("There is ", count, " elements in db")
//var channels []chan PingResponse
channels := make(map[string]*tools.BroadcastReceiver) //We reset completely the map
for _, eq := range equipements {
channels[eq.IP], _ = RegisterPingWatch(eq.IP, 0) //We add all equipement to continuous ping
}
//Clearing PingToListen map from removed elements
w.PingToListen.RLock()
for ip, listen := range w.PingToListen.m {
//channels[strconv.FormatUint(eq.ID,10)] = RegisterPingWatch(eq.IP, 0); //We add all equipement to continuous ping
if listen.Timeout == 0 {
//We only clear long running ping
if _, ok := channels[ip]; !ok {
//We clear if it's not in long running ping
delete(w.PingToListen.m, ip)
}
}
}
w.PingToListen.RUnlock()
if w.PingChannels != nil {
close(w.PingChannels) //TODO use WaitGroup to close the go routine parsing the PingChannels
}
var outIsClosed *bool
w.PingChannels, outIsClosed = merge(channels)
go func() {
for {
rep, ok := <-w.PingChannels
if !ok {
log.Println("Done the chan must has been reset")
*outIsClosed = true
return
}
log.Println(rep)
eq, err := w.DB.GetEquipementbyIP(db.Equipement{IP: rep.IP}) //TODO check if it exist before logging
//eq.Data=fmt.Sprintf("%v",rep)
if err != nil {
log.Println("Not found in database : ", err)
} else {
eq.Update()
rrd.AddPing(strconv.FormatUint(eq.ID, 10), rep.Time)
}
}
}()
}
//Get get the Watcher
func Get() *Watcher {
return &w
}
func merge(cs map[string]*tools.BroadcastReceiver) (chan PingResponse, *bool) {
var wg sync.WaitGroup
outIsClosed := false
out := make(chan PingResponse)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c *tools.BroadcastReceiver) {
for {
if c == nil {
log.Println("This chan must has been close")
wg.Done()
return
}
n := c.Read().(PingResponse)
if outIsClosed {
log.Println("The output chan as been closed")
wg.Done() // clear the wait group for closing all still open chan
SendPing(n.IP) //We resend for any other chan taht will listen after
return
}
/*
if _, ok := <- out; !ok {
log.Println("The output chan as been closed")
for _, c := range cs {
if _, ok := <- c; ok {
wg.Done()// clear the wait group for closing all still open chan
}
}
return;
}
*/
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
if !outIsClosed {
close(out)
}
}()
return out, &outIsClosed
}