forked from taggledevel2/ratchet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
io_reader.go
73 lines (63 loc) · 1.59 KB
/
io_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package processors
import (
"bufio"
"compress/gzip"
"io"
"github.com/dailyburn/ratchet/data"
"github.com/dailyburn/ratchet/util"
)
// IoReader wraps an io.Reader and reads it, handing the content on either one
// line at a time or in fixed-size chunks.
type IoReader struct {
// Reader is the source the data is read from. When Gzipped is set,
// ProcessData replaces it with a gzip reader wrapping the original.
Reader io.Reader
// LineByLine selects line scanning (true) over buffered chunk reads (false).
// NewIoReader sets it to true; the zero value of the struct leaves it false.
LineByLine bool
// BufferSize is the size in bytes of the read buffer used when LineByLine
// is false. NewIoReader sets it to 1024.
BufferSize int
// Gzipped makes ProcessData wrap Reader in a gzip reader before reading.
Gzipped bool
}
// NewIoReader builds an IoReader around the given io.Reader. The result reads
// line by line, uses a 1024-byte buffer size, and has gzip handling off.
func NewIoReader(reader io.Reader) *IoReader {
	ioReader := new(IoReader)
	ioReader.Reader = reader
	ioReader.LineByLine = true
	ioReader.BufferSize = 1024
	return ioReader
}
// ProcessData reads the wrapped reader via ForEachData and sends every piece
// it yields (a line or a chunk) to outputChan. The incoming d is not used.
//
// When Gzipped is set, Reader is first replaced by a gzip reader wrapping it.
// If the gzip header cannot be read, the error is reported through killChan
// and nothing is read: gzip.NewReader returns a nil reader on failure, and
// reading from it would panic.
//
// NOTE(review): because Reader is overwritten, calling ProcessData a second
// time with Gzipped set would wrap the gzip reader again — confirm callers
// invoke this only once per IoReader.
func (r *IoReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
	if r.Gzipped {
		gzReader, err := gzip.NewReader(r.Reader)
		if err != nil {
			util.KillPipelineIfErr(err, killChan)
			return
		}
		r.Reader = gzReader
	}
	r.ForEachData(killChan, func(item data.JSON) {
		outputChan <- item
	})
}
// Finish does nothing for IoReader; all reading and sending happens in
// ProcessData.
func (r *IoReader) Finish(outputChan chan data.JSON, killChan chan error) {
}
// ForEachData reads from Reader and invokes foo for each piece of data: once
// per scanned line when LineByLine is set, otherwise once per buffered chunk.
// killChan is handed to the underlying read routine for error reporting.
func (r *IoReader) ForEachData(killChan chan error, foo func(d data.JSON)) {
	if !r.LineByLine {
		r.bufferedRead(killChan, foo)
		return
	}
	r.scanLines(killChan, foo)
}
// scanLines walks Reader with a bufio.Scanner and calls forEach with the text
// of each scanned line converted to data.JSON. Once scanning stops, the
// scanner's error (possibly nil) is passed to util.KillPipelineIfErr together
// with killChan.
func (r *IoReader) scanLines(killChan chan error, forEach func(d data.JSON)) {
	lines := bufio.NewScanner(r.Reader)
	for lines.Scan() {
		line := lines.Text()
		forEach(data.JSON(line))
	}
	util.KillPipelineIfErr(lines.Err(), killChan)
}
// bufferedRead reads Reader in chunks of at most BufferSize bytes and calls
// forEach once per chunk.
//
// Each chunk handed to forEach contains exactly the bytes returned by that
// read and has its own backing array, so the callee may keep it (for example
// by sending it down a channel) without it being overwritten by the next
// read.
//
// Reading stops at io.EOF, on a read that returns no bytes, or on the first
// other error, which is sent to killChan once.
func (r *IoReader) bufferedRead(killChan chan error, forEach func(d data.JSON)) {
	reader := bufio.NewReader(r.Reader)
	buf := make([]byte, r.BufferSize)
	for {
		n, err := reader.Read(buf)
		// Per the io.Reader contract, consume the n bytes read before
		// looking at err.
		if n > 0 {
			chunk := make([]byte, n)
			copy(chunk, buf[:n])
			forEach(chunk)
		}
		if err != nil {
			if err != io.EOF {
				killChan <- err
			}
			return
		}
		// A read with no bytes and no error (e.g. BufferSize of 0) would
		// otherwise spin forever.
		if n == 0 {
			return
		}
	}
}
// String returns the fixed name "IoReader" identifying this type.
func (r *IoReader) String() string {
	const name = "IoReader"
	return name
}