diff --git a/avro.go b/avro.go
index 1f7a833..bf177b8 100644
--- a/avro.go
+++ b/avro.go
@@ -1,64 +1,57 @@
 package kafka

 import (
-	"log"
-
 	"github.com/linkedin/goavro/v2"
 )

-func SerializeAvro(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
-	key := []byte(data.(string))
+func SerializeAvro(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
+	bytesData := []byte(data.(string))
 	if schema != "" {
-		key = ToAvro(data.(string), schema)
+		codec, err := goavro.NewCodec(schema)
+		if err != nil {
+			ReportError(err, "Failed to create codec for encoding Avro")
+			return nil, err
+		}
+
+		avroEncodedData, _, err := codec.NativeFromTextual(bytesData)
+		if err != nil {
+			ReportError(err, "Failed to encode data into Avro")
+			return nil, err
+		}
+
+		bytesData, err = codec.BinaryFromNative(nil, avroEncodedData)
+		if err != nil {
+			ReportError(err, "Failed to encode Avro data into binary")
+			return nil, err
+		}
 	}

-	byteData, err := addMagicByteAndSchemaIdPrefix(configuration, key, topic, keyOrValue, schema)
+	byteData, err := encodeWireFormat(configuration, bytesData, topic, element, schema, version)
 	if err != nil {
+		ReportError(err, "Failed to add wire format to the binary data")
 		return nil, err
 	}

 	return byteData, nil
 }

-func ToAvro(value string, schema string) []byte {
-	codec, err := goavro.NewCodec(schema)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	native, _, err := codec.NativeFromTextual([]byte(value))
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	binary, err := codec.BinaryFromNative(nil, native)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	return binary
-}
-
-func DeserializeAvro(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
-	dataWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, data, keyOrValue)
+func DeserializeAvro(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
+	bytesDecodedData, err := decodeWireFormat(configuration, data, element)
+	if err != nil {
+		ReportError(err, "Failed to remove wire format from the binary data")
+		return nil
+	}

 	if schema != "" {
-		return FromAvro(dataWithoutPrefix, schema)
-	}
+		codec, err := goavro.NewCodec(schema)
+		if err != nil {
+			ReportError(err, "Failed to create codec for decoding Avro")
+			return nil
+		}

-	return dataWithoutPrefix
-}
+		avroDecodedData, _, err := codec.NativeFromBinary(bytesDecodedData)
+		if err != nil {
+			ReportError(err, "Failed to decode data from Avro")
+			return nil
+		}

-func FromAvro(message []byte, schema string) interface{} {
-	codec, err := goavro.NewCodec(schema)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	native, _, err := codec.NativeFromBinary(message)
-	if err != nil {
-		log.Fatal(err)
+		return avroDecodedData
 	}

-	return native
+	return bytesDecodedData
 }
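Note: the new SerializeAvro/DeserializeAvro above are a plain goavro round trip (textual JSON to native to binary, and back), with the wire-format framing delegated to schemaRegistry.go. A minimal, self-contained sketch of that round trip; the schema and record here are illustrative, not part of this change:

	package main

	import (
		"fmt"

		"github.com/linkedin/goavro/v2"
	)

	func main() {
		schema := `{"name": "Person", "type": "record", "fields": [{"name": "name", "type": "string"}]}`
		codec, err := goavro.NewCodec(schema)
		if err != nil {
			panic(err)
		}

		// Textual JSON -> native Go value (map[string]interface{})
		native, _, err := codec.NativeFromTextual([]byte(`{"name": "Ada"}`))
		if err != nil {
			panic(err)
		}

		// Native -> Avro binary (the bytes that get framed and produced)
		binaryData, err := codec.BinaryFromNative(nil, native)
		if err != nil {
			panic(err)
		}

		// Avro binary -> native again (the consumer-side path)
		decoded, _, err := codec.NativeFromBinary(binaryData)
		if err != nil {
			panic(err)
		}
		fmt.Println(decoded) // map[name:Ada]
	}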
diff --git a/bytearray.go b/bytearray.go
index 1cb570c..843c6a3 100644
--- a/bytearray.go
+++ b/bytearray.go
@@ -2,7 +2,7 @@ package kafka

 import "errors"

-func SerializeByteArray(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
+func SerializeByteArray(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
 	switch data.(type) {
 	case []interface{}:
 		bArray := data.([]interface{})
@@ -16,6 +16,6 @@ func SerializeByteArray(configuration Configuration, topic string, data interfac
 	}
 }

-func DeserializeByteArray(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
+func DeserializeByteArray(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
 	return data
 }
diff --git a/configuration.go b/configuration.go
index d7fe749..043ff91 100644
--- a/configuration.go
+++ b/configuration.go
@@ -3,6 +3,8 @@ package kafka
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
+	"reflect"
 )

 type ConsumerConfiguration struct {
@@ -15,68 +17,39 @@ type ProducerConfiguration struct {
 	ValueSerializer string `json:"valueSerializer"`
 }

-type BasicAuth struct {
-	CredentialsSource string `json:"credentialsSource"`
-	UserInfo          string `json:"userInfo"`
-}
-
-type SchemaRegistryConfiguration struct {
-	Url       string    `json:"url"`
-	BasicAuth BasicAuth `json:"basicAuth"`
-	UseLatest bool      `json:"useLatest"`
-}
-
 type Configuration struct {
 	Consumer       ConsumerConfiguration       `json:"consumer"`
 	Producer       ProducerConfiguration       `json:"producer"`
 	SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
 }

-func unmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
+func UnmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
 	var configuration Configuration
 	err := json.Unmarshal([]byte(jsonConfiguration), &configuration)
 	return configuration, err
 }

-func useKafkaAvroDeserializer(configuration Configuration, keyOrValue string) bool {
-	if (Configuration{}) == configuration ||
-		(ConsumerConfiguration{}) == configuration.Consumer {
-		return false
-	}
-	if keyOrValue == "key" && configuration.Consumer.KeyDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" ||
-		keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" {
-		return true
+func ValidateConfiguration(configuration Configuration) error {
+	if (Configuration{}) == configuration {
+		// No configuration given; fall back to defaults
+		return nil
 	}
-	return false
-}

-func useKafkaAvroSerializer(configuration Configuration, keyOrValue string) bool {
-	if (Configuration{}) == configuration ||
-		(ProducerConfiguration{}) == configuration.Producer {
-		return false
-	}
-	if keyOrValue == "key" && configuration.Producer.KeySerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" ||
-		keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" {
-		return true
+	if useSerializer(configuration, Key) || useSerializer(configuration, Value) {
+		if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
+			return errors.New("you must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
+				"of one of these types " + fmt.Sprintf("%q", reflect.ValueOf(Serializers).MapKeys()))
+		}
 	}
-	return false
+	return nil
 }

-func useBasicAuthWithCredentialSourceUserInfo(configuration Configuration) bool {
+func GivenCredentials(configuration Configuration) bool {
 	if (Configuration{}) == configuration ||
 		(SchemaRegistryConfiguration{}) == configuration.SchemaRegistry ||
 		(BasicAuth{}) == configuration.SchemaRegistry.BasicAuth {
 		return false
 	}
-	return configuration.SchemaRegistry.BasicAuth.CredentialsSource == "USER_INFO"
-}
-
-func validateConfiguration(configuration Configuration) error {
-	if useKafkaAvroSerializer(configuration, "key") || useKafkaAvroSerializer(configuration, "value") {
-		if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
-			return errors.New("you must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
-				"of type \"io.confluent.kafka.serializers.KafkaAvroSerializer\"")
-		}
-	}
-	return nil
+	return configuration.SchemaRegistry.BasicAuth.Username != "" &&
+		configuration.SchemaRegistry.BasicAuth.Password != ""
 }
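Note: BasicAuth changes shape here (explicit username/password instead of Confluent-style credentialsSource/userInfo), and the unexported helpers become exported. A sketch of how the exported pair could be exercised from a hypothetical _test.go file in package kafka (the JSON is illustrative):

	package kafka

	import "testing"

	func TestValidateAvroConfiguration(t *testing.T) {
		configuration, err := UnmarshalConfiguration(`{
			"producer": {
				"keySerializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
				"valueSerializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
			},
			"schemaRegistry": {
				"url": "http://localhost:8081",
				"basicAuth": {"username": "KEY", "password": "SECRET"}
			}
		}`)
		if err != nil {
			t.Fatal(err)
		}
		// An Avro serializer with a schemaRegistry block present passes validation.
		if err := ValidateConfiguration(configuration); err != nil {
			t.Fatal(err)
		}
		if !GivenCredentials(configuration) {
			t.Error("expected basic-auth credentials to be detected")
		}
	}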
\"io.confluent.kafka.serializers.KafkaAvroSerializer\"") - } - } - return nil + return configuration.SchemaRegistry.BasicAuth.Username != "" && + configuration.SchemaRegistry.BasicAuth.Password != "" } diff --git a/consumer.go b/consumer.go index 657832e..3d5c80c 100644 --- a/consumer.go +++ b/consumer.go @@ -9,6 +9,8 @@ import ( "go.k6.io/k6/metrics" ) +var DefaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer" + func (*Kafka) Reader( brokers []string, topic string, partition int, groupID string, offset int64, auth string) *kafkago.Reader { @@ -46,7 +48,7 @@ func (k *Kafka) Consume(reader *kafkago.Reader, limit int64, func (k *Kafka) ConsumeWithConfiguration( reader *kafkago.Reader, limit int64, configurationJson string, keySchema string, valueSchema string) []map[string]interface{} { - configuration, err := unmarshalConfiguration(configurationJson) + configuration, err := UnmarshalConfiguration(configurationJson) if err != nil { ReportError(err, "Cannot unmarshal configuration "+configurationJson) err = k.reportReaderStats(reader.Stats()) @@ -85,6 +87,13 @@ func (k *Kafka) consumeInternal( limit = 1 } + err = ValidateConfiguration(configuration) + if err != nil { + ReportError(err, "Validation of configuration failed. Falling back to defaults") + configuration.Consumer.KeyDeserializer = DefaultDeserializer + configuration.Consumer.ValueDeserializer = DefaultDeserializer + } + keyDeserializer := GetDeserializer(configuration.Consumer.KeyDeserializer, keySchema) valueDeserializer := GetDeserializer(configuration.Consumer.ValueDeserializer, valueSchema) @@ -114,11 +123,11 @@ func (k *Kafka) consumeInternal( message := make(map[string]interface{}) if len(msg.Key) > 0 { - message["key"] = keyDeserializer(configuration, msg.Key, "key", keySchema) + message["key"] = keyDeserializer(configuration, msg.Key, Key, keySchema, 0) } if len(msg.Value) > 0 { - message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema) + message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema, 0) } // Rest of the fields of a given message diff --git a/go.mod b/go.mod index 1f3896d..dd9eb8e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/mostafa/xk6-kafka go 1.18 require ( - github.com/linkedin/goavro/v2 v2.11.0 + github.com/linkedin/goavro/v2 v2.11.1 + github.com/riferrei/srclient v0.5.2 github.com/segmentio/kafka-go v0.4.31 go.k6.io/k6 v0.38.0 ) @@ -13,7 +14,7 @@ require ( github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf // indirect github.com/fatih/color v1.13.0 // indirect github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect - github.com/golang/snappy v0.0.1 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.12 // indirect @@ -21,12 +22,14 @@ require ( github.com/nxadm/tail v1.4.8 // indirect github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/afero v1.1.2 // indirect github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect github.com/xdg/stringprep v1.0.0 // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect + golang.org/x/sync 
diff --git a/go.mod b/go.mod
index 1f3896d..dd9eb8e 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,8 @@ module github.com/mostafa/xk6-kafka
 go 1.18

 require (
-	github.com/linkedin/goavro/v2 v2.11.0
+	github.com/linkedin/goavro/v2 v2.11.1
+	github.com/riferrei/srclient v0.5.2
 	github.com/segmentio/kafka-go v0.4.31
 	go.k6.io/k6 v0.38.0
 )
@@ -13,7 +14,7 @@ require (
 	github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf // indirect
 	github.com/fatih/color v1.13.0 // indirect
 	github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
-	github.com/golang/snappy v0.0.1 // indirect
+	github.com/golang/snappy v0.0.4 // indirect
 	github.com/klauspost/compress v1.15.1 // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/mattn/go-colorable v0.1.12 // indirect
@@ -21,12 +22,14 @@ require (
 	github.com/nxadm/tail v1.4.8 // indirect
 	github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
 	github.com/pierrec/lz4/v4 v4.1.14 // indirect
+	github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
 	github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/spf13/afero v1.1.2 // indirect
 	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
 	github.com/xdg/stringprep v1.0.0 // indirect
 	golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
 	golang.org/x/text v0.3.7 // indirect
 	golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
diff --git a/go.sum b/go.sum
index fe1b4fc..30ac39f 100644
--- a/go.sum
+++ b/go.sum
@@ -16,8 +16,9 @@ github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5Nq
 github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible h1:bopx7t9jyUNX1ebhr0G4gtQWmUOgwQRI0QsYhdYLgkU=
 github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
-github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
 github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
 github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
@@ -27,8 +28,9 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
-github.com/linkedin/goavro/v2 v2.11.0 h1:AlU/NR32ESbC/dlzbhTjyqybwESupUCc3SrrHg2qdTg=
-github.com/linkedin/goavro/v2 v2.11.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.11.1 h1:4cuAtbDfqkKnBXp9E+tRkIJGa6W6iAjwonwt8O1f4U0=
+github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
 github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
 github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
@@ -49,6 +51,10 @@ github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE
 github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/riferrei/srclient v0.5.2 h1:vDfBVmn/o5/MDByAnmeaLB1Tc91o+Bx5jmySKsQed7s=
+github.com/riferrei/srclient v0.5.2/go.mod h1:vbkLmWcgYa7JgfPvuy/+K8fTS0p1bApqadxrxi/S1MI=
+github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE=
+github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
 github.com/segmentio/kafka-go v0.4.31 h1:+ImsrkJRju9j1D9U44rvRGRlpsI9GnwD8s9WTFagNLQ=
 github.com/segmentio/kafka-go v0.4.31/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU=
 github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e h1:zWKUYT07mGmVBH+9UgnHXd/ekCK99C8EbDSAt5qsjXE=
@@ -74,6 +80,9 @@ golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJ
 golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/producer.go b/producer.go
index b7d7fef..197223b 100644
--- a/producer.go
+++ b/producer.go
@@ -17,6 +17,7 @@ var (
 		"Lz4":  &compress.Lz4Codec,
 		"Zstd": &compress.ZstdCodec,
 	}
+	DefaultSerializer = "org.apache.kafka.common.serialization.StringSerializer"
 )

 func (*Kafka) Writer(brokers []string, topic string, auth string, compression string) *kafkago.Writer {
@@ -45,7 +46,7 @@ func (k *Kafka) Produce(
 func (k *Kafka) ProduceWithConfiguration(
 	writer *kafkago.Writer, messages []map[string]interface{},
 	configurationJson string, keySchema string, valueSchema string) error {
-	configuration, err := unmarshalConfiguration(configurationJson)
+	configuration, err := UnmarshalConfiguration(configurationJson)
 	if err != nil {
 		ReportError(err, "Cannot unmarshal configuration "+configurationJson)
 		return nil
@@ -69,10 +70,11 @@ func (k *Kafka) produceInternal(
 		return nil
 	}

-	err = validateConfiguration(configuration)
+	err = ValidateConfiguration(configuration)
 	if err != nil {
-		ReportError(err, "Validation of properties failed.")
-		return err
+		ReportError(err, "Validation of configuration failed. Falling back to defaults")
+		configuration.Producer.KeySerializer = DefaultSerializer
+		configuration.Producer.ValueSerializer = DefaultSerializer
 	}

 	if state == nil {
@@ -112,7 +114,7 @@ func (k *Kafka) produceInternal(
 		// If a key was provided, add it to the message. Keys are optional.
 		if _, has_key := message["key"]; has_key {
-			keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], "key", keySchema)
+			keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], Key, keySchema, 0)
 			if err != nil {
 				ReportError(err, "Creation of key bytes failed.")
 				return err
@@ -122,7 +124,7 @@ func (k *Kafka) produceInternal(
 		}

 		// Then add the message
-		valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], "value", valueSchema)
+		valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], Value, valueSchema, 0)
 		if err != nil {
 			ReportError(err, "Creation of message bytes failed.")
 			return err
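Note: the producer-side behavior changes here from "validation error, return err" to "fall back to String serializers and keep producing". A package-internal sketch of the error path that the fallback still enforces (topic name and data illustrative):

	serializer := GetSerializer(DefaultSerializer, "")
	if _, err := serializer(Configuration{}, "my-topic", 42, Key, "", 0); err != nil {
		// Non-string data is still rejected by the String serializer.
		ReportError(err, "Creation of key bytes failed.")
	}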
diff --git a/schemaRegistry.go b/schemaRegistry.go
index 8901ed4..7086874 100644
--- a/schemaRegistry.go
+++ b/schemaRegistry.go
@@ -1,17 +1,30 @@
 package kafka

 import (
-	"bytes"
-	"encoding/json"
 	"errors"
-	"fmt"
-	"io/ioutil"
-	"net/http"
-	"strings"

-	"github.com/linkedin/goavro/v2"
+	"github.com/riferrei/srclient"
 )

+type Element string
+
+const (
+	Key   Element = "key"
+	Value Element = "value"
+)
+
+type BasicAuth struct {
+	Username string `json:"username"`
+	Password string `json:"password"`
+}
+
+type SchemaRegistryConfiguration struct {
+	Url          string    `json:"url"`
+	BasicAuth    BasicAuth `json:"basicAuth"`
+	UseLatest    bool      `json:"useLatest"`
+	CacheSchemas bool      `json:"cacheSchemas"`
+}
+
 func i32tob(val uint32) []byte {
 	r := make([]byte, 4)
 	for i := uint32(0); i < 4; i++ {
@@ -20,115 +33,86 @@
 	return r
 }

-// Account for proprietary 5-byte prefix before the Avro payload:
+// Account for proprietary 5-byte prefix before the Avro, ProtoBuf or JSONSchema payload:
 // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
-func removeMagicByteAndSchemaIdPrefix(configuration Configuration, messageData []byte, keyOrValue string) []byte {
-	if useKafkaAvroDeserializer(configuration, keyOrValue) {
-		return messageData[5:]
+func decodeWireFormat(configuration Configuration, messageData []byte, element Element) ([]byte, error) {
+	if !useDeserializer(configuration, element) {
+		return messageData, nil
+	}
+
+	if element == Key && isWireFormatted(configuration.Consumer.KeyDeserializer) ||
+		element == Value && isWireFormatted(configuration.Consumer.ValueDeserializer) {
+		if len(messageData) < 5 {
+			return nil, errors.New("invalid message data")
+		}
+		return messageData[5:], nil
 	}
-	return messageData
+	return messageData, nil
 }

-// Add proprietary 5-byte prefix before the Avro payload:
+// Add proprietary 5-byte prefix before the Avro, ProtoBuf or JSONSchema payload:
 // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
-func addMagicByteAndSchemaIdPrefix(configuration Configuration, avroData []byte, topic string, keyOrValue string, schema string) ([]byte, error) {
-	var schemaId, err = getSchemaId(configuration, topic, keyOrValue, schema)
-	if err != nil {
-		ReportError(err, "Retrieval of schema id failed.")
-		return nil, err
+func encodeWireFormat(configuration Configuration, avroData []byte, topic string, element Element, schema string, version int) ([]byte, error) {
+	if !useSerializer(configuration, element) {
+		return avroData, nil
 	}
-	if schemaId != 0 {
-		return append(append([]byte{0}, i32tob(schemaId)...), avroData...), nil
+
+	if element == Key && isWireFormatted(configuration.Producer.KeySerializer) ||
+		element == Value && isWireFormatted(configuration.Producer.ValueSerializer) {
+		var schemaInfo, err = getSchema(
+			configuration, topic, element, schema, srclient.Avro, version)
+		if err != nil {
+			ReportError(err, "Retrieval of schema id failed.")
+			return nil, err
+		}
+		if schemaInfo.ID() != 0 {
+			return append(append([]byte{0}, i32tob(uint32(schemaInfo.ID()))...), avroData...), nil
+		}
 	}
 	return avroData, nil
 }

-var schemaIdCache = make(map[string]uint32)
+func schemaRegistryClient(configuration Configuration) *srclient.SchemaRegistryClient {
+	srClient := srclient.CreateSchemaRegistryClient(configuration.SchemaRegistry.Url)
+	srClient.CachingEnabled(configuration.SchemaRegistry.CacheSchemas)

-type SchemaInfo struct {
-	Id      int32 `json:"id"`
-	Version int32 `json:"version"`
+	if GivenCredentials(configuration) {
+		srClient.SetCredentials(
+			configuration.SchemaRegistry.BasicAuth.Username,
+			configuration.SchemaRegistry.BasicAuth.Password)
+	}
+	return srClient
 }

-func getSchemaId(configuration Configuration, topic string, keyOrValue string, schema string) (uint32, error) {
-	if schemaIdCache[schema] > 0 {
-		return schemaIdCache[schema], nil
+func getSchema(
+	configuration Configuration, topic string, element Element,
+	schema string, schemaType srclient.SchemaType, version int) (*srclient.Schema, error) {
+	// Default schema type is Avro
+	if schemaType == "" {
+		schemaType = srclient.Avro
 	}
-	if useKafkaAvroSerializer(configuration, keyOrValue) {
-		if configuration.SchemaRegistry.UseLatest {
-			url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions/latest"
-			client := &http.Client{}
-			req, err := http.NewRequest("GET", url, nil)
-			if err != nil {
-				return 0, err
-			}
-			req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json")
-			if useBasicAuthWithCredentialSourceUserInfo(configuration) {
-				username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0]
-				password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1]
-				req.SetBasicAuth(username, password)
-			}
-			resp, err := client.Do(req)
-			if err != nil {
-				return 0, err
-			}
-			if resp.StatusCode >= 400 {
-				return 0, errors.New(fmt.Sprintf("Retrieval of schema ids failed. Details: Url= %v, response=%v", url, resp))
-			}
-			defer resp.Body.Close()
-			bodyBytes, err := ioutil.ReadAll(resp.Body)
-			if err != nil {
-				return 0, err
-			}
-			var result SchemaInfo
-
-			err = json.Unmarshal(bodyBytes, &result)
-			if err != nil {
-				return 0, err
-			}
-			schemaId := uint32(result.Id)
-			schemaIdCache[schema] = schemaId
-			return schemaId, nil
-		} else {
-			url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions"
-			codec, _ := goavro.NewCodec(schema)
-
-			body := "{\"schema\":\"" + strings.Replace(codec.CanonicalSchema(), "\"", "\\\"", -1) + "\"}"
-
-			client := &http.Client{}
-			req, err := http.NewRequest("POST", url, bytes.NewReader([]byte(body)))
-			if err != nil {
-				return 0, err
-			}
-			req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json")
-			if useBasicAuthWithCredentialSourceUserInfo(configuration) {
-				username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0]
-				password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1]
-				req.SetBasicAuth(username, password)
-			}
-			resp, err := client.Do(req)
-			if err != nil {
-				return 0, err
-			}
-			if resp.StatusCode >= 400 {
-				return 0, errors.New(fmt.Sprintf("Retrieval of schema ids failed. Details: Url= %v, body=%v, response=%v", url, body, resp))
-			}
-			defer resp.Body.Close()
-			bodyBytes, err := ioutil.ReadAll(resp.Body)
-			if err != nil {
-				return 0, err
-			}
-
-			var result map[string]int32
-			err = json.Unmarshal(bodyBytes, &result)
-			if err != nil {
-				return 0, err
-			}
-			schemaId := uint32(result["id"])
-			schemaIdCache[schema] = schemaId
-			return schemaId, nil
+
+	srClient := schemaRegistryClient(configuration)
+
+	var schemaInfo *srclient.Schema
+	subject := topic + "-" + string(element)
+	// Default version of the schema is the latest version
+	// If CacheSchemas is true, the client will cache the schema
+	if version == 0 {
+		schemaInfo, _ = srClient.GetLatestSchema(subject)
+	} else {
+		schemaInfo, _ = srClient.GetSchemaByVersion(subject, version)
+	}
+
+	if schemaInfo == nil {
+		schemaInfo, err := srClient.CreateSchema(subject, schema, schemaType)
+		if err != nil {
+			ReportError(err, "Creation of schema failed.")
+			return nil, err
 		}
+		return schemaInfo, nil
 	}
-	return 0, nil
+
+	return schemaInfo, nil
 }
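Note: encodeWireFormat/decodeWireFormat above implement Confluent's wire format: one zero magic byte, the schema id as a 4-byte big-endian integer, then the payload, which is why decodeWireFormat rejects anything shorter than 5 bytes. A dependency-free sketch (schema id and payload illustrative):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	func main() {
		schemaID := uint32(42)
		payload := []byte("avro-encoded bytes would go here")

		// Encode: prepend the 5-byte prefix, as encodeWireFormat does via i32tob.
		prefix := make([]byte, 5)
		prefix[0] = 0 // magic byte
		binary.BigEndian.PutUint32(prefix[1:], schemaID)
		framed := append(prefix, payload...)

		// Decode: validate and strip the prefix, as decodeWireFormat
		// does with messageData[5:].
		if len(framed) < 5 {
			panic("invalid message data")
		}
		fmt.Println("schema id:", binary.BigEndian.Uint32(framed[1:5]))
		fmt.Println("payload:", string(framed[5:]))
	}
diff --git a/scripts/test_avro_with_schema_registry.js b/scripts/test_avro_with_schema_registry.js
index c5d0f8e..94c0728 100644
--- a/scripts/test_avro_with_schema_registry.js
+++ b/scripts/test_avro_with_schema_registry.js
@@ -13,22 +13,16 @@ import {
   deleteTopic,
 } from "k6/x/kafka"; // import kafka extension

-const bootstrapServers = ["subdomain.us-east-1.aws.confluent.cloud:9092"];
-const topic = "com.example.person";
+const bootstrapServers = ["localhost:9092"];
+const kafkaTopic = "com.example.person";

-const auth = JSON.stringify({
-  username: "username",
-  password: "password",
-  algorithm: "plain",
-});
-
-const producer = writer(bootstrapServers, topic, auth);
-const consumer = reader(bootstrapServers, topic, null, "", null, auth);
+const producer = writer(bootstrapServers, kafkaTopic);
+const consumer = reader(bootstrapServers, kafkaTopic, null, "", null);

 const keySchema = `{
   "name": "KeySchema",
   "type": "record",
-  "namespace": "com.example",
+  "namespace": "com.example.key",
   "fields": [
     {
       "name": "ssn",
@@ -40,7 +34,7 @@ const keySchema = `{
 const valueSchema = `{
   "name": "ValueSchema",
   "type": "record",
-  "namespace": "com.example",
+  "namespace": "com.example.value",
   "fields": [
     {
       "name": "firstname",
@@ -63,11 +57,7 @@ var configuration = JSON.stringify({
     valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
   },
   schemaRegistry: {
-    url: "https://subdomain.us-east-2.aws.confluent.cloud",
-    basicAuth: {
-      credentialsSource: "USER_INFO",
-      userInfo: "KEY:SECRET",
-    },
+    url: "http://localhost:8081",
   },
 });
diff --git a/scripts/test_avro_with_schema_registry_no_key.js b/scripts/test_avro_with_schema_registry_no_key.js
index 58d4b1e..e33bfa5 100644
--- a/scripts/test_avro_with_schema_registry_no_key.js
+++ b/scripts/test_avro_with_schema_registry_no_key.js
@@ -14,10 +14,10 @@ import {
 } from "k6/x/kafka"; // import kafka extension

 const bootstrapServers = ["localhost:9092"];
-const topic = "com.example.person";
+const kafkaTopic = "com.example.person";

-const producer = writer(bootstrapServers, topic, null);
-const consumer = reader(bootstrapServers, topic, null, "", null, null);
+const producer = writer(bootstrapServers, kafkaTopic, null);
+const consumer = reader(bootstrapServers, kafkaTopic, null, "", null, null);

 const valueSchema = `{
   "name": "ValueSchema",
@@ -81,14 +81,14 @@ export default function () {

 export function teardown(data) {
   if (__VU == 1) {
-    // Delete the topic
-    const error = deleteTopic(bootstrapServers[0], topic);
+    // Delete the kafkaTopic
+    const error = deleteTopic(bootstrapServers[0], kafkaTopic);
     if (error === undefined) {
-      // If no error returns, it means that the topic
+      // If no error returns, it means that the kafkaTopic
       // is successfully deleted
       console.log("Topic deleted successfully");
     } else {
-      console.log("Error while deleting topic: ", error);
+      console.log("Error while deleting kafkaTopic: ", error);
     }
   }
   producer.close();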
diff --git a/serde.go b/serde.go
index 79634d4..9e8323d 100644
--- a/serde.go
+++ b/serde.go
@@ -1,40 +1,87 @@
 package kafka

-type Serializer func(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)
-type Deserializer func(configuration Configuration, data []byte, keyOrValue string, schema string) interface{}
+type Serializer func(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error)
+type Deserializer func(configuration Configuration, data []byte, element Element, schema string, version int) interface{}
+
+var (
+	// TODO: Find a better way to do this, like a serde registry
+	Serializers = map[string]Serializer{
+		"org.apache.kafka.common.serialization.StringSerializer":          SerializeString,
+		"org.apache.kafka.common.serialization.ByteArraySerializer":       SerializeByteArray,
+		"io.confluent.kafka.serializers.KafkaAvroSerializer":              SerializeAvro,
+		"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer": nil,
+		"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer":   nil,
+	}
+
+	Deserializers = map[string]Deserializer{
+		"org.apache.kafka.common.serialization.StringDeserializer":          DeserializeString,
+		"org.apache.kafka.common.serialization.ByteArrayDeserializer":       DeserializeByteArray,
+		"io.confluent.kafka.serializers.KafkaAvroDeserializer":              DeserializeAvro,
+		"io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer": nil,
+		"io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer":   nil,
+	}
+
+	// Serdes that frame their payload with the magic byte and schema id
+	WireFormattedCodecs = map[string]bool{
+		// Serializers
+		"org.apache.kafka.common.serialization.StringSerializer":          false,
+		"org.apache.kafka.common.serialization.ByteArraySerializer":       false,
+		"io.confluent.kafka.serializers.KafkaAvroSerializer":              true,
+		"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer": true,
+		"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer":   true,
+
+		// Deserializers
+		"org.apache.kafka.common.serialization.StringDeserializer":          false,
+		"org.apache.kafka.common.serialization.ByteArrayDeserializer":       false,
+		"io.confluent.kafka.serializers.KafkaAvroDeserializer":              true,
+		"io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer": true,
+		"io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer":   true,
+	}
+)
+
+func useSerializer(configuration Configuration, element Element) bool {
+	if (Configuration{}) == configuration || (ProducerConfiguration{}) == configuration.Producer {
+		return false
+	}
+	return element == Key && configuration.Producer.KeySerializer != "" ||
+		element == Value && configuration.Producer.ValueSerializer != ""
+}
+
+func useDeserializer(configuration Configuration, element Element) bool {
+	if (Configuration{}) == configuration || (ConsumerConfiguration{}) == configuration.Consumer {
+		return false
+	}
+	return element == Key && configuration.Consumer.KeyDeserializer != "" ||
+		element == Value && configuration.Consumer.ValueDeserializer != ""
+}
+
+func isWireFormatted(serde string) bool {
+	return WireFormattedCodecs[serde]
+}
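Note: with the switch statements replaced by the Serializers/Deserializers maps below, wiring in a new codec becomes a map entry per direction plus a WireFormattedCodecs flag. A sketch registering a hypothetical custom serializer; the class name and byte-reversing logic are purely illustrative:

	func init() {
		Serializers["com.example.serialization.ReverseSerializer"] = func(
			configuration Configuration, topic string, data interface{},
			element Element, schema string, version int) ([]byte, error) {
			b := []byte(data.(string))
			for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
				b[i], b[j] = b[j], b[i] // reverse the bytes
			}
			return b, nil
		}
		// No magic-byte/schema-id framing for this codec.
		WireFormattedCodecs["com.example.serialization.ReverseSerializer"] = false
	}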
 func GetSerializer(serializer string, schema string) Serializer {
-	// if schema exists default to AVRO without schema registry
+	// If a schema exists, default to Avro without schema registry
+	// TODO: deprecate this
 	if schema != "" {
 		return SerializeAvro
 	}

-	switch serializer {
-	case "org.apache.kafka.common.serialization.ByteArraySerializer":
-		return SerializeByteArray
-	case "org.apache.kafka.common.serialization.StringSerializer":
-		return SerializeString
-	case "io.confluent.kafka.serializers.KafkaAvroSerializer":
-		return SerializeAvro
-	default:
+	serializerFunction := Serializers[serializer]
+	if serializerFunction == nil {
 		return SerializeString
 	}
+	return serializerFunction
 }

 func GetDeserializer(deserializer string, schema string) Deserializer {
-	// if schema exists default to AVRO without schema registry
+	// If a schema exists, default to Avro without schema registry
+	// TODO: deprecate this
 	if schema != "" {
 		return DeserializeAvro
 	}

-	switch deserializer {
-	case "org.apache.kafka.common.serialization.ByteArrayDeserializer":
-		return DeserializeByteArray
-	case "org.apache.kafka.common.serialization.StringDeserializer":
-		return DeserializeString
-	case "io.confluent.kafka.serializers.KafkaAvroDeserializer":
-		return DeserializeAvro
-	default:
+	deserializerFunction := Deserializers[deserializer]
+	if deserializerFunction == nil {
 		return DeserializeString
 	}
+	return deserializerFunction
 }
diff --git a/string.go b/string.go
index 51f8325..af987be 100644
--- a/string.go
+++ b/string.go
@@ -2,15 +2,15 @@ package kafka

 import "errors"

-func SerializeString(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
-	switch data.(type) {
+func SerializeString(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, error) {
+	switch data := data.(type) {
 	case string:
-		return []byte(data.(string)), nil
+		return []byte(data), nil
 	default:
 		return nil, errors.New("Invalid data type provided for string serializer (requires string)")
 	}
 }

-func DeserializeString(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
+func DeserializeString(configuration Configuration, data []byte, element Element, schema string, version int) interface{} {
 	return string(data)
 }