In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from visca.browser import (
    create_driver,
    ensure_page_loaded,
    capture_full_page_screenshot
)
from visca.element_extractor import (
    extract_elements_from_driver,
    save_elements
)
from visca.dedup import deduplicate_screenshots
from visca.virtual_node import (
    VirtualNode,
    build_dom_tree
)
from visca.segment import (
    tag_multiset,
    jaccard_distance,
    subtree_size,
    calculate_psi_avg,
    calculate_psi_sum,
    gather_instances,
    ascii_tree,
)

# Initialization

In [3]:
import time
import os
import io
import time
import json
from typing import Tuple

from selenium.webdriver import Chrome
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement

# DOM Element Extraction

In [4]:
def extraction(url: str, out_dir: str, delay: float = 5.0):
    """Render *url* in headless Chrome and build the reduced DOM tree.

    The page is loaded with the "eager" strategy, given *delay* seconds to
    settle, and then frozen with ``window.stop()``. Elements are extracted,
    their screenshots saved into *out_dir*, elements without a screenshot are
    dropped, duplicates are removed and the survivors are assembled into a tree.

    Parameters
    ----------
    url : str
        Address of the page to load.
    out_dir : str
        Directory handed to ``save_elements`` as ``result_dir``.
    delay : float, optional
        Seconds to wait after ``driver.get`` before stopping the page
        (default 5.0, the value that used to be hard-coded).

    Returns
    -------
    The value returned by ``build_dom_tree`` for the de-duplicated elements.
    """
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument("--hide-scrollbars")  # Hide scrollbars to avoid affecting layout
    chrome_options.add_argument("--force-device-scale-factor=1")  # Force known scale factor
    chrome_options.add_argument("--disable-gpu")
    chrome_options.page_load_strategy = "eager"  # <- Eager loading

    # webdriver_manager may hand back the THIRD_PARTY_NOTICES file that sits
    # next to the real binary; point at the binary instead.
    chrome_path = ChromeDriverManager().install()
    if "THIRD_PARTY_NOTICES.chromedriver" in chrome_path:
        chrome_path = chrome_path.replace("THIRD_PARTY_NOTICES.chromedriver", "chromedriver")
    # Mode must be octal: decimal 755 is 0o1363, which is not rwxr-xr-x.
    os.chmod(chrome_path, 0o755)

    # Use the corrected path; calling install() again would discard the fix above.
    driver = Chrome(
        service=Service(chrome_path),
        options=chrome_options
    )

    try:
        driver.get(url)

        time.sleep(delay)
        driver.execute_script("window.stop();")

        dom_elements = extract_elements_from_driver(driver)

        dom_elements_with_screenshot = save_elements(
            driver=driver,
            result_dir=out_dir,
            dom_elements=dom_elements
        )

        # Keep only the elements for which a screenshot entry was recorded.
        dom_elements_with_screenshot = [
            element for element in dom_elements_with_screenshot
            if 'screenshot' in element
        ]

        deduplicated_elements = deduplicate_screenshots(dom_elements_with_screenshot)

        reduced_tree = build_dom_tree(deduplicated_elements)
    finally:
        # Always release the browser, even when a step above raises.
        driver.quit()

    return reduced_tree

# Segmentation

### Sum

In [5]:
def segmentation(reduced_tree: VirtualNode, out_dir: str):
    """Segment *reduced_tree* and store every segment together with its leaves.

    ``calculate_psi_sum`` is applied to the tree, then ``gather_instances``
    yields a mapping of segment-root XPath to a size value. The mapping is
    printed, the leaf records are read from ``<out_dir>/segments.json``
    (presumably written during extraction; confirm against ``save_elements``),
    and for each root the XPaths of all records strictly below it are collected.

    The result is written to ``<out_dir>/segmentation_xpath_aa.json`` as
    ``{root_xpath: {"count": size, "leaves": [xpath, ...]}}``.
    """
    calculate_psi_sum(reduced_tree)
    segment_roots = gather_instances(reduced_tree)

    print("Number of Segments: ", len(segment_roots))
    for root, node_count in segment_roots.items():
        print(f"{root:<60}  subtree-nodes = {node_count}")

    leaves_file = f'{out_dir}/segments.json'
    with open(leaves_file, 'r', encoding='utf-8') as src:
        leaf_records = json.load(src)

    def descendants_of(root):
        # A trailing slash restricts the match to true descendants, which
        # also rules out the root's own XPath.
        prefix = root + '/'
        return [
            record['xpath']
            for record in leaf_records
            if record['xpath'].startswith(prefix)
        ]

    instance_details = {
        root: {'count': node_count, 'leaves': descendants_of(root)}
        for root, node_count in segment_roots.items()
    }

    target_file = f"{out_dir}/segmentation_xpath_aa.json"
    with open(target_file, 'w', encoding='utf-8') as sink:
        json.dump(instance_details, sink, indent=2, ensure_ascii=False)


