In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
import time
import random
import ssl
import socket

# --- User-Agent Pool (rotates per request) ---
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:119.0) Gecko/20100101 Firefox/119.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36",
]

# Suspicious TLD list
SUSPICIOUS_TLDS = {"xyz", "tk", "top", "click", "link", "info"}

# Retry settings
MAX_RETRIES = 3
RETRY_BACKOFF = [1, 3, 5]  # seconds


# --- Headers ---
def get_headers():
    """Return randomized browser-like headers."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }


# --- Network Fetching ---
def fetch_url(url):
    """Fetch URL with retries and browser-like headers."""
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(url, headers=get_headers(), timeout=10)
            response.raise_for_status()
            # add jitter sleep between requests
            time.sleep(random.uniform(1, 3))
            return response
        except requests.exceptions.RequestException as e:
            wait_time = RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)]
            print(f"[WARN] Fetch failed ({e}), retrying in {wait_time}s...")
            time.sleep(wait_time + random.random())
    print(f"[ERROR] Could not fetch {url} after {MAX_RETRIES} retries.")
    return None


# --- Helpers ---
def is_external_link(base_url, link_url):
    """Check if a link is external vs internal."""
    base_domain = urlparse(base_url).netloc
    link_domain = urlparse(link_url).netloc
    return base_domain != "" and base_domain != link_domain


def get_links(url):
    """Extract all valid links from a webpage."""
    response = fetch_url(url)
    if not response:
        return [], url
    try:
        soup = BeautifulSoup(response.text, "html.parser")
        links = []
        for a in soup.find_all("a", href=True):
            href = urljoin(url, a["href"])  # make absolute
            if href.startswith("http"):
                links.append(href)
        return links, response.url
    except Exception as e:
        print(f"[ERROR] Parsing failed for {url} -> {e}")
        return [], url


def check_ssl_certificate(domain):
    """Check SSL validity of a domain (basic)."""
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()
                return True if cert else False
    except Exception:
        return False


def has_suspicious_tld(domain):
    """Check if domain has suspicious TLD."""
    try:
        tld = domain.split(".")[-1].lower()
        return tld in SUSPICIOUS_TLDS
    except Exception:
        return False


# --- Main Analyzer ---
def check_page_health(url, depth=1):
    """Check page features and classify as Healthy/Unhealthy, including external links."""
    visited = set()
    report = {}

    def analyze(u, d, parent=None):
        if u in visited or d > depth:
            return
        visited.add(u)

        parsed = urlparse(u)
        domain = parsed.netloc

        # --- Feature extraction ---
        ssl_ok = check_ssl_certificate(domain) if parsed.scheme == "https" else False
        bad_tld = has_suspicious_tld(domain)
        links, final_url = get_links(u)

        num_links = len(links)
        external_links = [l for l in links if is_external_link(u, l)]
        external_ratio = (len(external_links) / num_links) if num_links > 0 else 0

        # --- Classification Rules ---
        if bad_tld:
            status = "Unhealthy (Suspicious TLD)"
        elif (not ssl_ok and parsed.scheme == "https"):
            status = "Unhealthy (Invalid SSL)"
        elif (parsed.scheme != "https"):
            status = "Unhealthy ()"
        elif external_ratio > 0.7:
            status = "Unhealthy (Too Many External Links)"
        elif num_links > 50:
            status = "Unhealthy (Suspiciously High Link Count)"
        elif num_links == 0:
            status = "Unhealthy (No Links / Fetch Failed)"
        else:
            status = "Healthy"

        # --- Store report ---
        report[u] = {
            "domain": domain,
            "num_links": num_links,
            "external_ratio": round(external_ratio, 2),
            "ssl_valid": ssl_ok,
            "suspicious_tld": bad_tld,
            "classification": status,
            "parent_page": parent,
            "external_links": external_links[:10],  # limit list size for readability
        }

        # --- Recursive Analysis: both internal + external links ---
        for link in links:
            analyze(link, d + 1, parent=u)

    analyze(url, 0)
    return report


# --- Example Usage ---
if __name__ == "__main__":
    test_url = "http://www.garage-pirenne.be/index.php?option=com_content&view=article&id=70&vsig70_0=15"  # replace with phishing or legit URLs
    results = check_page_health(test_url, depth=1)

    print("\n=== Scan Report ===")
    for page, features in results.items():
        print(f"\nPage: {page}")
        for k, v in features.items():
            print(f"  {k}: {v}")



=== Scan Report ===

Page: http://www.garage-pirenne.be/index.php?option=com_content&view=article&id=70&vsig70_0=15
  domain: www.garage-pirenne.be
  num_links: 1
  external_ratio: 0.0
  ssl_valid: False
  suspicious_tld: False
  classification: Unhealthy ()
  parent_page: None
  external_links: []

Page: http://www.garage-pirenne.be/index.php?option=com_content&view=article&id=70&vsig70_0=15&tr_uuid=20250830-1740-4116-a51e-22bffc1781d7&fp=-3
  domain: www.garage-pirenne.be
  num_links: 4
  external_ratio: 0.75
  ssl_valid: False
  suspicious_tld: False
  classification: Unhealthy ()
  parent_page: http://www.garage-pirenne.be/index.php?option=com_content&view=article&id=70&vsig70_0=15
  external_links: ['http://www.above.com/marketplace/garage-pirenne.be', 'http://www.above.com/marketplace/garage-pirenne.be', 'https://www.sedo.com/services/parking.php3']


### **Lexcial and Host Features**

In [None]:
#!/usr/bin/env python3
"""
Real-time Phishing Detector
- Lexical features + Host features + content checks
- Rotating headers, retries, jitter
- External link validation (1-hop)
- Separate handling/classification for:
    * Non-HTTPS pages -> Unhealthy (No HTTPS)
    * HTTPS with invalid/expired SSL -> Unhealthy (Invalid SSL)
- Exports CSV for ML

Requirements (install if missing):
    pip install requests beautifulsoup4 tldextract python-whois
"""

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
import tldextract
import whois
import socket
import ssl
import time
import random
import re
import csv
import datetime
import math
from difflib import SequenceMatcher

# ----------------------------
# Config
# ----------------------------
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:119.0) Gecko/20100101 Firefox/119.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36",
]

SUSPICIOUS_TLDS = {"xyz", "tk", "top", "click", "link", "info", "pw", "cn"}
KEYWORD_LIST = {"login", "secure", "verify", "update", "bank", "free", "account", "signin"}
BLACKLIST = set()  # populate with known bad domains if you have a list

MAX_RETRIES = 3
RETRY_BACKOFF = [1, 3, 6]  # seconds
REQUEST_TIMEOUT = 10
SLEEP_JITTER = (0.8, 2.5)  # seconds between requests to appear human
MAX_EXTERNAL_LISTED = 20  # how many external links to store in report for readability

# domains to ignore/noise (analytics/ads) to reduce noise
SKIP_DOMAINS_CONTAINS = ["google", "googlesyndication", "facebook", "doubleclick", "googletagmanager", "instagram", "twitter"]


# ----------------------------
# Networking helpers
# ----------------------------
def get_headers():
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }


def fetch_url(url):
    """Fetch URL with retries and random headers. Returns requests.Response or None."""
    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(url, headers=get_headers(), timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            time.sleep(random.uniform(*SLEEP_JITTER))
            return resp
        except requests.exceptions.RequestException as e:
            wait = RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)]
            print(f"[WARN] Fetch failed ({e}) for {url}, retrying in {wait}s...")
            time.sleep(wait + random.random())
    print(f"[ERROR] Could not fetch {url} after {MAX_RETRIES} retries.")
    return None


# ----------------------------
# Link extraction & filtering
# ----------------------------
BAD_HREFS = {"", "#", "/", "/undefined"}


def is_junk_href(href):
    if not href:
        return True
    href = href.strip()
    lower = href.lower()
    if lower in BAD_HREFS:
        return True
    if lower.startswith(("javascript:", "mailto:", "tel:")):
        return True
    return False


def get_links(base_url):
    """Return (links_list, final_url). links filtered to valid http(s) absolute URLs."""
    resp = fetch_url(base_url)
    if not resp:
        return [], base_url
    try:
        soup = BeautifulSoup(resp.text, "html.parser")
    except Exception as e:
        print(f"[ERROR] Couldn't parse HTML for {base_url}: {e}")
        return [], resp.url

    links = []
    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        if is_junk_href(href):
            continue
        abs_url = urljoin(base_url, href)
        parsed = urlparse(abs_url)
        if parsed.scheme not in ("http", "https"):
            continue
        # skip noise domains
        if any(x in parsed.netloc for x in SKIP_DOMAINS_CONTAINS):
            continue
        links.append(abs_url)
    return links, resp.url


# ----------------------------
# Lexical feature helpers
# ----------------------------
IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def url_length(url):
    return len(url)


def num_subdomains(domain):
    # Use tldextract to get subdomain reliably
    ext = tldextract.extract(domain)
    sub = ext.subdomain
    if not sub:
        return 0
    return len(sub.split("."))


def count_special_chars(url):
    specials = ['@', '-', '_', '%', '=', '&']
    # ignore query separators if it's a "normal" parameter-rich URL
    parsed = urlparse(url)
    path_plus = parsed.netloc + parsed.path
    # count occurrences in path+netloc (not counting query string bias)
    return sum(path_plus.count(ch) for ch in specials)


def contains_ip(domain):
    return bool(IP_RE.match(domain))


def typosquat_score(domain, whitelist=None):
    # returns True if suspiciously similar to a whitelist domain
    if not whitelist:
        whitelist = ["google.com", "facebook.com", "amazon.com", "paypal.com", "microsoft.com", "apple.com"]
    domain_norm = domain.lower()
    for legit in whitelist:
        ratio = SequenceMatcher(None, domain_norm, legit).ratio()
        # > 0.85 similar but not exact likely typo-squat; tune threshold as needed
        if 0.85 < ratio < 0.99 and domain_norm != legit:
            return True
    return False


def contains_unicode(domain):
    # homograph: presence of non-ascii characters in domain
    try:
        domain.encode("ascii")
        return False
    except Exception:
        return True


def label_entropy(domain_label):
    # Shannon entropy of domain label
    if not domain_label:
        return 0.0
    freq = {}
    for ch in domain_label:
        freq[ch] = freq.get(ch, 0) + 1
    entropy = 0.0
    L = len(domain_label)
    for v in freq.values():
        p = v / L
        entropy -= p * math.log2(p)
    return entropy


def has_randomized_label(domain):
    # very high entropy and long label indicates random string
    ext = tldextract.extract(domain)
    label = ext.subdomain.split(".")[-1] if ext.subdomain else ext.domain
    ent = label_entropy(label)
    return (len(label) >= 8 and ent > 3.8)


def contains_suspicious_keyword(url):
    u = url.lower()
    # we consider keywords in subdomain or path (not query params as strictly suspicious)
    return any(k in u for k in KEYWORD_LIST)


def has_incorrect_casing(domain):
    # mixed-case in domain is unusual
    return any(ch.isupper() for ch in domain) and not domain.isupper()


# ----------------------------
# Host-based helpers
# ----------------------------
def check_ssl_cert(domain):
    """
    Return dict: { 'ssl_valid': bool, 'not_before': datetime or None, 'not_after': datetime or None, 'self_signed': bool }
    """
    out = {"ssl_valid": False, "not_before": None, "not_after": None, "self_signed": False}
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=6) as sock:
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()
                # parse dates
                def _parse(x):
                    try:
                        return datetime.datetime.strptime(x, "%b %d %H:%M:%S %Y %Z")
                    except Exception:
                        try:
                            return datetime.datetime.strptime(x, "%Y%m%d%H%M%SZ")
                        except:
                            return None

                not_before = _parse(cert.get("notBefore")) or None
                not_after = _parse(cert.get("notAfter")) or None
                out["not_before"] = not_before
                out["not_after"] = not_after

                now = datetime.datetime.utcnow()
                if (not_before and now < not_before) or (not_after and now > not_after):
                    out["ssl_valid"] = False
                else:
                    out["ssl_valid"] = True
                # crude self-signed detection: issuer == subject
                issuer = cert.get("issuer")
                subject = cert.get("subject")
                out["self_signed"] = (issuer == subject)
                return out
    except Exception:
        return out


def get_domain_whois(domain):
    """
    Return dict with domain age (days) and whether registrant is private/hidden.
    If whois fails, returns None values.
    """
    try:
        w = whois.whois(domain)
        creation = w.creation_date
        # creation_date may be list, handle it
        if isinstance(creation, list):
            creation = creation[0] if creation else None
        if creation and isinstance(creation, str):
            # some whois returns string; attempt parse
            creation = datetime.datetime.fromisoformat(creation)
        if creation:
            age_days = (datetime.datetime.utcnow() - creation).days
        else:
            age_days = None
        registrant = str(w.get("org") or w.get("registrant") or w.get("name") or "")
        whois_privacy = any(x in str(w).lower() for x in ["privacy", "redacted", "whoisguard", "contact privacy", "private"])
        return {"age_days": age_days, "whois_privacy": whois_privacy, "registrant": registrant}
    except Exception:
        return {"age_days": None, "whois_privacy": None, "registrant": None}


def hosting_country(domain):
    """
    Simple free GeoIP via ipapi.co - fallback to None on failure.
    """
    try:
        ip = socket.gethostbyname(domain)
        # ipapi.co free endpoint
        resp = requests.get(f"https://ipapi.co/{ip}/country/", timeout=6)
        if resp.status_code == 200:
            return resp.text.strip()
        return None
    except Exception:
        return None


def in_blacklist(domain):
    # local blacklist check
    d = domain.lower()
    if d in BLACKLIST:
        return True
    # also check TLD or domain label variants (tune per needs)
    return False


# ----------------------------
# Page Analyzer
# ----------------------------
def classify_page(url, features):
    """
    Return final classification string with distinct handling for non-https and invalid/expired SSL.
    - features is dict containing 'scheme', 'ssl' dict, etc.
    """
    scheme = features.get("scheme", "")
    ssl_info = features.get("ssl", {})
    # Non-https
    if scheme != "https":
        return "Unhealthy (No HTTPS)"
    # Has HTTPS but SSL invalid
    if ssl_info:
        if not ssl_info.get("ssl_valid", False):
            # differentiate expired vs future vs self-signed if possible
            not_before, not_after = ssl_info.get("not_before"), ssl_info.get("not_after")
            if not_before and datetime.datetime.utcnow() < not_before:
                return "Unhealthy (SSL Not Yet Valid)"
            if not_after and datetime.datetime.utcnow() > not_after:
                return "Unhealthy (SSL Expired)"
            if ssl_info.get("self_signed"):
                return "Unhealthy (SSL Self-Signed)"
            return "Unhealthy (Invalid SSL)"
    # Continue other heuristics based on features:
    if features.get("suspicious_tld"):
        return "Unhealthy (Suspicious TLD)"
    if features.get("typosquat"):
        return "Unhealthy (Typosquat)"
    if features.get("homograph"):
        return "Unhealthy (Possible Homograph)"
    if features.get("external_ratio", 0) > 0.7:
        return "Unhealthy (Too Many External Links)"
    if features.get("num_links", 0) > 100:
        return "Unhealthy (Too Many Links)"
    if features.get("randomized_label"):
        return "Unhealthy (Randomized Domain Label)"
    # domain age
    age = features.get("domain_age_days")
    if age is not None and age < 30:
        return "Unhealthy (New Domain)"
    if features.get("blacklisted"):
        return "Unhealthy (Blacklisted)"
    # default
    return "Healthy"


def analyze_url(url, depth=1, max_pages=500):
    """
    Analyze url and 1-hop neighbors (depth controls recursion; recommended depth=1).
    Returns dict: {url: features_dict}
    """
    seen = set()
    report = {}

    def _analyze(u, d, parent=None):
        if u in seen or len(seen) >= max_pages:
            return
        seen.add(u)
        parsed = urlparse(u)
        domain = parsed.netloc.lower()
        scheme = parsed.scheme.lower()

        # Lexical features
        lexical = {}
        lexical["url_length"] = url_length(u)
        lexical["num_subdomains"] = num_subdomains(domain)
        lexical["special_char_count"] = count_special_chars(u)
        lexical["has_ip"] = contains_ip(domain)
        lexical["typosquat"] = typosquat_score(domain)
        lexical["homograph"] = contains_unicode(domain)
        lexical["randomized_label"] = has_randomized_label(domain)
        lexical["contains_keyword"] = contains_suspicious_keyword(u)
        lexical["incorrect_casing"] = has_incorrect_casing(domain)
        # suspicious tld
        ext = tldextract.extract(domain)
        lexical["tld"] = ext.suffix
        lexical["suspicious_tld"] = ext.suffix.lower() in SUSPICIOUS_TLDS

        # Host features
        host = {}
        # SSL check (only attempt if https)
        if scheme == "https":
            host["ssl"] = check_ssl_cert(domain)
        else:
            host["ssl"] = {"ssl_valid": False, "not_before": None, "not_after": None, "self_signed": False}
        # whois (may fail or be slow) - we try once per domain
        whois_info = get_domain_whois(domain)
        host["domain_age_days"] = whois_info.get("age_days")
        host["whois_privacy"] = whois_info.get("whois_privacy")
        host["registrant"] = whois_info.get("registrant")
        # geoip
        host["hosting_country"] = hosting_country(domain)
        # blacklist
        host["blacklisted"] = in_blacklist(domain)

        # Content/features: links
        links, final_url = get_links(u)
        num_links = len(links)
        external_links = [l for l in links if urlparse(l).netloc.lower() != domain]
        external_ratio = (len(external_links) / num_links) if num_links > 0 else 0.0

        # build features
        features = {}
        features.update(lexical)
        features.update(host)
        features["scheme"] = scheme
        features["num_links"] = num_links
        features["external_ratio"] = round(external_ratio, 3)
        features["parent"] = parent
        # store up to a few external links for inspection (full crawl optional)
        features["external_links_sample"] = external_links[:MAX_EXTERNAL_LISTED]

        # classification (handles non-https and invalid ssl separately)
        features["classification"] = classify_page(u, {"scheme": scheme, "ssl": host.get("ssl"), **features})
        features["url_final"] = final_url

        report[u] = features

        # recurse 1 level deep into links if allowed (we will analyze both internal and external links)
        if d < depth:
            for link in links:
                _analyze(link, d + 1, parent=u)

    _analyze(url, 0, parent=None)
    return report


# ----------------------------
# CSV Export helper
# ----------------------------
def export_report_csv(report, filename="phish_report.csv"):
    """
    Flatten the report dict into CSV rows.
    Each row corresponds to one URL analyzed.
    """
    fieldnames = [
        "url", "url_final", "domain", "scheme", "tld", "classification",
        "url_length", "num_subdomains", "special_char_count", "has_ip",
        "typosquat", "homograph", "randomized_label", "contains_keyword", "incorrect_casing",
        "num_links", "external_ratio", "domain_age_days", "whois_privacy", "registrant",
        "hosting_country", "blacklisted", "ssl_valid", "ssl_not_before", "ssl_not_after", "ssl_self_signed",
        "parent", "external_links_sample"
    ]
    rows = []
    for url, f in report.items():
        parsed = urlparse(url)
        domain = parsed.netloc
        ssl = f.get("ssl") or {}
        row = {
            "url": url,
            "url_final": f.get("url_final"),
            "domain": domain,
            "scheme": f.get("scheme"),
            "tld": f.get("tld"),
            "classification": f.get("classification"),
            "url_length": f.get("url_length"),
            "num_subdomains": f.get("num_subdomains"),
            "special_char_count": f.get("special_char_count"),
            "has_ip": f.get("has_ip"),
            "typosquat": f.get("typosquat"),
            "homograph": f.get("homograph"),
            "randomized_label": f.get("randomized_label"),
            "contains_keyword": f.get("contains_keyword"),
            "incorrect_casing": f.get("incorrect_casing"),
            "num_links": f.get("num_links"),
            "external_ratio": f.get("external_ratio"),
            "domain_age_days": f.get("domain_age_days"),
            "whois_privacy": f.get("whois_privacy"),
            "registrant": f.get("registrant"),
            "hosting_country": f.get("hosting_country"),
            "blacklisted": f.get("blacklisted"),
            "ssl_valid": ssl.get("ssl_valid"),
            "ssl_not_before": ssl.get("not_before"),
            "ssl_not_after": ssl.get("not_after"),
            "ssl_self_signed": ssl.get("self_signed"),
            "parent": f.get("parent"),
            "external_links_sample": ";".join(f.get("external_links_sample", [])),
        }
        rows.append(row)

    with open(filename, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            writer.writerow(r)
    print(f"[INFO] Exported report CSV to {filename}")


# ----------------------------
# Example usage (main)
# ----------------------------
if __name__ == "__main__":
    # Test URLs (replace or add your own)
    TEST_URLS = [
        "https://www.bbc.com/news",
        # "https://example.com",
        # "http://192.168.1.1/login",  # IP example
        # "http://paypa1.com",  # typosquat example (do not run against actual malicious infra)
    ]

    final_report = {}
    for t in TEST_URLS:
        try:
            r = analyze_url(t, depth=1)
            final_report.update(r)
        except KeyboardInterrupt:
            print("Interrupted by user.")
            break
        except Exception as e:
            print(f"[ERROR] Unexpected error analyzing {t}: {e}")

    # Pretty print small report summary
    for url, f in final_report.items():
        print("\n---")
        print(f"URL: {url}")
        print(f" Final: {f.get('url_final')}")
        print(f" Class: {f.get('classification')}")
        print(f" Domain age (days): {f.get('domain_age_days')}")
        print(f" SSL valid: {f.get('ssl', {}).get('ssl_valid')}")
        print(f" Suspicious TLD: {f.get('suspicious_tld')}")
        print(f" Typosquat: {f.get('typosquat')}")
        print(f" Homograph: {f.get('homograph')}")
        print(f" Randomized label: {f.get('randomized_label')}")
        print(f" Num links: {f.get('num_links')}, External ratio: {f.get('external_ratio')}")
        print(f" External sample (<= {MAX_EXTERNAL_LISTED}): {f.get('external_links_sample')}")

    # Export CSV for ML
    export_report_csv(final_report, filename="phish_report.csv")


  now = datetime.datetime.utcnow()


[WARN] Fetch failed (403 Client Error: Forbidden for url: https://help.bbc.com/hc/) for https://help.bbc.com/hc/, retrying in 1s...
[WARN] Fetch failed (403 Client Error: Forbidden for url: https://help.bbc.com/hc/) for https://help.bbc.com/hc/, retrying in 3s...
[WARN] Fetch failed (403 Client Error: Forbidden for url: https://help.bbc.com/hc/) for https://help.bbc.com/hc/, retrying in 6s...
[ERROR] Could not fetch https://help.bbc.com/hc/ after 3 retries.

---
URL: https://www.bbc.com/news
 Final: https://www.bbc.com/news
 Class: Unhealthy (Too Many Links)
 Domain age (days): None
 SSL valid: True
 Suspicious TLD: False
 Typosquat: False
 Homograph: False
 Randomized label: False
 Num links: 187, External ratio: 0.037
 External sample (<= 20): ['https://shop.bbc.com/', 'https://www.britbox.com/?utm_source=bbc.com&utm_medium=referral&utm_campaign=footer', 'https://www.bbc.co.uk/aboutthebbc', 'https://www.bbc.co.uk/accessibility/', 'https://www.bbc.co.uk/contact', 'https://help.bbc.com

In [None]:
#!/usr/bin/env python3
"""
Real-time Phishing Detector
- Lexical features + Host features + content checks
- Rotating headers, retries, jitter
- External link validation (1-hop)
- Separate handling/classification for:
    * Non-HTTPS pages -> Unhealthy (No HTTPS)
    * HTTPS with invalid/expired SSL -> Unhealthy (Invalid SSL)
