Skip to content

Commit

Permalink
Extract Android backups, yield devices instead of just echoing (#80)
Browse files Browse the repository at this point in the history
* Extract Android backups, yield devices instead of just echoing

This moves the functionality towards automatic config generation out of backups.
The extraction depends on code which is not available in pypi, and requires some
changes to be useful for us: bluec0re/android-backup-tools#1

* use logging for the database class, fix linting mistakes and some small changes based on code review

* Add a fork of python-backup-tools for backup extraction

Forked and modified from https://github.com/bluec0re/android-backup-tools
  • Loading branch information
rytilahti committed Oct 1, 2017
1 parent a6a8ecd commit e6e72e0
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 34 deletions.
226 changes: 226 additions & 0 deletions mirobo/android_backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This is a fork from original work by BlueC0re <coding@bluec0re.eu>
# https://github.com/bluec0re/android-backup-tools
# License: GPLv3
import tarfile
import zlib
import enum
import io
import pickle
import os
import binascii

try:
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from Crypto import Random
except ImportError:
AES = None


class CompressionType(enum.IntEnum):
NONE = 0
ZLIB = 1


class EncryptionType(enum.Enum):
NONE = 'none'
AES256 = 'AES-256'


class AndroidBackup:
def __init__(self, fname=None):
if fname:
self.open(fname)

def open(self, fname, mode='rb'):
self.fp = open(fname, mode)

def close(self):
self.fp.close()

def parse(self):
self.fp.seek(0)
magic = self.fp.readline()
assert magic == b'ANDROID BACKUP\n'
self.version = int(self.fp.readline().strip())
self.compression = CompressionType(int(self.fp.readline().strip()))
self.encryption = EncryptionType(self.fp.readline().strip().decode())

def is_encrypted(self):
return self.encryption == EncryptionType.AES256

def _decrypt(self, enc, password):
if AES is None:
raise ImportError("PyCrypto required")

user_salt, enc = enc.split(b'\n', 1)
user_salt = binascii.a2b_hex(user_salt)
ck_salt, enc = enc.split(b'\n', 1)
ck_salt = binascii.a2b_hex(ck_salt)
rounds, enc = enc.split(b'\n', 1)
rounds = int(rounds)
iv, enc = enc.split(b'\n', 1)
iv = binascii.a2b_hex(iv)
master_key, enc = enc.split(b'\n', 1)
master_key = binascii.a2b_hex(master_key)

user_key = PBKDF2(password, user_salt, dkLen=256//8, count=rounds)
cipher = AES.new(user_key,
mode=AES.MODE_CBC,
IV=iv)

master_key = list(cipher.decrypt(master_key))
l = master_key.pop(0)
master_iv = bytes(master_key[:l])
master_key = master_key[l:]
l = master_key.pop(0)
mk = bytes(master_key[:l])
master_key = master_key[l:]
l = master_key.pop(0)
master_ck = bytes(master_key[:l])

# gen checksum

# double encode utf8
utf8mk = self.encode_utf8(mk)
calc_ck = PBKDF2(utf8mk, ck_salt, dkLen=256//8, count=rounds)
assert calc_ck == master_ck

cipher = AES.new(mk,
mode=AES.MODE_CBC,
IV=master_iv)

dec = cipher.decrypt(enc)
pad = dec[-1]

return dec[:-pad]

@staticmethod
def encode_utf8(mk):
utf8mk = mk.decode('raw_unicode_escape')
utf8mk = list(utf8mk)
for i in range(len(utf8mk)):
c = ord(utf8mk[i])
# fix java encoding (add 0xFF00 to non ascii chars)
if 0x7f < c < 0x100:
c += 0xff00
utf8mk[i] = chr(c)
return ''.join(utf8mk).encode('utf-8')

def _encrypt(self, dec, password):
if AES is None:
raise ImportError("PyCrypto required")

master_key = Random.get_random_bytes(32)
master_salt = Random.get_random_bytes(64)
user_salt = Random.get_random_bytes(64)
master_iv = Random.get_random_bytes(16)
user_iv = Random.get_random_bytes(16)
rounds = 10000

l = len(dec)
pad = 16 - (l % 16)
dec += bytes([pad] * pad)
cipher = AES.new(master_key, IV=master_iv, mode=AES.MODE_CBC)
enc = cipher.encrypt(dec)

master_ck = PBKDF2(self.encode_utf8(master_key),
master_salt, dkLen=256//8, count=rounds)

user_key = PBKDF2(password,
user_salt, dkLen=256//8, count=rounds)

master_dec = b"\x10" + master_iv + b"\x20" + master_key + b"\x20" + master_ck
l = len(master_dec)
pad = 16 - (l % 16)
master_dec += bytes([pad] * pad)
cipher = AES.new(user_key, IV=user_iv, mode=AES.MODE_CBC)
master_enc = cipher.encrypt(master_dec)

enc = binascii.b2a_hex(user_salt).upper() + b"\n" + \
binascii.b2a_hex(master_salt).upper() + b"\n" + \
str(rounds).encode() + b"\n" + \
binascii.b2a_hex(user_iv).upper() + b"\n" + \
binascii.b2a_hex(master_enc).upper() + b"\n" + enc

return enc

def read_data(self, password):
"""Reads from the file and returns a TarFile object."""
data = self.fp.read()

if self.encryption == EncryptionType.AES256:
if password is None:
raise Exception("Password need to be provided to extract encrypted archives")
data = self._decrypt(data, password)

if self.compression == CompressionType.ZLIB:
data = zlib.decompress(data, zlib.MAX_WBITS)

tar = tarfile.TarFile(fileobj=io.BytesIO(data))
return tar

def unpack(self, target_dir=None, password=None):
tar = self.read_data(password)

members = tar.getmembers()

if target_dir is None:
target_dir = os.path.basename(self.fp.name) + '_unpacked'
pickle_fname = os.path.basename(self.fp.name) + '.pickle'
if not os.path.exists(target_dir):
os.mkdir(target_dir)

tar.extractall(path=target_dir)

with open(pickle_fname, 'wb') as fp:
pickle.dump(members, fp)

def list(self, password=None):
tar = self.read_tar(password)
return tar.list()

def pack(self, fname, password=None):
target_dir = os.path.basename(fname) + '_unpacked'
pickle_fname = os.path.basename(fname) + '.pickle'

data = io.BytesIO()
tar = tarfile.TarFile(name=fname,
fileobj=data,
mode='w',
format=tarfile.PAX_FORMAT)

with open(pickle_fname, 'rb') as fp:
members = pickle.load(fp)

os.chdir(target_dir)
for member in members:
if member.isreg():
tar.addfile(member, open(member.name, 'rb'))
else:
tar.addfile(member)

tar.close()

data.seek(0)
if self.compression == CompressionType.ZLIB:
data = zlib.compress(data.read())
if self.encryption == EncryptionType.AES256:
data = self._encrypt(data, password)

with open(fname, 'wb') as fp:
fp.write(b'ANDROID BACKUP\n')
fp.write('{}\n'.format(self.version).encode())
fp.write('{:d}\n'.format(self.compression).encode())
fp.write('{}\n'.format(self.encryption.value).encode())

fp.write(data)

def __exit__(self, *args, **kwargs):
self.close()

def __enter__(self):
self.parse()
return self
102 changes: 68 additions & 34 deletions mirobo/extract_tokens.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
import logging
import click
import tarfile
import tempfile
import sqlite3
from Crypto.Cipher import AES
from pprint import pformat as pf
import attr
from .android_backup import AndroidBackup

logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)


@attr.s
class DeviceConfig:
name = attr.ib()
mac = attr.ib()
ip = attr.ib()
token = attr.ib()
model = attr.ib()


class BackupDatabaseReader:
def __init__(self, dump_all=False, dump_raw=False):
self.dump_all = dump_all
def __init__(self, dump_raw=False):
self.dump_raw = dump_raw

@staticmethod
def dump_raw(dev):
raw = {k: dev[k] for k in dev.keys()}
click.echo(pf(raw))
_LOGGER.info(pf(raw))

@staticmethod
def decrypt_ztoken(ztoken):
Expand All @@ -29,7 +42,7 @@ def decrypt_ztoken(ztoken):
return token.decode()

def read_apple(self):
click.echo("Reading tokens from Apple DB")
_LOGGER.info("Reading tokens from Apple DB")
c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';")
for dev in c.fetchall():
if self.dump_raw:
Expand All @@ -39,11 +52,12 @@ def read_apple(self):
model = dev['ZMODEL']
name = dev['ZNAME']
token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN'])
if ip or self.dump_all:
click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac))

config = DeviceConfig(name=name, mac=mac, ip=ip, model=model, token=token)
yield config

def read_android(self):
click.echo("Reading tokens from Android DB")
_LOGGER.info("Reading tokens from Android DB")
c = self.conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';")
for dev in c.fetchall():
if self.dump_raw:
Expand All @@ -53,57 +67,77 @@ def read_android(self):
model = dev['model']
name = dev['name']
token = dev['token']
if ip or self.dump_all:
click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac))

