-
Notifications
You must be signed in to change notification settings - Fork 65
/
row.go
120 lines (105 loc) · 2.65 KB
/
row.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package cmd
import (
	"fmt"
	"io"
	"strconv"
	"strings"

	"github.com/charmbracelet/lipgloss"
	"github.com/charmbracelet/lipgloss/table"
	"github.com/parquet-go/parquet-go"
	"github.com/spf13/cobra"
)
// rowCmd is the "row" subcommand. It takes exactly three positional
// arguments (the starting row, the number of rows, and the parquet file path),
// parses the two numeric arguments and hands off to row.
var rowCmd = &cobra.Command{
	Use:     "row",
	Example: "parquet-tool row <row_start> <num_rows> <parquet-file>",
	Short:   "print out row(s) for a given file",
	Args:    cobra.ExactArgs(3),
	RunE: func(cmd *cobra.Command, args []string) error {
		// Wrap parse failures so the user can tell which argument was bad.
		rowStart, err := strconv.Atoi(args[0])
		if err != nil {
			return fmt.Errorf("invalid row_start %q: %w", args[0], err)
		}
		numRows, err := strconv.Atoi(args[1])
		if err != nil {
			return fmt.Errorf("invalid num_rows %q: %w", args[1], err)
		}
		return row(args[2], rowStart, numRows)
	},
}
// row prints a table with one line per column of the parquet file at path
// file, showing up to numRows values starting at the zero-based row index
// rowStart.
//
// Only the row group that contains rowStart is read; if the requested range
// runs past the end of that row group it is truncated to the rows the group
// holds. The table is printed to standard output only when every column was
// read successfully.
//
// An error is returned when rowStart or numRows is negative, when the file
// cannot be opened, when rowStart lies beyond the last row of the file, or
// when reading a column fails.
func row(file string, rowStart, numRows int) error {
	if rowStart < 0 || numRows < 0 {
		return fmt.Errorf("row start (%d) and number of rows (%d) must not be negative", rowStart, numRows)
	}

	pf, closer, err := openParquetFile(file)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer closer.Close()

	// Find the row group that contains the row we want. A group holding n
	// rows covers indices [rowsSeen, rowsSeen+n), hence the strict comparison.
	var rowgroup parquet.RowGroup
	rowsSeen := 0
	rgoffset := 0
	for _, rg := range pf.RowGroups() {
		n := int(rg.NumRows())
		if rowStart < rowsSeen+n {
			rowgroup = rg
			rgoffset = rowStart - rowsSeen
			break
		}
		rowsSeen += n
	}
	if rowgroup == nil {
		return fmt.Errorf("row %d is out of range: file has %d row(s)", rowStart, rowsSeen)
	}

	// Only this one row group is read, so cap the request at what it can
	// supply; this keeps the headers aligned with the data actually shown.
	if avail := int(rowgroup.NumRows()) - rgoffset; numRows > avail {
		numRows = avail
	}

	headers := make([]string, 0, numRows+1)
	headers = append(headers, "column")
	for i := rowStart; i < rowStart+numRows; i++ {
		headers = append(headers, strconv.Itoa(i))
	}

	tbl := table.New().
		Border(lipgloss.NormalBorder()).
		BorderStyle(lipgloss.NewStyle().Foreground(lipgloss.Color("99"))).
		StyleFunc(func(row, col int) lipgloss.Style {
			switch {
			case row == 0:
				return HeaderStyle
			case row%2 == 0:
				return EvenRowStyle
			default:
				return OddRowStyle
			}
		}).
		Headers(headers...)

	// Column chunks correspond to leaf columns, so label them with the leaf
	// column paths rather than the top-level schema fields (the two differ
	// for nested schemas).
	columns := rowgroup.Schema().Columns()
	for i, chunk := range rowgroup.ColumnChunks() {
		name := strconv.Itoa(i) // fallback label if the schema reports fewer paths than chunks
		if i < len(columns) {
			name = strings.Join(columns[i], ".")
		}
		if err := printPageSubset(tbl, name, chunk.Pages(), rgoffset, numRows); err != nil {
			return err
		}
	}

	fmt.Println(tbl)
	return nil
}
// printPageSubset adds a single row to tbl for one column: name followed by
// the string form of up to num values, beginning at row offset start within
// the pages read from page.
//
// Pages that end before start are skipped. When the requested range spans
// several pages the values are gathered from each page in turn, so the column
// always produces exactly one table row. If the pages run out early the row
// simply contains fewer values.
//
// page is closed before returning. An error is returned when start or num is
// negative, or when reading a page or its values fails.
//
// NOTE(review): values are indexed by row offset, which assumes one value per
// row (a flat, non-repeated column) — confirm for repeated columns.
func printPageSubset(tbl *table.Table, name string, page parquet.Pages, start, num int) error {
	defer page.Close()

	if start < 0 || num < 0 {
		return fmt.Errorf("invalid range for column %q: start=%d num=%d", name, start, num)
	}

	cells := []string{name}
	remaining := num
	for remaining > 0 {
		pg, err := page.ReadPage()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		// A page with n rows covers offsets [0, n); skip it entirely when
		// start falls at or beyond its end.
		rows := int(pg.NumRows())
		if start >= rows {
			start -= rows
			continue
		}

		values := make([]parquet.Value, pg.NumValues())
		n, err := pg.Values().ReadValues(values)
		if err != nil && err != io.EOF {
			return err
		}
		values = values[:n]

		// Clamp the window to what was actually read so slicing cannot panic.
		from := start
		if from > len(values) {
			from = len(values)
		}
		take := len(values) - from
		if take > remaining {
			take = remaining
		}
		for _, v := range values[from : from+take] {
			cells = append(cells, fmt.Sprintf("%v", v))
		}

		remaining -= take
		start = 0 // any following page is read from its first row
	}

	tbl.Row(cells...)
	return nil
}