# Visualize

In [6]:
import json, textwrap, time, io
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from PIL import Image   # only if you want to display inline in a notebook

In [7]:
def export_parent_bboxes(
    url: str,
    seg_json_path: str,
    out_json_path: str = "output.json",
    delay: float = 5.0,         # allow SPA hydration etc.
):
    """
    Build an “auto-assert” segmentation file that contains one polygon
    (= the left/top/right/bottom edges) for every **parent** segment
    found in *seg_json_path*.

    Parameters
    ----------
    url : str
        Page to open in headless Chrome.
    seg_json_path : str
        JSON file whose top-level keys are the parent XPaths.
    out_json_path : str
        Destination of the generated JSON; its file name is also used as "id".
    delay : float
        Seconds to wait after loading before the page is stopped and measured.

    Returns
    -------
    str
        *out_json_path*, unchanged.

    Output schema
    -------------
    {
    "id": "<file name you chose>",
    "height": <full-page CSS px>,
    "width":  <full-page CSS px>,
    "number_of_segments": <int>,
    "segmentations": {
        "auto-assert": [
        [ [ [ [x, y], [x, y], [x, y], [x, y] ] ] ],   # segment-1 polygon
        ...
        ]
    }
    }

    NOTE(review): each entry is actually appended as ``[[[poly]]]``, i.e. one
    list level deeper than the sketch above. The nesting is kept unchanged
    here so existing consumers keep working; confirm which form is intended.
    """
    # ───────────────────────────────────
    # 1)  Launch Chrome & open page
    # ───────────────────────────────────
    chrome_options = Options()
    chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--hide-scrollbars")
    chrome_options.add_argument("--force-device-scale-factor=1")
    chrome_options.page_load_strategy = "eager"

    # webdriver_manager may return the THIRD_PARTY_NOTICES file instead of the binary
    chromedriver_path = ChromeDriverManager().install()
    if chromedriver_path.endswith("THIRD_PARTY_NOTICES.chromedriver"):
        chromedriver_path = chromedriver_path.replace(
            "THIRD_PARTY_NOTICES.chromedriver", "chromedriver"
        )
    os.chmod(chromedriver_path, 0o755)

    driver = Chrome(service=Service(chromedriver_path), options=chrome_options)

    def get_box(xp):
        # JavaScript helper returns dict {left, top, right, bottom}
        # in page coordinates, or None when the XPath matches nothing.
        return driver.execute_script(
            textwrap.dedent(
                """
                const xp = arguments[0];
                const el = document.evaluate(
                    xp, document, null,
                    XPathResult.FIRST_ORDERED_NODE_TYPE, null
                ).singleNodeValue;
                if (!el) return null;
                const r = el.getBoundingClientRect();
                return {
                left:   Math.round(r.left  + window.scrollX),
                top:    Math.round(r.top   + window.scrollY),
                right:  Math.round(r.right + window.scrollX),
                bottom: Math.round(r.bottom+ window.scrollY)
                };
                """
            ),
            xp,
        )

    try:
        driver.get(url)

        # let scripts, fonts, etc. settle
        time.sleep(delay)
        driver.execute_script("window.stop();")

        # page dimensions in CSS px
        width = driver.execute_script(
            "return Math.max(document.documentElement.scrollWidth, document.body.scrollWidth);"
        )
        height = driver.execute_script(
            "return Math.max(document.documentElement.scrollHeight, document.body.scrollHeight);"
        )
        driver.set_window_size(width, height)

        # ───────────────────────────────────
        # 2)  Load segmentation file
        # ───────────────────────────────────
        with open(seg_json_path, encoding="utf-8") as f:
            seg_raw = json.load(f)           # parent_xpath → {count, leaves, …}

        parent_xpaths = list(seg_raw.keys())

        # ───────────────────────────────────
        # 3)  Grab each parent’s bounding box
        # ───────────────────────────────────
        polygons = []
        for xp in parent_xpaths:
            box = get_box(xp)
            if box is None:
                # element disappeared; skip gracefully
                continue

            # Polygon corners in the order TL → BL → BR → TR
            poly = [
                [box["left"],  box["top"]],
                [box["left"],  box["bottom"]],
                [box["right"], box["bottom"]],
                [box["right"], box["top"]],
            ]
            polygons.append([[ [ poly ] ]])   # nesting kept as-is, see NOTE(review) in the docstring
    finally:
        # Release the browser even when loading or measuring fails.
        driver.quit()

    # ───────────────────────────────────
    # 4)  Assemble & save result JSON
    # ───────────────────────────────────
    out = {
        "id": Path(out_json_path).name,
        "height": height,
        "width":  width,
        "number_of_segments": len(polygons),
        "segmentations": {
            "auto-assert": polygons
        }
    }

    Path(out_json_path).write_text(json.dumps(out, indent=2), encoding="utf-8")
    print(f"Wrote {len(polygons)} segments → {out_json_path}")

    return out_json_path