- Exports CSV for ML

Requirements (install if missing):
    pip install requests beautifulsoup4 tldextract python-whois
"""

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
import tldextract
import whois
import socket
import ssl
import time
import random
import re
import csv
import datetime
import math
from difflib import SequenceMatcher

# ----------------------------
# Config
# ----------------------------
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:119.0) Gecko/20100101 Firefox/119.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36",
]

SUSPICIOUS_TLDS = {"xyz", "tk", "top", "click", "link", "info", "pw", "cn"}
KEYWORD_LIST = {"login", "secure", "verify", "update", "bank", "free", "account", "signin"}
BLACKLIST = set()  # populate with known bad domains if you have a list

MAX_RETRIES = 3
RETRY_BACKOFF = [1, 3, 6]  # seconds
REQUEST_TIMEOUT = 10
SLEEP_JITTER = (0.8, 2.5)  # seconds between requests to appear human
MAX_EXTERNAL_LISTED = 20  # how many external links to store in report for readability

# domains to ignore/noise (analytics/ads) to reduce noise
SKIP_DOMAINS_CONTAINS = ["google", "googlesyndication", "facebook", "doubleclick", "googletagmanager", "instagram", "twitter"]


# ----------------------------
# Networking helpers
# ----------------------------
def get_headers():
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }


def fetch_url(url):
    """Fetch URL with retries and random headers. Returns requests.Response or None."""
    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(url, headers=get_headers(), timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            time.sleep(random.uniform(*SLEEP_JITTER))
            return resp
        except requests.exceptions.RequestException as e:
            wait = RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)]
            print(f"[WARN] Fetch failed ({e}) for {url}, retrying in {wait}s...")
            time.sleep(wait + random.random())
    print(f"[ERROR] Could not fetch {url} after {MAX_RETRIES} retries.")
    return None


# ----------------------------
# Link extraction & filtering
# ----------------------------
BAD_HREFS = {"", "#", "/", "/undefined"}


def is_junk_href(href):
    if not href:
        return True
    href = href.strip()
    lower = href.lower()
    if lower in BAD_HREFS:
        return True
    if lower.startswith(("javascript:", "mailto:", "tel:")):
        return True
    return False


def get_links(base_url):
    """Return (links_list, final_url). links filtered to valid http(s) absolute URLs."""
    resp = fetch_url(base_url)
    if not resp:
        return [], base_url
    try:
        soup = BeautifulSoup(resp.text, "html.parser")
    except Exception as e:
        print(f"[ERROR] Couldn't parse HTML for {base_url}: {e}")
        return [], resp.url

    links = []
    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        if is_junk_href(href):
            continue
        abs_url = urljoin(base_url, href)
        parsed = urlparse(abs_url)
        if parsed.scheme not in ("http", "https"):
            continue
        # skip noise domains
        if any(x in parsed.netloc for x in SKIP_DOMAINS_CONTAINS):
            continue
        links.append(abs_url)
    return links, resp.url


# ----------------------------
# Lexical feature helpers
# ----------------------------
IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def url_length(url):
    return len(url)


def num_subdomains(domain):
    # Use tldextract to get subdomain reliably
    ext = tldextract.extract(domain)
    sub = ext.subdomain
    if not sub:
        return 0
    return len(sub.split("."))


def count_special_chars(url):
    specials = ['@', '-', '_', '%', '=', '&']
    # ignore query separators if it's a "normal" parameter-rich URL
    parsed = urlparse(url)
    path_plus = parsed.netloc + parsed.path
    # count occurrences in path+netloc (not counting query string bias)
    return sum(path_plus.count(ch) for ch in specials)


def contains_ip(domain):
    return bool(IP_RE.match(domain))


def typosquat_score(domain, whitelist=None):
    # returns True if suspiciously similar to a whitelist domain
    if not whitelist:
        whitelist = ["google.com", "facebook.com", "amazon.com", "paypal.com", "microsoft.com", "apple.com"]
    domain_norm = domain.lower()
    for legit in whitelist:
        ratio = SequenceMatcher(None, domain_norm, legit).ratio()
        # > 0.85 similar but not exact likely typo-squat; tune threshold as needed
        if 0.85 < ratio < 0.99 and domain_norm != legit:
            return True
    return False


def contains_unicode(domain):
    # homograph: presence of non-ascii characters in domain
    try:
        domain.encode("ascii")
        return False
    except Exception:
        return True


def label_entropy(domain_label):
    # Shannon entropy of domain label
    if not domain_label:
        return 0.0
    freq = {}
    for ch in domain_label:
        freq[ch] = freq.get(ch, 0) + 1
    entropy = 0.0
    L = len(domain_label)
    for v in freq.values():
        p = v / L
        entropy -= p * math.log2(p)
    return entropy


def has_randomized_label(domain):
    # very high entropy and long label indicates random string
    ext = tldextract.extract(domain)
    label = ext.subdomain.split(".")[-1] if ext.subdomain else ext.domain
    ent = label_entropy(label)
    return (len(label) >= 8 and ent > 3.8)


def contains_suspicious_keyword(url):
    u = url.lower()
    # we consider keywords in subdomain or path (not query params as strictly suspicious)
    return any(k in u for k in KEYWORD_LIST)


def has_incorrect_casing(domain):
    # mixed-case in domain is unusual
    return any(ch.isupper() for ch in domain) and not domain.isupper()


# ----------------------------
# Host-based helpers
# ----------------------------
def check_ssl_cert(domain):
    """
    Return dict: { 'ssl_valid': bool, 'not_before': datetime or None, 'not_after': datetime or None, 'self_signed': bool }
    """
    out = {"ssl_valid": False, "not_before": None, "not_after": None, "self_signed": False}
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=6) as sock:
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()
                # parse dates
                def _parse(x):
                    try:
                        return datetime.datetime.strptime(x, "%b %d %H:%M:%S %Y %Z")
                    except Exception:
                        try:
                            return datetime.datetime.strptime(x, "%Y%m%d%H%M%SZ")
                        except:
                            return None

                not_before = _parse(cert.get("notBefore")) or None
                not_after = _parse(cert.get("notAfter")) or None
                out["not_before"] = not_before
                out["not_after"] = not_after

                now = datetime.datetime.utcnow()
                if (not_before and now < not_before) or (not_after and now > not_after):
                    out["ssl_valid"] = False
                else:
                    out["ssl_valid"] = True
                # crude self-signed detection: issuer == subject
                issuer = cert.get("issuer")
                subject = cert.get("subject")
                out["self_signed"] = (issuer == subject)
                return out
    except Exception:
        return out


def get_domain_whois(domain):
    """
    Return dict with domain age (days) and whether registrant is private/hidden.
    If whois fails, returns None values.
    """
    try:
        w = whois.whois(domain)
        creation = w.creation_date
        # creation_date may be list, handle it
        if isinstance(creation, list):
            creation = creation[0] if creation else None
        if creation and isinstance(creation, str):
            # some whois returns string; attempt parse
            creation = datetime.datetime.fromisoformat(creation)
        if creation:
            age_days = (datetime.datetime.utcnow() - creation).days
        else:
            age_days = None
        registrant = str(w.get("org") or w.get("registrant") or w.get("name") or "")
        whois_privacy = any(x in str(w).lower() for x in ["privacy", "redacted", "whoisguard", "contact privacy", "private"])
        return {"age_days": age_days, "whois_privacy": whois_privacy, "registrant": registrant}
    except Exception:
        return {"age_days": None, "whois_privacy": None, "registrant": None}


def hosting_country(domain):
    """
    Simple free GeoIP via ipapi.co - fallback to None on failure.
    """
    try:
        ip = socket.gethostbyname(domain)
        # ipapi.co free endpoint
        resp = requests.get(f"https://ipapi.co/{ip}/country/", timeout=6)
        if resp.status_code == 200:
            return resp.text.strip()
        return None
    except Exception:
        return None


def in_blacklist(domain):
    # local blacklist check
    d = domain.lower()
    if d in BLACKLIST:
        return True
    # also check TLD or domain label variants (tune per needs)
    return False


# ----------------------------
# Page Analyzer
# ----------------------------
def classify_page(url, features):
    """
    Return final classification string with distinct handling for non-https and invalid/expired SSL.
    - features is dict containing 'scheme', 'ssl' dict, etc.
    """
    scheme = features.get("scheme", "")
    ssl_info = features.get("ssl", {})
    # Non-https
    if scheme != "https":
        return "Unhealthy (No HTTPS)"
    # Has HTTPS but SSL invalid
    if ssl_info:
        if not ssl_info.get("ssl_valid", False):
            # differentiate expired vs future vs self-signed if possible
            not_before, not_after = ssl_info.get("not_before"), ssl_info.get("not_after")
            if not_before and datetime.datetime.utcnow() < not_before:
                return "Unhealthy (SSL Not Yet Valid)"
            if not_after and datetime.datetime.utcnow() > not_after:
                return "Unhealthy (SSL Expired)"
            if ssl_info.get("self_signed"):
                return "Unhealthy (SSL Self-Signed)"
            return "Unhealthy (Invalid SSL)"
    # Continue other heuristics based on features:
    if features.get("suspicious_tld"):
        return "Unhealthy (Suspicious TLD)"
    if features.get("typosquat"):
        return "Unhealthy (Typosquat)"
    if features.get("homograph"):
        return "Unhealthy (Possible Homograph)"
    if features.get("external_ratio", 0) > 0.7:
        return "Unhealthy (Too Many External Links)"
    if features.get("num_links", 0) > 100:
        return "Unhealthy (Too Many Links)"
    if features.get("randomized_label"):
        return "Unhealthy (Randomized Domain Label)"
    # domain age
    age = features.get("domain_age_days")
    if age is not None and age < 30:
        return "Unhealthy (New Domain)"
    if features.get("blacklisted"):
        return "Unhealthy (Blacklisted)"
    # default
    return "Healthy"


def analyze_url(url, depth=1, max_pages=500):
    """
    Analyze url and 1-hop neighbors (depth controls recursion; recommended depth=1).
    Returns dict: {url: features_dict}
    """
    seen = set()
    report = {}

    def _analyze(u, d, parent=None):
        if u in seen or len(seen) >= max_pages:
            return
        seen.add(u)
        parsed = urlparse(u)
        domain = parsed.netloc.lower()
        scheme = parsed.scheme.lower()

        # Lexical features
        lexical = {}
        lexical["url_length"] = url_length(u)
        lexical["num_subdomains"] = num_subdomains(domain)
        lexical["special_char_count"] = count_special_chars(u)
        lexical["has_ip"] = contains_ip(domain)
        lexical["typosquat"] = typosquat_score(domain)
        lexical["homograph"] = contains_unicode(domain)
        lexical["randomized_label"] = has_randomized_label(domain)
        lexical["contains_keyword"] = contains_suspicious_keyword(u)
        lexical["incorrect_casing"] = has_incorrect_casing(domain)
        # suspicious tld
        ext = tldextract.extract(domain)
        lexical["tld"] = ext.suffix
        lexical["suspicious_tld"] = ext.suffix.lower() in SUSPICIOUS_TLDS

        # Host features
        host = {}
        # SSL check (only attempt if https)
        if scheme == "https":
            host["ssl"] = check_ssl_cert(domain)
        else:
            host["ssl"] = {"ssl_valid": False, "not_before": None, "not_after": None, "self_signed": False}
        # whois (may fail or be slow) - we try once per domain
        whois_info = get_domain_whois(domain)
        host["domain_age_days"] = whois_info.get("age_days")
        host["whois_privacy"] = whois_info.get("whois_privacy")
        host["registrant"] = whois_info.get("registrant")
        # geoip
        host["hosting_country"] = hosting_country(domain)
        # blacklist
        host["blacklisted"] = in_blacklist(domain)

        # Content/features: links
        links, final_url = get_links(u)
        num_links = len(links)
        external_links = [l for l in links if urlparse(l).netloc.lower() != domain]
        external_ratio = (len(external_links) / num_links) if num_links > 0 else 0.0

        # build features
        features = {}
        features.update(lexical)
        features.update(host)
        features["scheme"] = scheme
        features["num_links"] = num_links
        features["external_ratio"] = round(external_ratio, 3)
        features["parent"] = parent
        # store up to a few external links for inspection (full crawl optional)
        features["external_links_sample"] = external_links[:MAX_EXTERNAL_LISTED]

        # classification (handles non-https and invalid ssl separately)
        features["classification"] = classify_page(u, {"scheme": scheme, "ssl": host.get("ssl"), **features})
        features["url_final"] = final_url

        report[u] = features

        # recurse 1 level deep into links if allowed (we will analyze both internal and external links)
        if d < depth:
            for link in links:
                _analyze(link, d + 1, parent=u)

    _analyze(url, 0, parent=None)
    return report


# ----------------------------
# CSV Export helper
# ----------------------------
def export_report_csv(report, filename="phish_report.csv"):
    """
    Flatten the report dict into CSV rows.
    Each row corresponds to one URL analyzed.
    """
    fieldnames = [
        "url", "url_final", "domain", "scheme", "tld", "classification",
        "url_length", "num_subdomains", "special_char_count", "has_ip",
        "typosquat", "homograph", "randomized_label", "contains_keyword", "incorrect_casing",
        "num_links", "external_ratio", "domain_age_days", "whois_privacy", "registrant",
        "hosting_country", "blacklisted", "ssl_valid", "ssl_not_before", "ssl_not_after", "ssl_self_signed",
        "parent", "external_links_sample"
    ]
    rows = []
    for url, f in report.items():
        parsed = urlparse(url)
        domain = parsed.netloc
        ssl = f.get("ssl") or {}
        row = {
            "url": url,
            "url_final": f.get("url_final"),
            "domain": domain,
            "scheme": f.get("scheme"),
            "tld": f.get("tld"),
            "classification": f.get("classification"),
            "url_length": f.get("url_length"),
            "num_subdomains": f.get("num_subdomains"),
            "special_char_count": f.get("special_char_count"),
            "has_ip": f.get("has_ip"),
            "typosquat": f.get("typosquat"),
            "homograph": f.get("homograph"),
            "randomized_label": f.get("randomized_label"),
            "contains_keyword": f.get("contains_keyword"),
            "incorrect_casing": f.get("incorrect_casing"),
            "num_links": f.get("num_links"),
            "external_ratio": f.get("external_ratio"),
            "domain_age_days": f.get("domain_age_days"),
            "whois_privacy": f.get("whois_privacy"),
            "registrant": f.get("registrant"),
            "hosting_country": f.get("hosting_country"),
            "blacklisted": f.get("blacklisted"),
            "ssl_valid": ssl.get("ssl_valid"),
            "ssl_not_before": ssl.get("not_before"),
            "ssl_not_after": ssl.get("not_after"),
            "ssl_self_signed": ssl.get("self_signed"),
            "parent": f.get("parent"),
            "external_links_sample": ";".join(f.get("external_links_sample", [])),
        }
        rows.append(row)

    with open(filename, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            writer.writerow(r)
    print(f"[INFO] Exported report CSV to {filename}")


# ----------------------------
# Example usage (main)
# ----------------------------
if __name__ == "__main__":
    # Test URLs (replace or add your own)
    TEST_URLS = [
        "https://www.google.com/search?q=data+frame+see+the+full+content+of+the+cell&sxsrf=AE3TifNzPa2Ab-WiFqjPxAgA3QVCxI5vEQ%3A1756538031692",
        #"https://www.bbc.com/news",
        "https://example.com",
        "http://192.168.1.1/login",  # IP example
        "http://paypa1.com",  # typosquat example (do not run against actual malicious infra)
    ]

    final_report = {}
    for t in TEST_URLS:
        try:
            r = analyze_url(t, depth=1)
            final_report.update(r)
        except KeyboardInterrupt:
            print("Interrupted by user.")
            break
        except Exception as e:
            print(f"[ERROR] Unexpected error analyzing {t}: {e}")

    # Pretty print small report summary
    for url, f in final_report.items():
        print("\n---")
        print(f"URL: {url}")
        print(f" Final: {f.get('url_final')}")
        print(f" Class: {f.get('classification')}")
        print(f" Domain age (days): {f.get('domain_age_days')}")
        print(f" SSL valid: {f.get('ssl', {}).get('ssl_valid')}")
        print(f" Suspicious TLD: {f.get('suspicious_tld')}")
        print(f" Typosquat: {f.get('typosquat')}")
        print(f" Homograph: {f.get('homograph')}")
        print(f" Randomized label: {f.get('randomized_label')}")
        print(f" Num links: {f.get('num_links')}, External ratio: {f.get('external_ratio')}")
        print(f" External sample (<= {MAX_EXTERNAL_LISTED}): {f.get('external_links_sample')}")

    # Export CSV for ML
    export_report_csv(final_report, filename="phish_report.csv")


#### END

### **All 4 Categories**

In [None]:
!pip install requests beautifulsoup4 tldextract python-whois

Collecting tldextract
  Downloading tldextract-5.3.0-py3-none-any.whl.metadata (11 kB)
Collecting python-whois
  Downloading python_whois-0.9.5-py3-none-any.whl.metadata (2.6 kB)
Collecting requests-file>=1.4 (from tldextract)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading tldextract-5.3.0-py3-none-any.whl (107 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m107.4/107.4 kB[0m [31m730.7 kB/s[0m eta [36m0:00:00[0m
[?25hDownloading python_whois-0.9.5-py3-none-any.whl (104 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.2/104.2 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading requests_file-2.1.0-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: requests-file, python-whois, tldextract
Successfully installed python-whois-0.9.5 requests-file-2.1.0 tldextract-5.3.0


In [None]:
#!/usr/bin/env python3
"""
phish_detector.py

