# ZIA Threat Hunting Lab — Search Engine Queries

This notebook:
1. Loads `../data/lab_dprk.csv`
2. Extracts **host**, **registrable domain**, and **TLD** (the public suffix, e.g. `com` or `co.uk`, when `tldextract` is installed) from the `URL` field
3. Filters to **search-engine** traffic (Google/Bing/Yahoo/DuckDuckGo/Yandex/etc.)
4. Carves out the **search terms** from the URL querystring
5. Makes search terms **searchable** (interactive UI)
6. Flags any search terms matching a **dirty wordlist** from `../data/dirty_wordlist.txt` (case-insensitive)

> Tip: This is a *behavioral* lens. You’re looking for potentially risky intent, not making accusations.


In [None]:
# --- Imports ---
import ipaddress
import math
import re
from urllib.parse import parse_qs, unquote, unquote_plus, urlsplit

import ipywidgets as widgets
import pandas as pd
import plotly.express as px
from IPython.display import HTML, display

# Optional: PSL-aware registrable-domain extraction
# If missing, install once:  !pip -q install tldextract
try:
    import tldextract
    _TLDEXTRACT = tldextract.TLDExtract(cache_dir=False)
except Exception:
    _TLDEXTRACT = None
    print("WARNING: tldextract not available. Falling back to naive domain parsing.")


In [None]:
# Matches a leading "<scheme>://" (e.g. "http://", "ftp://").
_SCHEME_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://")

def _normalize_url_for_parsing(u: str) -> str:
    """Return *u* cleaned up and carrying a scheme so urlsplit can find the host.

    Args:
        u: Raw URL value from the log; may be ``None``, a float NaN (how
            pandas represents a missing cell), or any object with a ``str()``.

    Returns:
        ``""`` for missing/blank input; otherwise the value with surrounding
        whitespace and quote characters stripped and, when it had no scheme,
        prefixed with ``https://``.
    """
    if u is None or (isinstance(u, float) and math.isnan(u)):
        return ""
    u = str(u).strip().strip('"').strip("'")
    if not u:
        return ""
    if u.startswith("//"):
        # Protocol-relative URL ("//host/path"): it already has the "//"
        # authority marker, so adding "https://" would yield "https:////host"
        # and urlsplit would report an empty hostname.
        return "https:" + u
    if not _SCHEME_RE.match(u):
        u = "https://" + u
    return u

def extract_host_domain_tld(raw_url: str) -> tuple[str, str, str]:
    """
    Split a logged URL into host, registrable domain and public suffix.

    Args:
      raw_url: URL value as logged; a scheme is optional.

    Returns:
      url_host: full host (incl. subdomain), lower-cased, no trailing dot
      domain: registrable domain (e.g., cloudflare-dns.com)
      tld: public suffix (e.g., com, co.uk)

      All three are "" when the value is missing or no host can be parsed.
      For IP-literal hosts, domain is the address itself and tld is "".
    """
    u = _normalize_url_for_parsing(raw_url)
    if not u:
        return ("", "", "")
    try:
        host = (urlsplit(u).hostname or "").lower().rstrip(".")
    except ValueError:
        # urlsplit raises ValueError for a malformed authority
        # (e.g. unbalanced IPv6 brackets); treat the row as unparseable.
        return ("", "", "")
    if not host:
        return ("", "", "")

    if _TLDEXTRACT is not None:
        ext = _TLDEXTRACT(host)
        tld = (ext.suffix or "").lower()
        if ext.domain and ext.suffix:
            domain = f"{ext.domain}.{ext.suffix}".lower()
        elif ext.domain:
            domain = ext.domain.lower()
        else:
            domain = host
        return (host, domain, tld)

    # Naive fallback (less accurate for multi-part suffixes).
    # An IP literal has no domain/suffix structure: without this check
    # "10.1.2.3" would be reported as domain "2.3" with tld "3".
    try:
        ipaddress.ip_address(host)
    except ValueError:
        pass
    else:
        return (host, host, "")

    labels = host.split(".")
    if len(labels) >= 2:
        tld = labels[-1]
        domain = ".".join(labels[-2:])
    else:
        tld = ""
        domain = host
    return (host, domain, tld)

