<a href="https://colab.research.google.com/github/justinlplummer/DemoRepo/blob/master/APKVulnerabilityChecker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
!pip install androguard
!apt-get install apktool -y
# --- Setup directories ---
# Define the base directory for reports
# Create directories if missing
os.makedirs("apks", exist_ok=True)
os.makedirs("reports", exist_ok=True) # Ensure the reports directory exists

print(f"Before continuing, APKs should be placed in: {'apks/'}")
print(f"Reports will be generated in: {'reports/'}")

In [10]:
# Full APK Vulnerability Scanner with Real Cloud Service Scans
# Google Colab compatible, no placeholders


import sys
import os
import re
import json
import zipfile
import shutil
import tempfile
import requests
import xml.etree.ElementTree as ET
from datetime import datetime
from androguard.core.apk import APK
from tqdm import tqdm

from loguru import logger
logger.remove()

# Regex patterns to detect cloud services URLs or endpoints in strings extracted from APK
CLOUD_SERVICES = {
    'firebase': r'https?://[\w\-.]+\.firebaseio\.com',
    'aws': r'https?://s3[.-][a-z0-9-]*\.amazonaws\.com|https?://[\w\-]+\.s3\.amazonaws\.com',
    'gcp': r'https?://storage\.googleapis\.com|https?://[\w\-]+\.storage\.googleapis\.com',
    'azure': r'https?://[\w\-]+\.blob\.core\.windows\.net',
    'backblaze': r'https?://f[0-9]+\.backblazeb2\.com',
    'wasabi': r'https?://s3\.wasabisys\.com',
    'supabase': r'https?://[\w\-]+\.supabase\.co',
    'heroku': r'https?://[\w\-]+\.herokuapp\.com',
    'netlify': r'https?://[\w\-]+\.netlify\.app',
    'vercel': r'https?://[\w\-]+\.vercel\.app',
    'digitalocean': r'https?://[\w\-]+\.digitaloceanspaces\.com',
    'linode': r'https?://[\w\-]+\.linodeobjects\.com',
    'ibm': r'https?://[\w\-]+\.cloud-object-storage\.appdomain\.cloud',
    'oracle': r'https?://objectstorage\.[a-z\-]+\.oraclecloud\.com',
    'minio': r'https?://[\w\-\.]+/minio',
    'cloudflare': r'https?://[\w\-]+\.r2\.cloudflarestorage\.com',
    'parse': r'https?://[\w\-]+\.back4app\.io',
    'kinvey': r'https?://[\w\-]+\.kinvey\.com',
    'packetfabric': r'https?://[\w\-]+\.packetfabric\.com',
}

# Regex patterns for common API keys, secrets, tokens inside APK strings
SECRETS_REGEX = [
    (r"AIza[0-9A-Za-z\-_]{35}", "Google API Key"),
    (r"[a-zA-Z0-9_]{32,45}-us[0-9]+", "Mailchimp API Key"),
    (r"AKIA[0-9A-Z]{16}", "AWS Access Key ID"),
    (r"(?<![A-Z0-9])[A-Z0-9]{20}(?![A-Z0-9])", "Generic API Key"),
    (r"(?<![a-z0-9])[a-z0-9]{32}(?![a-z0-9])", "Generic Secret"),
    # Facebook App Secret (typically 32 lowercase hex characters)
    (r"(?<![a-f0-9])[a-f0-9]{32}(?![a-f0-9])(?=[^a-zA-Z0-9]*facebook|fb_app_secret|app_secret)", "Facebook App Secret (Potential)"), # Added lookahead for context

    # Twitter Consumer Secret (typically 45 characters, alphanumeric + underscore/dash)
    (r"[0-9a-zA-Z\-_]{45}(?=[^a-zA-Z0-9]*(twitter|consumer_secret|oauth_consumer_secret))", "Twitter Consumer Secret (Potential)"), # Added lookahead for context

    # Google OAuth Client Secret (similar to generic secrets, but often has specific lengths or starts/ends differently)
    # This is tricky because Google Client Secrets for *web apps* are diverse.
    # If found in an APK, it's almost always a critical error.
    # Example: GOCSPX-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (can be 38 chars after prefix, or other forms)
    (r"GOCSPX-[0-9A-Za-z\-_]{38}", "Google OAuth Client Secret (Potential)"),
    # General pattern for 'client_secret' followed by a strong candidate value
    (r"(client_secret|oauth_client_secret|app_secret)[\s=:]*[\"']?([a-zA-Z0-9\-_]{32,64})[\"']?", "OAuth Client Secret (Generic Key-Value)"), # Catches common "key=value" pairs

    # Refresh Tokens (long-lived, highly sensitive) - often start with 1/ or 1//
    (r"1\/[0-9A-Za-z\-_]{40,}", "Google OAuth Refresh Token (Potential)"),
    (r"ya29\.[0-9A-Za-z\-_]+", "Google OAuth Access/Refresh Token (Potential)"),

    # Common Client IDs (less severe if found, but good to identify if context suggests misuse)
    # Google Client ID example: 123456789012-abcdefg12345.apps.googleusercontent.com
    (r"\b\d{12}-[\w]{20,}\.apps\.googleusercontent\.com\b", "Google OAuth Client ID"),
    # Facebook App ID (purely numeric, public)
    (r"\b(facebook_app_id|fb_app_id)[\s=:]*[\"']?(\d{15,17})[\"']?", "Facebook App ID"),
]