Real-time Phishing Detector (modular)
- Lexical features
- Host-based features (WHOIS, SSL, GeoIP, blacklist)
- Content-based features (redirects, iframes, login forms, brand mismatch)
- Behavioral features (file downloads, mouseover mismatch, popups)
- 1-hop crawling and external link validation
- Exports CSV and JSON report
"""

import re
import math
import time
import random
import csv
import json
import socket
import ssl
import datetime
from urllib.parse import urlparse, urljoin
from difflib import SequenceMatcher
from collections import Counter

import requests
from bs4 import BeautifulSoup
import tldextract
import whois

# ---------------------------
# Config
# ---------------------------
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:119.0) Gecko/20100101 Firefox/119.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36",
]

SUSPICIOUS_TLDS = {"xyz", "tk", "top", "click", "link", "info", "pw", "cn"}
KEYWORD_LIST = {"login", "secure", "verify", "update", "bank", "free", "account", "signin"}
BLACKLIST = set()  # add known bad domains if available
MAX_RETRIES = 3
RETRY_BACKOFF = [1, 3, 6]
REQUEST_TIMEOUT = 10
SLEEP_JITTER = (0.8, 2.0)
MAX_EXTERNAL_SAMPLE = 20
SKIP_DOMAINS_CONTAINS = ["googletagmanager", "googlesyndication", "doubleclick", "facebook", "instagram", "twitter"]
BRAND_LIST = ["paypal", "google", "amazon", "facebook", "microsoft", "apple", "bank"]  # extend as needed

# ---------------------------
# Networking helpers
# ---------------------------
def get_headers():
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }


def fetch_url(url, allow_redirects=True):
    """Fetch a URL with retries and randomized headers. Returns requests.Response or None."""
    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(url, headers=get_headers(), timeout=REQUEST_TIMEOUT, allow_redirects=allow_redirects)
            resp.raise_for_status()
            time.sleep(random.uniform(*SLEEP_JITTER))
            return resp
        except requests.exceptions.RequestException as e:
            wait = RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)]
            print(f"[WARN] Fetch failed ({e}) for {url} — retrying in {wait}s...")
            time.sleep(wait + random.random())
    print(f"[ERROR] Could not fetch {url} after {MAX_RETRIES} retries.")
    return None


# ---------------------------
# Link extraction & sanitization
# ---------------------------
BAD_HREFS = {"", "#", "/", "/undefined"}


def is_junk_href(href):
    if not href:
        return True
    lower = href.strip().lower()
    if lower in BAD_HREFS:
        return True
    if lower.startswith(("javascript:", "mailto:", "tel:", "data:")):
        return True
    return False


def extract_links(base_url, resp):
    """
    Extract cleaned absolute http/https links from a BeautifulSoup-parsed page or response text.
    Returns list of absolute URLs.
    """
    try:
        soup = BeautifulSoup(resp.text, "html.parser")
    except Exception:
        soup = None

    links = []
    if not soup:
        return links, None

    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        if is_junk_href(href):
            continue
        abs_url = urljoin(base_url, href)
        parsed = urlparse(abs_url)
        if parsed.scheme not in ("http", "https"):
            continue
        # skip analytics/ad domains to reduce noise
        if any(skip in parsed.netloc for skip in SKIP_DOMAINS_CONTAINS):
            continue
        links.append(abs_url)
    return links, soup


# ---------------------------
# Lexical features module
# ---------------------------
IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def lexical_features(url):
    parsed = urlparse(url)
    domain = parsed.netloc
    ext = tldextract.extract(domain)

    features = {}
    features["url_length"] = len(url)
    features["scheme"] = parsed.scheme.lower()
    features["domain"] = domain.lower()
    features["tld"] = ext.suffix.lower() if ext.suffix else ""
    features["subdomain_count"] = 0 if not ext.subdomain else len(ext.subdomain.split("."))
    # special chars in netloc + path (ignore query portion to not penalize normal queries)
    path_netloc = parsed.netloc + parsed.path
    features["special_char_count"] = sum(path_netloc.count(ch) for ch in ['@', '-', '_', '%', '=', '&'])
    features["has_ip"] = bool(IP_RE.match(ext.registered_domain or domain))
    # typosquat (compare registered_domain to whitelist)
    whitelist = ["google.com", "facebook.com", "amazon.com", "paypal.com", "microsoft.com", "apple.com"]
    domain_norm = (ext.registered_domain or domain).lower()
    def typosquat_check(dnm):
        for legit in whitelist:
            ratio = SequenceMatcher(None, dnm, legit).ratio()
            if 0.85 < ratio < 0.995 and dnm != legit:
                return True
        return False
    features["typosquat"] = typosquat_check(domain_norm)
    # homograph (non-ascii characters in domain)
    try:
        domain.encode("ascii")
        features["homograph"] = False
    except Exception:
        features["homograph"] = True
    # label entropy (check domain label randomness)
    label = ext.domain or ""
    if label:
        freq = Counter(label)
        L = len(label)
        entropy = -sum((v / L) * math.log2(v / L) for v in freq.values())
    else:
        entropy = 0.0
    features["label_entropy"] = round(entropy, 3)
    features["randomized_label"] = (len(label) >= 8 and entropy > 3.8)
    features["contains_keyword"] = any(k in (parsed.path + ".".join([ext.subdomain or ""])).lower() for k in KEYWORD_LIST)
    features["incorrect_casing"] = any(ch.isupper() for ch in domain) and not domain.isupper()
    features["suspicious_tld"] = features["tld"] in SUSPICIOUS_TLDS
    return features


# ---------------------------
# Host-based features module
# ---------------------------
WHOIS_CACHE = {}
GEOIP_CACHE = {}
SSL_CACHE = {}


def get_whois(domain):
    if domain in WHOIS_CACHE:
        return WHOIS_CACHE[domain]
    result = {"age_days": None, "whois_privacy": None, "registrant": None}
    try:
        w = whois.whois(domain)
        creation = w.creation_date
        if isinstance(creation, list):
            creation = creation[0] if creation else None
        if isinstance(creation, str):
            try:
                creation = datetime.datetime.fromisoformat(creation)
            except Exception:
                creation = None
        if creation:
            age_days = (datetime.datetime.utcnow() - creation).days
        else:
            age_days = None
        registrant = (w.get("org") or w.get("name") or "") if isinstance(w, dict) or hasattr(w, "get") else ""
        whois_privacy = any(x in str(w).lower() for x in ["privacy", "redacted", "whoisguard", "contact privacy", "protected"])
        result = {"age_days": age_days, "whois_privacy": whois_privacy, "registrant": registrant}
    except Exception as e:
        # whois may fail; keep None values
        pass
    WHOIS_CACHE[domain] = result
    return result


def geolocate_domain(domain):
    """Simple GeoIP using ipapi.co; cached. Returns country code or None."""
    if domain in GEOIP_CACHE:
        return GEOIP_CACHE[domain]
    try:
        ip = socket.gethostbyname(domain)
        resp = requests.get(f"https://ipapi.co/{ip}/country/", timeout=6)
        if resp.status_code == 200:
            country = resp.text.strip()
            GEOIP_CACHE[domain] = country
            return country
    except Exception:
        pass
    GEOIP_CACHE[domain] = None
    return None


def check_ssl(domain):
    """Return ssl info dict (ssl_valid, not_before, not_after, self_signed). Cached per domain."""
    if domain in SSL_CACHE:
        return SSL_CACHE[domain]
    out = {"ssl_valid": False, "not_before": None, "not_after": None, "self_signed": False}
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=6) as sock:
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()
                # parse dates (try multiple formats)
                def _parse(x):
                    if not x:
                        return None
                    for fmt in ("%b %d %H:%M:%S %Y %Z", "%Y%m%d%H%M%SZ"):
                        try:
                            return datetime.datetime.strptime(x, fmt)
                        except Exception:
                            continue
                    return None
                not_before = _parse(cert.get("notBefore"))
                not_after = _parse(cert.get("notAfter"))
                out["not_before"] = not_before
                out["not_after"] = not_after
                now = datetime.datetime.utcnow()
                if (not_before and now < not_before) or (not_after and now > not_after):
                    out["ssl_valid"] = False
                else:
                    out["ssl_valid"] = True
                issuer = cert.get("issuer")
                subject = cert.get("subject")
                out["self_signed"] = (issuer == subject)
    except Exception:
        pass
    SSL_CACHE[domain] = out
    return out


def host_features(domain):
    hf = {}
    hf["whois"] = get_whois(domain)
    hf["hosting_country"] = geolocate_domain(domain)
    hf["blacklisted"] = domain.lower() in BLACKLIST
    hf["ssl"] = check_ssl(domain)
    return hf


# ---------------------------
# Content-based features module
# ---------------------------
def content_features_from_response(url, resp, soup=None):
    """
    Accepts response (requests.Response) and parsed soup (optional).
    Returns content-based feature dict.
    """
    cf = {}
    if resp is None:
        # default values if page couldn't be fetched
        cf.update({
            "num_redirects": 0,
            "hidden_iframes": 0,
            "obfuscated_scripts": 0,
            "login_form_insecure": False,
            "domain_brand_mismatch": False,
        })
        return cf

    # num redirects
    cf["num_redirects"] = len(resp.history) if hasattr(resp, "history") else 0

    # parse if not provided
    if not soup:
        try:
            soup = BeautifulSoup(resp.text, "html.parser")
        except Exception:
            soup = None

    # hidden iframes
    hidden_iframes = 0
    if soup:
        for iframe in soup.find_all("iframe"):
            style = iframe.get("style", "") or ""
            width = iframe.get("width")
            height = iframe.get("height")
            # heuristics for hidden iframe
            if "display:none" in style.lower() or "visibility:hidden" in style.lower() or (width == "0" or height == "0"):
                hidden_iframes += 1
    cf["hidden_iframes"] = hidden_iframes

    # obfuscated scripts (naive: long script tags or presence of eval/obfuscation patterns)
    obf = 0
    if soup:
        for s in soup.find_all("script"):
            txt = (s.string or "") or ""
            if len(txt) > 2000:  # very long inline script
                obf += 1
            if "eval(" in txt or "document.write(unescape" in txt or ("_0x" in txt and "push(" in txt):
                obf += 1
    cf["obfuscated_scripts"] = obf

    # login form on insecure site — caller must check scheme (we return whether forms with password exist)
    login_forms = 0
    if soup:
        for form in soup.find_all("form"):
            if form.find("input", {"type": "password"}):
                login_forms += 1
    cf["login_forms_count"] = login_forms

    # domain-brand mismatch: search for brand names in text/title but not in domain
    domain = urlparse(resp.url).netloc.lower() if resp else urlparse(url).netloc.lower()
    text = ""
    if soup:
        title = soup.title.string.strip() if soup.title and soup.title.string else ""
        body = soup.get_text(" ", strip=True)[:20000]  # limit size
        text = (title + " " + body).lower()
    mismatch = False
    for brand in BRAND_LIST:
        if brand in text and brand not in domain:
            mismatch = True
            break
    cf["domain_brand_mismatch"] = mismatch

    return cf


# ---------------------------
# Behavioral features module (approximate w/o full JS runtime)
# ---------------------------
def behavioral_features_from_soup(soup):
    bf = {}
    if not soup:
        bf.update({
            "suspicious_download_links": 0,
            "mouseover_mismatch_count": 0,
            "popup_like_counts": 0,
        })
        return bf

    # suspicious downloads
    suspicious_exts = (".exe", ".scr", ".apk", ".bat", ".msi")
    download_count = 0
    for a in soup.find_all("a", href=True):
        href = a["href"].lower()
        if any(href.endswith(ext) for ext in suspicious_exts):
            download_count += 1
    bf["suspicious_download_links"] = download_count

    # mouseover mismatch: anchor text looks like a URL but href different OR visible text different from href host
    mm = 0
    for a in soup.find_all("a", href=True):
        txt = (a.text or "").strip()
        href = a["href"].strip()
        if txt and href and txt.count(".") >= 1:
            # if displayed text contains domain-like string different from link host
            try:
                txt_host = urlparse(txt if txt.startswith("http") else "http://" + txt).netloc.lower()
                href_host = urlparse(urljoin("http://dummy", href)).netloc.lower()
                if txt_host and href_host and txt_host != href_host:
                    mm += 1
            except Exception:
                pass
    bf["mouseover_mismatch_count"] = mm

    # popups via script detection (alert, confirm, window.open)
    pop_count = 0
    for s in soup.find_all("script"):
        txt = (s.string or "") or ""
        pop_count += txt.lower().count("alert(")
        pop_count += txt.lower().count("confirm(")
        pop_count += txt.lower().count("prompt(")
        pop_count += txt.lower().count("window.open")
    bf["popup_like_counts"] = pop_count

    return bf


# ---------------------------
# Classification (collects ALL reasons)
# ---------------------------
def collect_reasons_and_classify(features):
    """
    Input: features dict containing lexical, host, content, behavioral
    Return: (classification, reasons_list)
    """
    reasons = []

    # Scheme / SSL reasons
    scheme = features.get("lexical", {}).get("scheme", "")
    ssl_info = features.get("host", {}).get("ssl", {}) or {}
    if scheme != "https":
        reasons.append("No HTTPS")
    else:
        if not ssl_info.get("ssl_valid", True):
            # identify specific SSL reasons
            nb = ssl_info.get("not_before")
            na = ssl_info.get("not_after")
            if nb and isinstance(nb, datetime.datetime) and datetime.datetime.utcnow() < nb:
                reasons.append("SSL Not Yet Valid")
            if na and isinstance(na, datetime.datetime) and datetime.datetime.utcnow() > na:
                reasons.append("SSL Expired")
            if ssl_info.get("self_signed"):
                reasons.append("SSL Self-Signed")
            if "SSL Not Yet Valid" not in reasons and "SSL Expired" not in reasons and "SSL Self-Signed" not in reasons:
                reasons.append("Invalid SSL")

    # Host-based reasons
    if features.get("host", {}).get("blacklisted"):
        reasons.append("Blacklisted")
    domain_age = features.get("host", {}).get("whois", {}).get("age_days")
    if domain_age is not None and domain_age < 30:
        reasons.append("New Domain (<30 days)")
    if features.get("host", {}).get("whois", {}).get("whois_privacy"):
        reasons.append("WHOIS Privacy / Redacted")

    # Lexical reasons
    lex = features.get("lexical", {})
    if lex.get("suspicious_tld"):
        reasons.append("Suspicious TLD")
    if lex.get("typosquat"):
        reasons.append("Typosquat-like Domain")
    if lex.get("homograph"):
        reasons.append("Possible Homograph (Unicode)")
    if lex.get("randomized_label"):
        reasons.append("Randomized Domain Label")
    if lex.get("contains_keyword"):
        reasons.append("Suspicious Keyword in URL")
    if lex.get("incorrect_casing"):
        reasons.append("Incorrect / Mixed Casing in Domain")

    # Content-based reasons
    cf = features.get("content", {})
    if cf.get("num_redirects", 0) > 3:
        reasons.append("Excessive Redirects")
    if cf.get("hidden_iframes", 0) > 0:
        reasons.append("Hidden Iframes")
    if cf.get("obfuscated_scripts", 0) > 0:
        reasons.append("Obfuscated Scripts")
    if cf.get("login_forms_count", 0) > 0 and lex.get("scheme") != "https":
        reasons.append("Login Form on Insecure Page")
    if cf.get("domain_brand_mismatch"):
        reasons.append("Domain/Brand Mismatch in Content")

    # Behavioral reasons
    bf = features.get("behavioral", {})
    if bf.get("suspicious_download_links", 0) > 0:
        reasons.append("Suspicious Download Links")
    if bf.get("mouseover_mismatch_count", 0) > 0:
        reasons.append("Mouseover URL Mismatch")
    if bf.get("popup_like_counts", 0) > 2:
        reasons.append("Excessive Popup-like Scripts")

    classification = "Healthy" if not reasons else "Unhealthy"
    return classification, reasons


# ---------------------------
# Main analyzer: analyze_url (1-hop)
# ---------------------------
def analyze_url(start_url, depth=1, max_pages=1000):
    """
    Analyze start_url and crawl at most 'depth' hops (recommended depth=1).
    Returns report dict: {url: features}
    """
    seen = set()
    report = {}

    def _analyze(url, d, parent=None):
        if url in seen or len(seen) >= max_pages:
            return
        seen.add(url)
        print(f"[INFO] Analyzing: {url} (depth={d})")

        # Lexical
        lex = lexical_features(url)

        # Fetch the URL allowing redirects (we need history for num_redirects)
        resp = fetch_url(url, allow_redirects=True)
        final_url = resp.url if resp else url

        # Host features (use domain of final_url)
        domain = urlparse(final_url).netloc.lower()
        host = host_features(domain)

        # Content features (use response and parsed soup)
        links, soup = ([], None)
        if resp:
            links, soup = extract_links(final_url, resp)
        cf = content_features_from_response(final_url, resp, soup)

        # Behavioral features (from parsed soup)
        bf = behavioral_features_from_soup(soup)

        # External ratio and sample
        num_links = len(links)
        external_links = [l for l in links if urlparse(l).netloc.lower() != domain]
        external_ratio = (len(external_links) / num_links) if num_links > 0 else 0.0

        # Compose features
        features = {
            "lexical": lex,
            "host": host,
            "content": cf,
            "behavioral": bf,
            "num_links": num_links,
            "external_ratio": round(external_ratio, 3),
            "external_links_sample": external_links[:MAX_EXTERNAL_SAMPLE],
            "parent": parent,
            "final_url": final_url,
        }

        # Classification with all reasons
        classification, reasons = collect_reasons_and_classify(features)
        features["classification"] = classification
        features["reasons"] = reasons

        report[url] = features

        # Recurse 1-hop (internal + external links) if allowed by depth
        if d < depth:
            for link in links:
                # small guard: skip too many recursions on analytics ads etc.
                _analyze(link, d + 1, parent=url)

    _analyze(start_url, 0, parent=None)
    return report


# ---------------------------
# Export helpers
# ---------------------------
def export_report_csv(report, filename="phish_report.csv"):
    fieldnames = [
        "url", "final_url", "domain", "classification", "reasons",
        "scheme", "tld", "url_length", "subdomain_count", "special_char_count",
        "has_ip", "typosquat", "homograph", "label_entropy", "randomized_label",
        "contains_keyword", "incorrect_casing", "num_links", "external_ratio",
        "num_redirects", "hidden_iframes", "obfuscated_scripts", "login_forms_count",
        "domain_brand_mismatch", "suspicious_download_links", "mouseover_mismatch_count",
        "popup_like_counts", "domain_age_days", "whois_privacy", "registrant",
        "hosting_country", "blacklisted", "ssl_valid", "ssl_not_before", "ssl_not_after", "ssl_self_signed",
        "parent"
    ]
    rows = []
    for url, f in report.items():
        parsed_final = urlparse(f.get("final_url") or url)
        domain = parsed_final.netloc
        who = f.get("host", {}).get("whois", {}) or {}
        ssl = f.get("host", {}).get("ssl", {}) or {}
        row = {
            "url": url,
            "final_url": f.get("final_url"),
            "domain": domain,
            "classification": f.get("classification"),
            "reasons": ";".join(f.get("reasons", [])),
            "scheme": f.get("lexical", {}).get("scheme"),
            "tld": f.get("lexical", {}).get("tld"),
            "url_length": f.get("lexical", {}).get("url_length"),
            "subdomain_count": f.get("lexical", {}).get("subdomain_count"),
            "special_char_count": f.get("lexical", {}).get("special_char_count"),
            "has_ip": f.get("lexical", {}).get("has_ip"),
            "typosquat": f.get("lexical", {}).get("typosquat"),
            "homograph": f.get("lexical", {}).get("homograph"),
            "label_entropy": f.get("lexical", {}).get("label_entropy"),
            "randomized_label": f.get("lexical", {}).get("randomized_label"),
            "contains_keyword": f.get("lexical", {}).get("contains_keyword"),
            "incorrect_casing": f.get("lexical", {}).get("incorrect_casing"),
            "num_links": f.get("num_links"),
            "external_ratio": f.get("external_ratio"),
            "num_redirects": f.get("content", {}).get("num_redirects"),
            "hidden_iframes": f.get("content", {}).get("hidden_iframes"),
            "obfuscated_scripts": f.get("content", {}).get("obfuscated_scripts"),
            "login_forms_count": f.get("content", {}).get("login_forms_count"),
            "domain_brand_mismatch": f.get("content", {}).get("domain_brand_mismatch"),
            "suspicious_download_links": f.get("behavioral", {}).get("suspicious_download_links"),
            "mouseover_mismatch_count": f.get("behavioral", {}).get("mouseover_mismatch_count"),
            "popup_like_counts": f.get("behavioral", {}).get("popup_like_counts"),
            "domain_age_days": who.get("age_days"),
            "whois_privacy": who.get("whois_privacy"),
            "registrant": who.get("registrant"),
            "hosting_country": f.get("host", {}).get("hosting_country"),
            "blacklisted": f.get("host", {}).get("blacklisted"),
            "ssl_valid": ssl.get("ssl_valid"),
            "ssl_not_before": ssl.get("not_before"),
            "ssl_not_after": ssl.get("not_after"),
            "ssl_self_signed": ssl.get("self_signed"),
            "parent": f.get("parent"),
        }
        rows.append(row)

    with open(filename, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            # Convert datetimes to ISO strings
            if r["ssl_not_before"] and isinstance(r["ssl_not_before"], datetime.datetime):
                r["ssl_not_before"] = r["ssl_not_before"].isoformat()
            if r["ssl_not_after"] and isinstance(r["ssl_not_after"], datetime.datetime):
                r["ssl_not_after"] = r["ssl_not_after"].isoformat()
            writer.writerow(r)
    print(f"[INFO] CSV exported to {filename}")


def export_report_json(report, filename="phish_report.json"):
    # convert non-serializable types minimally
    serializable = {}
    for url, f in report.items():
        ff = json.loads(json.dumps(f, default=str))
        serializable[url] = ff
    with open(filename, "w", encoding="utf-8") as fh:
        json.dump(serializable, fh, indent=2)
    print(f"[INFO] JSON exported to {filename}")


# ---------------------------
# Example main — edit TEST_URLS to run scans
# ---------------------------
if __name__ == "__main__":
    TEST_URLS = [
        "https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html",
        #"https://ltouurigo.com/cj1i",
        #"https://rebrand.ly/3b33f0"
        #"https://www.google.com/search?q=data+frame+see+the+full+content+of+the+cell&sxsrf=AE3TifNzPa2Ab-WiFqjPxAgA3QVCxI5vEQ%3A1756538031692",
        #"https://example.com",
        #"http://192.168.1.1/login",
        #"http://paypa1.com",
    ]

    all_report = {}
    for url in TEST_URLS:
        try:
            r = analyze_url(url, depth=1)
            all_report.update(r)
        except KeyboardInterrupt:
            print("Interrupted.")
            break
        except Exception as e:
            print(f"[ERROR] Unexpected error scanning {url}: {e}")

    # Print a concise human-readable summary
    for u, f in all_report.items():
        print("\n---------------------------------")
        print(f"URL: {u}")
        print(f"Final URL: {f.get('final_url')}")
        print(f"Classification: {f.get('classification')}")
        print(f"Reasons: {f.get('reasons')}")
        print(f"Domain age (days): {f.get('host', {}).get('whois', {}).get('age_days')}")
        print(f"SSL valid: {f.get('host', {}).get('ssl', {}).get('ssl_valid')}")
        print(f"Suspicious TLD: {f.get('lexical', {}).get('suspicious_tld')}")
        print(f"Typosquat: {f.get('lexical', {}).get('typosquat')}")
        print(f"Homograph: {f.get('lexical', {}).get('homograph')}")
        print(f"Randomized label: {f.get('lexical', {}).get('randomized_label')}")
        print(f"Num links: {f.get('num_links')}, External ratio: {f.get('external_ratio')}")
        print(f"External sample: {f.get('external_links_sample')}")

    # Export
    export_report_csv(all_report, filename="phish_report.csv")
    export_report_json(all_report, filename="phish_report.json")

[INFO] Analyzing: https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html (depth=0)
[WARN] Fetch failed (403 Client Error: Forbidden for url: https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html) for https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html — retrying in 1s...


  features["has_ip"] = bool(IP_RE.match(ext.registered_domain or domain))
  domain_norm = (ext.registered_domain or domain).lower()


[WARN] Fetch failed (403 Client Error: Forbidden for url: https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html) for https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html — retrying in 3s...
[WARN] Fetch failed (403 Client Error: Forbidden for url: https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html) for https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html — retrying in 6s...


2025-08-30 12:04:44,045 - whois.whois - ERROR - Error trying to connect to socket: closing socket - [Errno -2] Name or service not known
ERROR:whois.whois:Error trying to connect to socket: closing socket - [Errno -2] Name or service not known


[ERROR] Could not fetch https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html after 3 retries.

---------------------------------
URL: https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html
Final URL: https://pub-60733cb9a98d4473bc08598e30d7edda.r2.dev/newlinkedin.html
Classification: Healthy
Reasons: []
Domain age (days): None
SSL valid: True
Suspicious TLD: False
Typosquat: False
Homograph: False
Randomized label: False
Num links: 0, External ratio: 0.0
External sample: []
[INFO] CSV exported to phish_report.csv
[INFO] JSON exported to phish_report.json


  now = datetime.datetime.utcnow()


In [None]:
!pip install whois

Collecting whois
  Downloading whois-1.20240129.2-py3-none-any.whl.metadata (1.3 kB)
Downloading whois-1.20240129.2-py3-none-any.whl (61 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.8/61.8 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: whois
Successfully installed whois-1.20240129.2


In [None]:
#!/usr/bin/env python3
"""
phish_detector.py

