-
Notifications
You must be signed in to change notification settings - Fork 0
/
fan.go
55 lines (50 loc) · 1.16 KB
/
fan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cortical
import "sync"
// fanOut replicates every message received on ch to each of size newly
// created channels and returns those channels.
//
// Each returned channel has a buffer of lag messages. Delivery happens in a
// single background goroutine that sends to the channels one after another,
// so once any channel's buffer is full the remaining channels wait for that
// receiver as well. The same []byte value is handed to every channel; it is
// not copied.
//
// When ch is closed and drained, all returned channels are closed.
func fanOut(ch <-chan []byte, size, lag int) []chan []byte {
	outs := make([]chan []byte, size)
	for i := 0; i < len(outs); i++ {
		// lag bounds how many messages a receiver may fall behind before
		// the broadcaster blocks on it.
		outs[i] = make(chan []byte, lag)
	}

	broadcast := func() {
		for {
			msg, open := <-ch
			if !open {
				break
			}
			for i := range outs {
				outs[i] <- msg
			}
		}
		// The input is exhausted: signal end-of-stream to every receiver.
		for i := range outs {
			close(outs[i])
		}
	}
	go broadcast()

	return outs
}
// merge fans in every channel in cs onto a single returned channel.
//
// One forwarding goroutine is started per input channel. Each forwards values
// from its input to the returned channel until the input is closed or done is
// closed. The returned channel is closed once every forwarder has exited, so
// callers can range over it.
//
// Closing done releases all forwarders, including those waiting on an input
// that is idle and has not been closed; values not yet forwarded at that point
// may be dropped. If done is nil it never fires, and the returned channel is
// closed only after every input has been closed and drained. No ordering is
// guaranteed between values that arrive on different inputs.
func merge(done <-chan struct{}, cs ...<-chan []byte) <-chan []byte {
	var wg sync.WaitGroup
	out := make(chan []byte)

	// output copies values from c to out until c or done is closed, then
	// calls wg.Done.
	output := func(c <-chan []byte) {
		defer wg.Done()
		for {
			// done must be watched while waiting for input as well as while
			// sending: otherwise a forwarder parked on an idle, unclosed
			// input would never notice cancellation and would leak.
			select {
			case n, ok := <-c:
				if !ok {
					return
				}
				select {
				case out <- n:
				case <-done:
					return
				}
			case <-done:
				return
			}
		}
	}

	// wg.Add must happen before the closer goroutine below calls wg.Wait.
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out once all the forwarders have finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}