-
Notifications
You must be signed in to change notification settings - Fork 4
/
avromessage.py
109 lines (88 loc) · 4.19 KB
/
avromessage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import json
import pathlib
import pickle
import struct
from io import BytesIO
from typing import Optional, Tuple, Dict, Iterable, NamedTuple, Any
import itertools as it
import fastavro
from confluent_kafka.cimpl import Message
from confluent_kafka.avro import loads as load_schema
from esque.message import FileWriter, FileReader, KafkaMessage
from esque.schemaregistry import SchemaRegistryClient
# A consumed Kafka message after Avro decoding, together with the writer-schema ids
# that were read from the framed key/value payloads (-1 when that side was null).
DecodedAvroMessage = NamedTuple(
    "DecodedAvroMessage",
    [
        ("key", Any),  # decoded key, or None for a null key
        ("value", Any),  # decoded value, or None for a null value / tombstone
        ("partition", int),  # partition the message was consumed from
        ("key_schema_id", int),  # registry id of the key schema, -1 for a null key
        ("value_schema_id", int),  # registry id of the value schema, -1 for a null value
    ],
)
DecodedAvroMessage.__doc__ = "Decoded key/value of a Kafka message plus its partition and schema ids."
class AvroFileWriter(FileWriter):
    """Write consumed Avro messages under ``directory`` as pickled, decoded records.

    Each key/value payload is expected to be framed as magic byte ``0`` plus a big-endian
    4-byte schema id, followed by the Avro body (see ``extract_schema_id``). The payload is
    decoded with the writer schema fetched from the schema registry.

    Whenever a non-null key or value arrives with a schema id other than the current one, a
    new sub-directory ``<version>_<key_schema_id>_<value_schema_id>`` is created. It holds the
    schemas as ``key_schema.avsc`` / ``value_schema.avsc``. Every pickled record names the
    directory whose schemas apply to it.

    A null key or value is decoded to ``None`` with the sentinel schema id ``-1``. No schema
    is fetched or written for the sentinel, and a null side never forces a new directory.
    """

    # Id returned by ``decode_bytes`` for a null payload; a local sentinel, not a registry id.
    _NULL_SCHEMA_ID = -1

    def __init__(self, directory: pathlib.Path, schema_registry_client: SchemaRegistryClient):
        super().__init__(directory)
        self.directory = directory
        self.schema_registry_client = schema_registry_client
        # Schema ids of the directory currently being written to (None before the first message).
        self.current_key_schema_id: Optional[int] = None
        self.current_value_schema_id: Optional[int] = None
        self.schema_dir_name: Optional[str] = None
        # Counter that prefixes schema directory names: 0001, 0002, ...
        self.schema_version = it.count(1)
        # NOTE(review): presumably used by FileWriter to open ``self.file`` — confirm.
        self.open_mode = "wb+"

    def write_message_to_file(self, message: Message):
        """Decode ``message`` and append it to ``self.file`` as a pickled dict.

        A new schema directory is started (and its schemas dumped) for the first message, or
        when a non-null key/value uses a different schema id than the current directory.
        """
        key_schema_id, decoded_key = self.decode_bytes(message.key())
        value_schema_id, decoded_value = self.decode_bytes(message.value())
        decoded_message = DecodedAvroMessage(
            decoded_key, decoded_value, message.partition(), key_schema_id, value_schema_id
        )

        if self.schema_dir_name is None or self.schema_changed(decoded_message):
            self.schema_dir_name = f"{next(self.schema_version):04}_{key_schema_id}_{value_schema_id}"
            self.current_key_schema_id = key_schema_id
            self.current_value_schema_id = value_schema_id
            self._dump_schemata(key_schema_id, value_schema_id)

        serializable_message = {
            "key": decoded_message.key,
            "value": decoded_message.value,
            "partition": decoded_message.partition,
            "schema_directory_name": self.schema_dir_name,
        }
        pickle.dump(serializable_message, self.file)

    def _dump_schemata(self, key_schema_id, value_schema_id):
        """Create the current schema directory and write the key/value schemas into it.

        Nothing is written for the ``-1`` sentinel. ``AvroFileReader`` treats a missing
        schema file as "no schema" for that side.
        """
        directory = self.directory / self.schema_dir_name
        directory.mkdir()
        for file_name, schema_id in (("key_schema.avsc", key_schema_id), ("value_schema.avsc", value_schema_id)):
            if schema_id == self._NULL_SCHEMA_ID:
                continue
            schema = self.schema_registry_client.get_schema_from_id(schema_id).original_schema
            (directory / file_name).write_text(json.dumps(schema), encoding="utf-8")

    def decode_bytes(self, raw_data: Optional[bytes]) -> Tuple[int, Any]:
        """Split a framed Avro payload into ``(schema_id, decoded_record)``.

        Returns ``(-1, None)`` for a null payload. Otherwise the 5-byte header is parsed, the
        parsed writer schema is fetched from the registry, and the rest of the payload is decoded.
        """
        if raw_data is None:
            return self._NULL_SCHEMA_ID, None
        with BytesIO(raw_data) as fake_stream:
            schema_id = extract_schema_id(fake_stream.read(5))
            parsed_schema = self.schema_registry_client.get_schema_from_id(schema_id).parsed_schema
            record = fastavro.schemaless_reader(fake_stream, parsed_schema)
        return schema_id, record

    def schema_changed(self, decoded_message: DecodedAvroMessage) -> bool:
        """Return True if a non-null key or value uses a schema other than the current one.

        Key and value are treated alike. A side whose schema id is the ``-1`` sentinel
        carries no schema, so it never triggers a change.
        """
        key_changed = (
            decoded_message.key_schema_id != self._NULL_SCHEMA_ID
            and decoded_message.key_schema_id != self.current_key_schema_id
        )
        value_changed = (
            decoded_message.value_schema_id != self._NULL_SCHEMA_ID
            and decoded_message.value_schema_id != self.current_value_schema_id
        )
        return key_changed or value_changed


