-
Notifications
You must be signed in to change notification settings - Fork 59
/
reader.go
105 lines (93 loc) · 2.16 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package csv
import (
"bufio"
"encoding/csv"
"errors"
"fmt"
"io"
"os"
"github.com/vesoft-inc/nebula-importer/v3/pkg/base"
"github.com/vesoft-inc/nebula-importer/v3/pkg/config"
"github.com/vesoft-inc/nebula-importer/v3/pkg/logger"
)
// CSVReader reads records from a single CSV file using the options in
// CSVConfig and reports, for every record, how many bytes of the file were
// consumed to produce it.
type CSVReader struct {
	// CSVConfig supplies the delimiter, lazy-quote, header and label options
	// read by InitReader and ReadLine.
	CSVConfig *config.CSVConfig

	// reader parses records out of br.
	reader *csv.Reader
	// lineNum counts records the csv parser returned without error, including
	// ones ReadLine later rejects for an invalid label. Record 1 is the header
	// when WithHeader is set.
	lineNum uint64

	// rr tallies the raw bytes pulled from the file and br buffers them for
	// reader. ReadLine compares rr's tally with br.Buffered() to work out how
	// many bytes each record used.
	rr *recordReader
	br *bufio.Reader

	// totalBytes is the file size captured by InitReader. initComplete is set
	// once InitReader has finished, and gates TotalBytes.
	totalBytes   int64
	initComplete bool

	// runnerLogger is stored by InitReader; it is not read anywhere in the
	// code shown here.
	runnerLogger *logger.RunnerLogger
}
// recordReader wraps an io.Reader and keeps a running count of the bytes
// pulled through it. CSVReader.ReadLine subtracts whatever is still buffered
// in its bufio.Reader from this count to size each parsed record.
type recordReader struct {
	io.Reader
	// remainingBytes holds bytes read from the wrapped reader that have not
	// yet been attributed to a returned record.
	remainingBytes int
}

// Read delegates to the wrapped reader and adds however many bytes it
// produced to remainingBytes, regardless of the accompanying error.
func (r *recordReader) Read(p []byte) (int, error) {
	got, err := r.Reader.Read(p)
	r.remainingBytes += got
	return got, err
}
// InitReader prepares r to read CSV records from file.
//
// It builds the chain file -> recordReader -> bufio.Reader -> csv.Reader. It
// then applies the optional delimiter and lazy-quote settings from
// r.CSVConfig, and records the file size for TotalBytes. runnerLogger is
// stored on r.
//
// If the file cannot be stat'ed, the failure is logged and initComplete stays
// false, so TotalBytes returns an error rather than a bogus size. (Previously
// this path dereferenced a nil FileInfo and panicked.)
func (r *CSVReader) InitReader(file *os.File, runnerLogger *logger.RunnerLogger) {
	r.runnerLogger = runnerLogger
	r.rr = &recordReader{Reader: file}
	r.br = bufio.NewReader(r.rr)
	// encoding/csv wraps its input with bufio.NewReader, which hands back an
	// existing default-size *bufio.Reader unchanged. So r.br is the only buffer
	// between the parser and r.rr, which ReadLine's byte accounting relies on.
	r.reader = csv.NewReader(r.br)

	if r.CSVConfig.Delimiter != nil {
		// Only the first rune of the configured delimiter is used.
		if d := []rune(*r.CSVConfig.Delimiter); len(d) > 0 {
			r.reader.Comma = d[0]
			logger.Log.Infof("The delimiter of %s is %#U", file.Name(), r.reader.Comma)
		}
	}
	if r.CSVConfig.LazyQuotes != nil {
		r.reader.LazyQuotes = *r.CSVConfig.LazyQuotes
	}

	stat, err := file.Stat()
	if err != nil {
		// stat is nil here; leave initComplete false so TotalBytes reports
		// that the size is unavailable.
		logger.Log.Infof("The stat of %s is wrong, %s", file.Name(), err)
		return
	}
	r.totalBytes = stat.Size()
	r.initComplete = true
}
// ReadLine parses the next CSV record and wraps it in a base.Data.
//
// Data.Bytes is always the number of file bytes the parser consumed during
// this call, including when an error is returned. That way, bytes belonging
// to a malformed record are reported with that record rather than being
// folded into the next one.
//
// Results:
//   - parser error (including io.EOF): an otherwise empty Data carrying Bytes,
//     plus the parser's error;
//   - first successfully parsed record when WithHeader is set: HeaderData;
//   - WithLabel unset: InsertData with every field;
//   - WithLabel set: the first field selects InsertData ("+") or DeleteData
//     ("-") for the remaining fields; any other label yields an error.
//
// Unset (nil) WithHeader/WithLabel options are treated as false.
func (r *CSVReader) ReadLine() (base.Data, error) {
	line, err := r.reader.Read()

	// Bytes pulled from the file minus bytes still sitting in the bufio buffer
	// is what the parser consumed since the previous call.
	n := r.rr.remainingBytes - r.br.Buffered()
	r.rr.remainingBytes -= n

	if err != nil {
		return base.Data{Bytes: n}, err
	}
	r.lineNum++

	withHeader := r.CSVConfig.WithHeader != nil && *r.CSVConfig.WithHeader
	withLabel := r.CSVConfig.WithLabel != nil && *r.CSVConfig.WithLabel

	if withHeader && r.lineNum == 1 {
		if withLabel {
			// Drop the label column so the header lines up with the fields
			// handed to InsertData/DeleteData below.
			return base.HeaderData(line[1:], n), nil
		}
		return base.HeaderData(line, n), nil
	}

	if !withLabel {
		return base.InsertData(line, n), nil
	}
	// encoding/csv never returns an empty record, so line[0] is safe.
	switch line[0] {
	case "+":
		return base.InsertData(line[1:], n), nil
	case "-":
		return base.DeleteData(line[1:], n), nil
	default:
		return base.Data{
			Bytes: n,
		}, fmt.Errorf("Invalid label: %s", line[0])
	}
}
// TotalBytes reports the file size recorded by InitReader. Until InitReader
// has completed, it returns 0 and an error.
func (r *CSVReader) TotalBytes() (int64, error) {
	if !r.initComplete {
		return 0, errors.New("init not complete")
	}
	return r.totalBytes, nil
}