In [1]:
from pathlib import Path
import pandas as pd

# Root directory under which both APK collections are searched.
REMOVABLE_DRIVE_PATH = Path("/run/media/mix/64B2AAFCB2AAD1BA/")
# The two collections live in sibling folders directly below that root
# (presumably malicious vs. benign samples, going by the constant names).
MALWARE_PATH = REMOVABLE_DRIVE_PATH.joinpath("APP")
NORMAL_PATH = REMOVABLE_DRIVE_PATH.joinpath("ExAPP")

In [2]:
from dataclasses import dataclass
import subprocess
from tempfile import TemporaryDirectory
import lxml.etree as etree
from tqdm.notebook import tqdm


@dataclass
class ManifestData:
    """Values read from one decompiled ``AndroidManifest.xml``.

    Instances are built by ``extract_android_manifest_data``.
    """

    # Label attribute of the <application> element; when it is an "@string/..."
    # reference, the matching text from a strings.xml file if one is found.
    label: str
    # "package" attribute of the root <manifest> element.
    pkg_name: str
    # "name" attribute of every <uses-permission> element.
    permissions: list[str]
    # "name" attribute of every <activity> element.
    activities: list[str]
    # "name" attribute of every <service> element.
    services: list[str]
    # "name" attribute of every <receiver> element.
    receivers: list[str]
    # "name" attribute of every <provider> element.
    providers: list[str]


def _named_elements(tree, tag: str) -> list[str]:
    """Return the ``name`` attribute of every ``<tag>`` element in ``tree``.

    The attribute key is matched by suffix, so both a plain ``name`` key and a
    namespace-qualified key ending in ``name`` are accepted. Elements that
    carry no such attribute are skipped.
    """
    names: list[str] = []
    for element in tree.xpath(f"//{tag}"):
        name_key = next((k for k in element.attrib.keys() if k.endswith("name")), None)
        if name_key:
            names.append(element.attrib[name_key])
    return names


def _lookup_string_resource(res_dir: Path, key: str, parser):
    """Return the text of the ``<string name=key>`` resource, or ``None``.

    Files are visited in a deterministic order with the default ``values``
    directory first, so the result does not depend on filesystem enumeration
    order or on which localized ``values-xx`` folder happens to come first.
    Entries with no text are skipped so the caller never receives ``None``
    from a matching-but-empty resource.
    """
    candidates = sorted(
        res_dir.glob("**/strings.xml"),
        key=lambda p: (p.parent.name != "values", p.as_posix()),
    )
    for strings_file in candidates:
        for elem in etree.parse(str(strings_file), parser).xpath("//string"):
            if elem.attrib.get("name") == key and elem.text:
                return elem.text
    return None


def extract_android_manifest_data(manifest_path: Path):
    """Parse a decompiled ``AndroidManifest.xml`` into a ``ManifestData``.

    Args:
        manifest_path: Path of the manifest file. A sibling ``res`` directory
            is searched for ``strings.xml`` files when the application label
            is an ``@string/...`` reference.

    Returns:
        ManifestData with the application label, the package name and the
        ``name`` attributes of all permission / activity / service / receiver /
        provider elements.

    Raises:
        IndexError: the manifest has no ``/manifest/application`` element.
        StopIteration: the application element has no label attribute, or
            there is no root ``<manifest>`` element.
        KeyError: the root ``<manifest>`` element has no ``package`` attribute.
    """
    # recover=True lets lxml continue past malformed markup.
    parser = etree.XMLParser(recover=True, no_network=True, ns_clean=True)
    tree = etree.parse(str(manifest_path), parser)

    application = tree.xpath("/manifest/application")[0]
    label = next(str(v) for k, v in application.attrib.items() if k.endswith("label"))
    if label.startswith("@string"):
        _, _, key = label.partition("/")
        # Keep the raw reference when no usable translation is found.
        resolved = _lookup_string_resource(manifest_path.parent / "res", key, parser)
        label = resolved or label

    return ManifestData(
        label=label,
        pkg_name=next(e.attrib["package"] for e in tree.xpath("/manifest")),
        permissions=_named_elements(tree, "uses-permission"),
        activities=_named_elements(tree, "activity"),
        services=_named_elements(tree, "service"),
        receivers=_named_elements(tree, "receiver"),
        providers=_named_elements(tree, "provider"),
    )


In [3]:
import sqlite3
import json
import hashlib

