forked from aswinkarthik/csvdiff
-
Notifications
You must be signed in to change notification settings - Fork 0
/
digest.go
97 lines (80 loc) · 2.29 KB
/
digest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package digest
import (
"encoding/csv"
"runtime"
"sync"
"github.com/cespare/xxhash"
)
// Digest ties together two hashes of a single CSV record — one over the
// record's key fields and one over its value fields — and keeps the record
// itself alongside them.
type Digest struct {
	// Key and Value are 64-bit hashes; CreateDigest fills Key from the key
	// positions and Value from the row positions of the record.
	Key, Value uint64
	// Source is the parsed record the hashes were computed from.
	Source []string
}
// CreateDigest builds the Digest for one CSV record; callers produce one
// Digest per line.
//
// Key is the xxhash of pKey.Join(record, separator) and Value is the xxhash
// of pRow.Join(record, separator). The record slice is stored as Source
// without being copied.
func CreateDigest(record []string, separator string, pKey Positions, pRow Positions) Digest {
	return Digest{
		Key:    xxhash.Sum64String(pKey.Join(record, separator)),
		Value:  xxhash.Sum64String(pRow.Join(record, separator)),
		Source: record,
	}
}
// bufferSize sets the capacity of Create's digest channel: bufferSize
// batches per CPU reported by runtime.NumCPU.
const bufferSize = 512

// Create parses config.Reader as CSV and digests every record.
//
// Parsing uses config.Separator as the field delimiter and
// config.LazyQuotes for quote handling. Each record is digested with
// config.Key and config.Value. Create returns two maps indexed by
// Digest.Key:
//   - the first maps each key to its Value hash;
//   - the second maps each key to the parsed record (Digest.Source).
//
// Records are digested concurrently in batches and merged in whatever order
// the batches finish. If several records share a Key, which one ends up in
// the maps is therefore not deterministic.
//
// If the background reader reports an error, both maps are nil and that
// error is returned.
func Create(config *Config) (map[uint64]uint64, map[uint64][]string, error) {
	r := csv.NewReader(config.Reader)
	r.Comma = config.Separator
	r.LazyQuotes = config.LazyQuotes

	// readAndProcess closes batches once all work is finished, which ends
	// the range loop below; only then is its single status value received.
	batches := make(chan []Digest, bufferSize*runtime.NumCPU())
	status := make(chan error)
	defer close(status)

	go readAndProcess(config, r, batches, status)

	values := make(map[uint64]uint64)
	sources := make(map[uint64][]string)
	for batch := range batches {
		for _, d := range batch {
			values[d.Key] = d.Value
			sources[d.Key] = d.Source
		}
	}

	if err := <-status; err != nil {
		return nil, nil, err
	}
	return values, sources, nil
}
// readAndProcess reads reader in batches via getNextNLines and digests each
// batch on its own createDigestForNLines goroutine. No limit is placed on
// how many such goroutines run at once.
//
// It stops at end of input (eofReached) or at the first error from
// getNextNLines. Any lines returned together with that error are not
// digested. In both cases it waits for every started worker, closes
// digestChannel, and then sends exactly one value on errorChannel: the
// error, or nil on success.
func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []Digest, errorChannel chan<- error) {
	var pending sync.WaitGroup

	// finish drains the workers before closing digestChannel, so no worker
	// sends on a closed channel. It reports status only after the close, so
	// the consumer can finish ranging over digests first.
	finish := func(err error) {
		pending.Wait()
		close(digestChannel)
		errorChannel <- err
	}

	for {
		batch, atEOF, err := getNextNLines(reader)
		if err != nil {
			finish(err)
			return
		}

		pending.Add(1)
		go createDigestForNLines(batch, config, digestChannel, &pending)

		if atEOF {
			finish(nil)
			return
		}
	}
}
// createDigestForNLines digests one batch of records and publishes it.
//
// Each record in lines is passed to CreateDigest with config.Separator
// (converted to a string), config.Key and config.Value. The resulting
// slice, in the same order as lines, is sent on digestChannel as a single
// batch. wg is marked done only after that send completes.
func createDigestForNLines(lines [][]string,
	config *Config,
	digestChannel chan<- []Digest,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	sep := string(config.Separator)
	batch := make([]Digest, 0, len(lines))
	for _, record := range lines {
		batch = append(batch, CreateDigest(record, sep, config.Key, config.Value))
	}
	digestChannel <- batch
}