def safe_str(x) -> str:
    """Coerce *x* to text, mapping ``None`` and float NaN to an empty string."""
    if x is None:
        return ""
    if isinstance(x, float) and math.isnan(x):
        return ""
    return str(x)


In [None]:
LAB_PATH = "../data/lab_dprk.csv"
DIRTY_PATH = "../data/dirty_wordlist.txt"

# Read CSV safely (avoid mixed-type warnings)
df = pd.read_csv(LAB_PATH, low_memory=False)
# Header cells may carry stray whitespace; strip it so "URL" etc. resolve.
df.columns = df.columns.str.strip()

# Parse URL -> host/domain/tld
# The column names are given explicitly so the expansion still has three
# columns when the log is empty (an empty list would otherwise build a
# zero-column frame and the assignment below would raise).
_URL_PARTS = ["url_host", "domain", "tld"]
parsed = df["URL"].apply(extract_host_domain_tld)
df[_URL_PARTS] = pd.DataFrame(parsed.tolist(), index=df.index, columns=_URL_PARTS)

display(df[["URL","url_host","domain","tld"]].head(5))
print("Rows:", len(df))


In [None]:
# -----------------------------
# Search engine identification
# -----------------------------
# Each engine is recognised by its full host (host_regex) or, failing that,
# by its registrable domain (domain_allow). path_regex then restricts the
# match to the engine's search endpoint, and query_params lists, in
# preference order, the querystring keys that carry the search text.
#
# The Google/Yandex host patterns accept country variants (google.de,
# google.co.uk, yandex.com.tr) but are anchored so the brand label must be
# followed only by "<cc>", "co.<cc>" or "com.<cc>". A look-alike host such as
# google.evil.com is therefore NOT labelled as Google. Residual risk: names
# registered under second-level zones like com.ru still pass; tighten further
# if that matters for your data.

SEARCH_ENGINES = {
    # key: engine_name
    # value: dict(domain_allow=set, host_regex=pattern, query_params=list, path_regex=pattern)
    "Google": {
        "domain_allow": {"google.com"},
        "host_regex": re.compile(r"(^|\.)google\.(?:com?\.)?[a-z]{2,3}$", re.I),
        "query_params": ["q"],
        "path_regex": re.compile(r"^/search", re.I),
    },
    "Bing": {
        "domain_allow": {"bing.com"},
        "host_regex": re.compile(r"(^|\.)bing\.com$", re.I),
        "query_params": ["q"],
        "path_regex": re.compile(r"^/search", re.I),
    },
    "Yahoo": {
        "domain_allow": {"search.yahoo.com", "yahoo.com"},
        # search.yahoo.com is already covered by the "(^|\.)" subdomain prefix.
        "host_regex": re.compile(r"(^|\.)yahoo\.com$", re.I),
        "query_params": ["p"],
        "path_regex": re.compile(r"^/search", re.I),
    },
    "DuckDuckGo": {
        "domain_allow": {"duckduckgo.com"},
        "host_regex": re.compile(r"(^|\.)duckduckgo\.com$", re.I),
        "query_params": ["q"],
        # DDG searches live at the site root. The path handed to this regex
        # never contains "?", and is "" for "duckduckgo.com?q=...".
        "path_regex": re.compile(r"^/?$", re.I),
    },
    "Yandex": {
        "domain_allow": {"yandex.com", "yandex.ru"},
        "host_regex": re.compile(r"(^|\.)yandex\.(?:com?\.)?[a-z]{2,3}$", re.I),
        "query_params": ["text", "query"],
        "path_regex": re.compile(r"^/search", re.I),
    },
    "Brave": {
        "domain_allow": {"search.brave.com", "brave.com"},
        "host_regex": re.compile(r"(^|\.)search\.brave\.com$", re.I),
        "query_params": ["q"],
        "path_regex": re.compile(r"^/search", re.I),
    },
    "Startpage": {
        "domain_allow": {"startpage.com"},
        "host_regex": re.compile(r"(^|\.)startpage\.com$", re.I),
        "query_params": ["query", "q"],
        "path_regex": re.compile(r"^/do/search|^/sp/search", re.I),
    },
    "Ecosia": {
        "domain_allow": {"ecosia.org"},
        "host_regex": re.compile(r"(^|\.)ecosia\.org$", re.I),
        "query_params": ["q"],
        "path_regex": re.compile(r"^/search", re.I),
    },
}

