-
Notifications
You must be signed in to change notification settings - Fork 348
/
parquet_reader.go
132 lines (121 loc) · 3.83 KB
/
parquet_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package s3inventory
import (
"fmt"
"time"
"github.com/cznic/mathutil"
"github.com/spf13/cast"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/schema"
)
// ParquetInventoryFileReader reads inventory objects out of a parquet file
// by way of an embedded parquet-go reader.
type ParquetInventoryFileReader struct {
// Embedded reader; its methods and fields are promoted onto this type.
*reader.ParquetReader
// nextRow is the number of rows already handed out by Read.
nextRow int64
// fieldToParquetPath maps an inventory field name to the column path
// that Read passes to ReadColumnByPath.
fieldToParquetPath map[string]string
}
// NewParquetInventoryFileReader wraps parquetReader in a ParquetInventoryFileReader.
//
// The reader's schema is scanned for known inventory fields. If any entry of
// requiredFields is absent from the schema, an error wrapping
// ErrRequiredFieldNotFound (with the missing field name appended) is returned.
func NewParquetInventoryFileReader(parquetReader *reader.ParquetReader) (*ParquetInventoryFileReader, error) {
	paths := getParquetPaths(parquetReader.SchemaHandler)
	for _, name := range requiredFields {
		_, found := paths[name]
		if found {
			continue
		}
		return nil, fmt.Errorf("%w: %s", ErrRequiredFieldNotFound, name)
	}

	inventoryReader := new(ParquetInventoryFileReader)
	inventoryReader.ParquetReader = parquetReader
	inventoryReader.fieldToParquetPath = paths
	return inventoryReader, nil
}
// Close stops the embedded parquet reader and then closes the underlying
// parquet file, returning whatever error the file's Close reports.
func (p *ParquetInventoryFileReader) Close() error {
	p.ParquetReader.ReadStop()
	closeErr := p.ParquetReader.PFile.Close()
	return closeErr
}
// getKeyColumnStatistics returns the statistics of the key column in the row
// group at rowGroupIdx.
//
// The key column is the first column chunk whose schema path ends in "Key".
// When no chunk matches, the statistics of the chunk at index 1 are returned
// instead (presumably the key is expected to be the second column — confirm
// against the inventory schema).
func (p *ParquetInventoryFileReader) getKeyColumnStatistics(rowGroupIdx int) *parquet.Statistics {
	chunks := p.Footer.RowGroups[rowGroupIdx].Columns
	for i := range chunks {
		md := chunks[i].GetMetaData()
		schemaPath := md.GetPathInSchema()
		if schemaPath[len(schemaPath)-1] != "Key" {
			continue
		}
		return md.GetStatistics()
	}
	// No column named "Key": fall back to the chunk at index 1.
	fallback := chunks[1].GetMetaData()
	return fallback.GetStatistics()
}
// FirstObjectKey returns the minimum key recorded in the key column
// statistics of the first row group. The Min statistic is used when it is
// non-empty; otherwise MinValue is returned.
func (p *ParquetInventoryFileReader) FirstObjectKey() string {
	stats := p.getKeyColumnStatistics(0)
	if minBytes := stats.GetMin(); len(minBytes) > 0 {
		return string(minBytes)
	}
	return string(stats.GetMinValue())
}
// LastObjectKey returns the maximum key recorded in the key column
// statistics of the last row group. The Max statistic is used when it is
// non-empty; otherwise MaxValue is returned.
func (p *ParquetInventoryFileReader) LastObjectKey() string {
	statistics := p.getKeyColumnStatistics(len(p.Footer.RowGroups) - 1)
	if len(statistics.GetMax()) > 0 {
		return string(statistics.GetMax())
	}
	// Fall back to the max_value statistic. The previous fallback read
	// MinValue, which reported the row group's smallest key as the last key.
	return string(statistics.GetMaxValue())
}
// Read returns the next batch of inventory objects, at most n of them and
// never more than the rows remaining in the file.
//
// Every known column is read for the batch and its values are assigned onto
// the object at the matching row index. For a non-required field, a value
// whose definition level is 0 is treated as absent and skipped. Any column
// read failure or value conversion failure aborts the call with a wrapped
// error and a nil slice. The internal row position is advanced before the
// columns are read.
func (p *ParquetInventoryFileReader) Read(n int) ([]*InventoryObject, error) {
	count := mathutil.MinInt64(int64(n), p.GetNumRows()-p.nextRow)
	p.nextRow += count
	objects := make([]*InventoryObject, count)

	for name, columnPath := range p.fieldToParquetPath {
		values, _, definitionLevels, readErr := p.ReadColumnByPath(columnPath, count)
		if readErr != nil {
			return nil, fmt.Errorf("failed to read parquet column %s: %w", name, readErr)
		}
		for row := range values {
			// Only required fields, or optional fields that actually carry a
			// value, are written onto the object.
			if isRequired(name) || definitionLevels[row] != 0 {
				if objects[row] == nil {
					objects[row] = NewInventoryObject()
				}
				if setErr := set(objects[row], name, values[row]); setErr != nil {
					return nil, fmt.Errorf("failed to read parquet column %s: %w", name, setErr)
				}
			}
		}
	}
	return objects, nil
}
// set converts v to the type of the inventory field named f and stores it on o.
//
// String fields (bucket, key, e-tag) are cast to string, flags to bool, and
// size to int64. The last-modified value is cast to an int64 number of
// milliseconds and stored as a *time.Time built with time.Unix.
//
// It returns the cast error when v cannot be converted, or an error wrapping
// ErrUnknownField when f is not a known inventory field. When the
// last-modified cast fails, o.LastModified is left untouched.
func set(o *InventoryObject, f string, v interface{}) error {
	var err error
	switch f {
	case bucketFieldName:
		o.Bucket, err = cast.ToStringE(v)
	case keyFieldName:
		o.Key, err = cast.ToStringE(v)
	case isLatestFieldName:
		o.IsLatest, err = cast.ToBoolE(v)
	case isDeleteMarkerFieldName:
		o.IsDeleteMarker, err = cast.ToBoolE(v)
	case sizeFieldName:
		o.Size, err = cast.ToInt64E(v)
	case lastModifiedDateFieldName:
		var lastModifiedMillis int64
		lastModifiedMillis, err = cast.ToInt64E(v)
		if err != nil {
			// Do not store a bogus epoch timestamp built from the zero value.
			return err
		}
		// Split milliseconds into whole seconds plus a nanosecond remainder,
		// which is the form time.Unix takes.
		secToMS := int64(time.Second / time.Millisecond)
		seconds := lastModifiedMillis / secToMS
		ns := (lastModifiedMillis % secToMS) * int64(time.Millisecond/time.Nanosecond)
		tm := time.Unix(seconds, ns)
		o.LastModified = &tm
	case eTagFieldName:
		o.Checksum, err = cast.ToStringE(v)
	default:
		return fmt.Errorf("%w: %s", ErrUnknownField, f)
	}
	return err
}
// getParquetPaths maps each known inventory field to its column path in the
// parquet reader. A field is included only when some schema element has that
// field as its external name; schema elements that are not inventory fields
// are ignored. If several schema elements share a name, the one with the
// highest index wins.
func getParquetPaths(schemaHandler *schema.SchemaHandler) map[string]string {
	paths := make(map[string]string)
	for _, name := range inventoryFields {
		for idx := range schemaHandler.Infos {
			if schemaHandler.Infos[idx].ExName != name {
				continue
			}
			paths[name] = schemaHandler.IndexMap[int32(idx)]
		}
	}
	return paths
}