// parquet-tools.go
// Forked from xitongsys/parquet-go.

package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/url"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/zyreio/parquet-go-source/local"
"github.com/zyreio/parquet-go-source/s3"
"github.com/zyreio/parquet-go/reader"
"github.com/zyreio/parquet-go/source"
"github.com/zyreio/parquet-go/tool/parquet-tools/schematool"
"github.com/zyreio/parquet-go/tool/parquet-tools/sizetool"
)
// toolOptions holds the values of the parquet-tools command-line flags.
type toolOptions struct {
	cmd              string // -cmd: schema, rowcount, size or cat
	fileName         string // -file: local path, file:// URI or s3:// URI
	withTags         bool   // -tag: include struct tags in the Go schema output
	withPrettySize   bool   // -pretty: passed through to sizetool
	uncompressedSize bool   // -uncompressed: passed through to sizetool
	catCount         int    // -count: maximum number of rows printed by cat
	skipCount        int64  // -skip: number of rows skipped before cat starts printing
	schemaFormat     string // -schema-format: "json" or "go"
}

// main parses the command-line flags and runs the selected command.
// Any failure is reported on stderr and the process exits with status 1.
func main() {
	var opts toolOptions
	flag.StringVar(&opts.cmd, "cmd", "schema", "command to run. Allowed values: schema, rowcount, size, cat")
	flag.StringVar(&opts.fileName, "file", "", "file name")
	flag.BoolVar(&opts.withTags, "tag", false, "show struct tags")
	flag.BoolVar(&opts.withPrettySize, "pretty", false, "show pretty size")
	flag.BoolVar(&opts.uncompressedSize, "uncompressed", false, "show uncompressed size")
	flag.IntVar(&opts.catCount, "count", 1000, "max count to cat. If it is nil, only show first 1000 records.")
	flag.Int64Var(&opts.skipCount, "skip", 0, "skip count with cat. If it is nil,skip 0 records.")
	flag.StringVar(&opts.schemaFormat, "schema-format", "json", "schema format go/json (default to JSON schema)")
	flag.Parse()

	// run owns every resource and releases it via defer; exiting only here,
	// after run has returned, guarantees those defers are not skipped.
	if err := run(opts); err != nil {
		fmt.Fprintf(os.Stderr, "%s\n", err)
		os.Exit(1)
	}
}

// run validates opts, opens the parquet file and executes the requested
// command, writing the result to stdout. It returns an error describing the
// first failure; the message wording matches what the tool has always printed.
func run(opts toolOptions) error {
	// validate schema output format
	if opts.schemaFormat != "json" && opts.schemaFormat != "go" {
		return fmt.Errorf("schema format can only be json or go")
	}

	// validate file name
	if opts.fileName == "" {
		return fmt.Errorf("missing location of parquet file")
	}

	fr, err := openParquetFile(opts.fileName)
	if err != nil {
		return err
	}
	// The file is only read, so a failure to close it cannot lose data.
	defer fr.Close()

	pr, err := reader.NewParquetReader(fr, nil, 1)
	if err != nil {
		return fmt.Errorf("Can't create parquet reader: %w", err)
	}
	defer pr.ReadStop()

	switch opts.cmd {
	case "schema":
		tree := schematool.CreateSchemaTree(pr.SchemaHandler.SchemaElements)
		if opts.schemaFormat == "go" {
			fmt.Printf("%s\n", tree.OutputStruct(opts.withTags))
		} else {
			fmt.Printf("%s\n", tree.OutputJsonSchema())
		}
	case "rowcount":
		fmt.Println(pr.GetNumRows())
	case "size":
		fmt.Println(sizetool.GetParquetFileSize(opts.fileName, pr, opts.withPrettySize, opts.uncompressedSize))
	case "cat":
		return catRows(pr, opts.skipCount, opts.catCount)
	default:
		return fmt.Errorf("Unknown command %s", opts.cmd)
	}
	return nil
}

// openParquetFile opens the parquet file at fileName for reading. fileName is
// parsed as a URL: an "s3" scheme selects an S3 object (host is the bucket,
// path is the key), while a "file" scheme or no scheme selects a local file.
// Any other scheme is rejected.
func openParquetFile(fileName string) (source.ParquetFile, error) {
	uri, err := url.Parse(fileName)
	if err != nil {
		return nil, fmt.Errorf("unable to parse file location [%s]", fileName)
	}
	if uri.Scheme == "" {
		uri.Scheme = "file"
	}

	switch uri.Scheme {
	case "s3":
		ctx := context.Background()
		// Report a session failure as an error instead of panicking.
		sess, err := session.NewSession()
		if err != nil {
			return nil, fmt.Errorf("AWS error: %w", err)
		}
		// determine S3 bucket's region, using us-east-1 as the lookup hint
		region, err := s3manager.GetBucketRegion(ctx, sess, uri.Host, "us-east-1")
		if err != nil {
			if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "NotFound" {
				return nil, fmt.Errorf("unable to find region of bucket %s", uri.Host)
			}
			return nil, fmt.Errorf("AWS error: %w", err)
		}
		fr, err := s3.NewS3FileReader(ctx, uri.Host, strings.TrimLeft(uri.Path, "/"), &aws.Config{Region: aws.String(region)})
		if err != nil {
			return nil, fmt.Errorf("failed to open S3 object [%s]: %w", fileName, err)
		}
		return fr, nil
	case "file":
		fr, err := local.NewLocalFileReader(uri.Path)
		if err != nil {
			return nil, fmt.Errorf("failed to open local file [%s]: %w", uri.Path, err)
		}
		return fr, nil
	default:
		return nil, fmt.Errorf("unknown location scheme [%s]", uri.Scheme)
	}
}

// catRows skips the first skip rows of pr and then prints up to count rows to
// stdout, one JSON array per batch of at most 1000 rows.
func catRows(pr *reader.ParquetReader, skip int64, count int) error {
	const batchSize = 1000

	// Skip exactly once, before reading. Skipping inside the batch loop would
	// drop another skip rows between every pair of batches.
	if err := pr.SkipRows(skip); err != nil {
		return fmt.Errorf("Can't skip: %w", err)
	}

	remaining := count
	for batch := 0; remaining > 0; batch++ {
		want := remaining
		if want > batchSize {
			want = batchSize
		}

		rows, err := pr.ReadByNumber(want)
		if err != nil {
			return fmt.Errorf("Can't cat: %w", err)
		}
		// The first batch is always printed (even when empty) so the command
		// produces output for any file; later empty batches mean the file
		// ended exactly on a batch boundary.
		if len(rows) == 0 && batch > 0 {
			break
		}

		jsonBs, err := json.Marshal(rows)
		if err != nil {
			return fmt.Errorf("Can't to json: %w", err)
		}
		fmt.Println(string(jsonBs))

		// A short batch means the reader ran out of rows; stop instead of
		// printing empty batches until count is exhausted.
		if len(rows) < want {
			break
		}
		remaining -= want
	}
	return nil
}