-
Notifications
You must be signed in to change notification settings - Fork 0
/
merger.go
91 lines (82 loc) · 1.9 KB
/
merger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package merger
import (
"bufio"
"log"
"os"
"strconv"
"strings"
)
// line is one parsed CSV row as passed between Reader and Joiner.
type line struct {
// id is the integer parsed from the row's first column (surrounding
// double quotes are stripped before parsing).
id int
// restOfLine is the unparsed text following the first comma of the row.
restOfLine string
}
/*func Merge(metadata string, branded string) {
// read metadata
metadataChan := make(chan *line)
go reader(metadata, metadataChan)
// read strength set IDs
brandedChan := make(chan *line)
go reader(branded, brandedChan)
// join the two data streams
mergedLinesChan := make(chan *line)
go joiner(metadataChan, brandedChan, mergedLinesChan)
for l := range mergedLinesChan {
fmt.Printf("%v,%v\n", l.id, l.restOfLine)
}
}*/
// Reader streams the CSV file fname into out, sending one *line per data row.
//
// The first row is treated as a header and skipped. Each following row is
// split at its first comma: the first column, with any double quotes removed,
// is parsed as the integer id and everything after the comma is kept verbatim
// as restOfLine. A row that contains no comma yields an empty restOfLine.
//
// out is closed when Reader returns so that consumers ranging over it
// terminate. Failing to open the file, a non-numeric id, or a scanner error
// is fatal: the error is logged via the log package and the process exits.
func Reader(fname string, out chan<- *line) {
	defer close(out) // close channel on return

	// open the file
	file, err := os.Open(fname)
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	header := true
	for scanner.Scan() {
		// ignore first line (header)
		if header {
			header = false
			continue
		}

		columns := strings.SplitN(scanner.Text(), ",", 2)

		// convert ID to integer for easier comparison
		id, err := strconv.Atoi(strings.ReplaceAll(columns[0], "\"", ""))
		if err != nil {
			log.Fatalf("ParseInt: %v", err)
		}

		// SplitN returns a single element when the row has no comma;
		// indexing columns[1] unconditionally would panic in that case.
		var rest string
		if len(columns) > 1 {
			rest = columns[1]
		}

		// send the line to the channel
		out <- &line{id: id, restOfLine: rest}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
// Joiner merges the setIDs stream into the metadata stream by id and sends
// each (possibly augmented) metadata line to out.
//
// For every metadata line, the restOfLine of each setIDs line with the same id
// is appended: the first match is joined with "," and further matches with a
// single space. setIDs lines whose id is lower than the current metadata id
// are discarded. The algorithm is a single forward pass over both channels, so
// it relies on both streams being ordered by ascending id.
//
// out is closed when metadata is exhausted. Joiner does not drain whatever is
// left in setIDs at that point.
func Joiner(metadata, setIDs <-chan *line, out chan<- *line) {
	defer close(out) // close channel on return

	// pending is the look-ahead line already read from setIDs but not yet
	// consumed. It must survive across metadata lines for as long as its id
	// is still ahead of the current one; nil means nothing is buffered.
	var pending *line

	for md := range metadata {
		sep := ","
		for {
			if pending == nil {
				next, ok := <-setIDs
				if !ok {
					break // setIDs exhausted; pass remaining metadata through
				}
				pending = next
				continue // re-check so a nil entry is never dereferenced
			}
			if pending.id > md.id {
				break // belongs to a later metadata line; keep it buffered
			}
			if pending.id == md.id {
				md.restOfLine += sep + pending.restOfLine
				sep = " "
			}
			// matched or behind the current id: either way it is done
			pending = nil
		}
		// send the augmented line into the channel
		out <- md
	}
}