/
avro.go
135 lines (113 loc) · 4.18 KB
/
avro.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package kafka
import (
"github.com/linkedin/goavro/v2"
"github.com/riferrei/srclient"
"github.com/sirupsen/logrus"
)
// Identifiers for the Avro serializer and deserializer. The values are the
// fully-qualified class names of Confluent's Kafka Avro (de)serializers.
// NOTE(review): presumably these are matched against a user-supplied
// serializer/deserializer setting elsewhere in the package — confirm at the
// call sites.
const (
// AvroSerializer names the Confluent Kafka Avro serializer.
AvroSerializer string = "io.confluent.kafka.serializers.KafkaAvroSerializer"
// AvroDeserializer names the Confluent Kafka Avro deserializer.
AvroDeserializer string = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
// avroInputTypeError is a minimal error value used by SerializeAvro to report
// that it was handed data whose dynamic type it cannot encode.
type avroInputTypeError string

// Error returns the message carried by the error.
func (e avroInputTypeError) Error() string { return string(e) }

// SerializeAvro encodes data as Avro binary and passes the result, together
// with the schema ID, to EncodeWireFormat.
//
// data must hold a string containing the textual form of the datum (it is fed
// to the codec's NativeFromTextual). Any other dynamic type yields a
// failedEncodeToAvro error instead of a panic.
//
// The subject is built as topic + "-" + element. When schema is non-empty it is
// passed to CreateSchema for that subject; otherwise GetSchema is called with
// the given version. Three outcomes are then possible:
//
//   - The registry call failed: a warning is logged, a codec is built locally
//     from schema, and the schema ID stays 0.
//   - The registry call succeeded and returned a schema: its codec and ID are
//     used.
//   - Neither an error nor a schema was returned: data is passed through
//     unencoded with schema ID 0.
//
// The returned error is nil on success. On failure the byte slice is nil and
// the error carries one of failedEncodeToAvro, failedCreateAvroCodec or
// failedEncodeAvroToBinary.
func SerializeAvro(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, *Xk6KafkaError) {
	// Use a checked assertion: a bare data.(string) panics on non-string input.
	textual, ok := data.(string)
	if !ok {
		return nil, NewXk6KafkaError(failedEncodeToAvro,
			"Failed to encode data into Avro",
			avroInputTypeError("data must be a string"))
	}
	bytesData := []byte(textual)

	client := SchemaRegistryClient(configuration.SchemaRegistry.Url,
		configuration.SchemaRegistry.BasicAuth.Username,
		configuration.SchemaRegistry.BasicAuth.Password)
	subject := topic + "-" + string(element)

	var schemaInfo *srclient.Schema
	var xk6KafkaError *Xk6KafkaError
	if schema != "" {
		// Schema is provided, so we need to create it and get the schema ID
		schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro)
	} else {
		// Schema is not provided, so we need to fetch the schema from the Schema Registry
		schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version)
	}

	// Pick exactly one codec. The two encoding paths must be mutually
	// exclusive: running both would feed already-encoded Avro binary back into
	// NativeFromTextual.
	schemaID := 0
	var codec *goavro.Codec
	switch {
	case xk6KafkaError != nil:
		logrus.New().WithError(xk6KafkaError).Warn("Failed to create or get schema, manually encoding the data")
		localCodec, err := goavro.NewCodec(schema)
		if err != nil {
			return nil, NewXk6KafkaError(failedCreateAvroCodec,
				"Failed to create codec for encoding Avro",
				err)
		}
		codec = localCodec
	case schemaInfo != nil:
		schemaID = schemaInfo.ID()
		codec = schemaInfo.Codec()
	default:
		// No schema and no error: nothing to encode with, pass the data through.
		return EncodeWireFormat(bytesData, schemaID), nil
	}

	// Textual datum -> native Go value -> Avro binary.
	native, _, err := codec.NativeFromTextual(bytesData)
	if err != nil {
		return nil, NewXk6KafkaError(failedEncodeToAvro,
			"Failed to encode data into Avro",
			err)
	}
	binary, err := codec.BinaryFromNative(nil, native)
	if err != nil {
		return nil, NewXk6KafkaError(failedEncodeAvroToBinary,
			"Failed to encode Avro data into binary",
			err)
	}
	return EncodeWireFormat(binary, schemaID), nil
}
// DeserializeAvro runs data through DecodeWireFormat and then decodes the
// resulting payload from Avro binary into a native Go value.
//
// The subject is built as topic + "-" + element. A non-empty schema is passed
// to CreateSchema for that subject; an empty one triggers GetSchema with the
// given version. If that call fails, a warning is logged and a codec built
// locally from schema is used. If it succeeds and returns a schema, that
// schema's codec is used. If neither an error nor a schema comes back, the
// payload is returned as-is.
//
// On failure the value is nil and the error carries one of
// failedDecodeFromWireFormat, failedCreateAvroCodec or
// failedDecodeAvroFromBinary.
func DeserializeAvro(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
	payload, wireErr := DecodeWireFormat(data)
	if wireErr != nil {
		return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
			"Failed to remove wire format from the binary data",
			wireErr)
	}

	registry := SchemaRegistryClient(configuration.SchemaRegistry.Url,
		configuration.SchemaRegistry.BasicAuth.Username,
		configuration.SchemaRegistry.BasicAuth.Password)
	subjectName := topic + "-" + string(element)

	var (
		registered *srclient.Schema
		lookupErr  *Xk6KafkaError
	)
	if schema == "" {
		// No schema supplied: look it up in the Schema Registry.
		registered, lookupErr = GetSchema(registry, subjectName, schema, srclient.Avro, version)
	} else {
		// Schema supplied: create it for the subject.
		registered, lookupErr = CreateSchema(registry, subjectName, schema, srclient.Avro)
	}

	// Choose the codec to decode with; a failed registry call takes precedence
	// over any schema that may have been returned alongside it.
	var codec *goavro.Codec
	switch {
	case lookupErr != nil:
		logrus.New().WithError(lookupErr).Warn("Failed to create or get schema, manually decoding the data")
		localCodec, codecErr := goavro.NewCodec(schema)
		if codecErr != nil {
			return nil, NewXk6KafkaError(failedCreateAvroCodec,
				"Failed to create codec for decoding Avro",
				codecErr)
		}
		codec = localCodec
	case registered != nil:
		codec = registered.Codec()
	default:
		// Nothing to decode with: return the payload unchanged.
		return payload, nil
	}

	native, _, decodeErr := codec.NativeFromBinary(payload)
	if decodeErr != nil {
		return nil, NewXk6KafkaError(failedDecodeAvroFromBinary,
			"Failed to decode data from Avro",
			decodeErr)
	}
	return native, nil
}