-
Notifications
You must be signed in to change notification settings - Fork 246
/
muxer.go
130 lines (121 loc) · 2.97 KB
/
muxer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package discovery
import (
"fmt"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/discv5"
)
// NewMultiplexer returns a Multiplexer wrapping the given discoveries.
// The slice is stored as passed in; it is not copied.
func NewMultiplexer(discoveries []Discovery) Multiplexer {
	return Multiplexer{discoveries: discoveries}
}
// Multiplexer groups several discoveries so that they can be driven together
// through a single value: each of its methods forwards the call to every
// wrapped discovery and combines the results.
type Multiplexer struct {
// discoveries holds the wrapped implementations, in the order they were
// passed to NewMultiplexer.
discoveries []Discovery
}
// Running reports whether at least one of the wrapped discoveries is running.
// Discoveries are queried in order and the scan stops at the first one that
// reports true. With no discoveries it returns false.
func (m Multiplexer) Running() (rst bool) {
	for _, d := range m.discoveries {
		if d.Running() {
			return true
		}
	}
	return false
}
// Start starts the wrapped discoveries one by one, in order. If any of them
// fails to start, the discoveries that were already started are stopped
// (in the same order they were started) and the start error is returned.
// It returns nil when every discovery started successfully.
func (m Multiplexer) Start() (err error) {
	for idx, d := range m.discoveries {
		if err = d.Start(); err == nil {
			continue
		}
		// Roll back: everything before idx started successfully.
		for _, prev := range m.discoveries[:idx] {
			// Best-effort cleanup; the start failure is the error we report.
			_ = prev.Stop()
		}
		return err
	}
	return nil
}
// Stop stops every wrapped discovery, in order. A failure does not interrupt
// the loop: all discoveries are asked to stop, and the messages of those that
// failed are joined with "; " into a single returned error. It returns nil
// when every discovery stopped cleanly.
func (m Multiplexer) Stop() (err error) {
	var failures []string
	for _, d := range m.discoveries {
		if stopErr := d.Stop(); stopErr != nil {
			failures = append(failures, stopErr.Error())
		}
	}
	if len(failures) == 0 {
		return nil
	}
	return fmt.Errorf("failed to stop discoveries: %s", strings.Join(failures, "; "))
}
// Register passes topic and the stop channel to every wrapped discovery
// concurrently and blocks until all of them have returned.
//
// The messages of the discoveries that failed are joined with "; " into a
// single returned error; their order follows completion order and is therefore
// not deterministic. It returns nil when every registration returned nil,
// including when the multiplexer wraps no discoveries at all.
func (m Multiplexer) Register(topic string, stop chan struct{}) error {
	// Buffered to the number of senders so that no goroutine blocks on send.
	results := make(chan error, len(m.discoveries))
	for i := range m.discoveries {
		i := i
		go func() {
			results <- m.discoveries[i].Register(topic, stop)
		}()
	}

	// Receive exactly one result per discovery. Ranging over the channel
	// would block forever with zero discoveries, since nothing is ever sent
	// and the channel is never closed.
	var messages []string
	for range m.discoveries {
		if err := <-results; err != nil {
			messages = append(messages, err.Error())
		}
	}
	if len(messages) != 0 {
		return fmt.Errorf("failed to register %s: %s", topic, strings.Join(messages, "; "))
	}
	return nil
}
// Discover runs Discover on every wrapped discovery concurrently. All of them
// share the same topic and the same found and lookup channels, while every
// value received from period is fanned out to a private, per-discovery period
// channel.
//
// Discover blocks until every wrapped discovery has returned AND the period
// channel has been closed by the caller; closing period closes each
// per-discovery period channel in turn. The messages of the discoveries that
// failed are joined with "; " into a single returned error (order follows
// completion order); nil is returned when none failed.
func (m Multiplexer) Discover(topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool) error {
	var (
		periods = make([]chan time.Duration, len(m.discoveries))
		// finished[i] is closed once discovery i has returned, so the fan-out
		// below never blocks on a discovery that no longer reads its periods.
		finished = make([]chan struct{}, len(m.discoveries))
		messages []string
		wg       sync.WaitGroup
		mu       sync.Mutex // guards messages
	)
	// One unit per discovery plus one for the fan-out goroutine.
	wg.Add(len(m.discoveries) + 1)
	for i := range m.discoveries {
		i := i
		periods[i] = make(chan time.Duration, 2)
		finished[i] = make(chan struct{})
		go func() {
			defer wg.Done()
			defer close(finished[i])
			if err := m.discoveries[i].Discover(topic, periods[i], found, lookup); err != nil {
				mu.Lock()
				messages = append(messages, err.Error())
				mu.Unlock()
			}
		}()
	}
	go func() {
		defer wg.Done()
		for newPeriod := range period {
			for i := range periods {
				// Without the finished case, a discovery that returned early
				// would wedge this loop as soon as its small buffer filled up,
				// starving the other discoveries and hiding the close of
				// period from this goroutine.
				select {
				case periods[i] <- newPeriod:
				case <-finished[i]:
				}
			}
		}
		// period was closed by the caller: propagate the close downstream.
		for i := range periods {
			close(periods[i])
		}
	}()
	wg.Wait()
	if len(messages) != 0 {
		return fmt.Errorf("failed to discover topic %s: %s", topic, strings.Join(messages, "; "))
	}
	return nil
}