-
Notifications
You must be signed in to change notification settings - Fork 31
/
multi.go
78 lines (67 loc) · 1.71 KB
/
multi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package multi
import (
"context"
"github.com/grab/async"
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/key"
"golang.org/x/sync/errgroup"
)
// SubWriter represents the sub-writer: the contract every destination passed
// to New must satisfy.
type SubWriter interface {
	// Write receives a key together with a slice of blocks and reports
	// failure through the returned error.
	Write(key.Key, []block.Block) error
}
// streamer represents the sub-streamer: an optional, package-private contract
// for sub-writers that additionally accept individual rows. New discovers it
// with a type assertion on each SubWriter it is given.
type streamer interface {
	// Stream receives a single row and reports failure through the returned error.
	Stream(row block.Row) error
	// Run starts the streamer with the given context and returns the task it
	// created (if any) along with a start-up error.
	Run(ctx context.Context) (async.Task, error)
}
// Writer represents a writer that writes into multiple sub-writers.
type Writer struct {
	// writers holds every sub-writer handed to New; each one receives Write calls.
	writers []SubWriter
	// streamers holds the subset of writers that also implement streamer;
	// only these receive Run and Stream calls.
	streamers []streamer
}
// New builds a Writer that fans out to the given sub-writers.
//
// Every sub-writer is kept for Write. Those that also satisfy the streamer
// interface are additionally collected, in the order given, for Run and Stream.
// The variadic slice is stored as-is (it is not copied).
func New(writers ...SubWriter) *Writer {
	multi := &Writer{
		writers:   writers,
		streamers: make([]streamer, 0, len(writers)),
	}

	for _, sub := range writers {
		s, ok := sub.(streamer)
		if !ok {
			// Plain sub-writer: it only takes part in Write.
			continue
		}
		multi.streamers = append(multi.streamers, s)
	}

	return multi
}
// Write hands the key and blocks to every sub-writer concurrently, one
// goroutine per sub-writer, and waits for all of them to finish.
//
// It returns the first non-nil error reported by any sub-writer, or nil when
// all of them succeed (or when there are no sub-writers).
func (w *Writer) Write(key key.Key, blocks []block.Block) error {
	var group errgroup.Group

	for i := range w.writers {
		// Bind the sub-writer to a per-iteration variable so the closure
		// below does not depend on the loop variable.
		sub := w.writers[i]
		group.Go(func() error {
			return sub.Write(key, blocks)
		})
	}

	return group.Wait()
}
// Run launches every streamer, in registration order, passing ctx to each so
// they can start streaming data.
//
// If a streamer fails to start, the tasks returned by the streamers that were
// already started are cancelled before the error is returned, so a partial
// start-up does not leave earlier streamers running with no handle to stop
// them. Streamers after the failing one are never started.
//
// On success the returned task is always nil: the individual streamer tasks
// are not aggregated, so callers must not dereference it.
func (w *Writer) Run(ctx context.Context) (async.Task, error) {
	started := make([]async.Task, 0, len(w.streamers))

	for _, s := range w.streamers {
		task, err := s.Run(ctx)
		if err != nil {
			// Roll back: stop whatever was launched before this failure.
			for _, t := range started {
				t.Cancel()
			}
			return nil, err
		}

		// A streamer may legitimately return no task; there is nothing to
		// cancel later in that case.
		if task != nil {
			started = append(started, task)
		}
	}

	return nil, nil
}
// Stream forwards the row to each streamer sequentially, in registration order.
//
// Delivery stops at the first failure: that error is returned and the
// remaining streamers do not receive the row. It returns nil when every
// streamer accepts the row (or when there are no streamers).
func (w *Writer) Stream(row block.Row) error {
	for i := range w.streamers {
		err := w.streamers[i].Stream(row)
		if err != nil {
			return err
		}
	}
	return nil
}