In [8]:
def screenshot_segments(
    url: str,
    seg_json_path: str,
    out_dir: str = "results",
    fname: str = "segmented.png",
    delay: float = 5,              # wait for SPA hydration etc.
):
    """Load segmentation JSON, overlay parent/leaf boxes in the browser,
       and write a PNG screenshot to *out_dir/fname*.

    Parameters
    ----------
    url : str
        Page to open in headless Chrome.
    seg_json_path : str
        JSON file mapping each parent XPath to an object with a "leaves" list.
    out_dir : str
        Output directory; created (with parents) when missing.
    fname : str
        File name of the PNG inside *out_dir*.
    delay : float
        Seconds to wait after loading before the page is stopped.

    Parents are drawn translucent red with an outline, leaves translucent
    green with a 1-px inset so the parent outline stays visible.
    """
    # ───────────────────────────────────
    # 1)  Launch Chrome
    # ───────────────────────────────────
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    # chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument("--hide-scrollbars")  # Hide scrollbars to avoid affecting layout
    chrome_options.add_argument("--force-device-scale-factor=1")  # Force known scale factor
    chrome_options.add_argument("--disable-gpu")
    chrome_options.page_load_strategy = "eager"  # <- Eager loading

    # webdriver_manager may return the THIRD_PARTY_NOTICES file instead of the binary
    chrome_path = ChromeDriverManager().install()
    if "THIRD_PARTY_NOTICES.chromedriver" in chrome_path:
        chrome_path = chrome_path.replace("THIRD_PARTY_NOTICES.chromedriver", "chromedriver")
    # Mode must be octal: decimal 755 is 0o1363, which is not rwxr-xr-x.
    os.chmod(chrome_path, 0o755)

    # Use the corrected path; calling install() again would discard the fix above.
    driver = Chrome(
        service=Service(chrome_path),
        options=chrome_options
    )

    try:
        driver.get(url)

        w = driver.execute_script("return document.body.scrollWidth")
        h = driver.execute_script("return document.body.scrollHeight")
        driver.set_window_size(w, h)

        time.sleep(delay)
        driver.execute_script("window.stop();")

        # ───────────────────────────────────
        # 2)  Load your segmentation
        # ───────────────────────────────────
        with open(seg_json_path, encoding="utf-8") as f:
            seg = json.load(f)           # dict[parent_xpath] → {"count":…, "leaves":[…]}

        # Full scrollable page size (CSS px) so the screenshot isn’t clipped
        page_w = driver.execute_script(
            "return Math.max(document.documentElement.scrollWidth, document.body.scrollWidth);")
        page_h = driver.execute_script(
            "return Math.max(document.documentElement.scrollHeight, document.body.scrollHeight);")
        driver.set_window_size(page_w, page_h)

        # ───────────────────────────────────
        # 3)  Build & inject the overlay script
        # ───────────────────────────────────
        # The segmentation is embedded as a JSON string literal (double
        # json.dumps) and parsed again inside the page.
        overlay_js = textwrap.dedent(f"""
  (function drawSegmentOverlays(segments) {{
    const node = (xp) => document.evaluate(
      xp, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null
    ).singleNodeValue;

    /* ---------- overlay cosmetics ---------- */
    const style = document.createElement('style');
    style.textContent = `
      .overlay-parent,
      .overlay-leaf {{
        position:absolute;
        z-index:2147483647;
        pointer-events:none;
      }}
      /* parent → translucent RED with bold outline */
      .overlay-parent {{
        background:rgba(255,0,0,.25);
        outline:2px solid red;
      }}
      /* leaf → translucent GREEN, **no outline**  */
      .overlay-leaf {{
        background:rgba(0,255,0,.25);
      }}`;
    document.head.appendChild(style);

    /**
     * Draw a rectangle; if inset>0, shrink it on every side
     * so the parent’s red outline isn’t hidden.
     */
    const draw = (rect, cls, inset = 0) => {{
      const d = document.createElement('div');
      d.className = cls;
      d.style.left   = (rect.left  + window.scrollX + inset) + 'px';
      d.style.top    = (rect.top   + window.scrollY + inset) + 'px';
      d.style.width  = Math.max(0, rect.width  - inset*2) + 'px';
      d.style.height = Math.max(0, rect.height - inset*2) + 'px';
      document.body.appendChild(d);
    }};

    Object.entries(segments).forEach(([parentXP, info]) => {{
      const p = node(parentXP);
      if (p) draw(p.getBoundingClientRect(), 'overlay-parent', 0);   // no inset
      (info.leaves || []).forEach(leafXP => {{
        const l = node(leafXP);
        if (l) draw(l.getBoundingClientRect(), 'overlay-leaf', 1);  // 1-px inset
      }});
    }});
  }})(JSON.parse({json.dumps(json.dumps(seg))}));
""")

        driver.execute_script(overlay_js)

        # ───────────────────────────────────
        # 4)  Screenshot
        # ───────────────────────────────────
        Path(out_dir).mkdir(parents=True, exist_ok=True)
        png = driver.get_screenshot_as_png()
        with open(Path(out_dir) / fname, "wb") as fh:
            fh.write(png)

        # Optional inline preview (e.g. in a Jupyter notebook)
        # display(Image.open(io.BytesIO(png)))
    finally:
        # Release the browser even when a step above raises.
        driver.quit()

    print(f"Screenshot written → {Path(out_dir)/fname}")


