/
pipe.go
125 lines (103 loc) · 2.34 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package csv
// Pipe is a unidirectional channel that connects a writer process to a reader
// process: one process writes records through the Writer produced by Builder,
// and another process receives them from Reader.
type Pipe interface {
Builder() WriterBuilder // Returns the WriterBuilder that builds the Writer for the write end of the pipe
Reader() Reader // Returns the Reader for the read end of the pipe
}
// pipe is the in-memory implementation of Pipe. The same value also acts as
// the Reader for the read end (its Reader method returns the pipe itself).
type pipe struct {
// header is the header handed to the WriterBuilder; it is stored before init is closed.
header []string
// ch carries records from the write end to the read end; the write end closes it.
ch chan Record
// init is closed when the WriterBuilder is invoked; C, Header and Error block until then.
init chan interface{}
// err is the error handed to the write end's Close; nil until then.
err error
}
// pipeWriter is the write end of a pipe.
type pipeWriter struct {
// pipe is the pipe that receives everything written here.
pipe *pipe
// builder is the RecordBuilder created from the pipe's header; Blank uses it.
builder RecordBuilder
}
// NewPipe returns a new Pipe whose Builder and Reader can be used to connect
// two chained processes. The record channel is unbuffered, and the pipe starts
// with no header and no error.
func NewPipe() Pipe {
	p := new(pipe)
	// Records are handed over synchronously from writer to reader.
	p.ch = make(chan Record)
	// Closed later, when the WriterBuilder is invoked with a header.
	p.init = make(chan interface{})
	return p
}
// Reader returns the read end of the pipe. The pipe itself plays that role,
// so no new value is created.
func (p *pipe) Reader() Reader {
	var readEnd Reader = p
	return readEnd
}
// C returns the receive-only channel on which records arrive. It blocks until
// the WriterBuilder has been invoked (which closes init).
func (p *pipe) C() <-chan Record {
	// Wait for the write end to be set up.
	<-p.init
	var records <-chan Record = p.ch
	return records
}
// Close is a no-op for the read end of the pipe: it neither closes a channel
// nor signals the write end.
// NOTE(review): since nothing is signalled here, a writer blocked sending a
// record presumably stays blocked if the reader stops receiving — confirm
// that readers always drain C() until it is closed.
func (p *pipe) Close() {
}
// Header returns the header that was handed to the WriterBuilder. It blocks
// until the WriterBuilder has been invoked, because the header is not known
// before then.
func (p *pipe) Header() []string {
	// Wait for the write end to publish the header.
	<-p.init
	hdr := p.header
	return hdr
}
// Error returns the error recorded on the pipe: the value handed to the write
// end's Close, or nil if Close has not been called. It blocks until the
// WriterBuilder has been invoked.
// NOTE(review): the read of err is only ordered after the writer's Close once
// the reader has seen the record channel closed — presumably callers check
// Error after draining C(); confirm.
func (p *pipe) Error() error {
	// Wait for the write end to be set up.
	<-p.init
	recorded := p.err
	return recorded
}
// Builder returns the WriterBuilder for the write end of the pipe. Invoking
// the returned builder stores the header on the pipe, closes init to release
// any reader blocked in C, Header or Error, and returns a Writer bound to this
// pipe.
//
// The returned builder must be invoked at most once per pipe: a second
// invocation closes the already-closed init channel and panics.
func (p *pipe) Builder() WriterBuilder {
	build := func(header []string) Writer {
		// Publish the header before releasing the readers that wait for it.
		p.header = header
		close(p.init)

		w := new(pipeWriter)
		w.pipe = p
		w.builder = NewRecordBuilder(header)
		return w
	}
	return build
}
// Blank returns the Record that the writer's RecordBuilder produces from a
// slice of empty strings with one element per header column.
func (p *pipeWriter) Blank() Record {
	width := len(p.pipe.header)
	fields := make([]string, width)
	return p.builder(fields)
}
// Close ends the stream: it records err on the pipe and then closes the record
// channel, so a reader that observes the closed channel also observes err.
// It always returns nil.
//
// Close must be called at most once; a second call closes an already-closed
// channel and panics.
func (p *pipeWriter) Close(err error) error {
	target := p.pipe
	// Store the error first so it is visible once the channel close is seen.
	target.err = err
	close(target.ch)
	return nil
}
// Error returns the error currently recorded on the underlying pipe, i.e. the
// value handed to Close, or nil if Close has not been called.
func (p *pipeWriter) Error() error {
	owner := p.pipe
	return owner.err
}
// Header returns the header stored on the underlying pipe, which is the header
// this writer was built with.
func (p *pipeWriter) Header() []string {
	owner := p.pipe
	return owner.header
}
// Write sends r to the read end of the pipe. The record channel is unbuffered,
// so Write blocks until the reader receives the record. It always returns nil.
//
// Write must not be called after Close: sending on the closed channel panics.
func (p *pipeWriter) Write(r Record) error {
	out := p.pipe.ch
	out <- r
	return nil
}
// pipeline is a Process made of an ordered sequence of stages; when run, each
// stage's output is piped into the next stage's input.
type pipeline struct {
// stages holds the processes in execution order: the first reads the pipeline's input, the last writes its output.
stages []Process
}
// NewPipeLine joins a sequence of processes by connecting them with pipes and
// returns a new Process that represents the entire pipeline.
//
// If p is nil or empty, the pipeline consists of a single CatProcess stage, so
// the returned Process always has at least one stage to run. The slice p is
// retained as the pipeline's stage list; it is not copied.
func NewPipeLine(p []Process) Process {
	// len of a nil slice is 0, so one check covers both nil and empty input.
	if len(p) == 0 {
		p = []Process{&CatProcess{}}
	}
	return &pipeline{
		stages: p,
	}
}
// Run executes the pipeline. Every stage is started in its own goroutine;
// consecutive stages are joined by a fresh Pipe, the first stage reads from r
// and the last stage writes through b.
//
// Run waits for one report per stage on an internal error channel and then
// sends exactly one value on errCh: the first non-nil error received, or nil
// if every stage reported nil. Run blocks until all stages have reported.
func (p *pipeline) Run(r Reader, b WriterBuilder, errCh chan<- error) {
	// Buffered with one slot per stage so no stage blocks when reporting.
	reports := make(chan error, len(p.stages))
	last := len(p.stages) - 1

	// Chain every stage but the last to its successor through a new pipe.
	upstream := r
	for _, stage := range p.stages[:last] {
		link := NewPipe()
		go stage.Run(upstream, link.Builder(), reports)
		upstream = link.Reader()
	}
	// The final stage writes to the caller-supplied builder.
	go p.stages[last].Run(upstream, b, reports)

	// Collect one report per stage, keeping the first error seen.
	var first error
	for pending := len(p.stages); pending > 0; pending-- {
		if e := <-reports; first == nil {
			first = e
		}
	}
	errCh <- first
}