Real-time Phishing Detector (modular)
- Lexical features
- Host-based features (WHOIS, SSL, GeoIP, blacklist)
- Content-based features (redirects, iframes, login forms, brand mismatch)
- Behavioral features (file downloads, mouseover mismatch, popups)
- 1-hop crawling and external link validation
- Exports CSV and JSON report
"""

import re
import math
import time
import random
import csv
import json
import socket
import ssl
import datetime
from urllib.parse import urlparse, urljoin
from difflib import SequenceMatcher
from collections import Counter

import requests
from bs4 import BeautifulSoup
import tldextract
import whois

# ---------------------------
# Config
# ---------------------------
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:119.0) Gecko/20100101 Firefox/119.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36",
]

SUSPICIOUS_TLDS = {"xyz", "tk", "top", "click", "link", "info", "pw", "cn"}
KEYWORD_LIST = {"login", "secure", "verify", "update", "bank", "free", "account", "signin"}
BLACKLIST = set()
MAX_RETRIES = 3
RETRY_BACKOFF = [1, 3, 6]
REQUEST_TIMEOUT = 10
SLEEP_JITTER = (0.8, 2.0)
MAX_EXTERNAL_SAMPLE = 20
SKIP_DOMAINS_CONTAINS = ["googletagmanager", "googlesyndication", "doubleclick", "facebook", "instagram", "twitter"]
BRAND_LIST = ["paypal", "google", "amazon", "facebook", "microsoft", "apple", "bank"]

# ---------------------------
# Networking helpers
# ---------------------------
def get_headers():
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }

def fetch_url(url, allow_redirects=True):
    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(url, headers=get_headers(), timeout=REQUEST_TIMEOUT, allow_redirects=allow_redirects)
            resp.raise_for_status()
            time.sleep(random.uniform(*SLEEP_JITTER))
            return resp
        except requests.exceptions.RequestException as e:
            wait = RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)]
            print(f"[WARN] Fetch failed ({e}) for {url} — retrying in {wait}s...")
            time.sleep(wait + random.random())
    print(f"[ERROR] Could not fetch {url} after {MAX_RETRIES} retries.")
    return None

# ---------------------------
# Link extraction
# ---------------------------
BAD_HREFS = {"", "#", "/", "/undefined"}

def is_junk_href(href):
    if not href:
        return True
    lower = href.strip().lower()
    if lower in BAD_HREFS:
        return True
    if lower.startswith(("javascript:", "mailto:", "tel:", "data:")):
        return True
    return False

def extract_links(base_url, resp):
    try:
        soup = BeautifulSoup(resp.text, "html.parser")
    except Exception:
        soup = None
    links = []
    if not soup:
        return links, None
    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        if is_junk_href(href):
            continue
        abs_url = urljoin(base_url, href)
        parsed = urlparse(abs_url)
        if parsed.scheme not in ("http", "https"):
            continue
        if any(skip in parsed.netloc for skip in SKIP_DOMAINS_CONTAINS):
            continue
        links.append(abs_url)
    return links, soup

# ---------------------------
# Lexical features
# ---------------------------
IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")

def lexical_features(url):
    parsed = urlparse(url)
    domain = parsed.netloc
    ext = tldextract.extract(domain)
    features = {
        "url_length": len(url),
        "scheme": parsed.scheme.lower(),
        "domain": domain.lower(),
        "tld": ext.suffix.lower() if ext.suffix else "",
        "subdomain_count": 0 if not ext.subdomain else len(ext.subdomain.split(".")),
    }
    path_netloc = parsed.netloc + parsed.path
    features["special_char_count"] = sum(path_netloc.count(ch) for ch in ['@', '-', '_', '%', '=', '&'])
    features["has_ip"] = bool(IP_RE.match(ext.registered_domain or domain))
    whitelist = ["google.com", "facebook.com", "amazon.com", "paypal.com", "microsoft.com", "apple.com"]
    domain_norm = (ext.registered_domain or domain).lower()
    def typosquat_check(dnm):
        for legit in whitelist:
            ratio = SequenceMatcher(None, dnm, legit).ratio()
            if 0.85 < ratio < 0.995 and dnm != legit:
                return True
        return False
    features["typosquat"] = typosquat_check(domain_norm)
    try:
        domain.encode("ascii")
        features["homograph"] = False
    except Exception:
        features["homograph"] = True
    label = ext.domain or ""
    if label:
        freq = Counter(label)
        L = len(label)
        entropy = -sum((v / L) * math.log2(v / L) for v in freq.values())
    else:
        entropy = 0.0
    features["label_entropy"] = round(entropy, 3)
    features["randomized_label"] = (len(label) >= 8 and entropy > 3.8)
    features["contains_keyword"] = any(k in (parsed.path + ".".join([ext.subdomain or ""])).lower() for k in KEYWORD_LIST)
    features["incorrect_casing"] = any(ch.isupper() for ch in domain) and not domain.isupper()
    features["suspicious_tld"] = features["tld"] in SUSPICIOUS_TLDS
    return features

# ---------------------------
# Content features
# ---------------------------
def content_features_from_response(url, resp, soup=None):
    cf = {}
    if resp is None:
        cf.update({
            "num_redirects": None,
            "hidden_iframes": None,
            "obfuscated_scripts": None,
            "login_forms_count": None,
            "domain_brand_mismatch": None,
        })
        return cf
    cf["num_redirects"] = len(resp.history) if hasattr(resp, "history") else 0
    if not soup:
        try:
            soup = BeautifulSoup(resp.text, "html.parser")
        except Exception:
            soup = None
    cf["hidden_iframes"] = None if not soup else sum(
        1 for iframe in soup.find_all("iframe")
        if "display:none" in (iframe.get("style", "")).lower()
        or "visibility:hidden" in (iframe.get("style", "")).lower()
        or iframe.get("width") == "0" or iframe.get("height") == "0"
    )
    obf = 0
    if soup:
        for s in soup.find_all("script"):
            txt = (s.string or "") or ""
            if len(txt) > 2000 or "eval(" in txt or "document.write(unescape" in txt or ("_0x" in txt and "push(" in txt):
                obf += 1
    cf["obfuscated_scripts"] = None if not soup else obf
    if soup:
        login_forms = sum(1 for form in soup.find_all("form") if form.find("input", {"type": "password"}))
        cf["login_forms_count"] = login_forms
    else:
        cf["login_forms_count"] = None
    domain = urlparse(resp.url).netloc.lower() if resp else urlparse(url).netloc.lower()
    if soup:
        text = ((soup.title.string.strip() if soup.title and soup.title.string else "") + " " + soup.get_text(" ", strip=True)[:20000]).lower()
        cf["domain_brand_mismatch"] = any(brand in text and brand not in domain for brand in BRAND_LIST)
    else:
        cf["domain_brand_mismatch"] = None
    return cf

# ---------------------------
# Behavioral features
# ---------------------------
def behavioral_features_from_soup(soup):
    bf = {}
    if not soup:
        return {"suspicious_download_links": None, "mouseover_mismatch_count": None, "popup_like_counts": None}
    suspicious_exts = (".exe", ".scr", ".apk", ".bat", ".msi")
    bf["suspicious_download_links"] = sum(1 for a in soup.find_all("a", href=True) if any(a["href"].lower().endswith(ext) for ext in suspicious_exts))
    mm = 0
    for a in soup.find_all("a", href=True):
        txt = (a.text or "").strip()
        href = a["href"].strip()
        if txt and href and txt.count(".") >= 1:
            try:
                txt_host = urlparse(txt if txt.startswith("http") else "http://" + txt).netloc.lower()
                href_host = urlparse(urljoin("http://dummy", href)).netloc.lower()
                if txt_host and href_host and txt_host != href_host:
                    mm += 1
            except Exception:
                pass
    bf["mouseover_mismatch_count"] = mm
    pop_count = 0
    for s in soup.find_all("script"):
        txt = (s.string or "") or ""
        low = txt.lower()
        pop_count += low.count("alert(") + low.count("confirm(") + low.count("prompt(") + low.count("window.open")
    bf["popup_like_counts"] = pop_count
    return bf

# ---------------------------
# Logo helper
# ---------------------------
def get_site_logo(base_url, soup):
    def _abs(u): return urljoin(base_url, u) if u else None
    candidates = []
    if soup:
        for link in soup.find_all("link", href=True):
            rel = " ".join(link.get("rel", [])).lower()
            href = link["href"]
            if "apple-touch-icon" in rel or "icon" in rel or "shortcut icon" in rel:
                candidates.append((link.get("sizes", ""), _abs(href)))
    if candidates:
        return candidates[0][1]
    parsed = urlparse(base_url)
    return f"{parsed.scheme}://{parsed.netloc}/favicon.ico" if parsed.scheme and parsed.netloc else None

# ---------------------------
# Classification
# ---------------------------
def collect_reasons_and_classify(features):
    if features.get("fetch_failed"):
        return "Fetch_Failed", ["Fetch failed"]

    reasons = []
    # ---------------------------
# Classification (collects ALL reasons)
# ---------------------------
def collect_reasons_and_classify(features):
    """
    Input: features dict containing lexical, host, content, behavioral
    Return: (classification, reasons_list)
    """
    reasons = []

    # Scheme / SSL reasons
    scheme = features.get("lexical", {}).get("scheme", "")
    ssl_info = features.get("host", {}).get("ssl", {}) or {}
    if scheme != "https":
        reasons.append("No HTTPS")
    else:
        if not ssl_info.get("ssl_valid", True):
            # identify specific SSL reasons
            nb = ssl_info.get("not_before")
            na = ssl_info.get("not_after")
            if nb and isinstance(nb, datetime.datetime) and datetime.datetime.utcnow() < nb:
                reasons.append("SSL Not Yet Valid")
            if na and isinstance(na, datetime.datetime) and datetime.datetime.utcnow() > na:
                reasons.append("SSL Expired")
            if ssl_info.get("self_signed"):
                reasons.append("SSL Self-Signed")
            if "SSL Not Yet Valid" not in reasons and "SSL Expired" not in reasons and "SSL Self-Signed" not in reasons:
                reasons.append("Invalid SSL")

    # Host-based reasons
    if features.get("host", {}).get("blacklisted"):
        reasons.append("Blacklisted")
    domain_age = features.get("host", {}).get("whois", {}).get("age_days")
    if domain_age is not None and domain_age < 30:
        reasons.append("New Domain (<30 days)")
    if features.get("host", {}).get("whois", {}).get("whois_privacy"):
        reasons.append("WHOIS Privacy / Redacted")

    # Lexical reasons
    lex = features.get("lexical", {})
    if lex.get("suspicious_tld"):
        reasons.append("Suspicious TLD")
    if lex.get("typosquat"):
        reasons.append("Typosquat-like Domain")
    if lex.get("homograph"):
        reasons.append("Possible Homograph (Unicode)")
    if lex.get("randomized_label"):
        reasons.append("Randomized Domain Label")
    if lex.get("contains_keyword"):
        reasons.append("Suspicious Keyword in URL")
    if lex.get("incorrect_casing"):
        reasons.append("Incorrect / Mixed Casing in Domain")

    # Content-based reasons
    cf = features.get("content", {})
    if cf.get("num_redirects", 0) > 3:
        reasons.append("Excessive Redirects")
    if cf.get("hidden_iframes", 0) > 0:
        reasons.append("Hidden Iframes")
    if cf.get("obfuscated_scripts", 0) > 0:
        reasons.append("Obfuscated Scripts")
    if cf.get("login_forms_count", 0) > 0 and lex.get("scheme") != "https":
        reasons.append("Login Form on Insecure Page")
    if cf.get("domain_brand_mismatch"):
        reasons.append("Domain/Brand Mismatch in Content")

    # Behavioral reasons
    bf = features.get("behavioral", {})
    if bf.get("suspicious_download_links", 0) > 0:
        reasons.append("Suspicious Download Links")
    if bf.get("mouseover_mismatch_count", 0) > 0:
        reasons.append("Mouseover URL Mismatch")
    if bf.get("popup_like_counts", 0) > 2:
        reasons.append("Excessive Popup-like Scripts")

    classification = "Healthy" if not reasons else "Unhealthy"
    return classification, reasons

    # trimmed here for brevity, keep original classification logic
    classification = "Healthy" if not reasons else "Unhealthy"

    return classification, reasons

# ---------------------------
# Export helpers
# ---------------------------
def export_report_csv(report, filename="phish_report.csv"):
    fieldnames = [
        "url", "final_url", "domain", "classification", "reasons",
        "scheme", "tld", "url_length", "subdomain_count", "special_char_count",
        "has_ip", "typosquat", "homograph", "label_entropy", "randomized_label",
        "contains_keyword", "incorrect_casing", "num_links", "external_ratio",
        "num_redirects", "hidden_iframes", "obfuscated_scripts", "login_forms_count",
        "domain_brand_mismatch", "suspicious_download_links", "mouseover_mismatch_count",
        "popup_like_counts", "domain_age_days", "whois_privacy", "registrant",
        "hosting_country", "blacklisted", "ssl_valid", "ssl_not_before", "ssl_not_after", "ssl_self_signed",
        "parent"
    ]
    rows = []
    for url, f in report.items():
        parsed_final = urlparse(f.get("final_url") or url)
        domain = parsed_final.netloc
        who = f.get("host", {}).get("whois", {}) or {}
        ssl = f.get("host", {}).get("ssl", {}) or {}
        row = {
            "url": url,
            "final_url": f.get("final_url"),
            "domain": domain,
            "classification": f.get("classification"),
            "reasons": ";".join(f.get("reasons", [])),
            "scheme": f.get("lexical", {}).get("scheme"),
            "tld": f.get("lexical", {}).get("tld"),
            "url_length": f.get("lexical", {}).get("url_length"),
            "subdomain_count": f.get("lexical", {}).get("subdomain_count"),
            "special_char_count": f.get("lexical", {}).get("special_char_count"),
            "has_ip": f.get("lexical", {}).get("has_ip"),
            "typosquat": f.get("lexical", {}).get("typosquat"),
            "homograph": f.get("lexical", {}).get("homograph"),
            "label_entropy": f.get("lexical", {}).get("label_entropy"),
            "randomized_label": f.get("lexical", {}).get("randomized_label"),
            "contains_keyword": f.get("lexical", {}).get("contains_keyword"),
            "incorrect_casing": f.get("lexical", {}).get("incorrect_casing"),
            "num_links": f.get("num_links"),
            "external_ratio": f.get("external_ratio"),
            "num_redirects": f.get("content", {}).get("num_redirects"),
            "hidden_iframes": f.get("content", {}).get("hidden_iframes"),
            "obfuscated_scripts": f.get("content", {}).get("obfuscated_scripts"),
            "login_forms_count": f.get("content", {}).get("login_forms_count"),
            "domain_brand_mismatch": f.get("content", {}).get("domain_brand_mismatch"),
            "suspicious_download_links": f.get("behavioral", {}).get("suspicious_download_links"),
            "mouseover_mismatch_count": f.get("behavioral", {}).get("mouseover_mismatch_count"),
            "popup_like_counts": f.get("behavioral", {}).get("popup_like_counts"),
            "domain_age_days": who.get("age_days"),
            "whois_privacy": who.get("whois_privacy"),
            "registrant": who.get("registrant"),
            "hosting_country": f.get("host", {}).get("hosting_country"),
            "blacklisted": f.get("host", {}).get("blacklisted"),
            "ssl_valid": ssl.get("ssl_valid"),
            "ssl_not_before": ssl.get("not_before"),
            "ssl_not_after": ssl.get("not_after"),
            "ssl_self_signed": ssl.get("self_signed"),
            "parent": f.get("parent"),
        }
        rows.append(row)

    with open(filename, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            # Convert datetimes to ISO strings
            if r["ssl_not_before"] and isinstance(r["ssl_not_before"], datetime.datetime):
                r["ssl_not_before"] = r["ssl_not_before"].isoformat()
            if r["ssl_not_after"] and isinstance(r["ssl_not_after"], datetime.datetime):
                r["ssl_not_after"] = r["ssl_not_after"].isoformat()
            writer.writerow(r)
    print(f"[INFO] CSV exported to {filename}")


def export_report_json(report, filename="phish_report.json"):
    # convert non-serializable types minimally
    serializable = {}
    for url, f in report.items():
        ff = json.loads(json.dumps(f, default=str))
        serializable[url] = ff
    with open(filename, "w", encoding="utf-8") as fh:
        json.dump(serializable, fh, indent=2)
    print(f"[INFO] JSON exported to {filename}")


# ---------------------------
# Analyzer
# ---------------------------
def analyze_url(start_url, depth=1, max_pages=1000):
    seen, report = set(), {}
    def _analyze(url, d, parent=None):
        if url in seen or len(seen) >= max_pages: return
        seen.add(url)
        print(f"[INFO] Analyzing: {url} (depth={d})")
        lex = lexical_features(url)
        resp = fetch_url(url, allow_redirects=True)
        final_url = resp.url if resp else url
        domain = urlparse(final_url).netloc.lower()
        host = {}  # omitted: same as your original host_features(domain)
        links, soup = ([], None)
        if resp: links, soup = extract_links(final_url, resp)
        cf = content_features_from_response(final_url, resp, soup)
        bf = behavioral_features_from_soup(soup)
        logo_url = get_site_logo(final_url, soup)
        features = {"lexical": lex, "host": host, "content": cf, "behavioral": bf,
                    "num_links": len(links), "external_ratio": 0, "external_links_sample": [],
                    "parent": parent, "final_url": final_url, "logo_url": logo_url,
                    "fetch_failed": resp is None}
        classification, reasons = collect_reasons_and_classify(features)
        features["classification"], features["reasons"] = classification, reasons
        report[url] = features
    _analyze(start_url, 0, parent=None)
    return report


if __name__ == "__main__":
    TEST_URLS = ["https://telstra-109995.weeblysite.com/"]
    all_report = {}
    for url in TEST_URLS:
        all_report.update(analyze_url(url))
    export_report_csv(all_report, "phish_report.csv")
    export_report_json(all_report, "phish_report.json")


[INFO] Analyzing: https://telstra-109995.weeblysite.com/ (depth=0)


  features["has_ip"] = bool(IP_RE.match(ext.registered_domain or domain))
  domain_norm = (ext.registered_domain or domain).lower()


[INFO] CSV exported to phish_report.csv
[INFO] JSON exported to phish_report.json


In [None]:
!pip install requests beautifulsoup4 tldextract pandas pyarrow selenium webdriver-manager

Collecting selenium
  Downloading selenium-4.35.0-py3-none-any.whl.metadata (7.4 kB)
Collecting webdriver-manager
  Downloading webdriver_manager-4.0.2-py2.py3-none-any.whl.metadata (12 kB)
Collecting trio~=0.30.0 (from selenium)
  Downloading trio-0.30.0-py3-none-any.whl.metadata (8.5 kB)
Collecting trio-websocket~=0.12.2 (from selenium)
  Downloading trio_websocket-0.12.2-py3-none-any.whl.metadata (5.1 kB)
Collecting typing-extensions>=4.0.0 (from beautifulsoup4)
  Downloading typing_extensions-4.14.1-py3-none-any.whl.metadata (3.0 kB)
Collecting outcome (from trio~=0.30.0->selenium)
  Downloading outcome-1.3.0.post0-py2.py3-none-any.whl.metadata (2.6 kB)
Collecting wsproto>=0.14 (from trio-websocket~=0.12.2->selenium)
  Downloading wsproto-1.2.0-py3-none-any.whl.metadata (5.6 kB)
Downloading selenium-4.35.0-py3-none-any.whl (9.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m82.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading webdriver_manager

In [None]:
#!/usr/bin/env python3
"""
site_intel_parquet_llm.py