def extract_strings(file_path):
    # Extract readable ASCII strings 4+ chars long from APK binary
    with open(file_path, 'rb') as f:
        content = f.read()
    raw_strings = re.findall(rb'[\x20-\x7E]{4,}', content)
    return [s.decode(errors='ignore') for s in raw_strings]

def detect_cloud_services(strings):
    detected = {}
    for service, pattern in CLOUD_SERVICES.items():
        regex = re.compile(pattern, re.IGNORECASE)
        found_urls = set()
        for s in strings:
            matches = regex.findall(s)
            if matches:
                found_urls.update(matches)
        if found_urls:
            detected[service] = list(found_urls)
    return detected

def detect_secrets(strings):
    findings = []
    for s in strings:
        for pattern, name in SECRETS_REGEX:
            if re.search(pattern, s):
                findings.append({"type": name, "value": s})
    return findings



# =====================
# XML Vulnerability Scans
# =====================

def scan_manifest_vulnerabilities(decompiled_path, findings, app_id):
    """
    Scans AndroidManifest.xml for common vulnerabilities like debuggable flag,
    cleartext traffic, and insecurely exported components.
    """
    manifest_path = os.path.join(decompiled_path, "AndroidManifest.xml")
    if not os.path.exists(manifest_path):
        logger.warning(f"AndroidManifest.xml not found at {manifest_path}")
        return

    try:
        tree = ET.parse(manifest_path)
        root = tree.getroot()
        # Define Android namespace for XPath queries
        ns = {'android': 'http://schemas.android.com/apk/res/android'}

        # 1. Check for android:debuggable="true"
        application_tag = root.find('application')
        if application_tag is not None:
            debuggable = application_tag.get(f"{{{ns['android']}}}debuggable")
            if debuggable == 'true':
                findings.append({
                    "service": "AndroidManifest",
                    "url": "N/A",
                    "issue": "Application is debuggable",
                    "severity": "CRITICAL",
                    "remediation": "Set android:debuggable to 'false' in production builds. This exposes the app to debugging tools and potential data extraction."
                })

            # 2. Check for android:usesCleartextTraffic="true"
            uses_cleartext_traffic = application_tag.get(f"{{{ns['android']}}}usesCleartextTraffic")
            if uses_cleartext_traffic == 'true':
                findings.append({
                    "service": "AndroidManifest",
                    "url": "N/A",
                    "issue": "Application allows cleartext HTTP traffic",
                    "severity": "HIGH",
                    "remediation": "Ensure all network communication uses HTTPS. Cleartext traffic is vulnerable to eavesdropping and tampering. Consider a Network Security Configuration to enforce HTTPS."
                })

        # 3. Check for insecurely exported components
        exported_components = root.findall(".//*[@android:exported='true']", ns)
        for component in exported_components:
            # Check if component is an activity, service, receiver, or provider
            if component.tag in ['activity', 'service', 'receiver', 'provider']:
                # Check if a permission is explicitly defined for the component
                has_permission = component.get(f"{{{ns['android']}}}permission") is not None
                if not has_permission:
                    name = component.get(f"{{{ns['android']}}}name")
                    findings.append({
                        "service": "AndroidManifest",
                        "url": "N/A",
                        "issue": f"Exported {component.tag} '{name}' without explicit permission",
                        "severity": "CRITICAL",
                        "remediation": f"Either set android:exported='false' for {component.tag} '{name}' or add a strong android:permission attribute to restrict access by other applications."
                    })

    except ET.ParseError as e:
        logger.error(f"Error parsing AndroidManifest.xml: {e}")
        findings.append({
            "service": "AndroidManifest",
            "url": "N/A",
            "issue": f"Error parsing AndroidManifest.xml: {e}",
            "severity": "LOW",
            "remediation": "Check APK integrity or parsing logic."
        })
    except Exception as e:
        logger.error(f"Unexpected error in scan_manifest_vulnerabilities: {e}")


def scan_insecure_storage_patterns(decompiled_path, findings, app_id):
    """
    Scans common storage locations (like SharedPreferences XML files) for sensitive keywords.
    Note: This is a static analysis heuristic. Actual runtime data storage would require dynamic analysis.
    """
    sensitive_keywords = ["password", "token", "secret", "api_key", "credential", "auth", "sessionid", "pin"]

    # Common paths for SharedPreferences XML files in decompiled APK
    # These are usually under res/xml or sometimes in asset folders.
    # For runtime data, they'd be in /data/data/{package_name}/shared_prefs/
    # This static check focuses on bundled/default prefs or those found in resources.
    search_dirs = [
        os.path.join(decompiled_path, "res", "xml"),
        os.path.join(decompiled_path, "assets")
    ]

    for root_dir in search_dirs:
        if not os.path.exists(root_dir):
            continue
        for root, _, files in os.walk(root_dir):
            for file in files:
                if file.endswith(".xml") or file.endswith(".json"): # SharedPreferences are XML, but other configs might be JSON
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read().lower()
                            for keyword in sensitive_keywords:
                                if keyword in content:
                                    findings.append({
                                        "service": "Insecure Data Storage",
                                        "url": f"file://{os.path.relpath(file_path, decompiled_path)}",
                                        "issue": f"Sensitive keyword '{keyword}' found in {file}. Potential insecure storage.",
                                        "severity": "HIGH",
                                        "remediation": "Avoid storing sensitive data in plaintext in SharedPreferences or other accessible files. Use Android Keystore or encrypted storage solutions."
                                    })
                                    break # Only report once per file for the first keyword found
                    except Exception as e:
                        logger.warning(f"Could not read/parse {file_path}: {e}")

