Skip to content

Commit

Permalink
Merge branch 'master' into enable-linters
Browse files Browse the repository at this point in the history
  • Loading branch information
xmcqueen committed Aug 19, 2022
2 parents d97d7bd + 442327d commit be79bb9
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 51 deletions.
89 changes: 88 additions & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,98 @@ func NewCodec(schemaSpecification string) (*Codec, error) {
})
}

// NewCodecForStandardJSON returns a codec that uses a special union
// processing code that allows normal json to be ingested via an
// avro schema, by inferring the "type" intended for union types.
//
// This is the one-way code to get such json into the avro system
// and the deserialization is not supported in this codec - its
// json into avro-json one-way and one-way only for this codec.
//
// The "type" inference is done by using the types specified as
// potentially acceptable types for the union, and trying to
// unpack the incomin json into each of the specified types for
// the union type. See union.go +/Standard JSON/ for a general
// description of the problem and details of the solution
// are in union.go +/nativeAvroFromTextualJson/
//
// For a general description of a codex seen the comment for NewCodec
// above.
//
// The following is the exact same schema used in the above
// code for NewCodec:
//
// codec, err := goavro.NewCodecForStandardJSON(`
// {
// "type": "record",
// "name": "LongList",
// "fields" : [
// {"name": "next", "type": ["null", "LongList"], "default": null}
// ]
// }`)
// if err != nil {
// fmt.Println(err)
// }
//
// The above will take json of this sort:
//
// {"next": null}
//
// {"next":{"next":null}}
//
// {"next":{"next":{"next":null}}}
//
// For more examples see the test cases in union_test.go
func NewCodecForStandardJSON(schemaSpecification string) (*Codec, error) {
return NewCodecFrom(schemaSpecification, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceJSON,
buildCodecForTypeDescribedBySliceOneWayJson,
})
}

// NewCodecForStandardJSONOneWay is an alias for NewCodecForStandardJSON
// added to make the transition to two-way json handling more smooth
//
// This will unambiguously provide OneWay avro encoding for standard
// internet json. This takes in internet json, and brings it into
// the avro world, but the deserialization retains the unique
// form of normal avro-friendly json where unions have their
// types types specified in stream like this example from
// the official docs // https://avro.apache.org/docs/1.11.1/api/c/
//
// `{"string": "Follow your bliss."}`
//
// To be clear this means the incoming json string:
//
// "Follow your bliss."
//
// would deserialize according to the avro-json expectations to:
//
// `{"string": "Follow your bliss."}`
//
// To get full two-way support see the below NewCodecForStandardJSONFull
func NewCodecForStandardJSONOneWay(schemaSpecification string) (*Codec, error) {
return NewCodecForStandardJSON(schemaSpecification)
}

// NewCodecForStandardJSONFull provides full serialization/deserialization
// for json that meets the expectations of regular internet json, viewed as
// something distinct from avro-json which has special handling for union
// types. For details see the above comments.
//
// With this `codec` you can expect to see a json string like this:
//
// "Follow your bliss."
//
// to deserialize into the same json structure
//
// "Follow your bliss."
func NewCodecForStandardJSONFull(schemaSpecification string) (*Codec, error) {
return NewCodecFrom(schemaSpecification, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceTwoWayJson,
})
}

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/linkedin/goavro/v2

go 1.12

require github.com/golang/snappy v0.0.1
require (
github.com/golang/snappy v0.0.1
github.com/stretchr/testify v1.7.5
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,16 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
45 changes: 43 additions & 2 deletions text_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ package goavro

import (
"bytes"
"encoding/json"
"fmt"
"math"
"testing"

"github.com/stretchr/testify/assert"
)

func testTextDecodeFail(t *testing.T, schema string, buf []byte, errorMessage string) {
Expand Down Expand Up @@ -63,18 +66,56 @@ func testTextDecodePass(t *testing.T, schema string, datum interface{}, encoded
}
toNativeAndCompare(t, schema, datum, encoded, codec)
}
func testJSONDecodePass(t *testing.T, schema string, datum interface{}, encoded []byte) {
func testJsonDecodePass(t *testing.T, schema string, datum interface{}, encoded []byte) {
t.Helper()
codec, err := NewCodecFrom(schema, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceJSON,
buildCodecForTypeDescribedBySliceOneWayJson,
})
if err != nil {
t.Fatalf("schema: %s; %s", schema, err)
}
toNativeAndCompare(t, schema, datum, encoded, codec)
}
func testNativeToTextualJsonPass(t *testing.T, schema string, datum interface{}, encoded []byte) {
t.Helper()
codec, err := NewCodecFrom(schema, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceTwoWayJson,
})
if err != nil {
t.Fatalf("schema: %s; %s", schema, err)
}
toTextualAndCompare(t, schema, datum, encoded, codec)
}