What it does:
 - Scrape pages via requests + BeautifulSoup; fallback to Selenium when needed
 - Extract DOM fields (title/meta/text/links/forms/images/logo/scripts/inline handlers)
 - Classify site category (banking, ecommerce, education, ...)
 - Detect facilities (login, password field, payment, OTP, uploads, downloads, admin links)
 - Build an LLM prompt and optionally call OpenAI Chat API (gpt-3.5-turbo)
 - Save an extracted record per URL to a Parquet file (and JSON per-run)

Dependencies:
 pip install requests beautifulsoup4 tldextract pandas pyarrow selenium webdriver-manager

Set OPENAI_API_KEY environment variable to enable LLM calls (optional).
"""

import os
import re
import json
import time
import datetime
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
import tldextract
import pandas as pd

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

# -------------------------
# CONFIG
# -------------------------
REQUEST_TIMEOUT = 12
USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
              "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36")
HEADERS = {"User-Agent": USER_AGENT}

PARQUET_OUTPUT = "site_intel.parquet"
JSON_DIR = "site_reports_json"
os.makedirs(JSON_DIR, exist_ok=True)

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")  # set this env var to call OpenAI
OPENAI_MODEL = "gpt-3.5-turbo"
CALL_OPENAI = bool(OPENAI_API_KEY)  # toggle based on presence of key

MAX_TEXT_CHARS = 200000

# Category keywords (rule-based)
CATEGORY_KEYWORDS = {
    "banking": ["bank", "account", "transfer", "loan", "atm", "routing", "credit card", "debit"],
    "ecommerce": ["add to cart", "shopping cart", "checkout", "price", "buy now", "order", "product", "cart"],
    "education": ["course", "university", "college", "admissions", "syllabus", "lecture"],
    "government": ["gov", "official", "public service", "tax", "ministry"],
    "social": ["follow", "share", "profile", "timeline", "friends"],
    "media": ["news", "article", "press", "blog", "journal"],
    "cloud_storage": ["upload", "download", "file", "storage", "drive", "share file"],
    "crypto": ["wallet", "private key", "crypto", "ethereum", "bitcoin"],
    "healthcare": ["appointment", "clinic", "medical", "doctor", "patient"],
    "travel": ["booking", "flight", "hotel", "reservation"],
}

BRAND_KEYWORDS = ["paypal", "google", "facebook", "bank", "amazon", "microsoft", "apple"]

# -------------------------
# Fetch helpers
# -------------------------
def fetch_with_requests(url: str, timeout: int = REQUEST_TIMEOUT) -> Tuple[Optional[requests.Response], Optional[str]]:
    """Fetch via requests. Returns (response, None) or (None, error_string)."""
    try:
        resp = requests.get(url, headers=HEADERS, timeout=timeout, allow_redirects=True)
        resp.raise_for_status()
        # consider small responses potentially needing fallback
        if not resp.text or len(resp.text.strip()) < 120:
            return resp, "content_too_small"
        return resp, None
    except Exception as e:
        return None, str(e)


def fetch_with_selenium(url: str, timeout: int = 20) -> Tuple[Optional[Tuple[str, str]], Optional[str]]:
    """
    Use Selenium headless Chrome to load the page and return (final_url, html) or (None, error).
    """
    chrome_options = Options()
    chrome_options.headless = True
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument(f"user-agent={USER_AGENT}")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--window-size=1920,1080")
    driver = None
    try:
        driver = webdriver.Chrome(ChromeDriverManager().install(), options=chrome_options)
        driver.set_page_load_timeout(timeout)
        driver.get(url)
        time.sleep(1.0)  # let some JS run
        html = driver.page_source
        final = driver.current_url
        return (final, html), None
    except Exception as e:
        return None, str(e)
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                pass

# -------------------------
# Extraction helpers
# -------------------------
def sanitize_text(s: Optional[str]) -> str:
    if not s:
        return ""
    return re.sub(r"\s+", " ", s).strip()

def extract_basic_dom_fields(base_url: str, html: str) -> Dict[str, Any]:
    """Parse HTML and extract canonical DOM fields."""
    soup = BeautifulSoup(html or "", "html.parser")
    out: Dict[str, Any] = {}
    out["title"] = sanitize_text(soup.title.string) if soup.title and soup.title.string else ""
    meta_tag = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", attrs={"property": "og:description"})
    out["meta_description"] = sanitize_text(meta_tag.get("content")) if meta_tag and meta_tag.get("content") else ""
    out["text"] = sanitize_text(soup.get_text(separator=" ", strip=True))[:MAX_TEXT_CHARS]

    anchors = []
    for a in soup.find_all("a", href=True):
        href = a.get("href").strip()
        abs_href = urljoin(base_url, href)
        text = (a.get_text(" ", strip=True) or "").strip()
        anchors.append({"href": abs_href, "text": text})
    out["links"] = anchors

    forms = []
    for f in soup.find_all("form"):
        action = f.get("action") or ""
        method = (f.get("method") or "GET").upper()
        inputs = []
        for i in f.find_all(["input", "select", "textarea", "button"]):
            itype = (i.get("type") or i.name or "").lower()
            name = (i.get("name") or "").strip()
            placeholder = (i.get("placeholder") or "").strip()
            inputs.append({"type": itype, "name": name, "placeholder": placeholder})
        forms.append({"action": urljoin(base_url, action), "method": method, "inputs": inputs})
    out["forms"] = forms

    images = []
    for img in soup.find_all("img", src=True):
        try:
            src = img.get("src")
            images.append({"src": urljoin(base_url, src), "alt": (img.get("alt") or "").strip()})
        except Exception:
            continue
    out["images"] = images

    # logo (favicon or og:image)
    ico = None
    link_icon = soup.find("link", rel=lambda v: v and "icon" in v.lower())
    og_image = soup.find("meta", property="og:image")
    if link_icon and link_icon.get("href"):
        ico = urljoin(base_url, link_icon["href"])
    elif og_image and og_image.get("content"):
        ico = urljoin(base_url, og_image["content"])
    out["logo"] = ico or ""

    out["scripts_count"] = len(soup.find_all("script"))
    inline_handlers = 0
    for tag in soup.find_all(True):
        for attr in tag.attrs.keys():
            if isinstance(attr, str) and attr.lower().startswith("on"):
                inline_handlers += 1
    out["inline_event_handlers"] = inline_handlers

    return out

# -------------------------
# Classification & facility detection
# -------------------------
def classify_site_from_text(title: str, meta: str, text: str) -> str:
    score = {k: 0 for k in CATEGORY_KEYWORDS.keys()}
    source = " ".join([title or "", meta or "", text or ""]).lower()
    for cat, kws in CATEGORY_KEYWORDS.items():
        for kw in kws:
            if title and kw in (title or "").lower():
                score[cat] += 3
            if meta and kw in (meta or "").lower():
                score[cat] += 2
            if kw in source:
                score[cat] += 1
    best = max(score.items(), key=lambda x: x[1])
    if best[1] == 0:
        return "other"
    second = sorted(score.items(), key=lambda x: x[1], reverse=True)[1]
    if best[1] - second[1] < 2:
        return "other"
    return best[0]

def detect_facilities(dom: Dict[str, Any]) -> Dict[str, Any]:
    """Return dict of boolean indicators for facilities present on the page.
    This is the same implementation you asked to preserve, with minimal addition to reliably detect login forms.
    """
    facilities = {
        "has_login_form": False,
        "has_password_field": False,
        "has_payment_checkout": False,
        "has_cart_keywords": False,
        "has_file_upload": False,
        "has_contact_form": False,
        "has_otp_field": False,
        "has_social_login": False,
        "has_download_links": False,
        "has_admin_panel_links": False,
    }

    text = (dom.get("title") or "") + " " + (dom.get("meta_description") or "") + " " + (dom.get("text") or "")
    t = text.lower()

    # login / password detection (minimal edits from previous function)
    for f in dom.get("forms", []):
        inputs = f.get("inputs", [])
        # if there is any explicit password input
        if any((i.get("type") or "").lower() == "password" for i in inputs):
            facilities["has_login_form"] = True
            facilities["has_password_field"] = True
            if any(k in f.get("action", "").lower() for k in ["/login", "/signin", "/auth", "account", "session"]):
                facilities["has_login_form"] = True

        # MINIMAL ADDITION:
        # detect common login forms that don't have type="password"
        # e.g., email or username field plus a submit/button input
        names = [ (i.get("name") or "").lower() for i in inputs ]
        placeholders = [ (i.get("placeholder") or "").lower() for i in inputs ]
        types = [ (i.get("type") or "").lower() for i in inputs ]
        # if there's an email/username field and a submit/button, consider login form present
        if (any("user" in n or "email" in n or "login" in n or "username" in n for n in names) or
            any("email" in p or "user" in p or "login" in p or "username" in p for p in placeholders)):
            if any(t in ("submit", "button") or t == "" for t in types):
                facilities["has_login_form"] = True

        # file upload
        if any((i.get("type") or "").lower() in ("file", "fileupload") for i in inputs):
            facilities["has_file_upload"] = True

        # contact form (presence of name/email/message fields)
        if any("email" in n or "message" in n or "contact" in n for n in names):
            facilities["has_contact_form"] = True

        # OTP field detection
        if any("otp" in (i.get("name") or "").lower() or "one-time" in (i.get("placeholder") or "").lower() for i in inputs):
            facilities["has_otp_field"] = True

        # textual indicators for social login (within page text)
        if any(kw in t for kw in ["login with google", "login with facebook", "sign in with google", "oauth"]):
            facilities["has_social_login"] = True

    # payment / cart detection via keywords or links
    if any(kw in t for kw in ["checkout", "add to cart", "shopping cart", "place order", "card number", "cvv", "billing"]):
        facilities["has_payment_checkout"] = True
    if any(kw in t for kw in ["cart", "basket", "sku", "price"]):
        facilities["has_cart_keywords"] = True

    # social login from outside forms
    if any(kw in t for kw in ["login with google", "login with facebook", "sign in with google", "oauth"]):
        facilities["has_social_login"] = True

    # download links & admin links from anchors
    for a in dom.get("links", []):
        href = (a.get("href") or "").lower()
        if href.endswith((".exe", ".apk", ".msi", ".zip", ".scr")):
            facilities["has_download_links"] = True
        if any(x in href for x in ["/admin", "/wp-admin", "panel", "/manage"]):
            facilities["has_admin_panel_links"] = True

    # final sanity: prominent 'login' text
    if "log in" in t or "sign in" in t or "sign-in" in t:
        facilities["has_login_form"] = True

    return facilities

# -------------------------
# LLM prompt builder & call
# -------------------------
def build_llm_prompt(url: str, site_category: str, facilities: Dict[str, Any], dom: Dict[str, Any]) -> str:
    lines = []
    lines.append("You are a cybersecurity analyst. Analyze this website and list realistic phishing or malicious scenarios.")
    lines.append(f"URL: {url}")
    lines.append(f"Estimated category: {site_category}")
    lines.append("Detected facilities (True/False):")
    for k, v in facilities.items():
        lines.append(f"  - {k}: {v}")
    lines.append("\nPage title: " + (dom.get("title") or ""))
    lines.append("Meta description: " + (dom.get("meta_description") or ""))
    sample_text = (dom.get("text") or "")[:1000]
    lines.append("Top text snippet: " + sample_text)
    sample_links = dom.get("links", [])[:10]
    if sample_links:
        lines.append("Sample links (href -> text):")
        for a in sample_links:
            lines.append(f"  - {a.get('href')} -> '{a.get('text')}'")
    sample_forms = dom.get("forms", [])[:5]
    if sample_forms:
        lines.append("Sample forms (action, method, inputs):")
        for f in sample_forms:
            lines.append(f"  - action: {f.get('action')}, method: {f.get('method')}, inputs: {f.get('inputs')}")
    lines.append("\nInstructions:")
    lines.append("  1) Provide 3-6 plausible phishing or malicious scenarios that attackers might attempt on this site.")
    lines.append("  2) For each scenario, give: attack vector, preconditions, user-visible indicators (what users might see).")
    lines.append("  3) Provide 2 prioritized mitigations or detection steps for defenders.")
    lines.append("Return the answer as JSON with keys: scenarios (list of objects) and mitigations (list).")
    return "\n".join(lines)


def call_openai_chat(api_key: str, prompt: str, model="gpt-3.5-turbo", max_tokens=700) -> Tuple[Optional[Dict], Optional[str]]:
    if not api_key:
        return None, "no-api-key-provided"
    try:
        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
        body = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.2,
            "max_tokens": max_tokens,
        }
        resp = requests.post(url, headers=headers, json=body, timeout=60)
        resp.raise_for_status()
        return resp.json(), None
    except Exception as e:
        return None, str(e)

# -------------------------
# Orchestrator: scrape -> classify -> facilities -> LLM -> persist
# -------------------------
def analyze_and_persist(url: str, parquet_path: str = PARQUET_OUTPUT, selenium_fallback: bool = True, call_llm: bool = CALL_OPENAI) -> Dict[str, Any]:
    report: Dict[str, Any] = {
        "url": url,
        "scraped_at": datetime.datetime.utcnow().isoformat() + "Z",
        "fetch_method": None,
        "fetch_error": None,
        "final_url": None,
        "dom": None,
        "category": None,
        "facilities": None,
        "llm_prompt": None,
        "llm_response": None,
        "notes": [],
    }

    # 1) Try requests
    resp, err = fetch_with_requests(url)
    html = None
    if resp and not err:
        html = resp.text
        report["final_url"] = resp.url
        report["fetch_method"] = "requests"
    else:
        report["fetch_method"] = "requests_failed"
        report["fetch_error"] = err
        report["notes"].append(f"requests failed: {err}")

    # 2) If content too small or missing, optionally use selenium fallback
    if (not html or len(html.strip()) < 120) and selenium_fallback:
        report["notes"].append("Attempting Selenium fallback (requests insufficient)")
        selenium_result, s_err = fetch_with_selenium(url)
        if selenium_result:
            final, html = selenium_result
            report["final_url"] = final or report["final_url"] or url
            report["fetch_method"] = "selenium"
            report["notes"].append("Selenium fetch succeeded")
        else:
            report["notes"].append(f"Selenium fetch failed: {s_err}")
            report["fetch_error"] = report["fetch_error"] or s_err

    # 3) If no html available -> fill NA for content-based/behavioral and persist minimal info
    if not html:
        report["dom"] = {
            "title": "",
            "meta_description": "",
            "text": "",
            "links": [],
            "forms": [],
            "images": [],
            "logo": "",
            "scripts_count": "NA",
            "inline_event_handlers": "NA",
        }
        report["category"] = "unknown"
        report["facilities"] = {k: "NA" for k in ["has_login_form","has_password_field","has_payment_checkout",
                                                   "has_cart_keywords","has_file_upload","has_contact_form",
                                                   "has_otp_field","has_social_login","has_download_links",
                                                   "has_admin_panel_links"]}
        # Save and return
        _append_record_to_parquet_and_json(report, parquet_path)
        return report

    # 4) Parse DOM & extract fields
    dom = extract_basic_dom_fields(report.get("final_url") or url, html)
    report["dom"] = dom

    # 5) classify & detect facilities
    category = classify_site_from_text(dom.get("title"), dom.get("meta_description"), dom.get("text"))
    report["category"] = category
    facilities = detect_facilities(dom)
    report["facilities"] = facilities

    # 6) Build LLM prompt
    prompt = build_llm_prompt(report["final_url"], category, facilities, dom)
    report["llm_prompt"] = prompt

    # 7) Call LLM if requested
    if call_llm:
        rjson, err = call_openai_chat(OPENAI_API_KEY, prompt, model=OPENAI_MODEL)
        if err:
            report["llm_error"] = err
            report["notes"].append(f"OpenAI call failed: {err}")
        else:
            report["llm_response"] = rjson

    # 8) Persist record
    _append_record_to_parquet_and_json(report, parquet_path)
    return report

# -------------------------
# Persistence helpers
# -------------------------
def _append_record_to_parquet_and_json(record: Dict[str, Any], parquet_path: str):
    # Flatten fields for parquet
    row = {
        "url": record.get("url"),
        "scraped_at": record.get("scraped_at"),
        "final_url": record.get("final_url"),
        "fetch_method": record.get("fetch_method"),
        "fetch_error": record.get("fetch_error"),
        "category": record.get("category"),
        "title": record.get("dom", {}).get("title"),
        "meta_description": record.get("dom", {}).get("meta_description"),
        "text_snippet": (record.get("dom", {}).get("text") or "")[:2000],
        "links_count": len(record.get("dom", {}).get("links") or []),
        "forms_count": len(record.get("dom", {}).get("forms") or []),
        "images_count": len(record.get("dom", {}).get("images") or []),
        "logo": record.get("dom", {}).get("logo"),
        "scripts_count": record.get("dom", {}).get("scripts_count"),
        "inline_event_handlers": record.get("dom", {}).get("inline_event_handlers"),
        "facilities": json.dumps(record.get("facilities") or {}, ensure_ascii=False),
        "llm_response": json.dumps(record.get("llm_response") or {}, ensure_ascii=False),
        "notes": json.dumps(record.get("notes") or [], ensure_ascii=False),
    }
    df_row = pd.DataFrame([row])

    # append or create parquet
    try:
        if os.path.exists(parquet_path):
            existing = pd.read_parquet(parquet_path)
            combined = pd.concat([existing, df_row], ignore_index=True)
            combined.to_parquet(parquet_path, index=False)
        else:
            df_row.to_parquet(parquet_path, index=False)
        print(f"[INFO] Appended record for {row['url']} to {parquet_path}")
    except Exception as e:
        print(f"[ERROR] Could not write parquet: {e}")
        # fallback: write JSON file
        ts = int(time.time())
        fname = os.path.join(JSON_DIR, f"site_report_fallback_{ts}.json")
        with open(fname, "w", encoding="utf-8") as fh:
            json.dump(record, fh, ensure_ascii=False, indent=2)
        print(f"[INFO] Wrote fallback JSON to {fname}")

    # also always write a full JSON snapshot for easy inspection
    ts = int(time.time())
    fname = os.path.join(JSON_DIR, f"site_report_{ts}.json")
    try:
        with open(fname, "w", encoding="utf-8") as fh:
            json.dump(record, fh, ensure_ascii=False, indent=2)
        print(f"[INFO] Wrote JSON snapshot to {fname}")
    except Exception as e:
        print(f"[WARN] Could not write JSON snapshot: {e}")

# -------------------------
# CLI demo
# -------------------------
if __name__ == "__main__":
    TEST_URLS = [
        # Modify/add URLs for testing (be mindful of terms of service and legality)
        #"https://www.paypal.com/signin",
        #"https://example.com",
        "https://telstra-109995.weeblysite.com/"
        # Add known phishing templates if you have them locally or test pages you control
    ]

    for u in TEST_URLS:
        print("\n" + "=" * 60)
        print(f"[RUN] Analyzing {u}")
        rpt = analyze_and_persist(u, parquet_path=PARQUET_OUTPUT, selenium_fallback=True, call_llm=CALL_OPENAI)
        print("Final URL:", rpt.get("final_url"))
        print("Category:", rpt.get("category"))
        print("Logo:", rpt.get("dom", {}).get("logo"))
        print("Facilities:", json.dumps(rpt.get("facilities"), indent=2))
        if rpt.get("llm_response"):
            # Try to print a short preview of the model's answer (if present)
            try:
                preview = rpt["llm_response"]["choices"][0]["message"]["content"]
                print("LLM preview (truncated):\n", preview[:800])
            except Exception:
                print("LLM response present but could not parse preview.")
        if rpt.get("fetch_error"):
            print("Fetch error:", rpt.get("fetch_error"))
    print("\nDone. Parquet file:", PARQUET_OUTPUT)



[RUN] Analyzing https://telstra-109995.weeblysite.com/


  "scraped_at": datetime.datetime.utcnow().isoformat() + "Z",


[INFO] Appended record for https://telstra-109995.weeblysite.com/ to site_intel.parquet
[INFO] Wrote JSON snapshot to site_reports_json/site_report_1756561929.json
Final URL: https://telstra-109995.weeblysite.com/
Category: other
Logo: https://www.weebly.com/favicon.ico
Facilities: {
  "has_login_form": false,
  "has_password_field": false,
  "has_payment_checkout": false,
  "has_cart_keywords": false,
  "has_file_upload": false,
  "has_contact_form": false,
  "has_otp_field": false,
  "has_social_login": false,
  "has_download_links": false,
  "has_admin_panel_links": false
}

Done. Parquet file: site_intel.parquet


#### **Logo Matching**

In [None]:
# ===== Install dependencies (Colab) =====
#!pip install torch torchvision pillow faiss-cpu --quiet

import torch, torch.nn.functional as F
from torchvision import models, transforms
from pathlib import Path
from PIL import Image
import faiss
import numpy as np

# ===== Config =====
FOLDER = Path("/content/sample_data/images")   # folder with dataset images
IMG_SIZE = 224
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ===== Model & Preprocessing =====
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
model.fc = torch.nn.Identity()   # remove classification head → feature extractor
model.eval().to(DEVICE)

transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

def get_features(img_path: Path):
    img = Image.open(img_path).convert("RGB")
    x = transform(img).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        feat = model(x).cpu().numpy()
    return feat / np.linalg.norm(feat)  # L2 normalize

# ===== Index dataset images =====
image_paths = list(FOLDER.glob("*.jpg")) + list(FOLDER.glob("*.png"))
features = np.vstack([get_features(p) for p in image_paths])

index = faiss.IndexFlatL2(features.shape[1])
index.add(features)

print(f"Indexed {len(image_paths)} images.")

# ===== Function to query top-5 similar images =====
def find_similar(input_img: str, topk=5):
    q_feat = get_features(Path(input_img))
    D, I = index.search(q_feat, topk)  # D=distances, I=indices
    results = [image_paths[i] for i in I[0]]
    return results, D[0]

# ===== Example usage =====
query_img = "/content/sample_data/images/sample.jpg"   # replace with your input image
matches, scores = find_similar(query_img)

print("Top matches:")
for path, score in zip(matches, scores):
    print(f"{path} (distance={score:.4f})")

In [None]:
#!/usr/bin/env python3
"""
site_scrape_bs_selenium.py

