Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a proper Schema Registry client to fix issues in #22 #52

Merged
merged 3 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 32 additions & 39 deletions avro.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,57 @@
package kafka

import (
"log"

"github.com/linkedin/goavro/v2"
)

func SerializeAvro(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
key := []byte(data.(string))
func SerializeAvro(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
bytesData := []byte(data.(string))
if schema != "" {
key = ToAvro(data.(string), schema)
codec, err := goavro.NewCodec(schema)
if err != nil {
ReportError(err, "Failed to create codec for encoding Avro")
}

avroEncodedData, _, err := codec.NativeFromTextual(bytesData)
if err != nil {
ReportError(err, "Failed to encode data into Avro")
}

bytesData, err = codec.BinaryFromNative(nil, avroEncodedData)
if err != nil {
ReportError(err, "Failed to encode Avro data into binary")
}
}

byteData, err := addMagicByteAndSchemaIdPrefix(configuration, key, topic, keyOrValue, schema)
byteData, err := encodeWireFormat(configuration, bytesData, topic, element, schema, version)
if err != nil {
ReportError(err, "Failed to add wire format to the binary data")
return nil, err
}

return byteData, nil
}

func ToAvro(value string, schema string) []byte {
codec, err := goavro.NewCodec(schema)
if err != nil {
log.Fatal(err)
}

native, _, err := codec.NativeFromTextual([]byte(value))
if err != nil {
log.Fatal(err)
}

binary, err := codec.BinaryFromNative(nil, native)
func DeserializeAvro(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
bytesDecodedData, err := decodeWireFormat(configuration, data, element)
if err != nil {
log.Fatal(err)
ReportError(err, "Failed to remove wire format from the binary data")
return nil
}

return binary
}

func DeserializeAvro(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
dataWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, data, keyOrValue)

if schema != "" {
return FromAvro(dataWithoutPrefix, schema)
}
codec, err := goavro.NewCodec(schema)
if err != nil {
ReportError(err, "Failed to create codec for decoding Avro")
}

return dataWithoutPrefix
}
avroDecodedData, _, err := codec.NativeFromBinary(bytesDecodedData)
if err != nil {
ReportError(err, "Failed to decode data from Avro")
}

func FromAvro(message []byte, schema string) interface{} {
codec, err := goavro.NewCodec(schema)
if err != nil {
log.Fatal(err)
}

native, _, err := codec.NativeFromBinary(message)
if err != nil {
log.Fatal(err)
return avroDecodedData
}

return native
return bytesDecodedData
}
4 changes: 2 additions & 2 deletions bytearray.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package kafka

import "errors"

func SerializeByteArray(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
func SerializeByteArray(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
switch data.(type) {
case []interface{}:
bArray := data.([]interface{})
Expand All @@ -16,6 +16,6 @@ func SerializeByteArray(configuration Configuration, topic string, data interfac
}
}

func DeserializeByteArray(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
func DeserializeByteArray(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
return data
}
59 changes: 16 additions & 43 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kafka
import (
"encoding/json"
"errors"
"fmt"
"reflect"
)

type ConsumerConfiguration struct {
Expand All @@ -15,68 +17,39 @@ type ProducerConfiguration struct {
ValueSerializer string `json:"valueSerializer"`
}

type BasicAuth struct {
CredentialsSource string `json:"credentialsSource"`
UserInfo string `json:"userInfo"`
}

type SchemaRegistryConfiguration struct {
Url string `json:"url"`
BasicAuth BasicAuth `json:"basicAuth"`
UseLatest bool `json:"useLatest"`
}

type Configuration struct {
Consumer ConsumerConfiguration `json:"consumer"`
Producer ProducerConfiguration `json:"producer"`
SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
}

func unmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
func UnmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
var configuration Configuration
err := json.Unmarshal([]byte(jsonConfiguration), &configuration)
return configuration, err
}

func useKafkaAvroDeserializer(configuration Configuration, keyOrValue string) bool {
if (Configuration{}) == configuration ||
(ConsumerConfiguration{}) == configuration.Consumer {
return false
}
if keyOrValue == "key" && configuration.Consumer.KeyDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" ||
keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" {
return true
func ValidateConfiguration(configuration Configuration) error {
if (Configuration{}) == configuration {
// No configuration, fallback to default
return nil
}
return false
}

func useKafkaAvroSerializer(configuration Configuration, keyOrValue string) bool {
if (Configuration{}) == configuration ||
(ProducerConfiguration{}) == configuration.Producer {
return false
}
if keyOrValue == "key" && configuration.Producer.KeySerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" ||
keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" {
return true
if useSerializer(configuration, Key) || useSerializer(configuration, Value) {
if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
return errors.New("You must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
"of either of these types " + fmt.Sprintf("%q", reflect.ValueOf(Serializers).MapKeys()))
}
}
return false
return nil
}

func useBasicAuthWithCredentialSourceUserInfo(configuration Configuration) bool {
func GivenCredentials(configuration Configuration) bool {
if (Configuration{}) == configuration ||
(SchemaRegistryConfiguration{}) == configuration.SchemaRegistry ||
(BasicAuth{}) == configuration.SchemaRegistry.BasicAuth {
return false
}
return configuration.SchemaRegistry.BasicAuth.CredentialsSource == "USER_INFO"
}

func validateConfiguration(configuration Configuration) error {
if useKafkaAvroSerializer(configuration, "key") || useKafkaAvroSerializer(configuration, "value") {
if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
return errors.New("you must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
"of type \"io.confluent.kafka.serializers.KafkaAvroSerializer\"")
}
}
return nil
return configuration.SchemaRegistry.BasicAuth.Username != "" &&
configuration.SchemaRegistry.BasicAuth.Password != ""
}
15 changes: 12 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"go.k6.io/k6/metrics"
)

var DefaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"

func (*Kafka) Reader(
brokers []string, topic string, partition int,
groupID string, offset int64, auth string) *kafkago.Reader {
Expand Down Expand Up @@ -46,7 +48,7 @@ func (k *Kafka) Consume(reader *kafkago.Reader, limit int64,
func (k *Kafka) ConsumeWithConfiguration(
reader *kafkago.Reader, limit int64, configurationJson string,
keySchema string, valueSchema string) []map[string]interface{} {
configuration, err := unmarshalConfiguration(configurationJson)
configuration, err := UnmarshalConfiguration(configurationJson)
if err != nil {
ReportError(err, "Cannot unmarshal configuration "+configurationJson)
err = k.reportReaderStats(reader.Stats())
Expand Down Expand Up @@ -85,6 +87,13 @@ func (k *Kafka) consumeInternal(
limit = 1
}

err = ValidateConfiguration(configuration)
if err != nil {
ReportError(err, "Validation of configuration failed. Falling back to defaults")
configuration.Consumer.KeyDeserializer = DefaultDeserializer
configuration.Consumer.ValueDeserializer = DefaultDeserializer
}

keyDeserializer := GetDeserializer(configuration.Consumer.KeyDeserializer, keySchema)
valueDeserializer := GetDeserializer(configuration.Consumer.ValueDeserializer, valueSchema)

Expand Down Expand Up @@ -114,11 +123,11 @@ func (k *Kafka) consumeInternal(

message := make(map[string]interface{})
if len(msg.Key) > 0 {
message["key"] = keyDeserializer(configuration, msg.Key, "key", keySchema)
message["key"] = keyDeserializer(configuration, msg.Key, Key, keySchema, 0)
}

if len(msg.Value) > 0 {
message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema)
message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema, 0)
}

// Rest of the fields of a given message
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/mostafa/xk6-kafka
go 1.18

require (
github.com/linkedin/goavro/v2 v2.11.0
github.com/linkedin/goavro/v2 v2.11.1
github.com/riferrei/srclient v0.5.2
github.com/segmentio/kafka-go v0.4.31
go.k6.io/k6 v0.38.0
)
Expand All @@ -13,20 +14,22 @@ require (
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
Expand Down
15 changes: 12 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5Nq
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible h1:bopx7t9jyUNX1ebhr0G4gtQWmUOgwQRI0QsYhdYLgkU=
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand All @@ -27,8 +28,9 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/linkedin/goavro/v2 v2.11.0 h1:AlU/NR32ESbC/dlzbhTjyqybwESupUCc3SrrHg2qdTg=
github.com/linkedin/goavro/v2 v2.11.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.11.1 h1:4cuAtbDfqkKnBXp9E+tRkIJGa6W6iAjwonwt8O1f4U0=
github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
Expand All @@ -49,6 +51,10 @@ github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riferrei/srclient v0.5.2 h1:vDfBVmn/o5/MDByAnmeaLB1Tc91o+Bx5jmySKsQed7s=
github.com/riferrei/srclient v0.5.2/go.mod h1:vbkLmWcgYa7JgfPvuy/+K8fTS0p1bApqadxrxi/S1MI=
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE=
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
github.com/segmentio/kafka-go v0.4.31 h1:+ImsrkJRju9j1D9U44rvRGRlpsI9GnwD8s9WTFagNLQ=
github.com/segmentio/kafka-go v0.4.31/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU=
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e h1:zWKUYT07mGmVBH+9UgnHXd/ekCK99C8EbDSAt5qsjXE=
Expand All @@ -74,6 +80,9 @@ golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJ
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
14 changes: 8 additions & 6 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
"Lz4": &compress.Lz4Codec,
"Zstd": &compress.ZstdCodec,
}
DefaultSerializer = "org.apache.kafka.common.serialization.StringSerializer"
)

func (*Kafka) Writer(brokers []string, topic string, auth string, compression string) *kafkago.Writer {
Expand Down Expand Up @@ -45,7 +46,7 @@ func (k *Kafka) Produce(
func (k *Kafka) ProduceWithConfiguration(
writer *kafkago.Writer, messages []map[string]interface{},
configurationJson string, keySchema string, valueSchema string) error {
configuration, err := unmarshalConfiguration(configurationJson)
configuration, err := UnmarshalConfiguration(configurationJson)
if err != nil {
ReportError(err, "Cannot unmarshal configuration "+configurationJson)
return nil
Expand All @@ -69,10 +70,11 @@ func (k *Kafka) produceInternal(
return nil
}

err = validateConfiguration(configuration)
err = ValidateConfiguration(configuration)
if err != nil {
ReportError(err, "Validation of properties failed.")
return err
ReportError(err, "Validation of configuration failed. Falling back to defaults")
configuration.Producer.KeySerializer = DefaultSerializer
configuration.Producer.ValueSerializer = DefaultSerializer
}

if state == nil {
Expand Down Expand Up @@ -112,7 +114,7 @@ func (k *Kafka) produceInternal(

// If a key was provided, add it to the message. Keys are optional.
if _, has_key := message["key"]; has_key {
keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], "key", keySchema)
keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], "key", keySchema, 0)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
Expand All @@ -122,7 +124,7 @@ func (k *Kafka) produceInternal(
}

// Then add the message
valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], "value", valueSchema)
valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], "value", valueSchema, 0)
if err != nil {
ReportError(err, "Creation of message bytes failed.")
return err
Expand Down