func toTextualAndCompare(t *testing.T, schema string, datum interface{}, expected []byte, codec *Codec) {
t.Helper()
decoded, err := codec.TextualFromNative(nil, datum)
if err != nil {
t.Fatalf("schema: %s; %s", schema, err)
}

// do extra stuff to to the challenge equality of maps
var want interface{}

if err := json.Unmarshal(expected, &want); err != nil {
t.Errorf("Could not unmarshal the expected data into a go struct:%#v:", string(expected))
}

var got interface{}

if err := json.Unmarshal(decoded, &got); err != nil {
t.Errorf("Could not unmarshal the received data into a go struct:%#v:", string(decoded))
}

if !assert.Equal(t, want, got) {
t.Errorf("GOT: %v; WANT: %v", string(decoded), string(expected))
}
}

func toNativeAndCompare(t *testing.T, schema string, datum interface{}, encoded []byte, codec *Codec) {
t.Helper()
decoded, remaining, err := codec.NativeFromTextual(encoded)
Expand Down
83 changes: 69 additions & 14 deletions union.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func makeCodecInfo(st map[string]*Codec, enclosingNamespace string, schemaArray

}

func nativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
func unionNativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {

return func(buf []byte) (interface{}, []byte, error) {
var decoded interface{}
Expand All @@ -114,7 +114,7 @@ func nativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, erro
return Union(cr.allowedTypes[index], decoded), buf, nil
}
}
func binaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
func unionBinaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
Expand All @@ -141,7 +141,7 @@ func binaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte
return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func nativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
func unionNativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
return func(buf []byte) (interface{}, []byte, error) {
if len(buf) >= 4 && bytes.Equal(buf[:4], []byte("null")) {
if _, ok := cr.indexFromName["null"]; ok {
Expand All @@ -159,7 +159,7 @@ func nativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, err
return datum, buf, nil
}
}
func textualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
func unionTextualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
Expand Down Expand Up @@ -196,6 +196,37 @@ func textualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byt
return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func textualJsonFromNativeAvro(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
_, ok := cr.indexFromName["null"]
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
return append(buf, "null"...), nil
case map[string]interface{}:
if len(v) != 1 {
return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
// will execute exactly once
for key, value := range v {
index, ok := cr.indexFromName[key]
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
var err error
c := cr.codecFromIndex[index]
buf, err = c.textualFromNative(buf, value)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
}
return buf, nil
}
}
return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
Expand All @@ -213,10 +244,10 @@ func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: nativeFromBinary(&cr),
binaryFromNative: binaryFromNative(&cr),
nativeFromTextual: nativeFromTextual(&cr),
textualFromNative: textualFromNative(&cr),
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: unionNativeFromTextual(&cr),
textualFromNative: unionTextualFromNative(&cr),
}
return rv, nil
}
Expand Down Expand Up @@ -246,7 +277,31 @@ func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace
// and then it will remain avro-json object
// avro data is not serialized back into standard json
// the data goes to avro-json and stays that way
func buildCodecForTypeDescribedBySliceJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
func buildCodecForTypeDescribedBySliceOneWayJson(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}

cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb)
if err != nil {
return nil, err
}

rv := &Codec{
// NOTE: To support record field default values, union schema set to the
// type name of first member
// TODO: add/change to schemaCanonical below
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJson(&cr),
textualFromNative: unionTextualFromNative(&cr),
}
return rv, nil
}
func buildCodecForTypeDescribedBySliceTwoWayJson(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}
Expand All @@ -263,10 +318,10 @@ func buildCodecForTypeDescribedBySliceJSON(st map[string]*Codec, enclosingNamesp
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: nativeFromBinary(&cr),
binaryFromNative: binaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJSON(&cr),
textualFromNative: textualFromNative(&cr),
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJson(&cr),
textualFromNative: textualJsonFromNativeAvro(&cr),
}
return rv, nil
}
Expand All @@ -289,7 +344,7 @@ func checkAll(allowedTypes []string, cr *codecInfo, buf []byte) (interface{}, []
}
return nil, buf, fmt.Errorf("could not decode any json data in input %v", string(buf))
}
func nativeAvroFromTextualJSON(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
func nativeAvroFromTextualJson(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
return func(buf []byte) (interface{}, []byte, error) {

reader := bytes.NewReader(buf)
Expand Down
Loading

0 comments on commit be79bb9

Please sign in to comment.