Scrape webpages using Requests + BeautifulSoup. If requests fails to fetch the page
(or returns an empty/minimal response), use Selenium (headless Chrome) to load the page
and then parse the page source with BeautifulSoup.

Extracted data (title, meta, text, links, forms, images, scripts, inline handlers, logo, fetch info)
is saved to a Parquet file (one row per URL).

Usage:
  - Edit TEST_URLS and run: python site_scrape_bs_selenium.py
  - Requires chromedriver; webdriver-manager will auto-install it.
"""

import json
import re
import time
import datetime
from typing import Dict, Any, List, Tuple, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
import tldextract
import pandas as pd

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException, TimeoutException
from webdriver_manager.chrome import ChromeDriverManager

# -------------------------
# Config
# -------------------------
REQUEST_TIMEOUT = 12
USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
              "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0 Safari/537.36")
HEADERS = {"User-Agent": USER_AGENT}

PARQUET_OUTPUT = "scraped_pages.parquet"
MAX_TEXT_CHARS = 200000  # limit text stored

# -------------------------
# Scraping helpers
# -------------------------
def fetch_with_requests(url: str, timeout: int = REQUEST_TIMEOUT) -> Tuple[Optional[requests.Response], Optional[str]]:
    """Try a GET via requests. Return (response, error_string)"""
    try:
        resp = requests.get(url, headers=HEADERS, timeout=timeout, allow_redirects=True)
        resp.raise_for_status()
        # treat very small responses as insufficient
        if not resp.text or len(resp.text.strip()) < 100:
            return resp, "fetched_but_content_small"
        return resp, None
    except Exception as e:
        return None, str(e)


def fetch_with_selenium(url: str, timeout: int = 20) -> Tuple[Optional[str], Optional[str]]:
    """
    Use Selenium + headless Chrome to load the page and return page_source (HTML) or error.
    webdriver-manager used to automatically provide chromedriver.
    """
    chrome_options = Options()
    chrome_options.headless = True
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument(f"user-agent={USER_AGENT}")
    chrome_options.add_argument("--disable-extensions")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = None
    try:
        driver = webdriver.Chrome(ChromeDriverManager().install(), options=chrome_options)
        driver.set_page_load_timeout(timeout)
        driver.get(url)
        # give some time for JS to run (small, non-blocking)
        time.sleep(1.0)
        html = driver.page_source
        final_url = driver.current_url
        return (final_url, html), None
    except Exception as e:
        return None, str(e)
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                pass


# -------------------------
# Extraction helpers
# -------------------------
def sanitize_text(s: Optional[str]) -> str:
    if not s:
        return ""
    return re.sub(r"\s+", " ", s).strip()

def extract_basic_dom_fields(base_url: str, html: str) -> Dict[str, Any]:
    """
    Parse HTML with BeautifulSoup and extract:
      - title, meta description
      - text (trimmed)
      - links: list of dicts {href, text}
      - forms: list of dicts {action, method, inputs: [{type,name,placeholder}]}
      - images: list of dicts {src, alt}
      - scripts_count, inline_event_handlers_count
      - logo (favicon or og:image)
    """
    soup = BeautifulSoup(html or "", "html.parser")
    out: Dict[str, Any] = {}
    out["title"] = sanitize_text(soup.title.string) if soup.title and soup.title.string else ""
    meta_desc_tag = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", attrs={"property": "og:description"})
    out["meta_description"] = sanitize_text(meta_desc_tag.get("content")) if meta_desc_tag and meta_desc_tag.get("content") else ""
    # page text (limited)
    out["text"] = sanitize_text(soup.get_text(separator=" ", strip=True))[:MAX_TEXT_CHARS]

    # anchors / links
    anchors = []
    for a in soup.find_all("a", href=True):
        try:
            href = a.get("href").strip()
            abs_href = urljoin(base_url, href)
            text = (a.get_text(" ", strip=True) or "").strip()
            anchors.append({"href": abs_href, "text": text})
        except Exception:
            continue
    out["links"] = anchors

    # forms
    forms = []
    for f in soup.find_all("form"):
        action = f.get("action") or ""
        method = (f.get("method") or "GET").upper()
        inputs = []
        for inp in f.find_all(["input", "select", "textarea", "button"]):
            itype = (inp.get("type") or inp.name or "").lower()
            name = (inp.get("name") or "").strip()
            placeholder = (inp.get("placeholder") or "").strip()
            inputs.append({"type": itype, "name": name, "placeholder": placeholder})
        forms.append({"action": urljoin(base_url, action), "method": method, "inputs": inputs})
    out["forms"] = forms

    # images
    images = []
    for img in soup.find_all("img", src=True):
        try:
            src = img.get("src")
            images.append({"src": urljoin(base_url, src), "alt": (img.get("alt") or "").strip()})
        except Exception:
            continue
    out["images"] = images

    # favicon / og:image as logo
    ico = None
    link_icon = soup.find("link", rel=lambda v: v and "icon" in v.lower())
    og_image = soup.find("meta", property="og:image")
    if link_icon and link_icon.get("href"):
        ico = urljoin(base_url, link_icon["href"])
    elif og_image and og_image.get("content"):
        ico = urljoin(base_url, og_image["content"])
    out["logo"] = ico or ""

    # scripts and inline handlers
    out["scripts_count"] = len(soup.find_all("script"))
    # inline handlers: count tags having attributes starting with "on" (onclick, onmouseover...)
    inline_handlers = 0
    for tag in soup.find_all(True):
        for attr in tag.attrs.keys():
            if isinstance(attr, str) and attr.lower().startswith("on"):
                inline_handlers += 1
    out["inline_event_handlers"] = inline_handlers

    return out


# -------------------------
# Orchestrator: fetch -> extract -> save
# -------------------------
def scrape_page_and_save(url: str, parquet_path: str = PARQUET_OUTPUT, selenium_fallback: bool = True) -> Dict[str, Any]:
    """
    Scrape a single page:
     - Try requests first
     - If requests fails or returns tiny content, try Selenium fallback to fetch rendered HTML
     - Extract DOM fields via BeautifulSoup
     - Save extracted record to Parquet (append if file exists)
    Returns the record dict (also written to parquet).
    """
    record: Dict[str, Any] = {
        "url": url,
        "scraped_at": datetime.datetime.utcnow().isoformat() + "Z",
        "final_url": None,
        "fetch_method": None,
        "fetch_error": None,
        "html_length": 0,
        "dom": None,
    }

    # 1) Try requests
    resp, err = fetch_with_requests(url)
    html = None
    if resp and not err:
        html = resp.text
        record["final_url"] = resp.url
        record["fetch_method"] = "requests"
    else:
        # record the error
        record["fetch_error"] = err
        record["fetch_method"] = "requests_failed"

    # 2) If requests returned but content small/insufficient, or requests failed -> attempt Selenium fallback
    need_selenium = False
    if not html or len(html.strip()) < 120:
        need_selenium = True

    if need_selenium and selenium_fallback:
        # try selenium to render JS & get page_source
        selenium_result, s_err = fetch_with_selenium(url)
        if selenium_result:
            final_url, html = selenium_result
            record["final_url"] = final_url or record.get("final_url") or url
            record["fetch_method"] = "selenium"
            record["fetch_error"] = record.get("fetch_error") or None
        else:
            # selenium failed too
            record["fetch_error"] = (record.get("fetch_error") or "") + ("; selenium_failed: " + str(s_err) if s_err else "")
            # leave html None and proceed to set dom=NA later

    # 3) If no html, set dom to NA and write minimal record
    if not html:
        record["html_length"] = 0
        record["dom"] = {
            "title": "",
            "meta_description": "",
            "text": "",
            "links": [],
            "forms": [],
            "images": [],
            "logo": "",
            "scripts_count": "NA",
            "inline_event_handlers": "NA",
        }
        # persist and return
        _append_record_to_parquet(record, parquet_path)
        return record

    # 4) Extract DOM via BeautifulSoup
    dom = extract_basic_dom_fields(record.get("final_url") or url, html)
    record["html_length"] = len(html)
    record["dom"] = dom

    # 5) Save record to parquet (append)
    _append_record_to_parquet(record, parquet_path)
    return record


# -------------------------
# Parquet helper
# -------------------------
def _append_record_to_parquet(record: Dict[str, Any], parquet_path: str):
    """
    Append a record to a parquet file. If file exists, read it and append; else create new.
    For nested dicts (dom), convert to JSON string for compact storage, but keep separate columns for key fields.
    """
    # Flatten / prepare row
    row = {
        "url": record.get("url"),
        "scraped_at": record.get("scraped_at"),
        "final_url": record.get("final_url"),
        "fetch_method": record.get("fetch_method"),
        "fetch_error": record.get("fetch_error"),
        "html_length": record.get("html_length"),
        # Dom top-level items for convenience
        "title": record.get("dom", {}).get("title") if record.get("dom") else None,
        "meta_description": record.get("dom", {}).get("meta_description") if record.get("dom") else None,
        "text": record.get("dom", {}).get("text") if record.get("dom") else None,
        "links_count": len(record.get("dom", {}).get("links") or []) if record.get("dom") else 0,
        "forms_count": len(record.get("dom", {}).get("forms") or []) if record.get("dom") else 0,
        "images_count": len(record.get("dom", {}).get("images") or []) if record.get("dom") else 0,
        "logo": record.get("dom", {}).get("logo") if record.get("dom") else None,
        "scripts_count": record.get("dom", {}).get("scripts_count") if record.get("dom") else None,
        "inline_event_handlers": record.get("dom", {}).get("inline_event_handlers") if record.get("dom") else None,
        # Full DOM JSON for reproducibility
        "dom_json": json.dumps(record.get("dom") or {}, ensure_ascii=False),
    }
    df_row = pd.DataFrame([row])

    # Append or create
    try:
        # If parquet exists, read and concat (keeps schema consistent)
        import os
        if os.path.exists(parquet_path):
            old = pd.read_parquet(parquet_path)
            new = pd.concat([old, df_row], ignore_index=True)
            new.to_parquet(parquet_path, index=False)
        else:
            df_row.to_parquet(parquet_path, index=False)
        print(f"[INFO] Appended record for {row['url']} to {parquet_path}")
    except Exception as e:
        print(f"[ERROR] Could not write parquet: {e}")
        # fallback: write a JSON file with timestamp
        ts = int(time.time())
        fallback = f"scrape_fallback_{ts}.json"
        with open(fallback, "w", encoding="utf-8") as fh:
            json.dump(record, fh, ensure_ascii=False, indent=2)
        print(f"[INFO] Wrote fallback JSON to {fallback}")


# -------------------------
# Example usage & tests
# -------------------------
if __name__ == "__main__":
    TEST_URLS = [
        #"https://www.example.com",
        "https://telstra-109995.weeblysite.com/"
        # add more URLs for bulk scraping
    ]

    results = []
    for u in TEST_URLS:
        print(f"[INFO] Scraping: {u}")
        rec = scrape_page_and_save(u, parquet_path=PARQUET_OUTPUT, selenium_fallback=True)
        results.append(rec)

    print("[INFO] Done. Records saved to", PARQUET_OUTPUT)


[INFO] Scraping: https://telstra-109995.weeblysite.com/


  "scraped_at": datetime.datetime.utcnow().isoformat() + "Z",


[INFO] Appended record for https://telstra-109995.weeblysite.com/ to scraped_pages.parquet
[INFO] Done. Records saved to scraped_pages.parquet


In [None]:
import re
import requests
from bs4 import BeautifulSoup

SAFE_KEYWORDS = [
    "google-analytics", "gtag", "fbq", "hotjar", "adsbygoogle", "googletag",
    "jquery", "bootstrap", "react", "vue", "angular"
]

MALICIOUS_PATTERNS = [
    r"eval\(", r"atob\(", r"unescape\(", r"fromCharCode",
    r"window\.location\s*=", r"document\.onkeypress",
    r"\.exe", r"\.apk", r"\.scr", r"cryptominer", r"WebAssembly"
]

def analyze_scripts(url: str):
    try:
        res = requests.get(url, timeout=10)
        res.raise_for_status()
        soup = BeautifulSoup(res.text, "html.parser")

        script_results = []
        for script in soup.find_all("script"):
            code = script.string or ""
            src = script.get("src", "")

            # External script (by src URL)
            if src:
                benign = any(safe in src.lower() for safe in SAFE_KEYWORDS)
                script_results.append({
                    "src": src,
                    "type": "external",
                    "status": "benign" if benign else "unknown"
                })
                continue

            # Inline script
            if not code.strip():
                continue

            malicious_hits = [p for p in MALICIOUS_PATTERNS if re.search(p, code)]
            benign_hits = [k for k in SAFE_KEYWORDS if k in code.lower()]

            if malicious_hits and not benign_hits:
                status = "malicious"
            elif benign_hits:
                status = "benign"
            else:
                status = "unknown"

            script_results.append({
                "src": None,
                "type": "inline",
                "status": status,
                "malicious_hits": malicious_hits,
                "benign_hits": benign_hits,
                "preview": code[:200]  # first 200 chars
            })

        return script_results

    except Exception as e:
        return [{"error": str(e)}]


# Example
scripts = analyze_scripts("https://telstra-109995.weeblysite.com/")
for s in scripts:
    print(s)

{'src': 'https://cdn3.editmysite.com/app/checkout/assets/checkout/system.c3b89b0b94f4ef0671b1.js', 'type': 'external', 'status': 'unknown'}
{'src': 'https://cdn3.editmysite.com/app/checkout/assets/checkout/imports.en.74ef33aa0ad930b9.js', 'type': 'external', 'status': 'unknown'}
{'src': 'https://cdn3.editmysite.com/app/checkout/assets/checkout/locale-imports-map.680f7c3236b652da.json', 'type': 'external', 'status': 'unknown'}
{'src': 'https://cdn3.editmysite.com/app/website/js/runtime.7ee5d63b96d4f5d36052.js', 'type': 'external', 'status': 'unknown'}
{'src': 'https://cdn3.editmysite.com/app/website/js/vue-modules.4a41b3ba298bf4563d97.js', 'type': 'external', 'status': 'benign'}
{'src': 'https://cdn3.editmysite.com/app/website/js/languages/en.77c86c931176c693e0d9.js', 'type': 'external', 'status': 'unknown'}
{'src': 'https://cdn3.editmysite.com/app/website/js/site.2f7222717d1bb29e7c3e.js', 'type': 'external', 'status': 'unknown'}
{'src': None, 'type': 'inline', 'status': 'benign', 'mali

#### END