def scan_weak_cryptography(strings, findings, app_id):
    """
    Heuristically checks for mentions of weak cryptographic algorithms or hardcoded keys
    within extracted strings. Reports each *type* of weakness only once.
    """
    weak_algorithms_data = {
        "MD5": "MD5 is cryptographically broken and should not be used for hashing passwords or verifying integrity.",
        "SHA1": "SHA-1 is cryptographically broken and should not be used for digital signatures or integrity checks.",
        "DES": "Data Encryption Standard (DES) is insecure due to its small key size.",
        "RC4": "RC4 is a stream cipher with known vulnerabilities and should be avoided.",
        "ECB mode": "Electronic Codebook (ECB) mode is insecure for encryption as identical plaintext blocks produce identical ciphertext blocks.",
    }

    key_patterns_data = [
        ("hardcoded key (generic)", r"(key|secret|passphrase)[\s=:]*[\"']?([a-zA-Z0-9]{16,128})[\"']?"), # Generic key pattern
        ("hardcoded key (SecretKeySpec)", r"new SecretKeySpec\s*\((.*?)\)"), # Java SecretKeySpec constructor
    ]

    reported_weaknesses = set() # To store unique weakness identifiers (e.g., "MD5", "hardcoded key (generic)")

    for s in strings:
        # Check for weak algorithms
        for algo, desc in weak_algorithms_data.items():
            if algo.lower() in s.lower():
                if algo not in reported_weaknesses:
                    findings.append({
                        "service": "Weak Cryptography",
                        "url": "N/A",
                        "issue": f"Potential use of weak cryptographic algorithm: {algo}. {desc}",
                        "severity": "HIGH",
                        "remediation": f"Migrate to stronger algorithms (e.g., AES-256 with GCM, SHA-256/SHA-3 for hashing, PBKDF2/scrypt/bcrypt for passwords) and proper modes of operation (e.g., CBC, GCM)."
                    })
                    reported_weaknesses.add(algo)
                # No 'break' here, as a single string might contain multiple weak algorithms

        # Check for hardcoded keys (distinct from weak algorithms)
        for key_type, pattern in key_patterns_data:
            if re.search(pattern, s, re.IGNORECASE):
                if key_type not in reported_weaknesses:
                    findings.append({
                        "service": "Weak Cryptography",
                        "url": "N/A",
                        "issue": f"Potential {key_type} detected. This makes the key easily extractable.",
                        "severity": "CRITICAL",
                        "remediation": "Never hardcode encryption keys. Use Android Keystore, derive keys securely, or fetch them from a trusted backend."
                    })
                    reported_weaknesses.add(key_type)
                # No 'break' here, as a single string might contain multiple key patterns



# =====================
# Cloud service scanners
# Each takes (urls: list[str], findings: list, app_id: str)
# and appends dicts with findings to findings list
# =====================

def scan_firebase(urls, findings, app_id):
    for url in urls:
        test_url = url.rstrip('/') + '/.json'
        try:
            r = requests.get(test_url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "Firebase",
                    "url": test_url,
                    "issue": "Publicly readable Firebase Realtime Database endpoint",
                    "severity": "CRITICAL",
                    "remediation": "Configure Firebase database rules to restrict public access."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "Firebase",
                    "url": test_url,
                    "issue": "Firebase endpoint requires authentication (secure).",
                    "severity": "INFO",
                    "remediation": "No public exposure detected."
                })
            else:
                findings.append({
                    "service": "Firebase",
                    "url": test_url,
                    "issue": f"Unexpected HTTP status code {r.status_code}",
                    "severity": "LOW",
                    "remediation": "Investigate response status."
                })
        except Exception as e:
            findings.append({
                "service": "Firebase",
                "url": test_url,
                "issue": f"Error accessing Firebase endpoint: {e}",
                "severity": "LOW",
                "remediation": "Check network and URL correctness."
            })

def scan_aws(urls, findings, app_id):
    # Try to list S3 bucket if URL points to root or bucket endpoint
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200 and "<ListBucketResult" in r.text:
                findings.append({
                    "service": "AWS S3",
                    "url": url,
                    "issue": "Publicly listable S3 bucket",
                    "severity": "CRITICAL",
                    "remediation": "Apply bucket policies or block public access."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "AWS S3",
                    "url": url,
                    "issue": "S3 bucket access forbidden (likely secure)",
                    "severity": "INFO",
                    "remediation": "No public exposure detected."
                })
            else:
                findings.append({
                    "service": "AWS S3",
                    "url": url,
                    "issue": f"HTTP status {r.status_code} from bucket URL",
                    "severity": "LOW",
                    "remediation": "Review bucket permissions."
                })
        except Exception as e:
            findings.append({
                "service": "AWS S3",
                "url": url,
                "issue": f"Error accessing S3 bucket: {e}",
                "severity": "LOW",
                "remediation": "Check bucket URL and network."
            })

