# Deconstruction Policy Corpus -- Semantic Filtering Pipeline

**Purpose.** This notebook implements the data pre-processing and semantic
filtering stages (Section 3.1) and the optional LLM-judge precision filter (Section 3.2)
described in the paper's Methods section.  It processes a heterogeneous
corpus of Australian policy PDFs (*n* = 95) through the following stages:

1. **Page-level text extraction** with multi-engine fallback and OCR
   (`pdfplumber` -> `PyMuPDF` -> `Tesseract OCR`).
2. **Explicit table extraction** to recover tabular content that
   standard text extraction may miss or linearise incorrectly.
3. **Text segmentation** into overlapping analysis units with page
   provenance (LangChain `RecursiveCharacterTextSplitter`).
4. **Keyword gating** for high-recall pre-screening.
5. **Vector embedding and FAISS similarity search** with calibrated
   relevance scoring.
6. **Optional LLM judge** for precision filtering.

### Outputs (written under `OUT_DIR/outputs/`)

| Sub-folder | File | Description |
|---|---|---|
| `full_text/` | `<pdf>.pages.jsonl` | Page-by-page full text with extraction metadata and quality flags |
| `kept_chunks/` | `<pdf>.kept.jsonl` | Semantic-filtered chunks per PDF |
| `kept_chunks/` | `MASTER_kept.jsonl` | Aggregated kept chunks across the corpus |
| `kept_chunks/` | `MASTER_kept_judged.jsonl` | After optional LLM judge |
| `faiss/` | `<pdf>/` | Per-PDF FAISS index (for re-querying without re-embedding) |
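
All of the `.jsonl` outputs are JSON Lines files, with one JSON object per line. A minimal sketch for loading one of them is shown here; the helper name `read_jsonl` and the sample record are illustrative assumptions, not part of the pipeline.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def read_jsonl(path: str) -> List[Dict[str, Any]]:
    """Load a JSON Lines file into a list of dicts, skipping blank lines."""
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records


# Demo on a throwaway file; the record below is made up for illustration.
demo_path = Path(tempfile.mkdtemp()) / "demo.kept.jsonl"
demo_path.write_text(
    json.dumps({"source_file": "example.pdf", "page_num": 3, "relevance": 0.61}) + "\n",
    encoding="utf-8",
)
demo_records = read_jsonl(str(demo_path))
print(len(demo_records), demo_records[0]["page_num"])
```

The same helper should work for `MASTER_kept.jsonl` and `MASTER_kept_judged.jsonl`, since they share the one-object-per-line layout.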

> **Note on OCR:** Pages with very low extracted text (likely scanned/image-only)
> are automatically processed with Tesseract OCR as a fallback.  Pages that
> remain low-text after all extraction attempts are flagged in the JSONL output.

## 0) Prerequisites

- Run in **Google Colab** (GPU not required; CPU is sufficient).
- Store the PDF corpus in a Google Drive folder.
- Add your **LLM API key** in Colab -> **Settings -> Secrets**.
  - For OpenAI models: add as `OPENAI_API_KEY`
  - For Google Gemini models: add as `GOOGLE_API_KEY`
  - For Anthropic Claude models: add as `ANTHROPIC_API_KEY`

In [1]:
# @title 1) Install & Imports (run once)
!pip install -q langchain langchain-openai langchain-google-genai langchain-anthropic \
    langchain-community langchain-text-splitters \
    pypdf tiktoken faiss-cpu pandas numpy tqdm \
    pdfplumber PyMuPDF pytesseract pdf2image

# Tesseract OCR engine + Poppler (PDF-to-image renderer)
!apt-get -qq install -y tesseract-ocr poppler-utils > /dev/null 2>&1

import os, re, json, time, math, hashlib, logging, warnings
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from google.colab import drive, userdata

import pdfplumber
import fitz  # PyMuPDF
import pytesseract
from pdf2image import convert_from_path

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS

# Suppress noisy pdfminer warnings (non-fatal ToUnicode CMap issues)
logging.getLogger("pdfminer").setLevel(logging.ERROR)
warnings.filterwarnings("ignore", message=".*ToUnicode.*")

print("All imports OK. Tesseract version:", pytesseract.get_tesseract_version())

In [2]:
# @title 2) Mount Drive + Set Paths (EDIT THESE)
drive.mount('/content/drive')

PDF_DIR = "/content/drive/MyDrive/ACTIVE/AU_deconstruction_domain/Governance"   # <-- EDIT
OUT_DIR = "/content/drive/MyDrive/ACTIVE/AU_deconstruction_domain/data_analysis/batch_enhanced_KG_outputs"  # <-- EDIT

Path(OUT_DIR).mkdir(parents=True, exist_ok=True)
print("PDF_DIR:", PDF_DIR)
print("OUT_DIR:", OUT_DIR)

Mounted at /content/drive
PDF_DIR: /content/drive/MyDrive/ACTIVE/AU_deconstruction_domain/Governance
OUT_DIR: /content/drive/MyDrive/ACTIVE/AU_deconstruction_domain/data_analysis/batch_enhanced_KG_outputs


## 3) Configuration

### 3A) LLM Provider and Model Selection

The pipeline uses LLMs at two points: **embeddings** (for FAISS indexing)
and an optional **judge** (for precision filtering).  The cell below lets
you select your provider and model for each.  Currently supported:

| Provider | Embedding models | Chat/Judge models | API key secret name |
|---|---|---|---|
| **OpenAI** | `text-embedding-3-small`, `text-embedding-3-large` | `gpt-4o-mini`, `gpt-4o`, `gpt-4.1-mini-2025-04-14`, `gpt-4.1-2025-04-14` | `OPENAI_API_KEY` |
| **Google Gemini** | `models/text-embedding-004` | `gemini-2.0-flash`, `gemini-2.5-flash-preview-05-20`, `gemini-2.5-pro-preview-05-06` | `GOOGLE_API_KEY` |
| **Anthropic Claude** | *(use OpenAI or Google for embeddings)* | `claude-sonnet-4-20250514`, `claude-3-5-haiku-20241022` | `ANTHROPIC_API_KEY` |

> **Mixed providers:** You can use one provider for embeddings and a different
> one for the judge (e.g., OpenAI embeddings + Gemini judge).

### 3B) Chunking, Thresholds, and Retrieval

| Parameter | Value | Rationale |
|---|---|---|
| Chunk size | 1 400 chars | Preserves legal definitions and qualification clauses |
| Chunk overlap | 200 chars | Maintains cross-reference context at boundaries |
| FAISS normalisation | `normalize_L2=True` | Enables stable L2 -> cosine conversion |
| Relevance score | `1 - d^2/2`, clipped to [0, 1] | Bounded cosine-derived metric |
| Relevance threshold | 0.55 | Calibrated via pilot runs for recall/precision balance |
| Top-k per query | 25 | Balances retrieval depth with precision |
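
The relevance formula in the table follows from the identity for unit vectors, `d^2 = 2 - 2 * cos_sim`. The sketch below checks that identity on two made-up vectors using only the standard library. It assumes `d` is the plain (non-squared) Euclidean distance; some FAISS index types report squared distances, so it is worth confirming what the score from your index represents before applying the conversion.

```python
import math


def unit(v):
    """Scale a vector to unit L2 norm."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def l2_to_relevance(l2_dist: float) -> float:
    """Map a non-squared L2 distance between unit vectors to [0, 1]."""
    d = float(l2_dist)
    return max(0.0, min(1.0, 1.0 - (d * d) / 2.0))


# Two arbitrary vectors, chosen only for illustration.
a = unit([1.0, 2.0, 2.0])
b = unit([2.0, 0.0, 1.0])

cos_sim = sum(x * y for x, y in zip(a, b))
d = math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
cos_from_d = 1.0 - d ** 2 / 2.0

print(round(cos_sim, 6), round(cos_from_d, 6))

# Distance that corresponds to the 0.55 threshold: d = sqrt(2 * (1 - 0.55)).
d_at_threshold = math.sqrt(2.0 * (1.0 - 0.55))
print(round(d_at_threshold, 4))
```

Under this assumption, a relevance threshold of 0.55 corresponds to a distance of about 0.949 between unit vectors.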

In [3]:
# @title 3A) LLM Provider & Model Selection
# ============================================================
# EDIT THESE to switch between providers and models.
# ============================================================

# -- Embedding provider -------------------------------------------
# Options: "openai" or "google"
EMBED_PROVIDER = "openai"

# OpenAI embedding models: "text-embedding-3-small", "text-embedding-3-large"
# Google embedding models: "models/text-embedding-004"
EMBED_MODEL = "text-embedding-3-large"

# -- Judge / Chat LLM provider -----------------------------------
# Options: "openai", "google", or "anthropic"
JUDGE_PROVIDER = "openai"

# OpenAI:    "gpt-4o-mini", "gpt-4o", "gpt-4.1-mini-2025-04-14", "gpt-4.1-2025-04-14"
# Google:    "gemini-2.0-flash", "gemini-2.5-flash-preview-05-20", "gemini-2.5-pro-preview-05-06"
# Anthropic: "claude-sonnet-4-20250514", "claude-3-5-haiku-20241022"
JUDGE_MODEL = "gpt-4.1-2025-04-14"

# -- Pipeline toggles ---------------------------------------------
USE_LLM_JUDGE = True

# ============================================================
# API key loading (auto-selects based on provider choices above)
# ============================================================
_required_keys = set()
if EMBED_PROVIDER == "openai" or JUDGE_PROVIDER == "openai":
    _required_keys.add("OPENAI_API_KEY")
if EMBED_PROVIDER == "google" or JUDGE_PROVIDER == "google":
    _required_keys.add("GOOGLE_API_KEY")
if JUDGE_PROVIDER == "anthropic":
    _required_keys.add("ANTHROPIC_API_KEY")

_api_keys = {}
for key_name in _required_keys:
    val = userdata.get(key_name)
    if not val:
        raise ValueError(f"Missing {key_name}. Add it in Colab: Settings -> Secrets -> {key_name}")
    _api_keys[key_name] = val
    os.environ[key_name] = val

# -- Instantiate embedding model ----------------------------------
if EMBED_PROVIDER == "openai":
    from langchain_openai import OpenAIEmbeddings
    embeddings = OpenAIEmbeddings(
        model=EMBED_MODEL,
        api_key=_api_keys["OPENAI_API_KEY"]
    )
elif EMBED_PROVIDER == "google":
    from langchain_google_genai import GoogleGenerativeAIEmbeddings
    embeddings = GoogleGenerativeAIEmbeddings(
        model=EMBED_MODEL,
        google_api_key=_api_keys["GOOGLE_API_KEY"]
    )
else:
    raise ValueError(f"Unsupported EMBED_PROVIDER: {EMBED_PROVIDER}")

# -- Instantiate judge LLM (if enabled) ---------------------------
judge_llm = None
if USE_LLM_JUDGE:
    if JUDGE_PROVIDER == "openai":
        from langchain_openai import ChatOpenAI
        judge_llm = ChatOpenAI(
            model=JUDGE_MODEL,
            api_key=_api_keys["OPENAI_API_KEY"],
            temperature=0
        )
    elif JUDGE_PROVIDER == "google":
        from langchain_google_genai import ChatGoogleGenerativeAI
        judge_llm = ChatGoogleGenerativeAI(
            model=JUDGE_MODEL,
            google_api_key=_api_keys["GOOGLE_API_KEY"],
            temperature=0
        )
    elif JUDGE_PROVIDER == "anthropic":
        from langchain_anthropic import ChatAnthropic
        judge_llm = ChatAnthropic(
            model=JUDGE_MODEL,
            api_key=_api_keys["ANTHROPIC_API_KEY"],
            temperature=0
        )
    else:
        raise ValueError(f"Unsupported JUDGE_PROVIDER: {JUDGE_PROVIDER}")

print(f"Embedding:  {EMBED_PROVIDER} / {EMBED_MODEL}")
print(f"Judge LLM:  {JUDGE_PROVIDER} / {JUDGE_MODEL} (enabled={USE_LLM_JUDGE})")

Embedding:  openai / text-embedding-3-large
Judge LLM:  openai / gpt-4.1-2025-04-14 (enabled=True)


In [4]:
# @title 3B) Chunking, thresholds, and retrieval settings
CHUNK_SIZE = 1400
CHUNK_OVERLAP = 200

USE_KEYWORD_GATE = True

TOP_K_PER_QUERY = 25
RELEVANCE_THRESHOLD = 0.55

JUDGE_KEEP_LABELS = {"keep", "maybe"}  # set to {"keep"} for stricter precision

# -- PDF extraction settings --------------------------------------
# Minimum character count to consider a page as having usable text.
# Pages below this trigger fallback extraction (PyMuPDF -> OCR).
MIN_PAGE_TEXT_LEN = 50

# Tesseract OCR language (install extra packs if needed)
OCR_LANG = "eng"

# -- Resumability -------------------------------------------------
SKIP_ALREADY_PROCESSED = True

# -- Retry settings for API calls ---------------------------------
MAX_RETRIES = 3
RETRY_BACKOFF = 5  # seconds, multiplied by attempt number

print("Chunk:", CHUNK_SIZE, "/", CHUNK_OVERLAP)
print("Keyword gate:", USE_KEYWORD_GATE)
print("Relevance threshold:", RELEVANCE_THRESHOLD)
print("Min page text length for OCR trigger:", MIN_PAGE_TEXT_LEN)
print("Skip already processed:", SKIP_ALREADY_PROCESSED)

Chunk: 1400 / 200
Keyword gate: True
Relevance threshold: 0.55
Min page text length for OCR trigger: 50
Skip already processed: True


In [5]:
# @title 3C) Queries and keyword patterns (EDIT as needed)
FILTER_QUERIES = [
    "building deconstruction and disassembly methods",
    "selective deconstruction versus demolition for material recovery",
    "design for deconstruction (DfD) requirements and guidance",
    "reuse of secondary construction materials from deconstruction",
    "salvage, recovery, and reuse pathways in building deconstruction",
    "circular economy policy instruments related to deconstruction and reuse",
]

# High-recall keyword patterns (regex)
KEYWORD_PATTERNS = [
    r"\bdeconstruct(ion|ing)?\b",
    r"\bselective\s+demolition\b",
    r"\bsoft\s+strip(ping)?\b",
    r"\bdisassembl(y|e|ing)\b",
    r"\bdesign\s+for\s+deconstruction\b",
    r"\b(material|component)\s+reuse\b",
    r"\bsalvag(e|ing)\b",
    r"\brecover(y|ed)\b",
    r"\bsecondary\s+materials?\b",
    r"\bcircular\s+econom(y|ic)\b",
]
KW_REGEX = re.compile("|".join(KEYWORD_PATTERNS), re.IGNORECASE)

print("Queries:", len(FILTER_QUERIES), "| Keyword patterns:", len(KEYWORD_PATTERNS))

Queries: 6 | Keyword patterns: 10


In [6]:
# @title 4) Helper functions

# ==================================================================
#  Utility helpers
# ==================================================================

def safe_slug(name: str) -> str:
    """Convert a filename to a filesystem-safe slug (max 180 chars)."""
    s = re.sub(r"[^\w\-\.]+", "_", name.strip())
    return s[:180]

def sha1(text: str) -> str:
    """Return the SHA-1 hex digest of a text string (for chunk dedup)."""
    return hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest()

def list_pdfs(pdf_dir: str) -> List[str]:
    """Recursively list all PDF files under a directory."""
    p = Path(pdf_dir)
    return sorted([str(x) for x in p.glob("**/*.pdf")])

def call_with_retry(fn, *args, max_retries=MAX_RETRIES, backoff=RETRY_BACKOFF, **kwargs):
    """Call fn() with linear-backoff retry (wait = backoff * attempt) on transient API errors."""
    for attempt in range(1, max_retries + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            err_str = str(e).lower()
            is_transient = any(kw in err_str for kw in [
                "rate limit", "rate_limit", "429", "timeout",
                "connection", "server error", "500", "502", "503",
                "overloaded", "resource_exhausted"
            ])
            if is_transient and attempt < max_retries:
                wait = backoff * attempt
                print(f"  Retry {attempt}/{max_retries} after {wait}s -- {e}")
                time.sleep(wait)
            else:
                raise


# ==================================================================
#  PDF extraction: multi-engine with OCR fallback
# ==================================================================

def _extract_text_pdfplumber(pdf_path: str, page_num: int) -> Tuple[str, str]:
    """Extract text from a single page using pdfplumber.

    Also attempts explicit table extraction: pdfplumber can miss the
    rightmost column of wide tables or linearise multi-column layouts
    incorrectly.  We extract tables separately and append their content
    to capture data that body-text extraction may drop.

    Returns (text, method_used).
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            if page_num - 1 >= len(pdf.pages):
                return ("", "pdfplumber_out_of_range")
            page = pdf.pages[page_num - 1]

            # Body text
            body = page.extract_text() or ""

            # Explicit table extraction (compensates for dropped columns)
            table_text_parts = []
            try:
                tables = page.extract_tables()
                for table in (tables or []):
                    for row in table:
                        cleaned = [str(cell).strip() if cell else "" for cell in row]
                        table_text_parts.append(" | ".join(cleaned))
            except Exception:
                pass  # table extraction is best-effort

            # Merge: append table text whenever it is non-trivial (> 20 chars).
            # No overlap check is made, so rows may repeat text already in the body.
            table_block = "\n".join(table_text_parts)
            if table_block and len(table_block.strip()) > 20:
                combined = body + "\n\n[TABLE CONTENT]\n" + table_block
            else:
                combined = body

            return (combined, "pdfplumber")
    except Exception as e:
        return ("", f"pdfplumber_error:{e}")


def _extract_text_pymupdf(pdf_path: str, page_num: int) -> Tuple[str, str]:
    """Fallback text extraction using PyMuPDF (fitz).

    PyMuPDF uses a different PDF parser (MuPDF) and often handles
    encoding edge-cases, complex layouts, and embedded fonts that
    pdfplumber/pdfminer struggle with.
    """
    try:
        doc = fitz.open(pdf_path)
        if page_num - 1 >= len(doc):
            doc.close()
            return ("", "pymupdf_out_of_range")
        page = doc[page_num - 1]
        text = page.get_text("text") or ""
        doc.close()
        return (text, "pymupdf")
    except Exception as e:
        return ("", f"pymupdf_error:{e}")


def _extract_text_ocr(pdf_path: str, page_num: int) -> Tuple[str, str]:
    """Last-resort OCR extraction using Tesseract via pdf2image.

    Converts the target page to an image, then runs Tesseract OCR.
    Suitable for scanned / image-only pages.
    """
    try:
        images = convert_from_path(
            pdf_path,
            first_page=page_num,
            last_page=page_num,
            dpi=300
        )
        if not images:
            return ("", "ocr_no_image")
        text = pytesseract.image_to_string(images[0], lang=OCR_LANG)
        return (text or "", "ocr_tesseract")
    except Exception as e:
        return ("", f"ocr_error:{e}")


def extract_pages(pdf_path: str) -> List[Dict[str, Any]]:
    """Extract text from every page of a PDF using a three-tier strategy:

    Tier 1: pdfplumber (with explicit table extraction)
    Tier 2: PyMuPDF (different parser; handles some encodings better)
    Tier 3: Tesseract OCR (for scanned / image-only pages)

    Each page record includes:
    - page_num, text, text_len
    - extraction_method: which engine produced the final text
    - fallback_attempted: whether fallback engines were tried
    - likely_image_or_scan: flagged if text remains very short after all attempts
    """
    # Determine page count via PyMuPDF (fast, reliable)
    try:
        doc = fitz.open(pdf_path)
        n_pages = len(doc)
        doc.close()
    except Exception as e:
        print(f"  Cannot open PDF {os.path.basename(pdf_path)}: {e}")
        return [{"page_num": 1, "text": "", "text_len": 0,
                 "extraction_method": "failed", "fallback_attempted": False,
                 "likely_image_or_scan": True}]

    pages = []
    for pg in range(1, n_pages + 1):
        text = ""
        method = ""
        fallback = False

        # Tier 1: pdfplumber (with table extraction)
        text, method = _extract_text_pdfplumber(pdf_path, pg)

        # Tier 2: PyMuPDF fallback if pdfplumber returned very little
        if len(text.strip()) < MIN_PAGE_TEXT_LEN:
            fallback = True
            text2, method2 = _extract_text_pymupdf(pdf_path, pg)
            if len(text2.strip()) > len(text.strip()):
                text, method = text2, method2

        # Tier 3: OCR fallback if still very little text
        if len(text.strip()) < MIN_PAGE_TEXT_LEN:
            fallback = True
            text3, method3 = _extract_text_ocr(pdf_path, pg)
            if len(text3.strip()) > len(text.strip()):
                text, method = text3, method3

        # Flag pages that remain low-text after all attempts
        is_low = len(text.strip()) < MIN_PAGE_TEXT_LEN

        pages.append({
            "page_num": pg,
            "text": text,
            "text_len": len(text),
            "extraction_method": method,
            "fallback_attempted": fallback,
            "likely_image_or_scan": is_low,
        })

    return pages


# ==================================================================
#  JSONL I/O
# ==================================================================

def save_pages_jsonl(pages: List[Dict[str, Any]], out_path: str, source_file: str) -> None:
    """Write page records to JSONL (one JSON object per page)."""
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for p in pages:
            rec = {
                "source_file": source_file,
                "page_num": p["page_num"],
                "text": p["text"],
                "text_len": p["text_len"],
                "extraction_method": p.get("extraction_method", "unknown"),
                "fallback_attempted": p.get("fallback_attempted", False),
                "likely_image_or_scan": p.get("likely_image_or_scan", False),
            }
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")


def save_jsonl(records: List[Dict[str, Any]], out_path: str) -> None:
    """Write a list of dicts to a JSONL file (overwrite)."""
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")


def append_jsonl(records: List[Dict[str, Any]], out_path: str) -> None:
    """Append a list of dicts to a JSONL file."""
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "a", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")


# ==================================================================
#  Chunking, keyword gating, relevance scoring
# ==================================================================

def chunk_pages(pages: List[Dict[str, Any]], chunk_size: int, chunk_overlap: int) -> List[Dict[str, Any]]:
    """Concatenate pages with [PAGE x] markers, split into overlapping
    chunks, and map each chunk back to its source page (best-effort)."""
    parts = []
    for p in pages:
        parts.append(f"\n\n[PAGE {p['page_num']}]\n")
        parts.append(p["text"] or "")
    full_text = "".join(parts)

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    raw_chunks = splitter.split_text(full_text)

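    marker = re.compile(r"\[PAGE\s+(\d+)\]")
    chunks = []
    last_page = None  # most recent page marker seen in any earlier chunk
    for idx, ch in enumerate(raw_chunks):
        found = marker.findall(ch)
        # Use the first marker inside the chunk; a chunk without a marker
        # continues the page that the previous chunks ended on.
        page_num = int(found[0]) if found else last_page
        if found:
            last_page = int(found[-1])
        chunks.append({"chunk_index": idx, "page_num": page_num, "text": ch.strip()})
    return chunks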


def keyword_gate(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Retain only chunks matching the keyword regex (high-recall filter)."""
    return [c for c in chunks if KW_REGEX.search(c["text"])]


def l2_to_relevance(l2_dist: float) -> float:
    """Convert an L2 distance between unit-normalised vectors to a [0, 1] relevance.
    For unit vectors d^2 = 2 - 2 * cos_sim, so cos_sim = 1 - d^2 / 2, where d is
    the plain (non-squared) Euclidean distance."""
    d = float(l2_dist)
    cos = 1.0 - (d * d) / 2.0
    return max(0.0, min(1.0, cos))


print("Helper functions defined.")

Helper functions defined.


## 5) Batch Processing -- Extract Full Text + Find Deconstruction Passages

This step loops over every PDF in the corpus.  For each document it:

1. **Extracts full text** page-by-page using the three-tier strategy
   (pdfplumber -> PyMuPDF -> Tesseract OCR), with explicit table extraction
   and quality flags -> `full_text/<pdf>.pages.jsonl`
2. **Splits** text into overlapping chunks (Section 3.1.2)
3. **Applies keyword gating** for high-recall screening (Section 3.1.3)
4. **Embeds** gated chunks and builds a per-document FAISS index (Section 3.1.4)
5. **Queries** the index with each entry in `FILTER_QUERIES` (the
   construct-operationalising query set), retrieving up to `TOP_K_PER_QUERY`
   chunks per query
6. **Retains** chunks above the calibrated relevance threshold -> `kept_chunks/`
7. **Appends** kept chunks to the master file `MASTER_kept.jsonl`
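
Because several queries can retrieve the same chunk, steps 5 and 6 reduce to keeping one record per chunk, namely the one with the highest relevance. A minimal sketch of that merge on made-up hits follows; `merge_hits`, the chunk keys, and the scores are hypothetical and only illustrate the logic.

```python
from typing import Any, Dict, List, Tuple


def merge_hits(hits: List[Tuple[str, str, float]], threshold: float) -> List[Dict[str, Any]]:
    """Keep the best-scoring record per chunk key, dropping hits below threshold.

    Each hit is (chunk_key, query, relevance).
    """
    best: Dict[str, Dict[str, Any]] = {}
    for key, query, rel in hits:
        if rel < threshold:
            continue  # below the relevance threshold
        if key not in best or rel > best[key]["relevance"]:
            best[key] = {"chunk_key": key, "matched_query": query, "relevance": rel}
    # Highest relevance first, as in the per-PDF output files.
    return sorted(best.values(), key=lambda r: -r["relevance"])


# Illustrative hits: chunk c1 is retrieved by two queries, c2 is too weak.
sample_hits = [
    ("c1", "query A", 0.62),
    ("c1", "query B", 0.71),
    ("c2", "query A", 0.40),
    ("c3", "query B", 0.58),
]
merged = merge_hits(sample_hits, threshold=0.55)
for rec in merged:
    print(rec["chunk_key"], rec["matched_query"], rec["relevance"])
```

One consequence of this design is that `matched_query` records only the best-scoring query for a chunk, not every query that retrieved it.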

**Robustness:** Each PDF is processed inside a `try/except` block.
If a single document fails, the pipeline logs the failure and continues.
A summary table and failure report are printed at the end.

> **Tip:** For a quick pilot, uncomment `pdf_files = pdf_files[:3]` below.

In [7]:
# @title 5) Run batch passage-finding (main)
pdf_files = list_pdfs(PDF_DIR)
if not pdf_files:
    raise FileNotFoundError(f"No PDFs found under: {PDF_DIR}")
print("PDFs found:", len(pdf_files))

# Uncomment for a pilot run:
# pdf_files = pdf_files[:3]

full_text_dir = str(Path(OUT_DIR) / "outputs" / "full_text")
kept_dir      = str(Path(OUT_DIR) / "outputs" / "kept_chunks")
faiss_dir     = str(Path(OUT_DIR) / "outputs" / "faiss")
Path(full_text_dir).mkdir(parents=True, exist_ok=True)
Path(kept_dir).mkdir(parents=True, exist_ok=True)
Path(faiss_dir).mkdir(parents=True, exist_ok=True)

master_kept_path = str(Path(kept_dir) / "MASTER_kept.jsonl")
if not SKIP_ALREADY_PROCESSED:
    open(master_kept_path, "w", encoding="utf-8").close()

run_stats = []
failed_pdfs = []

for pdf_path in tqdm(pdf_files, desc="Processing PDFs"):
    t0 = time.time()
    pdf_name = os.path.basename(pdf_path)
    stem = safe_slug(Path(pdf_name).stem)

    # -- Resumability: skip if output already exists ---------------
    kept_out = str(Path(kept_dir) / f"{stem}.kept.jsonl")
    if SKIP_ALREADY_PROCESSED and Path(kept_out).exists():
        run_stats.append({
            "pdf": pdf_name, "pages": "-", "chunks": "-",
            "kw_chunks": "-", "kept": "-", "ocr_pages": "-",
            "sec": 0, "status": "skipped"
        })
        continue

    try:
        # -- 1) Extract full text (page-by-page, multi-engine) ----
        pages = extract_pages(pdf_path)
        full_out = str(Path(full_text_dir) / f"{stem}.pages.jsonl")
        save_pages_jsonl(pages, full_out, source_file=pdf_name)

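        # Note: this counts pages where any fallback (PyMuPDF and/or OCR) was
        # attempted, not only pages whose final text came from Tesseract.
        ocr_count = sum(1 for p in pages if p.get("fallback_attempted"))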
        flagged_count = sum(1 for p in pages if p.get("likely_image_or_scan"))

        # -- 2) Chunk ----------------------------------------------
        chunks = chunk_pages(pages, CHUNK_SIZE, CHUNK_OVERLAP)

        # -- 3) Keyword gate ---------------------------------------
        chunks_for_embedding = keyword_gate(chunks) if USE_KEYWORD_GATE else chunks

        if not chunks_for_embedding:
            save_jsonl([], kept_out)
            run_stats.append({
                "pdf": pdf_name, "pages": len(pages), "chunks": len(chunks),
                "kw_chunks": 0, "kept": 0, "ocr_pages": ocr_count,
                "sec": round(time.time() - t0, 1), "status": "ok (no kw matches)"
            })
            continue

        texts = [c["text"] for c in chunks_for_embedding]
        metadatas = [{
            "source_file": pdf_name,
            "page_num": c["page_num"],
            "chunk_index": c["chunk_index"],
            "chunk_sha1": sha1(c["text"])[:16],
        } for c in chunks_for_embedding]

        # -- 4) Build FAISS ----------------------------------------
        vs = call_with_retry(
            FAISS.from_texts, texts, embeddings,
            metadatas=metadatas, normalize_L2=True
        )
        pdf_faiss_out = str(Path(faiss_dir) / stem)
        vs.save_local(pdf_faiss_out)

        # -- 5) Query and collect kept chunks ----------------------
        kept_map = {}
        for q in FILTER_QUERIES:
            results = call_with_retry(
                vs.similarity_search_with_score, q, k=TOP_K_PER_QUERY
            )
            for doc, score in results:
                rel = l2_to_relevance(score)
                if rel < RELEVANCE_THRESHOLD:
                    continue

                md_dict = dict(doc.metadata)
                key = f"{md_dict.get('chunk_index')}|{md_dict.get('chunk_sha1')}"
                rec = {
                    "chunk_id": f"{stem}::c{md_dict.get('chunk_index')}::{md_dict.get('chunk_sha1')}",
                    "source_file": md_dict.get("source_file"),
                    "page_num": md_dict.get("page_num"),
                    "chunk_index": md_dict.get("chunk_index"),
                    "chunk_sha1": md_dict.get("chunk_sha1"),
                    "matched_query": q,
                    "relevance": round(float(rel), 4),
                    "text": doc.page_content,
                }
                if key not in kept_map or rec["relevance"] > kept_map[key]["relevance"]:
                    kept_map[key] = rec

        kept_chunks = sorted(
            kept_map.values(),
            key=lambda r: (-r["relevance"], r["chunk_index"])
        )

        save_jsonl(kept_chunks, kept_out)
        append_jsonl(kept_chunks, master_kept_path)

        run_stats.append({
            "pdf": pdf_name, "pages": len(pages), "chunks": len(chunks),
            "kw_chunks": len(chunks_for_embedding), "kept": len(kept_chunks),
            "ocr_pages": ocr_count,
            "sec": round(time.time() - t0, 1), "status": "ok"
        })

    except Exception as e:
        failed_pdfs.append({"pdf": pdf_name, "error": str(e)})
        run_stats.append({
            "pdf": pdf_name, "pages": "?", "chunks": "?",
            "kw_chunks": "?", "kept": "?", "ocr_pages": "?",
            "sec": round(time.time() - t0, 1), "status": f"FAILED: {e}"
        })
        print(f"  FAILED: {pdf_name} -- {e}")

# -- Summary -------------------------------------------------------
stats_df = pd.DataFrame(run_stats)
display(stats_df)
ok_count = len([s for s in run_stats if str(s.get("status","")).startswith("ok")])
skip_count = len([s for s in run_stats if "skipped" in str(s.get("status",""))])
print(f"\nTotal: {len(pdf_files)} | OK: {ok_count} | Skipped: {skip_count} | Failed: {len(failed_pdfs)}")
print("Master kept chunks:", master_kept_path)

if failed_pdfs:
    print("\nFAILED PDFs:")
    for fp in failed_pdfs:
        print(f"  - {fp['pdf']}: {fp['error']}")

PDFs found: 95


| | pdf | pages | chunks | kw_chunks | kept | ocr_pages | sec | status |
|---|---|---|---|---|---|---|---|---|
| 0 | 1.national-waste-and-resource-recovery-report-... | 104 | 298 | 215 | 2 | 0 | 65.0 | ok |
| 1 | 10.Report Dust.pdf | 2 | 2 | 0 | 0 | 2 | 16.8 | ok (no kw matches) |
| 2 | 11.Draft queenslands-waste-strategy2025-2030.pdf | 18 | 56 | 18 | 0 | 1 | 32.6 | ok |
| 3 | 12.NT Budget 2024-25-bp2-book.pdf | 126 | 308 | 8 | 0 | 7 | 140.0 | ok |
| 4 | 13.Delivery Program 2020-2026 annual progress ... | 104 | 521 | 26 | 0 | 3 | 70.0 | ok |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 90 | 91.A case study of construction and demolition... | 12 | 40 | 11 | 4 | 0 | 11.0 | ok |
| 91 | 92.Transformation towards a circular economy i... | 58 | 110 | 37 | 10 | 0 | 16.9 | ok |
| 92 | 93.ACE_Hub_Circularity_In_Australian_Business_... | 39 | 58 | 47 | 0 | 1 | 19.8 | ok |
| 93 | 94.Circular-Economy-Impact-Note_Final-for-Publ... | 8 | 20 | 19 | 0 | 1 | 10.5 | ok |



Total: 95 | OK: 95 | Skipped: 0 | Failed: 0
Master kept chunks: /content/drive/MyDrive/ACTIVE/AU_deconstruction_domain/data_analysis/batch_enhanced_KG_outputs/outputs/kept_chunks/MASTER_kept.jsonl


## 6) Optional: LLM Judge -- Precision Filter (Section 3.2)

Each kept chunk is classified by the judge LLM as:

- **`keep`** -- Directly about building deconstruction, disassembly, soft
  strip, or selective demolition for material recovery/reuse; or identifies
  formal definitions, requirements, responsible agencies/stakeholders,
  permitted/prohibited practices, certification pathways for reused materials,
  or documented barriers to deconstruction practice.
- **`maybe`** -- Partial or contextual relevance.
- **`drop`** -- Irrelevant.

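The judge runs at `temperature=0` for near-deterministic decoding and is
prompted to return JSON only.  Responses that cannot be parsed as JSON are
assigned the default label `maybe`.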
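
Since the JSON format is requested through the prompt rather than enforced, replies can still arrive wrapped in Markdown fences or with an unexpected label. The following is a minimal sketch of a tolerant parser; `parse_judge_reply`, `VALID_LABELS`, and `FALLBACK` are hypothetical names, and lower-casing and validating the label are additions made here for illustration.

```python
import json
import re

VALID_LABELS = {"keep", "maybe", "drop"}
FALLBACK = {"label": "maybe", "confidence": 0.5, "reason": "Unparseable response; defaulted."}


def parse_judge_reply(content: str) -> dict:
    """Parse a judge reply into a dict with a validated, lower-case label."""
    text = content.strip()
    # Strip a leading ```json / ``` fence and a trailing ``` fence, if present.
    text = re.sub(r"^```(?:json)?\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return dict(FALLBACK)
    # Some models wrap the object in a one-element list.
    if isinstance(parsed, list) and parsed and isinstance(parsed[0], dict):
        parsed = parsed[0]
    if not isinstance(parsed, dict):
        return dict(FALLBACK)
    label = str(parsed.get("label", "")).strip().lower()
    if label not in VALID_LABELS:
        return dict(FALLBACK)
    parsed["label"] = label
    return parsed


fenced = '```json\n{"label": "Keep", "confidence": 0.9, "reason": "Defines deconstruction."}\n```'
print(parse_judge_reply(fenced)["label"])
print(parse_judge_reply("Sorry, I cannot answer.")["label"])
```

Validating the label keeps the label distribution limited to the three expected keys, which makes the summary counts easier to interpret.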

Output: `outputs/kept_chunks/MASTER_kept_judged.jsonl`
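
The judged file contains every chunk together with its label, so a downstream step would typically select records whose label is in an accepted set, such as the `JUDGE_KEEP_LABELS` setting from the configuration cell. A minimal sketch of that selection on in-memory records follows; `filter_by_label` and the sample records are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Set


def filter_by_label(records: Iterable[Dict[str, Any]], keep_labels: Set[str]) -> List[Dict[str, Any]]:
    """Return records whose judge label is in keep_labels."""
    return [r for r in records if r.get("judge", {}).get("label") in keep_labels]


# Made-up records in the shape written to MASTER_kept_judged.jsonl.
sample_records = [
    {"chunk_id": "a::c1", "judge": {"label": "keep", "confidence": 0.9}},
    {"chunk_id": "a::c2", "judge": {"label": "maybe", "confidence": 0.5}},
    {"chunk_id": "b::c7", "judge": {"label": "drop", "confidence": 0.8}},
]

lenient = filter_by_label(sample_records, {"keep", "maybe"})
strict = filter_by_label(sample_records, {"keep"})
print(len(lenient), len(strict))
```

Passing `{"keep"}` alone gives the stricter, higher-precision subset.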

In [9]:
if USE_LLM_JUDGE and judge_llm is not None:
    in_path  = master_kept_path
    out_path = str(Path(kept_dir) / "MASTER_kept_judged.jsonl")
    open(out_path, "w", encoding="utf-8").close()

    JUDGE_PROMPT_TEMPLATE = (
        "You are a precision filter for a research pipeline analysing Australian "
        "building deconstruction governance.\n\n"
        "Return JSON only with keys: label, confidence, reason.\n\n"
        'label must be one of: "keep", "maybe", "drop"\n'
        "confidence must be a float between 0 and 1\n"
        "reason must be <= 25 words.\n\n"
        "Definitions:\n"
        "- keep: The passage directly concerns building deconstruction, disassembly, "
        "soft strip, or selective demolition for material recovery/reuse. This includes: "
        "formal definitions, legislative requirements, responsible agencies or stakeholders, "
        "permitted or prohibited practices, certification pathways for reused materials, "
        "or documented barriers (regulatory, economic, institutional) to deconstruction.\n"
        "- maybe: Partial or contextual relevance (e.g., general C&D waste management "
        "that may touch on reuse).\n"
        "- drop: No substantive connection to deconstruction, disassembly, or material reuse.\n\n"
        "Passage:\n{text}"
    )

    def judge_one(text: str) -> Dict[str, Any]:
        """Classify a single chunk as keep / maybe / drop."""
        prompt = JUDGE_PROMPT_TEMPLATE.format(text=text[:4000])
        resp = call_with_retry(judge_llm.invoke, prompt)
        try:
            content = resp.content if hasattr(resp, "content") else str(resp)
            # Strip markdown code fences if present
            content = re.sub(r"^```(?:json)?\s*", "", content.strip())
            content = re.sub(r"\s*```$", "", content.strip())
            parsed_json = json.loads(content)
            if isinstance(parsed_json, list) and len(parsed_json) > 0 and isinstance(parsed_json[0], dict):
                # If it's a list containing dicts, take the first dict
                return parsed_json[0]
            elif isinstance(parsed_json, dict):
                return parsed_json
            else:
                # If it's neither a dict nor a list of dicts, default
                return {"label": "maybe", "confidence": 0.5, "reason": "Malformed JSON response; defaulted."}
        except Exception:
            return {"label": "maybe", "confidence": 0.5, "reason": "Non-JSON response; defaulted."}

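    # Count input lines up front (for the progress bar), closing the file afterwards
    with open(in_path, "r", encoding="utf-8") as f_count:
        total_lines = sum(1 for _ in f_count)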
    judge_stats = {"keep": 0, "maybe": 0, "drop": 0}

    with open(in_path, "r", encoding="utf-8") as f_in, \
         open(out_path, "a", encoding="utf-8") as f_out:
        for line in tqdm(f_in, desc="Judging chunks", total=total_lines):
            rec = json.loads(line)
            rec["judge"] = judge_one(rec["text"])
            label = rec["judge"].get("label", "maybe")
            judge_stats[label] = judge_stats.get(label, 0) + 1
            f_out.write(json.dumps(rec, ensure_ascii=False) + "\n")

    print("Judged file:", out_path)
    print("Label distribution:", judge_stats)
else:
    print("USE_LLM_JUDGE=False or judge_llm not initialised; skipping.")

Judged file: /content/drive/MyDrive/ACTIVE/AU_deconstruction_domain/data_analysis/batch_enhanced_KG_outputs/outputs/kept_chunks/MASTER_kept_judged.jsonl
Label distribution: {'keep': 152, 'maybe': 205, 'drop': 7}


## 7) Runtime Guidance

### Typical processing times for ~95 PDFs

| Stage | Cost driver | Typical time |
|---|---|---|
| Text extraction (with OCR fallback) | I/O + PDF parsing + OCR on flagged pages | ~5-15 min |
| Embedding | Number of keyword-gated chunks x API latency | ~5-15 min |
| LLM judge | Number of kept chunks x judge model latency | ~10-30 min |

### Speeding up

- `USE_KEYWORD_GATE = True` reduces embedding volume by ~60-80%
- Increase `CHUNK_SIZE` for fewer, coarser chunks
- Lower `TOP_K_PER_QUERY` for fewer candidate matches
- `SKIP_ALREADY_PROCESSED = True` enables crash-safe resumability
- Pilot on 2-3 PDFs first, then scale
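
To gauge how `CHUNK_SIZE` affects volume, a rough sliding-window estimate of the chunk count for a document of `n_chars` characters is `ceil((n_chars - overlap) / (chunk_size - overlap))`. The sketch below is only an approximation under that fixed-window assumption: the recursive splitter breaks on separators, so actual counts will differ. `estimate_chunks` is a hypothetical helper.

```python
import math


def estimate_chunks(n_chars: int, chunk_size: int = 1400, chunk_overlap: int = 200) -> int:
    """Approximate chunk count for fixed-width overlapping windows."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if n_chars <= 0:
        return 0
    if n_chars <= chunk_size:
        return 1
    step = chunk_size - chunk_overlap  # new characters covered by each extra chunk
    return math.ceil((n_chars - chunk_overlap) / step)


# Illustrative document length of 100,000 characters.
default_estimate = estimate_chunks(100_000)                # chunk size 1400
coarse_estimate = estimate_chunks(100_000, chunk_size=2800)  # doubled chunk size
print(default_estimate, coarse_estimate)
```

For this illustrative length, doubling the chunk size roughly halves the estimated number of chunks to embed.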

### Resumability

When `SKIP_ALREADY_PROCESSED = True`, the pipeline checks whether a
`<pdf>.kept.jsonl` already exists (including the empty file written when a
PDF yields no keyword matches).  If so, that PDF is skipped.  You can
safely restart after a crash and processing continues from where it stopped.
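
The skip check can also be used on its own to preview which PDFs a restart would still process. A minimal sketch follows; `pending_pdfs` is a hypothetical helper, and it assumes file stems are already filesystem-safe, whereas the pipeline first passes them through `safe_slug`.

```python
import tempfile
from pathlib import Path
from typing import List


def pending_pdfs(pdf_names: List[str], kept_dir: str) -> List[str]:
    """Return the PDFs that have no <stem>.kept.jsonl in kept_dir yet."""
    kept = Path(kept_dir)
    return [
        name for name in pdf_names
        if not (kept / f"{Path(name).stem}.kept.jsonl").exists()
    ]


# Demo in a throwaway directory: report_a is already done, report_b is not.
demo_dir = Path(tempfile.mkdtemp())
(demo_dir / "report_a.kept.jsonl").write_text("", encoding="utf-8")

todo = pending_pdfs(["report_a.pdf", "report_b.pdf"], str(demo_dir))
print(todo)
```

Deleting a PDF's `.kept.jsonl` file is then enough to force that document to be reprocessed on the next run.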

### Extraction quality flags

Each page in the `full_text/*.pages.jsonl` files includes:

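- `extraction_method`: which engine produced the text (`pdfplumber`, `pymupdf`, `ocr_tesseract`), or an error tag such as `pdfplumber_error:<message>` (or `failed` if the PDF could not be opened) when no engine recovered text
- `fallback_attempted`: whether fallback engines (PyMuPDF, then OCR) were tried
- `likely_image_or_scan`: `true` if the text is still shorter than `MIN_PAGE_TEXT_LEN` (50 characters) after all attempts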

These flags support targeted follow-up extraction (e.g., specialised table
parsing or manual review) on pages where automated extraction was insufficient.
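
As one way to act on these flags, the sketch below lists the pages in a `.pages.jsonl` file that remained low-text. `flagged_pages` is a hypothetical helper and the demo records are made up; only the field names come from the pipeline output.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Tuple


def flagged_pages(pages_jsonl_path: str) -> List[Tuple[str, int, str]]:
    """Return (source_file, page_num, extraction_method) for flagged pages."""
    flagged = []
    with open(pages_jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            rec = json.loads(line)
            if rec.get("likely_image_or_scan"):
                flagged.append(
                    (rec.get("source_file"), rec.get("page_num"), rec.get("extraction_method"))
                )
    return flagged


# Demo file with one normal page and one flagged page (illustrative values).
demo_file = Path(tempfile.mkdtemp()) / "example.pages.jsonl"
rows = [
    {"source_file": "example.pdf", "page_num": 1, "text_len": 2310,
     "extraction_method": "pdfplumber", "fallback_attempted": False,
     "likely_image_or_scan": False},
    {"source_file": "example.pdf", "page_num": 2, "text_len": 12,
     "extraction_method": "ocr_tesseract", "fallback_attempted": True,
     "likely_image_or_scan": True},
]
demo_file.write_text("".join(json.dumps(r) + "\n" for r in rows), encoding="utf-8")

needs_review = flagged_pages(str(demo_file))
print(needs_review)
```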