// jsonreader.go
package crdt
import (
"context"
"encoding/json"
"io"
"github.com/pkg/errors"
)
// jsonReaderSource is the source stage of the ingest pipeline. It iterates
// over the json objects of a top-level json array presented through a reader
// (file, http request, stdin etc.), decoding one array element at a time and
// emitting each parsed object on the returned channel.
//
// Parameters:
//
//	ctx - required context for pipeline management; it is checked whenever
//	      the stage is waiting to hand an object to the next stage
//	r   - reader accessing json data
//
// Returns:
//
//   - a channel of decoded objects, closed when the stage finishes
//   - a buffered channel carrying at most one error, closed when the stage
//     finishes; an error is reported when the document does not start with
//     "[", when an element cannot be decoded into a map, or when the closing
//     "]" is missing (e.g. truncated input)
//   - an error for failures creating the stage itself; currently always nil
func jsonReaderSource(ctx context.Context, r io.Reader) (
	<-chan map[string]interface{}, // source emits json objects read from file as map
	<-chan error, // emits any errors encountered to the pipeline
	error) { // any error when creating the source stage itself
	out := make(chan map[string]interface{})
	// Capacity 1: every error path below sends once and then returns, so the
	// goroutine never blocks on errc even if nobody is receiving yet.
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)

		d := json.NewDecoder(r)

		// read opening bracket "["
		tok, err := d.Token()
		if err != nil {
			errc <- errors.Wrap(err, "unexpected token; json file should be json array")
			return
		}
		// Token succeeds for any valid first token (a string, a number, "{"),
		// so the delimiter has to be checked explicitly; otherwise a non-array
		// document would be silently treated as an empty array.
		if delim, ok := tok.(json.Delim); !ok || delim != '[' {
			errc <- errors.Errorf("unexpected token %v; json file should be json array", tok)
			return
		}

		// read json objects one by one
		for d.More() {
			var m map[string]interface{}
			if err := d.Decode(&m); err != nil {
				errc <- errors.Wrap(err, "unable to decode json object.")
				return
			}

			select {
			case out <- m: // pass the map onto the next stage
			case <-ctx.Done(): // listen for pipeline shutdown
				return
			}
		}

		// read closing bracket "]"; More also reports false when the input
		// simply ends, so this is what distinguishes a complete array from a
		// truncated one.
		if _, err := d.Token(); err != nil {
			errc <- errors.Wrap(err, "json array not terminated")
		}
	}()

	return out, errc, nil
}