-
Notifications
You must be signed in to change notification settings - Fork 13
/
multi_configuration_source.go
73 lines (65 loc) · 1.94 KB
/
multi_configuration_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package app
import (
"github.com/jitsucom/bulker/jitsubase/safego"
"reflect"
)
type MultiConfigurationSource struct {
configurationSources []ConfigurationSource
changesChan chan bool
closeChan chan struct{}
}
func NewMultiConfigurationSource(configurationSources []ConfigurationSource) *MultiConfigurationSource {
mcs := MultiConfigurationSource{configurationSources: configurationSources,
changesChan: make(chan bool, 1),
closeChan: make(chan struct{})}
// gather signals from all changes channels from configuration sources into one channel
safego.RunWithRestart(func() {
cases := make([]reflect.SelectCase, len(configurationSources))
for i, cs := range configurationSources {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(cs.ChangesChannel())}
}
for {
select {
case <-mcs.closeChan:
return
default:
// Use reflect.Select to select from the dynamic numbers of channels
i, _, ok := reflect.Select(cases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
cases[i].Chan = reflect.ValueOf(nil)
} else {
mcs.changesChan <- true
}
}
}
})
return &mcs
}
func (mcs *MultiConfigurationSource) GetDestinationConfigs() []*DestinationConfig {
results := make([]*DestinationConfig, 0)
for _, cs := range mcs.configurationSources {
results = append(results, cs.GetDestinationConfigs()...)
}
return results
}
func (mcs *MultiConfigurationSource) GetDestinationConfig(id string) *DestinationConfig {
for _, cs := range mcs.configurationSources {
cfg := cs.GetDestinationConfig(id)
if cfg != nil {
return cfg
}
}
return nil
}
func (mcs *MultiConfigurationSource) ChangesChannel() <-chan bool {
return mcs.changesChan
}
func (mcs *MultiConfigurationSource) Close() error {
close(mcs.closeChan)
for _, cs := range mcs.configurationSources {
_ = cs.Close()
}
close(mcs.changesChan)
return nil
}