-
Notifications
You must be signed in to change notification settings - Fork 918
/
backup.py
243 lines (187 loc) · 7.12 KB
/
backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import io
import json
import tarfile
import zlib
from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from mvt.common.utils import check_for_links, convert_unix_to_iso
PBKDF2_KEY_SIZE = 32
class AndroidBackupParsingError(Exception):
"""Exception raised file parsing an android backup file"""
class AndroidBackupNotImplemented(AndroidBackupParsingError):
pass
class InvalidBackupPassword(AndroidBackupParsingError):
pass
# TODO: Need to clean all the following code and conform it to the coding style.
def to_utf8_bytes(input_bytes):
output = []
for byte in input_bytes:
if byte < ord(b"\x80"):
output.append(byte)
else:
output.append(ord("\xef") | (byte >> 12))
output.append(ord("\xbc") | ((byte >> 6) & ord("\x3f")))
output.append(ord("\x80") | (byte & ord("\x3f")))
return bytes(output)
def parse_ab_header(data):
"""
Parse the header of an Android Backup file
Returns a dict {'backup': True, 'compression': False,
'encryption': "none", 'version': 4}
"""
if data.startswith(b"ANDROID BACKUP"):
[_, version, is_compressed, encryption, _] = data.split(b"\n", 4)
return {
"backup": True,
"compression": (is_compressed == b"1"),
"version": int(version),
"encryption": encryption.decode("utf-8"),
}
return {"backup": False, "compression": None, "version": None, "encryption": None}
def decrypt_master_key(
password,
user_salt,
user_iv,
pbkdf2_rounds,
master_key_blob,
format_version,
checksum_salt,
):
"""Generate AES key from user password uisng PBKDF2
The backup master key is extracted from the master key blog after decryption.
"""
# Derive key from password using PBKDF2.
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds
)
key = kdf.derive(password.encode("utf-8"))
# Decrypt master key blob.
cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv))
decryptor = cipher.decryptor()
try:
decryted_master_key_blob = (
decryptor.update(master_key_blob) + decryptor.finalize()
)
# Extract key and IV from decrypted blob.
key_blob = io.BytesIO(decryted_master_key_blob)
master_iv_length = ord(key_blob.read(1))
master_iv = key_blob.read(master_iv_length)
master_key_length = ord(key_blob.read(1))
master_key = key_blob.read(master_key_length)
master_key_checksum_length = ord(key_blob.read(1))
master_key_checksum = key_blob.read(master_key_checksum_length)
except TypeError as exc:
raise InvalidBackupPassword() from exc
# Handle quirky encoding of master key bytes in Android original Java crypto code.
if format_version > 1:
hmac_mk = to_utf8_bytes(master_key)
else:
hmac_mk = master_key
# Derive checksum to confirm successful backup decryption.
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds
)
calculated_checksum = kdf.derive(hmac_mk)
if master_key_checksum != calculated_checksum:
raise InvalidBackupPassword()
return master_key, master_iv
def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version):
"""
Generate encryption keyffrom password and do decryption
"""
if encryption_algo != b"AES-256":
raise AndroidBackupNotImplemented("Encryption Algorithm not implemented")
if password is None:
raise InvalidBackupPassword()
[
user_salt,
checksum_salt,
pbkdf2_rounds,
user_iv,
master_key_blob,
encrypted_data,
] = encrypted_backup.split(b"\n", 5)
user_salt = bytes.fromhex(user_salt.decode("utf-8"))
checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8"))
pbkdf2_rounds = int(pbkdf2_rounds)
user_iv = bytes.fromhex(user_iv.decode("utf-8"))
master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8"))
# Derive decryption master key from password.
master_key, master_iv = decrypt_master_key(
password=password,
user_salt=user_salt,
user_iv=user_iv,
pbkdf2_rounds=pbkdf2_rounds,
master_key_blob=master_key_blob,
format_version=format_version,
checksum_salt=checksum_salt,
)
# Decrypt and unpad backup data using derivied key.
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
decryptor = cipher.decryptor()
decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
return unpadder.update(decrypted_tar)
def parse_backup_file(data, password=None):
"""
Parse an ab file, returns a tar file
"""
if not data.startswith(b"ANDROID BACKUP"):
raise AndroidBackupParsingError("Invalid file header")
[_, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4)
version = int(version)
is_compressed = int(is_compressed)
if encryption_algo != b"none":
tar_data = decrypt_backup_data(
tar_data, password, encryption_algo, format_version=version
)
if is_compressed:
try:
tar_data = zlib.decompress(tar_data)
except zlib.error as exc:
raise AndroidBackupParsingError(
"Impossible to decompress the backup file"
) from exc
return tar_data
def parse_tar_for_sms(data):
"""
Extract SMS from a tar backup archive
Returns an array of SMS
"""
dbytes = io.BytesIO(data)
res = []
with tarfile.open(fileobj=dbytes) as tar:
for member in tar.getmembers():
if member.name.startswith("apps/com.android.providers.telephony/d_f/") and (
member.name.endswith("_sms_backup")
or member.name.endswith("_mms_backup")
):
dhandler = tar.extractfile(member)
res.extend(parse_sms_file(dhandler.read()))
return res
def parse_sms_file(data):
"""
Parse an SMS or MMS file extracted from a backup
Returns a list of SMS or MMS entries
"""
res = []
data = zlib.decompress(data)
json_data = json.loads(data)
for entry in json_data:
# Adapt MMS format to SMS format.
if "mms_body" in entry:
entry["body"] = entry["mms_body"]
entry.pop("mms_body")
message_links = check_for_links(entry["body"])
entry["isodate"] = convert_unix_to_iso(int(entry["date"]) / 1000)
entry["direction"] = "sent" if int(entry["date_sent"]) else "received"
# Extract links from the body
if message_links or entry["body"].strip() == "":
entry["links"] = message_links
res.append(entry)
return res