forked from xitongsys/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
csv.go
71 lines (64 loc) · 1.94 KB
/
csv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package writer
import (
"fmt"
"io"
"github.com/zyreio/parquet-go-source/writerfile"
"github.com/zyreio/parquet-go/layout"
"github.com/zyreio/parquet-go/marshal"
"github.com/zyreio/parquet-go/parquet"
"github.com/zyreio/parquet-go/schema"
"github.com/zyreio/parquet-go/source"
"github.com/zyreio/parquet-go/types"
)
// CSVWriter writes rows of string-encoded fields to a parquet file.
// It embeds ParquetWriter, so the embedded writer's fields and methods are
// available directly on a CSVWriter.
type CSVWriter struct {
ParquetWriter
}
// NewCSVWriterFromWriter creates a CSVWriter that sends its output to w.
//
// w is wrapped with writerfile.NewWriterFile and the result is forwarded,
// together with md and np, to NewCSVWriter; that call's writer and error are
// returned as-is.
func NewCSVWriterFromWriter(md []string, w io.Writer, np int64) (*CSVWriter, error) {
	return NewCSVWriter(md, writerfile.NewWriterFile(w), np)
}
// NewCSVWriter creates a CSVWriter that writes to pfile using the schema
// described by md.
//
// md is handed to schema.NewSchemaHandlerFromMetadata to build the schema, and
// np is stored in the writer's NP field (presumably the marshalling
// parallelism; confirm against ParquetWriter). The writer starts with an 8 KiB
// page size, a 128 MiB row-group size and SNAPPY compression, and uses
// marshal.MarshalCSV to marshal rows.
//
// The 4-byte "PAR1" magic is written to pfile before returning. If the schema
// cannot be built or the magic cannot be written, a nil writer and the error
// are returned.
func NewCSVWriter(md []string, pfile source.ParquetFile, np int64) (*CSVWriter, error) {
	schemaHandler, err := schema.NewSchemaHandlerFromMetadata(md)
	if err != nil {
		// %w keeps the message text unchanged while letting callers unwrap
		// the underlying schema error.
		return nil, fmt.Errorf("failed to create schema from metadata: %w", err)
	}

	res := new(CSVWriter)
	res.SchemaHandler = schemaHandler
	res.PFile = pfile
	res.PageSize = 8 * 1024              // 8K
	res.RowGroupSize = 128 * 1024 * 1024 // 128M
	res.CompressionType = parquet.CompressionCodec_SNAPPY
	res.PagesMapBuf = make(map[string][]*layout.Page)
	res.DictRecs = make(map[string]*layout.DictRecType)
	res.NP = np
	res.MarshalFunc = marshal.MarshalCSV

	res.Footer = parquet.NewFileMetaData()
	res.Footer.Version = 1
	res.Footer.Schema = append(res.Footer.Schema, res.SchemaHandler.SchemaElements...)

	// The magic written below occupies the first 4 bytes of the file.
	res.Offset = 4
	if _, err := res.PFile.Write([]byte("PAR1")); err != nil {
		// Do not hand back a writer whose file header was never written.
		return nil, err
	}
	return res, nil
}
// WriteString converts one row of string-encoded fields to parquet values and
// writes it with Write.
//
// recsi must be a []*string holding the row's fields in schema order; a nil
// entry is written as a nil value. Entry i is converted with
// types.StrToParquetType using the type, converted type, type length and scale
// of schema element i+1 (element 0 is skipped, presumably because it is the
// schema root; confirm against the schema package).
//
// Instead of panicking, it returns an error when recsi is not a []*string or
// when the row holds more fields than there are schema elements after the
// first. Conversion errors and errors from Write are returned unchanged.
func (w *CSVWriter) WriteString(recsi interface{}) error {
	recs, ok := recsi.([]*string)
	if !ok {
		return fmt.Errorf("csv writer: expected []*string, got %T", recsi)
	}

	elems := w.SchemaHandler.SchemaElements
	// Field i uses elems[i+1], so a non-empty row needs len(recs) < len(elems).
	if len(recs) > 0 && len(recs) >= len(elems) {
		return fmt.Errorf("csv writer: row has %d fields but schema has %d columns",
			len(recs), len(elems)-1)
	}

	row := make([]interface{}, len(recs))
	for i, s := range recs {
		if s == nil {
			continue // leave the slot nil
		}
		el := elems[i+1]
		v, err := types.StrToParquetType(*s,
			el.Type,
			el.ConvertedType,
			int(el.GetTypeLength()),
			int(el.GetScale()),
		)
		if err != nil {
			return err
		}
		row[i] = v
	}
	return w.Write(row)
}