This repository has been archived by the owner on Nov 16, 2023. It is now read-only.
/
writer_go18.go
242 lines (211 loc) · 7.94 KB
/
writer_go18.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
//go:build go1.18
package parquet
import (
"io"
"reflect"
)
// GenericWriter is similar to a Writer but uses a type parameter to define the
// Go type representing the schema of rows being written.
//
// Using this type over Writer has multiple advantages:
//
//   - By leveraging type information, the Go compiler can provide greater
//     guarantees that the code is correct. For example, the parquet.Writer.Write
//     method accepts an argument of type interface{}, which delays type checking
//     until runtime. The parquet.GenericWriter[T].Write method ensures at
//     compile time that the values it receives will be of type T, reducing the
//     risk of introducing errors.
//
//   - Since type information is known at compile time, the implementation of
//     parquet.GenericWriter[T] can make safe assumptions, removing the need for
//     runtime validation of how the parameters are passed to its methods.
//     Optimizations relying on type information are more effective, and some of
//     the writer's state can be precomputed at initialization, which was not
//     possible with parquet.Writer.
//
//   - The parquet.GenericWriter[T].Write method uses a data-oriented design,
//     accepting a slice of T instead of a single value, creating more
//     opportunities to amortize the runtime cost of abstractions.
//     This optimization is not available for parquet.Writer because its Write
//     method's argument would be of type []interface{}, which would require
//     conversions back and forth from concrete types to empty interfaces (since
//     a []T cannot be interpreted as []interface{} in Go), would make the API
//     more difficult to use, and would waste compute resources in the type
//     conversions, defeating the purpose of the optimization in the first place.
//
// Note that this type is only available when compiling with Go 1.18 or later.
type GenericWriter[T any] struct {
	// At this time GenericWriter is expressed in terms of Writer to reuse the
	// underlying logic. In the future, and if we accept breaking backward
	// compatibility on the Write method, we could modify Writer to be an alias
	// to GenericWriter with:
	//
	//	type Writer = GenericWriter[any]
	//
	base Writer
	// This function writes rows of type T to the writer. It is generated by
	// the NewGenericWriter function (through writeFuncOf) based on the type T
	// and the underlying schema of the parquet file.
	write writeFunc[T]
	// This field is used to leverage the optimized writeRowsFunc algorithms.
	// It starts out nil and is populated on the first call of the function
	// built by makeWriteFunc, with one buffer per column of the base writer.
	columns []ColumnBuffer
}
// NewGenericWriter constructs a GenericWriter[T] which writes rows of Go type
// T to output. It is the generic counterpart of NewWriter.
//
// The type parameter T should be a map, struct, or any; instantiating the
// writer with any other type panics at runtime. Type checking is most
// effective when T is a struct type; map and interface types behave much like
// a plain Writer.
//
// The option list may explicitly declare a schema, in which case it must be
// compatible with the schema generated from T. When no schema is configured,
// one is derived from T. The function panics if the options are invalid, or
// if no schema was configured and none could be derived from T.
//
// Sorting columns may be set on the writer to configure the generated row
// groups metadata. However, rows are always written in the order they were
// seen: no reordering is performed, and the application is expected to ensure
// that the order of rows matches the list of sorting columns. See
// SortingWriter[T] for a writer which reorders rows based on the configured
// sorting columns.
func NewGenericWriter[T any](output io.Writer, options ...WriterOption) *GenericWriter[T] {
	config, err := NewWriterConfig(options...)
	if err != nil {
		panic(err)
	}

	// Fall back to a schema derived from T when the options did not set one.
	rowType := typeOf[T]()
	if config.Schema == nil && rowType != nil {
		config.Schema = schemaOf(dereference(rowType))
	}
	if config.Schema == nil {
		panic("generic writer must be instantiated with schema or concrete type.")
	}

	// The base writer is set up before the write function is selected, so
	// both steps happen in the same order as in a single composite literal.
	w := new(GenericWriter[T])
	w.base = Writer{
		output: output,
		config: config,
		schema: config.Schema,
		writer: newWriter(output, config),
	}
	w.write = writeFuncOf[T](rowType, config.Schema)
	return w
}
// writeFunc is the signature of the functions used by GenericWriter[T] to
// write a slice of rows of type T. It receives the writer and the rows, and
// returns the number of rows written along with any error that occurred.
type writeFunc[T any] func(*GenericWriter[T], []T) (int, error)
// writeFuncOf selects the write function matching the Go type t:
//
//   - a nil type dispatches to writeAny,
//   - interface and map types dispatch to writeRows,
//   - struct types and pointers to struct types get a function built by
//     makeWriteFunc for the given schema.
//
// Any other type causes a panic.
func writeFuncOf[T any](t reflect.Type, schema *Schema) writeFunc[T] {
	if t == nil {
		return (*GenericWriter[T]).writeAny
	}

	kind := t.Kind()
	switch {
	case kind == reflect.Interface || kind == reflect.Map:
		return (*GenericWriter[T]).writeRows
	case kind == reflect.Struct:
		return makeWriteFunc[T](t, schema)
	case kind == reflect.Pointer && t.Elem().Kind() == reflect.Struct:
		// Elem is only consulted for pointer types thanks to short-circuiting.
		return makeWriteFunc[T](t, schema)
	}

	panic("cannot create writer for values of type " + t.String())
}
// makeWriteFunc builds the write function used for struct types (and pointers
// to structs). The row-writing routine for type t and the given schema is
// resolved once here; the returned closure reuses it on every call.
//
// The closure returns len(rows) on success, and zero with the error otherwise.
func makeWriteFunc[T any](t reflect.Type, schema *Schema) writeFunc[T] {
	writeColumns := writeRowsFuncOf(t, schema, nil)

	return func(w *GenericWriter[T], rows []T) (int, error) {
		if w.columns == nil {
			writerColumns := w.base.writer.columns
			w.columns = make([]ColumnBuffer, len(writerColumns))
			for i, column := range writerColumns {
				// Column buffers are usually created lazily when rows are
				// written; they have to exist up front here so they can be
				// handed to the row-writing routine.
				column.columnBuffer = column.newColumnBuffer()
				w.columns[i] = column.columnBuffer
			}
		}

		if err := writeColumns(w.columns, makeArrayOf(rows), columnLevels{}); err != nil {
			return 0, err
		}
		return len(rows), nil
	}
}
// Close closes the underlying Writer and returns the error it reports, if any.
func (w *GenericWriter[T]) Close() error {
	return w.base.Close()
}
// Flush flushes the underlying Writer and returns the error it reports, if
// any.
func (w *GenericWriter[T]) Flush() error {
	return w.base.Flush()
}
// Reset resets the underlying Writer, passing it output as its new
// destination.
//
// NOTE(review): the columns slice cached on the GenericWriter is not touched
// here; presumably the buffers it references remain valid after the base
// writer is reset — confirm against Writer.Reset.
func (w *GenericWriter[T]) Reset(output io.Writer) {
	w.base.Reset(output)
}
// Write writes the given rows to the writer and returns the row count and
// error reported by the underlying writer.
//
// The rows are handed to the underlying writer's writeRows routine together
// with a callback that writes the rows[i:j] range through the type-specific
// write function, then flushes every column whose buffer has reached its
// configured buffer size.
func (w *GenericWriter[T]) Write(rows []T) (int, error) {
	writeRange := func(i, j int) (int, error) {
		// The capacity is capped at j so the write function cannot reach
		// past the end of the range it was given.
		n, err := w.write(w, rows[i:j:j])
		if err != nil {
			return n, err
		}

		for _, column := range w.base.writer.columns {
			if column.columnBuffer.Size() < int64(column.bufferSize) {
				continue
			}
			if err := column.flush(); err != nil {
				return n, err
			}
		}

		return n, nil
	}

	return w.base.writer.writeRows(len(rows), writeRange)
}
// WriteRows writes already deconstructed rows by delegating to the WriteRows
// method of the underlying Writer, returning its results unchanged.
func (w *GenericWriter[T]) WriteRows(rows []Row) (int, error) {
	return w.base.WriteRows(rows)
}
// WriteRowGroup writes rowGroup by delegating to the WriteRowGroup method of
// the underlying Writer, returning its results unchanged.
func (w *GenericWriter[T]) WriteRowGroup(rowGroup RowGroup) (int64, error) {
	return w.base.WriteRowGroup(rowGroup)
}
// SetKeyValueMetadata sets a key/value pair in the Parquet file metadata.
//
// Keys are assumed to be unique; if the same key is repeated multiple times,
// the last value is retained. While the parquet format does not require unique
// keys, this design decision was made to optimize for the most common use case
// where applications leverage this extension mechanism to associate single
// values to keys. This may create incompatibilities with other parquet
// libraries, or may cause some key/value pairs to be lost when opening parquet
// files written with repeated keys. We can revisit this decision if it ever
// becomes a blocker.
//
// The call is forwarded as-is to the underlying Writer.
func (w *GenericWriter[T]) SetKeyValueMetadata(key, value string) {
	w.base.SetKeyValueMetadata(key, value)
}
// ReadRowsFrom delegates to the ReadRowsFrom method of the underlying Writer,
// passing it the rows reader and returning its results unchanged.
func (w *GenericWriter[T]) ReadRowsFrom(rows RowReader) (int64, error) {
	return w.base.ReadRowsFrom(rows)
}
// Schema returns the schema reported by the underlying Writer.
func (w *GenericWriter[T]) Schema() *Schema {
	return w.base.Schema()
}
// writeRows is the write function used for interface and map types. Each
// value is deconstructed into a Row using the writer's schema, and the
// resulting rows are then passed to the underlying Writer's WriteRows.
//
// The base writer's row buffer is reused across calls: it is resliced when
// its capacity is sufficient and reallocated otherwise, and the rows used by
// this call are passed to clearRows before returning.
func (w *GenericWriter[T]) writeRows(rows []T) (int, error) {
	n := len(rows)
	if n <= cap(w.base.rowbuf) {
		w.base.rowbuf = w.base.rowbuf[:n]
	} else {
		w.base.rowbuf = make([]Row, n)
	}

	buf := w.base.rowbuf
	defer clearRows(buf)

	schema := w.base.Schema()
	for i := range rows {
		buf[i] = schema.Deconstruct(buf[i], &rows[i])
	}

	return w.base.WriteRows(buf)
}
// writeAny is the write function selected when no concrete Go type is
// available for T. Rows are passed one at a time to the underlying Writer's
// Write method; on the first failure it returns the number of rows written so
// far together with the error.
func (w *GenericWriter[T]) writeAny(rows []T) (int, error) {
	for i := range rows {
		if err := w.base.Write(rows[i]); err != nil {
			// Exactly i rows were written before this one failed.
			return i, err
		}
	}
	return len(rows), nil
}
// Compile-time checks that *GenericWriter[T] satisfies the RowWriterWithSchema,
// RowReaderFrom, and RowGroupWriter interfaces. The checks are repeated for an
// interface, a struct, and a map type argument.
var (
	_ RowWriterWithSchema = (*GenericWriter[any])(nil)
	_ RowReaderFrom       = (*GenericWriter[any])(nil)
	_ RowGroupWriter      = (*GenericWriter[any])(nil)
	_ RowWriterWithSchema = (*GenericWriter[struct{}])(nil)
	_ RowReaderFrom       = (*GenericWriter[struct{}])(nil)
	_ RowGroupWriter      = (*GenericWriter[struct{}])(nil)
	_ RowWriterWithSchema = (*GenericWriter[map[struct{}]struct{}])(nil)
	_ RowReaderFrom       = (*GenericWriter[map[struct{}]struct{}])(nil)
	_ RowGroupWriter      = (*GenericWriter[map[struct{}]struct{}])(nil)
)