From 1e380548812d39a57df89bfd3ae80186d8d8a138 Mon Sep 17 00:00:00 2001 From: Thomas Pelletier Date: Tue, 27 Oct 2020 10:59:11 -0400 Subject: [PATCH] StructBuilder (#13) --- Makefile | 2 +- README.md | 15 +- cmd/ptools/cat.go | 2 - cmd/ptools/main.go | 2 +- decoding.go | 32 +- go.mod | 1 - go.sum | 2 - internal/benchmark/schema_builder_test.go | 64 ++++ internal/benchmark/trace.go | 49 +++ internal/thrift/reader.go | 2 +- iterators.go | 137 ++++---- iterators_test.go | 40 +-- parquet.go | 6 - plan.go | 25 ++ reader.go | 218 ++++++------ reflect.go | 111 ------- reflect_test.go | 33 -- schema.go | 156 ++++----- schema_test.go | 36 +- struct_builder.go | 111 +++---- struct_builder_test.go | 300 +++++++++++++++-- struct_planner.go | 387 ++++++++++++++++++++++ 22 files changed, 1158 insertions(+), 573 deletions(-) create mode 100644 internal/benchmark/schema_builder_test.go create mode 100644 internal/benchmark/trace.go create mode 100644 plan.go delete mode 100644 reflect.go delete mode 100644 reflect_test.go create mode 100644 struct_planner.go diff --git a/Makefile b/Makefile index cf50df8e..d3632e9e 100644 --- a/Makefile +++ b/Makefile @@ -11,4 +11,4 @@ internal/gen-go/parquet/parquet.go: Dockerfile.thrift buildkite: docker build . -f Dockerfile.buildkite -t parquet-buildkite - docker run -ti parquet-buildkite go test ./... + docker run -ti parquet-buildkite go test -race ./... diff --git a/README.md b/README.md index f089dd63..8306ab11 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ To test against the parquet-tools reference implementation, make sure to have `parquet-tools` in your `PATH`. [Build it][build] or [download it pre-built][dl]. -⚠️Buildkite does not test against parquet-tools because our base docker images +⚠️ Buildkite does not test against parquet-tools because our base docker images don't have both Go and Java available. [build]: https://github.com/apache/parquet-mr/tree/master/parquet-tools @@ -16,16 +16,3 @@ don't have both Go and Java available. * https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html * https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36632.pdf - -## Todo - -- [x] `parquet-tools cat` reimplementation. -- [x] Use builder to remove allocs. -- [ ] RowReader projection. -- [ ] Predicates pushdown. -- [ ] Re-introduce Column interface. -- [ ] ReadSeeker => ReaderAt + size -- [ ] S3 partial reader. -- [ ] Benchmark with parquet-go. -- [ ] Writer. -- [ ] Out-of-band writer. 
diff --git a/cmd/ptools/cat.go b/cmd/ptools/cat.go index 4a1b260d..872a167c 100644 --- a/cmd/ptools/cat.go +++ b/cmd/ptools/cat.go @@ -152,12 +152,10 @@ func (p *prettyPrinter) prefix() int { } func (p *prettyPrinter) Begin() { - debug.Format("BEGIN") p.stackClear() } func (p *prettyPrinter) Primitive(s *parquet.Schema, d parquet.Decoder) error { - debug.Format("PRIMITIVE: %s", s.Path) valueDepth := p.depth() for _, c := range s.Path[p.prefix() : len(s.Path)-1] { p.writeDepth(valueDepth) diff --git a/cmd/ptools/main.go b/cmd/ptools/main.go index 70a87068..59702d68 100644 --- a/cmd/ptools/main.go +++ b/cmd/ptools/main.go @@ -6,8 +6,8 @@ import ( "strings" color "github.com/logrusorgru/aurora/v3" - "github.com/segmentio/parquet/internal/debug" "github.com/segmentio/cli" + "github.com/segmentio/parquet/internal/debug" ) func main() { diff --git a/decoding.go b/decoding.go index 0215c748..55348772 100644 --- a/decoding.go +++ b/decoding.go @@ -19,10 +19,13 @@ func readUvarint(r io.Reader) (uint64, uint32, error) { var read uint32 buff := []byte{0} for i := 0; ; i++ { - _, err := r.Read(buff) + n, err := r.Read(buff) if err != nil { return x, read, err } + if n != len(buff) { + panic("was supposed to read 1 byte") + } b := buff[0] read++ if b < 0x80 { @@ -44,14 +47,6 @@ type Decoder interface { ByteArray(dst []byte) ([]byte, error) } -type EmptyDecoder struct{} - -func (e *EmptyDecoder) prepare(r io.Reader) {} -func (e *EmptyDecoder) Int32() (int32, error) { return 0, nil } -func (e *EmptyDecoder) Int64() (int64, error) { return 0, nil } -func (e *EmptyDecoder) Uint32(bitWidth int, out []uint32) error { return nil } -func (e *EmptyDecoder) ByteArray() ([]byte, error) { return nil, nil } - // Construct a Decoder for a given encoding. // Technically, not all encoding work everywhere. Let's say good-enough for now. func decoderFor(enc pthrift.Encoding) (Decoder, error) { @@ -84,10 +79,13 @@ func (d *rleDecoder) prepare(r io.Reader) { // All those values fit in uint32. 
func (d *rleDecoder) Uint32(bitWidth int, out []uint32) error { b := d.buff[:] - _, err := d.r.Read(b) + n, err := d.r.Read(b) if err != nil { return err } + if n != len(b) { + panic("not enough bytes") + } size := binary.LittleEndian.Uint32(b) read := uint32(0) @@ -115,16 +113,10 @@ func (d *rleDecoder) Uint32(bitWidth int, out []uint32) error { bytesInBuffer = 4 } - if len(d.scratch) < bytesInBuffer { - v := bytesInBuffer - v-- - v |= v >> 1 - v |= v >> 2 - v |= v >> 4 - v |= v >> 8 - v |= v >> 16 - v++ - d.scratch = make([]byte, v) + if cap(d.scratch) < bytesInBuffer { + d.scratch = make([]byte, bytesInBuffer) + } else { + d.scratch = d.scratch[:bytesInBuffer] } data := d.scratch[:bytesInRLE] diff --git a/go.mod b/go.mod index 5087f138..9a4c3061 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.15 require ( github.com/apache/thrift v0.13.0 github.com/golang/snappy v0.0.2 - github.com/iancoleman/strcase v0.1.2 github.com/logrusorgru/aurora/v3 v3.0.0 github.com/pmezard/go-difflib v1.0.0 github.com/segmentio/cli v0.3.4 diff --git a/go.sum b/go.sum index 2bcb1be0..1bcd3987 100644 --- a/go.sum +++ b/go.sum @@ -75,8 +75,6 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/hashicorp/go-uuid v0.0.0-20180228145832-27454136f036/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/iancoleman/strcase v0.1.2 h1:gnomlvw9tnV3ITTAxzKSgTF+8kFWcU/f+TgttpXGz1U= -github.com/iancoleman/strcase v0.1.2/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= diff --git a/internal/benchmark/schema_builder_test.go b/internal/benchmark/schema_builder_test.go new file mode 100644 index 00000000..a7972ef8 --- /dev/null +++ b/internal/benchmark/schema_builder_test.go @@ -0,0 +1,64 @@ +package benchmark_test + +import ( + "log" + "os" + "testing" + + "github.com/segmentio/parquet" + "github.com/segmentio/parquet/internal/benchmark" + "github.com/stretchr/testify/require" + "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/reader" +) + +func BenchmarkReflectReadRow(b *testing.B) { + p := "../../examples/stage-small.parquet" + planner := parquet.StructPlannerOf(new(benchmark.Trace)) + builder := planner.Builder() + plan := planner.Plan() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + f, err := os.Open(p) + require.NoError(b, err) + + pf, err := parquet.OpenFile(f) + require.NoError(b, err) + rowReader := parquet.NewRowReaderWithPlan(plan, pf) + + for { + target := &benchmark.Trace{} + err := rowReader.Read(builder.To(target)) + if err == parquet.EOF { + break + } + require.NoError(b, err) + } + require.NoError(b, f.Close()) + } +} + +func BenchmarkReflectReadRowParquetGo(b *testing.B) { + p := "../../examples/stage-small.parquet" + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + fr, err := local.NewLocalFileReader(p) + if err != nil { + log.Println("Can't open file", err) + return + } + pr, err := reader.NewParquetReader(fr, new(benchmark.Trace), 1) + require.NoError(b, err) 
+ num := int(pr.GetNumRows()) + target := make([]benchmark.Trace, 1, 1) + for i := 0; i < num; i++ { + err := pr.Read(&target) + require.NoError(b, err) + } + pr.ReadStop() + require.NoError(b, err, fr.Close()) + } +} diff --git a/internal/benchmark/trace.go b/internal/benchmark/trace.go new file mode 100644 index 00000000..77c6567b --- /dev/null +++ b/internal/benchmark/trace.go @@ -0,0 +1,49 @@ +package benchmark + +type Tag struct { + Name string `parquet:"name=name, type=UTF8"` + Value string `parquet:"name=value, type=UTF8"` +} + +type Trace struct { + kafkaTopic string //not exporting it to parquet + KafkaPartition int64 `parquet:"name=kafka_partition, type=INT_64"` + KafkaOffset int64 `parquet:"name=kafka_offset, type=INT_64"` + TimestampMs int64 `parquet:"name=timestamp_ms, type=INT_64"` + TraceID string `parquet:"name=trace_id, type=UTF8"` + SpanID string `parquet:"name=span_id, type=UTF8"` + ParentSpanID string `parquet:"name=parent_span_id, type=UTF8"` + NextTraceID string `parquet:"name=next_trace_id, type=UTF8"` + ParentTraceIDs []string `parquet:"name=parent_trace_ids, type=LIST, valuetype=UTF8"` + Baggage []Tag `parquet:"name=baggage, type=LIST"` + Tags []Tag `parquet:"name=tags, type=LIST"` + + // Extracted from the exchange payload. Will be nil if they are not + // present in the payload or if the payload cannot be unmarshalled. + MessageID string `parquet:"name=message_id, type=UTF8"` + UserID string `parquet:"name=user_id, type=UTF8"` + EventType string `parquet:"name=event_type, type=UTF8"` + SourceID string `parquet:"name=source_id, type=UTF8"` + DestinationID string `parquet:"name=destination_id, type=UTF8"` + WorkspaceID string `parquet:"name=workspace_id, type=UTF8"` + + Name string `parquet:"name=name, type=UTF8"` + SpanTime int64 `parquet:"name=span_time, type=INT_64"` // ms + SpanDuration int64 `parquet:"name=span_duration, type=INT_64"` // ms + + ExchangeTime int64 `parquet:"name=exchange_time, type=INT_64"` // ms + ExchangeDuration int32 `parquet:"name=exchange_duration, type=INT_32"` // ms + + ExchangeRequestMethod string `parquet:"name=exchange_request_method, type=UTF8"` + ExchangeRequestURL string `parquet:"name=exchange_request_url, type=UTF8"` + ExchangeRequestHeaders map[string]string `parquet:"name=exchange_request_headers, type=MAP, keytype=UTF8, valuetype=UTF8"` + ExchangeRequestBody string `parquet:"name=exchange_request_body, type=UTF8"` + + ExchangeResponseStatusCode uint16 `parquet:"name=exchange_response_status_code, type=UINT_16"` + ExchangeResponseStatusText string `parquet:"name=exchange_response_status_text, type=UTF8"` + ExchangeResponseHeaders map[string]string `parquet:"name=exchange_response_headers, type=UTF8, keytype=UTF8, valuetype=UTF8"` + ExchangeResponseBody string `parquet:"name=exchange_response_body, type=UTF8"` + + ExchangeErrorType string `parquet:"name=exchange_error_type, type=UTF8"` + ExchangeErrorMessage string `parquet:"name=exchange_error_message, type=UTF8"` +} diff --git a/internal/thrift/reader.go b/internal/thrift/reader.go index 946fe993..7b6fd196 100644 --- a/internal/thrift/reader.go +++ b/internal/thrift/reader.go @@ -68,7 +68,7 @@ func (tr *Reader) Fork() *Reader { // Read implements the io.Reader interface, forwarding the call to the // underlying Thrift protocol Reader. 
func (tr *Reader) Read(p []byte) (int, error) { - debug.Format("thrift: Reader: Read %d", len(p)) + debug.Format("thrift: Reader: read %d", len(p)) return tr.protocol.Transport().Read(p) } diff --git a/iterators.go b/iterators.go index 99c4f789..dc548b9f 100644 --- a/iterators.go +++ b/iterators.go @@ -20,35 +20,35 @@ import ( // callback PageCallbackFn //TODO leaky // // err error -// rowGroupIterator *RowGroupIterator -// rowGroupColumnIterator *RowGroupColumnReader +// rowGroupIterator *rowGroupIterator +// rowGroupColumnIterator *rowGroupColumnReader //} // -//func (c *ColumnIterator) Next() bool { +//func (c *ColumnIterator) next() bool { // if c.err != nil { // return false // } // if c.rowGroupIterator == nil { -// c.rowGroupIterator = &RowGroupIterator{ +// c.rowGroupIterator = &rowGroupIterator{ // r: c.r, // } -// c.rowGroupIterator.Next() +// c.rowGroupIterator.next() // } -// if c.rowGroupIterator.Value() == nil { +// if c.rowGroupIterator.value() == nil { // return false // } // if c.rowGroupColumnIterator == nil { -// c.rowGroupColumnIterator = c.rowGroupIterator.Value().Column(c.path, c.callback) +// c.rowGroupColumnIterator = c.rowGroupIterator.value().Column(c.path, c.callback) // if c.rowGroupColumnIterator == nil { // c.err = fmt.Errorf("column not found in row group: %s", c.path) // return false // } // } -// if !c.rowGroupColumnIterator.Next() { +// if !c.rowGroupColumnIterator.next() { // c.rowGroupColumnIterator = nil -// return c.rowGroupIterator.Next() +// return c.rowGroupIterator.next() // } -// return c.rowGroupIterator.Value() != nil +// return c.rowGroupIterator.value() != nil //} // //func (c *ColumnIterator) Error() error { @@ -73,20 +73,26 @@ import ( // return combined //} +// levels represents the repetition and definition levels of a given field. +type levels struct { + repetition uint32 + definition uint32 +} + // Iterator that goes over each row group of the file. -type RowGroupIterator struct { +type rowGroupIterator struct { r *File - rowGroup *RowGroup + rowGroup *rowGroup index int } -func (i *RowGroupIterator) Next() bool { +func (i *rowGroupIterator) next() bool { if i.index >= len(i.r.metadata.rowGroups) { i.rowGroup = nil return false } - i.rowGroup = &RowGroup{ + i.rowGroup = &rowGroup{ r: i.r, raw: i.r.metadata.rowGroups[i.index], } @@ -94,34 +100,30 @@ func (i *RowGroupIterator) Next() bool { return true } -func (i *RowGroupIterator) Value() *RowGroup { +func (i *rowGroupIterator) value() *rowGroup { return i.rowGroup } -type RowGroup struct { +type rowGroup struct { r *File raw *pthrift.RowGroup } -// Construct a ColumnIterator for the column at path in schema. -// returns nil if the column does not exist in the schema. -func (rg *RowGroup) Column(path []string) *RowGroupColumnReader { - s := rg.r.metadata.Schema.At(path...) - if s == nil { - return nil - } - md := rg.metadataForColumn(path) +// Construct a rowGroupColumnReader for the column described by schema. +// Returns nil if the column does not exist in the row group. +func (rg *rowGroup) Column(schema *Schema) *rowGroupColumnReader { + md := rg.metadataForColumn(schema.Path) if md == nil { return nil } - return &RowGroupColumnReader{ - r: rg.r.thrift.Fork(), - schema: s, - md: md, + return &rowGroupColumnReader{ + r: rg.r.thrift.Fork(), + s: schema, + md: md, } } -func (rg *RowGroup) metadataForColumn(path []string) *pthrift.ColumnMetaData { +func (rg *rowGroup) metadataForColumn(path []string) *pthrift.ColumnMetaData { // TODO: build a hashmap of column path -> metadata? 
columns: for _, col := range rg.raw.Columns { @@ -141,36 +143,36 @@ columns: } } // Iterator that goes over every value for a given column across all pages for a -// given RowGroup. Look at ColumnIterator if you want to iterate for all values +// given rowGroup. Look at ColumnIterator if you want to iterate over all values // of a column across rowGroups. -type RowGroupColumnReader struct { - r *thrift.Reader - schema *Schema - md *pthrift.ColumnMetaData +type rowGroupColumnReader struct { + r *thrift.Reader + s *Schema + md *pthrift.ColumnMetaData ready bool totalRows int64 rowsRead int64 - pageIterator *PageReader + pageIterator *pageReader } -func (i *RowGroupColumnReader) Schema() *Schema { return i.schema } +func (i *rowGroupColumnReader) schema() *Schema { return i.s } -func (i *RowGroupColumnReader) Peek() (Levels, error) { +func (i *rowGroupColumnReader) peek() (levels, error) { err := i.ensurePageAvailable() if err != nil { - return Levels{}, err + return levels{}, err } - return i.pageIterator.Peek() + return i.pageIterator.peek() } -func (i *RowGroupColumnReader) Read(b RowBuilder) error { +func (i *rowGroupColumnReader) read(b RowBuilder) error { err := i.ensurePageAvailable() if err != nil { return err } - err = i.pageIterator.Read(b) + err = i.pageIterator.read(b) if err != nil { return err } @@ -180,7 +182,7 @@ } // Ensures that the reader has seeked to the beginning of the first page. -func (i *RowGroupColumnReader) ensureReady() error { +func (i *rowGroupColumnReader) ensureReady() error { if i.ready { return nil } @@ -197,10 +199,10 @@ return nil } -// Ensures the current pageIterator has at least one row available, or asks for +// Ensures the current pageIterator has at least one row available, or asks for // the next page. // Returns EOF if all the records have been read. -func (i *RowGroupColumnReader) ensurePageAvailable() error { +func (i *rowGroupColumnReader) ensurePageAvailable() error { err := i.ensureReady() if err != nil { return err } @@ -210,11 +212,11 @@ return EOF } - if i.pageIterator != nil && i.pageIterator.Done() { + if i.pageIterator != nil && i.pageIterator.done() { i.pageIterator = nil } - // TODO: that seems odd, why does the PageReader need to be recreated? + // TODO: that seems odd, why does the pageReader need to be recreated? if i.pageIterator == nil { codecName := i.md.GetCodec() var codec compressionCodec @@ -224,9 +226,9 @@ default: return fmt.Errorf("unknown codec: %s", codecName) } - i.pageIterator = &PageReader{ + i.pageIterator = &pageReader{ r: i.r, - schema: i.schema, + schema: i.s, compressionCodec: codec, } } @@ -236,13 +238,8 @@ // EOF indicates the end of the data stream. var EOF = errors.New("EOF") -type Raw struct { - Value interface{} - Levels Levels -} - -// PageReader lazily iterates over values of one page. -type PageReader struct { +// pageReader lazily iterates over values of one page. +type pageReader struct { r *thrift.Reader compressionCodec compressionCodec schema *Schema @@ -259,49 +256,49 @@ ready bool } -func (p *PageReader) Done() bool { +func (p *pageReader) done() bool { return p.valuesRead >= p.numValues } -// Peek returns error, repetitionLevel, definitionLevel of the next value. 
+// peek returns the repetition and definition levels of the next value. // Returns EOF if there is no more value to read. // Return any other error encountered during opening the page. -func (p *PageReader) Peek() (Levels, error) { - levels := Levels{} +func (p *pageReader) peek() (levels, error) { + levels := levels{} err := p.ensureReady() if err != nil { return levels, err } - if p.Done() { + if p.done() { return levels, EOF } if p.repetitionLevels != nil { - levels.Repetition = p.repetitionLevels[p.valuesRead] + levels.repetition = p.repetitionLevels[p.valuesRead] } if p.definitionLevels != nil { - levels.Definition = p.definitionLevels[p.valuesRead] + levels.definition = p.definitionLevels[p.valuesRead] } return levels, nil } -func (p *PageReader) Read(b RowBuilder) error { +func (p *pageReader) read(b RowBuilder) error { err := p.ensureReady() if err != nil { return err } - if p.Done() { + if p.done() { return EOF } - levels, err := p.Peek() + levels, err := p.peek() if err != nil { return err } - if levels.Definition < p.schema.DefinitionLevel { + if levels.definition < p.schema.DefinitionLevel { err = b.PrimitiveNil(p.schema) } else { err = b.Primitive(p.schema, p.valueDecoder) @@ -310,7 +307,7 @@ return err } -func (p *PageReader) ensureReady() error { +func (p *pageReader) ensureReady() error { if p.ready { return nil } @@ -368,10 +365,10 @@ func (p *PageReader) ensureReady() error { // 2. maybe parse repetition levels. // - // Repetition levels are skipped when the column is not nested + // Repetition levels are skipped when the column is not nested // (path = 1). In that case, p.repetitionLevels stays nil, and 0 // will always be provided to the callback. - if len(p.schema.Path) > 1 { + if p.schema.RepetitionLevel > 0 { // we need to figure out what is the maximum possible // level of repetition so that we can know how many bits // at most are required to express repetitions level. @@ -393,7 +390,7 @@ // (if encoded, it will always have the value of the max // definition level). In that case, p.definitionLevels stays // nil, and 0 will always be provided to the callback. 
- if p.schema.DefinitionLevel >= 1 { + if p.schema.DefinitionLevel > 0 { bitWidth := bits.Len32(p.schema.DefinitionLevel) p.definitionLevels = make([]uint32, p.numValues) p.definitionLevelDecoder.prepare(p.reader) diff --git a/iterators_test.go b/iterators_test.go index d846c4ae..5ee41dc6 100644 --- a/iterators_test.go +++ b/iterators_test.go @@ -72,7 +72,7 @@ func TestPageIterator(t *testing.T) { _, err = reader.thrift.Seek(reader.metadata.rowGroups[0].Columns[0].GetMetaData().GetDataPageOffset(), io.SeekStart) require.NoError(t, err) - it := PageReader{ + it := pageReader{ r: reader.thrift, schema: reader.metadata.Schema.At("a"), compressionCodec: &snappyCodec{}, @@ -80,13 +80,13 @@ func TestPageIterator(t *testing.T) { require.NotNil(t, it.schema) b := &testBuilder{} - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, int32(1), b.last) - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, int32(2), b.last) - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, int32(3), b.last) }) @@ -120,7 +120,7 @@ func TestPageIteratorLevels(t *testing.T) { _, err = reader.thrift.Seek(reader.metadata.rowGroups[0].Columns[0].GetMetaData().GetDataPageOffset(), io.SeekStart) require.NoError(t, err) - it := PageReader{ + it := pageReader{ r: reader.thrift, schema: reader.metadata.Schema.At("a"), compressionCodec: &snappyCodec{}, @@ -129,13 +129,13 @@ func TestPageIteratorLevels(t *testing.T) { b := &testBuilder{} - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, int32(1), b.last) - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, nil, b.last) - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, int32(3), b.last) }) @@ -171,18 +171,18 @@ func TestRowGroupColumnIterator(t *testing.T) { _, err = reader.thrift.Seek(md.GetDataPageOffset(), io.SeekStart) require.NoError(t, err) - it := RowGroupColumnReader{ - r: reader.thrift, - md: md, - schema: reader.metadata.Schema.At("a"), + it := rowGroupColumnReader{ + r: reader.thrift, + md: md, + s: reader.metadata.Schema.At("a"), } - require.NotNil(t, it.schema) + require.NotNil(t, it.s) b := &testBuilder{} for i := 0; i < recordsCount; i++ { t.Log("testing record", i) - err = it.Read(b) + err = it.read(b) assert.NoError(t, err) assert.Equal(t, int32(i), b.last) } @@ -256,13 +256,13 @@ func TestRowGroupColumnIterator(t *testing.T) { // } // // for i := 0; i < recordsCount; i++ { -// notLast := it.Next() +// notLast := it.next() // require.NoError(t, it.Error(), "record %d: error", i) // require.Equal(t, i < recordsCount-1, notLast, "record %d: not last", i) // assert.Equal(t, Raw{ -// Value: int32(i), -// Levels: Levels{0, 0}, -// }, *it.Value()) +// value: int32(i), +// levels: levels{0, 0}, +// }, *it.value()) // } // require.NoError(t, it.Error()) // }) @@ -327,12 +327,12 @@ func TestRowGroupColumnIterator(t *testing.T) { // } // // for i := 0; i < recordsCount; i++ { -// notLast := it1.Next() +// notLast := it1.next() // require.NoError(t, it1.Error(), "record %d: error", i) // require.Equal(t, i < recordsCount-1, notLast, "record %d: not last", i) // require.Equal(t, int32(i), value1, "record %d: incorrect value", i) // -// notLast = it2.Next() +// notLast = it2.next() // require.NoError(t, it2.Error(), "record %d: error", i) // require.Equal(t, i < recordsCount-1, notLast, "record %d: not last", i) // require.Equal(t, 10000+int32(i), value2, "record %d: incorrect value", i) diff --git a/parquet.go b/parquet.go index 
d0b44355..3ecf26ce 100644 --- a/parquet.go +++ b/parquet.go @@ -15,9 +15,3 @@ This package additionally provides tooling, similar to parquet-tools. The program is available at ./cmd/ptools. */ package parquet - -// Levels represent Repetition and Definition levels for a given field. -type Levels struct { - Repetition uint32 - Definition uint32 -} diff --git a/plan.go b/plan.go new file mode 100644 index 00000000..05adfedf --- /dev/null +++ b/plan.go @@ -0,0 +1,25 @@ +package parquet + +// Plan describes how a RowReader should behave. It describes which columns +// should be read, and the predicates to apply. The zero value is a plan that +// considers all the columns in the file's schema and does not filter any row. +// A Plan can be used concurrently, and by multiple RowReaders. +type Plan struct { + s *Schema +} + +var defaultPlan = &Plan{} + +// schema returns the Schema needed by the Plan. +// +// It is used by RowReader to build its readers. This is probably not a great +// abstraction as it will be difficult to implement predicate pushdown, but at +// least it is compatible with what we already have. +// +// Returns nil to instruct the user to use the whole file's schema. +func (p *Plan) schema() *Schema { + if p == nil { + return nil + } + return p.s +} 
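The Plan added above is consumed by NewRowReaderWithPlan (reader.go, below) and is typically produced by a StructPlanner (struct_planner.go). The benchmark in internal/benchmark/schema_builder_test.go exercises this wiring end to end; here is a condensed, untested sketch of the same flow. The file name traces.parquet is hypothetical, and the snippet assumes it lives inside this repository, since internal/benchmark is not importable from outside:

```go
package main

import (
	"os"

	"github.com/segmentio/parquet"
	"github.com/segmentio/parquet/internal/benchmark"
)

func main() {
	f, err := os.Open("traces.parquet") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	pf, err := parquet.OpenFile(f) // reads the footer, schema and row group metadata
	if err != nil {
		panic(err)
	}

	// One planner derives both the Plan (which columns to read) and the
	// StructBuilder (how to materialize rows) from the same struct type.
	planner := parquet.StructPlannerOf(new(benchmark.Trace))
	builder := planner.Builder()
	rows := parquet.NewRowReaderWithPlan(planner.Plan(), pf)

	for {
		target := &benchmark.Trace{}
		err := rows.Read(builder.To(target))
		if err == parquet.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		// target now holds one fully assembled row.
	}
}
```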
diff --git a/reader.go b/reader.go index 73a58e47..244cbb65 100644 --- a/reader.go +++ b/reader.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - "github.com/segmentio/parquet/internal/debug" pthrift "github.com/segmentio/parquet/internal/gen-go/parquet" "github.com/segmentio/parquet/internal/thrift" ) @@ -106,9 +105,9 @@ func OpenFile(r io.ReadSeeker) (*File, error) { return f, nil } -// RowGroups returns an iterator over all the File's row groups. -func (f *File) RowGroups() *RowGroupIterator { - return &RowGroupIterator{r: f} +// rowGroups returns an iterator over all the File's row groups. +func (f *File) rowGroups() *rowGroupIterator { + return &rowGroupIterator{r: f} } // Return a pointer to the File's metadata. @@ -139,28 +138,20 @@ func (f *File) checkMagicBoundary() error { return nil } -// Field is a value read from a parquet file and a pointer to its schema node. -type Field struct { - Schema *Schema - // Value is either a *Raw for a leaf node, or a []Field of any other - // node. - Value interface{} -} - type reader interface { - // Read returns the next value. + // read returns the next value. // Expected to return (nil, EOF) when all values have been consumed. - Read(b RowBuilder) error + read(b RowBuilder) error // Advance the reader without calling back a builder. - Skip() error + skip() error // Peek returns repetition and definition levels for the next value. - Peek() (Levels, error) + peek() (levels, error) // Bind resets the state of the root and adjust any relevant parameter - // based on the RowGroup header. - Bind(rg *RowGroup) + // based on the rowGroup header. + bind(rg *rowGroup) } // RowBuilder interface needs to be implemented to interpret the results of the @@ -171,7 +162,7 @@ type RowBuilder interface { // Called when a primitive field is read. // It is up to this function to call the right method on the decoder based - // on the the schema of the field. + // on the schema of the field. // // Returning an error stops the parsing of the whole file. Primitive(s *Schema, d Decoder) error @@ -229,35 +220,36 @@ var theVoidBuilder = &voidBuilder{} // RowReader reads a parquet file, assembling rows as it goes. // // This is the high-level interface for reading parquet files. RowReader -// presents an iterator interface. Call Next to advance the reader, Value to -// retrieve the current value, and finally check for errors with Error when you -// are done. +// calls back methods on a RowBuilder interface as it assembles rows. +// Call Read() to assemble the next row. It returns EOF when the file has been +// fully processed. // // Example: // // rowReader := parquet.NewRowReader(file) -// for rowReader.Next() { -// v := rowReader.Value() -// fmt.Println(v) -// } -// err = rowReader.Error() -// if err != nil { -// panic(err) +// for { +// // builder implements RowBuilder. +// err := rowReader.Read(builder) +// if err == parquet.EOF { +// break +// } +// // Do something with err or whatever builder does. // } +// +// See StructPlanner to create a RowBuilder that constructs Go structures for +// each row. type RowReader struct { reader *File schema *Schema root reader - rowGroups *RowGroupIterator + rowGroups *rowGroupIterator // state - rowGroup *RowGroup + rowGroup *rowGroup } -// Next assembles the next row, calling back the RowBuilder. -// Returns true if a row was assembled. -// Use Value to retrieve the row, and Error to check whether something went -// wrong. +// Read assembles the next row, calling back the RowBuilder. +// Returns EOF when all rows have been read. func (r *RowReader) Read(b RowBuilder) error { if r.rowGroup == nil { if !r.nextRowGroup() { @@ -265,12 +257,12 @@ } } - _, err := r.root.Peek() + _, err := r.root.peek() if err == EOF { if !r.nextRowGroup() { return EOF } - _, err = r.root.Peek() + _, err = r.root.peek() } if err != nil { return err @@ -278,7 +270,7 @@ b.Begin() - err = r.root.Read(b) + err = r.root.read(b) if err != nil { return err } @@ -288,44 +280,52 @@ return nil } -// WithSchema instructs the RowReader to follow the provided schema instead of -// the one described in the Parquet file. -func (r *RowReader) WithSchema(s *Schema) { - r.schema = s -} - func (r *RowReader) nextRowGroup() bool { - if !r.rowGroups.Next() { + if !r.rowGroups.next() { return false } - debug.Format("-- New row group") - r.rowGroup = r.rowGroups.Value() - r.root.Bind(r.rowGroup) + r.rowGroup = r.rowGroups.value() + r.root.bind(r.rowGroup) return true } -// NewRowReader builds a RowReader using data from r and calling back b. -func NewRowReader(r *File) *RowReader { +// NewRowReaderWithPlan builds a RowReader using data from f and following the plan p. +func NewRowReaderWithPlan(p *Plan, f *File) *RowReader { + schema := p.schema() + if schema == nil { + // use the file's schema if the plan does not provide one + // + // TODO: do not parse the file's schema if the plan already + // provides one. + schema = f.metadata.Schema + } return &RowReader{ - // The root of a schema has to define a group. - root: newGroupReader(r.metadata.Schema), - reader: r, - rowGroups: r.RowGroups(), - schema: r.metadata.Schema, + // The root of a schema has to define a group. + root: newGroupReader(schema), + reader: f, + rowGroups: f.rowGroups(), + schema: schema, } } +// NewRowReader constructs a RowReader to read rows from f. +// +// Use NewRowReaderWithPlan to process rows more finely with a Plan. +func NewRowReader(f *File) *RowReader { + return NewRowReaderWithPlan(defaultPlan, f) +} + // newReader builds the appropriate reader for s. 
func newReader(s *Schema) reader { // TODO: handle optionals switch s.Kind { - case PrimitiveKind: + case primitiveKind: return newPrimitiveReader(s) - case MapKind: + case mapKind: return newKvReader(s) - case RepeatedKind: + case repeatedKind: return newRepeatedReader(s) - case GroupKind: + case groupKind: return newGroupReader(s) default: panic("unhandled group kind") @@ -337,15 +337,19 @@ type primitiveReader struct { schema *Schema // will be bound at read time - it *RowGroupColumnReader - next *Raw + it *rowGroupColumnReader } -func (r *primitiveReader) String() string { return "PrimaryReader " + r.schema.Name } -func (r *primitiveReader) Peek() (Levels, error) { return r.it.Peek() } -func (r *primitiveReader) Bind(rg *RowGroup) { r.it = rg.Column(r.schema.Path) } -func (r *primitiveReader) Skip() error { return r.it.Read(theVoidBuilder) } -func (r *primitiveReader) Read(b RowBuilder) error { return r.it.Read(b) } +func (r *primitiveReader) String() string { return "PrimitiveReader " + r.schema.Name } +func (r *primitiveReader) peek() (levels, error) { return r.it.peek() } +func (r *primitiveReader) bind(rg *rowGroup) { + r.it = rg.Column(r.schema) + if r.it == nil { // TODO: proper error handling + panic(fmt.Errorf("could not find a column at %s for reader %s", r.schema.Path, r.String())) + } +} +func (r *primitiveReader) skip() error { return r.it.read(theVoidBuilder) } +func (r *primitiveReader) read(b RowBuilder) error { return r.it.read(b) } func newPrimitiveReader(s *Schema) *primitiveReader { return &primitiveReader{schema: s} } @@ -358,19 +362,19 @@ type groupReader struct { } func (r *groupReader) String() string { return "GroupReader " + r.schema.Name } -func (r *groupReader) Peek() (Levels, error) { - return r.readers[0].Peek() +func (r *groupReader) peek() (levels, error) { + return r.readers[0].peek() } -func (r *groupReader) Bind(rg *RowGroup) { +func (r *groupReader) bind(rg *rowGroup) { for _, reader := range r.readers { - reader.Bind(rg) + reader.bind(rg) } } -func (r *groupReader) Read(b RowBuilder) error { +func (r *groupReader) read(b RowBuilder) error { b.GroupBegin(r.schema) for _, reader := range r.readers { - err := reader.Read(b) + err := reader.read(b) if err != nil { return err } @@ -379,9 +383,9 @@ return nil } -func (r *groupReader) Skip() error { +func (r *groupReader) skip() error { for _, reader := range r.readers { - err := reader.Skip() + err := reader.skip() if err != nil { return err } @@ -409,25 +413,25 @@ type kvReader struct { } func (r *kvReader) String() string { return "KVReader " + r.schema.Name } -func (r *kvReader) Bind(rg *RowGroup) { - r.keyReader.Bind(rg) - r.valueReader.Bind(rg) +func (r *kvReader) bind(rg *rowGroup) { + r.keyReader.bind(rg) + r.valueReader.bind(rg) } -func (r *kvReader) Peek() (Levels, error) { - return r.keyReader.Peek() +func (r *kvReader) peek() (levels, error) { + return r.keyReader.peek() } -func (r *kvReader) Skip() error { - err := r.keyReader.Skip() +func (r *kvReader) skip() error { + err := r.keyReader.skip() if err != nil { return err } - return r.valueReader.Skip() + return r.valueReader.skip() } -func (r *kvReader) Read(b RowBuilder) error { - _, err := r.Peek() +func (r *kvReader) read(b RowBuilder) error { + _, err := r.peek() if err != nil { return err } @@ -435,42 +439,42 @@ b.RepeatedBegin(r.schema) for { - levels, err := r.Peek() + levels, err := r.peek() if err != nil { return err } - if levels.Definition > r.schema.DefinitionLevel { + if levels.definition > r.schema.DefinitionLevel { b.KVBegin(r.schema.Children[0]) - err = r.keyReader.Read(b) + err = r.keyReader.read(b) if err != nil { return err } - err = r.valueReader.Read(b) + err = r.valueReader.read(b) if err != nil { return err } b.KVEnd(r.schema.Children[0]) } else { - err = r.keyReader.Skip() + err = r.keyReader.skip() if err != nil { return err } - err = r.valueReader.Skip() + err = r.valueReader.skip() if err != nil { return err } break } - levels, err = r.Peek() + levels, err = r.peek() if err == EOF { break } if err != nil { return err } - if levels.Repetition <= r.schema.RepetitionLevel { + if levels.repetition <= r.schema.RepetitionLevel { break } } @@ -549,12 +553,12 @@ type repeatedReader struct { } func (r *repeatedReader) String() string { return "RepeatedReader " + r.schema.Name } -func (r *repeatedReader) Bind(rg *RowGroup) { r.reader.Bind(rg) } -func (r *repeatedReader) Peek() (Levels, error) { return r.reader.Peek() } -func (r *repeatedReader) Skip() error { return r.reader.Skip() } -func (r *repeatedReader) Read(b RowBuilder) error { +func (r *repeatedReader) bind(rg *rowGroup) { r.reader.bind(rg) } +func (r *repeatedReader) peek() (levels, error) { return r.reader.peek() } +func (r *repeatedReader) skip() error { return r.reader.skip() } +func (r *repeatedReader) read(b RowBuilder) error { // check for EOF - _, err := r.Peek() + _, err := r.peek() if err != nil { return err } @@ -562,26 +566,26 @@ b.RepeatedBegin(r.schema) for { - levels, err := r.Peek() + levels, err := r.peek() if err != nil { return err } - if levels.Definition > r.schema.DefinitionLevel { - err = r.reader.Read(b) + if levels.definition > r.schema.DefinitionLevel { + err = r.reader.read(b) if err != nil { return err } } else { // this is an empty list - err = r.reader.Skip() + err = r.reader.skip() if err != nil { return err } break } - levels, err = r.Peek() + levels, err = r.peek() if err == EOF { break } if err != nil { return err } // next level will be part of a different row - if levels.Repetition <= r.schema.RepetitionLevel { + if levels.repetition <= r.schema.RepetitionLevel { break } } @@ -628,14 +632,14 @@ func newRepeatedReader(s *Schema) *repeatedReader { if list.Name != "list" { panic(fmt.Errorf("LIST's child must be named 'list' (not '%s')", list.Name)) } - if list.Kind != RepeatedKind { - panic("LIST's list child must be a group") + if list.Kind != repeatedKind { + panic("LIST's child must be repeated") } if list.Repetition != pthrift.FieldRepetitionType_REPEATED { - panic(fmt.Errorf("LIST's list child but be repeated, not %s", list.Repetition)) + panic(fmt.Errorf("LIST's child must be repeated, not %s", list.Repetition)) } if len(list.Children) != 1 { - panic("LIST's list child must have exactly one child") + panic("LIST's child must have exactly one child") } // The element field encodes the list's element type and repetition. 
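The readers above drive a RowBuilder without knowing what it constructs. As a minimal illustration of the callback contract — not part of this patch — here is a hypothetical countingBuilder in the spirit of the internal voidBuilder and the tests' testBuilder. It only knows how to decode INT32 leaves; a real builder dispatches on every physical type it can encounter, the way StructBuilder's blueprints do. It is written as if it lived in this repository so it can import the generated thrift types:

```go
package parquet_test

import (
	"fmt"

	"github.com/segmentio/parquet"
	pthrift "github.com/segmentio/parquet/internal/gen-go/parquet"
)

// countingBuilder counts rows and INT32 values as the RowReader walks the
// file. Begin/End bracket one row; the other callbacks mirror the shape of
// the schema tree as it is traversed.
type countingBuilder struct {
	rows   int
	values int
}

func (b *countingBuilder) Begin() {}

func (b *countingBuilder) Primitive(s *parquet.Schema, d parquet.Decoder) error {
	// The builder, not the reader, pulls the value out of the Decoder,
	// choosing the method that matches the column's physical type.
	if s.PhysicalType != pthrift.Type_INT32 {
		return fmt.Errorf("countingBuilder only handles INT32, got %s", s.PhysicalType)
	}
	if _, err := d.Int32(); err != nil {
		return err
	}
	b.values++ // a real builder would store the decoded value somewhere
	return nil
}

func (b *countingBuilder) PrimitiveNil(s *parquet.Schema) error { return nil }
func (b *countingBuilder) GroupBegin(s *parquet.Schema)         {}
func (b *countingBuilder) GroupEnd(s *parquet.Schema)           {}
func (b *countingBuilder) RepeatedBegin(s *parquet.Schema)      {}
func (b *countingBuilder) RepeatedEnd(s *parquet.Schema)        {}
func (b *countingBuilder) KVBegin(s *parquet.Schema)            {}
func (b *countingBuilder) KVEnd(s *parquet.Schema)              {}
func (b *countingBuilder) End()                                 { b.rows++ }
```

Passing &countingBuilder{} to RowReader.Read in the usual loop would increment rows exactly once per assembled row.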
diff --git a/reflect.go b/reflect.go deleted file mode 100644 index 4a852605..00000000 --- a/reflect.go +++ /dev/null @@ -1,111 +0,0 @@ -package parquet - -import ( - "fmt" - "reflect" - - "github.com/iancoleman/strcase" - pthrift "github.com/segmentio/parquet/internal/gen-go/parquet" -) - -// SchemaOf builds a schema tree by reflecting on a provided value v. -// -// It uses the following mapping between Go values and Parquet -// schema: -// -// TODO -// -// Optionals are decoded as pointers of the type they wrap. 
-// -// By default, it maps a field's name to its snake_case equivalent. You can -// overwrite this behavior on a per-field basis using the `parquet:"..."` field -// annotation. -// -// Only exported fields are considered. -func SchemaOf(v interface{}) *Schema { - t := reflect.TypeOf(v) - t = derefence(t) - - root := fromStruct(t) - root.Name = "root" - root.Root = true - - return root -} - -// fromStruct creates a schema tree from the provided struct type. -func fromStruct(t reflect.Type) *Schema { - assertKind(t, reflect.Struct) - - node := &Schema{ - Kind: GroupKind, - } - - n := t.NumField() - for i := 0; i < n; i++ { - field := t.Field(i) - if field.PkgPath != "" { - // ignore non-exported fields - continue - } - child := fromAny(field.Type) - child.Name = normalizeName(field.Name) - node.Add(child) - } - - return node -} - -// fromAny creates a schema tree for any type (does the dispatch to the right -// from* method). -func fromAny(t reflect.Type) *Schema { - t = derefence(t) - - switch t.Kind() { - case reflect.Struct: - return fromStruct(t) - case reflect.Int32: - return fromPrimitive(t) - default: - panic(fmt.Errorf("unhandled kind: %s", t.Kind())) - } -} - -// fromPrimitive creates a schema leaf for a Go type that maps directly to a -// Parquet primitive type. -func fromPrimitive(t reflect.Type) *Schema { - var physicalType pthrift.Type - switch t.Kind() { - case reflect.Int32: - physicalType = pthrift.Type_INT32 - default: - panic(fmt.Errorf("unhandled kind: %s", t.Kind())) - } - - node := &Schema{ - Kind: PrimitiveKind, - PhysicalType: physicalType, - ConvertedType: nil, - LogicalType: nil, - } - - return node -} - -func assertKind(t reflect.Type, expected reflect.Kind) { - if t.Kind() != expected { - panic(fmt.Errorf("kind should be %s, not %s", expected, t.Kind())) - } -} - -// recursively derefence a pointer type to a non pointer type -func derefence(t reflect.Type) reflect.Type { - if t.Kind() != reflect.Ptr { - return t - } - return derefence(t.Elem()) -} - -func normalizeName(name string) string { - return strcase.ToSnake(name) -} diff --git a/reflect_test.go b/reflect_test.go deleted file mode 100644 index 3009ed03..00000000 --- a/reflect_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package parquet_test - -import ( - "testing" - - "github.com/segmentio/parquet" - - pthrift "github.com/segmentio/parquet/internal/gen-go/parquet" - "github.com/stretchr/testify/assert" -) - -func TestSchemaOf(t *testing.T) { - type Record struct { - Col int32 - ignored int32 - } - - expected := &parquet.Schema{ - Name: "root", - Kind: parquet.GroupKind, - Root: true, - } - expected.Add(&parquet.Schema{ - Name: "col", - Kind: parquet.PrimitiveKind, - PhysicalType: pthrift.Type_INT32, - Path: []string{"col"}, - }) - - result := parquet.SchemaOf(new(Record)) - - assert.Equal(t, expected, result) -} diff --git a/schema.go b/schema.go index f58a76bf..2759625c 100644 --- a/schema.go +++ b/schema.go @@ -9,38 +9,44 @@ import ( // Schema represents a node in the schema tree of Parquet. 
type Schema struct { - ID int32 - Name string - Kind Kind - PhysicalType pthrift.Type - ConvertedType *pthrift.ConvertedType - LogicalType *pthrift.LogicalType - Repetition pthrift.FieldRepetitionType - Scale int32 - Precision int32 + // User-definable attributes + ID int32 + Name string + PhysicalType pthrift.Type + ConvertedType *pthrift.ConvertedType + LogicalType *pthrift.LogicalType + Repetition pthrift.FieldRepetitionType + Scale int32 + Precision int32 + + // Computed attributes (Parquet spec) RepetitionLevel uint32 DefinitionLevel uint32 Path []string - Root bool - parent *Schema - Children []*Schema + // Computed attributes (segmentio/parquet) + Root bool + Kind kind + + // Tree structure + parent *Schema + Children []*Schema } -// Kind indicates the kind of structure a schema node (and its descendent in +// kind indicates the kind of structure a schema node (and its descendants in // the tree) represent. -type Kind int +type kind int const ( - // PrimitiveKind nodes are terminal nodes that map to actual columns in + // primitiveKind nodes are terminal nodes that map to actual columns in // the parquet files. - PrimitiveKind Kind = iota - // GroupKind nodes represent compound objects mad of multiple nodes. - GroupKind - // MapKind nodes are follow the MAP structure. - MapKind - // RepeatedKind nodes are nodes that are expected to be repeated and not + primitiveKind kind = iota + // groupKind nodes represent compound objects made of multiple nodes. + groupKind + // mapKind nodes follow the MAP structure. + mapKind + // repeatedKind nodes are nodes that are expected to be repeated and not // following the MAP structure. - RepeatedKind + repeatedKind ) func (sn *Schema) Parent() *Schema { @@ -67,7 +73,7 @@ func (sn *Schema) At(path ...string) *Schema { // Panics if called on the root of the schema. func (sn *Schema) Remove() { if sn.Root { - panic("tried to remove the root of the schema") + panic("tried to remove the root of the schema") } p := sn.Parent() for i := range p.Children { @@ -79,7 +85,7 @@ func (sn *Schema) Remove() { } } - if p.Kind != GroupKind { + if p.Kind != groupKind { p.Remove() } } @@ -127,36 +133,58 @@ func (sn *Schema) traverse(path []string, f func(node *Schema)) error { } // Add a node as a direct child of this node. -// It updates the parent/children relationships, and the path of the provided -// node. +// It updates the parent/children relationships. func (sn *Schema) Add(node *Schema) { sn.Children = append(sn.Children, node) node.parent = sn - node.Path = make([]string, len(sn.Path), len(sn.Path)+1) - copy(node.Path, sn.Path) - node.Path = append(node.Path, node.Name) } -// Walk the schema tree depth-first, calling walkFn for every node visited. -// If walk fn returns an error, the walk stops. -// It is recommended to not modify the tree while walking it. -func Walk(n *Schema, walkFn WalkFn) error { - err := walkFn(n) - if err != nil { - return err - } - for _, c := range n.Children { - err := Walk(c, walkFn) - if err != nil { - return err - } +// Compute walks the Schema and updates all computed attributes. 
+func (sn *Schema) Compute() { + if sn.parent == nil { + sn.Root = true } - return nil + if sn.parent != nil { + sn.RepetitionLevel = sn.parent.RepetitionLevel + sn.DefinitionLevel = sn.parent.DefinitionLevel + sn.Path = newPath(sn.parent.Path, sn.Name) + } + + if sn.Repetition == pthrift.FieldRepetitionType_REPEATED { + sn.RepetitionLevel++ + } + if sn.Repetition != pthrift.FieldRepetitionType_REQUIRED { + sn.DefinitionLevel++ + } + + sn.Kind = computeKind(sn) + + for _, c := range sn.Children { + c.Compute() + } } -// WalkFn is the type of functions called for each node visited by Walk. -type WalkFn func(n *Schema) error +func computeKind(s *Schema) kind { + if len(s.Children) == 0 { + return primitiveKind + } + + if s.ConvertedType != nil { + switch *s.ConvertedType { + case pthrift.ConvertedType_MAP, pthrift.ConvertedType_MAP_KEY_VALUE: + return mapKind + case pthrift.ConvertedType_LIST: + return repeatedKind + } + } + + if s.Repetition == pthrift.FieldRepetitionType_REPEATED { + return repeatedKind + } + + return groupKind +} var errEmptySchema = errors.New("empty schema") @@ -176,6 +204,8 @@ func schemaFromFlatElements(elements []*pthrift.SchemaElement) (*Schema, error) return nil, fmt.Errorf("should have consumed %d elements but got %d instead", len(elements), consumed) } + root.Compute() + return root, nil } @@ -198,18 +228,6 @@ func flatThriftSchemaToTreeRecurse(current *Schema, left []*pthrift.SchemaElemen current.Scale = el.GetScale() current.Precision = el.GetPrecision() current.Children = make([]*Schema, el.GetNumChildren()) - current.Kind = kindFromSchemaElement(el) - if !current.Root { - current.Path = append(current.Path, el.GetName()) - } - - if current.Repetition == pthrift.FieldRepetitionType_REPEATED { - current.RepetitionLevel++ - } - - if current.Repetition != pthrift.FieldRepetitionType_REQUIRED { - current.DefinitionLevel++ - } offset := 1 for i := int32(0); i < el.GetNumChildren(); i++ { @@ -226,23 +244,9 @@ func flatThriftSchemaToTreeRecurse(current *Schema, left []*pthrift.SchemaElemen return offset } -func kindFromSchemaElement(el *pthrift.SchemaElement) Kind { - if el.GetNumChildren() == 0 { - return PrimitiveKind - } - - if el.ConvertedType != nil { - switch *el.ConvertedType { - case pthrift.ConvertedType_MAP, pthrift.ConvertedType_MAP_KEY_VALUE: - return MapKind - case pthrift.ConvertedType_LIST: - return RepeatedKind - } - } - - if el.GetRepetitionType() == pthrift.FieldRepetitionType_REPEATED { - return RepeatedKind - } - - return GroupKind +func newPath(path []string, name string) []string { + newPath := make([]string, len(path)+1) + copy(newPath, path) + newPath[len(path)] = name + return newPath } diff --git a/schema_test.go b/schema_test.go index a945e89f..31d4bf3d 100644 --- a/schema_test.go +++ b/schema_test.go @@ -7,28 +7,9 @@ import ( "testing" "github.com/segmentio/parquet" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestNodeDelete(t *testing.T) { - a := &parquet.Schema{Name: "a"} - b := &parquet.Schema{Name: "b"} - - root := &parquet.Schema{ - Name: "root", - Root: true, - Kind: parquet.GroupKind, - } - - root.Add(a) - root.Add(b) - - root.At("a").Remove() - - assert.Equal(t, []*parquet.Schema{b}, root.Children) -} - func TestNodeStageDelete(t *testing.T) { file, err := os.Open("./examples/stage-small.parquet") require.NoError(t, err) @@ -145,7 +126,7 @@ func TestNodeStageDelete(t *testing.T) { func readableFlatTree(root *parquet.Schema) string { var b strings.Builder - err := parquet.Walk(root, func(n 
*parquet.Schema) error { + err := walk(root, func(n *parquet.Schema) error { b.WriteString(fmt.Sprintf("%s%s", strings.Repeat(".", len(n.Path)), n.Name)) b.WriteRune('\n') return nil @@ -155,3 +136,18 @@ func readableFlatTree(root *parquet.Schema) string { } return b.String() } + +func walk(n *parquet.Schema, walkFn func(n *parquet.Schema) error) error { + err := walkFn(n) + if err != nil { + return err + } + for _, c := range n.Children { + err := walk(c, walkFn) + if err != nil { + return err + } + } + + return nil +} diff --git a/struct_builder.go b/struct_builder.go index 1797680a..14134493 100644 --- a/struct_builder.go +++ b/struct_builder.go @@ -1,72 +1,33 @@ package parquet +import ( + "fmt" + "reflect" +) + // StructBuilder implements the RowBuilder interface. -// See NewStructBuilder for details. +// It executes the plan constructed by StructPlanner. type StructBuilder struct { -} - -// NewStructBuilder creates a new StructBuilder targeting v. -// This method will generate a plan to efficiently decode and construct the -// type of the data pointed to by v. -// -// The row is constructed on the value pointed by v. The builder will overwrite -// any fields present in the interface, as encountered during the decoding. -// Fields not encountered during the decoding won't be modified. -// -// StructBuilder uses the following mapping between Go values and Parquet -// schema: -// -// BOOLEAN -> bool -// INT32 (no annotation) -> int32 -// INT64 (no annotation) -> int64 -// Bit width 8, 16, 32, 64 and sign true/false annotations map to their -// respective Go types. -// INT96 -> not supported -// FLOAT -> float32 -// DOUBLE -> float64 -// DECIMAL -> not supported -// BYTE_ARRAY -> []byte -// STRING -> string -// ENUM -> string -// UUID -> TODO []byte (of length 16) -// DATE -> TODO time.Time -// TIME -> TODO time.Time -// TIMESTAMP -> TODO time.Time -// INTERVAL -> not supported -// JSON -> []byte -// BSON -> []byte -// LIST -> []T -// MAP -> map[K]V -// NULL -> not supported -// -// Optionals are decoded as pointers of the type they wrap. -// -// This builder follows parquet-format's Logical Types specification. Read -// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md -// for details about the semantics of the conversions. -// -// By default, the builder maps a field's name to its snake_case equivalent. -// You can overwrite this behavior on a per-field basis using the -// `parquet:"..."` field annotation. -// -// Only exported fields are considered. -func NewStructBuilder(s *Schema) *StructBuilder { - panic("implement me") -} - -// To changes the target of the struct builder. -// The target has to be of the same type as the one used to create the struct -// builder, or this function will panic. 
-func (sb *StructBuilder) To(v interface{}) *StructBuilder { - panic("implement me") + index map[*Schema]*blueprint + target reflect.Value + stack valueStack } func (sb *StructBuilder) Begin() { - panic("implement me") + sb.stack.push(sb.target.Elem()) } func (sb *StructBuilder) Primitive(s *Schema, d Decoder) error { - panic("implement me") + bp, ok := sb.index[s] + if !ok { + panic("entry in schema not found") + } + v, err := bp.read(d) + if err != nil { + return err + } + bp.set(&sb.stack, v) + return nil } func (sb *StructBuilder) PrimitiveNil(s *Schema) error { @@ -74,29 +35,47 @@ func (sb *StructBuilder) PrimitiveNil(s *Schema) error { } func (sb *StructBuilder) GroupBegin(s *Schema) { - panic("implement me") + if s.Root { + // TODO: maybe support top being nil and create the top level structure? + return + } + bp := sb.index[s] + v := bp.create() + n := bp.set(&sb.stack, v) + sb.stack.push(n) } func (sb *StructBuilder) GroupEnd(node *Schema) { - panic("implement me") + sb.stack.pop() } func (sb *StructBuilder) RepeatedBegin(s *Schema) { - panic("implement me") + bp := sb.index[s] + v := bp.create() + bp.set(&sb.stack, v) + sb.stack.push(v) } func (sb *StructBuilder) RepeatedEnd(node *Schema) { - panic("implement me") + sb.stack.pop() } func (sb *StructBuilder) KVBegin(node *Schema) { - panic("implement me") + // nothing to do } func (sb *StructBuilder) KVEnd(node *Schema) { - panic("implement me") + // nothing to do } func (sb *StructBuilder) End() { - panic("implement me") + // nothing to do +} + +func (sb *StructBuilder) To(v interface{}) *StructBuilder { + sb.target = reflect.ValueOf(v) + if sb.target.Kind() != reflect.Ptr { + panic(fmt.Errorf("need to target a pointer, not %s", sb.target.Kind())) + } + return sb } diff --git a/struct_builder_test.go b/struct_builder_test.go index b1490414..ece4c2b9 100644 --- a/struct_builder_test.go +++ b/struct_builder_test.go @@ -1,24 +1,280 @@ package parquet_test -//func TestStructBuilderSimple(t *testing.T) { -// type MyRecord struct { -// Name string -// Foo int32 `parquet:"bar"` -// } -// -// f, err := parquet.OpenFile() -// require.NoError(t, err) -// -// record := &MyRecord{} -// builder := parquet.NewStructBuilder(record) -// rowReader := parquet.NewRowReader(f) -// -// for { -// err := rowReader.Read(builder.To(record)) -// if err == parquet.EOF { -// break -// } -// require.NoError(t, err) -// fmt.Printf("record: %#v\n", record) -// } -//} +import ( + "os" + "path" + "reflect" + "testing" + + "github.com/segmentio/parquet" + "github.com/segmentio/parquet/internal/benchmark" + "github.com/segmentio/parquet/internal/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + localref "github.com/xitongsys/parquet-go-source/local" + writerref "github.com/xitongsys/parquet-go/writer" +) + +// All the tests in this file use parquet-go and its annotations to generate +// the test parquet files. +// This should be rewritten when we have our own writer. 
+ +func TestStructBuilderSimple(t *testing.T) { + type Record struct { + Name string `parquet:"name=name, type=UTF8"` + Foo int32 `parquet:"name=foo, type=INT_32"` + } + + expected := []interface{}{ + &Record{Name: "name1", Foo: 1}, + &Record{Name: "name2", Foo: 2}, + &Record{Name: "name3", Foo: 3}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderNestedStructs(t *testing.T) { + type Inner struct { + Bar int32 `parquet:"name=bar, type=INT_32"` + } + type Record struct { + Foo int32 `parquet:"name=foo, type=INT_32"` + Inner Inner `parquet:"name=inner"` + } + + expected := []interface{}{ + &Record{Foo: 1, Inner: Inner{Bar: 11}}, + &Record{Foo: 2, Inner: Inner{Bar: 22}}, + &Record{Foo: 3, Inner: Inner{Bar: 33}}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderTwoNestedStructs(t *testing.T) { + type Inner struct { + Bar int32 `parquet:"name=bar, type=INT_32"` + } + type InnerTwo struct { + Bartwo int32 `parquet:"name=bartwo, type=INT_32"` + } + type Record struct { + Foo int32 `parquet:"name=foo, type=INT_32"` + Inner Inner `parquet:"name=inner"` + InnerTwo InnerTwo `parquet:"name=inner_two"` + } + + expected := []interface{}{ + &Record{Foo: 1, Inner: Inner{Bar: 11}, InnerTwo: InnerTwo{Bartwo: 111}}, + &Record{Foo: 2, Inner: Inner{Bar: 22}, InnerTwo: InnerTwo{Bartwo: 222}}, + &Record{Foo: 3, Inner: Inner{Bar: 33}, InnerTwo: InnerTwo{Bartwo: 333}}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderList(t *testing.T) { + type Record struct { + Foo []int32 `parquet:"name=foo, type=LIST, valuetype=INT32"` + } + + expected := []interface{}{ + &Record{Foo: []int32{1, 2}}, + &Record{Foo: nil}, + &Record{Foo: []int32{3}}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderStructListStruct(t *testing.T) { + type Tag struct { + Key string `parquet:"name=key, type=UTF8"` + Value string `parquet:"name=value, type=UTF8"` + } + type Record struct { + Tags []Tag `parquet:"name=tags, type=LIST"` + } + + expected := []interface{}{ + &Record{Tags: []Tag{{Key: "one", Value: "un"}, {Key: "two", Value: "deux"}}}, + &Record{}, + &Record{[]Tag{{Key: "three", Value: "trois"}}}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderMap(t *testing.T) { + type Record struct { + Map map[string]string `parquet:"name=map, type=MAP, keytype=UTF8, valuetype=UTF8"` + } + + expected := []interface{}{ + &Record{Map: map[string]string{"one": "un", "two": "deux"}}, + &Record{Map: map[string]string{}}, + &Record{Map: map[string]string{"three": "trois"}}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderAnonymousStruct(t *testing.T) { + type Record struct { + Inner struct { + Value int32 `parquet:"name=value, type=INT_32"` + } `parquet:"name=inner"` + } + + expected := []interface{}{ + &Record{Inner: struct { + Value int32 `parquet:"name=value, type=INT_32"` + }{Value: 42}}, + } + + structBuilderTest(t, new(Record), new(Record), expected, expected) +} + +func TestStructBuilderAnonymousField(t *testing.T) { + type Anon struct { + Avalue int32 `parquet:"name=avalue, type=INT_32"` + } + type Record struct { + Anon `parquet:"name=anon"` + Value int32 `parquet:"name=value, type=INT_32"` + } + + expected := []interface{}{ + &Record{ + Anon: Anon{Avalue: 1}, + Value: 2, + }, + } + + structBuilderTest(t, new(Record), 
new(Record), expected, expected) +} + +func TestStructBuilderIgnoreNonExported(t *testing.T) { + type Record struct { + ignored int32 + Value int32 `parquet:"name=value, type=INT_32"` + } + + records := []interface{}{ + &Record{ + ignored: 1, + Value: 2, + }, + } + + expected := []interface{}{ + &Record{ + ignored: 0, + Value: 2, + }, + } + + structBuilderTest(t, new(Record), new(Record), expected, records) +} + +func TestStructBuilderTraces(t *testing.T) { + expected := []interface{}{ + &benchmark.Trace{ + KafkaPartition: 1, + KafkaOffset: 2, + TimestampMs: 3, + TraceID: "TRACE_ID", + SpanID: "SPAN_ID", + ParentSpanID: "PARENT_SPAN_ID", + NextTraceID: "NEXT_TRACE_ID", + ParentTraceIDs: nil, + Baggage: []benchmark.Tag{ + { + Name: "BAGGAGE_1", + Value: "BAGGAGE_UN", + }, + { + Name: "BAGGAGE_2", + Value: "BAGGAGE_DEUX", + }, + }, + Tags: []benchmark.Tag{ + { + Name: "TAGS_1", + Value: "TAGS_UN", + }, + }, + MessageID: "MESSAGE_ID", + UserID: "USER_ID", + EventType: "EVENT_TYPE", + SourceID: "SOURCE_ID", + DestinationID: "DESTINATION_ID", + WorkspaceID: "WORKSPACE_ID", + Name: "NAME", + SpanTime: 4, + SpanDuration: 5, + ExchangeTime: 6, + ExchangeDuration: 7, + ExchangeRequestMethod: "METHOD", + ExchangeRequestURL: "URL", + ExchangeRequestHeaders: map[string]string{ + "HEADER_1": "UN", + }, + ExchangeRequestBody: "REQUEST", + ExchangeResponseStatusCode: 8, + ExchangeResponseStatusText: "STATUS", + ExchangeResponseHeaders: map[string]string{}, + ExchangeResponseBody: "RESPONSE", + ExchangeErrorType: "", + ExchangeErrorMessage: "", + }, + } + + structBuilderTest(t, new(benchmark.Trace), new(benchmark.Trace), expected, expected) +} + +func structBuilderTest(t *testing.T, recordPgo, record interface{}, expected []interface{}, records []interface{}) { + // We have to pass two different structs because the annotations are + // different between this library and parquet-go. This should all go + // away when we have our own writer. + test.WithTestDir(t, func(dir string) { + p := path.Join(dir, "test.parquet") + dst, err := localref.NewLocalFileWriter(p) + require.NoError(t, err) + writer, err := writerref.NewParquetWriter(dst, recordPgo, 1) + require.NoError(t, err) + for _, record := range records { + require.NoError(t, writer.Write(record)) + } + require.NoError(t, writer.WriteStop()) + require.NoError(t, dst.Close()) + + f, err := os.Open(p) + require.NoError(t, err) + defer func() { assert.NoError(t, f.Close()) }() + + pf, err := parquet.OpenFile(f) + require.NoError(t, err) + + // TODO: this is a lot of ceremony + planner := parquet.StructPlannerOf(record) + builder := planner.Builder() + plan := planner.Plan() + rowReader := parquet.NewRowReaderWithPlan(plan, pf) + + var actual []interface{} + + for { + target := reflect.New(reflect.TypeOf(record).Elem()).Interface() + err := rowReader.Read(builder.To(target)) + if err == parquet.EOF { + break + } + require.NoError(t, err) + actual = append(actual, target) + } + + require.Equal(t, expected, actual) + }) +} diff --git a/struct_planner.go b/struct_planner.go new file mode 100644 index 00000000..d83ceb45 --- /dev/null +++ b/struct_planner.go @@ -0,0 +1,387 @@ +package parquet + +import ( + "fmt" + "reflect" + "strings" + "unicode" + + pthrift "github.com/segmentio/parquet/internal/gen-go/parquet" +) + +// StructPlanner is a factory for StructBuilder, and generates a Plan for +// RowBuilder that maps to the struct. 
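+//
+// A typical read loop looks roughly like this (a sketch based on the tests in
+// struct_builder_test.go; Record and file are placeholders, error handling
+// elided):
+//
+//	planner := StructPlannerOf(new(Record))
+//	builder := planner.Builder()
+//	rowReader := NewRowReaderWithPlan(planner.Plan(), file)
+//	for {
+//		record := new(Record)
+//		if err := rowReader.Read(builder.To(record)); err == EOF {
+//			break
+//		}
+//	}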
+type StructPlanner struct {
+	blueprint *blueprint
+	index     map[*Schema]*blueprint
+}
+
+// StructPlannerOf builds a StructPlanner using v's type as a basis.
+// v is expected to be a pointer to a struct.
+//
+// Mapping of Go primitive types to Parquet schema:
+// (format: Go -> parquet primitive type [| annotation])
+//
+//	bool    -> BOOLEAN
+//	int     -> INT64 | INT(bits=64, signed=true)
+//	int8    -> INT32 | INT(bits=8, signed=true)
+//	int16   -> INT32 | INT(bits=16, signed=true)
+//	int32   -> INT32 | INT(bits=32, signed=true)
+//	int64   -> INT64 | INT(bits=64, signed=true)
+//	uint    -> INT64 | INT(bits=64, signed=false)
+//	uint8   -> INT32 | INT(bits=8, signed=false)
+//	uint16  -> INT32 | INT(bits=16, signed=false)
+//	uint32  -> INT32 | INT(bits=32, signed=false)
+//	uint64  -> INT64 | INT(bits=64, signed=false)
+//	float32 -> FLOAT
+//	float64 -> DOUBLE
+//	string  -> BYTE_ARRAY | STRING
+//
+// Go composite types:
+//
+// Array/slice: []T ->
+//	group (LIST) {
+//		repeated group list {
+//			element;
+//		}
+//	}
+//
+// Maps: map[K]V ->
+//	group (MAP) {
+//		repeated group key_value {
+//			required key;
+//			value;
+//		}
+//	}
+//
+// Structs: ->
+//	[optional] group {
+//		fields...
+//	}
+//
+// Special cases:
+//
+//	Byte array/slice: []byte -> BYTE_ARRAY
+//	Timestamp: time.Time -> INT64 | TIMESTAMP(isAdjustedToUTC=true, precision=NANOS)
+//
+// All pointers are treated as optional.
+//
+// Types not listed here are not supported.
+func StructPlannerOf(v interface{}) *StructPlanner {
+	t := reflect.TypeOf(v)
+	t = dereference(t)
+
+	root := &blueprint{
+		schema: &Schema{},
+	}
+	bpFromStruct(root, t)
+	root.schema.Name = "root"
+	root.schema.Compute()
+
+	index := map[*Schema]*blueprint{}
+	root.register(index)
+
+	return &StructPlanner{
+		blueprint: root,
+		index:     index,
+	}
+}
+
+func (sp *StructPlanner) Builder() *StructBuilder {
+	return &StructBuilder{
+		index: sp.index,
+	}
+}
+
+func (sp *StructPlanner) Plan() *Plan {
+	return &Plan{
+		s: sp.blueprint.schema,
+	}
+}
+
+// setFn is a function executed by the StructBuilder when setting a value on a
+// container (struct field, slice, map entry).
+// The function should return an addressable version of the value that was set.
+type setFn func(stack *valueStack, value reflect.Value) reflect.Value
+
+// blueprint is a structure that parallels a schema, providing the information
+// to build the actual Go types.
+type blueprint struct {
+	schema *Schema
+	// Call this function to decode the value from readers.
+	read func(d Decoder) (reflect.Value, error)
+	// Call this function to set the value on the parent container.
+	set setFn
+	// Call this function to create the initial value of this container.
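+	// (For example, struct and slice blueprints return reflect.Zero(t), while
+	// map blueprints return reflect.MakeMap(t); see bpFromStruct, bpFromSlice,
+	// and bpFromMap below.)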
+	create func() reflect.Value
+
+	parent   *blueprint
+	children []*blueprint
+
+	fieldPath []int // path to a field in nested structs
+}
+
+func (bp *blueprint) add(child *blueprint) {
+	bp.children = append(bp.children, child)
+	child.parent = bp
+}
+
+func (bp *blueprint) register(index map[*Schema]*blueprint) {
+	index[bp.schema] = bp
+	for _, c := range bp.children {
+		c.register(index)
+	}
+}
+
+func bpFromStruct(p *blueprint, t reflect.Type) {
+	p.create = func() reflect.Value {
+		return reflect.Zero(t)
+	}
+
+	n := t.NumField()
+	for i := 0; i < n; i++ {
+		field := t.Field(i)
+		if field.PkgPath != "" {
+			// ignore non-exported fields
+			continue
+		}
+		name := normalizeName(field.Name)
+		child := &blueprint{
+			schema: &Schema{
+				Name: name,
+			},
+			fieldPath: newFieldPath(p.fieldPath, i),
+		}
+		child.set = makeSetStructFieldFn(child)
+
+		bpFromAny(child, field.Type)
+		p.schema.Add(child.schema)
+		p.add(child)
+	}
+}
+
+func makeSetStructFieldFn(p *blueprint) setFn {
+	return func(stack *valueStack, value reflect.Value) reflect.Value {
+		stack.view(len(p.fieldPath) - 1).top().FieldByIndex(p.fieldPath).Set(value)
+		return value
+	}
+}
+
+func bpFromMap(p *blueprint, t reflect.Type) {
+	p.schema.ConvertedType = pthrift.ConvertedTypePtr(pthrift.ConvertedType_MAP)
+	p.schema.Repetition = pthrift.FieldRepetitionType_REQUIRED
+	p.create = func() reflect.Value {
+		return reflect.MakeMap(t)
+	}
+
+	keyValue := &Schema{
+		Name:       "key_value",
+		Repetition: pthrift.FieldRepetitionType_REPEATED,
+	}
+	p.schema.Add(keyValue)
+
+	key := &Schema{
+		Name:       "key",
+		Repetition: pthrift.FieldRepetitionType_REQUIRED,
+	}
+	keyValue.Add(key)
+
+	keyBp := &blueprint{
+		schema: key,
+	}
+	p.add(keyBp)
+	bpFromAny(keyBp, t.Key())
+	keyBp.set = func(stack *valueStack, value reflect.Value) reflect.Value {
+		stack.push(value) // push the decoded key; the value's setter pops it to index the map
+		return value
+	}
+
+	value := &Schema{
+		Name: "value",
+	}
+	keyValue.Add(value)
+
+	valueBp := &blueprint{
+		schema: value,
+	}
+	p.add(valueBp)
+	bpFromAny(valueBp, t.Elem())
+	valueBp.set = func(stack *valueStack, value reflect.Value) reflect.Value {
+		// Top of the stack is the key.
+		key := stack.top()
+		stack.pop()
+		// Second to the top is the actual map.
+		stack.top().SetMapIndex(key, value)
+		return value
+	}
+}
+
+func bpFromSlice(p *blueprint, t reflect.Type) {
+	p.schema.ConvertedType = pthrift.ConvertedTypePtr(pthrift.ConvertedType_LIST)
+	p.schema.Repetition = pthrift.FieldRepetitionType_REQUIRED
+	p.create = func() reflect.Value {
+		return reflect.Zero(t)
+	}
+
+	list := &Schema{
+		Name:       "list",
+		Repetition: pthrift.FieldRepetitionType_REPEATED,
+	}
+	p.schema.Add(list)
+
+	element := &Schema{
+		Name: "element",
+	}
+	list.Add(element)
+
+	ebp := &blueprint{
+		schema: element,
+	}
+	p.add(ebp)
+	bpFromAny(ebp, t.Elem())
+	ebp.set = makeSetSliceFn(ebp)
+}
+
+func makeSetSliceFn(p *blueprint) setFn {
+	return func(stack *valueStack, value reflect.Value) reflect.Value {
+		// Expect top of stack to be the slice.
+		idx := stack.top().Len()
+		slice := reflect.Append(stack.top(), value)
+		stack.replace(slice)
+
+		// Re-set the slice on the parent, as append may have reallocated it.
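+		// view(1) hides the slice frame itself, so the parent's setter sees
+		// its own container at the top of the stack.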
+		view := stack.view(1)
+		p.parent.set(view, slice)
+
+		return slice.Index(idx)
+	}
+}
+
+func newFieldPath(path []int, i int) []int {
+	newPath := make([]int, len(path)+1)
+	copy(newPath, path)
+	newPath[len(path)] = i
+	return newPath
+}
+
+func bpFromAny(p *blueprint, t reflect.Type) {
+	t = dereference(t)
+
+	switch t.Kind() {
+	case reflect.Struct:
+		bpFromStruct(p, t)
+	case reflect.Slice:
+		bpFromSlice(p, t)
+	case reflect.Map:
+		bpFromMap(p, t)
+	default:
+		bpFromPrimitive(p, t)
+	}
+}
+
+// bpFromPrimitive creates a schema leaf for a Go type that maps directly to a
+// Parquet primitive type. Only the kinds needed so far are implemented; other
+// kinds fall through to the panic below.
+func bpFromPrimitive(p *blueprint, t reflect.Type) {
+	switch t.Kind() {
+	case reflect.Uint16:
+		p.schema.PhysicalType = pthrift.Type_INT32
+		p.schema.ConvertedType = pthrift.ConvertedTypePtr(pthrift.ConvertedType_UINT_16)
+		p.schema.LogicalType = pthrift.NewLogicalType()
+		p.schema.LogicalType.INTEGER = pthrift.NewIntType()
+		p.schema.LogicalType.INTEGER.BitWidth = 16
+		p.schema.LogicalType.INTEGER.IsSigned = false
+		p.read = func(d Decoder) (reflect.Value, error) {
+			v, err := d.Int32()
+			if err != nil {
+				return reflect.Value{}, err
+			}
+			return reflect.ValueOf(uint16(v)), nil
+		}
+	case reflect.Int32:
+		p.schema.PhysicalType = pthrift.Type_INT32
+		p.read = func(d Decoder) (reflect.Value, error) {
+			v, err := d.Int32()
+			if err != nil {
+				return reflect.Value{}, err
+			}
+			return reflect.ValueOf(v), nil
+		}
+	case reflect.Int64:
+		p.schema.PhysicalType = pthrift.Type_INT64
+		p.read = func(d Decoder) (reflect.Value, error) {
+			v, err := d.Int64()
+			if err != nil {
+				return reflect.Value{}, err
+			}
+			return reflect.ValueOf(v), nil
+		}
+	case reflect.String:
+		p.schema.PhysicalType = pthrift.Type_BYTE_ARRAY
+		p.schema.ConvertedType = pthrift.ConvertedTypePtr(pthrift.ConvertedType_UTF8)
+		p.schema.LogicalType = pthrift.NewLogicalType()
+		p.schema.LogicalType.STRING = pthrift.NewStringType()
+		p.read = func(d Decoder) (reflect.Value, error) {
+			b, err := d.ByteArray(nil)
+			if err != nil {
+				return reflect.Value{}, err
+			}
+			return reflect.ValueOf(string(b)), nil
+		}
+	default:
+		panic(fmt.Errorf("unhandled kind: %s", t.Kind()))
+	}
+}
+
+// dereference recursively unwraps a pointer type until it reaches a
+// non-pointer type.
+func dereference(t reflect.Type) reflect.Type {
+	if t.Kind() != reflect.Ptr {
+		return t
+	}
+	return dereference(t.Elem())
+}
+
+// normalizeName converts a Go field name to a snake_case column name, e.g.
+// KafkaPartition -> kafka_partition.
+func normalizeName(name string) string {
+	// TODO: cache
+	var b strings.Builder
+	prevUpper := false
+	for i, c := range name {
+		if unicode.IsUpper(c) {
+			if !prevUpper && i > 0 {
+				b.WriteRune('_')
+			}
+			prevUpper = true
+		} else {
+			prevUpper = false
+		}
+		b.WriteRune(unicode.ToLower(c))
+	}
+	return b.String()
+}
+
+// Because StructBuilder is called back as values are decoded, it needs to
+// keep track of where it is in the structure it is building.
+type valueStack struct {
+	stack []reflect.Value
+}
+
+func (s *valueStack) push(v reflect.Value) {
+	s.stack = append(s.stack, v)
+}
+
+func (s *valueStack) pop() {
+	s.stack = s.stack[:len(s.stack)-1]
+}
+
+func (s *valueStack) replace(value reflect.Value) {
+	s.stack[len(s.stack)-1] = value
+}
+
+func (s *valueStack) top() reflect.Value {
+	return s.stack[len(s.stack)-1]
+}
+
+// view returns the stack with the top offset entries hidden; view(0) is the
+// same as the stack itself.
+func (s *valueStack) view(offset int) *valueStack {
+	// TODO: that's on the hot path. not great
+	return &valueStack{stack: s.stack[:len(s.stack)-offset]}
+}
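+
+// Worked example of the fieldPath mechanics above (illustrative): given
+//
+//	type Inner struct{ Bar int32 }
+//	type Record struct {
+//		Foo   int32
+//		Inner Inner
+//	}
+//
+// the blueprint for Bar gets fieldPath [1 0], so its setter evaluates
+// stack.view(1).top().FieldByIndex([1 0]).Set(v): it skips past the Inner
+// frame pushed by GroupBegin and writes through the Record value underneath.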