/
io_reader_writer.go
44 lines (37 loc) · 1.34 KB
/
io_reader_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package processors
import (
"io"
"github.com/teambenny/goetl/etldata"
)
// IoReaderWriter performs both the job of a IoReader and IoWriter.
// It will read data from the given io.Reader, write the resulting data to
// the given io.Writer, and (if the write was successful) send the data
// to the next stage of processing.
//
// IoReaderWriter is composed of both a IoReader and IoWriter, so it
// supports all of the same properties and usage options.
type IoReaderWriter struct {
IoReader
IoWriter
}
// NewIoReaderWriter returns a new IoReaderWriter wrapping the given io.Reader object
func NewIoReaderWriter(reader io.Reader, writer io.Writer) *IoReaderWriter {
r := IoReaderWriter{}
r.IoReader = *NewIoReader(reader)
r.IoWriter = *NewIoWriter(writer)
return &r
}
// ProcessData grabs data from IoReader.ForEachData, then sends it to IoWriter.ProcessData in addition
// to sending it upstream on the outputChan
func (r *IoReaderWriter) ProcessData(d etldata.Payload, outputChan chan etldata.Payload, killChan chan error) {
r.ForEachData(killChan, func(d etldata.Payload) {
r.IoWriter.ProcessData(d, outputChan, killChan)
outputChan <- d
})
}
// Finish - see interface for documentation.
func (r *IoReaderWriter) Finish(outputChan chan etldata.Payload, killChan chan error) {
}
func (r *IoReaderWriter) String() string {
return "IoReaderWriter"
}