-
Notifications
You must be signed in to change notification settings - Fork 3
/
avroserdebase.py
74 lines (60 loc) · 2.22 KB
/
avroserdebase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import functools
import struct

import avro.io
from confluent_kafka.avro import MessageSerializer as ConfluentMessageSerializer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.avro.serializer.message_serializer import ContextStringIO
# Value the first byte of every encoded message must have; decode_message
# raises SerializerError for messages that start with anything else.
MAGIC_BYTE = 0x00
class HasSchemaMixin:
    """
    Mixin that lets a decoded Avro value carry the schema it was decoded with.

    Instances are expected to have a ``_schema`` attribute assigned after
    construction (``_wrap`` does this).
    """

    def schema(self):
        """
        :return: the Avro schema that was used to decode this value
        :rtype: avro.schema.Schema
        :raises AttributeError: if no ``_schema`` has been assigned
        """
        return self._schema


@functools.lru_cache(maxsize=1024)
def _schema_class(base, name):
    """
    Return a subclass of ``base`` and HasSchemaMixin named ``name``.

    Cached so that decoding many messages with the same schema reuses one
    class instead of building a brand-new type object for every message.

    :raises TypeError: if ``base`` cannot be subclassed (e.g. bool, NoneType)
    """
    return type(name, (base, HasSchemaMixin), {})


def _wrap(value, schema):
    """
    Wrap a decoded value in a subclass of its own type that mixes in
    HasSchemaMixin, so the result behaves like ``value`` and also exposes
    ``schema()``.

    The subclass is named after the schema: its ``fullname`` when present,
    otherwise ``namespace.name`` (or just ``name`` when the namespace is
    empty), otherwise ``name``, and finally the schema ``type``
    (e.g. ``"string"``) for unnamed schemas.

    :param value: a decoded value
    :param schema: corresponding Avro schema used to decode ``value``
    :return: an instance of the schema-named subclass constructed from
        ``value`` with ``_schema`` set, or ``value`` itself when its type
        cannot be wrapped (see below)
    """
    if hasattr(schema, 'fullname'):
        name = schema.fullname
    elif hasattr(schema, 'namespace'):
        if schema.namespace:
            name = "{namespace}.{name}".format(namespace=schema.namespace,
                                               name=schema.name)
        else:
            # Without this guard an unset namespace yields "None.<name>".
            name = schema.name
    elif hasattr(schema, 'name'):
        name = schema.name
    else:
        name = schema.type
    try:
        wrapped = _schema_class(value.__class__, str(name))(value)
    except TypeError:
        # bool and NoneType cannot be subclassed, and some types (e.g.
        # datetime.date) cannot be rebuilt from an instance of themselves.
        # Return such values as-is rather than failing the whole decode.
        return value
    wrapped._schema = schema
    return wrapped
class AvroSerDeBase(ConfluentMessageSerializer):
    """
    A subclass of MessageSerializer from Confluent's confluent-kafka-python
    that attaches the schema to deserialized Avro messages.
    """

    def decode_message(self, message, is_key=False):
        """
        Decode a message from Kafka that has been encoded for use with the
        schema registry, and attach the schema registered under the
        message's schema id to the decoded value.

        :param message: raw message bytes laid out as a signed magic byte,
            a 4-byte big-endian schema id, then the Avro payload; or None
        :param is_key: True when ``message`` is a record key. Newer
            confluent-kafka releases pass this keyword; it is forwarded to
            ``_get_decoder_func`` only when True, so releases whose
            ``_get_decoder_func`` takes no such argument keep working.
        :return: None if ``message`` is None; otherwise the decoded value
            passed through ``_wrap`` together with the registry schema
        :raises SerializerError: if the message is 5 bytes or shorter, or
            does not start with MAGIC_BYTE
        """
        if message is None:
            return None
        if len(message) <= 5:
            raise SerializerError("message is too small to decode")
        with ContextStringIO(message) as payload:
            magic, schema_id = struct.unpack('>bI', payload.read(5))
            if magic != MAGIC_BYTE:
                raise SerializerError("message does not start with magic byte")
            if is_key:
                decoder_func = self._get_decoder_func(schema_id, payload,
                                                      is_key=True)
            else:
                # Two-argument call stays compatible with older base classes.
                decoder_func = self._get_decoder_func(schema_id, payload)
            # NOTE(review): this attaches the writer schema fetched by id; if
            # the base serializer is configured with a reader schema, the
            # decoded value may follow that schema instead — confirm.
            return _wrap(
                decoder_func(payload),
                self.registry_client.get_by_id(schema_id)
            )