def dump_to_file(self, fp):
fp.open()
self.db.seek(0) # go to the beginning
click.echo("Saving db to %s" % fp)
fp.write(self.db.read())
config = DeviceConfig(name=name, ip=ip, mac=mac,
model=model, token=token)
yield config

def read_tokens(self, db):
self.db = db
_LOGGER.info("Reading database from %s" % db)
self.conn = sqlite3.connect(db)

self.conn.row_factory = sqlite3.Row
with self.conn:
is_android = self.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None
is_apple = self.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None
if is_android:
self.read_android()
yield from self.read_android()
elif is_apple:
self.read_apple()
yield from self.read_apple()
else:
click.echo("Error, unknown database type!")
_LOGGER.error("Error, unknown database type!")


@click.command()
@click.argument('backup')
@click.option('--write-to-disk', type=click.File('wb'), help='writes sqlite3 db to a file for debugging')
@click.option('--dump-all', is_flag=True, default=False, help='dump devices without ip addresses')
@click.option('--write-to-disk', type=click.File('wb'),
help='writes sqlite3 db to a file for debugging')
@click.option('--password', type=str,
help='password if the android database is encrypted')
@click.option('--dump-all', is_flag=True, default=False,
help='dump devices without ip addresses')
@click.option('--dump-raw', is_flag=True, help='dumps raw rows')
def main(backup, write_to_disk, dump_all, dump_raw):
def main(backup, write_to_disk, password, dump_all, dump_raw):
"""Reads device information out from an sqlite3 DB.
If the given file is a .tar file, the file will be extracted
and the database automatically located (out of Android backups).
If the given file is an Android backup (.ab), the database
will be extracted automatically.
If the given file is an iOS backup, the tokens will be
extracted (and decrypted if needed) automatically.
"""
reader = BackupDatabaseReader(dump_all, dump_raw)
if backup.endswith(".tar"):

