-
Notifications
You must be signed in to change notification settings - Fork 111
/
textreader.go
133 lines (113 loc) · 3.36 KB
/
textreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package blob
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"gocloud.dev/blob"
)
// _newLineSeparator is the row delimiter the functions in this file search for
// when trimming partial rows and when locating the end of the header row.
var _newLineSeparator = []byte("\n")
// textExtractOption bundles the generic extract settings with the options that
// only apply to newline-delimited text objects.
type textExtractOption struct {
// extractOption carries the strategy and the byte limit consulted when rows are extracted
extractOption *extractOption
hasCSVHeader bool // set if the first row is a header; the tail strategy then puts that row in front of its output
}
// downloadText writes a partial extract of obj to fw. Rows are assumed to be
// separated by \n; apart from that the data does not have to be CSV.
// Which part of the object is extracted is decided by option (see rows).
func downloadText(ctx context.Context, bucket *blob.Bucket, obj *blob.ListObject, option *textExtractOption, fw *os.File) error {
	data, err := rows(NewBlobObjectReader(ctx, bucket, obj), option)
	if err != nil {
		return err
	}

	if _, err := fw.Write(data); err != nil {
		return err
	}
	return nil
}
// rows extracts a partial set of rows from reader according to the strategy
// configured in option.extractOption: STRATEGY_HEAD delegates to rowsHead and
// STRATEGY_TAIL to rowsTail.
//
// Any other strategy is reported as an error instead of panicking, so that an
// unexpected value surfaces through the regular error path of the caller.
func rows(reader *ObjectReader, option *textExtractOption) ([]byte, error) {
	switch option.extractOption.strategy {
	case runtimev1.Source_ExtractPolicy_STRATEGY_HEAD:
		return rowsHead(reader, option.extractOption)
	case runtimev1.Source_ExtractPolicy_STRATEGY_TAIL:
		return rowsTail(reader, option)
	default:
		return nil, fmt.Errorf("unsupported strategy %s", option.extractOption.strategy)
	}
}
// rowsTail returns rows taken from the end of the object behind reader, using
// option.extractOption.limitInBytes as the total byte budget.
//
// When option.hasCSVHeader is set, the first line of the object is fetched via
// getHeader and placed, followed by a newline, in front of the tail data; its
// length is deducted from the budget. If the header alone uses up the budget,
// only the header is returned.
//
// Everything up to and including the first newline of the tail window is
// dropped because the window most likely starts in the middle of a row. If the
// window contains no newline at all it is returned unchanged.
//
// NOTE(review): the seek is relative to the end of the object, so an object
// shorter than the remaining budget presumably makes Seek fail — confirm
// against ObjectReader.Seek and the callers.
func rowsTail(reader *ObjectReader, option *textExtractOption) ([]byte, error) {
	header := make([]byte, 0)
	if option.hasCSVHeader {
		// the header sits at the start of the object, so read it before jumping to the tail
		headerRow, err := getHeader(reader)
		if err != nil {
			return nil, err
		}
		header = append(headerRow, _newLineSeparator...)
	}

	limit := option.extractOption.limitInBytes
	if uint64(len(header)) >= limit {
		// nothing left for data rows; subtracting here would wrap the unsigned budget around
		return header, nil
	}
	bytesToRead := limit - uint64(len(header))

	if _, err := reader.Seek(-int64(bytesToRead), io.SeekEnd); err != nil {
		return nil, err
	}

	p := make([]byte, bytesToRead)
	n, err := reader.Read(p)
	if err := unsucessfullError(err); err != nil {
		return nil, err
	}
	// keep only the bytes that were actually read, the rest of the buffer is zero padding
	p = p[:n]

	// when no newline is found the index is -1 and the whole window is kept
	firstNewline := bytes.Index(p, _newLineSeparator)
	return append(header, p[firstNewline+1:]...), nil
}
// rowsHead returns rows taken from the start of the object behind reader,
// reading at most option.limitInBytes bytes.
//
// The data is cut after the last newline that was read, because whatever
// follows it may be an incomplete row. If no newline was read at all, the
// bytes are returned unchanged: they can still be a single complete row, and
// the ingestion system is left to decide.
//
// Only the bytes actually delivered by Read are considered, so an object
// shorter than the limit is not padded with the unused part of the buffer.
func rowsHead(reader *ObjectReader, option *extractOption) ([]byte, error) {
	if _, err := reader.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}

	p := make([]byte, option.limitInBytes)
	n, err := reader.Read(p)
	if err := unsucessfullError(err); err != nil {
		return nil, err
	}
	// drop the part of the buffer that was never filled
	p = p[:n]

	lastNewline := bytes.LastIndex(p, _newLineSeparator)
	if lastNewline == -1 {
		// data can still be complete in case there is a single row without any newline delimiter
		// let ingestion system decide
		return p, nil
	}
	// remove data after \n since it is possibly incomplete
	return p[:lastNewline+1], nil
}
// getHeader tries to get the csv header from r by incrementally reading 1KB
// chunks from the reader's current position until a newline shows up.
//
// It returns the bytes in front of the first newline, without the newline
// itself. If the data ends before any newline was seen, io.EOF is returned.
// Read errors other than io.EOF and io.ErrUnexpectedEOF are returned as is.
func getHeader(r *ObjectReader) ([]byte, error) {
	const fetchLength = 1024

	var p []byte
	chunk := make([]byte, fetchLength) // reused for every read, its content is copied into p
	for {
		n, err := r.Read(chunk)
		if err := unsucessfullError(err); err != nil {
			return nil, err
		}

		// Only the freshly read bytes need to be searched: earlier chunks held
		// no newline, and the separator is a single byte so a match can never
		// straddle two chunks.
		scanned := len(p)
		p = append(p, chunk[:n]...)
		if i := bytes.Index(p[scanned:], _newLineSeparator); i >= 0 {
			// complete header found; cap the slice so that a caller appending
			// to it does not write into the rest of the buffer
			end := scanned + i
			return p[:end:end], nil
		}

		if n < fetchLength {
			// end of csv
			return nil, io.EOF
		}
	}
}
// unsucessfullError silences io.EOF and io.ErrUnexpectedEOF (also when wrapped)
// and passes every other error through unchanged.
// reader.Read can deliver data and still report one of these two errors when
// more bytes are requested than are present, so they are not treated as failures.
func unsucessfullError(err error) error {
	switch {
	case err == nil,
		errors.Is(err, io.EOF),
		errors.Is(err, io.ErrUnexpectedEOF):
		return nil
	default:
		return err
	}
}