# Cache

In [9]:
# ─── status_cache.py ───────────────────────────────────────────────────────────
from __future__ import annotations

import json, tempfile, os, logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Any

# Location of the JSON status cache; a relative path, so it resolves against
# the current working directory.
CACHE_PATH = Path("run_status.json")

def _now() -> str:
    """Current local time as an ISO-8601 string with UTC offset and whole
    seconds, e.g. 2025-05-26T01:47:03-07:00."""
    local_now = datetime.now().astimezone()
    # With microseconds zeroed, isoformat() omits the fractional part.
    return local_now.replace(microsecond=0).isoformat()

def load_cache() -> Dict[str, Any]:
    """Read the status cache stored at ``CACHE_PATH``.

    Returns
    -------
    dict
        The parsed JSON object, or an empty dict when the file does not exist
        or cannot be used.

    A file that cannot be read or parsed, or whose top-level value is not a
    JSON object, is moved to a sibling with the ``.bak`` suffix (replacing any
    earlier backup) and a warning is logged.
    """
    if not CACHE_PATH.is_file():
        return {}

    try:
        cache = json.loads(CACHE_PATH.read_text())
        # Callers index the cache by site, so anything but an object is unusable.
        if not isinstance(cache, dict):
            raise ValueError(
                f"expected a JSON object, found {type(cache).__name__}"
            )
    except Exception as exc:               # corrupted cache? start fresh but keep a backup
        logging.warning("Status cache unreadable – starting new file (%s)", exc)
        # replace() overwrites an existing backup on every platform, whereas
        # rename() raises FileExistsError on Windows when the target exists.
        CACHE_PATH.replace(CACHE_PATH.with_suffix(".bak"))
        return {}

    return cache