# Open (or create) the SQLite database "malware.db" in the current working
# directory. It stores the per-APK results so that already-recorded APKs can
# be recognised by their checksum on a later run.
conn = sqlite3.connect("malware.db")
# Create the schema; every statement uses IF NOT EXISTS, so re-running the
# cell against an existing database is harmless.
#   apps                   - one row per package name, with the APK checksum
#                            and the manifest lists stored as text columns.
#   files                  - one row per (package, path) with that file's
#                            SHA-256, referencing apps(pkg_name).
#   unique_files_per_app   - per package, a JSON array of {path, sha256} for
#                            files whose hash occurs exactly once in `files`.
#   ranked_duplicate_files - hashes that occur in more than one package, with
#                            the package list and paths, most-shared first.
conn.executescript("""
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS apps (
    pkg_name TEXT PRIMARY KEY,
    sha256 TEXT UNIQUE NOT NULL,
    label TEXT NOT NULL,
    permissions TEXT,
    activities TEXT,
    services TEXT,
    receivers TEXT,
    providers TEXT
);
CREATE INDEX IF NOT EXISTS apps_label ON apps(label);
CREATE UNIQUE INDEX IF NOT EXISTS apps_sha256 ON apps(sha256 COLLATE NOCASE);

CREATE TABLE IF NOT EXISTS files (
    pkg_name TEXT NOT NULL,
    path TEXT NOT NULL,
    sha256 TEXT NOT NULL,
    PRIMARY KEY (pkg_name, path),
    FOREIGN KEY (pkg_name) REFERENCES apps(pkg_name)
);
CREATE INDEX IF NOT EXISTS files_path ON files(path);
CREATE INDEX IF NOT EXISTS files_sha256 ON files(sha256 COLLATE NOCASE);

CREATE VIEW IF NOT EXISTS unique_files_per_app AS
    SELECT 
        pkg_name,
        json_group_array(
            json_object(
                'path', path, 
                'sha256', sha256
            )
        ) AS unique_files
    FROM 
        files
    WHERE 
        sha256 IN (
            SELECT sha256
            FROM files
            GROUP BY sha256
            HAVING COUNT(*) = 1
        )
    GROUP BY 
        pkg_name;

CREATE VIEW IF NOT EXISTS ranked_duplicate_files AS
    SELECT 
        sha256,
        COUNT(DISTINCT pkg_name) AS app_count,
        json_group_array(DISTINCT pkg_name) AS packages,
        (
            SELECT json_group_array(path)
            FROM files AS f2
            WHERE f2.sha256 = f1.sha256
        ) AS paths
    FROM 
        files AS f1
    GROUP BY 
        sha256
    HAVING 
        COUNT(DISTINCT pkg_name) > 1
    ORDER BY 
        app_count DESC,
        sha256;
""")


@dataclass
class ApplicationData:
    """Everything recorded for one APK; consumed by ``insert_app_data``."""

    # Values extracted from the APK's decompiled AndroidManifest.xml.
    manifest: ManifestData
    # Maps a file path (relative to the decompilation directory) to the
    # SHA-256 hex digest of that file, as built by ``create_file_hashes``.
    files: dict[str, str]
    # SHA-256 hex digest of the APK file itself.
    sha256: str


def sha256_checksum_file(file: Path, chunk_size: int = 1024 * 1024):
    """Return the SHA-256 hex digest of the contents of ``file``.

    Args:
        file: Path of the file to hash.
        chunk_size: Number of bytes read per iteration (default 1 MiB).

    Returns:
        The digest as a lowercase hexadecimal string.
    """
    hasher = hashlib.sha256()
    with file.open("rb") as f:
        # Read fixed-size blocks. Iterating over the file object instead would
        # split on b"\n", which for binary data means either one huge "line"
        # held in memory or a flood of tiny chunks.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def insert_app_data(data: ApplicationData):
    """Store one application and its file hashes in a single transaction.

    Uses the module-level ``conn``. The ``apps`` row is written first and the
    ``files`` rows only when that row was actually inserted.

    Outcomes:
        * New package: the app row and all file rows are committed together.
        * ``pkg_name`` already stored: nothing is written. Adding the file rows
          anyway would merge the files of a different APK into the existing
          package's file set.
        * Constraint violation (for example the UNIQUE ``sha256`` column): the
          transaction is rolled back and the error is swallowed, as before.
        * Any other exception: the transaction is rolled back and the
          exception is re-raised, so no transaction is left open to make the
          next ``BEGIN`` fail.

    Args:
        data: Manifest data, per-file hashes and APK checksum to store.
    """
    pkg_name = data.manifest.pkg_name
    conn.execute("BEGIN")
    try:
        cursor = conn.execute(
            "INSERT INTO apps (pkg_name, sha256, label, permissions, activities, services, receivers, providers) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(pkg_name) DO NOTHING",
            (
                pkg_name,
                data.sha256,
                data.manifest.label,
                json.dumps(data.manifest.permissions),
                json.dumps(data.manifest.activities),
                json.dumps(data.manifest.services),
                json.dumps(data.manifest.receivers),
                json.dumps(data.manifest.providers),
            ),
        )
        # rowcount is 0 when ON CONFLICT ... DO NOTHING skipped the insert.
        if cursor.rowcount > 0:
            conn.executemany(
                "INSERT INTO files (pkg_name, path, sha256) VALUES (?, ?, ?) "
                "ON CONFLICT(pkg_name, path) DO NOTHING",
                (
                    (pkg_name, path, sha256)
                    for path, sha256 in data.files.items()
                ),
            )
    except sqlite3.IntegrityError:
        conn.rollback()
    except BaseException:
        # Never leave the explicit transaction open on unexpected failures.
        conn.rollback()
        raise
    else:
        conn.commit()