class AvroFileReader(FileReader):
    """Replay records written by ``AvroFileWriter`` as ``KafkaMessage`` objects.

    Keys and values are re-encoded as JSON strings. Schemas are loaded from the ``.avsc``
    files in each record's schema directory, or are ``None`` when the writer stored no schema
    for that side because it was null.
    """

    def __init__(self, directory: pathlib.Path):
        super().__init__(directory)
        self.open_mode = "rb"

    def read_from_file(self) -> Iterable[KafkaMessage]:
        """Yield every pickled record in ``self.file``, in file order, until end of file.

        SECURITY: ``pickle.load`` can execute arbitrary code; only read dump files that
        come from a trusted source.
        """
        # Every record of a schema directory shares its schemas; parse each directory once.
        schema_cache: Dict[str, Tuple[Any, Any]] = {}
        while True:
            try:
                record = pickle.load(self.file)
            except EOFError:
                return
            dir_name = record["schema_directory_name"]
            if dir_name not in schema_cache:
                schema_directory = self.directory / dir_name
                schema_cache[dir_name] = (
                    self._load_schema_file(schema_directory / "key_schema.avsc"),
                    self._load_schema_file(schema_directory / "value_schema.avsc"),
                )
            key_schema, value_schema = schema_cache[dir_name]
            yield KafkaMessage(
                json.dumps(record["key"]), json.dumps(record["value"]), record["partition"], key_schema, value_schema
            )

    @staticmethod
    def _load_schema_file(path: pathlib.Path) -> Any:
        """Parse the ``.avsc`` file at ``path``; None if the writer did not create it (null side)."""
        try:
            schema_text = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return None
        return load_schema(schema_text)
class InvalidAvroMessageError(ValueError, AssertionError):
    """Raised when a payload does not start with the Avro magic byte ``0``.

    It also subclasses ``AssertionError``, so callers written against the former
    ``assert``-based check keep catching it.
    """


def extract_schema_id(message: bytes) -> int:
    """Return the schema id stored in the 5-byte header of a framed Avro payload.

    The header is a signed magic byte that must be ``0``, followed by the schema id as a
    big-endian unsigned 32-bit integer. Bytes after the first five are ignored.

    Raises InvalidAvroMessageError if the magic byte is not ``0``, and struct.error if
    ``message`` is shorter than 5 bytes.
    """
    magic_byte, schema_id = struct.unpack(">bI", message[:5])
    # Explicit check instead of ``assert``: asserts are stripped under ``python -O``,
    # which would let arbitrary bytes through as a bogus schema id.
    if magic_byte != 0:
        raise InvalidAvroMessageError(f"Wrong magic byte ({magic_byte}), no AVRO message.")
    return schema_id