In [1]:
import requests
import json
from pathlib import Path
from lxml import etree as ET  # Replace the existing xml.etree.ElementTree import
import re
import unicodedata
import html
import requests
from bs4 import BeautifulSoup

In [2]:
# Get IDs: scrape the A-Z index of policy instruments and collect every
# numeric document ID that appears in a "doc-eng.aspx?id=<n>" link.
url = 'https://www.tbs-sct.canada.ca/pol/a-z-eng.aspx'

# Set headers to mimic a browser visit
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
}

# Fetch the page content. The timeout keeps a stalled connection from
# blocking the kernel indefinitely (requests waits forever by default).
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()  # Raise an error for bad status codes

# Parse the HTML content
soup = BeautifulSoup(response.text, 'html.parser')

# Find all anchor tags with the document links and extract IDs.
# A set is used so that a document linked several times is only kept once.
doc_ids_set = set()
for tag in soup.find_all('a', href=True):
    match = re.search(r'doc-eng\.aspx\?id=(\d+)', tag['href'])
    if match:
        doc_ids_set.add(int(match.group(1)))

print("Extracted document IDs:")
doc_ids = sorted(doc_ids_set)
print(doc_ids)

Extracted document IDs:
[12084, 12111, 12129, 12139, 12141, 12143, 12160, 12182, 12323, 12453, 12510, 12522, 12553, 12563, 12583, 12588, 12595, 12601, 12602, 12607, 12610, 12614, 13342, 13525, 13583, 13589, 13593, 13602, 13603, 13616, 13663, 13685, 13697, 13832, 13848, 13890, 13937, 13953, 13954, 14208, 14219, 14265, 15772, 15773, 15774, 15796, 16484, 16553, 16557, 16577, 16578, 17065, 17067, 17151, 17280, 17284, 17590, 18309, 18310, 19061, 19420, 19421, 19422, 20008, 20930, 21104, 22370, 22379, 23601, 24227, 24970, 25049, 25583, 25593, 25600, 25748, 25761, 25845, 25857, 25867, 25868, 25875, 26160, 26163, 26164, 26168, 26262, 26295, 26332, 26952, 26953, 26954, 27088, 27146, 27228, 27256, 27807, 28108, 28203, 28305, 28699, 28700, 30656, 30678, 30682, 30683, 31300, 31306, 32495, 32499, 32502, 32503, 32504, 32505, 32509, 32510, 32511, 32512, 32513, 32514, 32515, 32516, 32517, 32518, 32519, 32520, 32521, 32522, 32523, 32524, 32525, 32526, 32527, 32528, 32529, 32530, 32533, 32563, 32573, 32

In [10]:
# Document IDs to leave out when the per-document JSON files are combined.
# Kept as strings because they are compared against JSON file stems
# (the file name without ".json"), which are strings as well.
skip_ids = ["12182"]

In [4]:
def clean_text_for_ai(text: str) -> str:
    """
    Normalizes a text fragment into a single clean line.

    Steps, in order:
      1. NFKC-normalize the text.
      2. Unescape HTML entities.
      3. Drop non-printable characters, but keep whitespace.
      4. Collapse every run of whitespace into one space.
      5. Strip leading and trailing whitespace.

    Whitespace is kept in step 3 on purpose: newlines, tabs and non-breaking
    spaces are all "non-printable" for str.isprintable(), so filtering them
    out before the collapse would delete them instead of turning them into a
    space, gluing the neighbouring words together.
    """
    # Normalize unicode
    text = unicodedata.normalize("NFKC", text)
    # Unescape HTML entities
    text = html.unescape(text)
    # Remove control/format characters while leaving whitespace for the next step
    text = ''.join(c for c in text if c.isprintable() or c.isspace())
    # Collapse whitespace (newlines, tabs, NBSP, ...) into single spaces
    text = re.sub(r"\s+", " ", text)
    # Remove leading/trailing whitespace
    return text.strip()

def fetch_xml(doc_id: int, lang: str, timeout: float = 30.0) -> bytes:
    """
    Fetches the XML section for the given document ID and language code.

    Args:
        doc_id: Numeric ID of the document.
        lang: Language code used in the page name ("doc-{lang}.aspx").
        timeout: Seconds to wait for the server before giving up. Without a
            timeout requests would wait indefinitely on a stalled connection.

    Returns:
        The raw, undecoded response body as bytes.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within `timeout`.
    """
    url = f"https://www.tbs-sct.canada.ca/pol/doc-{lang}.aspx?id={doc_id}&section=xml"
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    # Return bytes (not resp.text) so the XML parser handles decoding itself.
    return resp.content

In [5]:
def extract_nodes_xml(xml_bytes: bytes) -> list[str]:
    """
    Parses XML bytes and returns the cleaned text of every <p>, <li> and
    <clause> element that is not located inside an <appendices> section.

    Only an element's direct text is used: its own leading text plus the
    tail text that follows each of its children. Text nested inside child
    elements is not included. Entries whose cleaned text is two characters
    or shorter are dropped.

    The result lists all <p> matches first, then all <li>, then all <clause>.
    """
    root = ET.fromstring(xml_bytes, parser=ET.XMLParser(encoding='utf-8'))

    # Every <appendices> element below the root; anything under one is skipped.
    appendix_roots = root.findall(".//appendices")

    def inside_appendix(node) -> bool:
        """Walk up the ancestor chain looking for an <appendices> element."""
        ancestor = node.getparent()
        while ancestor is not None:
            if ancestor in appendix_roots:
                return True
            ancestor = ancestor.getparent()
        return False

    # Gather candidates tag by tag, in the same p / li / clause sequence.
    candidates = []
    for tag_name in ("p", "li", "clause"):
        candidates.extend(root.findall(f".//{tag_name}"))

    collected = []
    for node in candidates:
        if inside_appendix(node):
            continue

        # Direct text of the node followed by the tails of its children.
        fragments = [node.text] if node.text else []
        fragments.extend(child.tail for child in node if child.tail)

        cleaned = clean_text_for_ai(" ".join(fragments))
        if len(cleaned) > 2:  # skip empty/very short
            collected.append(cleaned)

    return collected

def scrape_doc_bilingual(doc_id: int) -> list[dict[str, str]]:
    """
    Scrapes the English and French versions of a document and pairs their
    text nodes purely by position.

    Args:
        doc_id: Numeric ID of the document to fetch.

    Returns:
        A list of {"en": ..., "fr": ...} dicts, one per aligned position.
        If the two languages yield a different number of nodes, the surplus
        nodes of the longer one are discarded.
    """
    data = {}
    for lang_code, key in [("eng", "en"), ("fra", "fr")]:
        data[key] = extract_nodes_xml(fetch_xml(doc_id, lang_code))
    # zip() stops at the shorter list, i.e. it aligns by index and truncates.
    return [
        {"en": en_text, "fr": fr_text}
        for en_text, fr_text in zip(data["en"], data["fr"])
    ]
    

In [6]:
def extract_block_children_with_anchor(xml_bytes: bytes) -> list[dict]:
    """
    For each element with an anchor, extract each direct child block-level element as its own entry,
    using the parent's anchor.

    A child is skipped when:
      * its tag is not one of the block-level tags listed below,
      * it sits inside an <appendices> section,
      * it, or any node below it, has a class containing 'hidden' or 'invisible',
      * its cleaned text is two characters or shorter.

    Returns a list of {"anchor": anchor, "tag": tag, "text": text}.
    """
    BLOCK_TAGS = {
        "section", "div", "p", "li", "h1", "h2", "h3", "h4", "h5", "h6",
        "chapter", "clause", "article", "ul", "ol", "table", "thead", "tbody", "tr", "td", "th"
    }
    parser = ET.XMLParser(encoding='utf-8')
    root = ET.fromstring(xml_bytes, parser=parser)
    items = []

    appendices = root.findall(".//appendices")

    def is_in_appendices(elem):
        """Check whether any ancestor of elem is an <appendices> element."""
        parent = elem.getparent()
        while parent is not None:
            if parent in appendices:
                return True
            parent = parent.getparent()
        return False

    def has_hidden_class(elem):
        """Check whether elem's class attribute mentions hidden/invisible."""
        # Comment and processing-instruction nodes are also yielded by iter();
        # their get() returns None even when a default is passed, so fall back
        # to "" explicitly instead of relying on the default argument.
        cls = elem.get("class") or ""
        return any(word in cls for word in ("hidden", "invisible"))

    for elem in root.iter():
        anchor = elem.get("anchor")
        if not anchor:
            continue
        for child in elem:
            if child.tag not in BLOCK_TAGS or is_in_appendices(child):
                continue
            # Skip if this element or any descendant has a hidden/invisible class
            if any(has_hidden_class(descendant) for descendant in child.iter()):
                continue
            text = "".join(child.itertext()).strip()
            clean = clean_text_for_ai(text)
            if len(clean) > 2:  # skip empty/very short
                items.append({
                    "anchor": anchor,
                    "tag": child.tag,
                    "text": clean
                })
    return items

def scrape_doc_bilingual_by_anchor(doc_id: int) -> list[dict]:
    """
    Fetches the English and French versions of a document and pairs their
    block-level nodes anchor by anchor.

    Only anchors found in both languages are used. Within one anchor the
    nodes are paired in order of appearance, and surplus nodes of the longer
    side are dropped. Anchors are processed in sorted order.

    Returns a list of {"en": ..., "fr": ..., "anchor": ..., "tag": ...},
    where "tag" is taken from the English node.
    """
    def group_by_anchor(entries):
        """Map each non-empty anchor to its entries, keeping their order."""
        grouped = {}
        for entry in entries:
            if entry["anchor"]:
                grouped.setdefault(entry["anchor"], []).append(entry)
        return grouped

    # English is fetched and parsed first, then French.
    extracted = {}
    for lang_code, key in (("eng", "en"), ("fra", "fr")):
        extracted[key] = extract_block_children_with_anchor(fetch_xml(doc_id, lang_code))

    en_groups = group_by_anchor(extracted["en"])
    fr_groups = group_by_anchor(extracted["fr"])

    aligned = []
    # Intersection of the key views: anchors that exist in both languages.
    for anchor in sorted(en_groups.keys() & fr_groups.keys()):
        # zip() stops at the shorter group, which truncates to the common length.
        for en_item, fr_item in zip(en_groups[anchor], fr_groups[anchor]):
            aligned.append({
                "en": en_item["text"],
                "fr": fr_item["text"],
                "anchor": anchor,
                "tag": en_item["tag"]
            })
    return aligned

In [7]:
print("Starting new scraping run...\n")
all_docs = {}

# Ensure output directory exists. parents=True also creates "output" itself,
# so the cell works on a fresh checkout where neither directory exists yet.
out_path = Path("output/docs")
out_path.mkdir(parents=True, exist_ok=True)

for did in doc_ids:
    out_file = out_path / f"{did}.json"
    # An existing file acts as a cache: documents scraped earlier are not re-fetched.
    if out_file.exists():
        print(f"Skipping doc_id {did} (output exists)")
        continue
    try:
        all_docs[str(did)] = scrape_doc_bilingual_by_anchor(did)
        print("doc_id: ", did,  " - Pairs: ", len(all_docs[str(did)]))
        with open(out_file, "w", encoding="utf-8") as f:
            json.dump(all_docs[str(did)], f, ensure_ascii=False, indent=2)
    except Exception as e:
        # Best effort: report the failing document and carry on with the rest.
        print(f"Error on ID {did}: {e}")

Starting new scraping run...

Skipping doc_id 12084 (output exists)
Skipping doc_id 12111 (output exists)
Skipping doc_id 12129 (output exists)
Skipping doc_id 12139 (output exists)
Skipping doc_id 12141 (output exists)
Skipping doc_id 12143 (output exists)
Skipping doc_id 12160 (output exists)
Skipping doc_id 12182 (output exists)
Skipping doc_id 12323 (output exists)
Skipping doc_id 12453 (output exists)
Skipping doc_id 12510 (output exists)
Skipping doc_id 12522 (output exists)
Skipping doc_id 12553 (output exists)
Skipping doc_id 12563 (output exists)
Skipping doc_id 12583 (output exists)
Skipping doc_id 12588 (output exists)
Skipping doc_id 12595 (output exists)
Skipping doc_id 12601 (output exists)
Skipping doc_id 12602 (output exists)
Skipping doc_id 12607 (output exists)
Skipping doc_id 12610 (output exists)
Skipping doc_id 12614 (output exists)
Skipping doc_id 13342 (output exists)
Skipping doc_id 13525 (output exists)
Skipping doc_id 13583 (output exists)
Skipping doc_id 1358

In [13]:
# Collect all en/fr pairs from each JSON file in output/docs
all_pairs = []
all_pairs_indexed = []
docs_path = Path("output/docs")

# glob() yields files in an OS-dependent order; sorting makes the combined
# files identical from one run to the next.
for file in sorted(docs_path.glob("*.json")):
    did = file.stem
    if did in skip_ids:
        continue
    with open(file, "r", encoding="utf-8") as f:
        doc_pairs = json.load(f)

    # If the file contains a dict (old format), its values are the lists of
    # pairs; otherwise the file itself is a single list of pairs.
    pair_groups = doc_pairs.values() if isinstance(doc_pairs, dict) else [doc_pairs]
    for pairs in pair_groups:
        for pair in pairs:
            # Plain version: only the two texts.
            all_pairs.append({"en": pair.get("en", ""), "fr": pair.get("fr", "")})
            # Indexed version: every original field plus the document ID.
            indexed = dict(pair)
            indexed["did"] = did
            all_pairs_indexed.append(indexed)

# Write the flattened list to a new JSON file
with open(Path("output") / "all_pairs.json", "w", encoding="utf-8") as f:
    json.dump(all_pairs, f, ensure_ascii=False, indent=2)

# Write the indexed list to a new JSON file
with open(Path("output") / "all_pairs_indexed.json", "w", encoding="utf-8") as f:
    json.dump(all_pairs_indexed, f, ensure_ascii=False, indent=2)

print(f"Wrote {len(all_pairs)} pairs to {Path('output') / 'all_pairs.json'}")

Wrote 8128 pairs to output\all_pairs.json