def create_file_hashes(app_dir: Path):
    """Hash every regular file below ``app_dir``.

    Args:
        app_dir: Root directory to walk recursively.

    Returns:
        Dict mapping each file's path relative to ``app_dir`` (as a string) to
        the SHA-256 hex digest returned by ``sha256_checksum_file``.
    """
    app_dir = Path(app_dir)
    # rglob() already prepends "**/", so "*" covers the whole tree; the former
    # "**/*" pattern expanded to "**/**/*" and repeated the recursive descent.
    # The list is materialised up front so the progress bar knows its total.
    files = [file for file in app_dir.rglob("*") if file.is_file()]
    file_hashes: dict[str, str] = {}
    with tqdm(files, leave=False, unit="file", dynamic_ncols=True) as pbar:
        for file in pbar:
            file_hashes[str(file.relative_to(app_dir))] = sha256_checksum_file(file)
    return file_hashes

In [None]:
import os
from random import Random

# Collect the APKs of both collections. Sorting before the seeded shuffle
# makes the processing order repeatable between runs.
total_files = sorted(
    apk_file
    for collection_dir in (MALWARE_PATH, NORMAL_PATH)
    for apk_file in collection_dir.glob("**/*.apk")
)
Random(b"developedByMix").shuffle(total_files)

progress_bar = tqdm(total_files, desc="Decompiling APKs", unit="APK", dynamic_ncols=True)

for apk in progress_bar:
    relative_path = apk.relative_to(REMOVABLE_DRIVE_PATH)
    progress_bar.set_postfix_str(str(relative_path))

    # An APK whose checksum is already stored has been handled before: skip it.
    app_sha256 = sha256_checksum_file(apk)
    existing_app = conn.execute(
        "SELECT COUNT(*) FROM apps WHERE sha256 = ?", (app_sha256,)
    ).fetchone()[0]
    if existing_app > 0:
        continue

    # Decompile into a throw-away directory; everything needed from it is
    # extracted before the directory is removed at the end of the block.
    with TemporaryDirectory(prefix=apk.stem) as decompile_dir:
        jadx_command = [
            "jadx",
            "--no-debug-info",
            "-j",
            str(os.cpu_count() or 1),
            "-d",
            decompile_dir,
            apk,
        ]
        result = subprocess.run(
            jadx_command,
            env=dict(os.environ, JAVA_OPTS="-Xmx8g -XX:+UseG1GC"),
            check=False,
            capture_output=True,
            encoding="utf-8",
        )

        # The exit status is not checked (check=False); success is judged by
        # whether the manifest file was produced.
        manifest_path = Path(decompile_dir).joinpath("resources", "AndroidManifest.xml")
        if not manifest_path.is_file():
            tqdm.write(
                f"Jadx failed to decompile the APK {relative_path}, "
                f"stdout: {result.stdout}, stderr: {result.stderr}"
            )
            continue

        try:
            manifest: ManifestData = extract_android_manifest_data(manifest_path)
        except Exception as error:
            tqdm.write(f"Failed to read manifest data from {relative_path}: {error}")
            continue

        progress_bar.set_description(
            f"{manifest.label} ({manifest.pkg_name}@{relative_path})"
        )
        file_hashes = create_file_hashes(Path(decompile_dir))

    # The temporary directory is gone here; only in-memory results are stored.
    insert_app_data(
        ApplicationData(manifest=manifest, files=file_hashes, sha256=app_sha256)
    )

In [None]:
# Export one row per stored app: its manifest lists plus the sorted paths of
# the files whose hash is unique across the whole `files` table.
#
# A single LEFT JOIN replaces the former per-app query against the view. Apps
# without a row in `unique_files_per_app` (no unique files) come back with
# NULL, which becomes an empty list; previously fetchone() returned None for
# them and the tuple unpacking raised TypeError.
cursor = conn.execute(
    """
    SELECT a.label, a.pkg_name, a.permissions, a.activities, a.services,
           a.receivers, a.providers, u.unique_files
    FROM apps AS a
    LEFT JOIN unique_files_per_app AS u ON u.pkg_name = a.pkg_name
    ORDER BY a.rowid
    """
)

# Collect plain records and build the frame once instead of growing it row by
# row, which re-allocates on every insertion.
records = []
for (
    label,
    pkg_name,
    permissions,
    activities,
    services,
    receivers,
    providers,
    unique_files,
) in cursor:
    records.append(
        {
            "label": label,
            "pkg_name": pkg_name,
            "permissions": json.loads(permissions),
            "activities": json.loads(activities),
            "services": json.loads(services),
            "receivers": json.loads(receivers),
            "providers": json.loads(providers),
            "files": (
                sorted(item["path"] for item in json.loads(unique_files))
                if unique_files
                else []
            ),
        }
    )

new_df = pd.DataFrame(
    records,
    columns=[
        "label",
        "pkg_name",
        "permissions",
        "activities",
        "services",
        "receivers",
        "providers",
        "files",
    ],
)

new_df.to_csv("apps.csv", index=False)

new_df