reader = BackupDatabaseReader(dump_raw)
if backup.endswith(".ab"):
DBFILE = "apps/com.xiaomi.smarthome/db/miio2.db"
with tarfile.open(backup) as f:
click.echo("Opened %s" % backup)
db = f.extractfile(DBFILE)
with tempfile.NamedTemporaryFile() as fp:
click.echo("Extracting to %s" % fp.name)
with AndroidBackup(backup) as f:
tar = f.read_data(password)
try:
db = tar.extractfile(DBFILE)
except KeyError as ex:
click.echo("Unable to extract the database file %s: %s" % (DBFILE, ex))
return
if write_to_disk:
file = write_to_disk
else:
file = tempfile.NamedTemporaryFile()
with file as fp:
click.echo("Saving database to %s" % fp.name)
fp.write(db.read())
if write_to_disk:
reader.dump_to_file(write_to_disk)

reader.read_tokens(fp.name)
devices = list(reader.read_tokens(fp.name))
else:
reader.read_tokens(backup)
devices = list(reader.read_tokens(backup))

for dev in devices:
if dev.ip or dump_all:
click.echo("%s\n"
"\tModel: %s\n"
"\tIP address: %s\n"
"\tToken: %s\n"
"\tMAC: %s" % (dev.name, dev.model,
dev.ip, dev.token, dev.mac))


if __name__ == "__main__":
Expand Down

0 comments on commit e6e72e0

Please sign in to comment.