def identify_search_engine(url_host: str, domain: str, path: str) -> str:
    """Return the SEARCH_ENGINES key whose rules match this request, or "".

    An engine matches when its host_regex hits the full host OR the
    registrable domain is in its domain_allow set, AND (if the engine defines
    a path_regex) the URL path hits that pattern. Engines are tried in
    SEARCH_ENGINES order; the first match wins.
    """
    host_lc = (url_host or "").lower()
    domain_lc = (domain or "").lower()
    path_text = safe_str(path)

    for name, spec in SEARCH_ENGINES.items():
        host_pattern = spec.get("host_regex")
        host_hit = bool(host_pattern) and host_pattern.search(host_lc) is not None
        domain_hit = domain_lc in spec.get("domain_allow", set())
        if not (host_hit or domain_hit):
            continue

        # Both routes share the same "search-ish path" requirement.
        path_pattern = spec.get("path_regex")
        if path_pattern is None or path_pattern.search(path_text):
            return name
    return ""

def extract_search_term(raw_url: str, engine: str) -> str:
    """Extract the search term from the URL based on engine-specific query params.

    Args:
        raw_url: URL value as logged; a scheme is optional.
        engine: A SEARCH_ENGINES key, or "" when the row is not a search.

    Returns:
        The first non-blank value among the engine's query parameters (tried
        in their configured order), decoded and stripped; "" when the engine
        is empty, the URL cannot be parsed, or no parameter carries text.
    """
    if not engine:
        return ""
    u = _normalize_url_for_parsing(raw_url)
    if not u:
        return ""
    try:
        query = urlsplit(u).query
    except ValueError:
        # Malformed authority (e.g. unbalanced IPv6 brackets): no term.
        return ""
    qs = parse_qs(query, keep_blank_values=True)

    params = SEARCH_ENGINES.get(engine, {}).get("query_params", ["q"])
    for p in params:
        values = qs.get(p)
        if not values:
            continue
        # parse_qs has already decoded percent-escapes and turned '+' into a
        # space. The extra pass is kept for fields that arrive double-escaped,
        # but it must be unquote (not unquote_plus): a literal '+' that was
        # sent as %2B would otherwise be turned into a space ("c++" -> "c").
        term = unquote(values[0]).strip()
        if term:
            return term
        # Blank value: fall through so a later parameter can supply the term.
    return ""

# Build a search-only dataframe
def _url_path(raw_url) -> str:
    """Return the path component of *raw_url*, or "" when it cannot be parsed."""
    try:
        return urlsplit(_normalize_url_for_parsing(raw_url)).path
    except ValueError:
        # One malformed URL (e.g. unbalanced IPv6 brackets) must not abort
        # the whole cell; extract_host_domain_tld tolerates the same input.
        return ""

df["_path"] = df["URL"].map(_url_path)
# Plain comprehensions instead of DataFrame.apply(axis=1): same per-row calls,
# but the result is always a flat list, including for an empty log.
df["search_engine"] = [
    identify_search_engine(host, dom, pth)
    for host, dom, pth in zip(df["url_host"], df["domain"], df["_path"])
]
df["search_term"] = [
    extract_search_term(raw, eng)
    for raw, eng in zip(df["URL"], df["search_engine"])
]

# Keep only rows that were labelled with an engine AND yielded a non-empty term.
_has_term = df["search_term"].fillna("").astype(str).str.len() > 0
search_df = df[(df["search_engine"] != "") & _has_term].copy()

# Show only the preview columns this dataset actually has.
_preview_cols = [
    c for c in ["Logged Time","User","URL","search_engine","search_term","domain"]
    if c in search_df.columns
]
display(search_df[_preview_cols].head(10))
print("Search events:", len(search_df))
print("Engines found:", search_df["search_engine"].value_counts().to_dict())


