-
Notifications
You must be signed in to change notification settings - Fork 7
/
main.go
100 lines (83 loc) · 2.13 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// span-update-labels takes a delimited file (comma-separated by default, see
// -s) of IDs and ISILs and updates the x.labels field of matching intermediate
// schema records accordingly. The whole mapping is kept in memory, so the size
// of the label file is limited by available memory.
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"os"
"runtime"
"strings"
log "github.com/sirupsen/logrus"
"bufio"
"github.com/miku/span"
"github.com/miku/span/formats/finc"
"github.com/miku/span/parallel"
)
// SplitTrim splits s around each occurrence of sep and returns the resulting
// fields with leading and trailing whitespace (as defined by
// strings.TrimSpace) removed. Empty fields are kept, so the result has exactly
// one entry per field produced by strings.Split.
func SplitTrim(s, sep string) []string {
	fields := strings.Split(s, sep)
	var trimmed []string
	for i := 0; i < len(fields); i++ {
		trimmed = append(trimmed, strings.TrimSpace(fields[i]))
	}
	return trimmed
}
// main loads the label file given by -f, which maps record IDs to ISILs. It
// then streams newline-delimited intermediate schema JSON from stdin to
// stdout. For every record whose ID appears in the label file, the Labels
// field is replaced by the ISILs listed for that ID. All other records are
// re-encoded but otherwise unchanged. Work is spread over -w workers in
// batches of -b lines.
func main() {
	showVersion := flag.Bool("v", false, "prints current program version")
	labelFile := flag.String("f", "", "path to comma separated file with ID and ISIL")
	separator := flag.String("s", ",", "separator value")
	size := flag.Int("b", 25000, "batch size")
	numWorkers := flag.Int("w", runtime.NumCPU(), "number of workers")

	flag.Parse()

	if *showVersion {
		fmt.Println(span.AppVersion)
		os.Exit(0)
	}
	// No label file, nothing to change.
	// NOTE(review): this exits without copying stdin to stdout, so nothing is
	// emitted when -f is omitted; confirm callers expect this rather than a
	// pass-through.
	if *labelFile == "" {
		os.Exit(0)
	}

	// Map record ids to a list of labels (ISIL). This is a memory bottleneck.
	labelMap, err := readLabelFile(*labelFile, *separator)
	if err != nil {
		log.Fatal(err)
	}

	w := bufio.NewWriter(os.Stdout)
	// labelMap is fully built before the processor starts and only read by
	// the workers afterwards.
	p := parallel.NewProcessor(bufio.NewReader(os.Stdin), w, func(_ int64, b []byte) ([]byte, error) {
		var is finc.IntermediateSchema
		if err := json.Unmarshal(b, &is); err != nil {
			return nil, err
		}
		if v, ok := labelMap[is.ID]; ok {
			is.Labels = v
		}
		bb, err := json.Marshal(is)
		if err != nil {
			return nil, err
		}
		return append(bb, '\n'), nil
	})
	p.NumWorkers = *numWorkers
	p.BatchSize = *size
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
	// Flush explicitly: a deferred Flush would neither report write errors
	// nor run after log.Fatal.
	if err := w.Flush(); err != nil {
		log.Fatal(err)
	}
}

// readLabelFile opens filename and parses it with parseLabels. The file is
// closed before returning.
func readLabelFile(filename, sep string) (map[string][]string, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	labelMap, err := parseLabels(f, sep)
	if err != nil {
		return nil, fmt.Errorf("reading label file %s: %w", filename, err)
	}
	return labelMap, nil
}

// parseLabels reads lines of the form "ID<sep>ISIL<sep>ISIL..." from r. It
// returns a map from ID to the whitespace-trimmed list of ISILs on that line.
//
// Parsing rules:
//   - Blank lines are skipped.
//   - If an ID occurs on more than one line, the last line wins.
//   - A line consisting of only an ID maps to an empty list.
//   - A final line without a trailing newline is still parsed.
func parseLabels(r io.Reader, sep string) (map[string][]string, error) {
	// bufio.Reader.ReadString is used instead of bufio.Scanner so that very
	// long lines (many ISILs) are not rejected by the Scanner token limit.
	br := bufio.NewReader(r)
	labelMap := make(map[string][]string)
	for {
		line, err := br.ReadString('\n')
		if err != nil && err != io.EOF {
			return nil, err
		}
		// ReadString may return data together with io.EOF when the last line
		// lacks a newline, so handle the line before checking for EOF.
		if strings.TrimSpace(line) != "" {
			if parts := SplitTrim(line, sep); len(parts) > 0 {
				labelMap[parts[0]] = parts[1:]
			}
		}
		if err == io.EOF {
			break
		}
	}
	return labelMap, nil
}