-
Notifications
You must be signed in to change notification settings - Fork 156
/
image.py
267 lines (198 loc) · 7.67 KB
/
image.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
"""OTA Firmware handling."""
from __future__ import annotations
import logging
import attr
import zigpy.types as t
LOGGER = logging.getLogger(__name__)
@attr.s(frozen=True)
class ImageKey:
manufacturer_id = attr.ib(default=None)
image_type = attr.ib(default=None)
class HWVersion(t.uint16_t):
@property
def version(self):
return self >> 8
@property
def revision(self):
return self & 0x00FF
def __repr__(self):
return "<{} version={} revision={}>".format(
self.__class__.__name__, self.version, self.revision
)
class HeaderString(str):
_size = 32
@classmethod
def deserialize(cls, data):
if len(data) < cls._size:
raise ValueError(f"Data is too short. Should be at least {cls._size}")
raw = data[: cls._size].split(b"\x00")[0]
return cls(raw.decode("utf8", errors="replace")), data[cls._size :]
def serialize(self):
return self.encode("utf8").ljust(self._size, b"\x00")
class FieldControl(t.bitmap16):
SECURITY_CREDENTIAL_VERSION_PRESENT = 0b001
DEVICE_SPECIFIC_FILE_PRESENT = 0b010
HARDWARE_VERSIONS_PRESENT = 0b100
class OTAImageHeader(t.Struct):
MAGIC_VALUE = 0x0BEEF11E
OTA_HEADER = MAGIC_VALUE.to_bytes(4, "little")
upgrade_file_id: t.uint32_t
header_version: t.uint16_t
header_length: t.uint16_t
field_control: FieldControl
manufacturer_id: t.uint16_t
image_type: t.uint16_t
file_version: t.uint32_t
stack_version: t.uint16_t
header_string: HeaderString
image_size: t.uint32_t
security_credential_version: t.uint8_t = t.StructField(
requires=lambda s: s.security_credential_version_present
)
upgrade_file_destination: t.EUI64 = t.StructField(
requires=lambda s: s.device_specific_file
)
minimum_hardware_version: HWVersion = t.StructField(
requires=lambda s: s.hardware_versions_present
)
maximum_hardware_version: HWVersion = t.StructField(
requires=lambda s: s.hardware_versions_present
)
@property
def security_credential_version_present(self) -> bool:
if self.field_control is None:
return None
return bool(
self.field_control & FieldControl.SECURITY_CREDENTIAL_VERSION_PRESENT
)
@property
def device_specific_file(self) -> bool:
if self.field_control is None:
return None
return bool(self.field_control & FieldControl.DEVICE_SPECIFIC_FILE_PRESENT)
@property
def hardware_versions_present(self) -> bool:
if self.field_control is None:
return None
return bool(self.field_control & FieldControl.HARDWARE_VERSIONS_PRESENT)
@property
def key(self):
return ImageKey(self.manufacturer_id, self.image_type)
@classmethod
def deserialize(cls, data) -> tuple[OTAImageHeader, bytes]:
hdr, data = super().deserialize(data)
if hdr.upgrade_file_id != cls.MAGIC_VALUE:
raise ValueError(
f"Wrong magic number for OTA Image: {hdr.upgrade_file_id!r}"
)
return hdr, data
class ElementTagId(t.enum16):
UPGRADE_IMAGE = 0x0000
ECDSA_SIGNATURE = 0x0001
ECDSA_SIGNING_CERTIFICATE = 0x0002
IMAGE_INTEGRITY_CODE = 0x0003
class LVBytes32(t.LVBytes):
_prefix_length = 4
class SubElement(t.Struct):
tag_id: ElementTagId
data: LVBytes32
class BaseOTAImage:
"""
Base OTA image container type. Not all images are valid Zigbee OTA images but are
nonetheless accepted by devices. Only requirement is that the image contains a valid
OTAImageHeader property and can be serialized/deserialized.
"""
header: OTAImageHeader
@classmethod
def deserialize(cls, data) -> tuple[BaseOTAImage, bytes]:
raise NotImplementedError() # pragma: no cover
def serialize(self):
raise NotImplementedError() # pragma: no cover
class OTAImage(t.Struct, BaseOTAImage):
"""
Zigbee OTA image according to 11.4 of the ZCL specification.
"""
header: OTAImageHeader
subelements: t.List[SubElement]
@classmethod
def deserialize(cls, data) -> tuple[OTAImage, bytes]:
hdr, data = OTAImageHeader.deserialize(data)
elements_len = hdr.image_size - hdr.header_length
if elements_len > len(data):
raise ValueError(f"Data is too short for {cls}")
image = cls(header=hdr, subelements=[])
element_data, data = data[:elements_len], data[elements_len:]
while element_data:
element, element_data = SubElement.deserialize(element_data)
image.subelements.append(element)
return image, data
def serialize(self):
res = super().serialize()
assert len(res) == self.header.image_size
return res
@attr.s
class HueSBLOTAImage(BaseOTAImage):
"""
Unique OTA image format for certain Hue devices. Starts with a valid header but does
not contain any valid subelements beyond that point.
"""
SUBELEMENTS_MAGIC = b"\x2A\x00\x01"
header = attr.ib(default=None)
data = attr.ib(default=None)
def serialize(self) -> bytes:
return self.header.serialize() + self.data
@classmethod
def deserialize(cls, data) -> tuple[HueSBLOTAImage, bytes]:
header, remaining_data = OTAImageHeader.deserialize(data)
firmware = remaining_data[: header.image_size - len(header.serialize())]
if len(data) < header.image_size:
raise ValueError(
f"Data is too short to contain image: {len(data)} < {header.image_size}"
)
if not firmware.startswith(cls.SUBELEMENTS_MAGIC):
raise ValueError(
f"Firmware does not start with expected magic bytes: {firmware[:10]!r}"
)
if header.manufacturer_id != 4107:
raise ValueError(
f"Only Hue images are expected. Got: {header.manufacturer_id}"
)
return cls(header=header, data=firmware), data[header.image_size :]
def parse_ota_image(data: bytes) -> tuple[BaseOTAImage, bytes]:
"""
Attempts to extract any known OTA image type from data. Does not validate firmware.
"""
if len(data) > 4 and int.from_bytes(data[0:4], "little") + 21 == len(data):
# Legrand OTA images are prefixed with their unwrapped size and include a 1 + 16
# byte suffix
return OTAImage.deserialize(data[4:-17])
elif data.startswith(b"NGIS"):
# IKEA container needs to be unwrapped
if len(data) <= 24:
raise ValueError(
f"Data too short to contain IKEA container header: {len(data)}"
)
offset = int.from_bytes(data[16:20], "little")
size = int.from_bytes(data[20:24], "little")
if len(data) <= offset + size:
raise ValueError(f"Data too short to be IKEA container: {len(data)}")
wrapped_data = data[offset : offset + size]
image, rest = OTAImage.deserialize(wrapped_data)
if rest:
LOGGER.warning(
"Fixing IKEA OTA image with trailing data (%s bytes)",
size - image.header.image_size,
)
image.header.image_size += len(rest)
# No other structure has been observed
assert len(image.subelements) == 1
assert image.subelements[0].tag_id == ElementTagId.UPGRADE_IMAGE
image.subelements[0].data += rest
rest = b""
return image, rest
try:
# Hue sbl-ota images start with a Zigbee OTA header but contain no valid
# subelements after that. Try it first.
return HueSBLOTAImage.deserialize(data)
except ValueError:
return OTAImage.deserialize(data)