Skip to content

Commit

Permalink
Use a proper Schema Registry client to fix issues in #22 (#52)
Browse files Browse the repository at this point in the history
* Use a proper Schema Registry client to fix issues in #22
* Add srclient dependency as a better way to access Schema Registry
* Combine functions in Avro serdes
* Change SchemaRegistryConfiguration and BasicAuth to fix issues (breaking change)
* Validate configuration for all supported (and unsupported) serde functionality
* Default serde is now StringSerializer and StringDeserializer
* Refactor wire format de/encoding
* Fix bugs and typos and refactor the rest
* Refactor basic auth function and move structs to their relevant files
* Fix linting errors reported by gosimple (golangci-lint)
  • Loading branch information
mostafa committed May 9, 2022
1 parent 8e1fb0c commit 9911ee8
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 246 deletions.
71 changes: 32 additions & 39 deletions avro.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,57 @@
package kafka

import (
"log"

"github.com/linkedin/goavro/v2"
)

func SerializeAvro(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
key := []byte(data.(string))
func SerializeAvro(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
bytesData := []byte(data.(string))
if schema != "" {
key = ToAvro(data.(string), schema)
codec, err := goavro.NewCodec(schema)
if err != nil {
ReportError(err, "Failed to create codec for encoding Avro")
}

avroEncodedData, _, err := codec.NativeFromTextual(bytesData)
if err != nil {
ReportError(err, "Failed to encode data into Avro")
}

bytesData, err = codec.BinaryFromNative(nil, avroEncodedData)
if err != nil {
ReportError(err, "Failed to encode Avro data into binary")
}
}

byteData, err := addMagicByteAndSchemaIdPrefix(configuration, key, topic, keyOrValue, schema)
byteData, err := encodeWireFormat(configuration, bytesData, topic, element, schema, version)
if err != nil {
ReportError(err, "Failed to add wire format to the binary data")
return nil, err
}

return byteData, nil
}

func ToAvro(value string, schema string) []byte {
codec, err := goavro.NewCodec(schema)
if err != nil {
log.Fatal(err)
}

native, _, err := codec.NativeFromTextual([]byte(value))
if err != nil {
log.Fatal(err)
}

binary, err := codec.BinaryFromNative(nil, native)
func DeserializeAvro(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
bytesDecodedData, err := decodeWireFormat(configuration, data, element)
if err != nil {
log.Fatal(err)
ReportError(err, "Failed to remove wire format from the binary data")
return nil
}

return binary
}

func DeserializeAvro(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
dataWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, data, keyOrValue)

if schema != "" {
return FromAvro(dataWithoutPrefix, schema)
}
codec, err := goavro.NewCodec(schema)
if err != nil {
ReportError(err, "Failed to create codec for decoding Avro")
}

return dataWithoutPrefix
}
avroDecodedData, _, err := codec.NativeFromBinary(bytesDecodedData)
if err != nil {
ReportError(err, "Failed to decode data from Avro")
}

func FromAvro(message []byte, schema string) interface{} {
codec, err := goavro.NewCodec(schema)
if err != nil {
log.Fatal(err)
}

native, _, err := codec.NativeFromBinary(message)
if err != nil {
log.Fatal(err)
return avroDecodedData
}

return native
return bytesDecodedData
}
4 changes: 2 additions & 2 deletions bytearray.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package kafka

import "errors"

func SerializeByteArray(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
func SerializeByteArray(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
switch data.(type) {
case []interface{}:
bArray := data.([]interface{})
Expand All @@ -16,6 +16,6 @@ func SerializeByteArray(configuration Configuration, topic string, data interfac
}
}

func DeserializeByteArray(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
func DeserializeByteArray(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
return data
}
59 changes: 16 additions & 43 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kafka
import (
"encoding/json"
"errors"
"fmt"
"reflect"
)

type ConsumerConfiguration struct {
Expand All @@ -15,68 +17,39 @@ type ProducerConfiguration struct {
ValueSerializer string `json:"valueSerializer"`
}

type BasicAuth struct {
CredentialsSource string `json:"credentialsSource"`
UserInfo string `json:"userInfo"`
}

type SchemaRegistryConfiguration struct {
Url string `json:"url"`
BasicAuth BasicAuth `json:"basicAuth"`
UseLatest bool `json:"useLatest"`
}

type Configuration struct {
Consumer ConsumerConfiguration `json:"consumer"`
Producer ProducerConfiguration `json:"producer"`
SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
}

func unmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
func UnmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
var configuration Configuration
err := json.Unmarshal([]byte(jsonConfiguration), &configuration)
return configuration, err
}

func useKafkaAvroDeserializer(configuration Configuration, keyOrValue string) bool {
if (Configuration{}) == configuration ||
(ConsumerConfiguration{}) == configuration.Consumer {
return false
}
if keyOrValue == "key" && configuration.Consumer.KeyDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" ||
keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" {
return true
func ValidateConfiguration(configuration Configuration) error {
if (Configuration{}) == configuration {
// No configuration, fallback to default
return nil
}
return false
}

func useKafkaAvroSerializer(configuration Configuration, keyOrValue string) bool {
if (Configuration{}) == configuration ||
(ProducerConfiguration{}) == configuration.Producer {
return false
}
if keyOrValue == "key" && configuration.Producer.KeySerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" ||
keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" {
return true
if useSerializer(configuration, Key) || useSerializer(configuration, Value) {
if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
return errors.New("You must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
"of either of these types " + fmt.Sprintf("%q", reflect.ValueOf(Serializers).MapKeys()))
}
}
return false
return nil
}

func useBasicAuthWithCredentialSourceUserInfo(configuration Configuration) bool {
func GivenCredentials(configuration Configuration) bool {
if (Configuration{}) == configuration ||
(SchemaRegistryConfiguration{}) == configuration.SchemaRegistry ||
(BasicAuth{}) == configuration.SchemaRegistry.BasicAuth {
return false
}
return configuration.SchemaRegistry.BasicAuth.CredentialsSource == "USER_INFO"
}

func validateConfiguration(configuration Configuration) error {
if useKafkaAvroSerializer(configuration, "key") || useKafkaAvroSerializer(configuration, "value") {
if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
return errors.New("you must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
"of type \"io.confluent.kafka.serializers.KafkaAvroSerializer\"")
}
}
return nil
return configuration.SchemaRegistry.BasicAuth.Username != "" &&
configuration.SchemaRegistry.BasicAuth.Password != ""
}
15 changes: 12 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"go.k6.io/k6/metrics"
)

var DefaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"

func (*Kafka) Reader(
brokers []string, topic string, partition int,
groupID string, offset int64, auth string) *kafkago.Reader {
Expand Down Expand Up @@ -46,7 +48,7 @@ func (k *Kafka) Consume(reader *kafkago.Reader, limit int64,
func (k *Kafka) ConsumeWithConfiguration(
reader *kafkago.Reader, limit int64, configurationJson string,
keySchema string, valueSchema string) []map[string]interface{} {
configuration, err := unmarshalConfiguration(configurationJson)
configuration, err := UnmarshalConfiguration(configurationJson)
if err != nil {
ReportError(err, "Cannot unmarshal configuration "+configurationJson)
err = k.reportReaderStats(reader.Stats())
Expand Down Expand Up @@ -85,6 +87,13 @@ func (k *Kafka) consumeInternal(
limit = 1
}

err = ValidateConfiguration(configuration)
if err != nil {
ReportError(err, "Validation of configuration failed. Falling back to defaults")
configuration.Consumer.KeyDeserializer = DefaultDeserializer
configuration.Consumer.ValueDeserializer = DefaultDeserializer
}

keyDeserializer := GetDeserializer(configuration.Consumer.KeyDeserializer, keySchema)
valueDeserializer := GetDeserializer(configuration.Consumer.ValueDeserializer, valueSchema)

Expand Down Expand Up @@ -114,11 +123,11 @@ func (k *Kafka) consumeInternal(

message := make(map[string]interface{})
if len(msg.Key) > 0 {
message["key"] = keyDeserializer(configuration, msg.Key, "key", keySchema)
message["key"] = keyDeserializer(configuration, msg.Key, Key, keySchema, 0)
}

if len(msg.Value) > 0 {
message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema)
message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema, 0)
}

// Rest of the fields of a given message
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/mostafa/xk6-kafka
go 1.18

require (
github.com/linkedin/goavro/v2 v2.11.0
github.com/linkedin/goavro/v2 v2.11.1
github.com/riferrei/srclient v0.5.2
github.com/segmentio/kafka-go v0.4.31
go.k6.io/k6 v0.38.0
)
Expand All @@ -13,20 +14,22 @@ require (
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
Expand Down
15 changes: 12 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5Nq
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible h1:bopx7t9jyUNX1ebhr0G4gtQWmUOgwQRI0QsYhdYLgkU=
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand All @@ -27,8 +28,9 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/linkedin/goavro/v2 v2.11.0 h1:AlU/NR32ESbC/dlzbhTjyqybwESupUCc3SrrHg2qdTg=
github.com/linkedin/goavro/v2 v2.11.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.11.1 h1:4cuAtbDfqkKnBXp9E+tRkIJGa6W6iAjwonwt8O1f4U0=
github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
Expand All @@ -49,6 +51,10 @@ github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riferrei/srclient v0.5.2 h1:vDfBVmn/o5/MDByAnmeaLB1Tc91o+Bx5jmySKsQed7s=
github.com/riferrei/srclient v0.5.2/go.mod h1:vbkLmWcgYa7JgfPvuy/+K8fTS0p1bApqadxrxi/S1MI=
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE=
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
github.com/segmentio/kafka-go v0.4.31 h1:+ImsrkJRju9j1D9U44rvRGRlpsI9GnwD8s9WTFagNLQ=
github.com/segmentio/kafka-go v0.4.31/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU=
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e h1:zWKUYT07mGmVBH+9UgnHXd/ekCK99C8EbDSAt5qsjXE=
Expand All @@ -74,6 +80,9 @@ golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJ
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
14 changes: 8 additions & 6 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
"Lz4": &compress.Lz4Codec,
"Zstd": &compress.ZstdCodec,
}
DefaultSerializer = "org.apache.kafka.common.serialization.StringSerializer"
)

func (*Kafka) Writer(brokers []string, topic string, auth string, compression string) *kafkago.Writer {
Expand Down Expand Up @@ -45,7 +46,7 @@ func (k *Kafka) Produce(
func (k *Kafka) ProduceWithConfiguration(
writer *kafkago.Writer, messages []map[string]interface{},
configurationJson string, keySchema string, valueSchema string) error {
configuration, err := unmarshalConfiguration(configurationJson)
configuration, err := UnmarshalConfiguration(configurationJson)
if err != nil {
ReportError(err, "Cannot unmarshal configuration "+configurationJson)
return nil
Expand All @@ -69,10 +70,11 @@ func (k *Kafka) produceInternal(
return nil
}

err = validateConfiguration(configuration)
err = ValidateConfiguration(configuration)
if err != nil {
ReportError(err, "Validation of properties failed.")
return err
ReportError(err, "Validation of configuration failed. Falling back to defaults")
configuration.Producer.KeySerializer = DefaultSerializer
configuration.Producer.ValueSerializer = DefaultSerializer
}

if state == nil {
Expand Down Expand Up @@ -112,7 +114,7 @@ func (k *Kafka) produceInternal(

// If a key was provided, add it to the message. Keys are optional.
if _, has_key := message["key"]; has_key {
keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], "key", keySchema)
keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], "key", keySchema, 0)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
Expand All @@ -122,7 +124,7 @@ func (k *Kafka) produceInternal(
}

// Then add the message
valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], "value", valueSchema)
valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], "value", valueSchema, 0)
if err != nil {
ReportError(err, "Creation of message bytes failed.")
return err
Expand Down

0 comments on commit 9911ee8

Please sign in to comment.