forked from pydio/cells
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
148 lines (133 loc) · 3.98 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package proxy
import (
"fmt"
"strings"
"sync"
"github.com/pydio/cells/common/caddy"
"go.uber.org/zap/zapcore"
"github.com/pydio/cells/common/config"
"github.com/micro/go-micro/broker"
"go.uber.org/zap"
"github.com/pydio/cells/common"
)
var (
servicesPrefix = []string{
common.ServiceGatewayNamespace_,
common.ServiceWebNamespace_,
}
)
type watcher struct {
services map[string]map[string]bool
servicesLock *sync.Mutex
restartFunc func()
loggerFunc func(msg string, fields ...zapcore.Field)
}
// newWatcher initialize internal resources for a watcher
func newWatcher(loggerFunc func(msg string, fields ...zapcore.Field), restartFunc func()) *watcher {
w := &watcher{
restartFunc: restartFunc,
loggerFunc: loggerFunc,
services: make(map[string]map[string]bool),
servicesLock: &sync.Mutex{},
}
return w
}
func (w *watcher) subscribeToBroker() error {
// Adding subscriber
_, err := broker.Subscribe(common.TopicServiceRegistration, func(p broker.Publication) error {
sType := string(p.Message().Body)
sName := p.Message().Header[common.EventHeaderServiceRegisterService]
sPeer := p.Message().Header[common.EventHeaderServiceRegisterPeer]
switch sType {
case common.EventTypeServiceRegistered:
if w.restartOnStarted(sName, sPeer) {
w.loggerFunc("Register Message triggers Caddy restart", zap.Any("srvName", sName), zap.Any("headers", p.Message().Header))
w.restartFunc()
}
case common.EventTypeServiceUnregistered:
if w.restartOnStopped(sName, sPeer) {
w.loggerFunc("Unregister Message triggers Caddy restart", zap.Any("srvName", sName), zap.Any("headers", p.Message().Header))
w.restartFunc()
}
case common.EventTypeDebugPrintInternals:
if sName == common.ServiceGatewayProxy && caddy.LastKnownCaddyFile != "" {
fmt.Println("***********************************************************************************")
fmt.Println(" => Caddy file currently served by Gateway Proxy 👇 ")
fmt.Println(caddy.LastKnownCaddyFile)
fmt.Println("***********************************************************************************")
}
}
return nil
})
return err
}
func (w *watcher) subscribeToConfigs(path ...string) error {
cw, err := config.Watch(path...)
if err != nil {
return err
}
go func() {
defer cw.Stop()
for {
if _, err := cw.Next(); err != nil {
break
}
w.loggerFunc("Triggers Caddy restart on config change for path", zap.Strings("path", path))
w.restartFunc()
}
}()
return nil
}
// restartOnStarted checks if service/peer is not already registered and states if we should restart Caddy
func (w *watcher) restartOnStarted(serviceName, peerAddress string) bool {
if !w.isWatchable(serviceName) {
return false
}
w.servicesLock.Lock()
defer w.servicesLock.Unlock()
var knownPeers map[string]bool
if pp, ok := w.services[serviceName]; ok {
knownPeers = pp
} else {
knownPeers = make(map[string]bool)
}
if _, found := knownPeers[peerAddress]; found {
// Already known, ignore
return false
}
// Register peer and trigger restart
knownPeers[peerAddress] = true
w.services[serviceName] = knownPeers
return true
}
// restartOnStopped checks if service/peer was already registered and states if we should restart Caddy
func (w *watcher) restartOnStopped(serviceName, peerAddress string) bool {
if !w.isWatchable(serviceName) {
return false
}
w.servicesLock.Lock()
defer w.servicesLock.Unlock()
var knownPeers map[string]bool
if pp, ok := w.services[serviceName]; ok {
knownPeers = pp
if _, found := knownPeers[peerAddress]; found {
delete(knownPeers, peerAddress)
if len(knownPeers) > 0 {
w.services[serviceName] = knownPeers
} else {
delete(w.services, serviceName)
}
return true
}
}
return false
}
// isWatchable check if events must be checked on this service based on prefixes
func (w *watcher) isWatchable(serviceName string) bool {
for _, prefix := range servicesPrefix {
if strings.HasPrefix(serviceName, prefix) {
return true
}
}
return false
}