Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Commit

Permalink
StructBuilder (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
pelletier committed Oct 27, 2020
1 parent 23af6a3 commit 1e38054
Show file tree
Hide file tree
Showing 22 changed files with 1,158 additions and 573 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ internal/gen-go/parquet/parquet.go: Dockerfile.thrift

buildkite:
docker build . -f Dockerfile.buildkite -t parquet-buildkite
docker run -ti parquet-buildkite go test ./...
docker run -ti parquet-buildkite go test -race ./...
15 changes: 1 addition & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ To test against the parquet-tools reference implementation, make sure to have
`parquet-tools` in your `PATH`. [Build it][build] or [download it
pre-built][dl].

️Buildkite does not test against parquet-tools because our base docker images
⚠️ Buildkite does not test against parquet-tools because our base docker images
don't have both Go and Java available.

[build]: https://github.com/apache/parquet-mr/tree/master/parquet-tools
Expand All @@ -16,16 +16,3 @@ don't have both Go and Java available.

* https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
* https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36632.pdf

## Todo

- [x] `parquet-tools cat` reimplementation.
- [x] Use builder to remove allocs.
- [ ] RowReader projection.
- [ ] Predicates pushdown.
- [ ] Re-introduce Column interface.
- [ ] ReadSeeker => ReaderAt + size
- [ ] S3 partial reader.
- [ ] Benchmark with parquet-go.
- [ ] Writer.
- [ ] Out-of-band writer.
2 changes: 0 additions & 2 deletions cmd/ptools/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,10 @@ func (p *prettyPrinter) prefix() int {
}

// Begin emits a "BEGIN" debug message and then calls p.stackClear.
func (p *prettyPrinter) Begin() {
debug.Format("BEGIN")
p.stackClear()
}

func (p *prettyPrinter) Primitive(s *parquet.Schema, d parquet.Decoder) error {
debug.Format("PRIMITIVE: %s", s.Path)
valueDepth := p.depth()
for _, c := range s.Path[p.prefix() : len(s.Path)-1] {
p.writeDepth(valueDepth)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ptools/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"

color "github.com/logrusorgru/aurora/v3"
"github.com/segmentio/parquet/internal/debug"
"github.com/segmentio/cli"
"github.com/segmentio/parquet/internal/debug"
)

func main() {
Expand Down
32 changes: 12 additions & 20 deletions decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ func readUvarint(r io.Reader) (uint64, uint32, error) {
var read uint32
buff := []byte{0}
for i := 0; ; i++ {
_, err := r.Read(buff)
n, err := r.Read(buff)
if err != nil {
return x, read, err
}
if n != len(buff) {
panic("was supposed to read 1 byte")
}
b := buff[0]
read++
if b < 0x80 {
Expand All @@ -44,14 +47,6 @@ type Decoder interface {
ByteArray(dst []byte) ([]byte, error)
}

// EmptyDecoder is a decoder that consumes no input: every method is a no-op
// that reports success and yields the zero value of its result type.
type EmptyDecoder struct{}

// prepare ignores the reader it is given.
func (e *EmptyDecoder) prepare(r io.Reader) {
}

// Int32 always returns 0 and a nil error.
func (e *EmptyDecoder) Int32() (int32, error) {
	var zero int32
	return zero, nil
}

// Int64 always returns 0 and a nil error.
func (e *EmptyDecoder) Int64() (int64, error) {
	var zero int64
	return zero, nil
}

// Uint32 leaves out untouched and returns a nil error.
func (e *EmptyDecoder) Uint32(bitWidth int, out []uint32) error {
	return nil
}

// ByteArray always returns a nil slice and a nil error.
func (e *EmptyDecoder) ByteArray() ([]byte, error) {
	var none []byte
	return none, nil
}

// Construct a Decoder for a given encoding.
// Technically, not all encodings work everywhere. Let's say good enough for now.
func decoderFor(enc pthrift.Encoding) (Decoder, error) {
Expand Down Expand Up @@ -84,10 +79,13 @@ func (d *rleDecoder) prepare(r io.Reader) {
// All those values fit in uint32.
func (d *rleDecoder) Uint32(bitWidth int, out []uint32) error {
b := d.buff[:]
_, err := d.r.Read(b)
n, err := d.r.Read(b)
if err != nil {
return err
}
if n != len(b) {
panic("not enough bytes")
}

size := binary.LittleEndian.Uint32(b)
read := uint32(0)
Expand Down Expand Up @@ -115,16 +113,10 @@ func (d *rleDecoder) Uint32(bitWidth int, out []uint32) error {
bytesInBuffer = 4
}

if len(d.scratch) < bytesInBuffer {
v := bytesInBuffer
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++
d.scratch = make([]byte, v)
if cap(d.scratch) < bytesInBuffer {
d.scratch = make([]byte, bytesInBuffer)
} else {
d.scratch = d.scratch[:bytesInBuffer]
}

data := d.scratch[:bytesInRLE]
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.15
require (
github.com/apache/thrift v0.13.0
github.com/golang/snappy v0.0.2
github.com/iancoleman/strcase v0.1.2
github.com/logrusorgru/aurora/v3 v3.0.0
github.com/pmezard/go-difflib v1.0.0
github.com/segmentio/cli v0.3.4
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/hashicorp/go-uuid v0.0.0-20180228145832-27454136f036/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/iancoleman/strcase v0.1.2 h1:gnomlvw9tnV3ITTAxzKSgTF+8kFWcU/f+TgttpXGz1U=
github.com/iancoleman/strcase v0.1.2/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
Expand Down
64 changes: 64 additions & 0 deletions internal/benchmark/schema_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package benchmark_test

import (
"log"
"os"
"testing"

"github.com/segmentio/parquet"
"github.com/segmentio/parquet/internal/benchmark"
"github.com/stretchr/testify/require"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/reader"
)

// BenchmarkReflectReadRow measures opening the sample parquet file and
// reading every row into a freshly allocated benchmark.Trace through a
// RowReader driven by a struct plan.
func BenchmarkReflectReadRow(b *testing.B) {
	const path = "../../examples/stage-small.parquet"

	// The planner, its builder and its plan are created once, before the
	// timer is reset, so they are not part of the measured work.
	structPlanner := parquet.StructPlannerOf(new(benchmark.Trace))
	rowBuilder := structPlanner.Builder()
	readPlan := structPlanner.Plan()

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		file, err := os.Open(path)
		require.NoError(b, err)

		parquetFile, err := parquet.OpenFile(file)
		require.NoError(b, err)

		rows := parquet.NewRowReaderWithPlan(readPlan, parquetFile)
		for done := false; !done; {
			row := new(benchmark.Trace)
			switch err := rows.Read(rowBuilder.To(row)); err {
			case parquet.EOF:
				// End of file: stop reading rows for this iteration.
				done = true
			default:
				require.NoError(b, err)
			}
		}

		require.NoError(b, file.Close())
	}
}

// BenchmarkReflectReadRowParquetGo measures opening the sample parquet file
// with the xitongsys/parquet-go reader and reading all of its rows, calling
// Read once per row with a reused one-element slice of benchmark.Trace.
func BenchmarkReflectReadRowParquetGo(b *testing.B) {
	p := "../../examples/stage-small.parquet"
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		fr, err := local.NewLocalFileReader(p)
		if err != nil {
			// Fail the benchmark: returning silently here would report
			// timings for a run that never read anything.
			log.Println("Can't open file", err)
			b.FailNow()
		}
		pr, err := reader.NewParquetReader(fr, new(benchmark.Trace), 1)
		require.NoError(b, err)
		num := int(pr.GetNumRows())
		target := make([]benchmark.Trace, 1)
		for row := 0; row < num; row++ {
			require.NoError(b, pr.Read(&target))
		}
		pr.ReadStop()
		// Check the error returned by Close itself, not the stale err from
		// NewParquetReader.
		require.NoError(b, fr.Close())
	}
}
49 changes: 49 additions & 0 deletions internal/benchmark/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package benchmark

// Tag is a name/value pair of strings. Each field carries a `parquet` struct
// tag giving its column name ("name", "value") and the UTF8 type.
type Tag struct {
Name string `parquet:"name=name, type=UTF8"`
Value string `parquet:"name=value, type=UTF8"`
}

// Trace is a flat record made of kafka coordinates, trace/span identifiers,
// tag lists, and the details of an exchange (request, response, error).
// Every exported field carries a `parquet` struct tag giving its column name
// and type.
type Trace struct {
	// kafkaTopic is unexported and has no parquet tag: it is not exported to
	// parquet.
	kafkaTopic     string
	KafkaPartition int64    `parquet:"name=kafka_partition, type=INT_64"`
	KafkaOffset    int64    `parquet:"name=kafka_offset, type=INT_64"`
	TimestampMs    int64    `parquet:"name=timestamp_ms, type=INT_64"`
	TraceID        string   `parquet:"name=trace_id, type=UTF8"`
	SpanID         string   `parquet:"name=span_id, type=UTF8"`
	ParentSpanID   string   `parquet:"name=parent_span_id, type=UTF8"`
	NextTraceID    string   `parquet:"name=next_trace_id, type=UTF8"`
	ParentTraceIDs []string `parquet:"name=parent_trace_ids, type=LIST, valuetype=UTF8"`
	Baggage        []Tag    `parquet:"name=baggage, type=LIST"`
	Tags           []Tag    `parquet:"name=tags, type=LIST"`

	// Extracted from the exchange payload. These are plain strings, so they
	// keep their zero value (the empty string) if they are not present in the
	// payload or if the payload cannot be unmarshalled.
	MessageID     string `parquet:"name=message_id, type=UTF8"`
	UserID        string `parquet:"name=user_id, type=UTF8"`
	EventType     string `parquet:"name=event_type, type=UTF8"`
	SourceID      string `parquet:"name=source_id, type=UTF8"`
	DestinationID string `parquet:"name=destination_id, type=UTF8"`
	WorkspaceID   string `parquet:"name=workspace_id, type=UTF8"`

	Name         string `parquet:"name=name, type=UTF8"`
	SpanTime     int64  `parquet:"name=span_time, type=INT_64"`     // ms
	SpanDuration int64  `parquet:"name=span_duration, type=INT_64"` // ms

	ExchangeTime     int64 `parquet:"name=exchange_time, type=INT_64"`     // ms
	ExchangeDuration int32 `parquet:"name=exchange_duration, type=INT_32"` // ms

	ExchangeRequestMethod  string            `parquet:"name=exchange_request_method, type=UTF8"`
	ExchangeRequestURL     string            `parquet:"name=exchange_request_url, type=UTF8"`
	ExchangeRequestHeaders map[string]string `parquet:"name=exchange_request_headers, type=MAP, keytype=UTF8, valuetype=UTF8"`
	ExchangeRequestBody    string            `parquet:"name=exchange_request_body, type=UTF8"`

	ExchangeResponseStatusCode uint16 `parquet:"name=exchange_response_status_code, type=UINT_16"`
	ExchangeResponseStatusText string `parquet:"name=exchange_response_status_text, type=UTF8"`
	// Tagged as MAP, matching ExchangeRequestHeaders: the Go type is a map
	// and the tag already declares keytype/valuetype.
	ExchangeResponseHeaders map[string]string `parquet:"name=exchange_response_headers, type=MAP, keytype=UTF8, valuetype=UTF8"`
	ExchangeResponseBody    string            `parquet:"name=exchange_response_body, type=UTF8"`

	ExchangeErrorType    string `parquet:"name=exchange_error_type, type=UTF8"`
	ExchangeErrorMessage string `parquet:"name=exchange_error_message, type=UTF8"`
}
2 changes: 1 addition & 1 deletion internal/thrift/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (tr *Reader) Fork() *Reader {
// Read implements the io.Reader interface, forwarding the call to the
// underlying Thrift protocol Reader.
func (tr *Reader) Read(p []byte) (int, error) {
debug.Format("thrift: Reader: Read %d", len(p))
debug.Format("thrift: Reader: read %d", len(p))
return tr.protocol.Transport().Read(p)
}

Expand Down
Loading

0 comments on commit 1e38054

Please sign in to comment.