# ubi.py
import statistics
from typing import Optional

from structlog import get_logger

from unblob.extractors import Command

from ...file_utils import InvalidInputFormat, get_endian, iterate_patterns
from ...iter_utils import get_intervals
from ...models import File, Handler, HexString, StructHandler, ValidChunk

logger = get_logger()


class UBIFSHandler(StructHandler):
    NAME = "ubifs"

    _BIG_ENDIAN_MAGIC = 0x06_10_18_31

    # TODO: At the moment, we only match on the UBIFS superblock. Do we also want to account for
    # cases where the first node isn't a UBIFS superblock? Would such a layout actually be valid?
    # It might be valid to flag it, but not necessarily to extract it.
    #
    # We run the handlers against every single match, regardless of whether a previous chunk has
    # already been established. That means that if we find a superblock and then many other kinds
    # of nodes, it will take forever to run calculate_chunk() against all the other nodes, and we
    # waste loads of time and resources.

    # magic (4 bytes), 16 bytes, node type (1 byte, 0x06 is superblock),
    # group type (1 byte), 2 nulls.
    PATTERNS = [
        HexString("31 18 10 06 [16] 06 ( 00 | 01 | 02 ) 00 00"),  # LE
        HexString("06 10 18 31 [16] 06 ( 00 | 01 | 02 ) 00 00"),  # BE
    ]
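
    # How the BE pattern lines up with ubifs_ch below (the LE pattern differs
    # only in the byte order of the magic):
    #   06 10 18 31    magic (bytes 0-3)
    #   [16]           skip crc (4) + sqnum (8) + len (4) (bytes 4-19)
    #   06             node_type, 0x06 == superblock (byte 20)
    #   ( 00|01|02 )   group_type (byte 21)
    #   00 00          padding[2] (bytes 22-23)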

    C_DEFINITIONS = r"""
        typedef struct ubifs_ch {
            uint32 magic;
            uint32 crc;
            uint64 sqnum;
            uint32 len;
            uint8 node_type;
            uint8 group_type;
            uint8 padding[2];
        } ubifs_ch_t;

        typedef struct ubifs_sb_node {
            ubifs_ch_t ch;
            uint8 padding[2];
            uint8 key_hash;
            uint8 key_fmt;
            uint32 flags;
            uint32 min_io_size;
            uint32 leb_size;
            uint32 leb_cnt;
            uint32 max_leb_cnt;
            uint64 max_bud_bytes;
            uint32 log_lebs;
            uint32 lpt_lebs;
            uint32 orph_lebs;
            uint32 jhead_cnt;
            uint32 fanout;
            uint32 lsave_cnt;
            uint32 fmt_version;
            uint16 default_compr;
            uint8 padding1[2];
            uint32 rp_uid;
            uint32 rp_gid;
            uint64 rp_size;
            uint32 time_gran;
            uint8 uuid[16];
            uint32 ro_compat_version;
            uint8 hmac[64];
            uint8 hmac_wkm[64];
            uint16 hash_algo;
            uint8 hash_mst[64];
            uint8 padding2[3774];
        } ubifs_sb_node_t;
    """

    HEADER_STRUCT = "ubifs_sb_node_t"

    EXTRACTOR = Command("ubireader_extract_files", "{inpath}", "-o", "{outdir}")
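    # ubireader_extract_files comes from the third-party ubi_reader package;
    # the template above expands to something like:
    #   ubireader_extract_files <carved-chunk> -o <extract-dir>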

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        endian = get_endian(file, self._BIG_ENDIAN_MAGIC)
        sb_header = self.parse_header(file, endian)

        # At the moment we are only matching on superblock nodes, so we can get the size of the
        # chunk from the LEB size * LEB count.
        ubifs_length = sb_header.leb_size * sb_header.leb_cnt
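        # Illustrative numbers (not from any specific image): with 128 KiB PEBs
        # a LEB is typically 0x1F000 (124 KiB), so leb_cnt == 1024 would yield
        # a 124 MiB chunk.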

        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + ubifs_length,
        )


class UBIHandler(Handler):
    NAME = "ubi"

    _UBI_EC_HEADER = b"UBI#"

    PATTERNS = [HexString("55 42 49 23 01 // UBI# and version 1")]

    EXTRACTOR = Command("ubireader_extract_images", "{inpath}", "-o", "{outdir}")

    def _guess_peb_size(self, file: File) -> int:
        # Since we don't know the PEB size, we need to guess it. At the moment we just find the
        # most common interval between the erase block headers we find in the image. This _might_
        # cause an issue if a blob contained multiple UBI images with different PEB sizes.
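        # For example, EC headers at offsets 0x0, 0x20000, 0x40000 and 0x60000
        # give intervals [0x20000, 0x20000, 0x20000], whose mode is 0x20000
        # (128 KiB), a common NAND PEB size.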
        all_ubi_eraseblock_offsets = list(iterate_patterns(file, self._UBI_EC_HEADER))
        offset_intervals = get_intervals(all_ubi_eraseblock_offsets)
        if not offset_intervals:
            raise InvalidInputFormat
        return statistics.mode(offset_intervals)

    def _walk_ubi(self, file: File, peb_size: int) -> int:
        """Walk from the current offset, at PEB-sized intervals, until we don't hit an erase block."""
        while True:
            offset = file.tell()
            first_bytes = file.read(len(self._UBI_EC_HEADER))
            if first_bytes != self._UBI_EC_HEADER:  # also covers EOF, where read() returns b""
                break
            file.seek(offset + peb_size)
        return offset

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        peb_size = self._guess_peb_size(file)
        logger.debug("Guessed UBI PEB size", size=peb_size)

        file.seek(start_offset)
        # We don't parse the headers: we don't know what third-party tools put in them, and
        # validating the CRCs and/or parsing every header would be too expensive. Walking the
        # erase blocks is good enough and way faster.
        end_offset = self._walk_ubi(file, peb_size)

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)
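

if __name__ == "__main__":
    # Minimal smoke test for the PEB-size heuristic, not part of the original
    # module. It assumes a File.from_bytes() constructor is available on the
    # imported File type (an assumption; adjust to however your unblob version
    # builds File objects). We fake a three-PEB UBI image by repeating an
    # erased block that starts with the "UBI#" magic and a version byte.
    peb_size = 0x20000  # 128 KiB, a common NAND physical erase block size
    fake_peb = b"UBI#\x01".ljust(peb_size, b"\xff")
    fake_image = File.from_bytes(fake_peb * 3)

    handler = UBIHandler()
    guessed = handler._guess_peb_size(fake_image)
    assert guessed == peb_size, hex(guessed)
    print("guessed PEB size:", hex(guessed))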