/
modules.go
114 lines (98 loc) · 2.49 KB
/
modules.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main
// This file contains the implementation of a map of strings and booleans that are updated
// by the servers critical components. Clients can also request that if any of the modules goes into
// a false condition that they will be informed using a channel and when the entire collection
// goes into a true condition that they will also be informed.
//
// This allows for a server to for example maintain an overall check on whether any of its critical
// modules are down and when they are all alive.
import (
"fmt"
"sync"
"time"
"golang.org/x/net/context"
)
type Modules struct{}
type catalog struct {
listeners []chan bool
m map[string]bool
sync.Mutex
}
var (
modules = catalog{
listeners: []chan bool{},
m: map[string]bool{},
}
modulesUpdateC = make(chan struct{})
)
func (*Modules) AddListener(listener chan bool) {
modules.Lock()
defer modules.Unlock()
modules.listeners = append(modules.listeners, listener)
}
func (*Modules) SetModule(module string, up bool) {
modules.Lock()
defer modules.Unlock()
modules.m[module] = up
select {
case modulesUpdateC <- struct{}{}:
default:
}
}
func (*Modules) doUpdate() {
modules.Lock()
defer modules.Unlock()
downModules := make([]string, 0, len(modules.m))
upModules := make([]string, 0, len(modules.m))
// Is the sever entirely up or not
up := true
for k, v := range modules.m {
if v != true {
up = false
downModules = append(downModules, k)
} else {
upModules = append(upModules, k)
}
}
if !up {
logger.Info(fmt.Sprintf("down modules %v, up modules %v", downModules, upModules))
}
// Tell everyone what the collective state is for the server
for i, listener := range modules.listeners {
func() {
defer func() {
// A send to a closed channel will panic and so if a
// panic does occur we remove the listener
if r := recover(); r != nil {
modules.Lock()
defer modules.Unlock()
if len(modules.listeners) <= 1 {
modules.listeners = []chan bool{}
return
}
modules.listeners = append(modules.listeners[:i], modules.listeners[i+1:]...)
}
}()
select {
case <-time.After(20 * time.Millisecond):
case listener <- up:
}
}()
}
}
func initModuleTracking(ctx context.Context) {
go func() {
internalCheck := time.Duration(5 * time.Second)
modules := &Modules{}
for {
select {
case <-time.After(internalCheck):
modules.doUpdate()
case <-modulesUpdateC:
modules.doUpdate()
case <-ctx.Done():
return
}
}
}()
}