In [None]:
# -----------------------------
# Dirty wordlist matching
# -----------------------------
def load_dirty_wordlist(path: str) -> list[str]:
    """Load the flag-list of terms from a text file, one entry per line.

    Blank lines and lines starting with ``#`` are skipped. Entries are
    stripped, lower-cased and de-duplicated, keeping first-seen order.

    Args:
        path: Location of the word list file.

    Returns:
        The unique lower-cased entries in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    words = []
    # "utf-8-sig" drops a leading byte-order mark if the file has one.
    # With plain "utf-8" the BOM stays glued to the first entry, so that term
    # never matches anything (and a first-line "#" comment is not recognised).
    with open(path, "r", encoding="utf-8-sig", errors="ignore") as f:
        for line in f:
            s = line.strip()
            if not s or s.startswith("#"):
                continue
            words.append(s.lower())
    # dict keys keep insertion order, so this de-dupes and preserves order.
    return list(dict.fromkeys(words))

# Load the flag-list once at cell level so the matching code below can reuse
# it, then print its size and first ten entries as a quick sanity check.
dirty_words = load_dirty_wordlist(DIRTY_PATH)
print("Dirty terms loaded:", len(dirty_words))
print(dirty_words[:10])

def dirty_matches(term: str, dirty_list: list[str]) -> list[str]:
    """Return every entry of *dirty_list* that occurs inside *term*.

    The term is lower-cased before comparison; entries are compared as given.
    Matching is a plain substring test, so an entry also hits when it is
    embedded in a longer word. ``None`` and float NaN terms match nothing
    because they are treated as the empty string. Hits keep the order of
    *dirty_list*.
    """
    missing = term is None or (isinstance(term, float) and math.isnan(term))
    haystack = "" if missing else str(term).lower()
    return [needle for needle in dirty_list if needle in haystack]

# Per-row list of matching flag-list entries; a row is "dirty" when that list
# is non-empty.
search_df["dirty_hits"] = search_df["search_term"].apply(dirty_matches, args=(dirty_words,))
search_df["is_dirty"] = search_df["dirty_hits"].apply(bool)

display(search_df.loc[:, ["search_engine","search_term","is_dirty","dirty_hits"]].head(25))
print("Dirty search events:", int(search_df["is_dirty"].sum()))


In [None]:
# -----------------------------
# Interactive UI
# -----------------------------
# Output area that render() draws into.
out = widgets.Output()

# Engine picker: "All" (empty value) plus every engine present in the data.
_engine_names = sorted(search_df["search_engine"].unique())
engine_dd = widgets.Dropdown(
    options=[("All", ""), *((name, name) for name in _engine_names)],
    value="",
    description="Engine:",
)

only_dirty_cb = widgets.Checkbox(
    value=False,
    description="Only dirty matches",
    indent=False,
)

# Maximum number of table rows; only fires when the slider is released.
topn = widgets.IntSlider(
    value=50,
    min=10,
    max=500,
    step=10,
    description="Rows:",
    continuous_update=False,
)

# Free-text "contains" filters.
user_txt = widgets.Text(
    value="",
    placeholder="Filter by user contains (e.g., karen@)",
    description="User:",
)
term_txt = widgets.Text(
    value="",
    placeholder="Search terms contains (e.g., bypass, zia, test123)",
    description="Term:",
)

# Layout: selectors on the first row, text filters on the second.
_top_row = widgets.HBox([engine_dd, only_dirty_cb, topn])
_bottom_row = widgets.HBox([user_txt, term_txt])
controls = widgets.VBox([_top_row, _bottom_row])

display(controls, out)

def _filter():
    """Return the rows of search_df selected by the current widget values.

    Filters are combined with AND: engine equality, the "only dirty" flag,
    and case-insensitive "contains" tests on User and search_term.
    """
    d = search_df
    if engine_dd.value:
        d = d[d["search_engine"] == engine_dd.value]
    if only_dirty_cb.value:
        d = d[d["is_dirty"]]

    # regex=False: the boxes are documented as "contains" filters. With the
    # pandas default (regex=True) typing "c++" or "(" raises re.error and "."
    # matches every row.
    user_q = user_txt.value.strip().lower()
    if user_q:
        users = d["User"].fillna("").astype(str).str.lower()
        d = d[users.str.contains(user_q, na=False, regex=False)]

    term_q = term_txt.value.strip().lower()
    if term_q:
        terms = d["search_term"].fillna("").astype(str).str.lower()
        d = d[terms.str.contains(term_q, na=False, regex=False)]
    return d

def _style(d: pd.DataFrame):
    cols = ["Logged Time","User","search_engine","search_term","dirty_hits","URL","Policy Action","Request Method","Response Code","Total Bytes"]
    cols = [c for c in cols if c in d.columns]
    view = d[cols].copy()

    def row_style(row):
        styles = [""] * len(view.columns)
        if bool(row.get("is_dirty", False)):
            styles = ["background-color: #ffe6e6;"] * len(view.columns)  # light red
            # add a darker left border on the term cell
            if "search_term" in view.columns:
                idx = list(view.columns).index("search_term")
                styles[idx] += "border-left: 6px solid #dc3545;"
        return styles

    return view.style.apply(row_style, axis=1).set_properties(**{"font-size": "12px"})

def render():
    """Redraw the summary cards, charts and table for the current filters.

    Everything is written into the ``out`` widget. Charts are skipped when
    the filtered set is empty; the table shows at most ``topn`` rows, dirty
    rows first.
    """
    with out:
        # wait=True defers the clear until the first new output arrives, so
        # the panel does not blank out and flicker on every control change.
        out.clear_output(wait=True)
        d = _filter()

        # Summary cards
        total = len(d)
        dirty = int(d["is_dirty"].sum())
        uniq_terms = d["search_term"].nunique(dropna=True)
        display(HTML(f'''
        <div style="display:flex; gap:16px; flex-wrap:wrap; margin:6px 0 10px 0;">
          <div style="padding:10px 12px; border:1px solid #ddd; border-radius:12px;">
            <div style="font-size:12px; color:#666;">Filtered searches</div>
            <div style="font-size:22px; font-weight:700;">{total:,}</div>
          </div>
          <div style="padding:10px 12px; border:1px solid #ddd; border-radius:12px;">
            <div style="font-size:12px; color:#666;">Unique terms</div>
            <div style="font-size:22px; font-weight:700;">{uniq_terms:,}</div>
          </div>
          <div style="padding:10px 12px; border:1px solid #ddd; border-radius:12px;">
            <div style="font-size:12px; color:#666;">Dirty hits</div>
            <div style="font-size:22px; font-weight:700;">{dirty:,}</div>
          </div>
        </div>
        '''))

        # Charts
        if total > 0:
            vc = d["search_engine"].value_counts().reset_index()
            vc.columns = ["search_engine","count"]
            fig = px.bar(vc, x="count", y="search_engine", orientation="h", title="Search engine volume (filtered)")
            fig.update_layout(height=280, margin=dict(l=10,r=10,t=45,b=10))
            display(fig)

            # Top terms (stack count)
            tc = d["search_term"].value_counts().head(20).reset_index()
            tc.columns = ["search_term","count"]
            fig2 = px.bar(tc, x="count", y="search_term", orientation="h", title="Top search terms (filtered, top 20)")
            fig2.update_layout(height=420, margin=dict(l=10,r=10,t=45,b=10))
            display(fig2)

        # Table: dirty rows first, then by logged time when that column exists
        # (the table and export code already tolerate missing log columns).
        sort_keys = ["is_dirty"]
        ascending = [False]
        if "Logged Time" in d.columns:
            sort_keys.append("Logged Time")
            ascending.append(True)
        show = d.sort_values(sort_keys, ascending=ascending).head(int(topn.value))
        display(_style(show))

def _on_control_change(change):
    """Observer callback: any change to a control's value triggers a redraw."""
    render()

for control in (engine_dd, only_dirty_cb, topn, user_txt, term_txt):
    control.observe(_on_control_change, names="value")

# Initial draw with the default control values.
render()


In [None]:
# Optional: write the extracted searches to CSV for sharing / offline review.
OUT_CSV = "../data/extracted_search_terms.csv"

# Preferred column order; columns missing from this dataset are dropped.
_EXPORT_ORDER = (
    "Logged Time", "Event Time", "User",
    "search_engine", "search_term", "is_dirty", "dirty_hits",
    "URL", "Policy Action", "Request Method", "Response Code", "Total Bytes",
    "url_host", "domain", "tld",
)
cols = [name for name in _EXPORT_ORDER if name in search_df.columns]

search_df.loc[:, cols].to_csv(OUT_CSV, index=False)
print("Wrote:", OUT_CSV)