def save_cache(cache: Dict[str, Any]) -> None:
    """Persist *cache* as indented JSON at ``CACHE_PATH``.

    The text is first written to a sibling file with the ``.tmp`` suffix and
    then moved over the real file, so the target is swapped in one step.
    """
    staging_path = CACHE_PATH.with_suffix(".tmp")
    serialized = json.dumps(cache, indent=2)
    staging_path.write_text(serialized)
    os.replace(staging_path, CACHE_PATH)

def mark_result(cache: Dict[str, Any],
                site: str,
                doc: int | str,
                ok: bool,
                message: str | None = None) -> None:
    """Record the outcome of one document run and save the cache right away.

    The record is stored under ``cache[site][str(doc)]`` with a "status" of
    "success" or "failure" and a "timestamp"; failures additionally carry
    *message* under "error". *cache* is modified in place.
    """
    per_site = cache.setdefault(site, {})        # one sub-object per site

    record: Dict[str, Any] = {
        "status": "success" if ok else "failure",
        "timestamp": _now(),
    }
    if not ok:
        record["error"] = message

    per_site[str(doc)] = record
    save_cache(cache)                            # flush immediately
# ───────────────────────────────────────────────────────────────────────────────


# Individual Segmentation

In [10]:
RESULTS_ROOT = Path("evaluation/segmentation/results")
# Dataset folder, relative to the working directory like RESULTS_ROOT.
# Assumes the kernel runs from the repository root — TODO confirm.
DATASET_ROOT = Path("evaluation/segmentation/datasets/dataset-popular")

webpage = "eu.real.com"
# Derive the file:// URL from the local checkout instead of hard-coding one
# user's absolute home-directory path.
url = (DATASET_ROOT / webpage / webpage / "index.dom.html").resolve().as_uri()
# Named out_dir (not dir) so the builtin dir() stays usable in later cells.
out_dir = f"{RESULTS_ROOT}/{webpage}/auto-assert"
print(url, out_dir)

node = extraction(url, out_dir)
segmentation(node, out_dir)
screenshot_segments(
    url=url,
    seg_json_path=f"{out_dir}/segmentation_xpath_aa.json",
    out_dir=out_dir,
    fname="auto_assert_boxed.png",
)

export_parent_bboxes(
    url=url,
    seg_json_path=f"{out_dir}/segmentation_xpath_aa.json",
    out_json_path=f"{out_dir}/segmentation_bbox_aa.json",
    delay=5.0,
)

file:///Users/martintang/Desktop/Github/auto-assert/evaluation/segmentation/datasets/dataset-popular/eu.real.com/eu.real.com/index.dom.html evaluation/segmentation/results/eu.real.com/auto-assert
Processing file:///Users/martintang/Desktop/Github/auto-assert/evaluation/segmentation/datasets/dataset-popular/eu.real.com/eu.real.com/index.dom.html...
Element screenshots saved to evaluation/segmentation/results/eu.real.com/auto-assert
Computing image hashes for 140 segments...
Found 6 exact hash duplicates
Found 44 padding duplicates
Removing 50 duplicate screenshots...
Deduplication complete. Kept 90 of 140 segments.
Number of Segments:  21
//html[1]/body[1]/div[1]/div[1]                               subtree-nodes = 1
//html[1]/body[1]/div[1]/div[2]                               subtree-nodes = 8
//html[1]/body[1]/div[1]/div[3]                               subtree-nodes = 1
//html[1]/body[1]/div[2]/div[1]/div[1]/div[1]/ul[1]/li[1]/a[1]  subtree-nodes = 1
//html[1]/body[1]/div[2]/div[1]/

'evaluation/segmentation/results/eu.real.com/auto-assert/segmentation_bbox_aa.json'

