/
resultstidy.go
59 lines (48 loc) · 1.3 KB
/
resultstidy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// resultstidy.go
package deep6
import (
"context"
"errors"
)
// resultsTidy is a pipeline stage that removes the n3-specific properties
// ("is-a" and "unique") from every object read from in, and collates the
// remaining objects by type in the results receiver.
//
// For each incoming object:
//   - the value of "is-a" is taken as the object's type; it must be a string,
//   - "is-a" and "unique" are deleted from the object (the map is modified
//     in place, not copied),
//   - objects left empty by that clean-up are dropped; this has the side
//     effect of pruning PropertyLink objects from the results stream, where
//     they add no value,
//   - every other object is appended to (*resultsReceiver)[type] and then
//     forwarded on the returned data channel.
//
// The returned error is non-nil only when resultsReceiver itself is nil; in
// that case no goroutine is started and both channels are nil. If
// resultsReceiver points at a nil map, a map is allocated for the caller.
//
// An object without a string "is-a" property causes a single error to be
// emitted on the error channel and the stage to stop. Both returned channels
// are closed when the stage ends, which happens when in is closed, when such
// an error occurs, or when ctx is cancelled while the stage is waiting to
// forward an object.
//
// The receiver map is written from the stage's goroutine without locking, so
// callers should only read it once the data channel has been closed.
func resultsTidy(ctx context.Context, resultsReceiver *map[string][]map[string]interface{}, in <-chan map[string]interface{}) (
	<-chan map[string]interface{},
	<-chan error, // emits errors encountered to the pipeline
	error) {

	// Fail fast instead of panicking later inside the goroutine.
	if resultsReceiver == nil {
		return nil, nil, errors.New("resultsTidy: results receiver must not be nil")
	}
	// Appending into a nil map panics, so make sure there is one to fill.
	if *resultsReceiver == nil {
		*resultsReceiver = make(map[string][]map[string]interface{})
	}

	out := make(chan map[string]interface{})
	// Buffer of one: the stage emits at most one error before it returns,
	// so the send below never blocks even if nobody is listening yet.
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)

		for m := range in {
			objectType, ok := m["is-a"].(string)
			if !ok {
				errc <- errors.New("object with no type encountered in resultsTidy()")
				return
			}

			// remove n3 object properties
			delete(m, "is-a")
			delete(m, "unique")
			if len(m) == 0 { // property.links/unique.links will be empty after tidy-up
				continue
			}

			// store the object by type in the results collection
			(*resultsReceiver)[objectType] = append((*resultsReceiver)[objectType], m)

			select {
			case out <- m: // pass the data on to the next stage
			case <-ctx.Done(): // listen for pipeline shutdown
				return
			}
		}
	}()

	return out, errc, nil
}