/
crdtjson.go
55 lines (43 loc) · 1.14 KB
/
crdtjson.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// crdtjson.go
package crdt
import (
"context"
"github.com/pkg/errors"
"github.com/tidwall/sjson"
)
//
// crdtJSON takes a feed of previously merged crdts, and turns them
// into the json data of the object, returned as a byte array.
//
func crdtJSON(ctx context.Context, iterator chan []byte, in <-chan CRDTData) (
<-chan error, // emits errors encountered to the pipeline manager
error) { // any error encountered when creating this component
errc := make(chan error, 1)
go func() {
defer close(errc)
for cd := range in {
var jsonDoc []byte
for _, id := range cd.CRDT.Keys() {
tuples, ok := cd.CRDT.Get(id).(map[string]interface{})
if !ok {
errc <- errors.New("unexpected content in crdt crdtJSON():")
return
}
for k, v := range tuples {
var err error
jsonDoc, err = sjson.SetBytes(jsonDoc, k, v)
if err != nil {
errc <- errors.Wrap(err, "unable to assign k/v pair crdtJSON():")
return
}
}
}
select {
case iterator <- jsonDoc: // pass the json data out to the consumer
case <-ctx.Done(): // listen for pipeline shutdown
return
}
}
}()
return errc, nil
}