forked from aliyun/aliyun-odps-go-sdk
/
record_batch_writer.go
58 lines (48 loc) · 1.3 KB
/
record_batch_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package ipc
import (
"github.com/jiuzhiqian/aliyun-odps-go-sdk/arrow"
"github.com/jiuzhiqian/aliyun-odps-go-sdk/arrow/array"
"github.com/jiuzhiqian/aliyun-odps-go-sdk/arrow/internal/flatbuf"
"github.com/jiuzhiqian/aliyun-odps-go-sdk/arrow/memory"
"golang.org/x/xerrors"
"io"
)
type RecordBatchWriter struct {
w io.Writer
mem memory.Allocator
pw PayloadWriter
schema *arrow.Schema
codec flatbuf.CompressionType
compressNP int
}
func NewRecordBatchWriter(w io.Writer, opts ...Option) *RecordBatchWriter {
cfg := newConfig(opts...)
return &RecordBatchWriter{
w: w,
mem: cfg.alloc,
pw: &swriter{w: w},
schema: cfg.schema,
codec: cfg.codec,
compressNP: cfg.compressNP,
}
}
func (w *RecordBatchWriter) Write(rec array.Record) error {
schema := rec.Schema()
if schema == nil || !schema.Equal(w.schema) {
return errInconsistentSchema
}
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec, w.compressNP)
)
defer data.Release()
if err := enc.Encode(&data, rec); err != nil {
return xerrors.Errorf("arrow/ipc: could not encode record to payload: %w", err)
}
return w.pw.WritePayload(data)
}
func (w *RecordBatchWriter) Close() error {
w.pw = nil
return nil
}