def scan_gcp(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            # For GCP buckets, if public listing allowed, response will contain <ListBucketResult> or JSON
            if r.status_code == 200:
                # Simplified heuristic: if response contains "items", bucket contents may be public
                if "items" in r.text or "ListBucketResult" in r.text:
                    findings.append({
                        "service": "Google Cloud Storage",
                        "url": url,
                        "issue": "Publicly readable GCP storage bucket",
                        "severity": "CRITICAL",
                        "remediation": "Set bucket IAM policies to restrict public access."
                    })
                else:
                    findings.append({
                        "service": "Google Cloud Storage",
                        "url": url,
                        "issue": "Bucket reachable but no clear listing detected",
                        "severity": "INFO",
                        "remediation": "Review bucket permissions manually."
                    })
            elif r.status_code == 403:
                findings.append({
                    "service": "Google Cloud Storage",
                    "url": url,
                    "issue": "Access forbidden, likely secure",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
            else:
                findings.append({
                    "service": "Google Cloud Storage",
                    "url": url,
                    "issue": f"HTTP status {r.status_code}",
                    "severity": "LOW",
                    "remediation": "Check bucket configuration."
                })
        except Exception as e:
            findings.append({
                "service": "Google Cloud Storage",
                "url": url,
                "issue": f"Error accessing bucket: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_azure(urls, findings, app_id):
    for url in urls:
        try:
            # Azure blob URL usually ends with container or blob name; test container-level access
            r = requests.get(url, timeout=10)
            if r.status_code == 200 and ("<?xml" in r.text and "<EnumerationResults" in r.text):
                findings.append({
                    "service": "Azure Blob Storage",
                    "url": url,
                    "issue": "Publicly listable Azure blob container",
                    "severity": "CRITICAL",
                    "remediation": "Restrict blob container public access."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "Azure Blob Storage",
                    "url": url,
                    "issue": "Access forbidden (likely secure)",
                    "severity": "INFO",
                    "remediation": "No public exposure detected."
                })
            else:
                findings.append({
                    "service": "Azure Blob Storage",
                    "url": url,
                    "issue": f"Unexpected HTTP status {r.status_code}",
                    "severity": "LOW",
                    "remediation": "Review permissions."
                })
        except Exception as e:
            findings.append({
                "service": "Azure Blob Storage",
                "url": url,
                "issue": f"Error accessing blob storage: {e}",
                "severity": "LOW",
                "remediation": "Check network/URL."
            })

def scan_backblaze(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "Backblaze B2",
                    "url": url,
                    "issue": "Accessible Backblaze B2 bucket (public files possible)",
                    "severity": "MEDIUM",
                    "remediation": "Check bucket policies and ACLs."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "Backblaze B2",
                    "url": url,
                    "issue": "Access forbidden (likely secure)",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
            else:
                findings.append({
                    "service": "Backblaze B2",
                    "url": url,
                    "issue": f"HTTP status {r.status_code}",
                    "severity": "LOW",
                    "remediation": "Review bucket."
                })
        except Exception as e:
            findings.append({
                "service": "Backblaze B2",
                "url": url,
                "issue": f"Error accessing bucket: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_wasabi(urls, findings, app_id):
    # Wasabi is S3-compatible, reuse aws scan logic
    scan_aws(urls, findings, app_id)

def scan_supabase(urls, findings, app_id):
    for url in urls:
        # Supabase is a Postgres backend, check anon key presence or public access endpoint
        for url in urls:
            if "anon" in url.lower():
                findings.append({
                    "service": "Supabase",
                    "url": url,
                    "issue": "Supabase anon key or public API endpoint detected",
                    "severity": "HIGH",
                    "remediation": "Rotate anon keys and secure APIs."
                })
            else:
                findings.append({
                    "service": "Supabase",
                    "url": url,
                    "issue": "Supabase endpoint found, manual review needed",
                    "severity": "INFO",
                    "remediation": "Check supabase project security."
                })

def scan_heroku(urls, findings, app_id):
    for url in urls:
        # Scan for exposed debug pages or env vars (simple heuristic: check landing page content)
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200 and ("heroku" in r.text.lower() or "application error" in r.text.lower()):
                findings.append({
                    "service": "Heroku",
                    "url": url,
                    "issue": "Heroku app reachable; check for debug info exposure",
                    "severity": "MEDIUM",
                    "remediation": "Review Heroku app config and error page exposure."
                })
        except Exception:
            pass

def scan_netlify(urls, findings, app_id):
    for url in urls:
        # Check if .env or config files are accessible publicly
        env_url = url.rstrip('/') + "/.env"
        try:
            r = requests.get(env_url, timeout=10)
            if r.status_code == 200 and ("KEY=" in r.text or "SECRET" in r.text):
                findings.append({
                    "service": "Netlify",
                    "url": env_url,
                    "issue": "Exposed .env file with secrets",
                    "severity": "CRITICAL",
                    "remediation": "Remove .env from public web root."
                })
        except Exception:
            pass

def scan_vercel(urls, findings, app_id):
    for url in urls:
        # Check for common config files exposed
        try:
            config_url = url.rstrip('/') + "/api/_vercel_build_output"
            r = requests.get(config_url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "Vercel",
                    "url": config_url,
                    "issue": "Potentially exposed Vercel build output",
                    "severity": "MEDIUM",
                    "remediation": "Secure build artifact endpoints."
                })
        except Exception:
            pass

def scan_digitalocean(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "DigitalOcean Spaces",
                    "url": url,
                    "issue": "Accessible DigitalOcean space; check ACLs",
                    "severity": "MEDIUM",
                    "remediation": "Review and tighten space permissions."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "DigitalOcean Spaces",
                    "url": url,
                    "issue": "Access forbidden; likely secure",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
        except Exception as e:
            findings.append({
                "service": "DigitalOcean Spaces",
                "url": url,
                "issue": f"Error accessing space: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_linode(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "Linode Object Storage",
                    "url": url,
                    "issue": "Accessible Linode bucket; review ACLs",
                    "severity": "MEDIUM",
                    "remediation": "Review bucket permissions."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "Linode Object Storage",
                    "url": url,
                    "issue": "Access forbidden; likely secure",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
        except Exception as e:
            findings.append({
                "service": "Linode Object Storage",
                "url": url,
                "issue": f"Error accessing bucket: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_ibm(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "IBM Cloud Object Storage",
                    "url": url,
                    "issue": "Accessible IBM Cloud bucket; review permissions",
                    "severity": "MEDIUM",
                    "remediation": "Review bucket IAM and CORS policies."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "IBM Cloud Object Storage",
                    "url": url,
                    "issue": "Access forbidden; likely secure",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
        except Exception as e:
            findings.append({
                "service": "IBM Cloud Object Storage",
                "url": url,
                "issue": f"Error accessing bucket: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_oracle(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "Oracle Cloud Object Storage",
                    "url": url,
                    "issue": "Accessible Oracle bucket; review policies",
                    "severity": "MEDIUM",
                    "remediation": "Restrict public access."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "Oracle Cloud Object Storage",
                    "url": url,
                    "issue": "Access forbidden; likely secure",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
        except Exception as e:
            findings.append({
                "service": "Oracle Cloud Object Storage",
                "url": url,
                "issue": f"Error accessing bucket: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_minio(urls, findings, app_id):
    for url in urls:
        try:
            # MinIO often exposes web dashboards on port 9000 or specific paths
            r = requests.get(url, timeout=10)
            if r.status_code == 200 and ("MinIO" in r.text or "minio" in r.text):
                findings.append({
                    "service": "MinIO",
                    "url": url,
                    "issue": "Accessible MinIO web dashboard or bucket",
                    "severity": "HIGH",
                    "remediation": "Secure MinIO endpoints and require authentication."
                })
        except Exception:
            pass

def scan_cloudflare(urls, findings, app_id):
    for url in urls:
        try:
            r = requests.get(url, timeout=10)
            if r.status_code == 200:
                findings.append({
                    "service": "Cloudflare R2 Storage",
                    "url": url,
                    "issue": "Accessible Cloudflare R2 bucket; check ACLs",
                    "severity": "MEDIUM",
                    "remediation": "Review bucket policies."
                })
            elif r.status_code == 403:
                findings.append({
                    "service": "Cloudflare R2 Storage",
                    "url": url,
                    "issue": "Access forbidden; likely secure",
                    "severity": "INFO",
                    "remediation": "No public exposure."
                })
        except Exception as e:
            findings.append({
                "service": "Cloudflare R2 Storage",
                "url": url,
                "issue": f"Error accessing bucket: {e}",
                "severity": "LOW",
                "remediation": "Check URL/network."
            })

def scan_parse(urls, findings, app_id):
    for url in urls:
        # Parse / Back4App backend service, check for open classes or exposed app id
        for url in urls:
            if "/classes/" in url:
                findings.append({
                    "service": "Parse / Back4App",
                    "url": url,
                    "issue": "Parse classes endpoint detected - check for public access",
                    "severity": "HIGH",
                    "remediation": "Secure Parse classes and API keys."
                })
            else:
                findings.append({
                    "service": "Parse / Back4App",
                    "url": url,
                    "issue": "Parse endpoint found - manual review needed",
                    "severity": "INFO",
                    "remediation": "Check backend security."
                })

def scan_kinvey(urls, findings, app_id):
    for url in urls:
        # Kinvey is a mobile backend as a service - look for exposed collections or app keys
        for url in urls:
            if "/appdata/" in url or "/appkey/" in url:
                findings.append({
                    "service": "Kinvey",
                    "url": url,
                    "issue": "Potential exposed Kinvey appdata or keys",
                    "severity": "HIGH",
                    "remediation": "Rotate keys and secure appdata endpoints."
                })
            else:
                findings.append({
                    "service": "Kinvey",
                    "url": url,
                    "issue": "Kinvey endpoint found - manual review needed",
                    "severity": "INFO",
                    "remediation": "Review Kinvey app security."
                })

def scan_packetfabric(urls, findings, app_id):
    for url in urls:
        findings.append({
            "service": "PacketFabric",
            "url": url,
            "issue": "Detected PacketFabric cloud endpoint — manual security review recommended",
            "severity": "INFO",
            "remediation": "Check PacketFabric access controls and permissions."
        })

# Map cloud services to their scanner functions
SCAN_FUNCTIONS = {
    "firebase": scan_firebase,
    "aws": scan_aws,
    "gcp": scan_gcp,
    "azure": scan_azure,
    "backblaze": scan_backblaze,
    "wasabi": scan_wasabi,
    "supabase": scan_supabase,
    "heroku": scan_heroku,
    "netlify": scan_netlify,
    "vercel": scan_vercel,
    "digitalocean": scan_digitalocean,
    "linode": scan_linode,
    "ibm": scan_ibm,
    "oracle": scan_oracle,
    "minio": scan_minio,
    "cloudflare": scan_cloudflare,
    "parse": scan_parse,
    "kinvey": scan_kinvey,
    "packetfabric": scan_packetfabric,
}

def extract_firebase_project_ids(decompiled_path):
    project_ids = set()

    # Look for google-services.json
    for root, _, files in os.walk(decompiled_path):
        for file in files:
            if file == 'google-services.json':
                try:
                    with open(os.path.join(root, file), 'r') as f:
                        data = json.load(f)
                        project_id = data.get("project_info", {}).get("project_id")
                        if project_id:
                            project_ids.add(project_id)
                except Exception:
                    pass

    # Search in smali or XML for firebaseio or firebaseapp
    for root, _, files in os.walk(decompiled_path):
        for file in files:
            if file.endswith('.smali') or file.endswith('.xml') or file.endswith('.json'):
                try:
                    with open(os.path.join(root, file), 'r', errors='ignore') as f:
                        content = f.read()
                        matches = re.findall(r'https?://([a-z0-9\-]+)\.firebaseio\.com', content)
                        matches += re.findall(r'https?://([a-z0-9\-]+)\.web\.app', content)
                        matches += re.findall(r'https?://([a-z0-9\-]+)\.firebaseapp\.com', content)
                        for match in matches:
                            project_ids.add(match)
                except Exception:
                    continue

    return list(project_ids)

def scan_firebase_project(project_id):
    findings = []

    base_urls = {
        "Realtime DB": f"https://{project_id}.firebaseio.com/.json",
        "Firestore": f"https://firestore.googleapis.com/v1/projects/{project_id}/databases/(default)/documents",
        "Storage": f"https://storage.googleapis.com/storage/v1/b/{project_id}.appspot.com/o",
        "Cloud Functions": f"https://{project_id}.cloudfunctions.net/"
    }

    for service, url in base_urls.items():
        try:
            r = requests.get(url, timeout=8)
            status_code = r.status_code # Renamed 'status' to 'status_code' to avoid conflict with the key 'status' in the dictionary below.
            if status_code == 200:
                findings.append({
                    "service": "Firebase " + service, # Added "Firebase " prefix for clarity in report
                    "url": url,
                    "issue": f"{service} is OPEN",
                    "severity": "CRITICAL",
                    "remediation": f"Configure Firebase {service} rules to restrict public access."
                })
            elif status_code in [401, 403]:
                findings.append({
                    "service": "Firebase " + service,
                    "url": url,
                    "issue": f"{service} requires authentication (secure).",
                    "severity": "INFO",
                    "remediation": "No public exposure detected."
                })
            elif status_code == 404:
                findings.append({
                    "service": "Firebase " + service,
                    "url": url,
                    "issue": f"{service} endpoint not found.",
                    "severity": "INFO",
                    "remediation": "Verify the endpoint URL or if the service is in use."
                })
            else:
                findings.append({
                    "service": "Firebase " + service,
                    "url": url,
                    "issue": f"Unexpected HTTP status code {status_code} for {service}",
                    "severity": "LOW",
                    "remediation": "Investigate response status."
                })
        except requests.exceptions.Timeout:
            findings.append({
                "service": "Firebase " + service,
                "url": url,
                "issue": f"Timeout connecting to {service} endpoint.",
                "severity": "LOW",
                "remediation": "Check network connectivity or if the endpoint is active."
            })
        except requests.exceptions.RequestException as e:
            findings.append({
                "service": "Firebase " + service,
                "url": url,
                "issue": f"Error accessing {service} endpoint: {str(e)}",
                "severity": "LOW",
                "remediation": "Check network and URL correctness."
            })

    return findings




# Main report generator
def generate_report(apk_path):
    tmpdir = tempfile.mkdtemp()
    app_id = os.path.splitext(os.path.basename(apk_path))[0]
    findings = []
    vulnerability_findings = []

    try:
        # Handle XAPK extraction if needed
        if apk_path.endswith(".xapk"):
            with zipfile.ZipFile(apk_path, 'r') as z:
                apk_files = [f for f in z.namelist() if f.endswith(".apk")]
                if apk_files:
                    z.extract(apk_files[0], tmpdir)
                    apk_path_extracted = os.path.join(tmpdir, apk_files[0])
                else:
                    raise Exception("No APK found inside XAPK")
        else:
            apk_path_extracted = apk_path

        # Decompile APK with apktool for resource scanning
        decompiled_dir = os.path.join(tmpdir, "decompiled")
        os.makedirs(decompiled_dir, exist_ok=True)
        os.system(f"apktool d -f -o {decompiled_dir} {apk_path_extracted} > /dev/null 2>&1")

        # Extract Firebase project IDs from decompiled APK
        project_ids = extract_firebase_project_ids(decompiled_dir)

        # Scan Firebase projects for exposure
        for pid in project_ids:
            firebase_findings = scan_firebase_project(pid)
            for finding in firebase_findings:
                findings.append({
                    "service": finding['service'], # Use the specific service name from scan_firebase_project
                    "url": finding['url'],
                    "issue": finding['issue'],
                    "severity": finding['severity'],
                    "remediation": finding['remediation']
                })


        apk = APK(apk_path_extracted)

        # Extract strings from APK file for scanning
        strings = extract_strings(apk_path_extracted)

        # --- NEW SCANNERS INTEGRATION ---
        scan_manifest_vulnerabilities(decompiled_dir, findings, app_id)
        scan_insecure_storage_patterns(decompiled_dir, findings, app_id)
        scan_weak_cryptography(strings, findings, app_id)
        # --- END NEW SCANNERS INTEGRATION ---

       # Detect secrets (API keys etc)
        secrets_found = detect_secrets(strings)
        aggragated_secrets = []
        for secret in secrets_found:
            if secret['type'] not in aggragated_secrets:
                aggragated_secrets.append(secret['type'])
                findings.append({
                    "service": "Secrets",
                    "url": "",
                    "issue": f"Possible secret/key detected: {secret['type']}",
                    "severity": "HIGH",
                    "remediation": "Remove secrets from code and use secure storage."
                })


        # Detect cloud services by regex on strings
        cloud_services_found = detect_cloud_services(strings)



        # Run cloud-specific scanners for each detected service
        for svc, urls in cloud_services_found.items():
            if svc in SCAN_FUNCTIONS:
                SCAN_FUNCTIONS[svc](urls, findings, app_id)

        # Generate HTML report
        report_dir = f"reports/{app_id}_report"
        os.makedirs(report_dir, exist_ok=True)

        report_path = os.path.join(report_dir, "report.html")
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(f"<h1>Security Scan Report for {app_id}</h1>\n")
            f.write(f"<p>Scan Date: {datetime.utcnow().isoformat()} UTC</p>\n")
            f.write(f"<p><b>Package Name:</b> {apk.package}</p>\n")
            f.write(f"<p><b>Version:</b> {apk.get_androidversion_name() or 'Unknown'}</p>")
            f.write(f"<p><b>Version:</b> {apk.get_androidversion_code() or 'Unknown'}</p>")


            f.write("<h2>Detected Secrets</h2>\n")
            if secrets_found:
                f.write("<ul>\n")
                for secret in secrets_found:
                    f.write(f"<li>{secret['type']}: {secret['value'][:50]}...</li>\n")
                f.write("</ul>\n")
            else:
                f.write("<p>None found.</p>\n")

            f.write("<h2>Detected Cloud Services</h2>\n")
            if cloud_services_found:
                f.write("<ul>\n")
                for svc, urls in cloud_services_found.items():
                    f.write(f"<li>{svc} ({len(urls)} URL(s))</li>\n")
                f.write("</ul>\n")
            else:
                f.write("<p>No cloud services detected.</p>\n")

            f.write("<h2>Vulnerability Findings</h2>\n")
            if findings:
                f.write("<ul>\n")
                for issue in findings:
                    f.write(
                        f"<li><b>{issue['service']}</b> - {issue['url']} - "
                        f"{issue['issue']} <br>"
                        f"<b>Severity:</b> {issue['severity']} <br>"
                        f"<b>Remediation:</b> {issue['remediation']}</li>\n"
                    )
                f.write("</ul>\n")
            else:
                f.write("<p>No vulnerabilities detected.</p>\n")

        print(f"Report generated for {app_id} at {report_path}")

    except Exception as e:
        print(f"Error processing {apk_path}: {e}")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


if __name__ == "__main__":
    apk_files = [os.path.join("apks", f) for f in os.listdir("apks") if f.lower().endswith((".apk", ".xapk"))]
    for apk_file in tqdm(apk_files, desc="Scanning APKs"):
        generate_report(apk_file)

    print("\n✅ Done! Check the 'reports/' folder for results.")


Scanning APKs:   0%|          | 0/1 [00:00<?, ?it/s]

Report generated for Grindr - Gay Dating & Chat_25.12.1_APKPure at reports/Grindr - Gay Dating & Chat_25.12.1_APKPure_report/report.html


Scanning APKs: 100%|██████████| 1/1 [01:55<00:00, 115.82s/it]



✅ Done! Check the 'reports/' folder for results.


In [1]:
import requests
from datetime import datetime

API_KEY = "AIzaSyDD5Ceh8j-a6Xw2R_seA7d5FZ5W09PcGkI"  # Replace with the API key you want to test

# Define Google APIs to test
google_api_tests = [
    {
        "name": "YouTube Data API",
        "url": f"https://www.googleapis.com/youtube/v3/videos?part=snippet&id=dQw4w9WgXcQ&key={API_KEY}"
    },
    {
        "name": "Geocoding API",
        "url": f"https://maps.googleapis.com/maps/api/geocode/json?address=1600+Amphitheatre+Parkway,+Mountain+View,+CA&key={API_KEY}"
    },
    {
        "name": "Google Maps Elevation API",
        "url": f"https://maps.googleapis.com/maps/api/elevation/json?locations=36.578581,-118.291994&key={API_KEY}"
    },
    {
        "name": "Google Maps Static API",
        "url": f"https://maps.googleapis.com/maps/api/staticmap?center=40.714224,-73.961452&zoom=15&size=600x300&key={API_KEY}"
    },
    {
        "name": "Google Translate API",
        "url": f"https://translation.googleapis.com/language/translate/v2?key={API_KEY}&q=hello&target=es"
    },
    {
        "name": "Google Custom Search API",
        "url": f"https://www.googleapis.com/customsearch/v1?q=test&key={API_KEY}&cx=017576662512468239146:omuauf_lfve"
    },
    {
        "name": "Google Books API",
        "url": f"https://www.googleapis.com/books/v1/volumes?q=isbn:0747532699&key={API_KEY}"
    },
    {
        "name": "Google Safe Browsing API",
        "url": f"https://safebrowsing.googleapis.com/v4/threatLists?key={API_KEY}"
    },
    {
        "name": "Google Fonts API",
        "url": f"https://www.googleapis.com/webfonts/v1/webfonts?key={API_KEY}"
    },
    {
        "name": "Google Calendar API",
        "url": f"https://www.googleapis.com/calendar/v3/users/me/calendarList?key={API_KEY}"
    },
    {
        "name": "Google Drive API",
        "url": f"https://www.googleapis.com/drive/v3/files?key={API_KEY}"
    },
    {
        "name": "Google Sheets API",
        "url": f"https://sheets.googleapis.com/v4/spreadsheets?key={API_KEY}"
    },
    {
        "name": "Google Vision API",
        "url": f"https://vision.googleapis.com/v1/images:annotate?key={API_KEY}"
    },
    {
        "name": "Google Speech-to-Text API",
        "url": f"https://speech.googleapis.com/v1/speech:recognize?key={API_KEY}"
    },
    {
        "name": "Google Natural Language API",
        "url": f"https://language.googleapis.com/v1/documents:analyzeEntities?key={API_KEY}"
    },
    {
        "name": "Firebase Realtime Database (generic)",
        "url": f"https://smule-com-api-project-293071437640.firebaseio.com/.json?key={API_KEY}"
    },
    {
        "name": "Google Tag Manager API",
        "url": f"https://www.googleapis.com/tagmanager/v2/accounts?key={API_KEY}"
    },
    {
        "name": "Google Cloud Billing API",
        "url": f"https://cloudbilling.googleapis.com/v1/billingAccounts?key={API_KEY}"
    },
    {
        "name": "anon account registry",
        "url": f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={API_KEY}"

    },
    {
        "name": "unauth login",
        "url": f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={API_KEY}"

    },
    {
        "name": "unauth acct lookup",
        "url": f"https://identitytoolkit.googleapis.com/v1/accounts:lookup?key={API_KEY}"

    }
]

print(f"🔍 Scanning Google API key: {API_KEY}")
print(f"📅 Timestamp: {datetime.utcnow().isoformat()} UTC\n")

for test in google_api_tests:
    name, url = test["name"], test["url"]
    try:
        response = requests.get(url)
        if response.status_code == 200:
            print(f"[✅ ACCESS] {name}")
        elif response.status_code == 403:
            print(f"[🔒 FORBIDDEN] {name} – Access denied")
        elif response.status_code == 400:
            print(f"[⚠️ BAD REQUEST] {name} – Key accepted, but request malformed")
        elif response.status_code == 404:
            print(f"[❓ NOT FOUND] {name} – Endpoint may not be enabled")
        else:
            print(f"[❗ UNEXPECTED] {name} – Status: {response.status_code}")
    except Exception as e:
        print(f"[❌ ERROR] {name} – {str(e)}")


🔍 Scanning Google API key: AIzaSyDD5Ceh8j-a6Xw2R_seA7d5FZ5W09PcGkI
📅 Timestamp: 2025-07-31T03:57:49.969553 UTC

[🔒 FORBIDDEN] YouTube Data API – Access denied
[✅ ACCESS] Geocoding API
[✅ ACCESS] Google Maps Elevation API
[🔒 FORBIDDEN] Google Maps Static API – Access denied
[🔒 FORBIDDEN] Google Translate API – Access denied
[🔒 FORBIDDEN] Google Custom Search API – Access denied
[🔒 FORBIDDEN] Google Books API – Access denied
[🔒 FORBIDDEN] Google Safe Browsing API – Access denied
[🔒 FORBIDDEN] Google Fonts API – Access denied
[❗ UNEXPECTED] Google Calendar API – Status: 401
[🔒 FORBIDDEN] Google Drive API – Access denied
[❓ NOT FOUND] Google Sheets API – Endpoint may not be enabled
[❓ NOT FOUND] Google Vision API – Endpoint may not be enabled
[❓ NOT FOUND] Google Speech-to-Text API – Endpoint may not be enabled
[❓ NOT FOUND] Google Natural Language API – Endpoint may not be enabled
[❗ UNEXPECTED] Firebase Realtime Database (generic) – Status: 423
[❗ UNEXPECTED] Google Tag Manager API – Sta