Skip to content

Commit

Permalink
initial work for avro OCF files
Browse files Browse the repository at this point in the history
  • Loading branch information
xentripetal committed Dec 28, 2021
1 parent eb4a6fd commit d6ca481
Show file tree
Hide file tree
Showing 23 changed files with 551 additions and 0 deletions.
18 changes: 18 additions & 0 deletions format/avro/codecs/array.go
@@ -0,0 +1,18 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type BoolCodec struct{}

func (l BoolCodec) Decode(name string, d *decode.D) {
d.FieldBoolFn(name, func(d *decode.D) bool {
return d.U8() >= 1
})
}

func BuildBoolCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BoolCodec{}, nil
}
22 changes: 22 additions & 0 deletions format/avro/codecs/bool.go
@@ -0,0 +1,22 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)


type NullCodec struct {}

func (l NullCodec) Decode(d *decode.D) interface{}{
// null is written as zero bytes.
return nil
}

func (l NullCodec) Type() CodecType {
return SCALAR
}

func BuildNullCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &NullCodec{}, nil
}
20 changes: 20 additions & 0 deletions format/avro/codecs/bytes.go
@@ -0,0 +1,20 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type FloatCodec struct{}

func (l FloatCodec) Decode(d *decode.D) interface{} {
return d.F32()
}

func (l FloatCodec) Type() CodecType {
return SCALAR
}

func BuildFloatCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &FloatCodec{}, nil
}
1 change: 1 addition & 0 deletions format/avro/codecs/codec.go
@@ -0,0 +1 @@
package codecs
20 changes: 20 additions & 0 deletions format/avro/codecs/double.go
@@ -0,0 +1,20 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type FloatCodec struct{}

func (l FloatCodec) Decode(d *decode.D) interface{} {
return d.F32()
}

func (l FloatCodec) Type() CodecType {
return SCALAR
}

func BuildFloatCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &FloatCodec{}, nil
}
22 changes: 22 additions & 0 deletions format/avro/codecs/enum.go
@@ -0,0 +1,22 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type BytesCodec struct{}

func (l BytesCodec) Decode(d *decode.D) interface{} {
length := d.FieldSFn("length", VarZigZag)
d.FieldRawLen("value", length*8)
return nil
}

func (l BytesCodec) Type() CodecType {
return STRUCT
}

func BuildBytesCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BytesCodec{}, nil
}
19 changes: 19 additions & 0 deletions format/avro/codecs/fixed.go
@@ -0,0 +1,19 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type BytesCodec struct{}

func (l BytesCodec) Decode(name string, d *decode.D) {
// What if its a record with a field called name_len?
// using a struct is probably a better idea. But it makes it less usable
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldRawLen("name", length*8)
}

func BuildBytesCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BytesCodec{}, nil
}
38 changes: 38 additions & 0 deletions format/avro/codecs/float.go
@@ -0,0 +1,38 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

const intMask = byte(127)
const intFlag = byte(128)

// readLong reads a variable length zig zag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}

type LongCodec struct {}

func (l LongCodec) Decode(d *decode.D) interface{}{
return VarZigZag(d)
}

func (l LongCodec) Type() CodecType {
return SCALAR
}

func BuildLongCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &LongCodec{}, nil
}
22 changes: 22 additions & 0 deletions format/avro/codecs/int.go
@@ -0,0 +1,22 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)


type BoolCodec struct {}

func (l BoolCodec) Decode(d *decode.D) interface{}{
// a boolean is written as a single byte whose value is either 0 (false) or 1 (true).
return d.U8() != 0
}

func (l BoolCodec) Type() CodecType {
return SCALAR
}

func BuildBoolCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BoolCodec{}, nil
}
1 change: 1 addition & 0 deletions format/avro/codecs/long.go
@@ -0,0 +1 @@
package codecs
45 changes: 45 additions & 0 deletions format/avro/codecs/map.go
@@ -0,0 +1,45 @@
package codecs

import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type ArrayCodec struct {
valueCodec Codec
}

func (l ArrayCodec) Decode(name string, d *decode.D) {
d.FieldArray(name, func(d *decode.D) {
count := int64(-1)
for count != 0 {
d.FieldStruct(name, func(d *decode.D) {
count = d.FieldSFn("count", VarZigZag)
if count < 0 {
d.FieldSFn("size", VarZigZag)
count *= -1
}
d.FieldArray("entries", func(d *decode.D) {
for i := int64(0); i < count; i++ {
l.valueCodec.Decode("entry", d)
}
})
})
}
})
}

func BuildArrayCodec(schema schema.SimplifiedSchema) (Codec, error) {
if schema.Items == nil {
return nil, errors.New("array schema must have items")
}

valueCodec, err := BuildCodec(*schema.Items)
if err != nil {
return nil, fmt.Errorf("ArrayCodec: %s", err)
}

return &ArrayCodec{valueCodec: valueCodec}, nil
}
38 changes: 38 additions & 0 deletions format/avro/codecs/null.go
@@ -0,0 +1,38 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

const intMask = byte(127)
const intFlag = byte(128)

// readLong reads a variable length zig zag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}

type LongCodec struct {}

func (l LongCodec) Decode(d *decode.D) interface{}{
return VarZigZag(d)
}

func (l LongCodec) Type() CodecType {
return SCALAR
}

func BuildLongCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &LongCodec{}, nil
}
15 changes: 15 additions & 0 deletions format/avro/codecs/record.go
@@ -0,0 +1,15 @@
package codecs

import "github.com/wader/fq/pkg/decode"


type StringCodec struct {}

func (l StringCodec) Decode(d *decode.D) {
length := d.FieldSFn("length", VarZigZag)
d.FieldUTF8("value", int(length))
}

func BuildStringCodec(schema SimplifiedSchema) (Codec, error) {
return &StringCodec{}, nil
}
31 changes: 31 additions & 0 deletions format/avro/codecs/string.go
@@ -0,0 +1,31 @@
package codecs

import "github.com/wader/fq/pkg/decode"

const intMask = byte(127)
const intFlag = byte(128)
// readLong reads a variable length zig zag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}

type LongCodec struct {}

func (l LongCodec) Decode(d *decode.D) {
d.Value.V = VarZigZag(d)
}

func BuildLongCodec(schema SimplifiedSchema) (Codec, error) {
c := LongCodec{}
return &c, nil
}
26 changes: 26 additions & 0 deletions format/avro/codecs/union.go
@@ -0,0 +1,26 @@
package codecs

import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

type EnumCodec struct{
symbols []string
}

func (l EnumCodec) Decode(d *decode.D) interface{} {
value := int(VarZigZag(d))
if value >= len(l.symbols) {
d.Fatalf("invalid enum value: %d", value)
}
return l.symbols[value]
}

func (l EnumCodec) Type() CodecType {
return SCALAR
}

func BuildEnumCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &EnumCodec{symbols: schema.Symbols}, nil
}

0 comments on commit d6ca481

Please sign in to comment.