-
Notifications
You must be signed in to change notification settings - Fork 0
/
loadbalancer.go
210 lines (181 loc) · 4.82 KB
/
loadbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package tinylb
import (
"os"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
// LoadBalancer owns a set of named target groups and keeps them in line with
// the config file at configPath. The embedded RWMutex guards stop and
// targetGroups.
type LoadBalancer struct {
sync.RWMutex
// Config is the embedded load balancer configuration.
// NOTE(review): nothing in this file assigns it; confirm whether
// UpdateConfig is expected to store the applied config here.
Config
// configPath is the path to the load balancer's config file.
configPath string
// stop is set by Close to tell the config watcher goroutine to exit and to
// make later Close calls a no-op.
stop bool
// closedWatcher receives a value once the config watcher has exited.
closedWatcher chan bool
// targetGroups maps target group names to target groups.
targetGroups map[string]*TargetGroup
// log is the logger used for all load balancer messages; it is also handed
// to newly opened target groups.
log *logrus.Logger
}
// Open starts a new load balancer from the config file at configPath and
// launches a background goroutine that reloads the config when the file
// changes. All messages are written to log.
//
// If the initial config cannot be read, parsed or applied, Open returns a nil
// load balancer together with the error. Target groups that were already
// opened before the failure are drained first, because the caller never
// receives a handle it could use to close them.
func Open(configPath string, log *logrus.Logger) (*LoadBalancer, error) {
	lb := &LoadBalancer{
		configPath:    configPath,
		closedWatcher: make(chan bool),
		targetGroups:  make(map[string]*TargetGroup),
		log:           log,
	}

	if err := lb.ReloadConfig(); err != nil {
		// UpdateConfig registers each target group as soon as it is opened, so
		// a failure part-way through leaves running groups in the map. The
		// config watcher has not been started yet, so Close cannot be used
		// here; drain the groups directly instead.
		var wg sync.WaitGroup
		for _, targetGroup := range lb.targetGroups {
			wg.Add(1)
			go func(tg *TargetGroup) {
				defer wg.Done()
				tg.drain()
			}(targetGroup)
		}
		wg.Wait()
		return nil, err
	}

	go lb.configWatcher()
	return lb, nil
}
// UpdateConfig reconciles the running target groups with the given config
// while holding the load balancer's write lock. Target groups named in the
// config that are not running yet are opened, running ones that are still
// named in the config are handed their new settings, and running ones that are
// no longer named are drained and removed. The first error from opening or
// updating a target group is returned immediately and the remaining steps are
// skipped.
func (lb *LoadBalancer) UpdateConfig(config Config) error {
	lb.Lock()
	defer lb.Unlock()
	lb.log.Info("updating config")

	// Index the desired configs by name and collect the ones that have no
	// running target group yet.
	desired := make(map[string]TargetGroupConfig, len(config.TargetGroups))
	var toOpen []TargetGroupConfig
	for _, cfg := range config.TargetGroups {
		if _, running := lb.targetGroups[cfg.Name]; !running {
			toOpen = append(toOpen, cfg)
		}
		desired[cfg.Name] = cfg
	}

	// Split the running target groups into those to keep and those to drop.
	var toUpdate, toDrain []*TargetGroup
	for _, running := range lb.targetGroups {
		if _, wanted := desired[running.Name]; !wanted {
			toDrain = append(toDrain, running)
			continue
		}
		toUpdate = append(toUpdate, running)
	}

	// Phase 1: open the new target groups and register them by name.
	for _, cfg := range toOpen {
		opened, err := openTargetGroup(cfg, lb.log)
		if err != nil {
			return err
		}
		lb.targetGroups[opened.Name] = opened
	}

	// Phase 2: push the new settings to the target groups that stay.
	for _, kept := range toUpdate {
		current := lb.targetGroups[kept.Name]
		if err := current.updateConfig(desired[current.Name]); err != nil {
			return err
		}
	}

	// Phase 3: drain the dropped target groups concurrently, forget them, and
	// wait for every drain to finish before reporting completion.
	var draining sync.WaitGroup
	for _, dropped := range toDrain {
		draining.Add(1)
		go func(tg *TargetGroup) {
			tg.drain()
			draining.Done()
		}(lb.targetGroups[dropped.Name])
		delete(lb.targetGroups, dropped.Name)
	}
	draining.Wait()

	lb.log.Info("done updating config")
	return nil
}
// Close gracefully shuts down this load balancer: it stops the config watcher
// and then drains all target groups concurrently, returning once every drain
// has finished. Only the first call does the work; later calls log a message
// and return immediately. Close always returns nil.
func (lb *LoadBalancer) Close() error {
	lb.Lock()
	if lb.stop {
		lb.Unlock()
		lb.log.Info("already closed - noop")
		return nil
	}
	lb.stop = true
	lb.Unlock()

	// The lock must be released while waiting: the watcher reads lb.stop under
	// the read lock and may be in the middle of a reload that needs the write
	// lock before it can notice the flag and exit.
	lb.log.Info("closing config watcher")
	<-lb.closedWatcher
	lb.log.Info("config watcher closed")

	lb.Lock()
	defer lb.Unlock()

	lb.log.Info("draining all targets")
	var wg sync.WaitGroup
	for _, targetGroup := range lb.targetGroups {
		wg.Add(1)
		// Drain each target group in its own goroutine so that one slow group
		// does not hold up the others, matching how UpdateConfig drains
		// removed groups.
		go func(tg *TargetGroup) {
			defer wg.Done()
			tg.drain()
		}(targetGroup)
	}
	wg.Wait()
	lb.log.Info("done draining all targets")
	return nil
}
// ReloadConfig opens the config file at the load balancer's configPath, parses
// it with LoadConfig and applies the result through UpdateConfig. It returns
// the first error hit while opening, parsing or applying the config, or nil on
// success.
func (lb *LoadBalancer) ReloadConfig() error {
	f, openErr := os.Open(lb.configPath)
	if openErr != nil {
		return openErr
	}
	defer f.Close()

	parsed, parseErr := LoadConfig(f)
	if parseErr != nil {
		return parseErr
	}

	return lb.UpdateConfig(*parsed)
}
// configWatcher listens for config file changes at the load balancer's
// configPath and reloads the config on every write or removal event. It checks
// lb.stop at least once per second, and once the flag is set it leaves the loop
// and sends on closedWatcher, blocking until Close receives the value.
// It panics via the logger if the file watcher cannot be created.
// Adapted from: https://martensson.io/go-fsnotify-and-kubernetes-configmaps
func (lb *LoadBalancer) configWatcher() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		lb.log.Panicln(err)
	}
	defer watcher.Close()

	err = watcher.Add(lb.configPath)
	if err != nil {
		lb.log.Errorln("error watching config path: ", err)
	}

	// A single reusable ticker bounds how long the select below can block, so
	// the stop flag is re-checked regularly without allocating a new timer on
	// every iteration.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		lb.RLock()
		shouldExit := lb.stop
		lb.RUnlock()
		if shouldExit {
			break
		}

		select {
		case <-ticker.C:
			continue
		case event := <-watcher.Events:
			removed := event.Op&fsnotify.Remove == fsnotify.Remove
			written := event.Op&fsnotify.Write == fsnotify.Write

			// Support k8s configmap updates, which look like removals: the
			// watch has to be re-established on the path.
			if removed {
				// fsnotify normally drops the watch itself when the file is
				// removed, so a failure here is expected and safe to ignore.
				_ = watcher.Remove(event.Name)
				// If re-adding fails, no further change events will arrive;
				// report it instead of going silent.
				if err := watcher.Add(lb.configPath); err != nil {
					lb.log.Errorln("error watching config path: ", err)
				}
			}

			// Reload once per event, whether it was a removal, a normal file
			// write, or both.
			if removed || written {
				if err := lb.ReloadConfig(); err != nil {
					lb.log.Error("error reloading config: ", err)
				}
			}
		case err := <-watcher.Errors:
			lb.log.Error("error watching config: ", err)
		}
	}
	lb.closedWatcher <- true
}