# Batch Segmentation

In [11]:
# from pathlib import Path
# import re
# from urllib.parse import urlparse

# DATASET_ROOT   = Path("evaluation/segmentation/datasets/dataset-popular")
# RESULTS_ROOT = Path("evaluation/segmentation/results")   
# DATASET_MAPPING = DATASET_ROOT / "mapping.txt"

# pattern = re.compile(r'^\s*"(?P<key>.*?)"\s*:\s*"(?P<value>.*?)"\s*,?$')
# prefix_to_remove = '/opt/dataset-popular/'          # path prefix inside mapping.txt

# def normalize_key(raw_key: str) -> str:
#     p = urlparse(raw_key)
#     return (p.netloc + p.path).rstrip('/')

# dataset = {}
# with open(DATASET_MAPPING, encoding="utf-8") as f:
#     for line in f:
#         line = line.strip()
#         if not line:
#             continue

#         # strip comment markers (# or :) that head some lines
#         if line.startswith(('#', ':')):
#             line = line[1:].strip()
#             if not line:
#                 continue

#         m = pattern.match(line)
#         if not m:
#             continue

#         raw_key   = m.group('key')
#         raw_value = m.group('value')

#         # trim prefix added inside the container
#         if raw_value.startswith(prefix_to_remove):
#             raw_value = raw_value[len(prefix_to_remove):]

#         # ───────────────────────────────────────────────────────
#         # Prefer the DOM-annotated snapshot if it exists
#         # ───────────────────────────────────────────────────────
#         rel_path  = Path(raw_value)                      # e.g.  www.site.com/index.html
#         if rel_path.name == "index.html":
#             dom_candidate = rel_path.with_name("index.dom.html")
#             if (DATASET_ROOT / dom_candidate).is_file():   # ← check once on disk
#                 raw_value = str(dom_candidate)

#         dataset[normalize_key(raw_key)] = raw_value

# print("Dataset dictionary created.")

# for site_name, html_rel_path in dataset.items():
#     (RESULTS_ROOT / site_name).mkdir(parents=True, exist_ok=True)
#     print(f"✓ {site_name}: {html_rel_path}")

# print(f"\nAll {len(dataset)} result folders are ready.")


In [12]:
# success = 0
# import shutil
# from pathlib import Path

# for webpage, html_path in dataset.items():
#     url = f"file:///Users/martintang/Desktop/Github/auto-assert/evaluation/segmentation/datasets/dataset-popular/{html_path}"
#     out_dir = RESULTS_ROOT / webpage / "auto-assert"
#     print(f"\nProcessing {webpage!r} | {html_path} → {out_dir}")

#     # if it already exists, delete it (and everything inside)
#     if out_dir.exists():
#         shutil.rmtree(out_dir)

#     # now recreate it from scratch
#     out_dir.mkdir(parents=True, exist_ok=True)

#     # 1) extraction
#     try:
#         node = extraction(url, str(out_dir))
#     except Exception as e:
#         print(f"[ERROR] extraction failed for {webpage!r}: {e}")
#         # skip to next page entirely
#         continue

#     # 2) segmentation
#     try:
#         segmentation(node, str(out_dir))
#     except Exception as e:
#         print(f"[WARN] segmentation failed for {webpage!r}: {e}")

#     # 3) screenshot
#     try:
#         screenshot_segments(
#             url=url,
#             seg_json_path=f"{out_dir}/segmentation_xpath_aa.json",
#             out_dir=str(out_dir),
#             fname="segmentation_aa.png",
#         )
#     except Exception as e:
#         print(f"[WARN] screenshot_segments failed for {webpage!r}: {e}")

#     # 4) export bboxes
#     try:
#         export_parent_bboxes(
#             url=url,
#             seg_json_path=f"{out_dir}/segmentation_xpath_aa.json",
#             out_json_path=f"{out_dir}/segmentation_bbox_aa.json",
#             delay=5.0,   
#         )
#     except Exception as e:
#         print(f"[WARN] export_parent_bboxes failed for {webpage!r}: {e}")

#     success += 1

# print(f"\nAuto-Assert Segmentation ran on {success} webpages.")