Skip to content

Commit

Permalink
Initial pass on logical types
Browse files Browse the repository at this point in the history
  • Loading branch information
xentripetal committed Jan 14, 2022
1 parent 788fd02 commit 251053e
Show file tree
Hide file tree
Showing 18 changed files with 327 additions and 148 deletions.
28 changes: 15 additions & 13 deletions format/avro/decoders/array.go
Expand Up @@ -3,12 +3,13 @@ package decoders
import (
"errors"
"fmt"
"github.com/wader/fq/pkg/scalar"

"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

func decodeArrayFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
func decodeArrayFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
if schema.Items == nil {
return nil, errors.New("array schema must have items")
}
Expand All @@ -18,18 +19,18 @@ func decodeArrayFn(schema schema.SimplifiedSchema) (func(string, *decode.D), err
return nil, fmt.Errorf("failed getting decode fn for array item: %w", err)
}

//Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array
//items. A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
//If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block
//size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when
//projecting a record to a subset of its fields.
// Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array
// items. A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
// If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block
// size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when
// projecting a record to a subset of its fields.
// For example, the array schema {"type": "array", "items": "long"}
// an array containing the items 3 and 27 could be encoded as the long value 2 (encoded as hex 04)
// followed by long values 3 and 27 (encoded as hex 06 36) terminated by zero:
// 04 06 36 00

//For example, the array schema {"type": "array", "items": "long"}
//an array containing the items 3 and 27 could be encoded as the long value 2 (encoded as hex 04)
//followed by long values 3 and 27 (encoded as hex 06 36) terminated by zero:
//04 06 36 00

return func(name string, d *decode.D) {
return func(name string, d *decode.D) interface{} {
var values []interface{}
d.FieldArray(name, func(d *decode.D) {
count := int64(-1)
for count != 0 {
Expand All @@ -41,11 +42,12 @@ func decodeArrayFn(schema schema.SimplifiedSchema) (func(string, *decode.D), err
}
d.FieldArray("data", func(d *decode.D) {
for i := int64(0); i < count; i++ {
valueD("entry", d)
values = append(values, valueD("entry", d))
}
})
})
}
})
return values
}, nil
}
9 changes: 5 additions & 4 deletions format/avro/decoders/bool.go
Expand Up @@ -3,12 +3,13 @@ package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)

func decodeBoolFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//a boolean is written as a single byte whose value is either 0 (false) or 1 (true).
return func(name string, d *decode.D) {
d.FieldBoolFn(name, func(d *decode.D) bool {
func decodeBoolFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
// A boolean is written as a single byte whose value is either 0 (false) or 1 (true).
return func(name string, d *decode.D) interface{} {
return d.FieldBoolFn(name, func(d *decode.D) bool {
return d.U8() >= 1
})
}, nil
Expand Down
25 changes: 18 additions & 7 deletions format/avro/decoders/bytes.go
Expand Up @@ -3,16 +3,27 @@ package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)

type BytesCodec struct{}

func decodeBytesFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//bytes are encoded as a long followed by that many bytes of data.
return func(name string, d *decode.D) {
// What if its a record with a field called name_len?
// using a struct is probably a better idea. But it makes it less usable
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldRawLen("name", length*8)
func decodeBytesFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
// Bytes are encoded as a long followed by that many bytes of data.
return func(name string, d *decode.D) interface{} {
var val []byte
var err error

d.FieldStruct(name, func(d *decode.D) {
length := d.FieldSFn("length", VarZigZag)
bb := d.FieldRawLen("data", length*8, sms...)

val, err = bb.Bytes()
if err != nil {
d.Fatalf("failed to read %s bytes: %v", name, err)
}
})

return val
}, nil
}
44 changes: 0 additions & 44 deletions format/avro/decoders/codec.go

This file was deleted.

51 changes: 51 additions & 0 deletions format/avro/decoders/decoders.go
@@ -0,0 +1,51 @@
package decoders

import (
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)

type DecodeFn func(string, *decode.D) interface{}

func DecodeFnForSchema(s schema.SimplifiedSchema) (DecodeFn, error) {
var sms []scalar.Mapper
mapper := logicalMapperForSchema(s)
if mapper != nil {
sms = append(sms, mapper)
}

switch s.Type {
case schema.ARRAY:
return decodeArrayFn(s, sms...)
case schema.BOOLEAN:
return decodeBoolFn(s, sms...)
case schema.BYTES:
return decodeBytesFn(s, sms...)
case schema.DOUBLE:
return decodeDoubleFn(s, sms...)
case schema.ENUM:
return decodeEnumFn(s, sms...)
case schema.FIXED:
return decodeFixedFn(s, sms...)
case schema.FLOAT:
return decodeFloatFn(s, sms...)
case schema.INT:
return decodeIntFn(s, sms...)
case schema.LONG:
return decodeLongFn(s, sms...)
case schema.NULL:
return decodeNullFn(s, sms...)
case schema.RECORD:
return decodeRecordFn(s, sms...)
case schema.STRING:
return decodeStringFn(s, sms...)
case schema.UNION:
return decodeUnionFn(s, sms...)
case schema.MAP:
return decodeMapFn(s, sms...)
default:
return nil, fmt.Errorf("unknown type: %s", s.Type)
}
}
11 changes: 6 additions & 5 deletions format/avro/decoders/double.go
Expand Up @@ -3,12 +3,13 @@ package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)

func decodeDoubleFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//a double is written as 8 bytes. The double is converted into a 64-bit integer using a method equivalent to Java's
//doubleToLongBits and then encoded in little-endian format.
return func(name string, d *decode.D) {
d.FieldF64(name)
func decodeDoubleFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
// A double is written as 8 bytes. The double is converted into a 64-bit integer using a method equivalent to Java's
// doubleToLongBits and then encoded in little-endian format.
return func(name string, d *decode.D) interface{} {
return d.FieldF64(name)
}, nil
}
15 changes: 9 additions & 6 deletions format/avro/decoders/enum.go
Expand Up @@ -8,23 +8,26 @@ import (
"github.com/wader/fq/pkg/scalar"
)

func decodeEnumFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
func decodeEnumFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
if len(schema.Symbols) == 0 {
return nil, errors.New("enum requires symbols")
}

//An enum is encoded by a int, representing the zero-based position of the symbol in the schema.
//For example, consider the enum:
// An enum is encoded by an int, representing the zero-based position of the symbol in the schema.
// For example, consider the enum:
// {"type": "enum", "name": "Foo", "symbols": ["A", "B", "C", "D"] }
//This would be encoded by an int between zero and three, with zero indicating "A", and 3 indicating "D".
return func(name string, d *decode.D) {
// This would be encoded by an int between zero and three, with zero indicating "A", and 3 indicating "D".
return func(name string, d *decode.D) interface{} {
var val string
d.FieldSFn(name, VarZigZag, scalar.Fn(func(s scalar.S) (scalar.S, error) {
v := int(s.ActualS())
if v < 0 || v >= len(schema.Symbols) {
return s, errors.New("enum value of out range")
}
s.Sym = schema.Symbols[v]
val = schema.Symbols[v]
s.Sym = val
return s, nil
}))
return val
}, nil
}
14 changes: 10 additions & 4 deletions format/avro/decoders/fixed.go
Expand Up @@ -2,18 +2,24 @@ package decoders

import (
"errors"
"github.com/wader/fq/pkg/scalar"

"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)

func decodeFixedFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
func decodeFixedFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
if schema.Size < 0 {
return nil, errors.New("fixed size must be greater than or equal to zero")
}
size := int64(schema.Size)
//Fixed instances are encoded using the number of bytes declared in the schema.
return func(name string, d *decode.D) {
d.FieldRawLen(name, size*8)
// Fixed instances are encoded using the number of bytes declared in the schema.
return func(name string, d *decode.D) interface{} {
bb := d.FieldRawLen(name, size*8, sms...)
val, err := bb.Bytes()
if err != nil {
d.Fatalf("failed to read fixed %s value: %v", name, err)
}
return val
}, nil
}
11 changes: 6 additions & 5 deletions format/avro/decoders/float.go
Expand Up @@ -3,12 +3,13 @@ package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)

func decodeFloatFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//a float is written as 4 bytes. The float is converted into a 32-bit integer using a method equivalent to Java's
//floatToIntBits and then encoded in little-endian format.
return func(name string, d *decode.D) {
d.FieldF32(name)
func decodeFloatFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
// A float is written as 4 bytes. The float is converted into a 32-bit integer using a method equivalent to Java's
// floatToIntBits and then encoded in little-endian format.
return func(name string, d *decode.D) interface{} {
return d.FieldF32(name)
}, nil
}
9 changes: 5 additions & 4 deletions format/avro/decoders/int.go
Expand Up @@ -3,11 +3,12 @@ package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)

func decodeIntFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
// int and long values are written using variable-length zig-zag coding.
return func(name string, d *decode.D) {
d.FieldSFn(name, VarZigZag)
func decodeIntFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
// Int and long values are written using variable-length zig-zag coding.
return func(name string, d *decode.D) interface{} {
return d.FieldSFn(name, VarZigZag)
}, nil
}

0 comments on commit 251053e

Please sign in to comment.